From eba1c015b21c0b18c22d85571e2b2632b81493c7 Mon Sep 17 00:00:00 2001 From: Korina Cordero Date: Thu, 11 Feb 2021 07:43:51 +0000 Subject: [PATCH] Add a rider message converter script. #535 --- .../mqtt_rider_convert/mqtt_rider_convert.py | 70 ++++++++++++++++++ .../mqtt_rider_convert/mqtt_rider_convert.pyc | Bin 0 -> 2459 bytes utils/mqtt_rider_convert/riderconvert.py | 23 ++++++ 3 files changed, 93 insertions(+) create mode 100644 utils/mqtt_rider_convert/mqtt_rider_convert.py create mode 100644 utils/mqtt_rider_convert/mqtt_rider_convert.pyc create mode 100644 utils/mqtt_rider_convert/riderconvert.py diff --git a/utils/mqtt_rider_convert/mqtt_rider_convert.py b/utils/mqtt_rider_convert/mqtt_rider_convert.py new file mode 100644 index 00000000..e669e1a9 --- /dev/null +++ b/utils/mqtt_rider_convert/mqtt_rider_convert.py @@ -0,0 +1,70 @@ +import paho.mqtt.client as mqtt +import ssl +import redis +import time +import signal +import sys +import os +import json + +class MQTTRiderConvert(object): + + def run(self, client): + print "running loop..." + client.loop_forever() + +# TODO: fix this and put these guys back under the class +def init_subscriptions(client): + print "subscribing to wildcard #" + client.subscribe('#') + +def on_connect(client, userdata, flags, rc): + init_subscriptions(client) + #print("Connected with result code "+str(rc)) + # client.subscribe("$SYS/#") + +def user_data_set(userdata): + conn = redis.StrictRedis(host='localhost', port=6379, db=0) + return conn + +def on_publish(client, userdata, mid): + pass + +def on_message(client, userdata, message): + redis_conn = user_data_set(userdata) + #print("message topic=", message.topic[0:10]) + + if message.topic[0:10] != 'motorider_': + return + #print repr(message) + + # check if json decodable + res = json.loads(message.payload) + #print res + + # get rider session id + sess_id = message.topic[10:] + + # check if it has event + if 'event' not in res: + return + + # check if event is driver_location + if res['event'] != 'driver_location': + return + + # save the longitude and latitude + rider_long = str(res['longitude']) + rider_lat = str(res['latitude']) + + # get the rider id from redis using rider.id. + session_rider_key = 'rider.id.%s' % sess_id + rider_id = redis_conn.get(session_rider_key) + + channel = 'rider/%s/location' % rider_id + payload = rider_lat + ':' + rider_long + + print channel + print payload + + client.publish(channel, payload) diff --git a/utils/mqtt_rider_convert/mqtt_rider_convert.pyc b/utils/mqtt_rider_convert/mqtt_rider_convert.pyc new file mode 100644 index 0000000000000000000000000000000000000000..5cd5c3fbe3c9785da9b53d4c0bbbaf9774e12909 GIT binary patch literal 2459 zcmcIl%W~sH6upxC&XZ6igvCsTd6ZZ7o|>X4W&s;gg)%JgMktcvv?EI%b$e2&a#rCB z_z(VvDn5ZF9{}gvwv$<~o8Y9c@9n<6???3a&fvE{U;L8E`V-;%8yx#PK#u=FN|9M0 zvrs0%<|dL!w82|4X>IVfOxhc~Ba_Ys@5-dR!Fw|4ZScNK`jUrI4CDgBBl%V2PUPpw zmgFrGlWob{itI?LS!s&O4S|UQ`x@BzecFt#udl~VVc(6H z&XjgMf9G7%(4!>tF8#;VXmR71Ls&!ImZ~IZ2SZ-E9zc>*>AXl1H&9|;=gSggOwI=@ z9>Gn#`O@0k1MJSXef7q_|VegQ+o_(m*+NX%sIpB>XU0z=2_b0 zC(j*aJ@=tz`ZtV)Ud5E~;L^tj_5kusWn8k>vv8)aw2bcKoH2|%_a5$oa3U)d&BilY zed$ZTMxyEEJCaupgjaA7s_c5ce1HlY;!gwdPa9`kJoy#Xd#`@u5zBCkc zBWK(D41yAEEKQy_ll*g&Kc?WnE3xMIQ^e=D6L+2lQ(C;Ar-V;M34 literal 0 HcmV?d00001 diff --git a/utils/mqtt_rider_convert/riderconvert.py b/utils/mqtt_rider_convert/riderconvert.py new file mode 100644 index 00000000..075febd1 --- /dev/null +++ b/utils/mqtt_rider_convert/riderconvert.py @@ -0,0 +1,23 @@ +import paho.mqtt.client as mqtt +import mqtt_rider_convert as rconvert +import ssl +import logging + + +client = mqtt.Client() +client.on_connect = rconvert.on_connect +# client.on_publish = on_publish +client.on_message = rconvert.on_message + +#client.tls_set( +# "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE, +# tls_version=ssl.PROTOCOL_TLSv1) +#client.tls_set( +# "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE, +# tls_version=ssl.PROTOCOL_TLSv1) +#client.connect("resqaws.jankstudio.com", 8883, 60) +client.connect("localhost", 8883, 60) + + +rider_convert = rconvert.MQTTRiderConvert() +rider_convert.run(client)