Skip to content

Commit

Permalink
Merge pull request #691 from SignalK/sklistener
Browse files Browse the repository at this point in the history
Pass received Json objects always by value
  • Loading branch information
mairas authored Apr 16, 2024
2 parents 8672fa2 + f6f6c07 commit cba868d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
27 changes: 16 additions & 11 deletions src/sensesp/signalk/signalk_ws_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ static void websocket_event_handler(void* handler_args, esp_event_base_t base,
case WEBSOCKET_EVENT_DATA:
// check if the payload is text)
if (data->op_code == 0x01) {
ws_client->on_receive_delta((uint8_t*)data->data_ptr);
ws_client->on_receive_delta((uint8_t*)data->data_ptr, data->data_len);
}
break;
case WEBSOCKET_EVENT_ERROR:
Expand Down Expand Up @@ -206,14 +206,19 @@ void SKWSClient::subscribe_listeners() {
*
* @param payload
*/
void SKWSClient::on_receive_delta(uint8_t* payload) {
void SKWSClient::on_receive_delta(uint8_t* payload, size_t length) {
// Need to work on null-terminated strings
char buf[length + 1];
memcpy(buf, payload, length);
buf[length] = 0;

#ifdef SIGNALK_PRINT_RCV_DELTA
debugD("Websocket payload received: %s", (char*)payload);
debugD("Websocket payload received: %s", (char*)buf);
#endif

JsonDocument message;
// JsonObject message = jsonDoc.as<JsonObject>();
auto error = deserializeJson(message, payload);
auto error = deserializeJson(message, buf);

if (!error) {
if (message.containsKey("updates")) {
Expand Down Expand Up @@ -251,13 +256,11 @@ void SKWSClient::on_receive_updates(JsonDocument& message) {
JsonArray values = update["values"];

for (size_t vi = 0; vi < values.size(); vi++) {
JsonObject value = values[vi];

const char* path = value["path"];
JsonDocument value_doc = JsonDocument((JsonObject)values[vi]);

// push all values into a separate list for processing
// in the main task
received_updates_.push_back(value);
received_updates_.push_back(value_doc);
}
}
release_received_updates_semaphore();
Expand All @@ -279,9 +282,10 @@ void SKWSClient::process_received_updates() {
take_received_updates_semaphore();
int num_updates = received_updates_.size();
while (!received_updates_.empty()) {
JsonObject value = received_updates_.front();
received_updates_.pop_front();
const char* path = value["path"];
JsonDocument& doc = received_updates_.front();

const char* path = doc["path"];
JsonObject value = doc.as<JsonObject>();

for (size_t i = 0; i < listeners.size(); i++) {
SKListener* listener = listeners[i];
Expand All @@ -296,6 +300,7 @@ void SKWSClient::process_received_updates() {
listener->parse_value(value);
}
}
received_updates_.pop_front();
}
release_received_updates_semaphore();
delta_rx_count_producer_.set(num_updates);
Expand Down
4 changes: 2 additions & 2 deletions src/sensesp/signalk/signalk_ws_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class SKWSClient : public Configurable,
void on_disconnected();
void on_error();
void on_connected();
void on_receive_delta(uint8_t* payload);
void on_receive_delta(uint8_t* payload, size_t length);
void on_receive_updates(JsonDocument& message);
void on_receive_put(JsonDocument& message);
void connect();
Expand Down Expand Up @@ -120,7 +120,7 @@ class SKWSClient : public Configurable,

SemaphoreHandle_t received_updates_semaphore_ =
xSemaphoreCreateRecursiveMutex();
std::list<JsonObject> received_updates_;
std::list<JsonDocument> received_updates_;

/////////////////////////////////////////////////////////
// methods for all tasks
Expand Down

0 comments on commit cba868d

Please sign in to comment.