resq/utils/rider_location_cache/rider_location_cache.py

106 lines
2.5 KiB
Python

import paho.mqtt.client as mqtt
import ssl
from threading import Thread
import redis
import time
import signal
import sys
import os
import mysql.connector
import json
def mysql_connect(user, password, host, database):
    """Open and return a new MySQL connection.

    Args:
        user: Account name passed to the connector.
        password: Password passed to the connector.
        host: Server host passed to the connector.
        database: Database name passed to the connector.

    Returns:
        Whatever ``mysql.connector.connect`` returns for these settings.
    """
    settings = {
        'user': user,
        'password': password,
        'host': host,
        'database': database,
    }
    return mysql.connector.connect(**settings)
def init_subscriptions(client, conn):
    """Subscribe ``client`` to the topic of every stored rider session.

    Runs ``select id from rider_session`` on ``conn`` and, for each row,
    subscribes to the topic ``motorider_`` followed by that row's id.

    Args:
        client: Object exposing ``subscribe(topic)``.
        conn: Open database connection exposing ``cursor()``.

    Raises:
        Whatever the cursor or ``client.subscribe`` raises; the cursor is
        closed before the exception propagates.
    """
    query = "select id from rider_session"
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        # Each row is a one-column sequence. Unpack it explicitly instead of
        # relying on '%' formatting to unwrap a 1-tuple, and avoid shadowing
        # the builtin ``id``.
        for (session_id,) in cursor:
            print("subscribing to rider session %s" % session_id)
            client.subscribe('motorider_%s' % session_id)
    finally:
        # Release the cursor even when the query or a subscribe call fails.
        cursor.close()
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe to all rider-session topics.

    Opens a MySQL connection, subscribes ``client`` to every rider session
    topic via ``init_subscriptions``, prints the result code and finally
    subscribes to the ``$SYS/#`` topic tree.

    Args:
        client: The MQTT client that connected.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; only printed.
    """
    # NOTE(review): database credentials are hard-coded here; consider
    # loading them from configuration or the environment instead.
    conn = mysql_connect('resq', 'Motolite456', '127.0.0.1', 'resq')
    try:
        init_subscriptions(client, conn)
    finally:
        # A fresh connection is opened on every invocation of this callback,
        # so close it here instead of leaking one per (re)connect.
        conn.close()
    print("Connected with result code "+str(rc))
    client.subscribe("$SYS/#")
def on_publish(client, userdata, mid):
    """MQTT publish callback; deliberately a no-op.

    All three arguments are accepted and ignored.
    """
    return None
def on_message(client, userdata, message):
    """MQTT message callback: cache the latest rider location in Redis.

    A message is cached only when its topic starts with ``motorider_`` and
    its payload is a JSON object whose ``event`` is ``driver_location``.
    The raw payload is then stored in the module-level ``redis_conn`` under
    ``location_`` plus the rest of the topic, with an expiry of 1600.
    Every other message is ignored.

    Args:
        client: Unused.
        userdata: Unused.
        message: Object exposing ``topic`` and ``payload``.
    """
    prefix = 'motorider_'
    print("message topic=",message.topic[0:10])
    if not message.topic.startswith(prefix):
        return

    # A payload that is not valid JSON is ignored rather than raising out
    # of the callback (decode errors are ValueError subclasses too).
    try:
        res = json.loads(message.payload)
    except ValueError:
        return

    # Valid JSON is not necessarily an object; numbers, strings and lists
    # cannot carry an 'event' key.
    if not isinstance(res, dict):
        return

    # Only driver location events are cached.
    if res.get('event') != 'driver_location':
        return

    # The rider session id is whatever follows the topic prefix.
    sess_id = message.topic[len(prefix):]
    key = 'location_%s' % sess_id
    print("setting %s" % key)
    redis_conn.setex(key, 1600, message.payload)
def getRedis(i):
    """Forward events queued in Redis to MQTT; never returns.

    Repeatedly pops entries from the ``events`` list of the Redis server on
    localhost. Each entry is expected to look like ``CHANNEL|MESSAGE`` and
    is published on the module-level MQTT ``client``. Entries without a
    ``|`` separator are reported and skipped.

    Args:
        i: Unused; kept so existing ``Thread(target=getRedis, args=(1,))``
            style callers keep working.
    """
    r = redis.StrictRedis(host='localhost', port=6379, db=0)
    while True:
        # Blocking pop with a timeout of 10; a falsy result means nothing
        # was popped, so just poll again.
        data = r.brpop("events", 10)
        if not data:
            continue

        raw = data[1]
        # Split on the first '|' only, so the message part may itself
        # contain '|' without being truncated.
        channel, sep, payload = raw.partition('|')
        if not sep:
            # Previously this raised IndexError and ended the loop.
            print("Skipping malformed event: %r" % (raw,))
            continue

        print("Channel: " + channel + " message: " + payload)
        client.publish(channel, payload)
def sigint_handler(signal, frame):
    """Signal handler: print a notice and exit the process with status 0.

    Both arguments are accepted and ignored.
    """
    print('Interrupted')
    raise SystemExit(0)
# Script entry: build the MQTT client, attach the callbacks, then connect.
# `client` is also read by getRedis() and `redis_conn` by on_message(), so
# both must stay module-level names.
client = mqtt.Client()
client.on_connect = on_connect
# The publish callback is currently not attached.
# client.on_publish = on_publish
client.on_message = on_message
# Shared Redis connection that on_message() writes cached locations to.
redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0)
# Previous certificate location, kept for reference:
#client.tls_set(
# "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE,
# tls_version=ssl.PROTOCOL_TLSv1)
# NOTE(review): cert_reqs=ssl.CERT_NONE means the broker certificate is not
# verified, and ssl.PROTOCOL_TLSv1 selects TLS 1.0. Confirm whether the
# broker supports a newer protocol and whether verification can be enabled.
client.tls_set(
"/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE,
tls_version=ssl.PROTOCOL_TLSv1)
# Host, port 8883, and a third argument of 60 (keepalive in paho's connect()).
client.connect("resqaws.jankstudio.com", 8883, 60)
# The Redis-to-MQTT forwarder thread (getRedis) and the SIGINT handler are
# currently disabled.
#t = Thread(target=getRedis, args=(1,))
#t.start()
#signal.signal(signal.SIGINT, sigint_handler)
# Hand control to the MQTT network loop; callbacks are dispatched from here.
client.loop_forever()