import paho.mqtt.client as mqtt import ssl from threading import Thread import redis import time import signal import sys import os import mysql.connector import json def mysql_connect(user, password, host, database): conn = mysql.connector.connect(user=user, password=password, host=host, database=database) return conn def init_subscriptions(client, conn): # given mysql connection, get all rider sessions query = ("select id from rider_session") cursor = conn.cursor() cursor.execute(query) for (id) in cursor: print "subscribing to rider session %s" % id client.subscribe('motorider_%s' % id) cursor.close() def on_connect(client, userdata, flags, rc): conn = mysql_connect('resq', 'Motolite456', '127.0.0.1', 'resq') init_subscriptions(client, conn) print("Connected with result code "+str(rc)) client.subscribe("$SYS/#") def on_publish(client, userdata, mid): pass def on_message(client, userdata, message): print("message topic=",message.topic[0:10]) if message.topic[0:10] != 'motorider_': return #print repr(message) # check if json decodable res = json.loads(message.payload) # get rider session id sess_id = message.topic[10:] # check if it has event if 'event' not in res: return # check if event is driver_location if res['event'] != 'driver_location': return # update our redis key key = 'location_%s' % sess_id print "setting %s" % key redis_conn.setex(key, 1600, message.payload) def getRedis(i): r = redis.StrictRedis(host='localhost', port=6379, db=0) while 1: time.sleep(0) data = r.brpop("events", 10) if data: info = data[1].split('|') print "Channel: " + info[0] + " message: " + info[1] client.publish(info[0], info[1]) def sigint_handler(signal, frame): print 'Interrupted' sys.exit(0) client = mqtt.Client() client.on_connect = on_connect # client.on_publish = on_publish client.on_message = on_message redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0) #client.tls_set( # "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE, # tls_version=ssl.PROTOCOL_TLSv1) client.tls_set( "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1) client.connect("resqaws.jankstudio.com", 8883, 60) #t = Thread(target=getRedis, args=(1,)) #t.start() #signal.signal(signal.SIGINT, sigint_handler) client.loop_forever()