diff --git a/src/Command/TestRiderTrackerCommand.php b/src/Command/TestRiderTrackerCommand.php new file mode 100644 index 00000000..7b26a9ed --- /dev/null +++ b/src/Command/TestRiderTrackerCommand.php @@ -0,0 +1,46 @@ +setName('test:ridertracker') + ->setDescription('Test the rider tracker service') + ->setHelp('Test the rider tracker service.') + ->addArgument('rider_id', InputArgument::REQUIRED, 'Rider ID'); + } + + public function __construct(RiderTracker $rtracker) + { + $this->rtracker = $rtracker; + + parent::__construct(); + } + + public function execute(InputInterface $input, OutputInterface $output) + { + $rider_id = $input->getArgument('rider_id'); + + $coordinates = $this->rtracker->getRiderLocation($rider_id); + + if ($coordinates != null) + { + echo "Rider Location: " . $coordinates->getLongitude() . "," . $coordinates->getLatitude() . "\n"; + } + else + { + echo "Invalid rider id." . "\n"; + } + } +} diff --git a/src/Controller/APIController.php b/src/Controller/APIController.php index 4925cf9b..d85978a8 100644 --- a/src/Controller/APIController.php +++ b/src/Controller/APIController.php @@ -26,6 +26,7 @@ use App\Service\InvoiceCreator; use App\Service\RisingTideGateway; use App\Service\MQTTClient; use App\Service\GeofenceTracker; +use App\Service\RiderTracker; use App\Entity\MobileSession; use App\Entity\Customer; @@ -1116,7 +1117,7 @@ class APIController extends Controller return $res->getReturnResponse(); } - public function getRiderStatus(Request $req) + public function getRiderStatus(Request $req, RiderTracker $rt) { $required_params = []; $em = $this->getDoctrine()->getManager(); @@ -1243,8 +1244,9 @@ class APIController extends Controller case JOStatus::ASSIGNED: case JOStatus::IN_TRANSIT: case JOStatus::IN_PROGRESS: - $coord = $jo->getHub()->getCoordinates(); $rider = $jo->getRider(); + // get rider coordinates from redis + $coord = $rt->getRiderLocation($rider->getID()); // default image url $url_prefix = $req->getSchemeAndHttpHost(); diff --git a/src/Service/RiderTracker.php b/src/Service/RiderTracker.php new file mode 100644 index 00000000..c34d215d --- /dev/null +++ b/src/Service/RiderTracker.php @@ -0,0 +1,57 @@ +em = $em; + + // TODO: make it read redis settings from config + // build a service maybe? + $this->redis = new PredisClient(); + } + + protected function getRiderKey($rider_id) + { + return self::RIDER_PREFIX_KEY . '.' . $rider_id; + } + + public function getRiderLocation($rider_id) + { + $coordinates = new Point(0,0); + $key = $this->getRiderKey($rider_id); + + // check redis cache for rider information + if (($this->redis->hexists($key, 'longitude')) && + ($this->redis->hexists($key, 'latitude'))) + { + $long = $this->redis->hget($key, 'longitude'); + $lat = $this->redis->hget($key, 'latitude'); + + $coordinates = new Point($long, $lat); + } + else + { + $rider = $this->em->getRepository(Rider::class)->find($rider_id); + $coordinates = $rider->getHub()->getCoordinates(); + } + + return $coordinates; + + } +} diff --git a/utils/rider_location_cache/daemon.py b/utils/rider_location_cache/daemon.py new file mode 100644 index 00000000..f371720d --- /dev/null +++ b/utils/rider_location_cache/daemon.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python + +import sys, os, time, atexit +from signal import SIGTERM + +class Daemon: + """ + A generic daemon class. + + Usage: subclass the Daemon class and override the run() method + """ + def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): + self.stdin = stdin + self.stdout = stdout + self.stderr = stderr + self.pidfile = pidfile + + def daemonize(self): + """ + do the UNIX double-fork magic, see Stevens' "Advanced + Programming in the UNIX Environment" for details (ISBN 0201563177) + http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 + """ + try: + pid = os.fork() + if pid > 0: + # exit first parent + sys.exit(0) + except OSError, e: + sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror)) + sys.exit(1) + + # decouple from parent environment + os.chdir("/") + os.setsid() + os.umask(0) + + # do second fork + try: + pid = os.fork() + if pid > 0: + # exit from second parent + sys.exit(0) + except OSError, e: + sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror)) + sys.exit(1) + + # redirect standard file descriptors + sys.stdout.flush() + sys.stderr.flush() + si = file(self.stdin, 'r') + so = file(self.stdout, 'a+') + se = file(self.stderr, 'a+', 0) + os.dup2(si.fileno(), sys.stdin.fileno()) + os.dup2(so.fileno(), sys.stdout.fileno()) + os.dup2(se.fileno(), sys.stderr.fileno()) + + # write pidfile + atexit.register(self.delpid) + pid = str(os.getpid()) + file(self.pidfile,'w+').write("%s\n" % pid) + + def delpid(self): + os.remove(self.pidfile) + + def start(self): + """ + Start the daemon + """ + # Check for a pidfile to see if the daemon already runs + try: + pf = file(self.pidfile,'r') + pid = int(pf.read().strip()) + pf.close() + except IOError: + pid = None + + if pid: + message = "pidfile %s already exist. Daemon already running?\n" + sys.stderr.write(message % self.pidfile) + sys.exit(1) + + # Start the daemon + self.daemonize() + self.run() + + def stop(self): + """ + Stop the daemon + """ + # Get the pid from the pidfile + try: + pf = file(self.pidfile,'r') + pid = int(pf.read().strip()) + pf.close() + except IOError: + pid = None + + if not pid: + message = "pidfile %s does not exist. Daemon not running?\n" + sys.stderr.write(message % self.pidfile) + return # not an error in a restart + + # Try killing the daemon process + try: + while 1: + os.kill(pid, SIGTERM) + time.sleep(0.1) + except OSError, err: + err = str(err) + if err.find("No such process") > 0: + if os.path.exists(self.pidfile): + os.remove(self.pidfile) + else: + print str(err) + sys.exit(1) + + def restart(self): + """ + Restart the daemon + """ + self.stop() + self.start() + + def run(self): + """ + You should override this method when you subclass Daemon. It will be called after the process has been + daemonized by start() or restart(). + """ diff --git a/utils/rider_location_cache/daemon.pyc b/utils/rider_location_cache/daemon.pyc new file mode 100644 index 00000000..51504185 Binary files /dev/null and b/utils/rider_location_cache/daemon.pyc differ diff --git a/utils/rider_location_cache/rider_location_cache.py b/utils/rider_location_cache/rider_location_cache.py index 441938e4..504884e3 100644 --- a/utils/rider_location_cache/rider_location_cache.py +++ b/utils/rider_location_cache/rider_location_cache.py @@ -1,106 +1,89 @@ import paho.mqtt.client as mqtt import ssl from threading import Thread +from daemon import Daemon import redis import time import signal import sys import os -import mysql.connector import json -def mysql_connect(user, password, host, database): - conn = mysql.connector.connect(user=user, - password=password, - host=host, - database=database) - return conn +class RiderLocationCache(object): -def init_subscriptions(client, conn): - # given mysql connection, get all rider sessions - query = ("select id from rider_session") - cursor = conn.cursor() - cursor.execute(query) - for (id) in cursor: - print "subscribing to rider session %s" % id - client.subscribe('motorider_%s' % id) - cursor.close() + def run(self): + client = mqtt.Client() + + client.on_connect = on_connect + # client.on_publish = on_publish + client.on_message = on_message + + #client.tls_set( + # "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE, + # tls_version=ssl.PROTOCOL_TLSv1) + #client.tls_set( + # "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE, + # tls_version=ssl.PROTOCOL_TLSv1) + #client.connect("resqaws.jankstudio.com", 8883, 60) + client.connect("localhost", 1883, 60) + + #t = Thread(target=getRedis, args=(1,)) + + #t.start() + + #signal.signal(signal.SIGINT, sigint_handler) + client.loop_forever() + +def init_subscriptions(client): + #print "subscribing to wildcard" + client.subscribe('#') def on_connect(client, userdata, flags, rc): - conn = mysql_connect('resq', 'Motolite456', '127.0.0.1', 'resq') - init_subscriptions(client, conn) - print("Connected with result code "+str(rc)) - client.subscribe("$SYS/#") + init_subscriptions(client) + #print("Connected with result code "+str(rc)) + # client.subscribe("$SYS/#") + +def user_data_set(userdata): + conn = redis.StrictRedis(host='localhost', port=6379, db=0) + return conn def on_publish(client, userdata, mid): - pass + pass def on_message(client, userdata, message): - print("message topic=",message.topic[0:10]) + redis_conn = user_data_set(userdata) + #print("message topic=", message.topic[0:10]) - if message.topic[0:10] != 'motorider_': - return - #print repr(message) + if message.topic[0:10] != 'motorider_': + return + #print repr(message) - # check if json decodable - res = json.loads(message.payload) + # check if json decodable + res = json.loads(message.payload) + #print res - # get rider session id - sess_id = message.topic[10:] + # get rider session id + sess_id = message.topic[10:] - # check if it has event - if 'event' not in res: - return + # check if it has event + if 'event' not in res: + return - # check if event is driver_location - if res['event'] != 'driver_location': - return + # check if event is driver_location + if res['event'] != 'driver_location': + return - # update our redis key - key = 'location_%s' % sess_id - print "setting %s" % key - redis_conn.setex(key, 1600, message.payload) + # save the longitude and latitude + # get the rider id from sess_id + rider_key = "rider.location.%s" % sess_id + rider_long = str(res['longitude']) + rider_lat = str(res['latitude']) + # set the location + redis_conn.hmset(rider_key, {'longitude': rider_long, 'latitude': rider_lat}) - -def getRedis(i): - r = redis.StrictRedis(host='localhost', port=6379, db=0) - while 1: - time.sleep(0) - data = r.brpop("events", 10) - if data: - info = data[1].split('|') - print "Channel: " + info[0] + " message: " + info[1] - client.publish(info[0], info[1]) - - -def sigint_handler(signal, frame): - print 'Interrupted' - sys.exit(0) - - - -client = mqtt.Client() -client.on_connect = on_connect -# client.on_publish = on_publish -client.on_message = on_message - -redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0) - - -#client.tls_set( -# "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE, -# tls_version=ssl.PROTOCOL_TLSv1) -client.tls_set( - "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE, - tls_version=ssl.PROTOCOL_TLSv1) -client.connect("resqaws.jankstudio.com", 8883, 60) - - -#t = Thread(target=getRedis, args=(1,)) - -#t.start() - -#signal.signal(signal.SIGINT, sigint_handler) -client.loop_forever() + # update our redis key + key = 'location_%s' % sess_id + #print "setting %s" % key + redis_conn.setex(key, 1600, message.payload) diff --git a/utils/rider_location_cache/rider_location_daemon.py b/utils/rider_location_cache/rider_location_daemon.py new file mode 100644 index 00000000..f56de6a5 --- /dev/null +++ b/utils/rider_location_cache/rider_location_daemon.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python + +import sys +import time +from daemon import Daemon +import rider_location_cache + +class RiderLocationDaemon(Daemon): + def run(self): + rider_location = rider_location_cache.RiderLocationCache() + rider_location.run() + +if __name__ == "__main__": + daemon = RiderLocationDaemon('/tmp/rider_location_daemon.pid') + if len(sys.argv) == 2: + if 'start' == sys.argv[1]: + daemon.start() + elif 'stop' == sys.argv[1]: + daemon.stop() + elif 'restart' == sys.argv[1]: + daemon.restart() + elif 'foreground' == sys.argv[1]: + daemon.run() + else: + print "Unknown command" + sys.exit(2) + sys.exit(0) + else: + print "usage: %s start|stop|restart" % sys.argv[0] + sys.exit(2)