resq/utils/mqtt_sender/mqtt_sender.py
2018-08-14 21:55:02 +08:00

88 lines
2.1 KiB
Python

import paho.mqtt.client as mqtt
import ssl
from threading import Thread
from daemonize import Daemonize
import redis
import time
import signal
import sys
import os
import logging
def sigint_handler(signal, frame):
    """Exit the process with status 0 when an interrupt signal arrives.

    Has the two-argument shape expected of a handler installed with
    ``signal.signal``.

    Args:
        signal: Signal number passed by the runtime (unused). Inside this
            function the name shadows the ``signal`` module.
        frame: Stack frame that was interrupted (unused).

    Raises:
        SystemExit: Always, with exit code 0.
    """
    # NOTE: a second ``sigint_handler`` is defined further down in this
    # module; at runtime that later definition replaces this one.
    #
    # The previous body called ``os._exit(0)`` after ``sys.exit(0)``.
    # ``sys.exit`` raises ``SystemExit``, so that call could never run and
    # has been removed.
    sys.exit(0)
def on_connect(client, userdata, flags, rc):
    """Connect callback: subscribe the client to the ``$SYS/#`` topic filter.

    Args:
        client: Object exposing ``subscribe(topic)``; the MQTT client that
            just connected.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code (unused; it is not checked here).
    """
    sys_topic_filter = "$SYS/#"
    client.subscribe(sys_topic_filter)
def on_publish(client, userdata, mid):
    """Publish callback placeholder; deliberately does nothing.

    Args:
        client: Unused.
        userdata: Unused.
        mid: Unused.
    """
    return None
def _split_event(raw):
    """Split one queued ``topic|payload`` entry into ``(topic, payload)``.

    Bytes input is decoded as UTF-8 first. Only the first ``|`` separates
    topic from payload, so a payload may itself contain ``|``.

    Returns:
        A ``(topic, payload)`` tuple, or ``None`` when the entry has no
        ``|`` separator or an empty topic.
    """
    if isinstance(raw, bytes):
        # redis-py hands back bytes unless the client decodes responses;
        # splitting bytes with a str separator would raise TypeError.
        raw = raw.decode("utf-8", errors="replace")
    topic, sep, payload = raw.partition("|")
    if not sep or not topic:
        return None
    return topic, payload


def getRedis(i, client, logger):
    """Forward entries from the Redis ``events`` list to MQTT, forever.

    Each list entry is expected to look like ``topic|payload``. Entries are
    popped with a blocking ``BRPOP`` (10 second timeout), logged, and
    published through ``client``. This function never returns.

    Args:
        i: Unused; kept so existing callers' argument tuples still match.
        client: Object exposing ``publish(topic, payload)``.
        logger: Logger used for progress and warning messages.
    """
    logger.info("Listening in redis events")
    r = redis.StrictRedis(host='localhost', port=6379, db=0)
    while True:
        # brpop itself blocks for up to 10 seconds, so no extra sleep is
        # needed between iterations. It returns None on timeout.
        data = r.brpop("events", 10)
        if not data:
            continue
        event = _split_event(data[1])
        if event is None:
            # Skip rather than raise: an exception here would end the
            # thread and silently stop all forwarding.
            logger.warning("Skipping malformed event: %r", data[1])
            continue
        topic, payload = event
        logger.info("Channel: %s message: %s", topic, payload)
        client.publish(topic, payload)
def sigint_handler(signal, frame):
    """Signal handler that ends the program with exit status 0.

    Args:
        signal: Signal number supplied by the runtime (unused).
        frame: Interrupted stack frame (unused).

    Raises:
        SystemExit: Always, carrying exit code 0.
    """
    # Raising SystemExit directly is what sys.exit(0) does.
    raise SystemExit(0)
def get_logger():
    """Return the ``mqtt_logger`` logger, configured to write to a file.

    The logger is set to ``INFO`` level and writes to
    ``/tmp/mqtt_sender.log`` with timestamp, thread name, level and message.

    Returns:
        logging.Logger: The shared ``mqtt_logger`` instance.
    """
    logger = logging.getLogger("mqtt_logger")
    logger.setLevel(logging.INFO)
    # logging.getLogger returns the same object for the same name, so the
    # file handler must only be attached once. Without this guard every
    # call added another handler and each record was written repeatedly.
    if not logger.handlers:
        fh = logging.FileHandler("/tmp/mqtt_sender.log")
        fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
        fh.setFormatter(logging.Formatter(fmt))
        logger.addHandler(fh)
    return logger
def main():
    """Connect to the MQTT broker and relay queued Redis events to it.

    Sets up the file logger, creates an MQTT client with TLS configured,
    connects to ``resqaws.jankstudio.com:8883``, starts a background thread
    running ``getRedis``, installs ``sigint_handler`` for SIGINT, and then
    blocks in the client's network loop.
    """
    logger = get_logger()
    logger.info("Starting mqtt_sender")
    logger.info("Connecting to MQTT server")
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_publish = on_publish
    # NOTE(review): cert_reqs=ssl.CERT_NONE turns off verification of the
    # broker certificate and PROTOCOL_TLSv1 pins TLS 1.0. Both are left as
    # they were because the broker's requirements are not visible from
    # here; confirm and tighten if the broker allows it.
    client.tls_set(
        "/etc/letsencrypt/live/resqaws.jankstudio.com/fullchain.pem",
        cert_reqs=ssl.CERT_NONE,
        tls_version=ssl.PROTOCOL_TLSv1)
    client.connect("resqaws.jankstudio.com", 8883, 60)
    logger.info("Starting redis thread")
    t = Thread(target=getRedis, args=(1, client, logger))
    # getRedis loops forever. As a non-daemon thread it kept the process
    # alive after the SIGINT handler raised SystemExit in the main thread,
    # so Ctrl-C never actually terminated the program.
    t.daemon = True
    t.start()
    signal.signal(signal.SIGINT, sigint_handler)
    client.loop_forever()
# Disabled alternative: launch main() as a background daemon via Daemonize.
#logging.basicConfig(filename='/tmp/mqtt_sender.log', level=logging.INFO)
#logging.info('Started mqtt_sender')
#pid = "/tmp/mqtt_sender.pid"
#daemon = Daemonize(app="mqtt_sender", pid=pid, action=main)
#daemon.start()

# Run in the foreground only when executed as a script. main() blocks
# forever, so calling it unconditionally made the module impossible to
# import.
if __name__ == "__main__":
    main()