Skip to content

Commit

Permalink
First beta of mqtt2can sending
Browse files Browse the repository at this point in the history
  • Loading branch information
tillsc committed Jan 25, 2025
1 parent 8981424 commit 9176330
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 45 deletions.
21 changes: 13 additions & 8 deletions can2mqtt/app.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import can
import cantools
import paho.mqtt.client as mqtt
import yaml
import threading
from can2mqtt.converter import Converter
from can2mqtt.can_listener import CanListener
from can2mqtt.mqtt_handler import MqttHandler

def load_config():
with open('config.yaml', 'r') as stream:
Expand All @@ -17,12 +18,16 @@ def load_dbc_db(dbc_files):
return dbc_db

def main_program(config, dbc_db):
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set(username=config['mqtt']['username'],password=config['mqtt']['password'])
mqtt_client.connect(config['mqtt']['host'])
converter = Converter(dbc_db, config)

can_config = config.get('can', {})
can_bus = can.interface.Bus(bustype = can_config.get('bustype', 'socketcan'), \
channel = can_config.get('interface', 'can0'), \
bitrate = can_config.get('bitrate', 125000))

can_listener = CanListener(dbc_db, mqtt_client, config)
bus = can.interface.Bus(bustype='socketcan', channel=config['can']['interface'], bitrate=config['can']['bitrate'])
can.Notifier(bus, [can_listener])
mqtt_handler = MqttHandler(config.get('mqtt', {}), can_bus, converter)

threading.Event().wait()
can_listener = CanListener(mqtt_handler, converter, config.get('resend_unchanged_events_after', 30))
can.Notifier(can_bus, [can_listener])

mqtt_handler.loop_forever()
45 changes: 10 additions & 35 deletions can2mqtt/can_listener.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,19 @@
import can

class CanListener(can.Listener):
def __init__(self, dbc_db, mqtt_client, config):
def __init__(self, mqtt_handler, converter, resend_unchanged_events_after):
self.last = {}
self.first_underscores_to_slash = False
self.prefix = False
self.resend_unchanged_events_after = 30
if 'mqtt' in config:
if 'topic_names' in config['mqtt']:
if 'first_underscores_to_slash' in config['mqtt']['topic_names']:
self.first_underscores_to_slash = config['mqtt']['topic_names']['first_underscores_to_slash']
if 'prefix' in config['mqtt']['topic_names']:
self.prefix = config['mqtt']['topic_names']['prefix']
if 'resend_unchanged_events_after' in config:
self.resend_unchanged_events_after = config['resend_unchanged_events_after']

self.dbc_db = dbc_db
self.mqtt_client = mqtt_client
self.mqtt_handler = mqtt_handler
self.converter = converter
self.resend_unchanged_events_after = resend_unchanged_events_after

def on_message_received(self, m):
msg = None
try:
msg = self.dbc_db.decode_message(m.arbitration_id, m.data)
except KeyError:
pass

if msg != None:
self.handle_message(msg, m.timestamp)

def handle_message(self, msg, timestamp):
for signal_id in msg:
topic = signal_id.lower()
if self.first_underscores_to_slash:
topic = topic.replace("_", "/", self.first_underscores_to_slash)
if self.prefix:
topic = self.prefix + topic
data = round(msg[signal_id], 5)
topic, data = self.converter.can2mqtt(m)

if topic:
if self.resend_unchanged_events_after == 0 or \
topic not in self.last or \
self.last[topic]['data'] != data or \
timestamp - self.last[topic]['timestamp'] > self.resend_unchanged_events_after:
self.last[topic] = {'data': data, 'timestamp': timestamp}
self.mqtt_client.publish(topic, data)
m.timestamp - self.last[topic]['timestamp'] > self.resend_unchanged_events_after:
self.last[topic] = {'data': data, 'timestamp': m.timestamp}
self.mqtt_handler.publish(topic, data)
67 changes: 67 additions & 0 deletions can2mqtt/converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import json
import can
import re

