Skip to content

Commit

Permalink
Continue implementation of matching subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Jan 10, 2025
1 parent 9f3de55 commit 2495d00
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 31 deletions.
22 changes: 4 additions & 18 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/net/subscribe.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/session/session.h"

#ifdef __cplusplus
extern "C" {
Expand Down Expand Up @@ -141,9 +142,7 @@ _Z_OWNED_TYPE_VALUE(_z_value_t, reply_err)
* A struct that indicates if there exist Subscribers matching the Publisher's key expression or Queryables matching
* Querier's key expression and target.
*/
typedef struct {
bool matching; // true if there exist matching Zenoh entities, false otherwise.
} z_matching_status_t;
typedef _z_matching_status_t z_matching_status_t;
#endif

/**
Expand Down Expand Up @@ -426,7 +425,7 @@ _Z_OWNED_TYPE_VALUE(_z_reply_t, reply)
*/
_Z_OWNED_TYPE_VALUE(_z_string_svec_t, string_array)

typedef void (*z_closure_drop_callback_t)(void *arg);
typedef _z_drop_handler_t z_closure_drop_callback_t;
typedef _z_closure_sample_callback_t z_closure_sample_callback_t;

typedef struct {
Expand Down Expand Up @@ -493,20 +492,7 @@ typedef struct {
_Z_OWNED_TYPE_VALUE(_z_closure_zid_t, closure_zid)

#if defined(Z_FEATURE_MATCHING)
typedef void (*z_closure_matching_status_callback_t)(const z_matching_status_t *status, void *arg);

typedef struct {
void *context;
z_closure_matching_status_callback_t call;
z_closure_drop_callback_t drop;
} _z_closure_matching_status_t;

// TODO(sashacmc): found better place?
typedef struct _z_matching_listener_ctx_t {
uint32_t decl_id;
_z_closure_matching_status_t callback;
} _z_matching_listener_ctx_t;

typedef _z_closure_matching_status_t z_closure_matching_status_t;
/**
* Represents the matching status callback closure.
*/
Expand Down
3 changes: 3 additions & 0 deletions include/zenoh-pico/net/matching.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
extern "C" {
#endif

typedef struct _z_publisher_t _z_publisher_t;

#if Z_FEATURE_MATCHING == 1
typedef struct _z_matching_listener_t {
uint32_t _interest_id;
_z_session_weak_t _zn;
} _z_matching_listener_t;

_z_matching_listener_t _z_matching_listener_declare(const _z_publisher_t *pub, _z_closure_matching_status_t callback);
// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes.
static inline _z_matching_listener_t _z_matching_listener_null(void) { return (_z_matching_listener_t){0}; }
static inline bool _z_matching_listener_check(const _z_matching_listener_t *matching_listener) {
Expand Down
18 changes: 17 additions & 1 deletion include/zenoh-pico/session/matching.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,29 @@

#include "zenoh-pico/collections/element.h"
#include "zenoh-pico/collections/intmap.h"
#include "zenoh-pico/session/session.h"

#ifdef __cplusplus
extern "C" {
#endif

#if Z_FEATURE_MATCHING == 1
typedef struct _z_matching_listener_ctx_t _z_matching_listener_ctx_t;
typedef struct {
bool matching; // true if there exist matching Zenoh entities, false otherwise.
} _z_matching_status_t;

typedef void (*_z_closure_matching_status_callback_t)(const _z_matching_status_t *status, void *arg);

typedef struct {
void *context;
_z_closure_matching_status_callback_t call;
_z_drop_handler_t drop;
} _z_closure_matching_status_t;

typedef struct _z_matching_listener_ctx_t {
uint32_t decl_id;
_z_closure_matching_status_t callback;
} _z_matching_listener_ctx_t;

typedef struct {
uint32_t _interest_id;
Expand Down
10 changes: 4 additions & 6 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ _Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_query, _z_closure_query_callback_t, z_cl
_Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_reply, _z_closure_reply_callback_t, z_closure_drop_callback_t)
_Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_hello, z_closure_hello_callback_t, z_closure_drop_callback_t)
_Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_zid, z_closure_zid_callback_t, z_closure_drop_callback_t)
_Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_matching_status, z_closure_matching_status_callback_t,
_Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_matching_status, _z_closure_matching_status_callback_t,
z_closure_drop_callback_t)

/************* Primitives **************/
Expand Down Expand Up @@ -1119,11 +1119,9 @@ z_result_t z_publisher_declare_background_matching_listener(const z_loaned_publi
z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *publisher,
z_owned_matching_listener_t *matching_listener,
z_moved_closure_matching_status_t *callback) {
(void)publisher;
(void)matching_listener;
(void)callback;
// TODO(sashacmc): Implement
return _Z_RES_OK;
_z_matching_listener_t listener = _z_matching_listener_declare(publisher, callback->_this._val);
matching_listener->_val = listener;
return _z_matching_listener_check(&listener) ? _Z_RES_OK : _Z_ERR_GENERIC;
}

z_result_t z_publisher_get_matching_status(const z_loaned_publisher_t *publisher,
Expand Down
11 changes: 5 additions & 6 deletions src/net/matching.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,24 @@ static void _z_matching_listener_callback(const _z_interest_msg_t *msg, void *ar
}
}

z_result_t _z_matching_listener_create(_z_publisher_t *pub) {
_z_matching_listener_t _z_matching_listener_declare(const _z_publisher_t *pub, _z_closure_matching_status_t callback) {
uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED |
_Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE;
_z_matching_listener_ctx_t *ctx = z_malloc(sizeof(_z_matching_listener_ctx_t));

_z_matching_listener_t listener = _z_matching_listener_null();
if (ctx == NULL) {
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
return listener;
}
ctx->decl_id = 0;

_z_matching_listener_t listener = _z_matching_listener_null();
ctx->callback = callback;
listener._interest_id =
_z_add_interest(_Z_RC_IN_VAL(&pub->_zn), _z_keyexpr_alias_from_user_defined(pub->_key, true),
_z_matching_listener_callback, flags, (void *)ctx);
if (listener._interest_id == 0) {
z_free(ctx);
return _Z_ERR_GENERIC;
}
return _Z_RES_OK;
return listener;
}

void _z_matching_listener_clear(_z_matching_listener_t *listener) {
Expand Down

0 comments on commit 2495d00

Please sign in to comment.