69 lines
1.6 KiB
Python
69 lines
1.6 KiB
Python
import paho.mqtt.client as mqtt
|
|
import yaml
|
|
import ssl
|
|
import redis
|
|
import time
|
|
import signal
|
|
import sys
|
|
import os
|
|
import logging
|
|
|
|
|
|
# TODO: yaml configuration file for redis and mqtt settings
|
|
|
|
def on_connect(client, userdata, flags, rc):
|
|
logging.info("Connected with result code "+str(rc))
|
|
#client.subscribe("$SYS/#")
|
|
|
|
|
|
def on_publish(client, userdata, mid):
|
|
pass
|
|
|
|
|
|
def redis_listen(client, logger):
|
|
logger.info("Listening in redis events")
|
|
r = redis.StrictRedis(host='localhost', port=6379, db=0)
|
|
while 1:
|
|
time.sleep(0)
|
|
data = r.brpop("mqtt_events", 10)
|
|
if data:
|
|
info = data[1].split('|')
|
|
logger.info("Channel: " + info[0] + " message: " + info[1])
|
|
#client.publish(info[0], info[1])
|
|
# set QoS level to 1 for published messages
|
|
client.publish(info[0], info[1], 1)
|
|
|
|
|
|
|
|
def get_logger():
|
|
logger = logging.getLogger("mqtt_logger")
|
|
logger.setLevel(logging.INFO)
|
|
|
|
fh = logging.FileHandler("/tmp/mqtt_sender.log")
|
|
fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
|
|
formatter = logging.Formatter(fmt)
|
|
fh.setFormatter(formatter)
|
|
|
|
logger.addHandler(fh)
|
|
return logger
|
|
|
|
def main():
|
|
logger = get_logger()
|
|
logger.info("Starting mqtt_sender")
|
|
logger.info("Connecting to MQTT server")
|
|
|
|
client = mqtt.Client()
|
|
client.on_connect = on_connect
|
|
client.on_publish = on_publish
|
|
|
|
# configure mqtt broker to accept localhost
|
|
client.connect("localhost", 1883, 60)
|
|
|
|
client.loop_start()
|
|
redis_listen(client, logger)
|
|
client.loop_end()
|
|
|
|
#client.loop_forever()
|
|
|
|
|
|
main()
|