Skip to content

Commit

Permalink
Align async, remove unneccsary Async/Promise (#73)
Browse files Browse the repository at this point in the history
* Align async, remove unneccsary Async/Promis

* Add Subscriber Options, include handler / callback option

* Update queryable to include handler type

* Merge main

* Add Explicit Reciever, resolves issue issues/63

* add is_closed, on session

* add new_timestamp to plugin

* Add Key Expr method to subscriber

* Make Publisher.put() take PutPublisherOptions

* Add PublisherDelete method

* Add QueryTarget to get operation

* Add new_timestamp() to session, Add timestamp to Delete, and Put Options, Tracking created timestamps on backend

* Handle Timestamp Variant DataMessage

* cleanup Sub Example

* add timestamp PublisherPut, fix non-calling get_resource_id() for timestamp on delete

* add is_empty to ZBytes

* rename buffer with to_bytes

* Update Examples to use new _Options format, Add command line parsing for Deno examples
  • Loading branch information
Charles-Schleich authored Jan 13, 2025
1 parent f4da5f5 commit f3b1f1b
Show file tree
Hide file tree
Showing 34 changed files with 877 additions and 243 deletions.
131 changes: 97 additions & 34 deletions zenoh-plugin-remote-api/src/handle_control_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,29 @@ pub(crate) async fn handle_control_message(
warn!("State Map Does not contain SocketAddr");
}
}
ControlMsg::NewTimestamp(uuid) => {
let ts = state_map.session.new_timestamp();
let ts_string = ts.to_string();
let _ = state_map.timestamps.insert(uuid, ts);

let since_the_epoch = ts
.get_time()
.to_system_time()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64; // JS numbers are F64, is the only way to get a Number that is similar to what is produced by Date.now() in Javascript

if let Err(e) = state_map
.websocket_tx
.send(RemoteAPIMsg::Data(DataMsg::NewTimestamp {
id: uuid,
string_rep: ts_string,
millis_since_epoch: since_the_epoch,
}))
{
error!("{}", e);
};
}
ControlMsg::Get {
key_expr,
parameters,
Expand All @@ -114,6 +137,7 @@ pub(crate) async fn handle_control_message(
congestion_control,
priority,
express,
target,
encoding,
payload,
attachment,
Expand All @@ -127,6 +151,8 @@ pub(crate) async fn handle_control_message(
add_if_some!(priority, get_builder);
add_if_some!(express, get_builder);
add_if_some!(encoding, get_builder);
add_if_some!(target, get_builder);

if let Some(payload_b64) = payload {
match payload_b64.b64_to_bytes() {
Ok(payload) => get_builder = get_builder.payload(payload),
Expand Down Expand Up @@ -186,6 +212,7 @@ pub(crate) async fn handle_control_message(
priority,
express,
attachment,
timestamp,
} => {
let mut put_builder = match payload.b64_to_bytes() {
Ok(payload) => state_map.session.put(key_expr, payload),
Expand All @@ -200,6 +227,10 @@ pub(crate) async fn handle_control_message(
add_if_some!(priority, put_builder);
add_if_some!(express, put_builder);

if let Some(ts) = timestamp.and_then(|k| state_map.timestamps.get(&k)) {
put_builder = put_builder.timestamp(*ts);
}

if let Some(attachment_b64) = attachment {
match attachment_b64.b64_to_bytes() {
Ok(attachment) => put_builder = put_builder.attachment(attachment),
Expand All @@ -215,11 +246,16 @@ pub(crate) async fn handle_control_message(
priority,
express,
attachment,
timestamp,
} => {
let mut delete_builder = state_map.session.delete(key_expr);
add_if_some!(congestion_control, delete_builder);
add_if_some!(priority, delete_builder);
add_if_some!(express, delete_builder);
if let Some(ts) = timestamp.and_then(|k| state_map.timestamps.get(&k)) {
delete_builder = delete_builder.timestamp(*ts);
}

if let Some(attachment_b64) = attachment {
match attachment_b64.b64_to_bytes() {
Ok(attachment) => delete_builder = delete_builder.attachment(attachment),
Expand All @@ -237,15 +273,11 @@ pub(crate) async fn handle_control_message(
} => {
let key_expr = KeyExpr::new(owned_key_expr.clone())?;
let ch_tx = state_map.websocket_tx.clone();
let subscriber_builder = state_map.session.declare_subscriber(key_expr);

let join_handle = match handler {
HandlerChannel::Fifo(size) => {
let subscriber = state_map
.session
.declare_subscriber(key_expr)
.with(FifoChannel::new(size))
.await?;

let subscriber = subscriber_builder.with(FifoChannel::new(size)).await?;
spawn_future(async move {
while let Ok(sample) = subscriber.recv_async().await {
let sample_ws = SampleWS::from(sample);
Expand All @@ -258,12 +290,7 @@ pub(crate) async fn handle_control_message(
})
}
HandlerChannel::Ring(size) => {
let subscriber = state_map
.session
.declare_subscriber(key_expr)
.with(RingChannel::new(size))
.await?;

let subscriber = subscriber_builder.with(RingChannel::new(size)).await?;
spawn_future(async move {
while let Ok(sample) = subscriber.recv_async().await {
let sample_ws = SampleWS::from(sample);
Expand Down Expand Up @@ -323,41 +350,77 @@ pub(crate) async fn handle_control_message(
key_expr,
complete,
id: queryable_uuid,
handler,
} => {
let unanswered_queries = state_map.unanswered_queries.clone();
let session = state_map.session.clone();
let ch_tx = state_map.websocket_tx.clone();
let queryable = session
let query_builder = state_map
.session
.declare_queryable(&key_expr)
.complete(complete)
.callback(move |query| {
let query_uuid = Uuid::new_v4();
let queryable_msg = QueryableMsg::Query {
queryable_uuid,
query: QueryWS::from((&query, query_uuid)),
};
.complete(complete);

match unanswered_queries.write() {
Ok(mut rw_lock) => {
rw_lock.insert(query_uuid, query);
let join_handle = match handler {
HandlerChannel::Fifo(size) => {
let queryable = query_builder.with(FifoChannel::new(size)).await?;
spawn_future(async move {
while let Ok(query) = queryable.recv_async().await {
let query_uuid = Uuid::new_v4();
let queryable_msg = QueryableMsg::Query {
queryable_uuid,
query: QueryWS::from((&query, query_uuid)),
};

match unanswered_queries.write() {
Ok(mut rw_lock) => {
rw_lock.insert(query_uuid, query);
}
Err(err) => {
tracing::error!("Query RwLock has been poisoned {err:?}")
}
}

let remote_msg = RemoteAPIMsg::Data(DataMsg::Queryable(queryable_msg));
if let Err(err) = ch_tx.send(remote_msg) {
tracing::error!("Could not send Queryable Message on WS {}", err);
};
}
Err(err) => tracing::error!("Query RwLock has been poisoned {err:?}"),
}
})
}
HandlerChannel::Ring(size) => {
let queryable = query_builder.with(RingChannel::new(size)).await?;
spawn_future(async move {
while let Ok(query) = queryable.recv_async().await {
let query_uuid = Uuid::new_v4();
let queryable_msg = QueryableMsg::Query {
queryable_uuid,
query: QueryWS::from((&query, query_uuid)),
};

let remote_msg = RemoteAPIMsg::Data(DataMsg::Queryable(queryable_msg));
if let Err(err) = ch_tx.send(remote_msg) {
tracing::error!("Could not send Queryable Message on WS {}", err);
};
})
.await?;
match unanswered_queries.write() {
Ok(mut rw_lock) => {
rw_lock.insert(query_uuid, query);
}
Err(err) => {
tracing::error!("Query RwLock has been poisoned {err:?}")
}
}

let remote_msg = RemoteAPIMsg::Data(DataMsg::Queryable(queryable_msg));
if let Err(err) = ch_tx.send(remote_msg) {
tracing::error!("Could not send Queryable Message on WS {}", err);
};
}
})
}
};

state_map
.queryables
.insert(queryable_uuid, (queryable, key_expr));
.insert(queryable_uuid, (join_handle, key_expr));
}
ControlMsg::UndeclareQueryable(uuid) => {
if let Some((queryable, _)) = state_map.queryables.remove(&uuid) {
queryable.undeclare().await?;
queryable.abort();
};
}
ControlMsg::Liveliness(liveliness_msg) => {
Expand Down
38 changes: 37 additions & 1 deletion zenoh-plugin-remote-api/src/handle_data_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub async fn handle_data_message(
payload,
attachment,
encoding,
timestamp,
} => {
if let Some(publisher) = state_map.publishers.get(&id) {
let mut put_builder = match payload.b64_to_bytes() {
Expand All @@ -67,13 +68,41 @@ pub async fn handle_data_message(
if let Some(encoding) = encoding {
put_builder = put_builder.encoding(encoding);
}
if let Some(ts) = timestamp.and_then(|k| state_map.timestamps.get(&k)) {
put_builder = put_builder.timestamp(*ts);
}
if let Err(err) = put_builder.await {
error!("PublisherPut {id}, {err}");
}
} else {
warn!("Publisher {id}, does not exist in State");
}
}
DataMsg::PublisherDelete {
id,
attachment,
timestamp,
} => {
if let Some(publisher) = state_map.publishers.get(&id) {
let mut publisher_builder = publisher.delete();
match attachment.map(|x| x.b64_to_bytes()) {
Some(Ok(attachment)) => {
publisher_builder = publisher_builder.attachment(&attachment);
}
Some(Err(e)) => {
error!("{}", e);
}
None => {}
}
if let Some(ts) = timestamp.and_then(|k| state_map.timestamps.get(&k)) {
publisher_builder = publisher_builder.timestamp(*ts);
}

if let Err(e) = publisher_builder.await {
error!("Could not publish {e}");
};
}
}
DataMsg::Queryable(queryable_msg) => match queryable_msg {
QueryableMsg::Reply { reply } => {
let query: Option<Query> = match state_map.unanswered_queries.write() {
Expand Down Expand Up @@ -117,7 +146,14 @@ pub async fn handle_data_message(
warn!("Plugin should not receive Query from Client, This should go via Get API");
}
},
data_msg => {
DataMsg::Sample(_, _)
| DataMsg::GetReply(_)
| DataMsg::NewTimestamp {
id: _,
string_rep: _,
millis_since_epoch: _,
}
| DataMsg::SessionInfo(_) => {
error!("Server Should not recieved a {data_msg:?} Variant from client");
}
}
Expand Down
25 changes: 24 additions & 1 deletion zenoh-plugin-remote-api/src/interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,24 @@ pub enum DataMsg {
payload: B64String,
attachment: Option<B64String>,
encoding: Option<String>,
timestamp: Option<Uuid>,
},
PublisherDelete {
id: Uuid,
attachment: Option<B64String>,
timestamp: Option<Uuid>,
},
// SVR -> Client
// Subscriber
Sample(SampleWS, Uuid),
// GetReply
GetReply(ReplyWS),
//
SessionInfo(SessionInfo),
NewTimestamp {
id: Uuid,
string_rep: String,
millis_since_epoch: u64,
},

// Bidirectional
Queryable(QueryableMsg),
Expand Down Expand Up @@ -125,6 +135,7 @@ pub enum ControlMsg {
OpenSession,
CloseSession,
Session(Uuid),
NewTimestamp(Uuid),

//
SessionInfo,
Expand Down Expand Up @@ -160,6 +171,13 @@ pub enum ControlMsg {
)]
#[ts(type = "number | undefined")]
priority: Option<Priority>,
#[serde(
deserialize_with = "deserialize_query_target",
serialize_with = "serialize_query_target",
default
)]
#[ts(type = "number | undefined")]
target: Option<QueryTarget>,
#[ts(type = "boolean | undefined")]
express: Option<bool>,
#[ts(type = "string | undefined")]
Expand Down Expand Up @@ -197,6 +215,8 @@ pub enum ControlMsg {
express: Option<bool>,
#[ts(type = "string | undefined")]
attachment: Option<B64String>,
#[ts(type = "string | undefined")]
timestamp: Option<Uuid>,
},
Delete {
#[ts(as = "OwnedKeyExprWrapper")]
Expand All @@ -220,6 +240,8 @@ pub enum ControlMsg {
express: Option<bool>,
#[ts(type = "string | undefined")]
attachment: Option<B64String>,
#[ts(type = "string | undefined")]
timestamp: Option<Uuid>,
},
// Subscriber
DeclareSubscriber {
Expand Down Expand Up @@ -269,6 +291,7 @@ pub enum ControlMsg {
key_expr: OwnedKeyExpr,
id: Uuid,
complete: bool,
handler: HandlerChannel,
},
UndeclareQueryable(Uuid),
// Quierer
Expand Down
Loading

0 comments on commit f3b1f1b

Please sign in to comment.