Add python utilities for redis / mqtt functionality #159
This commit is contained in:
parent
9ae9a550d0
commit
8d08dc9551
2 changed files with 170 additions and 0 deletions
64
utils/mqtt_sender/mqtt_sender.py
Normal file
64
utils/mqtt_sender/mqtt_sender.py
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
import paho.mqtt.client as mqtt
|
||||
import ssl
|
||||
from threading import Thread
|
||||
import redis
|
||||
import time
|
||||
import signal
|
||||
import sys
|
||||
import os
|
||||
|
||||
|
||||
|
||||
def sigint_handler(signal, frame):
|
||||
print 'Interrupted'
|
||||
sys.exit(0)
|
||||
os._exit(0)
|
||||
|
||||
|
||||
def on_connect(client, userdata, flags, rc):
|
||||
print("Connected with result code "+str(rc))
|
||||
client.subscribe("$SYS/#")
|
||||
|
||||
|
||||
|
||||
def on_publish(client, userdata, mid):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
def getRedis(i):
|
||||
r = redis.StrictRedis(host='localhost', port=6379, db=0)
|
||||
while 1:
|
||||
time.sleep(0)
|
||||
data = r.brpop("events", 10)
|
||||
if data:
|
||||
info = data[1].split('|')
|
||||
print "Channel: " + info[0] + " message: " + info[1]
|
||||
client.publish(info[0], info[1])
|
||||
|
||||
|
||||
|
||||
def sigint_handler(signal, frame):
|
||||
print 'Interrupted'
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
|
||||
client = mqtt.Client()
|
||||
client.on_connect = on_connect
|
||||
client.on_publish = on_publish
|
||||
|
||||
|
||||
|
||||
client.tls_set(
|
||||
"/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE,
|
||||
tls_version=ssl.PROTOCOL_TLSv1)
|
||||
|
||||
client.connect("resqaws.jankstudio.com", 8883, 60)
|
||||
t = Thread(target=getRedis, args=(1,))
|
||||
|
||||
t.start()
|
||||
|
||||
signal.signal(signal.SIGINT, sigint_handler)
|
||||
client.loop_forever()
|
||||
|
||||
106
utils/rider_location_cache/rider_location_cache.py
Normal file
106
utils/rider_location_cache/rider_location_cache.py
Normal file
|
|
@ -0,0 +1,106 @@
|
|||
import paho.mqtt.client as mqtt
|
||||
import ssl
|
||||
from threading import Thread
|
||||
import redis
|
||||
import time
|
||||
import signal
|
||||
import sys
|
||||
import os
|
||||
import mysql.connector
|
||||
import json
|
||||
|
||||
def mysql_connect(user, password, host, database):
|
||||
conn = mysql.connector.connect(user=user,
|
||||
password=password,
|
||||
host=host,
|
||||
database=database)
|
||||
return conn
|
||||
|
||||
def init_subscriptions(client, conn):
|
||||
# given mysql connection, get all rider sessions
|
||||
query = ("select id from rider_session")
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(query)
|
||||
for (id) in cursor:
|
||||
print "subscribing to rider session %s" % id
|
||||
client.subscribe('motorider_%s' % id)
|
||||
cursor.close()
|
||||
|
||||
def on_connect(client, userdata, flags, rc):
|
||||
conn = mysql_connect('resq', 'Motolite456', '127.0.0.1', 'resq')
|
||||
init_subscriptions(client, conn)
|
||||
print("Connected with result code "+str(rc))
|
||||
client.subscribe("$SYS/#")
|
||||
|
||||
def on_publish(client, userdata, mid):
|
||||
pass
|
||||
|
||||
def on_message(client, userdata, message):
|
||||
print("message topic=",message.topic[0:10])
|
||||
|
||||
if message.topic[0:10] != 'motorider_':
|
||||
return
|
||||
#print repr(message)
|
||||
|
||||
# check if json decodable
|
||||
res = json.loads(message.payload)
|
||||
|
||||
# get rider session id
|
||||
sess_id = message.topic[10:]
|
||||
|
||||
# check if it has event
|
||||
if 'event' not in res:
|
||||
return
|
||||
|
||||
# check if event is driver_location
|
||||
if res['event'] != 'driver_location':
|
||||
return
|
||||
|
||||
# update our redis key
|
||||
key = 'location_%s' % sess_id
|
||||
print "setting %s" % key
|
||||
redis_conn.setex(key, 1600, message.payload)
|
||||
|
||||
|
||||
|
||||
def getRedis(i):
|
||||
r = redis.StrictRedis(host='localhost', port=6379, db=0)
|
||||
while 1:
|
||||
time.sleep(0)
|
||||
data = r.brpop("events", 10)
|
||||
if data:
|
||||
info = data[1].split('|')
|
||||
print "Channel: " + info[0] + " message: " + info[1]
|
||||
client.publish(info[0], info[1])
|
||||
|
||||
|
||||
def sigint_handler(signal, frame):
|
||||
print 'Interrupted'
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
|
||||
client = mqtt.Client()
|
||||
client.on_connect = on_connect
|
||||
# client.on_publish = on_publish
|
||||
client.on_message = on_message
|
||||
|
||||
redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0)
|
||||
|
||||
|
||||
#client.tls_set(
|
||||
# "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem", cert_reqs=ssl.CERT_NONE,
|
||||
# tls_version=ssl.PROTOCOL_TLSv1)
|
||||
client.tls_set(
|
||||
"/root/aws_ssl_keys/fullchain.pem", cert_reqs=ssl.CERT_NONE,
|
||||
tls_version=ssl.PROTOCOL_TLSv1)
|
||||
client.connect("resqaws.jankstudio.com", 8883, 60)
|
||||
|
||||
|
||||
#t = Thread(target=getRedis, args=(1,))
|
||||
|
||||
#t.start()
|
||||
|
||||
#signal.signal(signal.SIGINT, sigint_handler)
|
||||
client.loop_forever()
|
||||
|
||||
Loading…
Reference in a new issue