import json
import os
import signal
import ssl
import sys
import time
from threading import Thread

import paho.mqtt.client as mqtt
import redis

from daemon import Daemon

# Only topics that start with this prefix are processed; whatever follows the
# prefix in the topic is used as the rider session id.
TOPIC_PREFIX = 'motorider_'

# Expiry, in seconds, of the raw "location_<session id>" payload key.
LOCATION_TTL_SECONDS = 1600


class RiderLocationCache(object):
    """Listen for rider location messages on MQTT and store them in Redis."""

    def run(self):
        """Connect to MQTT and Redis, then block processing messages.

        Creates the module-level ``client`` (MQTT) and ``redis_conn`` (Redis)
        globals, wires up the callbacks defined in this module, connects to
        the broker on localhost:1883 and enters ``client.loop_forever()``.
        ``on_message`` reads ``redis_conn`` as a global, which is why it is
        not kept on the instance.
        """
        global client, redis_conn

        client = mqtt.Client()
        client.on_connect = on_connect
        # client.on_publish = on_publish
        client.on_message = on_message

        redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0)

        # Disabled TLS setups, kept for reference:
        # client.tls_set(
        #     "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE,
        #     tls_version=ssl.PROTOCOL_TLSv1)
        # client.tls_set(
        #     "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE,
        #     tls_version=ssl.PROTOCOL_TLSv1)
        # client.connect("resqaws.jankstudio.com", 8883, 60)
        client.connect("localhost", 1883, 60)

        # t = Thread(target=getRedis, args=(1,))
        # t.start()

        # signal.signal(signal.SIGINT, sigint_handler)
        client.loop_forever()


def init_subscriptions(client):
    """Subscribe ``client`` to the '#' wildcard topic."""
    print("subscribing to wildcard")
    client.subscribe('#')


def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: set up subscriptions and report ``rc``."""
    init_subscriptions(client)
    print("Connected with result code "+str(rc))
    # client.subscribe("$SYS/#")


def on_publish(client, userdata, mid):
    """MQTT publish callback; intentionally does nothing."""
    pass


def on_message(client, userdata, message):
    """MQTT message callback: cache 'driver_location' events in Redis.

    Messages are ignored unless the topic starts with ``TOPIC_PREFIX``, the
    payload decodes to a JSON object, and its 'event' is 'driver_location'
    with both 'longitude' and 'latitude' present.  For accepted messages:

    * hash ``rider.location.<session id>`` gets 'longitude' and 'latitude'
      (stored as strings);
    * key ``location_<session id>`` gets the raw payload, expiring after
      ``LOCATION_TTL_SECONDS`` seconds.

    Malformed messages are reported and dropped instead of raising, so a
    single bad publisher cannot raise out of this callback.
    """
    topic_head = message.topic[:len(TOPIC_PREFIX)]
    print("message topic=", topic_head)
    if topic_head != TOPIC_PREFIX:
        return

    # check if json decodable (json raises ValueError, or a subclass of it,
    # on undecodable input)
    try:
        res = json.loads(message.payload)
    except ValueError:
        print("ignoring non-JSON payload on %s" % message.topic)
        return
    print(res)

    # valid JSON that is not an object (number, string, list, ...) cannot
    # carry an event
    if not isinstance(res, dict):
        return

    # get rider session id
    sess_id = message.topic[len(TOPIC_PREFIX):]

    # only driver_location events are cached; a missing 'event' is ignored too
    if res.get('event') != 'driver_location':
        return

    # save the longitude and latitude
    try:
        rider_long = str(res['longitude'])
        rider_lat = str(res['latitude'])
    except KeyError as err:
        print("ignoring driver_location without %s" % err)
        return

    # set the location
    rider_key = "rider.location.%s" % sess_id
    redis_conn.hmset(rider_key, {'longitude': rider_long, 'latitude': rider_lat})

    # update our redis key
    key = 'location_%s' % sess_id
    print("setting %s" % key)
    redis_conn.setex(key, LOCATION_TTL_SECONDS, message.payload)


def sigint_handler(signal, frame):
    """Signal handler: report the interruption and exit with status 0."""
    print('Interrupted')
    sys.exit(0)