From 917633065bef09d0156c9c0ae314ffdcb6751be2 Mon Sep 17 00:00:00 2001 From: Till Schulte-Coerne Date: Sat, 25 Jan 2025 09:25:41 +0100 Subject: [PATCH] First beta of mqtt2can sending --- can2mqtt/app.py | 21 ++++++++----- can2mqtt/can_listener.py | 45 ++++++--------------------- can2mqtt/converter.py | 67 ++++++++++++++++++++++++++++++++++++++++ can2mqtt/mqtt_handler.py | 51 ++++++++++++++++++++++++++++++ config.example.yaml | 5 +-- 5 files changed, 144 insertions(+), 45 deletions(-) create mode 100644 can2mqtt/converter.py create mode 100644 can2mqtt/mqtt_handler.py diff --git a/can2mqtt/app.py b/can2mqtt/app.py index 7e05337..ca12c69 100644 --- a/can2mqtt/app.py +++ b/can2mqtt/app.py @@ -1,9 +1,10 @@ import can import cantools -import paho.mqtt.client as mqtt import yaml import threading +from can2mqtt.converter import Converter from can2mqtt.can_listener import CanListener +from can2mqtt.mqtt_handler import MqttHandler def load_config(): with open('config.yaml', 'r') as stream: @@ -17,12 +18,16 @@ def load_dbc_db(dbc_files): return dbc_db def main_program(config, dbc_db): - mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) - mqtt_client.username_pw_set(username=config['mqtt']['username'],password=config['mqtt']['password']) - mqtt_client.connect(config['mqtt']['host']) + converter = Converter(dbc_db, config) + + can_config = config.get('can', {}) + can_bus = can.interface.Bus(bustype = can_config.get('bustype', 'socketcan'), \ + channel = can_config.get('interface', 'can0'), \ + bitrate = can_config.get('bitrate', 125000)) - can_listener = CanListener(dbc_db, mqtt_client, config) - bus = can.interface.Bus(bustype='socketcan', channel=config['can']['interface'], bitrate=config['can']['bitrate']) - can.Notifier(bus, [can_listener]) + mqtt_handler = MqttHandler(config.get('mqtt', {}), can_bus, converter) - threading.Event().wait() + can_listener = CanListener(mqtt_handler, converter, config.get('resend_unchanged_events_after', 30)) + can.Notifier(can_bus, [can_listener]) + + mqtt_handler.loop_forever() diff --git a/can2mqtt/can_listener.py b/can2mqtt/can_listener.py index e642773..27cc9c2 100644 --- a/can2mqtt/can_listener.py +++ b/can2mqtt/can_listener.py @@ -1,44 +1,19 @@ import can class CanListener(can.Listener): - def __init__(self, dbc_db, mqtt_client, config): + def __init__(self, mqtt_handler, converter, resend_unchanged_events_after): self.last = {} - self.first_underscores_to_slash = False - self.prefix = False - self.resend_unchanged_events_after = 30 - if 'mqtt' in config: - if 'topic_names' in config['mqtt']: - if 'first_underscores_to_slash' in config['mqtt']['topic_names']: - self.first_underscores_to_slash = config['mqtt']['topic_names']['first_underscores_to_slash'] - if 'prefix' in config['mqtt']['topic_names']: - self.prefix = config['mqtt']['topic_names']['prefix'] - if 'resend_unchanged_events_after' in config: - self.resend_unchanged_events_after = config['resend_unchanged_events_after'] - - self.dbc_db = dbc_db - self.mqtt_client = mqtt_client + self.mqtt_handler = mqtt_handler + self.converter = converter + self.resend_unchanged_events_after = resend_unchanged_events_after def on_message_received(self, m): - msg = None - try: - msg = self.dbc_db.decode_message(m.arbitration_id, m.data) - except KeyError: - pass - - if msg != None: - self.handle_message(msg, m.timestamp) - - def handle_message(self, msg, timestamp): - for signal_id in msg: - topic = signal_id.lower() - if self.first_underscores_to_slash: - topic = topic.replace("_", "/", self.first_underscores_to_slash) - if self.prefix: - topic = self.prefix + topic - data = round(msg[signal_id], 5) + topic, data = self.converter.can2mqtt(m) + + if topic: if self.resend_unchanged_events_after == 0 or \ topic not in self.last or \ self.last[topic]['data'] != data or \ - timestamp - self.last[topic]['timestamp'] > self.resend_unchanged_events_after: - self.last[topic] = {'data': data, 'timestamp': timestamp} - self.mqtt_client.publish(topic, data) + m.timestamp - self.last[topic]['timestamp'] > self.resend_unchanged_events_after: + self.last[topic] = {'data': data, 'timestamp': m.timestamp} + self.mqtt_handler.publish(topic, data) diff --git a/can2mqtt/converter.py b/can2mqtt/converter.py new file mode 100644 index 0000000..7203586 --- /dev/null +++ b/can2mqtt/converter.py @@ -0,0 +1,67 @@ +import json +import can +import re + +class Converter(): + def __init__(self, dbc_db, config): + self.dbc_db = dbc_db + self.only_one_signal_per_message = config.get('only_one_signal_per_message', False) + self.send_to_can_from_topic = config.get('send_to_can_from_topic') + + name_conversion_config = config.get('name_conversion', {}) + self.first_underscores_to_slash = name_conversion_config.get('first_underscores_to_slash', False) + self.prefix = name_conversion_config.get('prefix') + + def can2mqtt(self, raw_can_msg): + signal_data = None + try: + signal_data = self.dbc_db.decode_message(raw_can_msg.arbitration_id, raw_can_msg.data) + except KeyError: + return None, None + + dbc_msg = self.dbc_db.get_message_by_frame_id(raw_can_msg.arbitration_id) + topic = self.message_name_to_topic(dbc_msg.name) + data = None + if self.only_one_signal_per_message: + data = round(signal_data[dbc_msg.signals[0].name], 5) + else: + data = {} + for signal_id in signal_data: + data[signal_id] = round(signal_data[signal_id], 5) + data = json.dumps(data) + return topic, data + + def mqtt2can(self, mqtt_message): + if not self.send_to_can_from_topic: + return + + sub_topic = re.sub("^%s/" % self.send_to_can_from_topic, "", mqtt_message.topic) + message_name = self.topic_to_message_name(sub_topic) + if not message_name: + return + + dbc_msg = self.dbc_db.get_message_by_name(message_name) + + signal_data = json.loads(mqtt_message.payload) + if self.only_one_signal_per_message: + signal_data = { dbc_msg.signals[0].name: signal_data } + + + + return can.Message(arbitration_id=dbc_msg.frame_id, data = dbc_msg.encode(signal_data)) + + def message_name_to_topic(self, message_name): + topic = message_name.lower() + if self.first_underscores_to_slash: + topic = topic.replace("_", "/", self.first_underscores_to_slash) + if self.prefix: + topic = self.prefix + topic + return topic + + def topic_to_message_name(self, topic): + message_name = topic.upper() + if self.first_underscores_to_slash: + message_name = message_name.replace("/", "_", self.first_underscores_to_slash) + if self.prefix: + message_name = re.sub("^#{self.prefix}", "", message_name) + return message_name \ No newline at end of file diff --git a/can2mqtt/mqtt_handler.py b/can2mqtt/mqtt_handler.py new file mode 100644 index 0000000..f69cad4 --- /dev/null +++ b/can2mqtt/mqtt_handler.py @@ -0,0 +1,51 @@ +import paho.mqtt.client as mqtt +import time +import json + +class MqttHandler(): + def __init__(self, mqtt_config, can_bus, converter): + self.mqtt_config = mqtt_config + self.can_bus = can_bus + self.converter = converter + + self.connect() + + def on_connect(self, client, userdata, flags, reason_code, properties): + print(f"Connected to MQTT broker with result code {reason_code}") + self.subscribe() + + def on_message(self, client, userdata, message): + raw_msg = None + try: + print("MQTT Message to be sent to can%s: %s" % (message.topic, message.payload)) + raw_msg = self.converter.mqtt2can(message) + except ValueError: + print('Decoding MQTT message has failed') + + if raw_msg: + print(raw_msg) + self.can_bus.send(raw_msg) + + def connect(self): + self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) + + self.client.on_connect = self.on_connect + self.client.on_message = self.on_message + + self.client.username_pw_set(username = self.mqtt_config.get('username'), password = self.mqtt_config.get('password')) + host = self.mqtt_config.get('host', 'localhost') + print("Trying to connect to MQTT broker at %s..." % host) + self.client.connect(host) + + def subscribe(self): + if self.converter.send_to_can_from_topic: + topic = "%s/#" % self.converter.send_to_can_from_topic + print(topic, "<<<") + self.client.subscribe(topic) + print("Listening to '%s' for incoming mqtt messages to send to can bus" % topic) + + def publish(self, topic, data): + self.client.publish(topic, data) + + def loop_forever(self): + self.client.loop_forever() diff --git a/config.example.yaml b/config.example.yaml index 2023e99..bcdf2c2 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -9,5 +9,6 @@ mqtt: host: some_host_or_ip username: my_mqtt_user password: my_mqtt_password - topic_names: - first_underscores_to_slash: 2 +send_to_can_from_topic: mqtt2can +name_conversion: + first_underscores_to_slash: 2