From 0555bafeccdffd7a1745d50ad46f8a999b74634a Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Tue, 14 May 2024 11:48:40 +0200 Subject: [PATCH] New interest format (#405) * feat: implement interest protocol changes * test: add interests to test msgcodec * fix: bad struct name * fix: bad interest generation and decode * feat: add declare interest id * feat: rework filter/interest with wire change * fix: is final mask naming * fix: remove stray char * fix: wait for joins before starting publisher * fix: encode declare_final header * test: update raweth test * fix: explicit decl_final_t members * build: reactivate interests by default * test: filter packets on raweth test * doc: bad comment values --- CMakeLists.txt | 2 +- GNUmakefile | 2 +- examples/unix/c11/z_pub.c | 11 +- include/zenoh-pico/net/filtering.h | 2 +- include/zenoh-pico/net/primitives.h | 6 +- .../zenoh-pico/protocol/codec/declarations.h | 11 +- include/zenoh-pico/protocol/codec/interest.h | 24 ++++ include/zenoh-pico/protocol/codec/network.h | 2 + .../protocol/definitions/declarations.h | 38 +----- .../protocol/definitions/interest.h | 46 +++++++ .../zenoh-pico/protocol/definitions/network.h | 58 +++++++- include/zenoh-pico/session/interest.h | 6 +- src/net/filtering.c | 6 +- src/net/primitives.c | 33 ++--- src/protocol/codec/declarations.c | 127 ++---------------- src/protocol/codec/interest.c | 95 +++++++++++++ src/protocol/codec/network.c | 69 +++++++++- src/protocol/definitions/declarations.c | 41 +----- src/protocol/definitions/interest.c | 37 +++++ src/protocol/definitions/network.c | 22 ++- src/session/interest.c | 49 +++---- src/session/rx.c | 31 +++-- tests/raweth.py | 9 +- tests/z_msgcodec_test.c | 127 +++++++++++++++--- 24 files changed, 561 insertions(+), 293 deletions(-) create mode 100644 include/zenoh-pico/protocol/codec/interest.h create mode 100644 include/zenoh-pico/protocol/definitions/interest.h create mode 100644 src/protocol/codec/interest.c create mode 100644 src/protocol/definitions/interest.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 92ca7e392..6702932fa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -133,7 +133,7 @@ set(Z_FEATURE_QUERY 1 CACHE STRING "Toggle query feature") set(Z_FEATURE_QUERYABLE 1 CACHE STRING "Toggle queryable feature") set(Z_FEATURE_RAWETH_TRANSPORT 0 CACHE STRING "Toggle raw ethernet transport feature") set(Z_FEATURE_ATTACHMENT 1 CACHE STRING "Toggle attachment feature") -set(Z_FEATURE_INTEREST 0 CACHE STRING "Toggle interest feature") # Toggle to 1 when protocol changes are merged +set(Z_FEATURE_INTEREST 1 CACHE STRING "Toggle interest feature") add_definition(Z_FEATURE_MULTI_THREAD=${Z_FEATURE_MULTI_THREAD}) add_definition(Z_FEATURE_PUBLICATION=${Z_FEATURE_PUBLICATION}) add_definition(Z_FEATURE_SUBSCRIPTION=${Z_FEATURE_SUBSCRIPTION}) diff --git a/GNUmakefile b/GNUmakefile index 3160a168b..1b34a6bc9 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -57,7 +57,7 @@ Z_FEATURE_SUBSCRIPTION?=1 Z_FEATURE_QUERY?=1 Z_FEATURE_QUERYABLE?=1 Z_FEATURE_ATTACHMENT?=1 -Z_FEATURE_INTEREST?=0 +Z_FEATURE_INTEREST?=1 Z_FEATURE_RAWETH_TRANSPORT?=0 # zenoh-pico/ directory diff --git a/examples/unix/c11/z_pub.c b/examples/unix/c11/z_pub.c index 92a8b0c5b..cb5dd2ab6 100644 --- a/examples/unix/c11/z_pub.c +++ b/examples/unix/c11/z_pub.c @@ -88,18 +88,17 @@ int main(int argc, char **argv) { z_close(z_session_move(&s)); return -1; } - + // Wait for joins in peer mode + if (strcmp(mode, "peer") == 0) { + printf("Waiting for joins...\n"); + sleep(3); + } printf("Declaring publisher for '%s'...\n", keyexpr); z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL); if (!z_check(pub)) { printf("Unable to declare publisher for key expression!\n"); return -1; } - // Wait for joins in peer mode - if (strcmp(mode, "peer") == 0) { - printf("Waiting for joins...\n"); - sleep(3); - } printf("Press CTRL-C to quit...\n"); char buf[256]; for (int idx = 0; idx < n; ++idx) { diff --git a/include/zenoh-pico/net/filtering.h b/include/zenoh-pico/net/filtering.h index 36c17adc5..57e12c01a 100644 --- a/include/zenoh-pico/net/filtering.h +++ b/include/zenoh-pico/net/filtering.h @@ -34,7 +34,7 @@ typedef struct { /** * Return type when declaring a queryable. */ -typedef struct _z_interest_t { +typedef struct _z_write_filter_t { uint32_t _interest_id; _z_writer_filter_ctx_t *ctx; } _z_write_filter_t; diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index b0f74a1f4..2aa839398 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -234,9 +234,9 @@ int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, #endif #if Z_FEATURE_INTEREST == 1 -uint32_t _z_declare_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags, - void *arg); -int8_t _z_undeclare_interest(_z_session_t *zn, uint32_t interest_id); +uint32_t _z_add_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags, + void *arg); +int8_t _z_remove_interest(_z_session_t *zn, uint32_t interest_id); #endif #endif /* INCLUDE_ZENOH_PICO_NET_PRIMITIVES_H */ diff --git a/include/zenoh-pico/protocol/codec/declarations.h b/include/zenoh-pico/protocol/codec/declarations.h index 1e4ffa631..d41f875b7 100644 --- a/include/zenoh-pico/protocol/codec/declarations.h +++ b/include/zenoh-pico/protocol/codec/declarations.h @@ -28,9 +28,8 @@ #define _Z_UNDECL_QUERYABLE_MID 5 #define _Z_DECL_TOKEN_MID 6 #define _Z_UNDECL_TOKEN_MID 7 -#define _Z_DECL_INTEREST_MID 8 -#define _Z_FINAL_INTEREST_MID 9 -#define _Z_UNDECL_INTEREST_MID 10 +#define _Z_DECL_FINAL_MID 0x1a + int8_t _z_decl_kexpr_encode(_z_wbuf_t *wbf, const _z_decl_kexpr_t *decl); int8_t _z_decl_kexpr_decode(_z_decl_kexpr_t *decl, _z_zbuf_t *zbf, uint8_t header); int8_t _z_undecl_kexpr_encode(_z_wbuf_t *wbf, const _z_undecl_kexpr_t *decl); @@ -47,12 +46,6 @@ int8_t _z_decl_token_encode(_z_wbuf_t *wbf, const _z_decl_token_t *decl); int8_t _z_decl_token_decode(_z_decl_token_t *decl, _z_zbuf_t *zbf, uint8_t header); int8_t _z_undecl_token_encode(_z_wbuf_t *wbf, const _z_undecl_token_t *decl); int8_t _z_undecl_token_decode(_z_undecl_token_t *decl, _z_zbuf_t *zbf, uint8_t header); -int8_t _z_decl_interest_encode(_z_wbuf_t *wbf, const _z_decl_interest_t *decl); -int8_t _z_decl_interest_decode(_z_decl_interest_t *decl, _z_zbuf_t *zbf, uint8_t header); -int8_t _z_final_interest_encode(_z_wbuf_t *wbf, const _z_final_interest_t *decl); -int8_t _z_final_interest_decode(_z_final_interest_t *decl, _z_zbuf_t *zbf, uint8_t header); -int8_t _z_undecl_interest_encode(_z_wbuf_t *wbf, const _z_undecl_interest_t *decl); -int8_t _z_undecl_interest_decode(_z_undecl_interest_t *decl, _z_zbuf_t *zbf, uint8_t header); int8_t _z_declaration_encode(_z_wbuf_t *wbf, const _z_declaration_t *decl); int8_t _z_declaration_decode(_z_declaration_t *decl, _z_zbuf_t *zbf); diff --git a/include/zenoh-pico/protocol/codec/interest.h b/include/zenoh-pico/protocol/codec/interest.h new file mode 100644 index 000000000..239144870 --- /dev/null +++ b/include/zenoh-pico/protocol/codec/interest.h @@ -0,0 +1,24 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef INCLUDE_ZENOH_PICO_PROTOCOL_CODEC_INTEREST_H +#define INCLUDE_ZENOH_PICO_PROTOCOL_CODEC_INTEREST_H + +#include "zenoh-pico/protocol/definitions/interest.h" +#include "zenoh-pico/protocol/iobuf.h" + +int8_t _z_interest_encode(_z_wbuf_t *wbf, const _z_interest_t *interest, _Bool is_final); +int8_t _z_interest_decode(_z_interest_t *decl, _z_zbuf_t *zbf, _Bool is_final, _Bool has_ext); + +#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_CODEC_DECLARATIONS_H */ diff --git a/include/zenoh-pico/protocol/codec/network.h b/include/zenoh-pico/protocol/codec/network.h index ef42cc991..100bb2bd9 100644 --- a/include/zenoh-pico/protocol/codec/network.h +++ b/include/zenoh-pico/protocol/codec/network.h @@ -29,6 +29,8 @@ int8_t _z_response_final_encode(_z_wbuf_t *wbf, const _z_n_msg_response_final_t int8_t _z_response_final_decode(_z_n_msg_response_final_t *msg, _z_zbuf_t *zbf, uint8_t header); int8_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl); int8_t _z_declare_decode(_z_n_msg_declare_t *decl, _z_zbuf_t *zbf, uint8_t header); +int8_t _z_n_interest_encode(_z_wbuf_t *wbf, const _z_n_msg_interest_t *interest); +int8_t _z_n_interest_decode(_z_n_msg_interest_t *interest, _z_zbuf_t *zbf, uint8_t header); int8_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg); int8_t _z_network_message_decode(_z_network_message_t *msg, _z_zbuf_t *zbf); diff --git a/include/zenoh-pico/protocol/definitions/declarations.h b/include/zenoh-pico/protocol/definitions/declarations.h index 66e21add2..cba210a77 100644 --- a/include/zenoh-pico/protocol/definitions/declarations.h +++ b/include/zenoh-pico/protocol/definitions/declarations.h @@ -70,30 +70,10 @@ typedef struct { } _z_undecl_token_t; _z_undecl_token_t _z_undecl_token_null(void); -#define _Z_INTEREST_FLAG_KEYEXPRS (1) -#define _Z_INTEREST_FLAG_SUBSCRIBERS (1 << 1) -#define _Z_INTEREST_FLAG_QUERYABLES (1 << 2) -#define _Z_INTEREST_FLAG_TOKENS (1 << 3) -#define _Z_INTEREST_FLAG_RESTRICTED (1 << 4) -#define _Z_INTEREST_FLAG_CURRENT (1 << 5) -#define _Z_INTEREST_FLAG_FUTURE (1 << 6) -#define _Z_INTEREST_FLAG_AGGREGATE (1 << 7) - -typedef struct { - _z_keyexpr_t _keyexpr; - uint32_t _id; - uint8_t interest_flags; -} _z_decl_interest_t; -_z_decl_interest_t _z_decl_interest_null(void); typedef struct { - uint32_t _id; -} _z_final_interest_t; -_z_final_interest_t _z_final_interest_null(void); -typedef struct { - uint32_t _id; - _z_keyexpr_t _ext_keyexpr; -} _z_undecl_interest_t; -_z_undecl_interest_t _z_undecl_interest_null(void); + _Bool _placeholder; // In case we add extensions +} _z_decl_final_t; +_z_decl_final_t _z_decl_final_null(void); typedef struct { enum { @@ -105,9 +85,7 @@ typedef struct { _Z_UNDECL_QUERYABLE, _Z_DECL_TOKEN, _Z_UNDECL_TOKEN, - _Z_DECL_INTEREST, - _Z_FINAL_INTEREST, - _Z_UNDECL_INTEREST, + _Z_DECL_FINAL, } _tag; union { _z_decl_kexpr_t _decl_kexpr; @@ -118,9 +96,7 @@ typedef struct { _z_undecl_queryable_t _undecl_queryable; _z_decl_token_t _decl_token; _z_undecl_token_t _undecl_token; - _z_decl_interest_t _decl_interest; - _z_final_interest_t _final_interest; - _z_undecl_interest_t _undecl_interest; + _z_decl_final_t _decl_final; } _body; } _z_declaration_t; void _z_declaration_clear(_z_declaration_t* decl); @@ -138,8 +114,6 @@ _z_declaration_t _z_make_undecl_queryable(uint32_t id, _Z_OPTIONAL const _z_keye _z_declaration_t _z_make_decl_token(_Z_MOVE(_z_keyexpr_t) key, uint32_t id); _z_declaration_t _z_make_undecl_token(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key); -_z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint8_t interest_flags); -_z_declaration_t _z_make_undecl_interest(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key); -_z_declaration_t _z_make_final_interest(uint32_t id); +_z_declaration_t _z_make_decl_final(void); #endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_DECLARATIONS_H */ diff --git a/include/zenoh-pico/protocol/definitions/interest.h b/include/zenoh-pico/protocol/definitions/interest.h new file mode 100644 index 000000000..ca2900874 --- /dev/null +++ b/include/zenoh-pico/protocol/definitions/interest.h @@ -0,0 +1,46 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_INTEREST_H +#define INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_INTEREST_H + +#include + +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/keyexpr.h" + +#define _Z_INTEREST_FLAG_KEYEXPRS (1) +#define _Z_INTEREST_FLAG_SUBSCRIBERS (1 << 1) +#define _Z_INTEREST_FLAG_QUERYABLES (1 << 2) +#define _Z_INTEREST_FLAG_TOKENS (1 << 3) +#define _Z_INTEREST_FLAG_RESTRICTED (1 << 4) +#define _Z_INTEREST_FLAG_CURRENT (1 << 5) +#define _Z_INTEREST_FLAG_FUTURE (1 << 6) +#define _Z_INTEREST_FLAG_AGGREGATE (1 << 7) + +#define _Z_INTEREST_NOT_FINAL_MASK (_Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE) + +typedef struct { + _z_keyexpr_t _keyexpr; + uint32_t _id; + uint8_t flags; +} _z_interest_t; +_z_interest_t _z_interest_null(void); + +void _z_interest_clear(_z_interest_t* decl); + +_z_interest_t _z_make_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint8_t flags); +_z_interest_t _z_make_interest_final(uint32_t id); + +#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_INTEREST_H */ diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index 1a26cddbb..b7ab81822 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -21,6 +21,7 @@ #include "zenoh-pico/collections/bytes.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/definitions/declarations.h" +#include "zenoh-pico/protocol/definitions/interest.h" #include "zenoh-pico/protocol/definitions/message.h" #include "zenoh-pico/protocol/ext.h" #include "zenoh-pico/protocol/keyexpr.h" @@ -31,6 +32,7 @@ #define _Z_MID_N_REQUEST 0x1c #define _Z_MID_N_RESPONSE 0x1b #define _Z_MID_N_RESPONSE_FINAL 0x1a +#define _Z_MID_N_INTEREST 0x19 /*=============================*/ /* Network flags */ @@ -43,6 +45,13 @@ // - Z: Extension If Z==1 then Zenoh extensions are present #define _Z_FLAG_N_DECLARE_I 0x20 // 1 << 5 +// INTEREST message flags: +// - C: Current If C==1 then interest concerns current declarations +// - F: Future If F==1 then interest concerns future declarations +// - Z: Extension If Z==1 then Zenoh extensions are present +#define _Z_FLAG_N_INTEREST_CURRENT 0x20 // 1 << 5 +#define _Z_FLAG_N_INTEREST_FUTURE 0x40 // 1 << 6 + // PUSH message flags: // N Named if N==1 then the key expr has name/suffix // M Mapping if M==1 then keyexpr mapping is the one declared by the sender, otherwise by the receiver @@ -210,13 +219,56 @@ typedef struct { } _z_n_msg_response_t; void _z_n_msg_response_clear(_z_n_msg_response_t *msg); +/*------------------ Declare Message ------------------*/ + typedef struct { _z_declaration_t _decl; _z_timestamp_t _ext_timestamp; _z_n_qos_t _ext_qos; + uint32_t _interest_id; + _Bool has_interest_id; } _z_n_msg_declare_t; static inline void _z_n_msg_declare_clear(_z_n_msg_declare_t *msg) { _z_declaration_clear(&msg->_decl); } +/*------------------ Interest Message ------------------*/ + +/// Flags: +/// - C: Current If C==1 then interest concerns current declarations +/// - F: Future If F==1 then interest concerns future declarations +/// - Z: Extension If Z==1 then Zenoh extensions are present +/// If C==0 and F==0, then interest is final +/// +/// 7 6 5 4 3 2 1 0 +/// +-+-+-+-+-+-+-+-+ +/// |Z|F|C|INTEREST | +/// +-+-+-+---------+ +/// ~ id:z32 ~ +/// +---------------+ +/// |A|M|N|R|T|Q|S|K| (*) if interest is not final +/// +---------------+ +/// ~ key_scope:z16 ~ if interest is not final && R==1 +/// +---------------+ +/// ~ key_suffix ~ if interest is not final && R==1 && N==1 -- +/// +---------------+ +/// ~ [int_exts] ~ if Z==1 +/// +---------------+ +/// +/// (*) - if K==1 then the interest refers to key expressions +/// - if S==1 then the interest refers to subscribers +/// - if Q==1 then the interest refers to queryables +/// - if T==1 then the interest refers to tokens +/// - if R==1 then the interest is restricted to the matching key expression, else it is for all key expressions. +/// - if N==1 then the key expr has name/suffix. If R==0 then N should be set to 0. +/// - if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver. +/// If R==0 then M should be set to 0. +/// - if A==1 then the replies SHOULD be aggregated +/// ``` + +typedef struct { + _z_interest_t _interest; +} _z_n_msg_interest_t; +static inline void _z_n_msg_interest_clear(_z_n_msg_interest_t *msg) { _z_interest_clear(&msg->_interest); } + /*------------------ Zenoh Message ------------------*/ typedef union { _z_n_msg_declare_t _declare; @@ -224,9 +276,10 @@ typedef union { _z_n_msg_request_t _request; _z_n_msg_response_t _response; _z_n_msg_response_final_t _response_final; + _z_n_msg_interest_t _interest; } _z_network_body_t; typedef struct { - enum { _Z_N_DECLARE, _Z_N_PUSH, _Z_N_REQUEST, _Z_N_RESPONSE, _Z_N_RESPONSE_FINAL } _tag; + enum { _Z_N_DECLARE, _Z_N_PUSH, _Z_N_REQUEST, _Z_N_RESPONSE, _Z_N_RESPONSE_FINAL, _Z_N_INTEREST } _tag; _z_network_body_t _body; } _z_network_message_t; typedef _z_network_message_t _z_zenoh_message_t; @@ -248,7 +301,8 @@ _z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_byt ); _z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body); _z_network_message_t _z_n_msg_make_response_final(_z_zint_t rid); -_z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration); +_z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration, _Bool has_interest_id, uint32_t interest_id); _z_network_message_t _z_n_msg_make_push(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body); +_z_network_message_t _z_n_msg_make_interest(_z_interest_t interest); #endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_NETWORK_H */ diff --git a/include/zenoh-pico/session/interest.h b/include/zenoh-pico/session/interest.h index a4eeb43ba..7114fea1b 100644 --- a/include/zenoh-pico/session/interest.h +++ b/include/zenoh-pico/session/interest.h @@ -28,8 +28,8 @@ void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr); void _z_flush_interest(_z_session_t *zn); int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl); int8_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration_t *decl); -int8_t _z_interest_process_final_interest(_z_session_t *zn, uint32_t id); -int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id); -int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags); +int8_t _z_interest_process_declare_final(_z_session_t *zn, uint32_t id); +int8_t _z_interest_process_interest_final(_z_session_t *zn, uint32_t id); +int8_t _z_interest_process_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags); #endif /* ZENOH_PICO_SESSION_INTEREST_H */ diff --git a/src/net/filtering.c b/src/net/filtering.c index bf26bbdf4..c6e596550 100644 --- a/src/net/filtering.c +++ b/src/net/filtering.c @@ -84,8 +84,8 @@ int8_t _z_write_filter_create(_z_publisher_t *pub) { ctx->decl_id = 0; pub->_filter.ctx = ctx; - pub->_filter._interest_id = _z_declare_interest(&pub->_zn.in->val, _z_keyexpr_alias(pub->_key), - _z_write_filter_callback, flags, (void *)ctx); + pub->_filter._interest_id = + _z_add_interest(&pub->_zn.in->val, _z_keyexpr_alias(pub->_key), _z_write_filter_callback, flags, (void *)ctx); if (pub->_filter._interest_id == 0) { z_free(ctx); return _Z_ERR_GENERIC; @@ -94,7 +94,7 @@ int8_t _z_write_filter_create(_z_publisher_t *pub) { } int8_t _z_write_filter_destroy(const _z_publisher_t *pub) { - _Z_RETURN_IF_ERR(_z_undeclare_interest(&pub->_zn.in->val, pub->_filter._interest_id)); + _Z_RETURN_IF_ERR(_z_remove_interest(&pub->_zn.in->val, pub->_filter._interest_id)); z_free(pub->_filter.ctx); return _Z_RES_OK; } diff --git a/src/net/primitives.c b/src/net/primitives.c index dd66050e3..a07cfe0b8 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -63,7 +63,7 @@ uint16_t _z_declare_resource(_z_session_t *zn, _z_keyexpr_t keyexpr) { // Build the declare message to send on the wire _z_keyexpr_t alias = _z_keyexpr_alias(keyexpr); _z_declaration_t declaration = _z_make_decl_keyexpr(id, &alias); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) { ret = id; } else { @@ -83,7 +83,7 @@ int8_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid) { if (r != NULL) { // Build the declare message to send on the wire _z_declaration_t declaration = _z_make_undecl_keyexpr(rid); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) { _z_unregister_resource(zn, rid, _Z_KEYEXPR_MAPPING_LOCAL); // Only if message is send, local resource is removed @@ -214,7 +214,7 @@ _z_subscriber_t *_z_declare_subscriber(_z_session_rc_t *zn, _z_keyexpr_t keyexpr // Build the declare message to send on the wire _z_declaration_t declaration = _z_make_decl_subscriber(&keyexpr, s._id, sub_info.reliability == Z_RELIABILITY_RELIABLE); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); if (_z_send_n_msg(&zn->in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { _z_unregister_subscription(&zn->in->val, _Z_RESOURCE_IS_LOCAL, sp_s); _z_subscriber_free(&ret); @@ -243,7 +243,7 @@ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) { } else { declaration = _z_make_undecl_subscriber(sub->_entity_id, &s->in->val._key); } - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); if (_z_send_n_msg(&sub->_zn.in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; } @@ -282,7 +282,7 @@ _z_queryable_t *_z_declare_queryable(_z_session_rc_t *zn, _z_keyexpr_t keyexpr, } // Build the declare message to send on the wire _z_declaration_t declaration = _z_make_decl_queryable(&keyexpr, q._id, q._complete, _Z_QUERYABLE_DISTANCE_DEFAULT); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); if (_z_send_n_msg(&zn->in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { _z_unregister_session_queryable(&zn->in->val, sp_q); _z_queryable_free(&ret); @@ -311,7 +311,7 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) { } else { declaration = _z_make_undecl_queryable(qle->_entity_id, &q->in->val._key); } - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); if (_z_send_n_msg(&qle->_zn.in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; } @@ -432,8 +432,8 @@ int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, #if Z_FEATURE_INTEREST == 1 /*------------------ Interest Declaration ------------------*/ -uint32_t _z_declare_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags, - void *arg) { +uint32_t _z_add_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags, + void *arg) { _z_session_interest_t intr; intr._id = _z_get_entity_id(zn); intr._key = _z_get_expanded_key_from_key(zn, &keyexpr); @@ -446,9 +446,9 @@ uint32_t _z_declare_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest if (sintr == NULL) { return 0; } - // Build the declare message to send on the wire - _z_declaration_t declaration = _z_make_decl_interest(&keyexpr, intr._id, intr._flags); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + // Build the interest message to send on the wire + _z_interest_t interest = _z_make_interest(&keyexpr, intr._id, intr._flags); + _z_network_message_t n_msg = _z_n_msg_make_interest(interest); if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { _z_unregister_interest(zn, sintr); return 0; @@ -457,20 +457,15 @@ uint32_t _z_declare_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest return intr._id; } -int8_t _z_undeclare_interest(_z_session_t *zn, uint32_t interest_id) { +int8_t _z_remove_interest(_z_session_t *zn, uint32_t interest_id) { // Find interest entry _z_session_interest_rc_t *sintr = _z_get_interest_by_id(zn, interest_id); if (sintr == NULL) { return _Z_ERR_ENTITY_UNKNOWN; } // Build the declare message to send on the wire - _z_declaration_t declaration; - if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { - declaration = _z_make_undecl_interest(sintr->in->val._id, NULL); - } else { - declaration = _z_make_undecl_interest(sintr->in->val._id, &sintr->in->val._key); - } - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + _z_interest_t interest = _z_make_interest_final(sintr->in->val._id); + _z_network_message_t n_msg = _z_n_msg_make_interest(interest); if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; } diff --git a/src/protocol/codec/declarations.c b/src/protocol/codec/declarations.c index aa9426dce..818e8373e 100644 --- a/src/protocol/codec/declarations.c +++ b/src/protocol/codec/declarations.c @@ -131,53 +131,12 @@ int8_t _z_decl_token_encode(_z_wbuf_t *wbf, const _z_decl_token_t *decl) { int8_t _z_undecl_token_encode(_z_wbuf_t *wbf, const _z_undecl_token_t *decl) { return _z_undecl_encode(wbf, _Z_UNDECL_TOKEN_MID, decl->_id, decl->_ext_keyexpr); } - -int8_t _z_decl_interest_encode(_z_wbuf_t *wbf, const _z_decl_interest_t *decl) { - // Set header - uint8_t header = _Z_DECL_INTEREST_MID; - if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_CURRENT)) { - _Z_SET_FLAG(header, _Z_INTEREST_FLAG_CURRENT); - } - if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_FUTURE)) { - _Z_SET_FLAG(header, _Z_INTEREST_FLAG_FUTURE); - } +int8_t _z_decl_final_encode(_z_wbuf_t *wbf) { + uint8_t header = _Z_DECL_FINAL_MID; _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); - // Set id - _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, decl->_id)); - // Copy flags but clear double use ones. - uint8_t interest_flags = decl->interest_flags; - _Z_CLEAR_FLAG(interest_flags, _Z_INTEREST_FLAG_CURRENT); - _Z_CLEAR_FLAG(interest_flags, _Z_INTEREST_FLAG_FUTURE); - // Process restricted flag - if (_Z_HAS_FLAG(interest_flags, _Z_INTEREST_FLAG_RESTRICTED)) { - // Set Named & Mapping flags - _Bool has_kesuffix = _z_keyexpr_has_suffix(decl->_keyexpr); - if (has_kesuffix) { - _Z_SET_FLAG(interest_flags, _Z_DECL_SUBSCRIBER_FLAG_N); - } - if (_z_keyexpr_is_local(&decl->_keyexpr)) { - _Z_SET_FLAG(interest_flags, _Z_DECL_SUBSCRIBER_FLAG_M); - } - // Set decl flags & keyexpr - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, interest_flags)); - _Z_RETURN_IF_ERR(_z_keyexpr_encode(wbf, has_kesuffix, &decl->_keyexpr)); - } else { - // Set decl flags - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, interest_flags)); - } return _Z_RES_OK; } -int8_t _z_final_interest_encode(_z_wbuf_t *wbf, const _z_final_interest_t *decl) { - uint8_t header = _Z_FINAL_INTEREST_MID; - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); - _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, decl->_id)); - return _Z_RES_OK; -} - -int8_t _z_undecl_interest_encode(_z_wbuf_t *wbf, const _z_undecl_interest_t *decl) { - return _z_undecl_encode(wbf, _Z_UNDECL_INTEREST_MID, decl->_id, decl->_ext_keyexpr); -} int8_t _z_declaration_encode(_z_wbuf_t *wbf, const _z_declaration_t *decl) { int8_t ret = _Z_RES_OK; switch (decl->_tag) { @@ -205,15 +164,10 @@ int8_t _z_declaration_encode(_z_wbuf_t *wbf, const _z_declaration_t *decl) { case _Z_UNDECL_TOKEN: { ret = _z_undecl_token_encode(wbf, &decl->_body._undecl_token); } break; - case _Z_DECL_INTEREST: { - ret = _z_decl_interest_encode(wbf, &decl->_body._decl_interest); - } break; - case _Z_FINAL_INTEREST: { - ret = _z_final_interest_encode(wbf, &decl->_body._final_interest); - } break; - case _Z_UNDECL_INTEREST: { - ret = _z_undecl_interest_encode(wbf, &decl->_body._undecl_interest); + case _Z_DECL_FINAL: { + ret = _z_decl_final_encode(wbf); } break; + ; } return ret; } @@ -367,65 +321,14 @@ int8_t _z_decl_token_decode(_z_decl_token_t *decl, _z_zbuf_t *zbf, uint8_t heade int8_t _z_undecl_token_decode(_z_undecl_token_t *decl, _z_zbuf_t *zbf, uint8_t header) { return _z_undecl_trivial_decode(zbf, &decl->_ext_keyexpr, &decl->_id, header); } -int8_t _z_decl_interest_decode(_z_decl_interest_t *decl, _z_zbuf_t *zbf, uint8_t header) { - *decl = _z_decl_interest_null(); - // Decode id - _Z_RETURN_IF_ERR(_z_zint32_decode(&decl->_id, zbf)); - // Decode interest flags - _Z_RETURN_IF_ERR(_z_uint8_decode(&decl->interest_flags, zbf)); - // Process restricted flag - if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_RESTRICTED)) { - uint16_t mapping = _Z_HAS_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_M) - ? _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE - : _Z_KEYEXPR_MAPPING_LOCAL; - // Decode ke id - _Z_RETURN_IF_ERR(_z_zint16_decode(&decl->_keyexpr._id, zbf)); - // Decode ke suffix - if (_Z_HAS_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_N)) { - _z_zint_t len; - _Z_RETURN_IF_ERR(_z_zsize_decode(&len, zbf)); - if (_z_zbuf_len(zbf) < len) { - return _Z_ERR_MESSAGE_DESERIALIZATION_FAILED; - } - decl->_keyexpr._suffix = z_malloc(len + 1); - if (decl->_keyexpr._suffix == NULL) { - return _Z_ERR_SYSTEM_OUT_OF_MEMORY; - } - decl->_keyexpr._mapping = _z_keyexpr_mapping(mapping, true); - _z_zbuf_read_bytes(zbf, (uint8_t *)decl->_keyexpr._suffix, 0, len); - decl->_keyexpr._suffix[len] = 0; - } else { - decl->_keyexpr._suffix = NULL; - decl->_keyexpr._mapping = _z_keyexpr_mapping(mapping, false); - } - } - // Replace named & mapping by current & future flags - _Z_CLEAR_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_M); - _Z_CLEAR_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_N); - if (_Z_HAS_FLAG(header, _Z_INTEREST_FLAG_CURRENT)) { - _Z_SET_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_CURRENT); - } - if (_Z_HAS_FLAG(header, _Z_INTEREST_FLAG_FUTURE)) { - _Z_SET_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_FUTURE); - } - // Decode extention + +int8_t _z_decl_final_decode(_z_decl_final_t *decl, _z_zbuf_t *zbf, uint8_t header) { if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { _Z_RETURN_IF_ERR(_z_msg_ext_skip_non_mandatories(zbf, 0x13)); } return _Z_RES_OK; } -int8_t _z_final_interest_decode(_z_final_interest_t *decl, _z_zbuf_t *zbf, uint8_t header) { - *decl = _z_final_interest_null(); - _Z_RETURN_IF_ERR(_z_zint32_decode(&decl->_id, zbf)); - if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { - _Z_RETURN_IF_ERR(_z_msg_ext_skip_non_mandatories(zbf, 0x10)); - } - return _Z_RES_OK; -} -int8_t _z_undecl_interest_decode(_z_undecl_interest_t *decl, _z_zbuf_t *zbf, uint8_t header) { - *decl = _z_undecl_interest_null(); - return _z_undecl_trivial_decode(zbf, &decl->_ext_keyexpr, &decl->_id, header); -} + int8_t _z_declaration_decode(_z_declaration_t *decl, _z_zbuf_t *zbf) { uint8_t header; _Z_RETURN_IF_ERR(_z_uint8_decode(&header, zbf)); @@ -463,17 +366,9 @@ int8_t _z_declaration_decode(_z_declaration_t *decl, _z_zbuf_t *zbf) { decl->_tag = _Z_UNDECL_TOKEN; ret = _z_undecl_token_decode(&decl->_body._undecl_token, zbf, header); } break; - case _Z_DECL_INTEREST_MID: { - decl->_tag = _Z_DECL_INTEREST; - ret = _z_decl_interest_decode(&decl->_body._decl_interest, zbf, header); - } break; - case _Z_FINAL_INTEREST_MID: { - decl->_tag = _Z_FINAL_INTEREST; - ret = _z_final_interest_decode(&decl->_body._final_interest, zbf, header); - } break; - case _Z_UNDECL_INTEREST_MID: { - decl->_tag = _Z_UNDECL_INTEREST; - ret = _z_undecl_interest_decode(&decl->_body._undecl_interest, zbf, header); + case _Z_DECL_FINAL_MID: { + decl->_tag = _Z_DECL_FINAL; + ret = _z_decl_final_decode(&decl->_body._decl_final, zbf, header); } break; default: { ret = _Z_ERR_MESSAGE_DESERIALIZATION_FAILED; diff --git a/src/protocol/codec/interest.c b/src/protocol/codec/interest.c new file mode 100644 index 000000000..7c6f31bb4 --- /dev/null +++ b/src/protocol/codec/interest.c @@ -0,0 +1,95 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/protocol/codec/interest.h" + +#include +#include +#include +#include +#include + +#include "zenoh-pico/protocol/codec.h" +#include "zenoh-pico/protocol/codec/core.h" +#include "zenoh-pico/protocol/codec/ext.h" +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/core.h" +#include "zenoh-pico/protocol/definitions/interest.h" +#include "zenoh-pico/protocol/definitions/message.h" +#include "zenoh-pico/protocol/ext.h" +#include "zenoh-pico/protocol/iobuf.h" +#include "zenoh-pico/protocol/keyexpr.h" +#include "zenoh-pico/session/session.h" +#include "zenoh-pico/system/platform.h" + +#define _Z_INTEREST_CODEC_FLAG_N 0x20 +#define _Z_INTEREST_CODEC_FLAG_M 0x40 +#define _Z_INTEREST_FLAG_COPY_MASK 0x9f // N & M flags occupy the place of C & F + +#define _Z_INTEREST_TRACE_ID 0x13 + +int8_t _z_interest_encode(_z_wbuf_t *wbf, const _z_interest_t *interest, _Bool is_final) { + // Set id + _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, interest->_id)); + if (is_final) { + return _Z_RES_OK; + } + // Copy flags but clear current and future that are already processed. + uint8_t flags = interest->flags; + _Z_CLEAR_FLAG(flags, _Z_INTEREST_FLAG_CURRENT); + _Z_CLEAR_FLAG(flags, _Z_INTEREST_FLAG_FUTURE); + // Process restricted flag + if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_RESTRICTED)) { + // Set Named & Mapping flags + _Bool has_kesuffix = _z_keyexpr_has_suffix(interest->_keyexpr); + if (has_kesuffix) { + _Z_SET_FLAG(flags, _Z_INTEREST_CODEC_FLAG_N); + } + if (_z_keyexpr_is_local(&interest->_keyexpr)) { + _Z_SET_FLAG(flags, _Z_INTEREST_CODEC_FLAG_M); + } + // Set decl flags & keyexpr + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, flags)); + _Z_RETURN_IF_ERR(_z_keyexpr_encode(wbf, has_kesuffix, &interest->_keyexpr)); + } else { + // Set decl flags + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, flags)); + } + return _Z_RES_OK; +} + +int8_t _z_interest_decode(_z_interest_t *interest, _z_zbuf_t *zbf, _Bool is_final, _Bool has_ext) { + // Decode id + _Z_RETURN_IF_ERR(_z_zint32_decode(&interest->_id, zbf)); + if (!is_final) { + uint8_t flags = 0; + // Decode interest flags + _Z_RETURN_IF_ERR(_z_uint8_decode(&flags, zbf)); + // Process restricted flag + if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_RESTRICTED)) { + // Decode ke + _Z_RETURN_IF_ERR(_z_keyexpr_decode(&interest->_keyexpr, zbf, _Z_HAS_FLAG(flags, _Z_INTEREST_CODEC_FLAG_N))); + // Set mapping + if (_Z_HAS_FLAG(flags, _Z_INTEREST_CODEC_FLAG_M)) { + _z_keyexpr_set_mapping(&interest->_keyexpr, _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE); + } + } + // Store interest flags (current and future already processed) + interest->flags |= (flags & _Z_INTEREST_FLAG_COPY_MASK); + } + if (has_ext) { + _Z_RETURN_IF_ERR(_z_msg_ext_skip_non_mandatories(zbf, _Z_INTEREST_TRACE_ID)); + } + return _Z_RES_OK; +} diff --git a/src/protocol/codec/network.c b/src/protocol/codec/network.c index 5d88d9143..2243b2d32 100644 --- a/src/protocol/codec/network.c +++ b/src/protocol/codec/network.c @@ -25,9 +25,11 @@ #include "zenoh-pico/protocol/codec/core.h" #include "zenoh-pico/protocol/codec/declarations.h" #include "zenoh-pico/protocol/codec/ext.h" +#include "zenoh-pico/protocol/codec/interest.h" #include "zenoh-pico/protocol/codec/network.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/definitions/core.h" +#include "zenoh-pico/protocol/definitions/interest.h" #include "zenoh-pico/protocol/definitions/message.h" #include "zenoh-pico/protocol/ext.h" #include "zenoh-pico/protocol/iobuf.h" @@ -399,7 +401,16 @@ int8_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl) { if (n != 0) { header |= _Z_FLAG_N_Z; } + if (decl->has_interest_id) { + header |= _Z_FLAG_N_DECLARE_I; + } + // Encode header _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); + // Encode interest id + if (decl->has_interest_id) { + _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, decl->_interest_id)); + } + // Encode extensions if (has_qos_ext) { n -= 1; _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, 0x01 | _Z_MSG_EXT_ENC_ZINT | (n != 0 ? _Z_FLAG_Z_Z : 0))); @@ -410,6 +421,7 @@ int8_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl) { _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, 0x02 | _Z_MSG_EXT_ENC_ZBUF | (n != 0 ? _Z_FLAG_Z_Z : 0))); _Z_RETURN_IF_ERR(_z_timestamp_encode_ext(wbf, &decl->_ext_timestamp)); } + // Encode declaration return _z_declaration_encode(wbf, &decl->_decl); } int8_t _z_declare_decode_extensions(_z_msg_ext_t *extension, void *ctx) { @@ -433,16 +445,55 @@ int8_t _z_declare_decode_extensions(_z_msg_ext_t *extension, void *ctx) { int8_t _z_declare_decode(_z_n_msg_declare_t *decl, _z_zbuf_t *zbf, uint8_t header) { *decl = (_z_n_msg_declare_t){0}; decl->_ext_qos = _Z_N_QOS_DEFAULT; + // Retrieve interest id + if (_Z_HAS_FLAG(header, _Z_FLAG_N_DECLARE_I)) { + _Z_RETURN_IF_ERR(_z_zint32_decode(&decl->_interest_id, zbf)); + decl->has_interest_id = true; + } + // Decode extensions if (_Z_HAS_FLAG(header, _Z_FLAG_N_Z)) { _Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_declare_decode_extensions, decl)) } - // FIXME: For now, zenoh pico should not receive this (answer from interests with current=1, future=0) - if (_Z_HAS_FLAG(header, _Z_FLAG_N_DECLARE_I)) { - return _Z_ERR_MESSAGE_FLAG_UNEXPECTED; - } + // Decode declaration return _z_declaration_decode(&decl->_decl, zbf); } +int8_t _z_n_interest_encode(_z_wbuf_t *wbf, const _z_n_msg_interest_t *interest) { + // Set header + uint8_t header = _Z_MID_N_INTEREST; + _Bool is_final = true; + if (_Z_HAS_FLAG(interest->_interest.flags, _Z_INTEREST_FLAG_CURRENT)) { + is_final = false; + _Z_SET_FLAG(header, _Z_FLAG_N_INTEREST_CURRENT); + } + if (_Z_HAS_FLAG(interest->_interest.flags, _Z_INTEREST_FLAG_FUTURE)) { + is_final = false; + _Z_SET_FLAG(header, _Z_FLAG_N_INTEREST_FUTURE); + } + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); + return _z_interest_encode(wbf, &interest->_interest, is_final); +} + +int8_t _z_n_interest_decode(_z_n_msg_interest_t *interest, _z_zbuf_t *zbf, uint8_t header) { + interest->_interest = _z_interest_null(); + _Bool is_final = true; + _Bool has_ext = false; + + if (_Z_HAS_FLAG(header, _Z_FLAG_N_INTEREST_CURRENT)) { + _Z_SET_FLAG(interest->_interest.flags, _Z_INTEREST_FLAG_CURRENT); + is_final = false; + } + if (_Z_HAS_FLAG(header, _Z_FLAG_N_INTEREST_FUTURE)) { + _Z_SET_FLAG(interest->_interest.flags, _Z_INTEREST_FLAG_FUTURE); + is_final = false; + } + // Decode extention + if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { + has_ext = true; + } + return _z_interest_decode(&interest->_interest, zbf, is_final, has_ext); +} + int8_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg) { switch (msg->_tag) { case _Z_N_DECLARE: { @@ -460,8 +511,12 @@ int8_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg case _Z_N_RESPONSE_FINAL: { return _z_response_final_encode(wbf, &msg->_body._response_final); } break; + case _Z_N_INTEREST: { + return _z_n_interest_encode(wbf, &msg->_body._interest); + } break; + default: + return _Z_ERR_GENERIC; } - return _Z_ERR_GENERIC; } int8_t _z_network_message_decode(_z_network_message_t *msg, _z_zbuf_t *zbf) { uint8_t header; @@ -487,6 +542,10 @@ int8_t _z_network_message_decode(_z_network_message_t *msg, _z_zbuf_t *zbf) { msg->_tag = _Z_N_RESPONSE_FINAL; return _z_response_final_decode(&msg->_body._response_final, zbf, header); } break; + case _Z_MID_N_INTEREST: { + msg->_tag = _Z_N_INTEREST; + return _z_n_interest_decode(&msg->_body._interest, zbf, header); + } break; default: return _Z_ERR_MESSAGE_DESERIALIZATION_FAILED; } diff --git a/src/protocol/definitions/declarations.c b/src/protocol/definitions/declarations.c index 69dc52560..cd903aa03 100644 --- a/src/protocol/definitions/declarations.c +++ b/src/protocol/definitions/declarations.c @@ -49,15 +49,8 @@ void _z_declaration_clear(_z_declaration_t *decl) { _z_keyexpr_clear(&decl->_body._undecl_token._ext_keyexpr); break; } - case _Z_DECL_INTEREST: { - _z_keyexpr_clear(&decl->_body._decl_interest._keyexpr); - break; - } - case _Z_FINAL_INTEREST: { - break; - } - case _Z_UNDECL_INTEREST: { - _z_keyexpr_clear(&decl->_body._undecl_interest._ext_keyexpr); + default: + case _Z_DECL_FINAL: { break; } } @@ -109,34 +102,20 @@ _z_declaration_t _z_make_undecl_token(uint32_t id, _Z_OPTIONAL const _z_keyexpr_ ._body = {._undecl_token = {._id = id, ._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}}; } -_z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint8_t interest_flags) { - return (_z_declaration_t){._tag = _Z_DECL_INTEREST, - ._body = {._decl_interest = { - ._id = id, - ._keyexpr = _z_keyexpr_steal(key), - .interest_flags = interest_flags, - }}}; -} -_z_declaration_t _z_make_undecl_interest(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) { - return (_z_declaration_t){ - ._tag = _Z_UNDECL_INTEREST, - ._body = {._undecl_interest = { - ._id = id, ._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}}; -} -_z_declaration_t _z_make_final_interest(uint32_t id) { - return (_z_declaration_t){._tag = _Z_FINAL_INTEREST, ._body = {._final_interest = {._id = id}}}; +_z_declaration_t _z_make_decl_final(void) { + return (_z_declaration_t){._tag = _Z_DECL_FINAL, ._body = {._decl_final = {0}}}; } + _z_decl_kexpr_t _z_decl_kexpr_null(void) { return (_z_decl_kexpr_t){0}; } _z_decl_subscriber_t _z_decl_subscriber_null(void) { return (_z_decl_subscriber_t){0}; } _z_decl_queryable_t _z_decl_queryable_null(void) { return (_z_decl_queryable_t){0}; } _z_decl_token_t _z_decl_token_null(void) { return (_z_decl_token_t){0}; } -_z_decl_interest_t _z_decl_interest_null(void) { return (_z_decl_interest_t){0}; } _z_undecl_kexpr_t _z_undecl_kexpr_null(void) { return (_z_undecl_kexpr_t){0}; } _z_undecl_subscriber_t _z_undecl_subscriber_null(void) { return (_z_undecl_subscriber_t){0}; } _z_undecl_queryable_t _z_undecl_queryable_null(void) { return (_z_undecl_queryable_t){0}; } _z_undecl_token_t _z_undecl_token_null(void) { return (_z_undecl_token_t){0}; } -_z_undecl_interest_t _z_undecl_interest_null(void) { return (_z_undecl_interest_t){0}; } -_z_final_interest_t _z_final_interest_null(void) { return (_z_final_interest_t){0}; } +_z_decl_final_t _z_decl_final_null(void) { return (_z_decl_final_t){0}; } + void _z_decl_fix_mapping(_z_declaration_t *msg, uint16_t mapping) { switch (msg->_tag) { case _Z_DECL_KEXPR: { @@ -160,12 +139,6 @@ void _z_decl_fix_mapping(_z_declaration_t *msg, uint16_t mapping) { case _Z_UNDECL_TOKEN: { _z_keyexpr_fix_mapping(&msg->_body._undecl_token._ext_keyexpr, mapping); } break; - case _Z_DECL_INTEREST: { - _z_keyexpr_fix_mapping(&msg->_body._decl_interest._keyexpr, mapping); - } break; - case _Z_UNDECL_INTEREST: { - _z_keyexpr_fix_mapping(&msg->_body._undecl_interest._ext_keyexpr, mapping); - } break; default: break; } diff --git a/src/protocol/definitions/interest.c b/src/protocol/definitions/interest.c new file mode 100644 index 000000000..7c0254ee7 --- /dev/null +++ b/src/protocol/definitions/interest.c @@ -0,0 +1,37 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/protocol/definitions/interest.h" + +#include "zenoh-pico/protocol/keyexpr.h" + +void _z_interest_clear(_z_interest_t *interest) { _z_keyexpr_clear(&interest->_keyexpr); } + +_z_interest_t _z_make_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint8_t flags) { + return (_z_interest_t){ + ._id = id, + ._keyexpr = _z_keyexpr_steal(key), + .flags = flags, + }; +} + +_z_interest_t _z_make_interest_final(uint32_t id) { + return (_z_interest_t){ + ._id = id, + ._keyexpr = _z_keyexpr_null(), + .flags = 0, + }; +} + +_z_interest_t _z_interest_null(void) { return (_z_interest_t){0}; } diff --git a/src/protocol/definitions/network.c b/src/protocol/definitions/network.c index b4202486b..4d88903fd 100644 --- a/src/protocol/definitions/network.c +++ b/src/protocol/definitions/network.c @@ -117,6 +117,11 @@ void _z_n_msg_clear(_z_network_message_t *msg) { case _Z_N_DECLARE: _z_n_msg_declare_clear(&msg->_body._declare); break; + case _Z_N_INTEREST: + _z_n_msg_interest_clear(&msg->_body._interest); + break; + default: + break; } } @@ -168,11 +173,13 @@ _z_network_message_t _z_n_msg_make_response_final(_z_zint_t rid) { ._body = {._response_final = {._request_id = rid}}, }; } -_z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration) { +_z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration, _Bool has_interest_id, uint32_t interest_id) { return (_z_network_message_t){ ._tag = _Z_N_DECLARE, ._body._declare = { + .has_interest_id = has_interest_id, + ._interest_id = interest_id, ._decl = declaration, ._ext_qos = _Z_N_QOS_DEFAULT, ._ext_timestamp = _z_timestamp_null(), @@ -210,6 +217,16 @@ _z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) ke }; } +_z_network_message_t _z_n_msg_make_interest(_z_interest_t interest) { + return (_z_network_message_t){ + ._tag = _Z_N_INTEREST, + ._body._interest = + { + ._interest = interest, + }, + }; +} + void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping) { switch (msg->_tag) { case _Z_N_DECLARE: { @@ -224,6 +241,9 @@ void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping) { case _Z_N_RESPONSE: { _z_keyexpr_fix_mapping(&msg->_body._response._key, mapping); } break; + case _Z_N_INTEREST: { + _z_keyexpr_fix_mapping(&msg->_body._interest._interest._keyexpr, mapping); + } break; default: break; } diff --git a/src/session/interest.c b/src/session/interest.c index eb8bc5be0..2ae7b20aa 100644 --- a/src/session/interest.c +++ b/src/session/interest.c @@ -94,7 +94,7 @@ static _z_session_interest_rc_list_t *__unsafe_z_get_interest_by_key_and_flags(_ return __z_get_interest_by_key_and_flags(intrs, flags, key); } -static int8_t _z_send_resource_interest(_z_session_t *zn) { +static int8_t _z_interest_send_decl_resource(_z_session_t *zn, uint32_t interest_id) { _zp_session_lock_mutex(zn); _z_resource_list_t *res_list = _z_resource_list_clone(zn->_local_resources); _zp_session_unlock_mutex(zn); @@ -104,7 +104,7 @@ static int8_t _z_send_resource_interest(_z_session_t *zn) { // Build the declare message to send on the wire _z_keyexpr_t key = _z_keyexpr_alias(res->_key); _z_declaration_t declaration = _z_make_decl_keyexpr(res->_id, &key); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, true, interest_id); if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; } @@ -116,7 +116,7 @@ static int8_t _z_send_resource_interest(_z_session_t *zn) { } #if Z_FEATURE_SUBSCRIPTION == 1 -static int8_t _z_send_subscriber_interest(_z_session_t *zn) { +static int8_t _z_interest_send_decl_subscriber(_z_session_t *zn, uint32_t interest_id) { _zp_session_lock_mutex(zn); _z_subscription_rc_list_t *sub_list = _z_subscription_rc_list_clone(zn->_local_subscriptions); _zp_session_unlock_mutex(zn); @@ -127,7 +127,7 @@ static int8_t _z_send_subscriber_interest(_z_session_t *zn) { _z_keyexpr_t key = _z_keyexpr_alias(sub->in->val._key); _z_declaration_t declaration = _z_make_decl_subscriber(&key, sub->in->val._id, sub->in->val._info.reliability == Z_RELIABILITY_RELIABLE); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, true, interest_id); if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; } @@ -138,14 +138,15 @@ static int8_t _z_send_subscriber_interest(_z_session_t *zn) { return _Z_RES_OK; } #else -static int8_t _z_send_subscriber_interest(_z_session_t *zn) { +static int8_t _z_interest_send_decl_subscriber(_z_session_t *zn, uint32_t interest_id) { _ZP_UNUSED(zn); + _ZP_UNUSED(interest_id); return _Z_RES_OK; } #endif #if Z_FEATURE_QUERYABLE == 1 -static int8_t _z_send_queryable_interest(_z_session_t *zn) { +static int8_t _z_interest_send_decl_queryable(_z_session_t *zn, uint32_t interest_id) { _zp_session_lock_mutex(zn); _z_session_queryable_rc_list_t *qle_list = _z_session_queryable_rc_list_clone(zn->_local_queryable); _zp_session_unlock_mutex(zn); @@ -156,7 +157,7 @@ static int8_t _z_send_queryable_interest(_z_session_t *zn) { _z_keyexpr_t key = _z_keyexpr_alias(qle->in->val._key); _z_declaration_t declaration = _z_make_decl_queryable(&key, qle->in->val._id, qle->in->val._complete, _Z_QUERYABLE_DISTANCE_DEFAULT); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, true, interest_id); if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; } @@ -167,15 +168,16 @@ static int8_t _z_send_queryable_interest(_z_session_t *zn) { return _Z_RES_OK; } #else -static int8_t _z_send_queryable_interest(_z_session_t *zn) { +static int8_t _z_interest_send_decl_queryable(_z_session_t *zn, uint32_t interest_id) { _ZP_UNUSED(zn); + _ZP_UNUSED(interest_id); return _Z_RES_OK; } #endif -static int8_t _z_interest_send_final_interest(_z_session_t *zn, uint32_t id) { - _z_declaration_t decl = _z_make_final_interest(id); - _z_network_message_t n_msg = _z_n_msg_make_declare(decl); +static int8_t _z_interest_send_declare_final(_z_session_t *zn, uint32_t interest_id) { + _z_declaration_t decl = _z_make_decl_final(); + _z_network_message_t n_msg = _z_n_msg_make_declare(decl, true, interest_id); if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; } @@ -355,7 +357,7 @@ void _z_flush_interest(_z_session_t *zn) { _zp_session_unlock_mutex(zn); } -int8_t _z_interest_process_final_interest(_z_session_t *zn, uint32_t id) { +int8_t _z_interest_process_declare_final(_z_session_t *zn, uint32_t id) { _z_interest_msg_t msg = {.type = _Z_INTEREST_MSG_TYPE_FINAL, .id = id}; // Retrieve interest _zp_session_lock_mutex(zn); @@ -371,17 +373,16 @@ int8_t _z_interest_process_final_interest(_z_session_t *zn, uint32_t id) { return _Z_RES_OK; } -int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id) { +int8_t _z_interest_process_interest_final(_z_session_t *zn, uint32_t id) { _ZP_UNUSED(zn); _ZP_UNUSED(id); - // Update future masks + // TODO: Update future masks return _Z_RES_OK; } -int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags) { - // TODO process restricted flag & key +int8_t _z_interest_process_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags) { + // TODO: process restricted flag & associated key _ZP_UNUSED(key); - _ZP_UNUSED(id); // Check transport type if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { return _Z_RES_OK; // Nothing to do on unicast @@ -391,21 +392,21 @@ int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, // Send all declare if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_KEYEXPRS)) { _Z_DEBUG("Sending declare resources"); - _Z_RETURN_IF_ERR(_z_send_resource_interest(zn)); + _Z_RETURN_IF_ERR(_z_interest_send_decl_resource(zn, id)); } if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_SUBSCRIBERS)) { _Z_DEBUG("Sending declare subscribers"); - _Z_RETURN_IF_ERR(_z_send_subscriber_interest(zn)); + _Z_RETURN_IF_ERR(_z_interest_send_decl_subscriber(zn, id)); } if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_QUERYABLES)) { _Z_DEBUG("Sending declare queryables"); - _Z_RETURN_IF_ERR(_z_send_queryable_interest(zn)); + _Z_RETURN_IF_ERR(_z_interest_send_decl_queryable(zn, id)); } if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_TOKENS)) { // Zenoh pico doesn't support liveliness token for now } // Send final declare - _Z_RETURN_IF_ERR(_z_interest_send_final_interest(zn, id)); + _Z_RETURN_IF_ERR(_z_interest_send_declare_final(zn, id)); } return _Z_RES_OK; } @@ -425,19 +426,19 @@ int8_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration_t * return _Z_RES_OK; } -int8_t _z_interest_process_final_interest(_z_session_t *zn, uint32_t id) { +int8_t _z_interest_process_declare_final(_z_session_t *zn, uint32_t id) { _ZP_UNUSED(zn); _ZP_UNUSED(id); return _Z_RES_OK; } -int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id) { +int8_t _z_interest_process_interest_final(_z_session_t *zn, uint32_t id) { _ZP_UNUSED(zn); _ZP_UNUSED(id); return _Z_RES_OK; } -int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags) { +int8_t _z_interest_process_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags) { _ZP_UNUSED(zn); _ZP_UNUSED(key); _ZP_UNUSED(id); diff --git a/src/session/rx.c b/src/session/rx.c index 1ce925afa..b9e126bd9 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -52,17 +52,6 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint case _Z_UNDECL_KEXPR: { _z_unregister_resource(zn, decl._decl._body._undecl_kexpr._id, local_peer_id); } break; - case _Z_DECL_INTEREST: { - _z_interest_process_declare_interest(zn, decl._decl._body._decl_interest._keyexpr, - decl._decl._body._decl_interest._id, - decl._decl._body._decl_interest.interest_flags); - } break; - case _Z_UNDECL_INTEREST: { - _z_interest_process_undeclare_interest(zn, decl._decl._body._undecl_interest._id); - } break; - case _Z_FINAL_INTEREST: { - _z_interest_process_final_interest(zn, decl._decl._body._final_interest._id); - } break; case _Z_DECL_SUBSCRIBER: { _z_interest_process_declares(zn, &decl._decl); } break; @@ -81,6 +70,13 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint case _Z_UNDECL_TOKEN: { // TODO: add support or explicitly discard } break; + case _Z_DECL_FINAL: { + // Check that interest id is valid + if (!decl.has_interest_id) { + return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN; + } + _z_interest_process_declare_final(zn, decl._interest_id); + } break; } } break; case _Z_N_PUSH: { @@ -160,6 +156,19 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint _Z_DEBUG("Handling _Z_N_RESPONSE_FINAL"); ret = _z_trigger_reply_final(zn, &msg->_body._response_final); } break; + + case _Z_N_INTEREST: { + _Z_DEBUG("Handling _Z_N_INTEREST"); + _z_n_msg_interest_t *interest = &msg->_body._interest; + + _Bool not_final = ((interest->_interest.flags & _Z_INTEREST_NOT_FINAL_MASK) != 0); + if (not_final) { + _z_interest_process_interest(zn, interest->_interest._keyexpr, interest->_interest._id, + interest->_interest.flags); + } else { + _z_interest_process_interest_final(zn, interest->_interest._id); + } + } } _z_msg_clear(msg); return ret; diff --git a/tests/raweth.py b/tests/raweth.py index 41c6fcc4a..c70231c2d 100644 --- a/tests/raweth.py +++ b/tests/raweth.py @@ -16,8 +16,8 @@ def pub_and_sub(args): if args.reth == 1: z_pub_expected_status = 0 z_pub_expected_output = '''Opening session... -Declaring publisher for 'demo/example/zenoh-pico-pub'... Waiting for joins... +Declaring publisher for 'demo/example/zenoh-pico-pub'... Press CTRL-C to quit... Putting Data ('demo/example/zenoh-pico-pub': '[ 0] Pub from Pico!')... Putting Data ('demo/example/zenoh-pico-pub': '[ 1] Pub from Pico!')... @@ -57,7 +57,9 @@ def pub_and_sub(args): print("Start subscriber") # Start z_sub in the background - z_sub_command = f"sudo stdbuf -oL -eL ./{DIR_EXAMPLES}/z_sub -n 10 -m \"peer\" -l \"reth/0\"s" + z_sub_command = f"sudo stdbuf -oL -eL ./{DIR_EXAMPLES}/z_sub -n 10 -m \"peer\" -l \ + \"reth/30:03:8c:c8:00:a2#iface=lo;whitelist=30:03:8c:c8:00:a1,\"s" + z_sub_process = subprocess.Popen(z_sub_command, shell=True, stdin=subprocess.PIPE, @@ -70,7 +72,8 @@ def pub_and_sub(args): print("Start publisher") # Start z_pub - z_pub_command = f"sudo stdbuf -oL -eL ./{DIR_EXAMPLES}/z_pub -n 10 -m \"peer\" -l \"reth/0\"s" + z_pub_command = f"sudo stdbuf -oL -eL ./{DIR_EXAMPLES}/z_pub -n 10 -m \"peer\" -l \ + \"reth/30:03:8c:c8:00:a1#iface=lo;whitelist=30:03:8c:c8:00:a2,\"s" z_pub_process = subprocess.Popen(z_pub_command, shell=True, stdin=subprocess.PIPE, diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index 39d761c72..463adc5e0 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -31,9 +31,11 @@ #include "zenoh-pico/protocol/codec/core.h" #include "zenoh-pico/protocol/codec/declarations.h" #include "zenoh-pico/protocol/codec/ext.h" +#include "zenoh-pico/protocol/codec/interest.h" #include "zenoh-pico/protocol/codec/transport.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/definitions/declarations.h" +#include "zenoh-pico/protocol/definitions/interest.h" #include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/protocol/ext.h" #include "zenoh-pico/protocol/iobuf.h" @@ -381,7 +383,7 @@ void assert_eq_unit_extension(_z_msg_ext_unit_t *left, _z_msg_ext_unit_t *right) void unit_extension(void) { printf("\n>> UNIT Extension\n"); - _z_wbuf_t wbf = gen_wbuf(65535); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); // Initialize _z_msg_ext_t ext = gen_unit_extension(); @@ -422,7 +424,7 @@ void assert_eq_zint_extension(_z_msg_ext_zint_t *left, _z_msg_ext_zint_t *right) void zint_extension(void) { printf("\n>> ZINT Extension\n"); - _z_wbuf_t wbf = gen_wbuf(65535); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); // Initialize _z_msg_ext_t ext = gen_zint_extension(); @@ -464,7 +466,7 @@ void assert_eq_zbuf_extension(_z_msg_ext_zbuf_t *left, _z_msg_ext_zbuf_t *right) void zbuf_extension(void) { printf("\n>> ZBUF Extension\n"); - _z_wbuf_t wbf = gen_wbuf(65535); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); // Initialize _z_msg_ext_t ext = gen_zbuf_extension(); @@ -500,7 +502,7 @@ void assert_eq_bytes(const _z_bytes_t *left, const _z_bytes_t *right) { assert_e void payload_field(void) { printf("\n>> Payload field\n"); - _z_wbuf_t wbf = gen_wbuf(65535); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); // Initialize _z_bytes_t e_pld = gen_payload(64); @@ -571,7 +573,7 @@ void assert_eq_timestamp(const _z_timestamp_t *left, const _z_timestamp_t *right void timestamp_field(void) { printf("\n>> Timestamp field\n"); - _z_wbuf_t wbf = gen_wbuf(65535); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); // Initialize _z_timestamp_t e_ts = gen_timestamp(); @@ -649,7 +651,7 @@ void assert_eq_keyexpr(const _z_keyexpr_t *left, const _z_keyexpr_t *right) { void keyexpr_field(void) { printf("\n>> ResKey field\n"); - _z_wbuf_t wbf = gen_wbuf(65535); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); // Initialize _z_keyexpr_t e_rk = gen_keyexpr(); @@ -693,7 +695,7 @@ void assert_eq_resource_declaration(const _z_decl_kexpr_t *left, const _z_decl_k void resource_declaration(void) { printf("\n>> Resource declaration\n"); - _z_wbuf_t wbf = gen_wbuf(65535); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); // Initialize _z_decl_kexpr_t e_rd = gen_resource_declaration(); @@ -739,7 +741,7 @@ void assert_eq_subscriber_declaration(const _z_decl_subscriber_t *left, const _z void subscriber_declaration(void) { printf("\n>> Subscriber declaration\n"); - _z_wbuf_t wbf = gen_wbuf(65535); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); // Initialize _z_decl_subscriber_t e_sd = gen_subscriber_declaration(); @@ -787,7 +789,7 @@ void assert_eq_queryable_declaration(const _z_decl_queryable_t *left, const _z_d void queryable_declaration(void) { printf("\n>> Queryable declaration\n"); - _z_wbuf_t wbf = gen_wbuf(65535); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); // Initialize _z_decl_queryable_t e_qd = gen_queryable_declaration(); @@ -832,7 +834,7 @@ void assert_eq_forget_resource_declaration(const _z_undecl_kexpr_t *left, const void forget_resource_declaration(void) { printf("\n>> Forget resource declaration\n"); - _z_wbuf_t wbf = gen_wbuf(65535); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); // Initialize _z_undecl_kexpr_t e_frd = gen_forget_resource_declaration(); @@ -873,7 +875,7 @@ void assert_eq_forget_subscriber_declaration(const _z_undecl_subscriber_t *left, void forget_subscriber_declaration(void) { printf("\n>> Forget subscriber declaration\n"); - _z_wbuf_t wbf = gen_wbuf(65535); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); // Initialize _z_undecl_subscriber_t e_fsd = gen_forget_subscriber_declaration(); @@ -915,7 +917,7 @@ void assert_eq_forget_queryable_declaration(const _z_undecl_queryable_t *left, c void forget_queryable_declaration(void) { printf("\n>> Forget queryable declaration\n"); - _z_wbuf_t wbf = gen_wbuf(65535); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); // Initialize _z_undecl_queryable_t e_fqd = gen_forget_queryable_declaration(); @@ -1012,19 +1014,24 @@ void assert_eq_declaration(const _z_declaration_t *left, const _z_declaration_t /*------------------ Declare message ------------------*/ _z_network_message_t gen_declare_message(void) { _z_declaration_t declaration = gen_declaration(); - - return _z_n_msg_make_declare(declaration); + _Bool has_id = gen_bool(); + uint32_t id = gen_uint32(); + return _z_n_msg_make_declare(declaration, has_id, id); } void assert_eq_declare_message(_z_n_msg_declare_t *left, _z_n_msg_declare_t *right) { printf(" "); + assert(left->has_interest_id == right->has_interest_id); + if (left->has_interest_id) { + assert(left->_interest_id == right->_interest_id); + } assert_eq_declaration(&left->_decl, &right->_decl); printf("\n"); } void declare_message(void) { printf("\n>> Declare message\n"); - _z_wbuf_t wbf = gen_wbuf(65535); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); // Initialize _z_network_message_t n_msg = gen_declare_message(); @@ -1049,6 +1056,78 @@ void declare_message(void) { _z_wbuf_clear(&wbf); } +/*------------------ Interest ------------------*/ +#define _Z_MSGCODEC_INTEREST_BASE_FLAGS_MASK 0x8f // Used to remove R, C & F flags + +_z_interest_t gen_interest(void) { + _z_interest_t i = {0}; + _Bool is_final = gen_bool(); // To determine if interest is final or not + // Generate interest id + i._id = gen_uint32(); + printf("Gen interest %d\n", is_final); + // Add regular interest data + if (!is_final) { + // Generate base flags + i.flags = gen_uint8() & _Z_MSGCODEC_INTEREST_BASE_FLAGS_MASK; + // Generate test cases + _Bool is_restricted = gen_bool(); + uint8_t cf_type = gen_uint8() % 3; // Flags must be c, f or cf + switch (cf_type) { + default: + case 0: + i.flags |= _Z_INTEREST_FLAG_CURRENT; + break; + case 1: + i.flags |= _Z_INTEREST_FLAG_FUTURE; + break; + case 2: + i.flags |= (_Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE); + break; + } + if (is_restricted) { + i.flags |= _Z_INTEREST_FLAG_RESTRICTED; + // Generate ke + i._keyexpr = gen_keyexpr(); + } + }; + return i; +} + +_z_network_message_t gen_interest_message(void) { + _z_interest_t interest = gen_interest(); + return _z_n_msg_make_interest(interest); +} + +void assert_eq_interest(const _z_interest_t *left, const _z_interest_t *right) { + printf("Interest: 0x%x, 0x%x, %u, %u\n", left->flags, right->flags, left->_id, right->_id); + printf("Interest ke: %d, %d, %d, %d, %s, %s\n", left->_keyexpr._id, right->_keyexpr._id, + left->_keyexpr._mapping._val, right->_keyexpr._mapping._val, left->_keyexpr._suffix, + right->_keyexpr._suffix); + assert(left->flags == right->flags); + assert(left->_id == right->_id); + assert_eq_keyexpr(&left->_keyexpr, &right->_keyexpr); +} +void interest_message(void) { + printf("\n>> Interest message\n"); + // Init + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); + _z_network_message_t expected = gen_interest_message(); + // Encode + assert(_z_n_interest_encode(&wbf, &expected._body._interest) == _Z_RES_OK); + // Decode + _z_n_msg_interest_t decoded; + _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); + uint8_t header = _z_zbuf_read(&zbf); + assert(_z_n_interest_decode(&decoded, &zbf, header) == _Z_RES_OK); + // Check + assert_eq_interest(&expected._body._interest._interest, &decoded._interest); + // Clean-up + _z_n_msg_interest_clear(&decoded); + _z_n_msg_interest_clear(&expected._body._interest); + _z_zbuf_clear(&zbf); + _z_wbuf_clear(&wbf); +} + /*=============================*/ /* Zenoh Messages */ /*=============================*/ @@ -1085,7 +1164,7 @@ void assert_eq_push_body(const _z_push_body_t *left, const _z_push_body_t *right void push_body_message(void) { printf("\n>> Put/Del message\n"); - _z_wbuf_t wbf = gen_wbuf(65535); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); // Initialize _z_push_body_t e_da = gen_push_body(); @@ -1534,7 +1613,8 @@ void keep_alive_message(void) { _z_wbuf_clear(&wbf); } _z_network_message_t gen_net_msg(void) { - switch (gen_uint8() % 5) { + switch (gen_uint8() % 6) { + default: case 0: { return gen_declare_message(); } break; @@ -1547,10 +1627,12 @@ _z_network_message_t gen_net_msg(void) { case 3: { return (_z_network_message_t){._tag = _Z_N_RESPONSE, ._body._response = gen_response()}; } break; - case 4: - default: { + case 4: { return (_z_network_message_t){._tag = _Z_N_RESPONSE_FINAL, ._body._response_final = gen_response_final()}; } break; + case 5: { + return (_z_network_message_t){._tag = _Z_N_INTEREST, ._body._interest._interest = gen_interest()}; + } break; } } void assert_eq_net_msg(const _z_network_message_t *left, const _z_network_message_t *right) { @@ -1573,6 +1655,12 @@ void assert_eq_net_msg(const _z_network_message_t *left, const _z_network_messag case _Z_N_RESPONSE_FINAL: { assert_eq_response_final(&left->_body._response_final, &right->_body._response_final); } break; + case _Z_N_INTEREST: { + assert_eq_interest(&left->_body._interest._interest, &right->_body._interest._interest); + } break; + default: + assert(false); + break; } } _z_network_message_vec_t gen_net_msgs(size_t n) { @@ -1777,6 +1865,7 @@ int main(void) { query_message(); err_message(); reply_message(); + interest_message(); // Network messages push_message();