From deff9b7f741ed5ba4e68d9a0861f6eabac3e9f69 Mon Sep 17 00:00:00 2001 From: Kendrick Chan Date: Mon, 22 Apr 2019 14:19:13 +0800 Subject: [PATCH 1/8] Remove mysql connector and misc code #180 --- .../rider_location_cache.py | 36 +++---------------- 1 file changed, 5 insertions(+), 31 deletions(-) diff --git a/utils/rider_location_cache/rider_location_cache.py b/utils/rider_location_cache/rider_location_cache.py index 441938e4..0b4e85b6 100644 --- a/utils/rider_location_cache/rider_location_cache.py +++ b/utils/rider_location_cache/rider_location_cache.py @@ -9,28 +9,14 @@ import os import mysql.connector import json -def mysql_connect(user, password, host, database): - conn = mysql.connector.connect(user=user, - password=password, - host=host, - database=database) - return conn - -def init_subscriptions(client, conn): - # given mysql connection, get all rider sessions - query = ("select id from rider_session") - cursor = conn.cursor() - cursor.execute(query) - for (id) in cursor: - print "subscribing to rider session %s" % id - client.subscribe('motorider_%s' % id) - cursor.close() +def init_subscriptions(client): + print "subscribing to wildcard" + client.subscribe('#') def on_connect(client, userdata, flags, rc): - conn = mysql_connect('resq', 'Motolite456', '127.0.0.1', 'resq') - init_subscriptions(client, conn) + init_subscriptions(client) print("Connected with result code "+str(rc)) - client.subscribe("$SYS/#") + # client.subscribe("$SYS/#") def on_publish(client, userdata, mid): pass @@ -62,18 +48,6 @@ def on_message(client, userdata, message): redis_conn.setex(key, 1600, message.payload) - -def getRedis(i): - r = redis.StrictRedis(host='localhost', port=6379, db=0) - while 1: - time.sleep(0) - data = r.brpop("events", 10) - if data: - info = data[1].split('|') - print "Channel: " + info[0] + " message: " + info[1] - client.publish(info[0], info[1]) - - def sigint_handler(signal, frame): print 'Interrupted' sys.exit(0) From d1b41ca36ce62b91221776879ec3f73e9efe40ab Mon Sep 17 00:00:00 2001 From: Korina Cordero Date: Tue, 23 Apr 2019 05:59:06 +0000 Subject: [PATCH 2/8] Save rider's coordinates into redis hash. #180 --- .../rider_location_cache.py | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/utils/rider_location_cache/rider_location_cache.py b/utils/rider_location_cache/rider_location_cache.py index 0b4e85b6..d1a3de15 100644 --- a/utils/rider_location_cache/rider_location_cache.py +++ b/utils/rider_location_cache/rider_location_cache.py @@ -6,7 +6,6 @@ import time import signal import sys import os -import mysql.connector import json def init_subscriptions(client): @@ -30,6 +29,7 @@ def on_message(client, userdata, message): # check if json decodable res = json.loads(message.payload) + print res # get rider session id sess_id = message.topic[10:] @@ -42,12 +42,20 @@ def on_message(client, userdata, message): if res['event'] != 'driver_location': return + # save the longitude and latitude + # get the rider id from sess_id + rider_key = "rider.location.%s" % sess_id + rider_long = str(res['longitude']) + rider_lat = str(res['latitude']) + + # set the location + redis_conn.hmset(rider_key, {'longitude': rider_long, 'latitude': rider_lat}) + # update our redis key key = 'location_%s' % sess_id print "setting %s" % key redis_conn.setex(key, 1600, message.payload) - def sigint_handler(signal, frame): print 'Interrupted' sys.exit(0) @@ -65,10 +73,11 @@ redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0) #client.tls_set( # "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE, # tls_version=ssl.PROTOCOL_TLSv1) -client.tls_set( - "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE, - tls_version=ssl.PROTOCOL_TLSv1) -client.connect("resqaws.jankstudio.com", 8883, 60) +#client.tls_set( +# "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE, +# tls_version=ssl.PROTOCOL_TLSv1) +#client.connect("resqaws.jankstudio.com", 8883, 60) +client.connect("localhost", 1883, 60) #t = Thread(target=getRedis, args=(1,)) @@ -77,4 +86,3 @@ client.connect("resqaws.jankstudio.com", 8883, 60) #signal.signal(signal.SIGINT, sigint_handler) client.loop_forever() - From 76b1f07febd72166f73b47188acfd1b3e9102298 Mon Sep 17 00:00:00 2001 From: Korina Cordero Date: Tue, 23 Apr 2019 11:02:12 +0000 Subject: [PATCH 3/8] Create rider tracker service. Create a test command to test rider tracker service. #180 --- src/Command/TestRiderTrackerCommand.php | 46 ++++++++++++++++++++ src/Service/RiderTracker.php | 58 +++++++++++++++++++++++++ 2 files changed, 104 insertions(+) create mode 100644 src/Command/TestRiderTrackerCommand.php create mode 100644 src/Service/RiderTracker.php diff --git a/src/Command/TestRiderTrackerCommand.php b/src/Command/TestRiderTrackerCommand.php new file mode 100644 index 00000000..7b26a9ed --- /dev/null +++ b/src/Command/TestRiderTrackerCommand.php @@ -0,0 +1,46 @@ +setName('test:ridertracker') + ->setDescription('Test the rider tracker service') + ->setHelp('Test the rider tracker service.') + ->addArgument('rider_id', InputArgument::REQUIRED, 'Rider ID'); + } + + public function __construct(RiderTracker $rtracker) + { + $this->rtracker = $rtracker; + + parent::__construct(); + } + + public function execute(InputInterface $input, OutputInterface $output) + { + $rider_id = $input->getArgument('rider_id'); + + $coordinates = $this->rtracker->getRiderLocation($rider_id); + + if ($coordinates != null) + { + echo "Rider Location: " . $coordinates->getLongitude() . "," . $coordinates->getLatitude() . "\n"; + } + else + { + echo "Invalid rider id." . "\n"; + } + } +} diff --git a/src/Service/RiderTracker.php b/src/Service/RiderTracker.php new file mode 100644 index 00000000..59bf1f17 --- /dev/null +++ b/src/Service/RiderTracker.php @@ -0,0 +1,58 @@ +em = $em; + + // TODO: make it read redis settings from config + // build a service maybe? + $this->redis = new PredisClient(); + } + + protected function getRiderKey($rider_id) + { + return self::RIDER_PREFIX_KEY . '.' . $rider_id; + } + + public function getRiderLocation($rider_id) + { + // check if rider id exists or is valid + $rider = $this->em->getRepository(Rider::class)->find($rider_id); + if ($rider != null) + { + $coordinates = $rider->getHub()->getCoordinates(); + + $key = $this->getRiderKey($rider_id); + + // check redis cache for rider information + if (($this->redis->hexists($key, 'longitude')) && + ($this->redis->hexists($key, 'latitude'))) + { + $long = $this->redis->hget($key, 'longitude'); + $lat = $this->redis->hget($key, 'latitude'); + + $coordinates = new Point($long, $lat); + } + + return $coordinates; + + } + } +} From 82eec39cfd5cdadfb7b5dbed2afaee1d0552a49b Mon Sep 17 00:00:00 2001 From: Korina Cordero Date: Thu, 25 Apr 2019 09:57:30 +0000 Subject: [PATCH 4/8] Create daemon for graceful shutdown for rider location script. #180 --- utils/rider_location_cache/daemon.py | 129 ++++++++++++++++++ utils/rider_location_cache/daemon.pyc | Bin 0 -> 4096 bytes .../rider_location_cache.py | 55 ++++---- .../rider_location_daemon.py | 31 +++++ 4 files changed, 189 insertions(+), 26 deletions(-) create mode 100644 utils/rider_location_cache/daemon.py create mode 100644 utils/rider_location_cache/daemon.pyc create mode 100644 utils/rider_location_cache/rider_location_daemon.py diff --git a/utils/rider_location_cache/daemon.py b/utils/rider_location_cache/daemon.py new file mode 100644 index 00000000..f371720d --- /dev/null +++ b/utils/rider_location_cache/daemon.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python + +import sys, os, time, atexit +from signal import SIGTERM + +class Daemon: + """ + A generic daemon class. + + Usage: subclass the Daemon class and override the run() method + """ + def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): + self.stdin = stdin + self.stdout = stdout + self.stderr = stderr + self.pidfile = pidfile + + def daemonize(self): + """ + do the UNIX double-fork magic, see Stevens' "Advanced + Programming in the UNIX Environment" for details (ISBN 0201563177) + http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 + """ + try: + pid = os.fork() + if pid > 0: + # exit first parent + sys.exit(0) + except OSError, e: + sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror)) + sys.exit(1) + + # decouple from parent environment + os.chdir("/") + os.setsid() + os.umask(0) + + # do second fork + try: + pid = os.fork() + if pid > 0: + # exit from second parent + sys.exit(0) + except OSError, e: + sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror)) + sys.exit(1) + + # redirect standard file descriptors + sys.stdout.flush() + sys.stderr.flush() + si = file(self.stdin, 'r') + so = file(self.stdout, 'a+') + se = file(self.stderr, 'a+', 0) + os.dup2(si.fileno(), sys.stdin.fileno()) + os.dup2(so.fileno(), sys.stdout.fileno()) + os.dup2(se.fileno(), sys.stderr.fileno()) + + # write pidfile + atexit.register(self.delpid) + pid = str(os.getpid()) + file(self.pidfile,'w+').write("%s\n" % pid) + + def delpid(self): + os.remove(self.pidfile) + + def start(self): + """ + Start the daemon + """ + # Check for a pidfile to see if the daemon already runs + try: + pf = file(self.pidfile,'r') + pid = int(pf.read().strip()) + pf.close() + except IOError: + pid = None + + if pid: + message = "pidfile %s already exist. Daemon already running?\n" + sys.stderr.write(message % self.pidfile) + sys.exit(1) + + # Start the daemon + self.daemonize() + self.run() + + def stop(self): + """ + Stop the daemon + """ + # Get the pid from the pidfile + try: + pf = file(self.pidfile,'r') + pid = int(pf.read().strip()) + pf.close() + except IOError: + pid = None + + if not pid: + message = "pidfile %s does not exist. Daemon not running?\n" + sys.stderr.write(message % self.pidfile) + return # not an error in a restart + + # Try killing the daemon process + try: + while 1: + os.kill(pid, SIGTERM) + time.sleep(0.1) + except OSError, err: + err = str(err) + if err.find("No such process") > 0: + if os.path.exists(self.pidfile): + os.remove(self.pidfile) + else: + print str(err) + sys.exit(1) + + def restart(self): + """ + Restart the daemon + """ + self.stop() + self.start() + + def run(self): + """ + You should override this method when you subclass Daemon. It will be called after the process has been + daemonized by start() or restart(). + """ diff --git a/utils/rider_location_cache/daemon.pyc b/utils/rider_location_cache/daemon.pyc new file mode 100644 index 0000000000000000000000000000000000000000..51504185fb3183608e794c1827f259f9ab078099 GIT binary patch literal 4096 zcmb_fQEwZ^5#HsIl#Z5cS+?Y)DUwAT#Z+yRwu;z+QPhbY!_`BX^w`9)kl=8>Tarf| z@2K0QWCj-ckmSASTMOieBo6`ln~J=q{brV=Tr~}f+9Yo|JG(nOyZg;Id)_}*8-Kp? z_s?Q6`*`?&5080-BEe6JOo+XOEXAIj>nmcfGM7EE=doN7SyhZ`Vy`A-0#!{URq>S& zUkdSfuTJ*5%{pwS|HP-$lo0I_itg^iKRyV4Z>jBrNRNtq;Zqf=dw9$zC^Wob4`^-( z?pG0CIS)LVYp*(+6i&ba(qQH~3riwdVqsY%%PjaJ=r(<_|2c|=|Gw($T$gFA681yI zS!7JB;Wzw`P1M(Sl$ktXow5V1oU^WzitojR|EEzF2Nps#lNX5j? zML5G7JWi8dn&~F>wKU4;tQSx%p!owj`GEA~ge%Mrr1oJ{wvUdE+NCxJ?TJk@)28K> zVOGSEO^ZB?qj;d(Zns+Fsa?jfFii8*hT#fiLkb~%{>F6Vbjma}ulfgQrq_fOVkikG z$5wZ;~SB0&JA(o6LDP%6hqx?Nl#R~QtqRfRnt_Vq8YGQ~(#wI~t;&NH^Dq@pH z%}jXCgcP6BTt0z>TTyAy#FEr!=2QJwm%jg5m!v%9Qtf|q34X7L;i_}QEofFbOUFJu zM-cw9B91*_oo0owOq%Lk1H1#^*eXvP*LX+Q2uH*7WR-T|+)TzOUM-rd&o#pV?)qFa z_H{9OE_3s`7+zqapvVE3N(zM6tcpJe)auPFD`DC(#2HD$C`U8@guZmy^|-%k#sJl2WAz>mtJUq+)*n)g+j zpBQ`t{A*x3^(rVD0!$n{eLJWrj&? zRJIbLp;lb9Cj0G4o<42&qJwa!HL#=Xa`(ago3}79h4;d|f`Yxhd{gx>iB9gQtwc4q z%yuJXj-#UeT)&6Zftwl2 zW6I!KG8ymiP$Cx?r^t>hJ3~=l>V9ghcGeS}frXsc)~Wj1Ld7)1X(6ET+=F`3j%Z%B zN!?%J{`8BJ6j8CGtn>@Lj*58~g{XUSMXppX%5zf5^WLg#$W`yOZdI=1yCL78ueSy{ zCRgcMaQQ`49Hmk4RyalyO(6ofwV*Iwt=khHoWi}gV$i{E*?gca844% zFR1q$RHpBepX4NyE+Um;m$MCc7tuTfO}dzN${JPWqx=$RxXv{E2LaXtn}UmTuq!CI z{F=C!sA`#*7X0ec8Rl?|3^kZ`#>j5^6}C%`INpQ7E~hKA2{#dIT?D1&0CxumRltdx z^3*K9sV$?Ttkh95RY0M!t$Bu?Ymt(3aK&90n4RWX3=U`p-hRd&5-Ee5%rN>9F6r3C zN>&)nakl#*#~F9*6uAy4)p67X=SW(m;xY<0Nf!aydIxYq$+#E1POWtN8EHdTix)oe znX}lYd$7uMBWfP-`HDO%f#phF*5xI+<~1A`2nY)>ybk2OG%z3mEx=%=KMw}LpgX-H zo+@F_P!a@wEFeHB>MZ9ym!}W@gxr4w(5*4(UPYppa15W|Zzh@+|8lr+)CiAEl5%7b5=*n0Ro!tEyKSGsio)Z}E7D9vo3U^Euvp8{hW z&Ch``DYQ`_##3k{%>s#?{@?yepKtFPf=;Kv6*C^FaaqIw2B#R#BF#%^J;7?QMb)>d zxIzWNC)lRqS5%zBBa;n5a6iqm0RQe9Xj9xy2?Hk6dK_>x{BqvdOY?*QK91~wnTlh| z1aF}SBb)=2K^k*yd($sBISaJ_y7EO16Q*_heF2ODI3tA|c%wG;!GY{a7 zO_LsyAGB;yodqiSzgPpcocQ|})Zp*qVxr8Tm}LL<29lcD%ZEA|=v+;y)BNqly}7j1 zu2n~XgnFV?9A&t>sHlgO&DfqnPz@r3cAaCQMbb>vlc{2V@rr}{)y(HmE5323>L&&?VMwYprd``3NX-|*k?m)soe3x_5-9NeJdRTLauChg~u3r@E6 Wa@^t@-*4Q`{FcI!v2{bPRsIF*e>fNb literal 0 HcmV?d00001 diff --git a/utils/rider_location_cache/rider_location_cache.py b/utils/rider_location_cache/rider_location_cache.py index d1a3de15..e96cba0e 100644 --- a/utils/rider_location_cache/rider_location_cache.py +++ b/utils/rider_location_cache/rider_location_cache.py @@ -1,6 +1,7 @@ import paho.mqtt.client as mqtt import ssl from threading import Thread +from daemon import Daemon import redis import time import signal @@ -8,6 +9,34 @@ import sys import os import json +class RiderLocationCache(object): + + def run(self): + global client, redis_conn + client = mqtt.Client() + client.on_connect = on_connect + # client.on_publish = on_publish + client.on_message = on_message + + redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0) + + #client.tls_set( + # "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE, + # tls_version=ssl.PROTOCOL_TLSv1) + #client.tls_set( + # "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE, + # tls_version=ssl.PROTOCOL_TLSv1) + #client.connect("resqaws.jankstudio.com", 8883, 60) + client.connect("localhost", 1883, 60) + + #t = Thread(target=getRedis, args=(1,)) + + #t.start() + + #signal.signal(signal.SIGINT, sigint_handler) + client.loop_forever() + + def init_subscriptions(client): print "subscribing to wildcard" client.subscribe('#') @@ -60,29 +89,3 @@ def sigint_handler(signal, frame): print 'Interrupted' sys.exit(0) - - -client = mqtt.Client() -client.on_connect = on_connect -# client.on_publish = on_publish -client.on_message = on_message - -redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0) - - -#client.tls_set( -# "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE, -# tls_version=ssl.PROTOCOL_TLSv1) -#client.tls_set( -# "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE, -# tls_version=ssl.PROTOCOL_TLSv1) -#client.connect("resqaws.jankstudio.com", 8883, 60) -client.connect("localhost", 1883, 60) - - -#t = Thread(target=getRedis, args=(1,)) - -#t.start() - -#signal.signal(signal.SIGINT, sigint_handler) -client.loop_forever() diff --git a/utils/rider_location_cache/rider_location_daemon.py b/utils/rider_location_cache/rider_location_daemon.py new file mode 100644 index 00000000..6889628f --- /dev/null +++ b/utils/rider_location_cache/rider_location_daemon.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python + +import sys +import time +from daemon import Daemon +import rider_location_cache + +class RiderLocationDaemon(Daemon): + def run(self): + print "mogol in daemon" + rider_location = rider_location_cache.RiderLocationCache() + rider_location.run() + +if __name__ == "__main__": + daemon = RiderLocationDaemon('/tmp/rider_location_daemon.pid') + if len(sys.argv) == 2: + if 'start' == sys.argv[1]: + daemon.start() + elif 'stop' == sys.argv[1]: + daemon.stop() + elif 'restart' == sys.argv[1]: + daemon.restart() + elif 'foreground' == sys.argv[1]: + daemon.run() + else: + print "Unknown command" + sys.exit(2) + sys.exit(0) + else: + print "usage: %s start|stop|restart" % sys.argv[0] + sys.exit(2) From d15099b456690129051a81deaa5919d450734526 Mon Sep 17 00:00:00 2001 From: Korina Cordero Date: Fri, 26 Apr 2019 02:12:00 +0000 Subject: [PATCH 5/8] Remove debug message. #180 --- utils/rider_location_cache/rider_location_daemon.py | 1 - 1 file changed, 1 deletion(-) diff --git a/utils/rider_location_cache/rider_location_daemon.py b/utils/rider_location_cache/rider_location_daemon.py index 6889628f..f56de6a5 100644 --- a/utils/rider_location_cache/rider_location_daemon.py +++ b/utils/rider_location_cache/rider_location_daemon.py @@ -7,7 +7,6 @@ import rider_location_cache class RiderLocationDaemon(Daemon): def run(self): - print "mogol in daemon" rider_location = rider_location_cache.RiderLocationCache() rider_location.run() From 950308964248d7aa54ae4bcd717866f19a640f6e Mon Sep 17 00:00:00 2001 From: Korina Cordero Date: Fri, 26 Apr 2019 03:10:16 +0000 Subject: [PATCH 6/8] Remove the declaration of global variables from the run method. #180 --- utils/rider_location_cache/rider_location_cache.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/utils/rider_location_cache/rider_location_cache.py b/utils/rider_location_cache/rider_location_cache.py index e96cba0e..cbd1bd4b 100644 --- a/utils/rider_location_cache/rider_location_cache.py +++ b/utils/rider_location_cache/rider_location_cache.py @@ -12,14 +12,10 @@ import json class RiderLocationCache(object): def run(self): - global client, redis_conn - client = mqtt.Client() client.on_connect = on_connect # client.on_publish = on_publish client.on_message = on_message - redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0) - #client.tls_set( # "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE, # tls_version=ssl.PROTOCOL_TLSv1) @@ -36,6 +32,8 @@ class RiderLocationCache(object): #signal.signal(signal.SIGINT, sigint_handler) client.loop_forever() +client = mqtt.Client() +redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0) def init_subscriptions(client): print "subscribing to wildcard" From e61d77ad8bccc36aedac36e3042a258ad553e07f Mon Sep 17 00:00:00 2001 From: Korina Cordero Date: Mon, 29 Apr 2019 02:16:03 +0000 Subject: [PATCH 7/8] Remove checking for rider validity. Add call to RiderTracker service from the APIController. #180 --- src/Controller/APIController.php | 6 ++++-- src/Service/RiderTracker.php | 35 ++++++++++++++++---------------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/src/Controller/APIController.php b/src/Controller/APIController.php index 4925cf9b..d85978a8 100644 --- a/src/Controller/APIController.php +++ b/src/Controller/APIController.php @@ -26,6 +26,7 @@ use App\Service\InvoiceCreator; use App\Service\RisingTideGateway; use App\Service\MQTTClient; use App\Service\GeofenceTracker; +use App\Service\RiderTracker; use App\Entity\MobileSession; use App\Entity\Customer; @@ -1116,7 +1117,7 @@ class APIController extends Controller return $res->getReturnResponse(); } - public function getRiderStatus(Request $req) + public function getRiderStatus(Request $req, RiderTracker $rt) { $required_params = []; $em = $this->getDoctrine()->getManager(); @@ -1243,8 +1244,9 @@ class APIController extends Controller case JOStatus::ASSIGNED: case JOStatus::IN_TRANSIT: case JOStatus::IN_PROGRESS: - $coord = $jo->getHub()->getCoordinates(); $rider = $jo->getRider(); + // get rider coordinates from redis + $coord = $rt->getRiderLocation($rider->getID()); // default image url $url_prefix = $req->getSchemeAndHttpHost(); diff --git a/src/Service/RiderTracker.php b/src/Service/RiderTracker.php index 59bf1f17..c34d215d 100644 --- a/src/Service/RiderTracker.php +++ b/src/Service/RiderTracker.php @@ -33,26 +33,25 @@ class RiderTracker public function getRiderLocation($rider_id) { - // check if rider id exists or is valid - $rider = $this->em->getRepository(Rider::class)->find($rider_id); - if ($rider != null) + $coordinates = new Point(0,0); + $key = $this->getRiderKey($rider_id); + + // check redis cache for rider information + if (($this->redis->hexists($key, 'longitude')) && + ($this->redis->hexists($key, 'latitude'))) { - $coordinates = $rider->getHub()->getCoordinates(); - - $key = $this->getRiderKey($rider_id); - - // check redis cache for rider information - if (($this->redis->hexists($key, 'longitude')) && - ($this->redis->hexists($key, 'latitude'))) - { - $long = $this->redis->hget($key, 'longitude'); - $lat = $this->redis->hget($key, 'latitude'); - - $coordinates = new Point($long, $lat); - } - - return $coordinates; + $long = $this->redis->hget($key, 'longitude'); + $lat = $this->redis->hget($key, 'latitude'); + $coordinates = new Point($long, $lat); } + else + { + $rider = $this->em->getRepository(Rider::class)->find($rider_id); + $coordinates = $rider->getHub()->getCoordinates(); + } + + return $coordinates; + } } From 2c8dad91649b4015cfe92cb30389946bef8ab714 Mon Sep 17 00:00:00 2001 From: Korina Cordero Date: Mon, 29 Apr 2019 09:04:37 +0000 Subject: [PATCH 8/8] Remove debug messages and sigint_handler. Create user_data_set to set the redis client. #180 --- .../rider_location_cache.py | 78 +++++++++---------- 1 file changed, 39 insertions(+), 39 deletions(-) diff --git a/utils/rider_location_cache/rider_location_cache.py b/utils/rider_location_cache/rider_location_cache.py index cbd1bd4b..504884e3 100644 --- a/utils/rider_location_cache/rider_location_cache.py +++ b/utils/rider_location_cache/rider_location_cache.py @@ -12,6 +12,8 @@ import json class RiderLocationCache(object): def run(self): + client = mqtt.Client() + client.on_connect = on_connect # client.on_publish = on_publish client.on_message = on_message @@ -32,58 +34,56 @@ class RiderLocationCache(object): #signal.signal(signal.SIGINT, sigint_handler) client.loop_forever() -client = mqtt.Client() -redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0) - def init_subscriptions(client): - print "subscribing to wildcard" - client.subscribe('#') + #print "subscribing to wildcard" + client.subscribe('#') def on_connect(client, userdata, flags, rc): - init_subscriptions(client) - print("Connected with result code "+str(rc)) - # client.subscribe("$SYS/#") + init_subscriptions(client) + #print("Connected with result code "+str(rc)) + # client.subscribe("$SYS/#") + +def user_data_set(userdata): + conn = redis.StrictRedis(host='localhost', port=6379, db=0) + return conn def on_publish(client, userdata, mid): - pass + pass def on_message(client, userdata, message): - print("message topic=",message.topic[0:10]) + redis_conn = user_data_set(userdata) + #print("message topic=", message.topic[0:10]) - if message.topic[0:10] != 'motorider_': - return - #print repr(message) + if message.topic[0:10] != 'motorider_': + return + #print repr(message) - # check if json decodable - res = json.loads(message.payload) - print res + # check if json decodable + res = json.loads(message.payload) + #print res - # get rider session id - sess_id = message.topic[10:] + # get rider session id + sess_id = message.topic[10:] - # check if it has event - if 'event' not in res: - return + # check if it has event + if 'event' not in res: + return - # check if event is driver_location - if res['event'] != 'driver_location': - return + # check if event is driver_location + if res['event'] != 'driver_location': + return - # save the longitude and latitude - # get the rider id from sess_id - rider_key = "rider.location.%s" % sess_id - rider_long = str(res['longitude']) - rider_lat = str(res['latitude']) + # save the longitude and latitude + # get the rider id from sess_id + rider_key = "rider.location.%s" % sess_id + rider_long = str(res['longitude']) + rider_lat = str(res['latitude']) - # set the location - redis_conn.hmset(rider_key, {'longitude': rider_long, 'latitude': rider_lat}) + # set the location + redis_conn.hmset(rider_key, {'longitude': rider_long, 'latitude': rider_lat}) - # update our redis key - key = 'location_%s' % sess_id - print "setting %s" % key - redis_conn.setex(key, 1600, message.payload) - -def sigint_handler(signal, frame): - print 'Interrupted' - sys.exit(0) + # update our redis key + key = 'location_%s' % sess_id + #print "setting %s" % key + redis_conn.setex(key, 1600, message.payload)