73 lines
1.8 KiB
Python
73 lines
1.8 KiB
Python
import paho.mqtt.client as mqtt
|
|
import ssl
|
|
import redis
|
|
import time
|
|
import signal
|
|
import sys
|
|
import os
|
|
import json
|
|
|
|
class MQTTRiderConvert(object):
    """Runner that drives an MQTT client's network loop."""

    def run(self, client):
        """Announce start-up and hand control to the client's loop.

        Args:
            client: object exposing ``loop_forever()``; presumably a
                ``paho.mqtt.client.Client`` (the module this file imports).

        This call returns only when ``client.loop_forever()`` returns.
        """
        # Parenthesised single-argument print behaves the same on
        # Python 2 and Python 3; the bare statement form is Python 2 only.
        print("running loop...")
        client.loop_forever()
|
|
|
|
# TODO: move the callback functions below back into MQTTRiderConvert as methods
|
|
def init_subscriptions(client):
    """Subscribe ``client`` to the ``#`` wildcard topic filter.

    Args:
        client: object exposing ``subscribe(topic)``.
    """
    # Parenthesised single-argument print behaves the same on
    # Python 2 and Python 3; the bare statement form is Python 2 only.
    print("subscribing to wildcard #")
    client.subscribe('#')
|
|
|
|
def on_connect(client, userdata, flags, rc):
    """Connect callback: set up subscriptions once the broker accepts us.

    Args:
        client: the MQTT client that connected.
        userdata: unused.
        flags: unused.
        rc: connection result code; ``0`` means the connection was
            accepted, any other value means it was refused.
    """
    if rc != 0:
        # Subscribing on a refused connection cannot succeed, so report
        # the failure and leave the subscriptions for the next attempt.
        print("connection failed with result code " + str(rc))
        return
    init_subscriptions(client)
|
|
|
|
# Lazily created Redis client shared by every call to user_data_set().
_redis_conn = None


def user_data_set(userdata):
    """Return the Redis client for ``localhost:6379`` (db 0).

    The client is created on first use and then reused. ``on_message``
    calls this function for every message it handles, so building a new
    client (and with it a new connection pool) on each call would be
    wasteful.

    Args:
        userdata: unused; accepted for compatibility with existing callers.

    Returns:
        The shared ``redis.StrictRedis`` instance.
    """
    global _redis_conn
    if _redis_conn is None:
        _redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0)
    return _redis_conn
|
|
|
|
def on_publish(client, userdata, mid):
    """Publish callback; a no-op that ignores all of its arguments."""
    return None
|
|
|
|
def _parse_driver_location(raw_payload):
    """Extract ``(latitude, longitude)`` strings from a raw payload.

    Returns None when the payload is not decodable JSON, is not a JSON
    object, is not a ``driver_location`` event, or lacks a coordinate.
    """
    try:
        data = json.loads(raw_payload)
    except (TypeError, ValueError):
        # Not JSON: nothing to convert, and raising here would propagate
        # out of the message callback.
        return None
    if not isinstance(data, dict):
        return None
    if data.get('event') != 'driver_location':
        return None
    longitude = data.get('longitude')
    latitude = data.get('latitude')
    if longitude is None or latitude is None:
        return None
    return str(latitude), str(longitude)


def on_message(client, userdata, message):
    """Message callback: republish rider locations and record them in Redis.

    Only messages whose topic starts with ``motorider_`` and whose JSON
    payload is a ``driver_location`` event are handled. The remainder of
    the topic is treated as a session id and the rider id is looked up
    under the Redis key ``rider.id.<session id>``. The location is then
    published as ``"<latitude>:<longitude>"`` on ``rider/<rider id>/location``
    and added to the ``loc_rider_active`` geo set. Anything else,
    including an unknown session, is silently ignored.

    Args:
        client: object exposing ``publish(topic, payload)``.
        userdata: passed through to ``user_data_set``.
        message: object exposing ``topic`` and ``payload`` attributes.
    """
    prefix = 'motorider_'
    if not message.topic.startswith(prefix):
        return

    location = _parse_driver_location(message.payload)
    if location is None:
        return
    rider_lat, rider_long = location

    # Touch Redis only after the cheap filters above have passed.
    sess_id = message.topic[len(prefix):]
    redis_conn = user_data_set(userdata)
    rider_id = redis_conn.get('rider.id.%s' % sess_id)
    if rider_id is None:
        # Unknown session: there is no rider to publish or store for.
        return
    if isinstance(rider_id, bytes) and not isinstance(rider_id, str):
        # On Python 3 the value comes back as bytes; decode it so the
        # topic does not end up containing "b'...'". On Python 2 bytes is
        # str and this branch is skipped.
        rider_id = rider_id.decode('utf-8')

    client.publish('rider/%s/location' % rider_id,
                   rider_lat + ':' + rider_long)

    # Record the rider's latest location (longitude first, then latitude).
    # NOTE(review): this is the positional geoadd form; newer redis-py
    # releases may expect the values as a single sequence. Confirm
    # against the installed version.
    redis_conn.geoadd('loc_rider_active', rider_long, rider_lat, rider_id)
|