# Code-viewer listing metadata (not part of the program): 88 lines, 2.1 KiB, Python
import paho.mqtt.client as mqtt
|
|
import ssl
|
|
from threading import Thread
|
|
import redis
|
|
import time
|
|
import signal
|
|
import sys
|
|
import os
|
|
import json
|
|
|
|
def init_subscriptions(client):
    """Subscribe ``client`` to every topic through the ``#`` wildcard.

    Args:
        client: Object exposing ``subscribe(topic)``; ``on_connect`` passes
            along the client it was called with.
    """
    # A parenthesised single-argument print produces the same output under
    # the Python 2 print statement and the Python 3 print function, and it
    # matches the print(...) calls already used elsewhere in this file.
    print("subscribing to wildcard")
    client.subscribe('#')
|
|
|
|
def on_connect(client, userdata, flags, rc):
    """Connection callback: set up subscriptions, then report the result code.

    Args:
        client: Client instance; handed straight to ``init_subscriptions``.
        userdata: Not used.
        flags: Not used.
        rc: Result code of the connection attempt. It is only printed; the
            subscription is issued regardless of its value.
    """
    init_subscriptions(client)

    status_line = "Connected with result code %s" % (rc,)
    print(status_line)
    # client.subscribe("$SYS/#")
|
|
|
|
def on_publish(client, userdata, mid):
    """Publish callback placeholder: accepts the arguments and does nothing."""
    return None
|
|
|
|
def on_message(client, userdata, message):
    """Persist the position carried by a ``driver_location`` event in Redis.

    Only topics starting with ``motorider_`` are handled; the remainder of
    the topic is used as the session id.  The payload must be a JSON object
    whose ``event`` is ``driver_location`` and which carries ``longitude``
    and ``latitude``.  Anything else is ignored without touching Redis, so
    a single malformed message can no longer raise out of the callback.

    On success two keys are written through the module-level ``redis_conn``:
      * hash ``rider.location.<sess_id>`` with string ``longitude`` and
        ``latitude`` fields;
      * string ``location_<sess_id>`` holding the raw payload, set with an
        expiry argument of 1600.

    Args:
        client: Not used.
        userdata: Not used.
        message: Object with ``topic`` and ``payload`` attributes.
    """
    topic_prefix = 'motorider_'

    # Pre-format into one string so Python 2 does not print a tuple.
    print("message topic= %s" % message.topic[0:10])

    if not message.topic.startswith(topic_prefix):
        return

    # Actually check that the payload is JSON-decodable.  JSONDecodeError
    # and UnicodeDecodeError are both ValueError subclasses.
    try:
        res = json.loads(message.payload)
    except ValueError:
        print("ignoring non-JSON payload on %s" % message.topic)
        return
    print(res)

    # Valid JSON can still be a list, number or string; only objects can
    # carry the fields looked up below.
    if not isinstance(res, dict):
        return

    # A missing event yields None here, which is also not driver_location.
    if res.get('event') != 'driver_location':
        return

    # Validate both coordinates before the first Redis write.
    if 'longitude' not in res or 'latitude' not in res:
        print("ignoring driver_location without coordinates on %s"
              % message.topic)
        return

    # The rider session id is whatever follows the topic prefix.
    sess_id = message.topic[len(topic_prefix):]

    # Save the longitude and latitude under the rider's location hash.
    rider_key = "rider.location.%s" % sess_id
    rider_long = str(res['longitude'])
    rider_lat = str(res['latitude'])
    redis_conn.hmset(rider_key, {'longitude': rider_long, 'latitude': rider_lat})

    # Keep the raw payload under its own expiring key.
    key = 'location_%s' % sess_id
    print("setting %s" % key)
    redis_conn.setex(key, 1600, message.payload)
|
|
|
|
def sigint_handler(signal, frame):
    """Announce the interruption and terminate the process with status 0.

    Args:
        signal: Not used.  NOTE: inside this function the name shadows the
            imported ``signal`` module; harmless because neither is used.
        frame: Not used.

    Raises:
        SystemExit: Always, via ``sys.exit(0)``.
    """
    # Parenthesised single-argument print works on Python 2 and Python 3.
    print('Interrupted')
    sys.exit(0)
|
|
|
|
|
|
|
|
# Redis connection shared with on_message, which reads this module-level name.
redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0)

# MQTT client with its callbacks attached before connecting.
client = mqtt.Client()
client.on_message = on_message
client.on_connect = on_connect
# client.on_publish = on_publish

# Disabled TLS setups for the remote broker, kept for reference.
# NOTE(review): both variants pass cert_reqs=ssl.CERT_NONE and
# ssl.PROTOCOL_TLSv1; revisit those settings before re-enabling.
#client.tls_set(
#    "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE,
#    tls_version=ssl.PROTOCOL_TLSv1)
#client.tls_set(
#    "/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE,
#    tls_version=ssl.PROTOCOL_TLSv1)
#client.connect("resqaws.jankstudio.com", 8883, 60)

# Plain connection to the local broker.
client.connect("localhost", port=1883, keepalive=60)

# Disabled background worker (getRedis is not defined in the visible file).
#t = Thread(target=getRedis, args=(1,))
#t.start()

# SIGINT handler registration is currently disabled.
#signal.signal(signal.SIGINT, sigint_handler)

# Run the client's network loop; this call is the last statement of the script.
client.loop_forever()
|