From 8d08dc955158b3c9a4166df831d738c24e6f9dd6 Mon Sep 17 00:00:00 2001 From: Kendrick Chan Date: Tue, 24 Jul 2018 01:31:29 +0800 Subject: [PATCH] Add python utilities for redis / mqtt functionality #159 --- utils/mqtt_sender/mqtt_sender.py | 64 +++++++++++ .../rider_location_cache.py | 106 ++++++++++++++++++ 2 files changed, 170 insertions(+) create mode 100644 utils/mqtt_sender/mqtt_sender.py create mode 100644 utils/rider_location_cache/rider_location_cache.py diff --git a/utils/mqtt_sender/mqtt_sender.py b/utils/mqtt_sender/mqtt_sender.py new file mode 100644 index 00000000..e314b337 --- /dev/null +++ b/utils/mqtt_sender/mqtt_sender.py @@ -0,0 +1,64 @@ +import paho.mqtt.client as mqtt +import ssl +from threading import Thread +import redis +import time +import signal +import sys +import os + + + +def sigint_handler(signal, frame): + print 'Interrupted' + sys.exit(0) + os._exit(0) + + +def on_connect(client, userdata, flags, rc): + print("Connected with result code "+str(rc)) + client.subscribe("$SYS/#") + + + +def on_publish(client, userdata, mid): + pass + + + +def getRedis(i): + r = redis.StrictRedis(host='localhost', port=6379, db=0) + while 1: + time.sleep(0) + data = r.brpop("events", 10) + if data: + info = data[1].split('|') + print "Channel: " + info[0] + " message: " + info[1] + client.publish(info[0], info[1]) + + + +def sigint_handler(signal, frame): + print 'Interrupted' + sys.exit(0) + + + +client = mqtt.Client() +client.on_connect = on_connect +client.on_publish = on_publish + + + +client.tls_set( + "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE, + tls_version=ssl.PROTOCOL_TLSv1) + +client.connect("resqaws.jankstudio.com", 8883, 60) +t = Thread(target=getRedis, args=(1,)) + +t.start() + +signal.signal(signal.SIGINT, sigint_handler) +client.loop_forever() + diff --git a/utils/rider_location_cache/rider_location_cache.py b/utils/rider_location_cache/rider_location_cache.py new file mode 100644 index 00000000..441938e4 --- /dev/null +++ b/utils/rider_location_cache/rider_location_cache.py @@ -0,0 +1,106 @@ +import paho.mqtt.client as mqtt +import ssl +from threading import Thread +import redis +import time +import signal +import sys +import os +import mysql.connector +import json + +def mysql_connect(user, password, host, database): + conn = mysql.connector.connect(user=user, + password=password, + host=host, + database=database) + return conn + +def init_subscriptions(client, conn): + # given mysql connection, get all rider sessions + query = ("select id from rider_session") + cursor = conn.cursor() + cursor.execute(query) + for (id) in cursor: + print "subscribing to rider session %s" % id + client.subscribe('motorider_%s' % id) + cursor.close() + +def on_connect(client, userdata, flags, rc): + conn = mysql_connect('resq', 'Motolite456', '127.0.0.1', 'resq') + init_subscriptions(client, conn) + print("Connected with result code "+str(rc)) + client.subscribe("$SYS/#") + +def on_publish(client, userdata, mid): + pass + +def on_message(client, userdata, message): + print("message topic=",message.topic[0:10]) + + if message.topic[0:10] != 'motorider_': + return + #print repr(message) + + # check if json decodable + res = json.loads(message.payload) + + # get rider session id + sess_id = message.topic[10:] + + # check if it has event + if 'event' not in res: + return + + # check if event is driver_location + if res['event'] != 'driver_location': + return + + # update our redis key + key = 'location_%s' % sess_id + print "setting %s" % key + redis_conn.setex(key, 1600, message.payload) + + + +def getRedis(i): + r = redis.StrictRedis(host='localhost', port=6379, db=0) + while 1: + time.sleep(0) + data = r.brpop("events", 10) + if data: + info = data[1].split('|') + print "Channel: " + info[0] + " message: " + info[1] + client.publish(info[0], info[1]) + + +def sigint_handler(signal, frame): + print 'Interrupted' + sys.exit(0) + + + +client = mqtt.Client() +client.on_connect = on_connect +# client.on_publish = on_publish +client.on_message = on_message + +redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0) + + +#client.tls_set( +# "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE, +# tls_version=ssl.PROTOCOL_TLSv1) +client.tls_set( + "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE, + tls_version=ssl.PROTOCOL_TLSv1) +client.connect("resqaws.jankstudio.com", 8883, 60) + + +#t = Thread(target=getRedis, args=(1,)) + +#t.start() + +#signal.signal(signal.SIGINT, sigint_handler) +client.loop_forever() +