class Converter():
def __init__(self, dbc_db, config):
self.dbc_db = dbc_db
self.only_one_signal_per_message = config.get('only_one_signal_per_message', False)
self.send_to_can_from_topic = config.get('send_to_can_from_topic')

name_conversion_config = config.get('name_conversion', {})
self.first_underscores_to_slash = name_conversion_config.get('first_underscores_to_slash', False)
self.prefix = name_conversion_config.get('prefix')

def can2mqtt(self, raw_can_msg):
signal_data = None
try:
signal_data = self.dbc_db.decode_message(raw_can_msg.arbitration_id, raw_can_msg.data)
except KeyError:
return None, None

dbc_msg = self.dbc_db.get_message_by_frame_id(raw_can_msg.arbitration_id)
topic = self.message_name_to_topic(dbc_msg.name)
data = None
if self.only_one_signal_per_message:
data = round(signal_data[dbc_msg.signals[0].name], 5)
else:
data = {}
for signal_id in signal_data:
data[signal_id] = round(signal_data[signal_id], 5)
data = json.dumps(data)
return topic, data

def mqtt2can(self, mqtt_message):
if not self.send_to_can_from_topic:
return

sub_topic = re.sub("^%s/" % self.send_to_can_from_topic, "", mqtt_message.topic)
message_name = self.topic_to_message_name(sub_topic)
if not message_name:
return

dbc_msg = self.dbc_db.get_message_by_name(message_name)

signal_data = json.loads(mqtt_message.payload)
if self.only_one_signal_per_message:
signal_data = { dbc_msg.signals[0].name: signal_data }



return can.Message(arbitration_id=dbc_msg.frame_id, data = dbc_msg.encode(signal_data))

def message_name_to_topic(self, message_name):
topic = message_name.lower()
if self.first_underscores_to_slash:
topic = topic.replace("_", "/", self.first_underscores_to_slash)
if self.prefix:
topic = self.prefix + topic
return topic

def topic_to_message_name(self, topic):
message_name = topic.upper()
if self.first_underscores_to_slash:
message_name = message_name.replace("/", "_", self.first_underscores_to_slash)
if self.prefix:
message_name = re.sub("^#{self.prefix}", "", message_name)
return message_name
51 changes: 51 additions & 0 deletions can2mqtt/mqtt_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import paho.mqtt.client as mqtt
import time
import json

class MqttHandler():
def __init__(self, mqtt_config, can_bus, converter):
self.mqtt_config = mqtt_config
self.can_bus = can_bus
self.converter = converter

self.connect()

def on_connect(self, client, userdata, flags, reason_code, properties):
print(f"Connected to MQTT broker with result code {reason_code}")
self.subscribe()

def on_message(self, client, userdata, message):
raw_msg = None
try:
print("MQTT Message to be sent to can%s: %s" % (message.topic, message.payload))
raw_msg = self.converter.mqtt2can(message)
except ValueError:
print('Decoding MQTT message has failed')

if raw_msg:
print(raw_msg)
self.can_bus.send(raw_msg)

def connect(self):
self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)

self.client.on_connect = self.on_connect
self.client.on_message = self.on_message

self.client.username_pw_set(username = self.mqtt_config.get('username'), password = self.mqtt_config.get('password'))
host = self.mqtt_config.get('host', 'localhost')
print("Trying to connect to MQTT broker at %s..." % host)
self.client.connect(host)

def subscribe(self):
if self.converter.send_to_can_from_topic:
topic = "%s/#" % self.converter.send_to_can_from_topic
print(topic, "<<<")
self.client.subscribe(topic)
print("Listening to '%s' for incoming mqtt messages to send to can bus" % topic)

def publish(self, topic, data):
self.client.publish(topic, data)

def loop_forever(self):
self.client.loop_forever()
5 changes: 3 additions & 2 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ mqtt:
host: some_host_or_ip
username: my_mqtt_user
password: my_mqtt_password
topic_names:
first_underscores_to_slash: 2
send_to_can_from_topic: mqtt2can
name_conversion:
first_underscores_to_slash: 2

0 comments on commit 9176330

Please sign in to comment.