Skip to content

Commit

Permalink
client: Report failure on timeout in mid-message timeout
Browse files Browse the repository at this point in the history
Fixes error processing on network reads:

1) Treat EOF as an error, since the connection is closed (FIN) from the
server side. If we didn't we would try to read (in the next iteration)
from the same socket that has been already closed and get an error
ENOTCONN.
Before the fix:
D (13760) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF
E (13800) transport_base: tcp_read error, errno=Socket is not connected
E (13800) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128
D (13810) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0
I (13820) esp_mqtt_demo: MQTT_EVENT_ERROR
E (13830) esp_mqtt_demo: Last error reported from esp-tls: 0x8008
E (13830) esp_mqtt_demo: Last error captured as transport's socket errno: 0x80
I (13840) esp_mqtt_demo: Last errno string (Socket is not connected)
E (13850) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1
D (13860) mqtt_client: Reconnect after 10000 ms
D (13860) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2
I (13870) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED
After the fix:
E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): EOF
E (12420) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=128
D (12430) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0
I (12440) esp_mqtt_demo: MQTT_EVENT_ERROR
E (12450) esp_mqtt_demo: Last error reported from esp-tls: 0x8008
I (12450) esp_mqtt_demo: Last errno string (Success)
E (12460) mqtt_client: mqtt_process_receive: mqtt_message_receive() returned -1
D (12470) mqtt_client: Reconnect after 10000 ms
D (12470) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2
I (12480) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED

2) Treat timeouts in the middle of MQTT message reading as errors (if
timeouted for the second time and didn't read a byte)
Before the fix:
D (9160) mqtt_client: mqtt_message_receive: read "remaining length" byte: 0x2
D (9170) mqtt_client: mqtt_message_receive: total message length: 4 (already read: 2)
D (19190) mqtt_client: mqtt_message_receive: read_len=0
D (19190) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read(): call timed out before data was ready!
E (19200) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned 0
E (19210) mqtt_client: MQTT connect failed
D (19220) mqtt_client: Reconnect after 10000 ms
D (19220) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2
I (19230) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED
After the fix:
D (19190) mqtt_client: mqtt_message_receive: read_len=0
E (19190) mqtt_client: Network timeout while reading MQTT message
E (19200) mqtt_client: esp_mqtt_handle_transport_read_error: transport_read() error: errno=119
D (19210) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=0
I (19220) esp_mqtt_demo: MQTT_EVENT_ERROR
I (19220) esp_mqtt_demo: Last errno string (Success)
E (19230) mqtt_client: esp_mqtt_connect: mqtt_message_receive() returned -1
E (19240) mqtt_client: MQTT connect failed
D (19240) mqtt_client: Reconnect after 10000 ms
D (19240) esp_mqtt_demo: Event dispatched from event loop base=MQTT_EVENTS, event_id=2
I (19250) esp_mqtt_demo: MQTT_EVENT_DISCONNECTED
(Note that the above log is from mid-message timeout of CONNECT message,
which was hadled before the fix. If the mid-message timeout ocurs with
for example SUBACK, the current version would repeatably resend
susbscribe message)

Merges #232
  • Loading branch information
david-cermak committed Nov 10, 2023
1 parent 7894dd0 commit ddde502
Showing 1 changed file with 44 additions and 18 deletions.
62 changes: 44 additions & 18 deletions mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,36 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
static void esp_mqtt_client_dispatch_transport_error(esp_mqtt_client_handle_t client);
static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client);

static int esp_mqtt_handle_transport_read_error(int err, esp_mqtt_client_handle_t client)
/**
* @brief Processes error reported from transport layer (considering the message read status)
*
* @param err: Error reported from TCP transport
* @param client: MQTT client handle
* @param mid_message: True if the error occured when reading incomplete message
*
* @return - 0 on Timeout
* - -1 on Timeout with incomplete message
* - -2 on Error or EOF
*/
static int esp_mqtt_handle_transport_read_error(int err, esp_mqtt_client_handle_t client, bool mid_message)
{
if (err == ERR_TCP_TRANSPORT_CONNECTION_CLOSED_BY_FIN) {
ESP_LOGD(TAG, "%s: transport_read(): EOF", __func__);
return 0;
}

if (err == ERR_TCP_TRANSPORT_CONNECTION_TIMEOUT) {
if (mid_message) {
// No error message, because we could've read with timeout 0 (caller will decide)
return -1;
}
// Not an error, continue
ESP_LOGD(TAG, "%s: transport_read(): call timed out before data was ready!", __func__);
return 0;
}

if (err == ERR_TCP_TRANSPORT_CONNECTION_CLOSED_BY_FIN) {
ESP_LOGE(TAG, "%s: transport_read(): EOF", __func__);
}

ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
esp_mqtt_client_dispatch_transport_error(client);
return -1;
return -2;
}

