From 207106a6a3c22fb8f073e90325fe0b1f32899415 Mon Sep 17 00:00:00 2001 From: Charles Schleich Date: Thu, 19 Dec 2024 16:53:00 +0100 Subject: [PATCH] Update queryable to include handler type --- .../src/handle_control_message.rs | 95 ++++++++++++------- zenoh-plugin-remote-api/src/interface/mod.rs | 1 + zenoh-plugin-remote-api/src/lib.rs | 6 +- .../src/remote_api/interface/ControlMsg.ts | 2 +- zenoh-ts/src/remote_api/session.ts | 3 +- zenoh-ts/src/session.ts | 26 +++-- 6 files changed, 85 insertions(+), 48 deletions(-) diff --git a/zenoh-plugin-remote-api/src/handle_control_message.rs b/zenoh-plugin-remote-api/src/handle_control_message.rs index 8b24acd..fd0cfb1 100644 --- a/zenoh-plugin-remote-api/src/handle_control_message.rs +++ b/zenoh-plugin-remote-api/src/handle_control_message.rs @@ -209,15 +209,11 @@ pub(crate) async fn handle_control_message( } => { let key_expr = KeyExpr::new(owned_key_expr.clone())?; let ch_tx = state_map.websocket_tx.clone(); + let subscriber_builder = state_map.session.declare_subscriber(key_expr); let join_handle = match handler { HandlerChannel::Fifo(size) => { - let subscriber = state_map - .session - .declare_subscriber(key_expr) - .with(FifoChannel::new(size)) - .await?; - + let subscriber = subscriber_builder.with(FifoChannel::new(size)).await?; spawn_future(async move { while let Ok(sample) = subscriber.recv_async().await { let sample_ws = SampleWS::from(sample); @@ -230,12 +226,7 @@ pub(crate) async fn handle_control_message( }) } HandlerChannel::Ring(size) => { - let subscriber = state_map - .session - .declare_subscriber(key_expr) - .with(RingChannel::new(size)) - .await?; - + let subscriber = subscriber_builder.with(RingChannel::new(size)).await?; spawn_future(async move { while let Ok(sample) = subscriber.recv_async().await { let sample_ws = SampleWS::from(sample); @@ -295,41 +286,77 @@ pub(crate) async fn handle_control_message( key_expr, complete, id: queryable_uuid, + handler, } => { let unanswered_queries = state_map.unanswered_queries.clone(); - let session = state_map.session.clone(); let ch_tx = state_map.websocket_tx.clone(); - let queryable = session + let query_builder = state_map + .session .declare_queryable(&key_expr) - .complete(complete) - .callback(move |query| { - let query_uuid = Uuid::new_v4(); - let queryable_msg = QueryableMsg::Query { - queryable_uuid, - query: QueryWS::from((&query, query_uuid)), - }; + .complete(complete); + + let join_handle = match handler { + HandlerChannel::Fifo(size) => { + let queryable = query_builder.with(FifoChannel::new(size)).await?; + spawn_future(async move { + while let Ok(query) = queryable.recv_async().await { + let query_uuid = Uuid::new_v4(); + let queryable_msg = QueryableMsg::Query { + queryable_uuid, + query: QueryWS::from((&query, query_uuid)), + }; + + match unanswered_queries.write() { + Ok(mut rw_lock) => { + rw_lock.insert(query_uuid, query); + } + Err(err) => { + tracing::error!("Query RwLock has been poisoned {err:?}") + } + } - match unanswered_queries.write() { - Ok(mut rw_lock) => { - rw_lock.insert(query_uuid, query); + let remote_msg = RemoteAPIMsg::Data(DataMsg::Queryable(queryable_msg)); + if let Err(err) = ch_tx.send(remote_msg) { + tracing::error!("Could not send Queryable Message on WS {}", err); + }; } - Err(err) => tracing::error!("Query RwLock has been poisoned {err:?}"), - } + }) + } + HandlerChannel::Ring(size) => { + let queryable = query_builder.with(RingChannel::new(size)).await?; + spawn_future(async move { + while let Ok(query) = queryable.recv_async().await { + let query_uuid = Uuid::new_v4(); + let queryable_msg = QueryableMsg::Query { + queryable_uuid, + query: QueryWS::from((&query, query_uuid)), + }; - let remote_msg = RemoteAPIMsg::Data(DataMsg::Queryable(queryable_msg)); - if let Err(err) = ch_tx.send(remote_msg) { - tracing::error!("Could not send Queryable Message on WS {}", err); - }; - }) - .await?; + match unanswered_queries.write() { + Ok(mut rw_lock) => { + rw_lock.insert(query_uuid, query); + } + Err(err) => { + tracing::error!("Query RwLock has been poisoned {err:?}") + } + } + + let remote_msg = RemoteAPIMsg::Data(DataMsg::Queryable(queryable_msg)); + if let Err(err) = ch_tx.send(remote_msg) { + tracing::error!("Could not send Queryable Message on WS {}", err); + }; + } + }) + } + }; state_map .queryables - .insert(queryable_uuid, (queryable, key_expr)); + .insert(queryable_uuid, (join_handle, key_expr)); } ControlMsg::UndeclareQueryable(uuid) => { if let Some((queryable, _)) = state_map.queryables.remove(&uuid) { - queryable.undeclare().await?; + queryable.abort(); }; } ControlMsg::Liveliness(liveliness_msg) => { diff --git a/zenoh-plugin-remote-api/src/interface/mod.rs b/zenoh-plugin-remote-api/src/interface/mod.rs index e205057..df70826 100644 --- a/zenoh-plugin-remote-api/src/interface/mod.rs +++ b/zenoh-plugin-remote-api/src/interface/mod.rs @@ -254,6 +254,7 @@ pub enum ControlMsg { key_expr: OwnedKeyExpr, id: Uuid, complete: bool, + handler: HandlerChannel, }, UndeclareQueryable(Uuid), // Quierer diff --git a/zenoh-plugin-remote-api/src/lib.rs b/zenoh-plugin-remote-api/src/lib.rs index 0cb9363..014684c 100644 --- a/zenoh-plugin-remote-api/src/lib.rs +++ b/zenoh-plugin-remote-api/src/lib.rs @@ -481,7 +481,7 @@ struct RemoteState { subscribers: HashMap, OwnedKeyExpr)>, publishers: HashMap>, // Queryable - queryables: HashMap, OwnedKeyExpr)>, + queryables: HashMap, OwnedKeyExpr)>, unanswered_queries: Arc>>, // Liveliness liveliness_tokens: HashMap, @@ -517,9 +517,7 @@ impl RemoteState { } for (_, (queryable, _)) in self.queryables { - if let Err(e) = queryable.undeclare().await { - error!("{e}") - } + queryable.abort(); } drop(self.unanswered_queries); diff --git a/zenoh-ts/src/remote_api/interface/ControlMsg.ts b/zenoh-ts/src/remote_api/interface/ControlMsg.ts index 9a534aa..f1026eb 100644 --- a/zenoh-ts/src/remote_api/interface/ControlMsg.ts +++ b/zenoh-ts/src/remote_api/interface/ControlMsg.ts @@ -4,4 +4,4 @@ import type { HandlerChannel } from "./HandlerChannel"; import type { LivelinessMsg } from "./LivelinessMsg"; import type { OwnedKeyExprWrapper } from "./OwnedKeyExprWrapper"; -export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg }; +export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, handler: HandlerChannel, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg }; diff --git a/zenoh-ts/src/remote_api/session.ts b/zenoh-ts/src/remote_api/session.ts index f0f3c6d..a53ff9b 100644 --- a/zenoh-ts/src/remote_api/session.ts +++ b/zenoh-ts/src/remote_api/session.ts @@ -276,12 +276,13 @@ export class RemoteSession { key_expr: string, complete: boolean, reply_tx: SimpleChannel, + handler: HandlerChannel, callback?: (sample: QueryWS) => void, ): RemoteQueryable { let uuid = uuidv4(); let control_message: ControlMsg = { - DeclareQueryable: { key_expr: key_expr, complete: complete, id: uuid }, + DeclareQueryable: { key_expr: key_expr, complete: complete, id: uuid, handler: handler }, }; let query_rx: SimpleChannel = new SimpleChannel(); diff --git a/zenoh-ts/src/session.ts b/zenoh-ts/src/session.ts index f30f911..0fd1dcd 100644 --- a/zenoh-ts/src/session.ts +++ b/zenoh-ts/src/session.ts @@ -117,9 +117,9 @@ export interface GetOptions { } /** - * Options for a SubscriberOpts function + * Options for a SubscriberOptions function */ -export interface SubscriberOpts { +export interface SubscriberOptions { handler?: ((sample: Sample) => Promise) | Handler, } @@ -130,7 +130,7 @@ export interface SubscriberOpts { */ export interface QueryableOptions { complete?: boolean, - callback?: (query: Query) => void, + handler?: ((sample: Query) => Promise) | Handler, } /** @@ -416,13 +416,12 @@ export class Session { // Handler size : This is to match the API_DATA_RECEPTION_CHANNEL_SIZE of zenoh internally declare_subscriber( key_expr: IntoKeyExpr, - subscriber_opts: SubscriberOpts + subscriber_opts: SubscriberOptions ): Subscriber { let _key_expr = new KeyExpr(key_expr); let remote_subscriber: RemoteSubscriber; let callback_subscriber = false; - // let [callback, handler_type] = this.check_handler_or_callback(handler); let handler; if (subscriber_opts?.handler !== undefined) { handler = subscriber_opts?.handler; @@ -488,21 +487,31 @@ export class Session { _complete = queryable_opts?.complete; }; + let handler; + if (queryable_opts?.handler !== undefined) { + handler = queryable_opts?.handler; + } else { + handler = new FifoChannel(256); + } + let [callback, handler_type] = this.check_handler_or_callback(handler); + let callback_queryable = false; - if (queryable_opts?.callback != undefined) { + if (callback != undefined) { callback_queryable = true; - let callback = queryable_opts?.callback; + // Typescript cant figure out that calback!=undefined here, so this needs to be explicit + let defined_callback = callback; const callback_conversion = function ( query_ws: QueryWS, ): void { let query: Query = QueryWS_to_Query(query_ws, reply_tx); - callback(query); + defined_callback(query); }; remote_queryable = this.remote_session.declare_remote_queryable( _key_expr.toString(), _complete, reply_tx, + handler_type, callback_conversion, ); } else { @@ -510,6 +519,7 @@ export class Session { _key_expr.toString(), _complete, reply_tx, + handler_type ); }