From aed3b6ff204e72e70bb56215f0ab7016cf39d433 Mon Sep 17 00:00:00 2001 From: Greg Troxel Date: Fri, 9 Aug 2024 19:03:35 -0400 Subject: [PATCH] examples: Add dup filtering to mqtt_relay (#3018) Keep information about the previous value sent. If it's been 5 seconds, or new value is different (ignoring keys like snr and frequency), then send it. Otherwise, just don't. This causes bursts of e.g. 4 transmissions to result in one MQTT message, on the theory that the 4 transmissions are not actually 4 messags, but a strategy to transmit one message more reliably. Define a new configuration option to enable duplicate filtering, and default it to True. Steal logging config from mqtt_filter.py, and add a configuration option DEBUG that if True results in debug logging instead of info. --- examples/rtl_433_mqtt_relay.py | 111 +++++++++++++++++++++++++++++++-- 1 file changed, 105 insertions(+), 6 deletions(-) diff --git a/examples/rtl_433_mqtt_relay.py b/examples/rtl_433_mqtt_relay.py index 6248b75c6..a644a80ae 100755 --- a/examples/rtl_433_mqtt_relay.py +++ b/examples/rtl_433_mqtt_relay.py @@ -20,10 +20,14 @@ from __future__ import print_function from __future__ import with_statement -import socket import json +import logging +import socket +import time + import paho.mqtt.client as mqtt + # The config class represents a config object. The constructor takes # an optional pathname, and will switch on the suffix (.yaml for now) # and read a dictionary. @@ -31,17 +35,23 @@ class rtlconfig(object): # Initialize with default values. c = { - # Syslog socket configuration + # Log level info (False) or debug (True) + 'DEBUG': False, + + # Address to listen on for syslog/json messages from rtl_433 'UDP_IP': "127.0.0.1", 'UDP_PORT': 1433, - # MQTT broker configuration + # MQTT broker address and credentials 'MQTT_HOST': "127.0.0.1", 'MQTT_PORT': 1883, 'MQTT_USERNAME': None, 'MQTT_PASSWORD': None, 'MQTT_TLS': False, + + # MQTT content 'MQTT_PREFIX': "sensor/rtl_433", + 'MQTT_DEDUP': True, 'MQTT_INDIVIDUAL_TOPICS': True, 'MQTT_JSON_TOPIC': True, } @@ -68,17 +78,90 @@ def __init__(self, f=None): def __getitem__(self, k): return self.c[k] +class dedup(object): + """ A dedup class object supports deduping a stream of reports by + answering if a report is interesting relative to the history. While + more complicated deduping is allowed by the interface, for now it is + very simple, keeping track of only the previous interesting object. + For now, we more or less require that all reports have the same keys. """ + + # \todo Consider a cache with several entries. + + def __init__(self): + # Make this long enough to skip repeats, but allow messages + # every 10s to come through. + self.duration = 5 + # Exclude reception metadata (time and RF). + self.boring_keys = ('time', 'freq', 'freq1', 'freq2', 'rssi', 'snr', 'noise', 'raw_msg') + # Initialize storage for what was last sent. + (self.last_report, self.last_now) = (None, None) + + def send_store(self, report, n): + """ Record report, n as the last report declared interesting, and + return True (to denote interesting). """ + (self.last_report, self.last_now) = (report, n) + return True + + def equiv(self, j1, j2): + """ Return True if j1 and j2 are the same, except for boring_keys. """ + for (k, v) in j1.items(): + # If in boring, we don't care. + if k not in self.boring_keys: + # If in j1 and not j2, they are different. + if k not in j2: + logging.debug("equiv: %s in j1 and not j2" % (k)) + return False + if j1[k] != j2[k]: + logging.debug("equiv: %s differs j1=%s and j2=%s" % (k, j1[k], j2[k])) + return False + # If the lengths are different, they must be different. + if len(j1) != len(j2): + logging.debug("equiv: len(j1) %d != len(j2) %d" % (len(j1), len(j2))) + return False + + # If we get here, then the lengths are the same, and all + # non-boring keys in j1 exist in j2, and have the same value. + # It could be that j2 is missing a boring key and also has a + # new non-boring key, but boring keys in particular should not + # be variable. + return True + + # report is a python dictionary + def is_interesting(self, report): + """ If report is intersting, return True and update records of the + most recent interesting report. Otherwise return False. """ + n = time.time() + + # If previous interesting is missing or empty, accept this one. + if self.last_report is None or self.last_now is None: + logging.debug("interesting: no previous") + return self.send_store(report, n) + + # If previous one was too long ago, accept this one. + if n - self.last_now > self.duration: + logging.debug("interesting: time") + return self.send_store(report, n) + + if not self.equiv(self.last_report, report): + logging.debug("interesting: different") + return self.send_store(report, n) + + return False + # Create a config object, defaults modified by the config file if present. c = rtlconfig("rtl_433_mqtt_relay.yaml") +# Create a dedup object for later use, even if it's configured off. +d = dedup() + def mqtt_connect(client, userdata, flags, rc): """Handle MQTT connection callback.""" - print("MQTT connected: " + mqtt.connack_string(rc)) + logging.info("MQTT connected: " + mqtt.connack_string(rc)) def mqtt_disconnect(client, userdata, rc): """Handle MQTT disconnection callback.""" - print("MQTT disconnected: " + mqtt.connack_string(rc)) + logging.info("MQTT disconnected: " + mqtt.connack_string(rc)) # Create listener for incoming json string packets. @@ -100,6 +183,14 @@ def sanitize(text): def publish_sensor_to_mqtt(mqttc, data, line): """Publish rtl_433 sensor data to MQTT.""" + if c['MQTT_DEDUP']: + # If this data is not novel relative to recent data, just skip it. + # Otherwise, send it via MQTT. + if not d.is_interesting(data): + logging.debug(" not interesting") + return + logging.debug( "INTERESTING") + # Construct a topic from the information that identifies which # device this frame is from. # NB: id is only used if channel is not present. @@ -166,6 +257,7 @@ def rtl_433_probe(): try: line = parse_syslog(line) data = json.loads(line) + logging.debug("received %s" % line) publish_sensor_to_mqtt(mqttc, data, line) except ValueError: @@ -179,8 +271,15 @@ def run(): # uid # gid # working_directory - rtl_433_probe() + # Set up logging at INFO, and change to DEBUG if config asks for that. + logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s',datefmt='%Y-%m-%dT%H:%M:%S%z') + logging.getLogger().setLevel(logging.INFO) + if c['DEBUG']: + logging.getLogger().setLevel(logging.DEBUG) + logging.debug("DEBUG LOGGING ENABLED") + + rtl_433_probe() if __name__ == "__main__": run()