resq/utils/rider_location_cache/rider_location_cache.py

80 lines
1.7 KiB
Python

import paho.mqtt.client as mqtt
import ssl
from threading import Thread
import redis
import time
import signal
import sys
import os
import mysql.connector
import json
def init_subscriptions(client):
print "subscribing to wildcard"
client.subscribe('#')
def on_connect(client, userdata, flags, rc):
init_subscriptions(client)
print("Connected with result code "+str(rc))
# client.subscribe("$SYS/#")
def on_publish(client, userdata, mid):
pass
def on_message(client, userdata, message):
print("message topic=",message.topic[0:10])
if message.topic[0:10] != 'motorider_':
return
#print repr(message)
# check if json decodable
res = json.loads(message.payload)
# get rider session id
sess_id = message.topic[10:]
# check if it has event
if 'event' not in res:
return
# check if event is driver_location
if res['event'] != 'driver_location':
return
# update our redis key
key = 'location_%s' % sess_id
print "setting %s" % key
redis_conn.setex(key, 1600, message.payload)
def sigint_handler(signal, frame):
print 'Interrupted'
sys.exit(0)
client = mqtt.Client()
client.on_connect = on_connect
# client.on_publish = on_publish
client.on_message = on_message
redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0)
#client.tls_set(
# "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE,
# tls_version=ssl.PROTOCOL_TLSv1)
client.tls_set(
"/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE,
tls_version=ssl.PROTOCOL_TLSv1)
client.connect("resqaws.jankstudio.com", 8883, 60)
#t = Thread(target=getRedis, args=(1,))
#t.start()
#signal.signal(signal.SIGINT, sigint_handler)
client.loop_forever()