diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index 556dcdc7b..269431bfe 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -29,6 +29,10 @@ extern "C" { #endif +/*------------- Declaration Helpers --------------*/ +z_result_t _z_send_declare(_z_session_t *zn, const _z_network_message_t *n_msg); +z_result_t _z_send_undeclare(_z_session_t *zn, const _z_network_message_t *n_msg); + /*------------------ Discovery ------------------*/ /** diff --git a/include/zenoh-pico/session/liveliness.h b/include/zenoh-pico/session/liveliness.h index a3df1dc04..6f79569e3 100644 --- a/include/zenoh-pico/session/liveliness.h +++ b/include/zenoh-pico/session/liveliness.h @@ -48,6 +48,7 @@ void _z_liveliness_unregister_token(_z_session_t *zn, uint32_t id); z_result_t _z_liveliness_subscription_declare(_z_session_t *zn, uint32_t id, const _z_keyexpr_t *keyexpr, const _z_timestamp_t *timestamp); z_result_t _z_liveliness_subscription_undeclare(_z_session_t *zn, uint32_t id, const _z_timestamp_t *timestamp); +z_result_t _z_liveliness_subscription_undeclare_all(_z_session_t *zn); z_result_t _z_liveliness_subscription_trigger_history(_z_session_t *zn, const _z_keyexpr_t *keyexpr); #endif diff --git a/src/net/liveliness.c b/src/net/liveliness.c index b827aada6..099a85ca0 100644 --- a/src/net/liveliness.c +++ b/src/net/liveliness.c @@ -36,7 +36,7 @@ z_result_t _z_declare_liveliness_token(const _z_session_rc_t *zn, _z_liveliness_ _z_declaration_t declaration = _z_make_decl_token(keyexpr, id); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - ret = _z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); + ret = _z_send_declare(_Z_RC_IN_VAL(zn), &n_msg); _z_n_msg_clear(&n_msg); _z_liveliness_register_token(_Z_RC_IN_VAL(zn), id, keyexpr); @@ -58,7 +58,7 @@ z_result_t _z_undeclare_liveliness_token(_z_liveliness_token_t *token) { _z_declaration_t declaration = _z_make_undecl_token(token->_id, &token->_key); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - ret = _z_send_n_msg(_Z_RC_IN_VAL(&token->_zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); + ret = _z_send_undeclare(_Z_RC_IN_VAL(&token->_zn), &n_msg); _z_n_msg_clear(&n_msg); return ret; @@ -92,7 +92,7 @@ _z_subscriber_t _z_declare_liveliness_subscriber(const _z_session_rc_t *zn, _z_k keyexpr, s._id, _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_TOKENS | _Z_INTEREST_FLAG_RESTRICTED | mode); _z_network_message_t n_msg = _z_n_msg_make_interest(interest); - if (_z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + if (_z_send_declare(_Z_RC_IN_VAL(zn), &n_msg) != _Z_RES_OK) { _z_unregister_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, sp_s); _z_subscriber_clear(&ret); return ret; @@ -117,8 +117,7 @@ z_result_t _z_undeclare_liveliness_subscriber(_z_subscriber_t *sub) { _z_interest_t interest = _z_make_interest_final(s->_val->_id); _z_network_message_t n_msg = _z_n_msg_make_interest(interest); - if (_z_send_n_msg(_Z_RC_IN_VAL(&sub->_zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != - _Z_RES_OK) { + if (_z_send_undeclare(_Z_RC_IN_VAL(&sub->_zn), &n_msg) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; } _z_n_msg_clear(&n_msg); @@ -155,7 +154,7 @@ z_result_t _z_liveliness_query(_z_session_t *zn, const _z_keyexpr_t *keyexpr, _z _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_CURRENT); _z_network_message_t n_msg = _z_n_msg_make_interest(interest); - if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + if (_z_send_declare(zn, &n_msg) != _Z_RES_OK) { _z_liveliness_unregister_pending_query(zn, id); ret = _Z_ERR_TRANSPORT_TX_FAILED; } diff --git a/src/net/primitives.c b/src/net/primitives.c index c81b37d1b..adea1b3ed 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -43,7 +43,7 @@ #include "zenoh-pico/utils/result.h" /*------------------ Declaration Helpers ------------------*/ -static z_result_t _z_send_decalre(_z_session_t *zn, const _z_network_message_t *n_msg) { +z_result_t _z_send_declare(_z_session_t *zn, const _z_network_message_t *n_msg) { z_result_t ret = _Z_RES_OK; ret = _z_send_n_msg(zn, n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); @@ -56,7 +56,7 @@ static z_result_t _z_send_decalre(_z_session_t *zn, const _z_network_message_t * return ret; } -static z_result_t _z_send_undecalre(_z_session_t *zn, const _z_network_message_t *n_msg) { +z_result_t _z_send_undeclare(_z_session_t *zn, const _z_network_message_t *n_msg) { z_result_t ret = _Z_RES_OK; ret = _z_send_n_msg(zn, n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); @@ -98,7 +98,7 @@ uint16_t _z_declare_resource(_z_session_t *zn, const _z_keyexpr_t *keyexpr) { _z_keyexpr_t alias = _z_keyexpr_alias(keyexpr); _z_declaration_t declaration = _z_make_decl_keyexpr(id, &alias); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_decalre(zn, &n_msg) == _Z_RES_OK) { + if (_z_send_declare(zn, &n_msg) == _Z_RES_OK) { ret = id; // Invalidate cache _z_subscription_cache_invalidate(zn); @@ -120,7 +120,7 @@ z_result_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid) { // Build the declare message to send on the wire _z_declaration_t declaration = _z_make_undecl_keyexpr(rid); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_undecalre(zn, &n_msg) == _Z_RES_OK) { + if (_z_send_undeclare(zn, &n_msg) == _Z_RES_OK) { // Remove local resource _z_unregister_resource(zn, rid, _Z_KEYEXPR_MAPPING_LOCAL); // Invalidate cache @@ -273,7 +273,7 @@ _z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t ke // Build the declare message to send on the wire _z_declaration_t declaration = _z_make_decl_subscriber(&keyexpr, s._id); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_decalre(_Z_RC_IN_VAL(zn), &n_msg) != _Z_RES_OK) { + if (_z_send_declare(_Z_RC_IN_VAL(zn), &n_msg) != _Z_RES_OK) { _z_unregister_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_SUBSCRIBER, sp_s); _z_subscriber_clear(&ret); return ret; @@ -305,7 +305,7 @@ z_result_t _z_undeclare_subscriber(_z_subscriber_t *sub) { declaration = _z_make_undecl_subscriber(sub->_entity_id, &_Z_RC_IN_VAL(s)->_key); } _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_undecalre(_Z_RC_IN_VAL(&sub->_zn), &n_msg) != _Z_RES_OK) { + if (_z_send_undeclare(_Z_RC_IN_VAL(&sub->_zn), &n_msg) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; } _z_n_msg_clear(&n_msg); @@ -340,7 +340,7 @@ _z_queryable_t _z_declare_queryable(const _z_session_rc_t *zn, _z_keyexpr_t keye // Build the declare message to send on the wire _z_declaration_t declaration = _z_make_decl_queryable(&keyexpr, q._id, q._complete, _Z_QUERYABLE_DISTANCE_DEFAULT); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_decalre(_Z_RC_IN_VAL(zn), &n_msg) != _Z_RES_OK) { + if (_z_send_declare(_Z_RC_IN_VAL(zn), &n_msg) != _Z_RES_OK) { _z_unregister_session_queryable(_Z_RC_IN_VAL(zn), sp_q); _z_queryable_clear(&ret); return ret; @@ -371,7 +371,7 @@ z_result_t _z_undeclare_queryable(_z_queryable_t *qle) { declaration = _z_make_undecl_queryable(qle->_entity_id, &_Z_RC_IN_VAL(q)->_key); } _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - if (_z_send_undecalre(_Z_RC_IN_VAL(&qle->_zn), &n_msg) != _Z_RES_OK) { + if (_z_send_undeclare(_Z_RC_IN_VAL(&qle->_zn), &n_msg) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; } _z_n_msg_clear(&n_msg); diff --git a/src/session/liveliness.c b/src/session/liveliness.c index 31a9d4b6b..8aae529d9 100644 --- a/src/session/liveliness.c +++ b/src/session/liveliness.c @@ -114,6 +114,28 @@ z_result_t _z_liveliness_subscription_undeclare(_z_session_t *zn, uint32_t id, c return ret; } +z_result_t _z_liveliness_subscription_undeclare_all(_z_session_t *zn) { + z_result_t ret = _Z_RES_OK; + + _z_session_mutex_lock(zn); + _z_keyexpr_intmap_t token_list = _z_keyexpr_intmap_clone(&zn->_remote_tokens); + _z_keyexpr_intmap_clear(&zn->_remote_tokens); + _z_session_mutex_unlock(zn); + + _z_keyexpr_intmap_iterator_t iter = _z_keyexpr_intmap_iterator_make(&token_list); + _z_timestamp_t tm = _z_timestamp_null(); + while (_z_keyexpr_intmap_iterator_next(&iter)) { + _z_keyexpr_t key = *_z_keyexpr_intmap_iterator_value(&iter); + ret = _z_trigger_liveliness_subscriptions_undeclare(zn, &key, &tm); + if (ret != _Z_RES_OK) { + break; + } + } + _z_keyexpr_intmap_clear(&token_list); + + return ret; +} + z_result_t _z_liveliness_subscription_trigger_history(_z_session_t *zn, const _z_keyexpr_t *keyexpr) { z_result_t ret = _Z_RES_OK; diff --git a/src/transport/multicast/lease.c b/src/transport/multicast/lease.c index b29603169..6ae04b76a 100644 --- a/src/transport/multicast/lease.c +++ b/src/transport/multicast/lease.c @@ -58,6 +58,11 @@ z_result_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm) { static void _zp_multicast_failed(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); + +#if Z_FEATURE_LIVELINESS == 1 && Z_FEATURE_SUBSCRIPTION == 1 + _z_liveliness_subscription_undeclare_all(_Z_RC_IN_VAL(ztm->_common._session)); +#endif + #if Z_FEATURE_AUTO_RECONNECT == 1 _z_reopen(ztm->_common._session); #endif diff --git a/src/transport/unicast/lease.c b/src/transport/unicast/lease.c index 233ab0053..f684289d3 100644 --- a/src/transport/unicast/lease.c +++ b/src/transport/unicast/lease.c @@ -14,6 +14,7 @@ #include "zenoh-pico/transport/unicast/lease.h" +#include "zenoh-pico/session/liveliness.h" #include "zenoh-pico/session/query.h" #include "zenoh-pico/system/common/platform.h" #include "zenoh-pico/transport/common/tx.h" @@ -45,6 +46,10 @@ static void _zp_unicast_failed(_z_transport_unicast_t *ztu) { _z_unicast_transport_close(ztu, _Z_CLOSE_EXPIRED); _z_unicast_transport_clear(ztu, true); +#if Z_FEATURE_LIVELINESS == 1 && Z_FEATURE_SUBSCRIPTION == 1 + _z_liveliness_subscription_undeclare_all(_Z_RC_IN_VAL(ztu->_common._session)); +#endif + #if Z_FEATURE_AUTO_RECONNECT == 1 _z_session_rc_ref_t *zs = ztu->_common._session; z_result_t ret = _z_reopen(zs); diff --git a/tests/connection_restore.py b/tests/connection_restore.py index c2685778e..e32f4ae8c 100644 --- a/tests/connection_restore.py +++ b/tests/connection_restore.py @@ -8,6 +8,8 @@ WAIT_MESSAGE_TIMEOUT_S = 15 DISCONNECT_MESSAGES = ["Closing session because it has expired", "Send keep alive failed"] CONNECT_MESSAGES = ["Z_OPEN(Ack)"] +LIVELINESS_TOKEN_ALIVE_MESSAGES = ["[LivelinessSubscriber] New alive token"] +LIVELINESS_TOKEN_DROP_MESSAGES = ["[LivelinessSubscriber] Dropped token"] ROUTER_ERROR_MESSAGE = "ERROR" ZENOH_PORT = "7447" @@ -18,11 +20,15 @@ ACTIVE_CLIENT_COMMAND = STDBUF_CMD + [f'{DIR_EXAMPLES}/z_pub', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}'] PASSIVE_CLIENT_COMMAND = STDBUF_CMD + [f'{DIR_EXAMPLES}/z_sub', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}'] +LIVELINESS_CLIENT_COMMAND = STDBUF_CMD + [f'{DIR_EXAMPLES}/z_liveliness', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}'] +LIVELINESS_SUB_CLIENT_COMMAND = STDBUF_CMD + [f'{DIR_EXAMPLES}/z_sub_liveliness', '-h', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}'] + LIBASAN_PATH = subprocess.run(["gcc", "-print-file-name=libasan.so"], stdout=subprocess.PIPE, text=True, check=True).stdout.strip() def run_process(command, output_collector, process_list): env = os.environ.copy() + env["RUST_LOG"] = "trace" if LIBASAN_PATH: env["LD_PRELOAD"] = LIBASAN_PATH @@ -30,7 +36,7 @@ def run_process(command, output_collector, process_list): process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env) process_list.append(process) for line in iter(process.stdout.readline, ''): - print("--", line.strip()) + print(f"-- [{process.pid}]:", line.strip()) output_collector.append(line.strip()) process.stdout.close() process.wait() @@ -175,6 +181,57 @@ def test_router_restart(router_command, client_command, timeout): terminate_processes(client_process_list + router_process_list) +def test_liveliness_drop(router_command, liveliness_command, liveliness_sub_command): + print(f"Liveliness drop test") + router_output = [] + dummy_output = [] + client_output = [] + process_list = [] + blocked = False + try: + run_background(router_command, router_output, process_list) + time.sleep(ROUTER_INIT_TIMEOUT_S) + + run_background(liveliness_sub_command, client_output, process_list) + run_background(liveliness_command, dummy_output, process_list) + + if wait_messages(client_output, LIVELINESS_TOKEN_ALIVE_MESSAGES): + print("Liveliness token alive") + else: + raise Exception("Failed to get liveliness token alive.") + client_output.clear() + + print("Blocking connection...") + block_connection() + blocked = True + + time.sleep(15) + + if wait_messages(client_output, LIVELINESS_TOKEN_DROP_MESSAGES): + print("Liveliness token dropped") + else: + raise Exception("Failed to get liveliness token drop.") + client_output.clear() + + print("Unblocking connection...") + unblock_connection() + blocked = False + + if wait_messages(client_output, LIVELINESS_TOKEN_ALIVE_MESSAGES): + print("Liveliness token alive") + else: + raise Exception("Failed to get liveliness token alive.") + client_output.clear() + + check_router_errors(router_output) + + print(f"Liveliness drop test passed") + finally: + if blocked: + unblock_connection() + terminate_processes(process_list) + + def main(): if len(sys.argv) != 2: print("Usage: sudo python3 ./connection_restore.py /path/to/zenohd") @@ -198,6 +255,8 @@ def main(): test_router_restart(router_command, ACTIVE_CLIENT_COMMAND, 15) test_router_restart(router_command, PASSIVE_CLIENT_COMMAND, 15) + test_liveliness_drop(router_command, LIVELINESS_CLIENT_COMMAND, LIVELINESS_SUB_CLIENT_COMMAND) + if __name__ == "__main__": main()