diff --git a/src/submitters/mqtt_submitter.cpp b/src/submitters/mqtt_submitter.cpp index 2f20b6c..524e5e6 100644 --- a/src/submitters/mqtt_submitter.cpp +++ b/src/submitters/mqtt_submitter.cpp @@ -11,6 +11,7 @@ MQTTSubmitter::MQTTSubmitter(String location, const char* host, int port) : port(port), started(false), disconnected(false), + failure(false), pendingAcks(), client(new AsyncMqttClient()) { @@ -40,12 +41,22 @@ void MQTTSubmitter::initialise() bool MQTTSubmitter::ready() { - return client->connected(); + if (disconnected) + { + logln("MQTT was disconnected, trying to reconnect"); + disconnected = false; + client->connect(); + return false; + } + else + { + return client->connected(); + } } bool MQTTSubmitter::failed() { - return disconnected; + return failure; } void MQTTSubmitter::onConnect(bool sessionPresent) @@ -56,7 +67,14 @@ void MQTTSubmitter::onConnect(bool sessionPresent) void MQTTSubmitter::onDisconnect(AsyncMqttClientDisconnectReason reason) { logfmt("MQTT is down because %d\n", reason); - disconnected = true; + if (reason == AsyncMqttClientDisconnectReason::TCP_DISCONNECTED) + { + disconnected = true; + } + else + { + failure = true; + } } void MQTTSubmitter::onPublishAck(uint16_t packetId) @@ -66,30 +84,44 @@ void MQTTSubmitter::onPublishAck(uint16_t packetId) void MQTTSubmitter::sendReading(String name, double value) { - String topic = "environment/"; - topic.concat(location); - topic.concat("/"); - topic.concat(name); - - String valueString = String(value); - - logfmt("MQTT <- %s = %f\n", topic.c_str(), value); - - uint16_t msgId = client->publish(topic.c_str(), 1, true, valueString.c_str(), valueString.length()); - pendingAcks.insert(msgId); + if (disconnected) + { + logln("Attempting to send data to disconnected MQTT server"); + } + else + { + String topic = "environment/"; + topic.concat(location); + topic.concat("/"); + topic.concat(name); + + String valueString = String(value); + + logfmt("MQTT <- %s = %f\n", topic.c_str(), value); + + uint16_t msgId = client->publish(topic.c_str(), 1, true, valueString.c_str(), valueString.length()); + pendingAcks.insert(msgId); + } } void MQTTSubmitter::sendReading(String name, String value) { - String topic = "environment/"; - topic.concat(location); - topic.concat("/"); - topic.concat(name); - - logfmt("MQTT <- %s = %s\n", topic.c_str(), value.c_str()); - - uint16_t msgId = client->publish(topic.c_str(), 1, true, value.c_str(), value.length()); - pendingAcks.insert(msgId); + if (disconnected) + { + logln("Attempting to send data to disconnected MQTT server"); + } + else + { + String topic = "environment/"; + topic.concat(location); + topic.concat("/"); + topic.concat(name); + + logfmt("MQTT <- %s = %s\n", topic.c_str(), value.c_str()); + + uint16_t msgId = client->publish(topic.c_str(), 1, true, value.c_str(), value.length()); + pendingAcks.insert(msgId); + } } bool MQTTSubmitter::complete() diff --git a/src/submitters/mqtt_submitter.h b/src/submitters/mqtt_submitter.h index 23e3268..7782a05 100644 --- a/src/submitters/mqtt_submitter.h +++ b/src/submitters/mqtt_submitter.h @@ -12,6 +12,7 @@ class MQTTSubmitter: public ReadingSubmitter { int port; bool started; volatile bool disconnected; + volatile bool failure; AsyncMqttClient *client; std::set pendingAcks;