Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass received Json objects always by value #691

Merged
merged 1 commit into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions src/sensesp/signalk/signalk_ws_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ static void websocket_event_handler(void* handler_args, esp_event_base_t base,
case WEBSOCKET_EVENT_DATA:
// check if the payload is text)
if (data->op_code == 0x01) {
ws_client->on_receive_delta((uint8_t*)data->data_ptr);
ws_client->on_receive_delta((uint8_t*)data->data_ptr, data->data_len);
}
break;
case WEBSOCKET_EVENT_ERROR:
Expand Down Expand Up @@ -206,14 +206,19 @@ void SKWSClient::subscribe_listeners() {
*
* @param payload
*/
void SKWSClient::on_receive_delta(uint8_t* payload) {
void SKWSClient::on_receive_delta(uint8_t* payload, size_t length) {
// Need to work on null-terminated strings
char buf[length + 1];
memcpy(buf, payload, length);
buf[length] = 0;

#ifdef SIGNALK_PRINT_RCV_DELTA
debugD("Websocket payload received: %s", (char*)payload);
debugD("Websocket payload received: %s", (char*)buf);
#endif

JsonDocument message;
// JsonObject message = jsonDoc.as<JsonObject>();
auto error = deserializeJson(message, payload);
auto error = deserializeJson(message, buf);

if (!error) {
if (message.containsKey("updates")) {
Expand Down Expand Up @@ -251,13 +256,11 @@ void SKWSClient::on_receive_updates(JsonDocument& message) {
JsonArray values = update["values"];

for (size_t vi = 0; vi < values.size(); vi++) {
JsonObject value = values[vi];

const char* path = value["path"];
JsonDocument value_doc = JsonDocument((JsonObject)values[vi]);

// push all values into a separate list for processing
// in the main task
received_updates_.push_back(value);
received_updates_.push_back(value_doc);
}
}
release_received_updates_semaphore();
Expand All @@ -279,9 +282,10 @@ void SKWSClient::process_received_updates() {
take_received_updates_semaphore();
int num_updates = received_updates_.size();
while (!received_updates_.empty()) {
JsonObject value = received_updates_.front();
received_updates_.pop_front();
const char* path = value["path"];
JsonDocument& doc = received_updates_.front();

const char* path = doc["path"];
JsonObject value = doc.as<JsonObject>();

for (size_t i = 0; i < listeners.size(); i++) {
SKListener* listener = listeners[i];
Expand All @@ -296,6 +300,7 @@ void SKWSClient::process_received_updates() {
listener->parse_value(value);
}
}
received_updates_.pop_front();
}
release_received_updates_semaphore();
delta_rx_count_producer_.set(num_updates);
Expand Down
4 changes: 2 additions & 2 deletions src/sensesp/signalk/signalk_ws_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class SKWSClient : public Configurable,
void on_disconnected();
void on_error();
void on_connected();
void on_receive_delta(uint8_t* payload);
void on_receive_delta(uint8_t* payload, size_t length);
void on_receive_updates(JsonDocument& message);
void on_receive_put(JsonDocument& message);
void connect();
Expand Down Expand Up @@ -120,7 +120,7 @@ class SKWSClient : public Configurable,

SemaphoreHandle_t received_updates_semaphore_ =
xSemaphoreCreateRecursiveMutex();
std::list<JsonObject> received_updates_;
std::list<JsonDocument> received_updates_;

/////////////////////////////////////////////////////////
// methods for all tasks
Expand Down
Loading