Skip to content

Commit

Permalink
Update queryable to include handler type
Browse files Browse the repository at this point in the history
  • Loading branch information
Charles-Schleich committed Dec 19, 2024
1 parent 30007ab commit 207106a
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 48 deletions.
95 changes: 61 additions & 34 deletions zenoh-plugin-remote-api/src/handle_control_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,15 +209,11 @@ pub(crate) async fn handle_control_message(
} => {
let key_expr = KeyExpr::new(owned_key_expr.clone())?;
let ch_tx = state_map.websocket_tx.clone();
let subscriber_builder = state_map.session.declare_subscriber(key_expr);

let join_handle = match handler {
HandlerChannel::Fifo(size) => {
let subscriber = state_map
.session
.declare_subscriber(key_expr)
.with(FifoChannel::new(size))
.await?;

let subscriber = subscriber_builder.with(FifoChannel::new(size)).await?;
spawn_future(async move {
while let Ok(sample) = subscriber.recv_async().await {
let sample_ws = SampleWS::from(sample);
Expand All @@ -230,12 +226,7 @@ pub(crate) async fn handle_control_message(
})
}
HandlerChannel::Ring(size) => {
let subscriber = state_map
.session
.declare_subscriber(key_expr)
.with(RingChannel::new(size))
.await?;

let subscriber = subscriber_builder.with(RingChannel::new(size)).await?;
spawn_future(async move {
while let Ok(sample) = subscriber.recv_async().await {
let sample_ws = SampleWS::from(sample);
Expand Down Expand Up @@ -295,41 +286,77 @@ pub(crate) async fn handle_control_message(
key_expr,
complete,
id: queryable_uuid,
handler,
} => {
let unanswered_queries = state_map.unanswered_queries.clone();
let session = state_map.session.clone();
let ch_tx = state_map.websocket_tx.clone();
let queryable = session
let query_builder = state_map
.session
.declare_queryable(&key_expr)
.complete(complete)
.callback(move |query| {
let query_uuid = Uuid::new_v4();
let queryable_msg = QueryableMsg::Query {
queryable_uuid,
query: QueryWS::from((&query, query_uuid)),
};
.complete(complete);

let join_handle = match handler {
HandlerChannel::Fifo(size) => {
let queryable = query_builder.with(FifoChannel::new(size)).await?;
spawn_future(async move {
while let Ok(query) = queryable.recv_async().await {
let query_uuid = Uuid::new_v4();
let queryable_msg = QueryableMsg::Query {
queryable_uuid,
query: QueryWS::from((&query, query_uuid)),
};

match unanswered_queries.write() {
Ok(mut rw_lock) => {
rw_lock.insert(query_uuid, query);
}
Err(err) => {
tracing::error!("Query RwLock has been poisoned {err:?}")
}
}

match unanswered_queries.write() {
Ok(mut rw_lock) => {
rw_lock.insert(query_uuid, query);
let remote_msg = RemoteAPIMsg::Data(DataMsg::Queryable(queryable_msg));
if let Err(err) = ch_tx.send(remote_msg) {
tracing::error!("Could not send Queryable Message on WS {}", err);
};
}
Err(err) => tracing::error!("Query RwLock has been poisoned {err:?}"),
}
})
}
HandlerChannel::Ring(size) => {
let queryable = query_builder.with(RingChannel::new(size)).await?;
spawn_future(async move {
while let Ok(query) = queryable.recv_async().await {
let query_uuid = Uuid::new_v4();
let queryable_msg = QueryableMsg::Query {
queryable_uuid,
query: QueryWS::from((&query, query_uuid)),
};

let remote_msg = RemoteAPIMsg::Data(DataMsg::Queryable(queryable_msg));
if let Err(err) = ch_tx.send(remote_msg) {
tracing::error!("Could not send Queryable Message on WS {}", err);
};
})
.await?;
match unanswered_queries.write() {
Ok(mut rw_lock) => {
rw_lock.insert(query_uuid, query);
}
Err(err) => {
tracing::error!("Query RwLock has been poisoned {err:?}")
}
}

let remote_msg = RemoteAPIMsg::Data(DataMsg::Queryable(queryable_msg));
if let Err(err) = ch_tx.send(remote_msg) {
tracing::error!("Could not send Queryable Message on WS {}", err);
};
}
})
}
};

state_map
.queryables
.insert(queryable_uuid, (queryable, key_expr));
.insert(queryable_uuid, (join_handle, key_expr));
}
ControlMsg::UndeclareQueryable(uuid) => {
if let Some((queryable, _)) = state_map.queryables.remove(&uuid) {
queryable.undeclare().await?;
queryable.abort();
};
}
ControlMsg::Liveliness(liveliness_msg) => {
Expand Down
1 change: 1 addition & 0 deletions zenoh-plugin-remote-api/src/interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ pub enum ControlMsg {
key_expr: OwnedKeyExpr,
id: Uuid,
complete: bool,
handler: HandlerChannel,
},
UndeclareQueryable(Uuid),
// Quierer
Expand Down
6 changes: 2 additions & 4 deletions zenoh-plugin-remote-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ struct RemoteState {
subscribers: HashMap<Uuid, (JoinHandle<()>, OwnedKeyExpr)>,
publishers: HashMap<Uuid, Publisher<'static>>,
// Queryable
queryables: HashMap<Uuid, (Queryable<()>, OwnedKeyExpr)>,
queryables: HashMap<Uuid, (JoinHandle<()>, OwnedKeyExpr)>,
unanswered_queries: Arc<std::sync::RwLock<HashMap<Uuid, Query>>>,
// Liveliness
liveliness_tokens: HashMap<Uuid, LivelinessToken>,
Expand Down Expand Up @@ -517,9 +517,7 @@ impl RemoteState {
}

for (_, (queryable, _)) in self.queryables {
if let Err(e) = queryable.undeclare().await {
error!("{e}")
}
queryable.abort();
}

drop(self.unanswered_queries);
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ts/src/remote_api/interface/ControlMsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ import type { HandlerChannel } from "./HandlerChannel";
import type { LivelinessMsg } from "./LivelinessMsg";
import type { OwnedKeyExprWrapper } from "./OwnedKeyExprWrapper";

export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg };
export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, handler: HandlerChannel, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg };
3 changes: 2 additions & 1 deletion zenoh-ts/src/remote_api/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,13 @@ export class RemoteSession {
key_expr: string,
complete: boolean,
reply_tx: SimpleChannel<QueryReplyWS>,
handler: HandlerChannel,
callback?: (sample: QueryWS) => void,
): RemoteQueryable {
let uuid = uuidv4();

let control_message: ControlMsg = {
DeclareQueryable: { key_expr: key_expr, complete: complete, id: uuid },
DeclareQueryable: { key_expr: key_expr, complete: complete, id: uuid, handler: handler },
};

let query_rx: SimpleChannel<QueryWS> = new SimpleChannel<QueryWS>();
Expand Down
26 changes: 18 additions & 8 deletions zenoh-ts/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ export interface GetOptions {
}

/**
* Options for a SubscriberOpts function
* Options for a SubscriberOptions function
*/
export interface SubscriberOpts {
export interface SubscriberOptions {
handler?: ((sample: Sample) => Promise<void>) | Handler,
}

Expand All @@ -130,7 +130,7 @@ export interface SubscriberOpts {
*/
export interface QueryableOptions {
complete?: boolean,
callback?: (query: Query) => void,
handler?: ((sample: Query) => Promise<void>) | Handler,
}

/**
Expand Down Expand Up @@ -416,13 +416,12 @@ export class Session {
// Handler size : This is to match the API_DATA_RECEPTION_CHANNEL_SIZE of zenoh internally
declare_subscriber(
key_expr: IntoKeyExpr,
subscriber_opts: SubscriberOpts
subscriber_opts: SubscriberOptions
): Subscriber {
let _key_expr = new KeyExpr(key_expr);
let remote_subscriber: RemoteSubscriber;

let callback_subscriber = false;
// let [callback, handler_type] = this.check_handler_or_callback<Sample>(handler);
let handler;
if (subscriber_opts?.handler !== undefined) {
handler = subscriber_opts?.handler;
Expand Down Expand Up @@ -488,28 +487,39 @@ export class Session {
_complete = queryable_opts?.complete;
};

let handler;
if (queryable_opts?.handler !== undefined) {
handler = queryable_opts?.handler;
} else {
handler = new FifoChannel(256);
}
let [callback, handler_type] = this.check_handler_or_callback<Query>(handler);

let callback_queryable = false;
if (queryable_opts?.callback != undefined) {
if (callback != undefined) {
callback_queryable = true;
let callback = queryable_opts?.callback;
// Typescript cant figure out that calback!=undefined here, so this needs to be explicit
let defined_callback = callback;
const callback_conversion = function (
query_ws: QueryWS,
): void {
let query: Query = QueryWS_to_Query(query_ws, reply_tx);

callback(query);
defined_callback(query);
};
remote_queryable = this.remote_session.declare_remote_queryable(
_key_expr.toString(),
_complete,
reply_tx,
handler_type,
callback_conversion,
);
} else {
remote_queryable = this.remote_session.declare_remote_queryable(
_key_expr.toString(),
_complete,
reply_tx,
handler_type
);
}

Expand Down

0 comments on commit 207106a

Please sign in to comment.