Merge branch '217-fix-daemonization-of-rider-location-script' into 'master'
Update rider location cache service for systemd #217 Closes #217 See merge request jankstudio/resq!255
This commit is contained in:
commit
4879503476
6 changed files with 73 additions and 217 deletions
|
|
@ -1,129 +0,0 @@
|
||||||
#!/usr/bin/env python
|
|
||||||
|
|
||||||
import sys, os, time, atexit
|
|
||||||
from signal import SIGTERM
|
|
||||||
|
|
||||||
class Daemon:
|
|
||||||
"""
|
|
||||||
A generic daemon class.
|
|
||||||
|
|
||||||
Usage: subclass the Daemon class and override the run() method
|
|
||||||
"""
|
|
||||||
def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
|
|
||||||
self.stdin = stdin
|
|
||||||
self.stdout = stdout
|
|
||||||
self.stderr = stderr
|
|
||||||
self.pidfile = pidfile
|
|
||||||
|
|
||||||
def daemonize(self):
|
|
||||||
"""
|
|
||||||
do the UNIX double-fork magic, see Stevens' "Advanced
|
|
||||||
Programming in the UNIX Environment" for details (ISBN 0201563177)
|
|
||||||
http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
pid = os.fork()
|
|
||||||
if pid > 0:
|
|
||||||
# exit first parent
|
|
||||||
sys.exit(0)
|
|
||||||
except OSError, e:
|
|
||||||
sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# decouple from parent environment
|
|
||||||
os.chdir("/")
|
|
||||||
os.setsid()
|
|
||||||
os.umask(0)
|
|
||||||
|
|
||||||
# do second fork
|
|
||||||
try:
|
|
||||||
pid = os.fork()
|
|
||||||
if pid > 0:
|
|
||||||
# exit from second parent
|
|
||||||
sys.exit(0)
|
|
||||||
except OSError, e:
|
|
||||||
sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# redirect standard file descriptors
|
|
||||||
sys.stdout.flush()
|
|
||||||
sys.stderr.flush()
|
|
||||||
si = file(self.stdin, 'r')
|
|
||||||
so = file(self.stdout, 'a+')
|
|
||||||
se = file(self.stderr, 'a+', 0)
|
|
||||||
os.dup2(si.fileno(), sys.stdin.fileno())
|
|
||||||
os.dup2(so.fileno(), sys.stdout.fileno())
|
|
||||||
os.dup2(se.fileno(), sys.stderr.fileno())
|
|
||||||
|
|
||||||
# write pidfile
|
|
||||||
atexit.register(self.delpid)
|
|
||||||
pid = str(os.getpid())
|
|
||||||
file(self.pidfile,'w+').write("%s\n" % pid)
|
|
||||||
|
|
||||||
def delpid(self):
|
|
||||||
os.remove(self.pidfile)
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
"""
|
|
||||||
Start the daemon
|
|
||||||
"""
|
|
||||||
# Check for a pidfile to see if the daemon already runs
|
|
||||||
try:
|
|
||||||
pf = file(self.pidfile,'r')
|
|
||||||
pid = int(pf.read().strip())
|
|
||||||
pf.close()
|
|
||||||
except IOError:
|
|
||||||
pid = None
|
|
||||||
|
|
||||||
if pid:
|
|
||||||
message = "pidfile %s already exist. Daemon already running?\n"
|
|
||||||
sys.stderr.write(message % self.pidfile)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# Start the daemon
|
|
||||||
self.daemonize()
|
|
||||||
self.run()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
"""
|
|
||||||
Stop the daemon
|
|
||||||
"""
|
|
||||||
# Get the pid from the pidfile
|
|
||||||
try:
|
|
||||||
pf = file(self.pidfile,'r')
|
|
||||||
pid = int(pf.read().strip())
|
|
||||||
pf.close()
|
|
||||||
except IOError:
|
|
||||||
pid = None
|
|
||||||
|
|
||||||
if not pid:
|
|
||||||
message = "pidfile %s does not exist. Daemon not running?\n"
|
|
||||||
sys.stderr.write(message % self.pidfile)
|
|
||||||
return # not an error in a restart
|
|
||||||
|
|
||||||
# Try killing the daemon process
|
|
||||||
try:
|
|
||||||
while 1:
|
|
||||||
os.kill(pid, SIGTERM)
|
|
||||||
time.sleep(0.1)
|
|
||||||
except OSError, err:
|
|
||||||
err = str(err)
|
|
||||||
if err.find("No such process") > 0:
|
|
||||||
if os.path.exists(self.pidfile):
|
|
||||||
os.remove(self.pidfile)
|
|
||||||
else:
|
|
||||||
print str(err)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
def restart(self):
|
|
||||||
"""
|
|
||||||
Restart the daemon
|
|
||||||
"""
|
|
||||||
self.stop()
|
|
||||||
self.start()
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
"""
|
|
||||||
You should override this method when you subclass Daemon. It will be called after the process has been
|
|
||||||
daemonized by start() or restart().
|
|
||||||
"""
|
|
||||||
Binary file not shown.
|
|
@ -1,7 +1,5 @@
|
||||||
import paho.mqtt.client as mqtt
|
import paho.mqtt.client as mqtt
|
||||||
import ssl
|
import ssl
|
||||||
from threading import Thread
|
|
||||||
from daemon import Daemon
|
|
||||||
import redis
|
import redis
|
||||||
import time
|
import time
|
||||||
import signal
|
import signal
|
||||||
|
|
@ -11,79 +9,61 @@ import json
|
||||||
|
|
||||||
class RiderLocationCache(object):
|
class RiderLocationCache(object):
|
||||||
|
|
||||||
def run(self):
|
def run(self, client):
|
||||||
client = mqtt.Client()
|
print "running loop..."
|
||||||
|
|
||||||
client.on_connect = on_connect
|
|
||||||
# client.on_publish = on_publish
|
|
||||||
client.on_message = on_message
|
|
||||||
|
|
||||||
#client.tls_set(
|
|
||||||
# "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE,
|
|
||||||
# tls_version=ssl.PROTOCOL_TLSv1)
|
|
||||||
#client.tls_set(
|
|
||||||
# "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE,
|
|
||||||
# tls_version=ssl.PROTOCOL_TLSv1)
|
|
||||||
#client.connect("resqaws.jankstudio.com", 8883, 60)
|
|
||||||
client.connect("localhost", 1883, 60)
|
|
||||||
|
|
||||||
#t = Thread(target=getRedis, args=(1,))
|
|
||||||
|
|
||||||
#t.start()
|
|
||||||
|
|
||||||
#signal.signal(signal.SIGINT, sigint_handler)
|
|
||||||
client.loop_forever()
|
client.loop_forever()
|
||||||
|
|
||||||
|
# TODO: fix this and put these guys back under the class
|
||||||
def init_subscriptions(client):
|
def init_subscriptions(client):
|
||||||
#print "subscribing to wildcard"
|
print "subscribing to wildcard #"
|
||||||
client.subscribe('#')
|
client.subscribe('#')
|
||||||
|
|
||||||
def on_connect(client, userdata, flags, rc):
|
def on_connect(client, userdata, flags, rc):
|
||||||
init_subscriptions(client)
|
init_subscriptions(client)
|
||||||
#print("Connected with result code "+str(rc))
|
#print("Connected with result code "+str(rc))
|
||||||
# client.subscribe("$SYS/#")
|
# client.subscribe("$SYS/#")
|
||||||
|
|
||||||
def user_data_set(userdata):
|
def user_data_set(userdata):
|
||||||
conn = redis.StrictRedis(host='localhost', port=6379, db=0)
|
conn = redis.StrictRedis(host='localhost', port=6379, db=0)
|
||||||
return conn
|
return conn
|
||||||
|
|
||||||
def on_publish(client, userdata, mid):
|
def on_publish(client, userdata, mid):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def on_message(client, userdata, message):
|
def on_message(client, userdata, message):
|
||||||
redis_conn = user_data_set(userdata)
|
redis_conn = user_data_set(userdata)
|
||||||
#print("message topic=", message.topic[0:10])
|
#print("message topic=", message.topic[0:10])
|
||||||
|
|
||||||
if message.topic[0:10] != 'motorider_':
|
if message.topic[0:10] != 'motorider_':
|
||||||
return
|
return
|
||||||
#print repr(message)
|
#print repr(message)
|
||||||
|
|
||||||
# check if json decodable
|
# check if json decodable
|
||||||
res = json.loads(message.payload)
|
res = json.loads(message.payload)
|
||||||
#print res
|
#print res
|
||||||
|
|
||||||
# get rider session id
|
# get rider session id
|
||||||
sess_id = message.topic[10:]
|
sess_id = message.topic[10:]
|
||||||
|
|
||||||
# check if it has event
|
# check if it has event
|
||||||
if 'event' not in res:
|
if 'event' not in res:
|
||||||
return
|
return
|
||||||
|
|
||||||
# check if event is driver_location
|
# check if event is driver_location
|
||||||
if res['event'] != 'driver_location':
|
if res['event'] != 'driver_location':
|
||||||
return
|
return
|
||||||
|
|
||||||
# save the longitude and latitude
|
# save the longitude and latitude
|
||||||
# get the rider id from sess_id
|
# get the rider id from sess_id
|
||||||
rider_key = "rider.location.%s" % sess_id
|
rider_key = "rider.location.%s" % sess_id
|
||||||
rider_long = str(res['longitude'])
|
rider_long = str(res['longitude'])
|
||||||
rider_lat = str(res['latitude'])
|
rider_lat = str(res['latitude'])
|
||||||
|
|
||||||
# set the location
|
# set the location
|
||||||
redis_conn.hmset(rider_key, {'longitude': rider_long, 'latitude': rider_lat})
|
redis_conn.hmset(rider_key, {'longitude': rider_long, 'latitude': rider_lat})
|
||||||
|
|
||||||
# update our redis key
|
# update our redis key
|
||||||
key = 'location_%s' % sess_id
|
key = 'location_%s' % sess_id
|
||||||
#print "setting %s" % key
|
#print "setting %s" % key
|
||||||
redis_conn.setex(key, 1600, message.payload)
|
redis_conn.setex(key, 1600, message.payload)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,30 +0,0 @@
|
||||||
#!/usr/bin/env python
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
from daemon import Daemon
|
|
||||||
import rider_location_cache
|
|
||||||
|
|
||||||
class RiderLocationDaemon(Daemon):
|
|
||||||
def run(self):
|
|
||||||
rider_location = rider_location_cache.RiderLocationCache()
|
|
||||||
rider_location.run()
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
daemon = RiderLocationDaemon('/tmp/rider_location_daemon.pid')
|
|
||||||
if len(sys.argv) == 2:
|
|
||||||
if 'start' == sys.argv[1]:
|
|
||||||
daemon.start()
|
|
||||||
elif 'stop' == sys.argv[1]:
|
|
||||||
daemon.stop()
|
|
||||||
elif 'restart' == sys.argv[1]:
|
|
||||||
daemon.restart()
|
|
||||||
elif 'foreground' == sys.argv[1]:
|
|
||||||
daemon.run()
|
|
||||||
else:
|
|
||||||
print "Unknown command"
|
|
||||||
sys.exit(2)
|
|
||||||
sys.exit(0)
|
|
||||||
else:
|
|
||||||
print "usage: %s start|stop|restart" % sys.argv[0]
|
|
||||||
sys.exit(2)
|
|
||||||
23
utils/rider_location_cache/riderloc.py
Normal file
23
utils/rider_location_cache/riderloc.py
Normal file
|
|
@ -0,0 +1,23 @@
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
import rider_location_cache as rlc
|
||||||
|
import ssl
|
||||||
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
client = mqtt.Client()
|
||||||
|
client.on_connect = rlc.on_connect
|
||||||
|
# client.on_publish = on_publish
|
||||||
|
client.on_message = rlc.on_message
|
||||||
|
|
||||||
|
#client.tls_set(
|
||||||
|
# "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE,
|
||||||
|
# tls_version=ssl.PROTOCOL_TLSv1)
|
||||||
|
client.tls_set(
|
||||||
|
"/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE,
|
||||||
|
tls_version=ssl.PROTOCOL_TLSv1)
|
||||||
|
#client.connect("resqaws.jankstudio.com", 8883, 60)
|
||||||
|
client.connect("localhost", 8883, 60)
|
||||||
|
|
||||||
|
|
||||||
|
rider_location = rlc.RiderLocationCache()
|
||||||
|
rider_location.run(client)
|
||||||
12
utils/rider_location_cache/riderloc.service
Normal file
12
utils/rider_location_cache/riderloc.service
Normal file
|
|
@ -0,0 +1,12 @@
|
||||||
|
[Unit]
|
||||||
|
Description=Rider Location Cache Service
|
||||||
|
After=mosquitto.service redis.service
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
ExecStart=/usr/bin/python /root/www/resq/utils/rider_location_cache/riderloc.py
|
||||||
|
StandardInput=tty-force
|
||||||
|
Restart=always
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
||||||
Loading…
Reference in a new issue