From 44ddd4775b7acaaa8f1baf2fdcfe62c11b465020 Mon Sep 17 00:00:00 2001 From: Kendrick Chan Date: Tue, 28 May 2019 20:59:39 +0800 Subject: [PATCH] Update rider location cache service for systemd #217 --- utils/rider_location_cache/daemon.py | 129 ------------------ utils/rider_location_cache/daemon.pyc | Bin 4096 -> 0 bytes .../rider_location_cache.py | 96 ++++++------- .../rider_location_daemon.py | 30 ---- utils/rider_location_cache/riderloc.py | 23 ++++ utils/rider_location_cache/riderloc.service | 12 ++ 6 files changed, 73 insertions(+), 217 deletions(-) delete mode 100644 utils/rider_location_cache/daemon.py delete mode 100644 utils/rider_location_cache/daemon.pyc delete mode 100644 utils/rider_location_cache/rider_location_daemon.py create mode 100644 utils/rider_location_cache/riderloc.py create mode 100644 utils/rider_location_cache/riderloc.service diff --git a/utils/rider_location_cache/daemon.py b/utils/rider_location_cache/daemon.py deleted file mode 100644 index f371720d..00000000 --- a/utils/rider_location_cache/daemon.py +++ /dev/null @@ -1,129 +0,0 @@ -#!/usr/bin/env python - -import sys, os, time, atexit -from signal import SIGTERM - -class Daemon: - """ - A generic daemon class. - - Usage: subclass the Daemon class and override the run() method - """ - def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): - self.stdin = stdin - self.stdout = stdout - self.stderr = stderr - self.pidfile = pidfile - - def daemonize(self): - """ - do the UNIX double-fork magic, see Stevens' "Advanced - Programming in the UNIX Environment" for details (ISBN 0201563177) - http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 - """ - try: - pid = os.fork() - if pid > 0: - # exit first parent - sys.exit(0) - except OSError, e: - sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror)) - sys.exit(1) - - # decouple from parent environment - os.chdir("/") - os.setsid() - os.umask(0) - - # do second fork - try: - pid = os.fork() - if pid > 0: - # exit from second parent - sys.exit(0) - except OSError, e: - sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror)) - sys.exit(1) - - # redirect standard file descriptors - sys.stdout.flush() - sys.stderr.flush() - si = file(self.stdin, 'r') - so = file(self.stdout, 'a+') - se = file(self.stderr, 'a+', 0) - os.dup2(si.fileno(), sys.stdin.fileno()) - os.dup2(so.fileno(), sys.stdout.fileno()) - os.dup2(se.fileno(), sys.stderr.fileno()) - - # write pidfile - atexit.register(self.delpid) - pid = str(os.getpid()) - file(self.pidfile,'w+').write("%s\n" % pid) - - def delpid(self): - os.remove(self.pidfile) - - def start(self): - """ - Start the daemon - """ - # Check for a pidfile to see if the daemon already runs - try: - pf = file(self.pidfile,'r') - pid = int(pf.read().strip()) - pf.close() - except IOError: - pid = None - - if pid: - message = "pidfile %s already exist. Daemon already running?\n" - sys.stderr.write(message % self.pidfile) - sys.exit(1) - - # Start the daemon - self.daemonize() - self.run() - - def stop(self): - """ - Stop the daemon - """ - # Get the pid from the pidfile - try: - pf = file(self.pidfile,'r') - pid = int(pf.read().strip()) - pf.close() - except IOError: - pid = None - - if not pid: - message = "pidfile %s does not exist. Daemon not running?\n" - sys.stderr.write(message % self.pidfile) - return # not an error in a restart - - # Try killing the daemon process - try: - while 1: - os.kill(pid, SIGTERM) - time.sleep(0.1) - except OSError, err: - err = str(err) - if err.find("No such process") > 0: - if os.path.exists(self.pidfile): - os.remove(self.pidfile) - else: - print str(err) - sys.exit(1) - - def restart(self): - """ - Restart the daemon - """ - self.stop() - self.start() - - def run(self): - """ - You should override this method when you subclass Daemon. It will be called after the process has been - daemonized by start() or restart(). - """ diff --git a/utils/rider_location_cache/daemon.pyc b/utils/rider_location_cache/daemon.pyc deleted file mode 100644 index 51504185fb3183608e794c1827f259f9ab078099..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 4096 zcmb_fQEwZ^5#HsIl#Z5cS+?Y)DUwAT#Z+yRwu;z+QPhbY!_`BX^w`9)kl=8>Tarf| z@2K0QWCj-ckmSASTMOieBo6`ln~J=q{brV=Tr~}f+9Yo|JG(nOyZg;Id)_}*8-Kp? z_s?Q6`*`?&5080-BEe6JOo+XOEXAIj>nmcfGM7EE=doN7SyhZ`Vy`A-0#!{URq>S& zUkdSfuTJ*5%{pwS|HP-$lo0I_itg^iKRyV4Z>jBrNRNtq;Zqf=dw9$zC^Wob4`^-( z?pG0CIS)LVYp*(+6i&ba(qQH~3riwdVqsY%%PjaJ=r(<_|2c|=|Gw($T$gFA681yI zS!7JB;Wzw`P1M(Sl$ktXow5V1oU^WzitojR|EEzF2Nps#lNX5j? zML5G7JWi8dn&~F>wKU4;tQSx%p!owj`GEA~ge%Mrr1oJ{wvUdE+NCxJ?TJk@)28K> zVOGSEO^ZB?qj;d(Zns+Fsa?jfFii8*hT#fiLkb~%{>F6Vbjma}ulfgQrq_fOVkikG z$5wZ;~SB0&JA(o6LDP%6hqx?Nl#R~QtqRfRnt_Vq8YGQ~(#wI~t;&NH^Dq@pH z%}jXCgcP6BTt0z>TTyAy#FEr!=2QJwm%jg5m!v%9Qtf|q34X7L;i_}QEofFbOUFJu zM-cw9B91*_oo0owOq%Lk1H1#^*eXvP*LX+Q2uH*7WR-T|+)TzOUM-rd&o#pV?)qFa z_H{9OE_3s`7+zqapvVE3N(zM6tcpJe)auPFD`DC(#2HD$C`U8@guZmy^|-%k#sJl2WAz>mtJUq+)*n)g+j zpBQ`t{A*x3^(rVD0!$n{eLJWrj&? zRJIbLp;lb9Cj0G4o<42&qJwa!HL#=Xa`(ago3}79h4;d|f`Yxhd{gx>iB9gQtwc4q z%yuJXj-#UeT)&6Zftwl2 zW6I!KG8ymiP$Cx?r^t>hJ3~=l>V9ghcGeS}frXsc)~Wj1Ld7)1X(6ET+=F`3j%Z%B zN!?%J{`8BJ6j8CGtn>@Lj*58~g{XUSMXppX%5zf5^WLg#$W`yOZdI=1yCL78ueSy{ zCRgcMaQQ`49Hmk4RyalyO(6ofwV*Iwt=khHoWi}gV$i{E*?gca844% zFR1q$RHpBepX4NyE+Um;m$MCc7tuTfO}dzN${JPWqx=$RxXv{E2LaXtn}UmTuq!CI z{F=C!sA`#*7X0ec8Rl?|3^kZ`#>j5^6}C%`INpQ7E~hKA2{#dIT?D1&0CxumRltdx z^3*K9sV$?Ttkh95RY0M!t$Bu?Ymt(3aK&90n4RWX3=U`p-hRd&5-Ee5%rN>9F6r3C zN>&)nakl#*#~F9*6uAy4)p67X=SW(m;xY<0Nf!aydIxYq$+#E1POWtN8EHdTix)oe znX}lYd$7uMBWfP-`HDO%f#phF*5xI+<~1A`2nY)>ybk2OG%z3mEx=%=KMw}LpgX-H zo+@F_P!a@wEFeHB>MZ9ym!}W@gxr4w(5*4(UPYppa15W|Zzh@+|8lr+)CiAEl5%7b5=*n0Ro!tEyKSGsio)Z}E7D9vo3U^Euvp8{hW z&Ch``DYQ`_##3k{%>s#?{@?yepKtFPf=;Kv6*C^FaaqIw2B#R#BF#%^J;7?QMb)>d zxIzWNC)lRqS5%zBBa;n5a6iqm0RQe9Xj9xy2?Hk6dK_>x{BqvdOY?*QK91~wnTlh| z1aF}SBb)=2K^k*yd($sBISaJ_y7EO16Q*_heF2ODI3tA|c%wG;!GY{a7 zO_LsyAGB;yodqiSzgPpcocQ|})Zp*qVxr8Tm}LL<29lcD%ZEA|=v+;y)BNqly}7j1 zu2n~XgnFV?9A&t>sHlgO&DfqnPz@r3cAaCQMbb>vlc{2V@rr}{)y(HmE5323>L&&?VMwYprd``3NX-|*k?m)soe3x_5-9NeJdRTLauChg~u3r@E6 Wa@^t@-*4Q`{FcI!v2{bPRsIF*e>fNb diff --git a/utils/rider_location_cache/rider_location_cache.py b/utils/rider_location_cache/rider_location_cache.py index 504884e3..54a47ddd 100644 --- a/utils/rider_location_cache/rider_location_cache.py +++ b/utils/rider_location_cache/rider_location_cache.py @@ -1,7 +1,5 @@ import paho.mqtt.client as mqtt import ssl -from threading import Thread -from daemon import Daemon import redis import time import signal @@ -11,79 +9,61 @@ import json class RiderLocationCache(object): - def run(self): - client = mqtt.Client() - - client.on_connect = on_connect - # client.on_publish = on_publish - client.on_message = on_message - - #client.tls_set( - # "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE, - # tls_version=ssl.PROTOCOL_TLSv1) - #client.tls_set( - # "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE, - # tls_version=ssl.PROTOCOL_TLSv1) - #client.connect("resqaws.jankstudio.com", 8883, 60) - client.connect("localhost", 1883, 60) - - #t = Thread(target=getRedis, args=(1,)) - - #t.start() - - #signal.signal(signal.SIGINT, sigint_handler) + def run(self, client): + print "running loop..." client.loop_forever() +# TODO: fix this and put these guys back under the class def init_subscriptions(client): - #print "subscribing to wildcard" - client.subscribe('#') + print "subscribing to wildcard #" + client.subscribe('#') def on_connect(client, userdata, flags, rc): - init_subscriptions(client) - #print("Connected with result code "+str(rc)) - # client.subscribe("$SYS/#") + init_subscriptions(client) + #print("Connected with result code "+str(rc)) + # client.subscribe("$SYS/#") def user_data_set(userdata): - conn = redis.StrictRedis(host='localhost', port=6379, db=0) - return conn + conn = redis.StrictRedis(host='localhost', port=6379, db=0) + return conn def on_publish(client, userdata, mid): - pass + pass def on_message(client, userdata, message): - redis_conn = user_data_set(userdata) - #print("message topic=", message.topic[0:10]) + redis_conn = user_data_set(userdata) + #print("message topic=", message.topic[0:10]) - if message.topic[0:10] != 'motorider_': - return - #print repr(message) + if message.topic[0:10] != 'motorider_': + return + #print repr(message) - # check if json decodable - res = json.loads(message.payload) - #print res + # check if json decodable + res = json.loads(message.payload) + #print res - # get rider session id - sess_id = message.topic[10:] + # get rider session id + sess_id = message.topic[10:] - # check if it has event - if 'event' not in res: - return + # check if it has event + if 'event' not in res: + return - # check if event is driver_location - if res['event'] != 'driver_location': - return + # check if event is driver_location + if res['event'] != 'driver_location': + return - # save the longitude and latitude - # get the rider id from sess_id - rider_key = "rider.location.%s" % sess_id - rider_long = str(res['longitude']) - rider_lat = str(res['latitude']) + # save the longitude and latitude + # get the rider id from sess_id + rider_key = "rider.location.%s" % sess_id + rider_long = str(res['longitude']) + rider_lat = str(res['latitude']) - # set the location - redis_conn.hmset(rider_key, {'longitude': rider_long, 'latitude': rider_lat}) + # set the location + redis_conn.hmset(rider_key, {'longitude': rider_long, 'latitude': rider_lat}) - # update our redis key - key = 'location_%s' % sess_id - #print "setting %s" % key - redis_conn.setex(key, 1600, message.payload) + # update our redis key + key = 'location_%s' % sess_id + #print "setting %s" % key + redis_conn.setex(key, 1600, message.payload) diff --git a/utils/rider_location_cache/rider_location_daemon.py b/utils/rider_location_cache/rider_location_daemon.py deleted file mode 100644 index f56de6a5..00000000 --- a/utils/rider_location_cache/rider_location_daemon.py +++ /dev/null @@ -1,30 +0,0 @@ -#!/usr/bin/env python - -import sys -import time -from daemon import Daemon -import rider_location_cache - -class RiderLocationDaemon(Daemon): - def run(self): - rider_location = rider_location_cache.RiderLocationCache() - rider_location.run() - -if __name__ == "__main__": - daemon = RiderLocationDaemon('/tmp/rider_location_daemon.pid') - if len(sys.argv) == 2: - if 'start' == sys.argv[1]: - daemon.start() - elif 'stop' == sys.argv[1]: - daemon.stop() - elif 'restart' == sys.argv[1]: - daemon.restart() - elif 'foreground' == sys.argv[1]: - daemon.run() - else: - print "Unknown command" - sys.exit(2) - sys.exit(0) - else: - print "usage: %s start|stop|restart" % sys.argv[0] - sys.exit(2) diff --git a/utils/rider_location_cache/riderloc.py b/utils/rider_location_cache/riderloc.py new file mode 100644 index 00000000..26a1a92f --- /dev/null +++ b/utils/rider_location_cache/riderloc.py @@ -0,0 +1,23 @@ +import paho.mqtt.client as mqtt +import rider_location_cache as rlc +import ssl +import logging + + +client = mqtt.Client() +client.on_connect = rlc.on_connect +# client.on_publish = on_publish +client.on_message = rlc.on_message + +#client.tls_set( +# "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE, +# tls_version=ssl.PROTOCOL_TLSv1) +client.tls_set( + "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE, + tls_version=ssl.PROTOCOL_TLSv1) +#client.connect("resqaws.jankstudio.com", 8883, 60) +client.connect("localhost", 8883, 60) + + +rider_location = rlc.RiderLocationCache() +rider_location.run(client) diff --git a/utils/rider_location_cache/riderloc.service b/utils/rider_location_cache/riderloc.service new file mode 100644 index 00000000..f3d43264 --- /dev/null +++ b/utils/rider_location_cache/riderloc.service @@ -0,0 +1,12 @@ +[Unit] +Description=Rider Location Cache Service +After=mosquitto.service redis.service + +[Service] +Type=simple +ExecStart=/usr/bin/python /root/www/resq/utils/rider_location_cache/riderloc.py +StandardInput=tty-force +Restart=always + +[Install] +WantedBy=multi-user.target