From 2495d00fdb9b1e5e923c6681e6a74f2b0cbe2a93 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 10 Jan 2025 18:20:57 +0100 Subject: [PATCH] Continue implementation of matching subscribers --- include/zenoh-pico/api/types.h | 22 ++++------------------ include/zenoh-pico/net/matching.h | 3 +++ include/zenoh-pico/session/matching.h | 18 +++++++++++++++++- src/api/api.c | 10 ++++------ src/net/matching.c | 11 +++++------ 5 files changed, 33 insertions(+), 31 deletions(-) diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index a08580bd8..080974c0b 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -30,6 +30,7 @@ #include "zenoh-pico/net/session.h" #include "zenoh-pico/net/subscribe.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/session/session.h" #ifdef __cplusplus extern "C" { @@ -141,9 +142,7 @@ _Z_OWNED_TYPE_VALUE(_z_value_t, reply_err) * A struct that indicates if there exist Subscribers matching the Publisher's key expression or Queryables matching * Querier's key expression and target. */ -typedef struct { - bool matching; // true if there exist matching Zenoh entities, false otherwise. -} z_matching_status_t; +typedef _z_matching_status_t z_matching_status_t; #endif /** @@ -426,7 +425,7 @@ _Z_OWNED_TYPE_VALUE(_z_reply_t, reply) */ _Z_OWNED_TYPE_VALUE(_z_string_svec_t, string_array) -typedef void (*z_closure_drop_callback_t)(void *arg); +typedef _z_drop_handler_t z_closure_drop_callback_t; typedef _z_closure_sample_callback_t z_closure_sample_callback_t; typedef struct { @@ -493,20 +492,7 @@ typedef struct { _Z_OWNED_TYPE_VALUE(_z_closure_zid_t, closure_zid) #if defined(Z_FEATURE_MATCHING) -typedef void (*z_closure_matching_status_callback_t)(const z_matching_status_t *status, void *arg); - -typedef struct { - void *context; - z_closure_matching_status_callback_t call; - z_closure_drop_callback_t drop; -} _z_closure_matching_status_t; - -// TODO(sashacmc): found better place? -typedef struct _z_matching_listener_ctx_t { - uint32_t decl_id; - _z_closure_matching_status_t callback; -} _z_matching_listener_ctx_t; - +typedef _z_closure_matching_status_t z_closure_matching_status_t; /** * Represents the matching status callback closure. */ diff --git a/include/zenoh-pico/net/matching.h b/include/zenoh-pico/net/matching.h index 472f4a2e5..3f1a56a91 100644 --- a/include/zenoh-pico/net/matching.h +++ b/include/zenoh-pico/net/matching.h @@ -21,12 +21,15 @@ extern "C" { #endif +typedef struct _z_publisher_t _z_publisher_t; + #if Z_FEATURE_MATCHING == 1 typedef struct _z_matching_listener_t { uint32_t _interest_id; _z_session_weak_t _zn; } _z_matching_listener_t; +_z_matching_listener_t _z_matching_listener_declare(const _z_publisher_t *pub, _z_closure_matching_status_t callback); // Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes. static inline _z_matching_listener_t _z_matching_listener_null(void) { return (_z_matching_listener_t){0}; } static inline bool _z_matching_listener_check(const _z_matching_listener_t *matching_listener) { diff --git a/include/zenoh-pico/session/matching.h b/include/zenoh-pico/session/matching.h index 9a26074dd..190e26ff5 100644 --- a/include/zenoh-pico/session/matching.h +++ b/include/zenoh-pico/session/matching.h @@ -17,13 +17,29 @@ #include "zenoh-pico/collections/element.h" #include "zenoh-pico/collections/intmap.h" +#include "zenoh-pico/session/session.h" #ifdef __cplusplus extern "C" { #endif #if Z_FEATURE_MATCHING == 1 -typedef struct _z_matching_listener_ctx_t _z_matching_listener_ctx_t; +typedef struct { + bool matching; // true if there exist matching Zenoh entities, false otherwise. +} _z_matching_status_t; + +typedef void (*_z_closure_matching_status_callback_t)(const _z_matching_status_t *status, void *arg); + +typedef struct { + void *context; + _z_closure_matching_status_callback_t call; + _z_drop_handler_t drop; +} _z_closure_matching_status_t; + +typedef struct _z_matching_listener_ctx_t { + uint32_t decl_id; + _z_closure_matching_status_t callback; +} _z_matching_listener_ctx_t; typedef struct { uint32_t _interest_id; diff --git a/src/api/api.c b/src/api/api.c index 1f598370e..a2d1fc8f3 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -578,7 +578,7 @@ _Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_query, _z_closure_query_callback_t, z_cl _Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_reply, _z_closure_reply_callback_t, z_closure_drop_callback_t) _Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_hello, z_closure_hello_callback_t, z_closure_drop_callback_t) _Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_zid, z_closure_zid_callback_t, z_closure_drop_callback_t) -_Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_matching_status, z_closure_matching_status_callback_t, +_Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_matching_status, _z_closure_matching_status_callback_t, z_closure_drop_callback_t) /************* Primitives **************/ @@ -1119,11 +1119,9 @@ z_result_t z_publisher_declare_background_matching_listener(const z_loaned_publi z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *publisher, z_owned_matching_listener_t *matching_listener, z_moved_closure_matching_status_t *callback) { - (void)publisher; - (void)matching_listener; - (void)callback; - // TODO(sashacmc): Implement - return _Z_RES_OK; + _z_matching_listener_t listener = _z_matching_listener_declare(publisher, callback->_this._val); + matching_listener->_val = listener; + return _z_matching_listener_check(&listener) ? _Z_RES_OK : _Z_ERR_GENERIC; } z_result_t z_publisher_get_matching_status(const z_loaned_publisher_t *publisher, diff --git a/src/net/matching.c b/src/net/matching.c index 7dc969091..14f9e51f5 100644 --- a/src/net/matching.c +++ b/src/net/matching.c @@ -47,25 +47,24 @@ static void _z_matching_listener_callback(const _z_interest_msg_t *msg, void *ar } } -z_result_t _z_matching_listener_create(_z_publisher_t *pub) { +_z_matching_listener_t _z_matching_listener_declare(const _z_publisher_t *pub, _z_closure_matching_status_t callback) { uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE; _z_matching_listener_ctx_t *ctx = z_malloc(sizeof(_z_matching_listener_ctx_t)); + _z_matching_listener_t listener = _z_matching_listener_null(); if (ctx == NULL) { - return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + return listener; } ctx->decl_id = 0; - - _z_matching_listener_t listener = _z_matching_listener_null(); + ctx->callback = callback; listener._interest_id = _z_add_interest(_Z_RC_IN_VAL(&pub->_zn), _z_keyexpr_alias_from_user_defined(pub->_key, true), _z_matching_listener_callback, flags, (void *)ctx); if (listener._interest_id == 0) { z_free(ctx); - return _Z_ERR_GENERIC; } - return _Z_RES_OK; + return listener; } void _z_matching_listener_clear(_z_matching_listener_t *listener) {