diff --git a/include/zenoh-pico/api/macros.h b/include/zenoh-pico/api/macros.h index c7cb700a9..f40e5dcf8 100644 --- a/include/zenoh-pico/api/macros.h +++ b/include/zenoh-pico/api/macros.h @@ -279,8 +279,10 @@ template<> inline int8_t z_drop(z_owned_publisher_t* v) { return z_undeclare_pub template<> inline void z_drop(z_owned_keyexpr_t* v) { z_keyexpr_drop(v); } template<> inline void z_drop(z_owned_config_t* v) { z_config_drop(v); } template<> inline void z_drop(z_owned_scouting_config_t* v) { z_scouting_config_drop(v); } +#if Z_FEATURE_SUBSCRIPTION==1 template<> inline int8_t z_drop(z_owned_pull_subscriber_t* v) { return z_undeclare_pull_subscriber(v); } template<> inline int8_t z_drop(z_owned_subscriber_t* v) { return z_undeclare_subscriber(v); } +#endif template<> inline int8_t z_drop(z_owned_queryable_t* v) { return z_undeclare_queryable(v); } template<> inline void z_drop(z_owned_reply_t* v) { z_reply_drop(v); } template<> inline void z_drop(z_owned_hello_t* v) { z_hello_drop(v); } diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index c348d16b7..ce1bdd36e 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -43,9 +43,9 @@ // c11 atomic variant #define _ZP_RC_CNT_TYPE _z_atomic(unsigned int) -#define _ZP_RC_OP_INIT_CNT _z_atomic_store_explicit(&p.in->_cnt, 1, _z_memory_order_relaxed); -#define _ZP_RC_OP_INCR_CNT _z_atomic_fetch_add_explicit(&p->in->_cnt, 1, _z_memory_order_relaxed); -#define _ZP_RC_OP_DECR_AND_CMP _z_atomic_fetch_sub_explicit(&p->in->_cnt, 1, _z_memory_order_release) > 1 +#define _ZP_RC_OP_INIT_CNT _z_atomic_store_explicit(&p.in->_cnt, (unsigned int)1, _z_memory_order_relaxed); +#define _ZP_RC_OP_INCR_CNT _z_atomic_fetch_add_explicit(&p->in->_cnt, (unsigned int)1, _z_memory_order_relaxed); +#define _ZP_RC_OP_DECR_AND_CMP _z_atomic_fetch_sub_explicit(&p->in->_cnt, (unsigned int)1, _z_memory_order_release) > 1 #define _ZP_RC_OP_SYNC atomic_thread_fence(_z_memory_order_acquire); #else // ZENOH_C_STANDARD == 99 diff --git a/include/zenoh-pico/system/platform/espidf.h b/include/zenoh-pico/system/platform/espidf.h index 6210a9abc..7511cb72a 100644 --- a/include/zenoh-pico/system/platform/espidf.h +++ b/include/zenoh-pico/system/platform/espidf.h @@ -17,6 +17,7 @@ #include #include +#include #include #include "zenoh-pico/config.h" @@ -24,8 +25,20 @@ #if Z_FEATURE_MULTI_THREAD == 1 #include -typedef TaskHandle_t zp_task_t; -typedef void *zp_task_attr_t; // Not used in ESP32 +typedef struct { + const char *name; + UBaseType_t priority; + size_t stack_depth; +#if (configSUPPORT_STATIC_ALLOCATION == 1) + _Bool static_allocation; + StackType_t *stack_buffer; + StaticTask_t *task_buffer; +#endif /* SUPPORT_STATIC_ALLOCATION */ +} zp_task_attr_t; +typedef struct { + TaskHandle_t handle; + EventGroupHandle_t join_event; +} zp_task_t; typedef pthread_mutex_t zp_mutex_t; typedef pthread_cond_t zp_condvar_t; #endif // Z_FEATURE_MULTI_THREAD == 1 @@ -39,7 +52,11 @@ typedef struct { int _fd; #endif #if Z_FEATURE_LINK_SERIAL == 1 - uart_port_t _serial; + struct { + uart_port_t _serial; + uint8_t *before_cobs; + uint8_t *after_cobs; + }; #endif }; } _z_sys_net_socket_t; diff --git a/src/api/api.c b/src/api/api.c index 73350bd23..a2ded70d4 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -647,6 +647,7 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint #endif ); +#if Z_FEATURE_SUBSCRIPTION == 1 // Trigger local subscriptions _z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len, _z_n_qos_make(0, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority) @@ -655,7 +656,7 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint opt.attachment #endif ); - +#endif return ret; } @@ -746,7 +747,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l opt.attachment #endif ); - +#if Z_FEATURE_SUBSCRIPTION == 1 // Trigger local subscriptions _z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len, _Z_N_QOS_DEFAULT #if Z_FEATURE_ATTACHMENT == 1 @@ -754,7 +755,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l opt.attachment #endif ); - +#endif return ret; } diff --git a/src/session/subscription.c b/src/session/subscription.c index 1b9345b0d..04cb808fd 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -184,8 +184,11 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_lock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 +#if Z_FEATURE_SUBSCRIPTION == 1 - _Z_DEBUG("Resolving %d - %s on mapping 0x%x", keyexpr._id, keyexpr._suffix, _z_keyexpr_mapping_id(&keyexpr)); +// _Z_DEBUG(" %x - %s", keyexpr._id, keyexpr._suffix); +// _Z_DEBUG("Resolving %d - %s on mapping 0x%x", keyexpr._id, keyexpr._suffix, _z_keyexpr_mapping_id(&keyexpr)); +#endif _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &keyexpr); _Z_DEBUG("Triggering subs for %d - %s", key._id, key._suffix); if (key._suffix != NULL) { diff --git a/src/system/espidf/network.c b/src/system/espidf/network.c index f11d60658..f750436a1 100644 --- a/src/system/espidf/network.c +++ b/src/system/espidf/network.c @@ -591,9 +591,10 @@ int8_t _z_open_serial_from_dev(_z_sys_net_socket_t *sock, char *dev, uint32_t ba uart_set_pin(sock->_serial, txpin, rxpin, UART_PIN_NO_CHANGE, UART_PIN_NO_CHANGE); const int uart_buffer_size = (1024 * 2); - QueueHandle_t uart_queue; - uart_driver_install(sock->_serial, uart_buffer_size, 0, 100, &uart_queue, 0); - + uart_driver_install(sock->_serial, uart_buffer_size, 0, 100, NULL, 0); + uart_flush_input(sock->_serial); + sock->after_cobs = (uint8_t *)zp_malloc(_Z_SERIAL_MFS_SIZE); + sock->before_cobs = (uint8_t *)zp_malloc(_Z_SERIAL_MAX_COBS_BUF_SIZE); return ret; } @@ -622,65 +623,75 @@ int8_t _z_listen_serial_from_dev(_z_sys_net_socket_t *sock, char *dev, uint32_t return ret; } -void _z_close_serial(_z_sys_net_socket_t *sock) { uart_driver_delete(sock->_serial); } +void _z_close_serial(_z_sys_net_socket_t *sock) { + printf("Closing serial %d\n", sock->_serial); + uart_wait_tx_done(sock->_serial, 1000); + uart_flush(sock->_serial); + uart_driver_delete(sock->_serial); + zp_free(sock->after_cobs); + zp_free(sock->before_cobs); +} size_t _z_read_serial(const _z_sys_net_socket_t sock, uint8_t *ptr, size_t len) { int8_t ret = _Z_RES_OK; - uint8_t *before_cobs = (uint8_t *)zp_malloc(_Z_SERIAL_MAX_COBS_BUF_SIZE); size_t rb = 0; - for (size_t i = 0; i < _Z_SERIAL_MAX_COBS_BUF_SIZE; i++) { - size_t len = 0; - do { - uart_get_buffered_data_len(sock._serial, &len); - if (len < 1) { - zp_sleep_ms(10); // FIXME: Yield by sleeping. - } else { - break; + while (rb < _Z_SERIAL_MAX_COBS_BUF_SIZE) { + int r = uart_read_bytes(sock._serial, &sock.before_cobs[rb], 1, 1000); + if (r == 0) { + _Z_DEBUG("Timeout reading from serial"); + if ( rb == 0 ) { + ret = _Z_ERR_GENERIC; } - } while (1); - uart_read_bytes(sock._serial, &before_cobs[i], 1, 100); - rb = rb + (size_t)1; - if (before_cobs[i] == (uint8_t)0x00) { break; + } else if (r == 1) { + rb = rb + (size_t)1; + if (sock.before_cobs[rb-1] == (uint8_t)0x00) { + break; + } + } else { + _Z_ERROR("Error reading from serial"); + ret = _Z_ERR_GENERIC; } } - - uint8_t *after_cobs = (uint8_t *)zp_malloc(_Z_SERIAL_MFS_SIZE); - size_t trb = _z_cobs_decode(before_cobs, rb, after_cobs); - - size_t i = 0; uint16_t payload_len = 0; - for (i = 0; i < sizeof(payload_len); i++) { - payload_len |= (after_cobs[i] << ((uint8_t)i * (uint8_t)8)); - } - if (trb == (size_t)(payload_len + (uint16_t)6)) { - (void)memcpy(ptr, &after_cobs[i], payload_len); - i = i + (size_t)payload_len; - - uint32_t crc = 0; - for (uint8_t j = 0; j < sizeof(crc); j++) { - crc |= (uint32_t)(after_cobs[i] << (j * (uint8_t)8)); - i = i + (size_t)1; + if (ret == _Z_RES_OK) { + _Z_DEBUG("Read %u bytes from serial", rb); + size_t trb = _z_cobs_decode(sock.before_cobs, rb, sock.after_cobs); + _Z_DEBUG("Decoded %u bytes from serial", trb); + size_t i = 0; + for (i = 0; i < sizeof(payload_len); i++) { + payload_len |= (sock.after_cobs[i] << ((uint8_t)i * (uint8_t)8)); } + _Z_DEBUG("payload_len = %u <= %X %X", payload_len, sock.after_cobs[1], sock.after_cobs[0]); + + if (trb == (size_t)(payload_len + (uint16_t)6)) { + (void)memcpy(ptr, &sock.after_cobs[i], payload_len); + i = i + (size_t)payload_len; + + uint32_t crc = 0; + for (uint8_t j = 0; j < sizeof(crc); j++) { + crc |= (uint32_t)(sock.after_cobs[i] << (j * (uint8_t)8)); + i = i + (size_t)1; + } - uint32_t c_crc = _z_crc32(ptr, payload_len); - if (c_crc != crc) { + uint32_t c_crc = _z_crc32(ptr, payload_len); + if (c_crc != crc) { + _Z_ERROR("CRC mismatch: %d != %d ", c_crc, crc); + ret = _Z_ERR_GENERIC; + } + } else { + _Z_ERROR("length mismatch => %d <> %d ", trb, payload_len + (uint16_t)6); ret = _Z_ERR_GENERIC; } - } else { - ret = _Z_ERR_GENERIC; } - zp_free(before_cobs); - zp_free(after_cobs); - rb = payload_len; if (ret != _Z_RES_OK) { rb = SIZE_MAX; } - + _Z_DEBUG("return _z_read_serial() = %d ", rb); return rb; } diff --git a/src/system/espidf/system.c b/src/system/espidf/system.c index 12633ad2b..47194c31e 100644 --- a/src/system/espidf/system.c +++ b/src/system/espidf/system.c @@ -50,48 +50,77 @@ void zp_free(void *ptr) { heap_caps_free(ptr); } // In FreeRTOS, tasks created using xTaskCreate must end with vTaskDelete. // A task function should __not__ simply return. typedef struct { - void *(*_fun)(void *); - void *_arg; + void *(*fun)(void *); + void *arg; + EventGroupHandle_t join_event; } z_task_arg; -void z_task_wrapper(z_task_arg *targ) { - targ->_fun(targ->_arg); +static void z_task_wrapper(void *arg) { + z_task_arg *targ = (z_task_arg *)arg; + targ->fun(targ->arg); + xEventGroupSetBits(targ->join_event, 1); vTaskDelete(NULL); - zp_free(targ); } -/*------------------ Task ------------------*/ -int8_t zp_task_init(zp_task_t *task, zp_task_attr_t *attr, void *(*fun)(void *), void *arg) { - int ret = 0; +static zp_task_attr_t z_default_task_attr = { + .name = "", + .priority = configMAX_PRIORITIES / 2, + .stack_depth = 5120, +#if (configSUPPORT_STATIC_ALLOCATION == 1) + .static_allocation = false, + .stack_buffer = NULL, + .task_buffer = NULL, +#endif /* SUPPORT_STATIC_ALLOCATION */ +}; +/*------------------ Thread ------------------*/ +int8_t zp_task_init(zp_task_t *task, zp_task_attr_t *attr, void *(*fun)(void *), void *arg) { z_task_arg *z_arg = (z_task_arg *)zp_malloc(sizeof(z_task_arg)); - if (z_arg != NULL) { - z_arg->_fun = fun; - z_arg->_arg = arg; - if (xTaskCreate((void *)z_task_wrapper, "", 5120, z_arg, configMAX_PRIORITIES / 2, task) != pdPASS) { - ret = -1; + if (z_arg == NULL) { + return -1; + } + + z_arg->fun = fun; + z_arg->arg = arg; + z_arg->join_event = task->join_event = xEventGroupCreate(); + + if (attr == NULL) { + attr = &z_default_task_attr; + } + +#if (configSUPPORT_STATIC_ALLOCATION == 1) + if (attr->static_allocation) { + task->handle = xTaskCreateStatic(z_task_wrapper, attr->name, attr->stack_depth, z_arg, attr->priority, + attr->stack_buffer, attr->task_buffer); + if (task->handle == NULL) { + return -1; } } else { - ret = -1; +#endif /* SUPPORT_STATIC_ALLOCATION */ + if (xTaskCreate(z_task_wrapper, attr->name, attr->stack_depth, z_arg, attr->priority, &task->handle) != + pdPASS) { + return -1; + } +#if (configSUPPORT_STATIC_ALLOCATION == 1) } +#endif /* SUPPORT_STATIC_ALLOCATION */ - return ret; + return 0; } int8_t zp_task_join(zp_task_t *task) { - // Note: task/thread join not supported on FreeRTOS API, so we force its deletion instead. - return zp_task_cancel(task); + xEventGroupWaitBits(task->join_event, 1, pdFALSE, pdFALSE, portMAX_DELAY); + return 0; } int8_t zp_task_cancel(zp_task_t *task) { - vTaskDelete(*task); + vTaskDelete(task->handle); return 0; } void zp_task_free(zp_task_t **task) { - zp_task_t *ptr = *task; - zp_free(ptr); - *task = NULL; + zp_free((*task)->join_event); + zp_free(*task); } /*------------------ Mutex ------------------*/ @@ -125,7 +154,8 @@ int zp_sleep_ms(size_t time) { // This may compound, so this approach may make sleeps longer than expected. // This extra check tries to minimize the amount of extra time it might sleep. while (zp_time_elapsed_ms(&start) < time) { - zp_sleep_us(1000); + //zp_sleep_us(1000); + vTaskDelay(1/portTICK_PERIOD_MS); } return 0; diff --git a/src/transport/unicast/read.c b/src/transport/unicast/read.c index 3b02ff597..7b5e6083a 100644 --- a/src/transport/unicast/read.c +++ b/src/transport/unicast/read.c @@ -92,7 +92,7 @@ void *_zp_unicast_read_task(void *ztu_arg) { } // Wrap the main buffer for to_read bytes _z_zbuf_t zbuf = _z_zbuf_view(&ztu->_zbuf, to_read); - + // Mark the session that we have received data ztu->_received = true;