Skip to content

Commit

Permalink
Try to reconnect on MQTT disconnection while waiting for ready
Browse files Browse the repository at this point in the history
  • Loading branch information
GJKrupa committed Jul 13, 2022
1 parent 052b133 commit 3c1883c
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 23 deletions.
78 changes: 55 additions & 23 deletions src/submitters/mqtt_submitter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ MQTTSubmitter::MQTTSubmitter(String location, const char* host, int port) :
port(port),
started(false),
disconnected(false),
failure(false),
pendingAcks(),
client(new AsyncMqttClient())
{
Expand Down Expand Up @@ -40,12 +41,22 @@ void MQTTSubmitter::initialise()

bool MQTTSubmitter::ready()
{
return client->connected();
if (disconnected)
{
logln("MQTT was disconnected, trying to reconnect");
disconnected = false;
client->connect();
return false;
}
else
{
return client->connected();
}
}

bool MQTTSubmitter::failed()
{
return disconnected;
return failure;
}

void MQTTSubmitter::onConnect(bool sessionPresent)
Expand All @@ -56,7 +67,14 @@ void MQTTSubmitter::onConnect(bool sessionPresent)
void MQTTSubmitter::onDisconnect(AsyncMqttClientDisconnectReason reason)
{
logfmt("MQTT is down because %d\n", reason);
disconnected = true;
if (reason == AsyncMqttClientDisconnectReason::TCP_DISCONNECTED)
{
disconnected = true;
}
else
{
failure = true;
}
}

void MQTTSubmitter::onPublishAck(uint16_t packetId)
Expand All @@ -66,30 +84,44 @@ void MQTTSubmitter::onPublishAck(uint16_t packetId)

void MQTTSubmitter::sendReading(String name, double value)
{
String topic = "environment/";
topic.concat(location);
topic.concat("/");
topic.concat(name);

String valueString = String(value);

logfmt("MQTT <- %s = %f\n", topic.c_str(), value);

uint16_t msgId = client->publish(topic.c_str(), 1, true, valueString.c_str(), valueString.length());
pendingAcks.insert(msgId);
if (disconnected)
{
logln("Attempting to send data to disconnected MQTT server");
}
else
{
String topic = "environment/";
topic.concat(location);
topic.concat("/");
topic.concat(name);

String valueString = String(value);

logfmt("MQTT <- %s = %f\n", topic.c_str(), value);

uint16_t msgId = client->publish(topic.c_str(), 1, true, valueString.c_str(), valueString.length());
pendingAcks.insert(msgId);
}
}

void MQTTSubmitter::sendReading(String name, String value)
{
String topic = "environment/";
topic.concat(location);
topic.concat("/");
topic.concat(name);

logfmt("MQTT <- %s = %s\n", topic.c_str(), value.c_str());

uint16_t msgId = client->publish(topic.c_str(), 1, true, value.c_str(), value.length());
pendingAcks.insert(msgId);
if (disconnected)
{
logln("Attempting to send data to disconnected MQTT server");
}
else
{
String topic = "environment/";
topic.concat(location);
topic.concat("/");
topic.concat(name);

logfmt("MQTT <- %s = %s\n", topic.c_str(), value.c_str());

uint16_t msgId = client->publish(topic.c_str(), 1, true, value.c_str(), value.length());
pendingAcks.insert(msgId);
}
}

bool MQTTSubmitter::complete()
Expand Down
1 change: 1 addition & 0 deletions src/submitters/mqtt_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class MQTTSubmitter: public ReadingSubmitter {
int port;
bool started;
volatile bool disconnected;
volatile bool failure;
AsyncMqttClient *client;
std::set<uint16_t> pendingAcks;

Expand Down

0 comments on commit 3c1883c

Please sign in to comment.