#if MQTT_ENABLE_SSL
Expand Down Expand Up @@ -1094,7 +1109,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
client->config->network_timeout_ms);
if (ret <= 0) {
return esp_mqtt_handle_transport_read_error(ret, client) == 0 ? ESP_OK : ESP_FAIL;
return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL;
}

msg_data_len = ret;
Expand Down Expand Up @@ -1178,11 +1193,13 @@ static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_

/*
* Returns:
* -1 in case of failure
* -2 in case of failure or EOF (clean connection closure)
* -1 timeout while in-the-middle of the messge
* 0 if no message has been received
* 1 if a message has been received and placed to client->mqtt_state:
* message length: client->mqtt_state.message_length
* message content: client->mqtt_state.in_buffer
*
*/
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms)
{
Expand All @@ -1198,7 +1215,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
*/
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
if (read_len <= 0) {
return esp_mqtt_handle_transport_read_error(read_len, client);
return esp_mqtt_handle_transport_read_error(read_len, client, false);
}
ESP_LOGD(TAG, "%s: first byte: 0x%x", __func__, *buf);
/*
Expand All @@ -1224,7 +1241,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
*/
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
if (read_len <= 0) {
return esp_mqtt_handle_transport_read_error(read_len, client);
return esp_mqtt_handle_transport_read_error(read_len, client, true);
}
ESP_LOGD(TAG, "%s: read \"remaining length\" byte: 0x%x", __func__, *buf);
buf++;
Expand All @@ -1245,7 +1262,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
read_len = esp_transport_read(t, (char *)buf, client->mqtt_state.in_buffer_read_len - fixed_header_len + 2, read_poll_timeout_ms);
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
if (read_len <= 0) {
return esp_mqtt_handle_transport_read_error(read_len, client);
return esp_mqtt_handle_transport_read_error(read_len, client, true);
}
client->mqtt_state.in_buffer_read_len += read_len;
buf += read_len;
Expand Down Expand Up @@ -1276,7 +1293,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
read_len = esp_transport_read(t, (char *)buf, total_len - client->mqtt_state.in_buffer_read_len, read_poll_timeout_ms);
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
if (read_len <= 0) {
return esp_mqtt_handle_transport_read_error(read_len, client);
return esp_mqtt_handle_transport_read_error(read_len, client, true);
}
client->mqtt_state.in_buffer_read_len += read_len;
if (client->mqtt_state.in_buffer_read_len < total_len) {
Expand All @@ -1290,23 +1307,32 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
return 1;
err:
esp_mqtt_client_dispatch_transport_error(client);
return -1;
return -2;
}

static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
{
uint8_t msg_type = 0, msg_qos = 0;
uint16_t msg_id = 0;
size_t previous_in_buffer_read_len = client->mqtt_state.in_buffer_read_len;

/* non-blocking receive in order not to block other tasks */
int recv = mqtt_message_receive(client, 0);
if (recv < 0) {
if (recv == 0) { // Timeout
return ESP_OK;
}
if (recv == -1) { // Mid-message timeout
if (previous_in_buffer_read_len == client->mqtt_state.in_buffer_read_len) {
// Report error only if didn't receive anything since previous iteration
ESP_LOGE(TAG, "%s: Network timeout while reading MQTT message", __func__);
return ESP_FAIL;
}
return ESP_OK; // Treat as standard timeout (keep reading the message)
}
if (recv < 0) { // Other error
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, recv);
return ESP_FAIL;
}
if (recv == 0) {
return ESP_OK;
}
int read_len = client->mqtt_state.message_length;

// If the message was valid, get the type, quality of service and id of the message
Expand Down

0 comments on commit ddde502

Please sign in to comment.