-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmqtt.js
executable file
·125 lines (108 loc) · 3.97 KB
/
mqtt.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/*
* http://www.steves-internet-guide.com/using-node-mqtt-client/
* https://github.com/mqttjs/async-mqtt
*
* Configure Tasmota with:
Backlog mqtthost grag.fritz.box; mqttport 1883; mqttuser <username>; mqttpassword <password>; topic <device_topic>;
*/
const mqtt = require('async-mqtt')
const winston = require('winston')
const { v4: uuidv4 } = require('uuid')
module.exports = function(config, god) {
var self = {
logger: {},
client: {},
triggers: {},
init: function() {
this.logger = winston.loggers.get('mqtt')
this.client = mqtt.connect(config.server, { clientId: config.clientId } )
this.logger.info("Connecting to mqtt server %s as %s", config.server, config.clientId)
this.publish = this.client.publish.bind(this.client)
this.client.on("error", async (error) => {
this.logger.error("Can't connect" + error)
// TODO Steve says this only happens on auth failures and they are non-recoverable - other errors don't trigger this callback
})
this.client.on("connect", async () => {
// TODO apparently 'connect' is called each second?!?
this.logger.info("Connected " + this.client.connected)
// this.client.subscribe('#') // for debugging or finding new messages
})
this.client.on('message', this._onMessage.bind(this))
god.terminateListeners.push(this.onTerminate.bind(this))
},
onTerminate: async function() {
await this.client.end()
},
_onMessage: function(topic, message, packet) {
let topic2 = topic
let loop = true
let found = false
while(loop) {
let trigger = this.triggers[topic2]
if (trigger) {
found = true
let keys = Object.keys(trigger)
for(let i=0; i < keys.length; i++) {
let t = trigger[keys[i]]
this.logger.info(t.id + ": " + message.toString())
t.callback(t, topic, message, packet)
}
}
// go one level more generic
if (topic2.indexOf('/') > 0) {
topic2 = topic2.replace(/(^|\/)[^/#]+(\/#)?$/, '/#')
} else {
loop = false
}
}
if (!found) {
// unrecognized mqtt message
this.logger.debug("unrecognized: " + topic + " -> " + message.toString().substr(0, 200))
return
}
},
/** adds a MQTT topic trigger
* topic: the MQTT topic.
* id: ID which will be passed to the callback (as trigger.id)
* callback: function(trigger, topic, message, packet)
* returns the trigger uuid, which can be used to remove the trigger again
*/
addTrigger: async function(topic, id, callback) {
if (!this.triggers[topic]) {
this.triggers[topic] = {}
this.logger.info("Subscribing to %s", topic)
await this.client.subscribe(topic)
}
let uuid = uuidv4()
this.triggers[topic][uuid] = {
uuid: uuid,
id: id,
callback: callback,
}
this.logger.debug("Adding trigger %s (%s) to subscription for %s", id, uuid, topic)
return uuid
},
removeTrigger: async function(topic, uuid) {
if (!this.triggers[topic]) {
this.logger.warn("Trying to remove trigger %s, but no active subscription for topic %s", uuid, topic)
return
}
if (!this.triggers[topic][uuid]) {
this.logger.warn("Trying to remove trigger %s for %s, but trigger not found", uuid, topic)
return
}
this.logger.debug("Removing trigger '%s' (%s) from subscription for %s", this.triggers[topic][uuid].id, uuid, topic)
delete this.triggers[topic][uuid]
if (!Object.keys(this.triggers[topic]).length) {
this.logger.info("Unsubscribing from " + topic)
await this.client.unsubscribe(topic)
delete this.triggers[topic]
}
},
publish: async(topic) => { // gets overwritten with this.client.publish(topic, message) in init()
this.logger.error("Trying to publish to topic %s before mqtt was initialized", topic)
},
}
self.init()
return self
}