diff --git a/zenoh-plugin-remote-api/src/handle_control_message.rs b/zenoh-plugin-remote-api/src/handle_control_message.rs index 663ee2b..b45155f 100644 --- a/zenoh-plugin-remote-api/src/handle_control_message.rs +++ b/zenoh-plugin-remote-api/src/handle_control_message.rs @@ -105,6 +105,29 @@ pub(crate) async fn handle_control_message( warn!("State Map Does not contain SocketAddr"); } } + ControlMsg::NewTimestamp(uuid) => { + let ts = state_map.session.new_timestamp(); + let ts_string = ts.to_string(); + let _ = state_map.timestamps.insert(uuid, ts); + + let since_the_epoch = ts + .get_time() + .to_system_time() + .duration_since(std::time::UNIX_EPOCH) + .expect("Time went backwards") + .as_millis() as u64; // JS numbers are F64, is the only way to get a Number that is similar to what is produced by Date.now() in Javascript + + if let Err(e) = state_map + .websocket_tx + .send(RemoteAPIMsg::Data(DataMsg::NewTimestamp { + id: uuid, + string_rep: ts_string, + millis_since_epoch: since_the_epoch, + })) + { + error!("{}", e); + }; + } ControlMsg::Get { key_expr, parameters, @@ -114,6 +137,7 @@ pub(crate) async fn handle_control_message( congestion_control, priority, express, + target, encoding, payload, attachment, @@ -127,6 +151,8 @@ pub(crate) async fn handle_control_message( add_if_some!(priority, get_builder); add_if_some!(express, get_builder); add_if_some!(encoding, get_builder); + add_if_some!(target, get_builder); + if let Some(payload_b64) = payload { match payload_b64.b64_to_bytes() { Ok(payload) => get_builder = get_builder.payload(payload), @@ -186,6 +212,7 @@ pub(crate) async fn handle_control_message( priority, express, attachment, + timestamp, } => { let mut put_builder = match payload.b64_to_bytes() { Ok(payload) => state_map.session.put(key_expr, payload), @@ -200,6 +227,10 @@ pub(crate) async fn handle_control_message( add_if_some!(priority, put_builder); add_if_some!(express, put_builder); + if let Some(ts) = timestamp.and_then(|k| state_map.timestamps.get(&k)) { + put_builder = put_builder.timestamp(*ts); + } + if let Some(attachment_b64) = attachment { match attachment_b64.b64_to_bytes() { Ok(attachment) => put_builder = put_builder.attachment(attachment), @@ -215,11 +246,16 @@ pub(crate) async fn handle_control_message( priority, express, attachment, + timestamp, } => { let mut delete_builder = state_map.session.delete(key_expr); add_if_some!(congestion_control, delete_builder); add_if_some!(priority, delete_builder); add_if_some!(express, delete_builder); + if let Some(ts) = timestamp.and_then(|k| state_map.timestamps.get(&k)) { + delete_builder = delete_builder.timestamp(*ts); + } + if let Some(attachment_b64) = attachment { match attachment_b64.b64_to_bytes() { Ok(attachment) => delete_builder = delete_builder.attachment(attachment), @@ -237,15 +273,11 @@ pub(crate) async fn handle_control_message( } => { let key_expr = KeyExpr::new(owned_key_expr.clone())?; let ch_tx = state_map.websocket_tx.clone(); + let subscriber_builder = state_map.session.declare_subscriber(key_expr); let join_handle = match handler { HandlerChannel::Fifo(size) => { - let subscriber = state_map - .session - .declare_subscriber(key_expr) - .with(FifoChannel::new(size)) - .await?; - + let subscriber = subscriber_builder.with(FifoChannel::new(size)).await?; spawn_future(async move { while let Ok(sample) = subscriber.recv_async().await { let sample_ws = SampleWS::from(sample); @@ -258,12 +290,7 @@ pub(crate) async fn handle_control_message( }) } HandlerChannel::Ring(size) => { - let subscriber = state_map - .session - .declare_subscriber(key_expr) - .with(RingChannel::new(size)) - .await?; - + let subscriber = subscriber_builder.with(RingChannel::new(size)).await?; spawn_future(async move { while let Ok(sample) = subscriber.recv_async().await { let sample_ws = SampleWS::from(sample); @@ -323,41 +350,77 @@ pub(crate) async fn handle_control_message( key_expr, complete, id: queryable_uuid, + handler, } => { let unanswered_queries = state_map.unanswered_queries.clone(); - let session = state_map.session.clone(); let ch_tx = state_map.websocket_tx.clone(); - let queryable = session + let query_builder = state_map + .session .declare_queryable(&key_expr) - .complete(complete) - .callback(move |query| { - let query_uuid = Uuid::new_v4(); - let queryable_msg = QueryableMsg::Query { - queryable_uuid, - query: QueryWS::from((&query, query_uuid)), - }; + .complete(complete); - match unanswered_queries.write() { - Ok(mut rw_lock) => { - rw_lock.insert(query_uuid, query); + let join_handle = match handler { + HandlerChannel::Fifo(size) => { + let queryable = query_builder.with(FifoChannel::new(size)).await?; + spawn_future(async move { + while let Ok(query) = queryable.recv_async().await { + let query_uuid = Uuid::new_v4(); + let queryable_msg = QueryableMsg::Query { + queryable_uuid, + query: QueryWS::from((&query, query_uuid)), + }; + + match unanswered_queries.write() { + Ok(mut rw_lock) => { + rw_lock.insert(query_uuid, query); + } + Err(err) => { + tracing::error!("Query RwLock has been poisoned {err:?}") + } + } + + let remote_msg = RemoteAPIMsg::Data(DataMsg::Queryable(queryable_msg)); + if let Err(err) = ch_tx.send(remote_msg) { + tracing::error!("Could not send Queryable Message on WS {}", err); + }; } - Err(err) => tracing::error!("Query RwLock has been poisoned {err:?}"), - } + }) + } + HandlerChannel::Ring(size) => { + let queryable = query_builder.with(RingChannel::new(size)).await?; + spawn_future(async move { + while let Ok(query) = queryable.recv_async().await { + let query_uuid = Uuid::new_v4(); + let queryable_msg = QueryableMsg::Query { + queryable_uuid, + query: QueryWS::from((&query, query_uuid)), + }; - let remote_msg = RemoteAPIMsg::Data(DataMsg::Queryable(queryable_msg)); - if let Err(err) = ch_tx.send(remote_msg) { - tracing::error!("Could not send Queryable Message on WS {}", err); - }; - }) - .await?; + match unanswered_queries.write() { + Ok(mut rw_lock) => { + rw_lock.insert(query_uuid, query); + } + Err(err) => { + tracing::error!("Query RwLock has been poisoned {err:?}") + } + } + + let remote_msg = RemoteAPIMsg::Data(DataMsg::Queryable(queryable_msg)); + if let Err(err) = ch_tx.send(remote_msg) { + tracing::error!("Could not send Queryable Message on WS {}", err); + }; + } + }) + } + }; state_map .queryables - .insert(queryable_uuid, (queryable, key_expr)); + .insert(queryable_uuid, (join_handle, key_expr)); } ControlMsg::UndeclareQueryable(uuid) => { if let Some((queryable, _)) = state_map.queryables.remove(&uuid) { - queryable.undeclare().await?; + queryable.abort(); }; } ControlMsg::Liveliness(liveliness_msg) => { diff --git a/zenoh-plugin-remote-api/src/handle_data_message.rs b/zenoh-plugin-remote-api/src/handle_data_message.rs index 996198d..800385a 100644 --- a/zenoh-plugin-remote-api/src/handle_data_message.rs +++ b/zenoh-plugin-remote-api/src/handle_data_message.rs @@ -43,6 +43,7 @@ pub async fn handle_data_message( payload, attachment, encoding, + timestamp, } => { if let Some(publisher) = state_map.publishers.get(&id) { let mut put_builder = match payload.b64_to_bytes() { @@ -67,6 +68,9 @@ pub async fn handle_data_message( if let Some(encoding) = encoding { put_builder = put_builder.encoding(encoding); } + if let Some(ts) = timestamp.and_then(|k| state_map.timestamps.get(&k)) { + put_builder = put_builder.timestamp(*ts); + } if let Err(err) = put_builder.await { error!("PublisherPut {id}, {err}"); } @@ -74,6 +78,31 @@ pub async fn handle_data_message( warn!("Publisher {id}, does not exist in State"); } } + DataMsg::PublisherDelete { + id, + attachment, + timestamp, + } => { + if let Some(publisher) = state_map.publishers.get(&id) { + let mut publisher_builder = publisher.delete(); + match attachment.map(|x| x.b64_to_bytes()) { + Some(Ok(attachment)) => { + publisher_builder = publisher_builder.attachment(&attachment); + } + Some(Err(e)) => { + error!("{}", e); + } + None => {} + } + if let Some(ts) = timestamp.and_then(|k| state_map.timestamps.get(&k)) { + publisher_builder = publisher_builder.timestamp(*ts); + } + + if let Err(e) = publisher_builder.await { + error!("Could not publish {e}"); + }; + } + } DataMsg::Queryable(queryable_msg) => match queryable_msg { QueryableMsg::Reply { reply } => { let query: Option = match state_map.unanswered_queries.write() { @@ -117,7 +146,14 @@ pub async fn handle_data_message( warn!("Plugin should not receive Query from Client, This should go via Get API"); } }, - data_msg => { + DataMsg::Sample(_, _) + | DataMsg::GetReply(_) + | DataMsg::NewTimestamp { + id: _, + string_rep: _, + millis_since_epoch: _, + } + | DataMsg::SessionInfo(_) => { error!("Server Should not recieved a {data_msg:?} Variant from client"); } } diff --git a/zenoh-plugin-remote-api/src/interface/mod.rs b/zenoh-plugin-remote-api/src/interface/mod.rs index 74d3e15..bc4044e 100644 --- a/zenoh-plugin-remote-api/src/interface/mod.rs +++ b/zenoh-plugin-remote-api/src/interface/mod.rs @@ -74,14 +74,24 @@ pub enum DataMsg { payload: B64String, attachment: Option, encoding: Option, + timestamp: Option, + }, + PublisherDelete { + id: Uuid, + attachment: Option, + timestamp: Option, }, // SVR -> Client // Subscriber Sample(SampleWS, Uuid), // GetReply GetReply(ReplyWS), - // SessionInfo(SessionInfo), + NewTimestamp { + id: Uuid, + string_rep: String, + millis_since_epoch: u64, + }, // Bidirectional Queryable(QueryableMsg), @@ -125,6 +135,7 @@ pub enum ControlMsg { OpenSession, CloseSession, Session(Uuid), + NewTimestamp(Uuid), // SessionInfo, @@ -160,6 +171,13 @@ pub enum ControlMsg { )] #[ts(type = "number | undefined")] priority: Option, + #[serde( + deserialize_with = "deserialize_query_target", + serialize_with = "serialize_query_target", + default + )] + #[ts(type = "number | undefined")] + target: Option, #[ts(type = "boolean | undefined")] express: Option, #[ts(type = "string | undefined")] @@ -197,6 +215,8 @@ pub enum ControlMsg { express: Option, #[ts(type = "string | undefined")] attachment: Option, + #[ts(type = "string | undefined")] + timestamp: Option, }, Delete { #[ts(as = "OwnedKeyExprWrapper")] @@ -220,6 +240,8 @@ pub enum ControlMsg { express: Option, #[ts(type = "string | undefined")] attachment: Option, + #[ts(type = "string | undefined")] + timestamp: Option, }, // Subscriber DeclareSubscriber { @@ -269,6 +291,7 @@ pub enum ControlMsg { key_expr: OwnedKeyExpr, id: Uuid, complete: bool, + handler: HandlerChannel, }, UndeclareQueryable(Uuid), // Quierer diff --git a/zenoh-plugin-remote-api/src/lib.rs b/zenoh-plugin-remote-api/src/lib.rs index 0cb9363..0a48409 100644 --- a/zenoh-plugin-remote-api/src/lib.rs +++ b/zenoh-plugin-remote-api/src/lib.rs @@ -49,6 +49,7 @@ use tokio_rustls::{ }; use tokio_tungstenite::tungstenite::protocol::Message; use tracing::{debug, error}; +use uhlc::Timestamp; use uuid::Uuid; use zenoh::{ bytes::{Encoding, ZBytes}, @@ -62,7 +63,7 @@ use zenoh::{ }, liveliness::LivelinessToken, pubsub::Publisher, - query::{Querier, Query, Queryable}, + query::{Querier, Query}, Session, }; use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl}; @@ -477,11 +478,14 @@ struct RemoteState { websocket_tx: Sender, session_id: Uuid, session: Session, + // KeyExpr's + Timestamp + key_exprs: HashMap, + timestamps: HashMap, // PubSub subscribers: HashMap, OwnedKeyExpr)>, publishers: HashMap>, // Queryable - queryables: HashMap, OwnedKeyExpr)>, + queryables: HashMap, OwnedKeyExpr)>, unanswered_queries: Arc>>, // Liveliness liveliness_tokens: HashMap, @@ -496,6 +500,8 @@ impl RemoteState { websocket_tx, session_id, session, + key_exprs: HashMap::new(), + timestamps: HashMap::new(), subscribers: HashMap::new(), publishers: HashMap::new(), queryables: HashMap::new(), @@ -517,9 +523,7 @@ impl RemoteState { } for (_, (queryable, _)) in self.queryables { - if let Err(e) = queryable.undeclare().await { - error!("{e}") - } + queryable.abort(); } drop(self.unanswered_queries); diff --git a/zenoh-ts/examples/deno/README.md b/zenoh-ts/examples/deno/README.md index 61b69e2..57266f2 100644 --- a/zenoh-ts/examples/deno/README.md +++ b/zenoh-ts/examples/deno/README.md @@ -74,9 +74,10 @@ Keep this terminal running while building and running the example below The most simple way to run examples is to install [deno](https://deno.com/), and run each example individually. 1. Install [deno](https://deno.com/) -2. Navigate to the `/zenoh-ts/examples` directory -3. Install `zenoh-ts` library by running `yarn install` -4. Then run the examples by running `yarn run `, i.e. `yarn example src/z_sub.ts` +2. Install CLI parsing of arguments for deno : `deno add jsr:@std/cli` +3. Navigate to the `/zenoh-ts/examples` directory +4. Install `zenoh-ts` library by running `yarn install` +5. Then run the examples by running `yarn run `, i.e. `yarn example src/z_sub.ts` This will start an instance of Deno running the example. The application will attempt to connect to a `websocket_port` : `10000` where the Remote API plugin is expected to be running. diff --git a/zenoh-ts/examples/deno/deno.lock b/zenoh-ts/examples/deno/deno.lock new file mode 100644 index 0000000..5e00936 --- /dev/null +++ b/zenoh-ts/examples/deno/deno.lock @@ -0,0 +1,40 @@ +{ + "version": "4", + "specifiers": { + "jsr:@std/cli@^1.0.10": "1.0.10", + "npm:@types/uuid@10": "10.0.0", + "npm:typescript@^5.2.2": "5.6.3" + }, + "jsr": { + "@std/cli@1.0.10": { + "integrity": "d047f6f4954a5c2827fe0963765ddd3d8b6cc7b7518682842645b95f571539dc" + } + }, + "npm": { + "@types/uuid@10.0.0": { + "integrity": "sha512-7gqG38EyHgyP1S+7+xomFtL+ZNHcKv6DwNaCZmJmo1vgMugyF3TCnXVg4t1uk89mLNwnLtnY3TpOpCOyp1/xHQ==" + }, + "typescript@5.6.3": { + "integrity": "sha512-hjcS1mhfuyi4WW8IWtjP7brDrG2cuDZukyrYrSauoXGNgx0S7zceP07adYkJycEr56BOUTNPzbInooiN3fn1qw==" + } + }, + "redirects": { + "https://deno.land/std/flags/mod.ts": "https://deno.land/std@0.224.0/flags/mod.ts" + }, + "remote": { + "https://deno.land/std@0.224.0/assert/assert_exists.ts": "43420cf7f956748ae6ed1230646567b3593cb7a36c5a5327269279c870c5ddfd", + "https://deno.land/std@0.224.0/assert/assertion_error.ts": "ba8752bd27ebc51f723702fac2f54d3e94447598f54264a6653d6413738a8917", + "https://deno.land/std@0.224.0/flags/mod.ts": "88553267f34519c8982212185339efdb2d2e62c159ec558f47eb50c8952a6be3" + }, + "workspace": { + "dependencies": [ + "jsr:@std/cli@^1.0.10" + ], + "packageJson": { + "dependencies": [ + "npm:@types/uuid@10", + "npm:typescript@^5.2.2" + ] + } + } +} diff --git a/zenoh-ts/examples/deno/src/z_delete.ts b/zenoh-ts/examples/deno/src/z_delete.ts index 1e0a1ad..3808731 100644 --- a/zenoh-ts/examples/deno/src/z_delete.ts +++ b/zenoh-ts/examples/deno/src/z_delete.ts @@ -13,11 +13,33 @@ // import { Config, Session } from "@eclipse-zenoh/zenoh-ts"; +import { parseArgs } from "@std/cli/parse-args"; + +interface Args { + key: string; +} export async function main() { + const [key_expr] = get_args() + + console.log!("Opening session..."); const session = await Session.open(new Config("ws/127.0.0.1:10000")); - session.delete("demo/example/zenoh-ts-delete"); + + console.log!("Deleting resources matching '", key_expr, "'..."); + session.delete(key_expr); + await session.close(); } +// Convienence function for Getting Arguments +function get_args(): [string] { + const args: Args = parseArgs(Deno.args); + let key_expr_str = "demo/example/zenoh-ts-put"; + if (args.key != undefined) { + key_expr_str = args.key + } + return [key_expr_str] +} + + main() \ No newline at end of file diff --git a/zenoh-ts/examples/deno/src/z_get.ts b/zenoh-ts/examples/deno/src/z_get.ts index b58acdf..0c85f88 100644 --- a/zenoh-ts/examples/deno/src/z_get.ts +++ b/zenoh-ts/examples/deno/src/z_get.ts @@ -12,12 +12,25 @@ // ZettaScale Zenoh Team, // -import { deserialize_string, ReplyError, Config, Receiver, RecvErr, Reply, Sample, Session} from "@eclipse-zenoh/zenoh-ts"; +import { deserialize_string, ReplyError, Config, Receiver, RecvErr, Sample, Session, QueryTarget } from "@eclipse-zenoh/zenoh-ts"; +import { parseArgs } from "@std/cli/parse-args"; +import { Duration, Milliseconds } from 'typed-duration' +const { milliseconds } = Duration + +interface Args { + selector: string, + payload?: string, + target: string, + timeout: number, +} export async function main() { + const [selector, payload, timeout, query_target] = get_args() + const session = await Session.open(new Config("ws/127.0.0.1:10000")); // Callback get query + // const get_callback = async function (reply: Reply): Promise { // let resp = reply.result(); // if (resp instanceof Sample) { @@ -29,11 +42,11 @@ export async function main() { // } // }; - console.warn("Start z_get") // await session.get("demo/example/**", get_callback); + console.warn("Start z_get") // Poll receiever - let receiver: void | Receiver = await session.get("demo/example/**"); + const receiver: void | Receiver = session.get(selector, { payload: payload, timeout: timeout, target: query_target }); if (!(receiver instanceof Receiver)) { return // Return in case of callback get query } @@ -43,12 +56,12 @@ export async function main() { if (reply == RecvErr.MalformedReply) { console.warn("MalformedReply"); } else { - let resp = reply.result(); + const resp = reply.result(); if (resp instanceof Sample) { - let sample: Sample = resp; + const sample: Sample = resp; console.warn(">> Received ('", sample.keyexpr(), ":", sample.payload().deserialize(deserialize_string), "')"); } else { - let reply_error: ReplyError = resp; + const reply_error: ReplyError = resp; console.warn(">> Received (ERROR: '{", reply_error.payload().deserialize(deserialize_string), "}')"); } } @@ -58,4 +71,42 @@ export async function main() { } + +// Convienence function to parse command line arguments +function get_args(): [string, string | undefined, Milliseconds, QueryTarget] { + const args: Args = parseArgs(Deno.args); + let selector = "demo/example/**"; + let payload = undefined; + let target = "BEST_MATCHING"; + let timeout: Milliseconds = milliseconds.of(10000) + if (args.selector != undefined) { + selector = args.selector + } + if (args.payload != undefined) { + payload = args.payload + } + if (args.timeout != undefined) { + timeout = milliseconds.of(args.timeout) + } + if (args.target != undefined) { + target = args.target + } + let query_target; + switch (target) { + case "BEST_MATCHING": + query_target = QueryTarget.BestMatching + break; + case "ALL": + query_target = QueryTarget.All + break; + case "ALL_COMPLETE": + query_target = QueryTarget.AllComplete + break; + default: + query_target = QueryTarget.BestMatching + } + return [selector, payload, timeout, query_target] +} + + main() \ No newline at end of file diff --git a/zenoh-ts/examples/deno/src/z_get_liveliness.ts b/zenoh-ts/examples/deno/src/z_get_liveliness.ts index 7946f6a..53216b6 100644 --- a/zenoh-ts/examples/deno/src/z_get_liveliness.ts +++ b/zenoh-ts/examples/deno/src/z_get_liveliness.ts @@ -13,47 +13,63 @@ // import { - RingChannel, deserialize_string, Sample, Config, Subscriber, Session, KeyExpr, - SampleKind, + deserialize_string, Sample, Config, Session, KeyExpr, Receiver, RecvErr, ReplyError } from "@eclipse-zenoh/zenoh-ts"; -import { Duration } from 'typed-duration' -const { seconds } = Duration +import { Duration, Milliseconds } from 'typed-duration' +import { parseArgs } from "@std/cli/parse-args"; + +const { milliseconds } = Duration + +interface Args { + key: string; + timeout: number +} export async function main() { + const [key_expr_str, timeout] = get_args(); console.log("Opening session...") const session = await Session.open(new Config("ws/127.0.0.1:10000")); - let key_expr = new KeyExpr("group1/**"); - console.log("Sending Liveliness Query '", key_expr.toString(),"'"); + const key_expr = new KeyExpr(key_expr_str); + console.log("Sending Liveliness Query '", key_expr.toString(), "'"); - let receiver = session.liveliness().get(key_expr, {timeout: seconds.of(20)}); + const receiver: Receiver = session.liveliness().get(key_expr, { timeout: timeout }) as Receiver; - if (!(receiver instanceof Receiver)){ - return // Return in case of callback get query - } - let reply = await receiver.receive(); - + while (reply != RecvErr.Disconnected) { if (reply == RecvErr.MalformedReply) { console.warn("MalformedReply"); } else { - let resp = reply.result(); + const resp = reply.result(); if (resp instanceof Sample) { - let sample: Sample = resp; - console.warn(">> Alive token ('", sample.keyexpr() ,")"); + const sample: Sample = resp; + console.warn(">> Alive token ('", sample.keyexpr(), ")"); } else { - let reply_error: ReplyError = resp; + const reply_error: ReplyError = resp; console.warn(">> Received (ERROR: '", reply_error.payload().deserialize(deserialize_string), "')"); } } reply = await receiver.receive(); } - console.warn("End Liveliness query"); } + +function get_args(): [string, Milliseconds] { + const args: Args = parseArgs(Deno.args); + let key_expr_str = "group1/**"; + let timeout: Milliseconds = milliseconds.of(10000) + if (args.key != undefined) { + key_expr_str = args.key + } + if (args.timeout != undefined) { + timeout = milliseconds.of(args.timeout) + } + return [key_expr_str, timeout] +} + main() \ No newline at end of file diff --git a/zenoh-ts/examples/deno/src/z_info.ts b/zenoh-ts/examples/deno/src/z_info.ts index 889f0f8..f46bb9a 100644 --- a/zenoh-ts/examples/deno/src/z_info.ts +++ b/zenoh-ts/examples/deno/src/z_info.ts @@ -21,7 +21,7 @@ export async function main() { const session = await Session.open(new Config("ws/127.0.0.1:10000")); console.log!("Get Info..."); - let info: SessionInfo = await session.info(); + const info: SessionInfo = await session.info(); console.log!("zid: {}", info.zid()); @@ -37,8 +37,5 @@ export async function main() { } -function sleep(ms: number) { - return new Promise((resolve) => setTimeout(resolve, ms)); -} main() \ No newline at end of file diff --git a/zenoh-ts/examples/deno/src/z_liveliness.ts b/zenoh-ts/examples/deno/src/z_liveliness.ts index 34c58e6..40204a9 100644 --- a/zenoh-ts/examples/deno/src/z_liveliness.ts +++ b/zenoh-ts/examples/deno/src/z_liveliness.ts @@ -14,24 +14,40 @@ import { Config, Session, KeyExpr, LivelinessToken } from "@eclipse-zenoh/zenoh-ts"; +import { parseArgs } from "@std/cli/parse-args"; -export async function main() { +interface Args { + key: string, +} +export async function main() { + const [key] = get_args(); console.log("Opening session...") - const session = await Session.open(new Config ("ws/127.0.0.1:10000")); - let key_expr = new KeyExpr("group1/zenoh-rs"); - console.log("Declaring Liveliness token on ",key_expr.toString()); + const session = await Session.open(new Config("ws/127.0.0.1:10000")); + const key_expr = new KeyExpr(key); + console.log("Declaring Liveliness token on ", key_expr.toString()); - let token: LivelinessToken = session.liveliness().declare_token(key_expr); + const token: LivelinessToken = session.liveliness().declare_token(key_expr); // LivelinessTokens are NOT automatically closed when dropped // please call token.undeclare(); while (true) { await sleep(10000); + token; console.log("Tick") } } +function get_args(): [string] { + const args: Args = parseArgs(Deno.args); + let key_expr_str = "group1/zenoh-ts"; + if (args.key != undefined) { + key_expr_str = args.key + } + return [key_expr_str] +} + + function sleep(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)); } diff --git a/zenoh-ts/examples/deno/src/z_ping.ts b/zenoh-ts/examples/deno/src/z_ping.ts index ceb52c4..226ebd2 100644 --- a/zenoh-ts/examples/deno/src/z_ping.ts +++ b/zenoh-ts/examples/deno/src/z_ping.ts @@ -18,8 +18,8 @@ import { Encoding, CongestionControl, Config, Session } from "@eclipse-zenoh/zen export async function main() { const session = await Session.open(new Config("ws/127.0.0.1:10000")); - let sub = await session.declare_subscriber("test/pong", new FifoChannel(256)); - let pub = session.declare_publisher( + const sub = session.declare_subscriber("test/pong", { handler: new FifoChannel(256) }); + const pub = session.declare_publisher( "test/ping", { encoding: Encoding.default(), @@ -30,25 +30,25 @@ export async function main() { // Warm up console.warn("Warming up for 5 seconds..."); - let startTime = new Date(); - let data = [122, 101, 110, 111, 104]; + const startTime = new Date(); + const data = [122, 101, 110, 111, 104]; while (elapsed(startTime) < 5) { - pub.put(data); + pub.put({ payload: data }); await sub.receive(); } - let samples = 600; - let samples_out = []; + const samples = 600; + const samples_out = []; for (let i = 0; i < samples; i++) { - let write_time = new Date(); - pub.put(data); + const write_time = new Date(); + pub.put({ payload: data }); await sub.receive(); samples_out.push(elapsed_ms(write_time)); } for (let i = 0; i < samples_out.length; i++) { - let rtt = samples_out[i]; + const rtt = samples_out[i]; console.warn( data.length + "bytes: seq=" + @@ -63,17 +63,17 @@ export async function main() { } function elapsed(startTime: Date) { - let endTime = new Date(); + const endTime = new Date(); - let timeDiff = + const timeDiff = (endTime.getMilliseconds() - startTime.getMilliseconds()) / 1000; //in s - let seconds = Math.round(timeDiff); + const seconds = Math.round(timeDiff); return seconds; } function elapsed_ms(startTime: Date) { - let endTime = new Date(); - let timeDiff: number = + const endTime = new Date(); + const timeDiff: number = endTime.getMilliseconds() - startTime.getMilliseconds(); //in ms return timeDiff; } diff --git a/zenoh-ts/examples/deno/src/z_pong.ts b/zenoh-ts/examples/deno/src/z_pong.ts index c713fb9..dadb4ee 100644 --- a/zenoh-ts/examples/deno/src/z_pong.ts +++ b/zenoh-ts/examples/deno/src/z_pong.ts @@ -17,7 +17,7 @@ import { Encoding, CongestionControl, Sample, Config, Session } from "@eclipse-z export async function main() { const session = await Session.open(new Config("ws/127.0.0.1:10000")); - let pub = session.declare_publisher( + const pub = session.declare_publisher( "test/ping", { encoding: Encoding.default(), @@ -26,14 +26,14 @@ export async function main() { ); const subscriber_callback = async function (sample: Sample): Promise { - await pub.put(sample.payload()); + pub.put({ payload: sample.payload() }); }; - await session.declare_subscriber("test/pong", subscriber_callback); + session.declare_subscriber("test/pong", { handler: subscriber_callback }); let count = 0; while (true) { - let seconds = 100; + const seconds = 100; await sleep(1000 * seconds); count = count + 1; } diff --git a/zenoh-ts/examples/deno/src/z_pub.ts b/zenoh-ts/examples/deno/src/z_pub.ts index 420a513..6d60632 100644 --- a/zenoh-ts/examples/deno/src/z_pub.ts +++ b/zenoh-ts/examples/deno/src/z_pub.ts @@ -13,12 +13,23 @@ // import { Priority, Reliability, Encoding, CongestionControl, Config, KeyExpr, Publisher, Session } from "@eclipse-zenoh/zenoh-ts"; +import { parseArgs } from "@std/cli/parse-args"; + +interface Args { + payload: string, + key: string + attach: string +} export async function main() { + + const [key, payload, attach] = get_args(); + + console.log("Opening session...") const session = await Session.open(new Config("ws/127.0.0.1:10000")); - let key_expr = new KeyExpr("demo/example/zenoh-ts-pub"); - let publisher: Publisher = session.declare_publisher( + const key_expr = new KeyExpr(key); + const publisher: Publisher = session.declare_publisher( key_expr, { encoding: Encoding.default(), @@ -29,19 +40,36 @@ export async function main() { } ); - const payload = [122, 101, 110, 111, 104]; - for (let idx = 0; idx < Number.MAX_VALUE; idx++) { - let buf = `[${idx}] ${payload}`; + const buf = `[${idx}] ${payload}`; console.warn("Block statement execution no : " + idx); console.warn(`Putting Data ('${key_expr}': '${buf}')...`); - publisher.put(buf, Encoding.TEXT_PLAIN, "attachment"); + publisher.put({ payload: buf, encoding: Encoding.TEXT_PLAIN, attachment: attach }); await sleep(1000); + } +} + +function get_args(): [string, string, string | undefined] { + const args: Args = parseArgs(Deno.args); + let key_expr_str = "demo/example/zenoh-ts-pub"; + let payload = "Pub from Typescript!"; + let attach = undefined; + if (args.key != undefined) { + key_expr_str = args.key + } + if (args.payload != undefined) { + payload = args.payload } + if (args.attach != undefined) { + attach = args.attach + } + + return [key_expr_str, payload, attach] } + function sleep(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)); } diff --git a/zenoh-ts/examples/deno/src/z_put.ts b/zenoh-ts/examples/deno/src/z_put.ts index 2e821ab..e971a34 100644 --- a/zenoh-ts/examples/deno/src/z_put.ts +++ b/zenoh-ts/examples/deno/src/z_put.ts @@ -13,12 +13,36 @@ // import { Config, Session } from "@eclipse-zenoh/zenoh-ts"; +import { parseArgs } from "@std/cli/parse-args"; + +interface Args { + payload: string, + key: string +} export async function main() { + const [key, payload] = get_args(); + console.warn('Running Zenoh Put !'); const session = await Session.open(new Config("ws/127.0.0.1:10000")); - session.put("demo/example/zenoh-ts-put", "Put from Typescript!"); + session.put(key, payload); + +} + +function get_args(): [string, string] { + const args: Args = parseArgs(Deno.args); + let key_expr_str = "demo/example/zenoh-ts-put"; + let payload = "Put from Typescript!"; + + if (args.key != undefined) { + key_expr_str = args.key + } + if (args.payload != undefined) { + payload = args.payload + } + + return [key_expr_str, payload] } main() diff --git a/zenoh-ts/examples/deno/src/z_querier.ts b/zenoh-ts/examples/deno/src/z_querier.ts index 5b9fd5e..a620c8b 100644 --- a/zenoh-ts/examples/deno/src/z_querier.ts +++ b/zenoh-ts/examples/deno/src/z_querier.ts @@ -12,37 +12,47 @@ // ZettaScale Zenoh Team, // -import { Duration, deserialize_string, ReplyError, Config, Receiver, RecvErr, Reply, Sample, Session, QueryTarget, Locality } from "@eclipse-zenoh/zenoh-ts"; +import { deserialize_string, ReplyError, Config, Receiver, RecvErr, Sample, Session, QueryTarget, Selector, } from "@eclipse-zenoh/zenoh-ts"; +import { Duration, Milliseconds } from 'typed-duration' +import { parseArgs } from "@std/cli/parse-args"; +const { milliseconds } = Duration + +interface Args { + selector: string, + payload?: string, + target: string, + timeout: number, +} export async function main() { const session = await Session.open(new Config("ws/127.0.0.1:10000")); + const [selector, _payload, timeout, query_target] = get_args() - - let querier = session.declare_querier("demo/example/**", + const querier = session.declare_querier(selector.key_expr(), { - target: QueryTarget.BestMatching, - timeout: Duration.milliseconds.of(10000), + target: query_target, + timeout: timeout, } ); - for(let i =0; i<1000; i++) { + for (let i = 0; i < 1000; i++) { await sleep(1000) - let payload = "["+i+"] Querier Get from Zenoh-ts!"; - let receiver = querier.get({payload:payload}) as Receiver; - + const payload = "[" + i + "]" + _payload; + const receiver = querier.get({ payload: payload, parameters: selector.parameters() }) as Receiver; + let reply = await receiver.receive(); while (reply != RecvErr.Disconnected) { if (reply == RecvErr.MalformedReply) { console.warn("MalformedReply"); } else { - let resp = reply.result(); + const resp = reply.result(); if (resp instanceof Sample) { - let sample: Sample = resp; + const sample: Sample = resp; console.warn(">> Received ('", sample.keyexpr(), ":", sample.payload().deserialize(deserialize_string), "')"); } else { - let reply_error: ReplyError = resp; + const reply_error: ReplyError = resp; console.warn(">> Received (ERROR: '{", reply_error.payload().deserialize(deserialize_string), "}')"); } } @@ -52,6 +62,43 @@ export async function main() { } } +// Convienence function to parse command line arguments +function get_args(): [Selector, string | undefined, Milliseconds, QueryTarget] { + const args: Args = parseArgs(Deno.args); + let selector = new Selector("demo/example/**"); + let payload = "Querier Get from Zenoh-ts!"; + let target = "BEST_MATCHING"; + let timeout: Milliseconds = milliseconds.of(10000) + if (args.selector != undefined) { + const [key_expr, parameters] = args.selector.split("?") + selector = new Selector(key_expr, parameters); + } + if (args.payload != undefined) { + payload = args.payload + } + if (args.timeout != undefined) { + timeout = milliseconds.of(args.timeout) + } + if (args.target != undefined) { + target = args.target + } + let query_target; + switch (target) { + case "BEST_MATCHING": + query_target = QueryTarget.BestMatching + break; + case "ALL": + query_target = QueryTarget.All + break; + case "ALL_COMPLETE": + query_target = QueryTarget.AllComplete + break; + default: + query_target = QueryTarget.BestMatching + } + return [selector, payload, timeout, query_target] +} + function sleep(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)); } diff --git a/zenoh-ts/examples/deno/src/z_queryable.ts b/zenoh-ts/examples/deno/src/z_queryable.ts index 1ae6c9b..9d39518 100644 --- a/zenoh-ts/examples/deno/src/z_queryable.ts +++ b/zenoh-ts/examples/deno/src/z_queryable.ts @@ -13,10 +13,19 @@ // import { Config, KeyExpr, Query, Queryable, Session, ZBytes } from "@eclipse-zenoh/zenoh-ts"; +import { parseArgs } from "@std/cli/parse-args"; + +interface Args { + key: string, + payload?: string, + complete: boolean +} export async function main() { const session = await Session.open(new Config("ws/127.0.0.1:10000")); - let key_expr = new KeyExpr("demo/example/zenoh-ts-queryable"); + const [key, _payload, complete] = get_args() + + const key_expr = new KeyExpr(key); console.warn("Declare Queryable on KeyExpr:", key_expr.toString()); const response = "Queryable from Typescript!"; @@ -48,34 +57,50 @@ export async function main() { // Declaring a Queryable with a handler - let queryable: Queryable = await session.declare_queryable(key_expr, { - complete: true, + const queryable: Queryable = session.declare_queryable(key_expr, { + complete: complete, }); let query = await queryable.receive(); while (query instanceof Query) { - let zbytes: ZBytes | undefined = query.payload(); + const zbytes: ZBytes | undefined = query.payload(); if (zbytes == null) { console.warn!(`>> [Queryable ] Received Query ${query.selector().toString()}`); } else { console.warn!( - `>> [Queryable ] Received Query ${query.selector().toString()} with payload '${zbytes.buffer()}'`, + `>> [Queryable ] Received Query ${query.selector().toString()} with payload '${zbytes.to_bytes()}'`, ); } - + console.warn( `>> [Queryable ] Responding ${key_expr.toString()} with payload '${response}'`, ); query.reply(key_expr, response); - + query = await queryable.receive(); } } -function sleep(ms: number) { - return new Promise((resolve) => setTimeout(resolve, ms)); +// Convienence function to parse command line arguments +function get_args(): [string, string | undefined, boolean] { + const args: Args = parseArgs(Deno.args); + + let key_expr = "demo/example/zenoh-ts-queryable"; + let payload = "Querier Get from Zenoh-ts!"; + let complete = true; + + if (args.key != undefined) { + key_expr = args.key; + } + if (args.payload != undefined) { + payload = args.payload + } + if (args.complete != undefined) { + complete = args.complete + } + return [key_expr, payload, complete] } main() diff --git a/zenoh-ts/examples/deno/src/z_sub.ts b/zenoh-ts/examples/deno/src/z_sub.ts index e8627c9..e81aea5 100644 --- a/zenoh-ts/examples/deno/src/z_sub.ts +++ b/zenoh-ts/examples/deno/src/z_sub.ts @@ -13,38 +13,25 @@ // import { - RingChannel, deserialize_string, Sample, Config, Subscriber, Session, KeyExpr + RingChannel, deserialize_string, Config, Subscriber, Session, KeyExpr } from "@eclipse-zenoh/zenoh-ts"; +import { parseArgs } from "@std/cli/parse-args"; -export async function main() { - const session = await Session.open(new Config("ws/127.0.0.1:10000")); - let key_expr = new KeyExpr("demo/example/**"); +interface Args { + key: string; +} - // const callback = async function (sample: Sample): Promise { - // console.warn!( - // ">> [Subscriber] Received " + - // sample.kind() + " ('" + - // sample.keyexpr() + "': '" + - // sample.payload().deserialize(deserialize_string) + "')", - // ); - // }; +export async function main() { + const [key] = get_args(); - // console.warn("Declare Subscriber ", key_expr.toString()); - // // Callback Subscriber take a callback which will be called upon every sample received. - // let callback_subscriber: Subscriber = await session.declare_subscriber( - // key_expr, - // callback, - // ); + console.log("Starting zenoh Subscriber ! ") + const session = await Session.open(new Config("ws/127.0.0.1:10000")); + const key_expr = new KeyExpr(key); - // await sleep(1000 * 3); - // callback_subscriber.undeclare(); - // console.warn("Undeclare callback_subscriber"); + const poll_subscriber: Subscriber = session.declare_subscriber(key_expr, { handler: new RingChannel(10) }); - // Poll Subscribers will only consume data on calls to receieve() - // This means that interally the FIFO queue will fill up to the point that new values will be dropped - // The dropping of these values occurs in the Remote-API Plugin - let poll_subscriber: Subscriber = await session.declare_subscriber(key_expr, new RingChannel(10)); let sample = await poll_subscriber.receive(); + while (sample != undefined) { console.warn!( ">> [Subscriber] Received " + @@ -58,8 +45,13 @@ export async function main() { poll_subscriber.undeclare(); } -function sleep(ms: number) { - return new Promise((resolve) => setTimeout(resolve, ms)); +function get_args(): [string] { + const args: Args = parseArgs(Deno.args); + let key_expr_str = "demo/example/**"; + if (args.key != undefined) { + key_expr_str = args.key + } + return [key_expr_str] } main() \ No newline at end of file diff --git a/zenoh-ts/examples/deno/src/z_sub_liveliness.ts b/zenoh-ts/examples/deno/src/z_sub_liveliness.ts index e507623..a6c36b3 100644 --- a/zenoh-ts/examples/deno/src/z_sub_liveliness.ts +++ b/zenoh-ts/examples/deno/src/z_sub_liveliness.ts @@ -16,15 +16,23 @@ import { Config, Subscriber, Session, KeyExpr, SampleKind } from "@eclipse-zenoh/zenoh-ts"; +import { parseArgs } from "@std/cli/parse-args"; + +interface Args { + key: string, + history: boolean +} export async function main() { + const [key, history] = get_args(); + console.log("Opening session...") const session = await Session.open(new Config("ws/127.0.0.1:10000")); - let key_expr = new KeyExpr("group1/**"); + const key_expr = new KeyExpr(key); console.log("Declaring Liveliness Subscriber on ", key_expr.toString()); - let liveliness_subscriber: Subscriber = session.liveliness().declare_subscriber(key_expr, { history: true }); + const liveliness_subscriber: Subscriber = session.liveliness().declare_subscriber(key_expr, { history: history }); let sample = await liveliness_subscriber.receive(); @@ -47,8 +55,26 @@ export async function main() { } sample = await liveliness_subscriber.receive(); } - liveliness_subscriber.undeclare(); } +// Convienence function to parse command line arguments +function get_args(): [string, boolean] { + const args: Args = parseArgs(Deno.args); + + let key_expr = "group1/**"; + let history = true; + + if (args.key != undefined) { + key_expr = args.key; + } + + if (args.history != undefined) { + history = args.history + } + return [key_expr, history] +} + + + main(); \ No newline at end of file diff --git a/zenoh-ts/examples/deno/src/z_sub_thr.ts b/zenoh-ts/examples/deno/src/z_sub_thr.ts index 5c0388c..07c2274 100644 --- a/zenoh-ts/examples/deno/src/z_sub_thr.ts +++ b/zenoh-ts/examples/deno/src/z_sub_thr.ts @@ -46,8 +46,8 @@ class Stats { } print_round() { - let elapsed_ms = Date.now() - this.round_start; - let throughput = (this.round_size) / (elapsed_ms / 1000); + const elapsed_ms = Date.now() - this.round_start; + const throughput = (this.round_size) / (elapsed_ms / 1000); console.warn(throughput, " msg/s"); } } @@ -55,20 +55,20 @@ class Stats { export async function main() { console.warn("Open Session"); const session: Session = await Session.open(new Config("ws/127.0.0.1:10000")); - let stats = new Stats(100000); + const stats = new Stats(100000); const subscriber_callback = async function (_sample: Sample): Promise { stats.increment(); }; console.warn("Declare subscriber"); - await session.declare_subscriber( + session.declare_subscriber( "test/thr", - subscriber_callback, + { handler: subscriber_callback } ); - var count = 0; + let count = 0; while (true) { - var seconds = 100; + const seconds = 100; await sleep(1000 * seconds); console.warn("Main Loop ? ", count); count = count + 1; diff --git a/zenoh-ts/src/key_expr.ts b/zenoh-ts/src/key_expr.ts index c60f279..a824a89 100644 --- a/zenoh-ts/src/key_expr.ts +++ b/zenoh-ts/src/key_expr.ts @@ -17,6 +17,8 @@ // ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ // ██ ██ ███████ ██ ███████ ██ ██ ██ ██ ██ +// import { UUIDv4 } from "./remote_api/session"; + export type IntoKeyExpr = KeyExpr | String | string; export class KeyExpr { @@ -24,7 +26,7 @@ export class KeyExpr { * Class to represent a Key Expression in Zenoh */ private _inner: string; - + // private key_expr_uuid: UUIDv4; constructor(keyexpr: IntoKeyExpr) { if (keyexpr instanceof KeyExpr) { @@ -34,7 +36,6 @@ export class KeyExpr { } else { this._inner = keyexpr; } - } toString(): string { diff --git a/zenoh-ts/src/liveliness.ts b/zenoh-ts/src/liveliness.ts index d415981..b2d0a88 100644 --- a/zenoh-ts/src/liveliness.ts +++ b/zenoh-ts/src/liveliness.ts @@ -78,6 +78,7 @@ export class Liveliness { let subscriber = Subscriber[NewSubscriber]( remote_subscriber, + _key_expr, callback_subscriber, ); diff --git a/zenoh-ts/src/pubsub.ts b/zenoh-ts/src/pubsub.ts index cbff9cb..b38a6e5 100644 --- a/zenoh-ts/src/pubsub.ts +++ b/zenoh-ts/src/pubsub.ts @@ -26,6 +26,8 @@ import { Sample_from_SampleWS, } from "./sample.js"; import { Encoding, IntoEncoding } from "./encoding.js"; +import { Timestamp } from "./timestamp.js"; + // ███████ ██ ██ ██████ ███████ ██████ ██████ ██ ██████ ███████ ██████ // ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ @@ -45,6 +47,10 @@ export class Subscriber { * @ignore */ private remote_subscriber: RemoteSubscriber; + /** + * @ignore + */ + private _key_expr: KeyExpr; /** * @ignore */ @@ -65,13 +71,22 @@ export class Subscriber { */ private constructor( remote_subscriber: RemoteSubscriber, + key_expr: KeyExpr, callback_subscriber: boolean, ) { this.remote_subscriber = remote_subscriber; this.callback_subscriber = callback_subscriber; + this._key_expr = key_expr; Subscriber.registry.register(this, remote_subscriber, this) } + /** + * returns the key expression of an object + * @returns KeyExpr + */ + key_expr(): KeyExpr { + return this._key_expr + } /** * Receives a new message on the subscriber * note: If subscriber was created with a callback, this recieve will return undefined, @@ -112,9 +127,10 @@ export class Subscriber { */ static [NewSubscriber]( remote_subscriber: RemoteSubscriber, + key_expr: KeyExpr, callback_subscriber: boolean, ): Subscriber { - return new Subscriber(remote_subscriber, callback_subscriber); + return new Subscriber(remote_subscriber, key_expr, callback_subscriber); } } @@ -166,6 +182,28 @@ export class FifoChannel implements Handler { // ██████ ██ ██ ██████ ██ ██ ███████ ███████ █████ ██████ // ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ // ██ ██████ ██████ ███████ ██ ███████ ██ ██ ███████ ██ ██ + +/** + * + * @param {IntoZBytes} payload - user payload, type that can be converted into a ZBytes + * @param {IntoEncoding=} encoding - Encoding parameter for Zenoh data + * @param {IntoZBytes=} attachment - optional extra data to send with Payload + */ +export interface PublisherPutOptions { + payload: IntoZBytes, + encoding?: IntoEncoding, + attachment?: IntoZBytes, + timestamp?: Timestamp; +} + +/** + * @param {IntoZBytes=} attachment - optional extra data to send with Payload + */ +export interface PublisherDeleteOptions { + attachment?: IntoZBytes, + timestamp?: Timestamp +} + export class Publisher { /** * Class that represents a Zenoh Publisher, @@ -234,35 +272,37 @@ export class Publisher { /** * Puts a payload on the publisher associated with this class instance * - * @param {IntoZBytes} payload - user payload, type that can be converted into a ZBytes - * @param {IntoEncoding=} encoding - Encoding parameter for Zenoh data - * @param {IntoZBytes=} attachment - optional extra data to send with Payload + * @param {PublisherPutOptions} put_options * * @returns void */ put( - payload: IntoZBytes, - encoding?: IntoEncoding, - attachment?: IntoZBytes, + put_options: PublisherPutOptions, ): void { - let zbytes: ZBytes = new ZBytes(payload); + let zbytes: ZBytes = new ZBytes(put_options.payload); let _encoding; - if (encoding != null) { - _encoding = Encoding.intoEncoding(encoding); + let _timestamp = null; + if (put_options.timestamp != null) { + _timestamp = put_options.timestamp.get_resource_uuid() as unknown as string; + } + + if (put_options.encoding != null) { + _encoding = Encoding.intoEncoding(put_options.encoding); } else { _encoding = Encoding.default(); } let _attachment = null; - if (attachment != null) { - let att_bytes = new ZBytes(attachment); - _attachment = Array.from(att_bytes.buffer()); + if (put_options.attachment != null) { + let att_bytes = new ZBytes(put_options.attachment); + _attachment = Array.from(att_bytes.to_bytes()); } return this._remote_publisher.put( - Array.from(zbytes.buffer()), + Array.from(zbytes.to_bytes()), _attachment, _encoding.toString(), + _timestamp, ); } @@ -302,6 +342,31 @@ export class Publisher { return this._congestion_control; } + /** + * + * executes delete on publisher + * @param {PublisherDeleteOptions} delete_options: Options associated with a publishers delete + * @returns void + */ + delete(delete_options: PublisherDeleteOptions) { + + let _attachment = null; + if (delete_options.attachment != null) { + let att_bytes = new ZBytes(delete_options.attachment); + _attachment = Array.from(att_bytes.to_bytes()); + } + + let _timestamp = null; + if (delete_options.timestamp != null) { + _timestamp = delete_options.timestamp.get_resource_uuid() as unknown as string; + } + + return this._remote_publisher.delete( + _attachment, + _timestamp + ); + } + /** * undeclares publisher * diff --git a/zenoh-ts/src/querier.ts b/zenoh-ts/src/querier.ts index 9fd019e..a6a2632 100644 --- a/zenoh-ts/src/querier.ts +++ b/zenoh-ts/src/querier.ts @@ -24,6 +24,7 @@ import { RemoteQuerier } from "./remote_api/querier.js"; import { KeyExpr } from "./key_expr.js"; import { Encoding } from "crypto"; import { Receiver } from "./session.js"; +import { Parameters } from "./query.js"; export enum QueryTarget { /// Let Zenoh find the BestMatching queryable capabale of serving the query. @@ -116,7 +117,7 @@ export interface QuerierGetOptions { encoding?: Encoding, payload?: IntoZBytes, attachment?: IntoZBytes, - parameters?: string + parameters?: Parameters } /** @@ -215,13 +216,13 @@ export class Querier { let _encoding = get_options?.encoding?.toString() if (get_options?.attachment != undefined) { - _attachment = Array.from(new ZBytes(get_options?.attachment).buffer()) + _attachment = Array.from(new ZBytes(get_options?.attachment).to_bytes()) } if (get_options?.payload != undefined) { - _payload = Array.from(new ZBytes(get_options?.payload).buffer()) + _payload = Array.from(new ZBytes(get_options?.payload).to_bytes()) } if (get_options?.parameters != undefined) { - _parameters = get_options?.parameters; + _parameters = get_options?.parameters.toString(); } let chan: SimpleChannel = this._remote_querier.get( diff --git a/zenoh-ts/src/query.ts b/zenoh-ts/src/query.ts index 69c530a..d2362c9 100644 --- a/zenoh-ts/src/query.ts +++ b/zenoh-ts/src/query.ts @@ -244,7 +244,7 @@ export class Query { let qr_variant: QueryReplyVariant = { Reply: { key_expr: _key_expr.toString(), - payload: b64_str_from_bytes(z_bytes.buffer()), + payload: b64_str_from_bytes(z_bytes.to_bytes()), }, }; @@ -258,7 +258,7 @@ export class Query { reply_err(payload: IntoZBytes): void { let z_bytes: ZBytes = new ZBytes(payload); let qr_variant: QueryReplyVariant = { - ReplyErr: { payload: b64_str_from_bytes(z_bytes.buffer()) }, + ReplyErr: { payload: b64_str_from_bytes(z_bytes.to_bytes()) }, }; this.reply_ws(qr_variant); } diff --git a/zenoh-ts/src/remote_api/interface/ControlMsg.ts b/zenoh-ts/src/remote_api/interface/ControlMsg.ts index dbabc12..36b0ea7 100644 --- a/zenoh-ts/src/remote_api/interface/ControlMsg.ts +++ b/zenoh-ts/src/remote_api/interface/ControlMsg.ts @@ -4,4 +4,4 @@ import type { HandlerChannel } from "./HandlerChannel.js"; import type { LivelinessMsg } from "./LivelinessMsg.js"; import type { OwnedKeyExprWrapper } from "./OwnedKeyExprWrapper.js"; -export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | "SessionInfo" | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg }; +export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "NewTimestamp": string } | "SessionInfo" | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, target: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, timestamp: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, timestamp: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, handler: HandlerChannel, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg }; diff --git a/zenoh-ts/src/remote_api/interface/DataMsg.ts b/zenoh-ts/src/remote_api/interface/DataMsg.ts index c170fbf..7286e6c 100644 --- a/zenoh-ts/src/remote_api/interface/DataMsg.ts +++ b/zenoh-ts/src/remote_api/interface/DataMsg.ts @@ -5,4 +5,4 @@ import type { ReplyWS } from "./ReplyWS.js"; import type { SampleWS } from "./SampleWS.js"; import type { SessionInfo } from "./SessionInfo.js"; -export type DataMsg = { "PublisherPut": { id: string, payload: B64String, attachment: B64String | null, encoding: string | null, } } | { "Sample": [SampleWS, string] } | { "GetReply": ReplyWS } | { "SessionInfo": SessionInfo } | { "Queryable": QueryableMsg }; +export type DataMsg = { "PublisherPut": { id: string, payload: B64String, attachment: B64String | null, encoding: string | null, timestamp: string | null, } } | { "PublisherDelete": { id: string, attachment: B64String | null, timestamp: string | null, } } | { "Sample": [SampleWS, string] } | { "GetReply": ReplyWS } | { "SessionInfo": SessionInfo } | { "NewTimestamp": { id: string, string_rep: string, millis_since_epoch: bigint, } } | { "Queryable": QueryableMsg }; diff --git a/zenoh-ts/src/remote_api/pubsub.ts b/zenoh-ts/src/remote_api/pubsub.ts index b099074..67fce09 100644 --- a/zenoh-ts/src/remote_api/pubsub.ts +++ b/zenoh-ts/src/remote_api/pubsub.ts @@ -13,7 +13,6 @@ // import { SimpleChannel } from "channel-ts"; -import { v4 as uuidv4 } from "uuid"; import { encode as b64_str_from_bytes } from "base64-arraybuffer"; // Import interface @@ -22,7 +21,7 @@ import { DataMsg } from "./interface/DataMsg.js"; import { ControlMsg } from "./interface/ControlMsg.js"; // Remote Api -import { RemoteSession } from "./session.js"; +import { RemoteSession, UUIDv4 } from "./session.js"; function executeAsync(func: any) { setTimeout(func, 0); @@ -34,17 +33,15 @@ function executeAsync(func: any) { // ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ // ██ ██ ███████ ██ ██ ██████ ██ ███████ ██ ██████ ██████ ███████ ██ ███████ ██ ██ ███████ ██ ██ -type UUID = typeof uuidv4 | string; - export class RemotePublisher { private key_expr: String; - private publisher_id: UUID; + private publisher_id: UUIDv4; private session_ref: RemoteSession; private undeclared: boolean; constructor( key_expr: String, - publisher_id: UUID, + publisher_id: UUIDv4, session_ref: RemoteSession, ) { this.key_expr = key_expr; @@ -57,6 +54,7 @@ export class RemotePublisher { payload: Array, attachment: Array | null, encoding: string | null, + timestamp: string | null, ) { if (this.undeclared == true) { let message = @@ -80,6 +78,28 @@ export class RemotePublisher { payload: b64_str_from_bytes(new Uint8Array(payload)), attachment: optional_attachment, encoding: encoding, + timestamp: timestamp + }, + }; + this.session_ref.send_data_message(data_msg); + } + + // Delete + delete( + attachment: Array | null, + timestamp: string | null, + ) { + + let optional_attachment = null; + if (attachment != null) { + optional_attachment = b64_str_from_bytes(new Uint8Array(attachment)); + } + + let data_msg: DataMsg = { + PublisherDelete: { + id: this.publisher_id.toString(), + attachment: optional_attachment, + timestamp: timestamp, }, }; this.session_ref.send_data_message(data_msg); @@ -114,7 +134,7 @@ export class RemotePublisher { // else, must call receive on the export class RemoteSubscriber { private key_expr: String; - private subscriber_id: UUID; + private subscriber_id: UUIDv4; private session_ref: RemoteSession; private callback?: (sample: SampleWS) => void; private rx: SimpleChannel; @@ -123,7 +143,7 @@ export class RemoteSubscriber { private constructor( key_expr: String, - subscriber_id: UUID, + subscriber_id: UUIDv4, session_ref: RemoteSession, rx: SimpleChannel, callback?: (sample: SampleWS) => void, @@ -138,7 +158,7 @@ export class RemoteSubscriber { static new( key_expr: String, - subscriber_id: UUID, + subscriber_id: UUIDv4, session_ref: RemoteSession, rx: SimpleChannel, callback?: (sample: SampleWS) => void, diff --git a/zenoh-ts/src/remote_api/querier.ts b/zenoh-ts/src/remote_api/querier.ts index 47bec58..99250c6 100644 --- a/zenoh-ts/src/remote_api/querier.ts +++ b/zenoh-ts/src/remote_api/querier.ts @@ -13,20 +13,18 @@ // import { v4 as uuidv4 } from "uuid"; -import { RemoteSession } from "./session.js"; +import { RemoteSession, UUIDv4 } from "./session.js"; import { ControlMsg } from "./interface/ControlMsg.js" import { SimpleChannel } from "channel-ts"; import { ReplyWS } from "./interface/ReplyWS.js"; import { encode as b64_str_from_bytes } from "base64-arraybuffer"; -type UUID = typeof uuidv4 | string; - export class RemoteQuerier { - private querier_id: UUID; + private querier_id: UUIDv4; private session_ref: RemoteSession; constructor( - querier_id: UUID, + querier_id: UUIDv4, session_ref: RemoteSession, ) { this.querier_id = querier_id; diff --git a/zenoh-ts/src/remote_api/session.ts b/zenoh-ts/src/remote_api/session.ts index df93740..1bc666a 100644 --- a/zenoh-ts/src/remote_api/session.ts +++ b/zenoh-ts/src/remote_api/session.ts @@ -43,6 +43,12 @@ import { RemoteQuerier } from "./querier.js" // ██ ██ ███████ ██ ██ ██████ ██ ███████ ███████ ███████ ███████ ███████ ██ ██████ ██ ████ +export interface TimestampIface { + id: string, + string_rep: string, + millis_since_epoch: bigint +} + export enum RemoteRecvErr { Disconnected, } @@ -63,6 +69,7 @@ export class RemoteSession { liveliness_subscribers: Map>; liveliness_get_receiver: Map>; session_info: SessionInfoIface | null; + _new_timestamp: TimestampIface | null; private constructor(ws: WebSocket, ws_channel: SimpleChannel) { this.ws = ws; @@ -74,6 +81,7 @@ export class RemoteSession { this.liveliness_subscribers = new Map>(); this.liveliness_get_receiver = new Map>(); this.session_info = null; + this._new_timestamp = null; } // @@ -149,7 +157,6 @@ export class RemoteSession { while (this.session_info === null) { await sleep(10); } - return this.session_info; } @@ -160,7 +167,8 @@ export class RemoteSession { congestion_control?: number, priority?: number, express?: boolean, - attachment?: Array + attachment?: Array, + timestamp?:string, ): void { let owned_keyexpr: OwnedKeyExprWrapper = key_expr; @@ -178,13 +186,14 @@ export class RemoteSession { priority: priority, express: express, attachment: opt_attachment, + timestamp: timestamp }, }; this.send_ctrl_message(ctrl_message); } // get - async get( + get( key_expr: string, parameters: string | null, handler: HandlerChannel, @@ -192,11 +201,12 @@ export class RemoteSession { congestion_control?: number, priority?: number, express?: boolean, + target?: number, encoding?: string, payload?: Array, attachment?: Array, timeout_ms?: number, - ): Promise> { + ): SimpleChannel { let uuid = uuidv4(); let channel: SimpleChannel = new SimpleChannel(); this.get_receiver.set(uuid, channel); @@ -220,6 +230,7 @@ export class RemoteSession { congestion_control: congestion_control, priority: priority, express: express, + target: target, encoding: encoding, timeout: timeout_ms, payload: opt_payload, @@ -231,13 +242,14 @@ export class RemoteSession { } // delete - async delete( + delete( key_expr: string, congestion_control?: number, priority?: number, express?: boolean, - attachment?: Array - ): Promise { + attachment?: Array, + timestamp?: string, + ): void { let owned_keyexpr: OwnedKeyExprWrapper = key_expr; let opt_attachment = undefined; if (attachment != undefined) { @@ -250,6 +262,7 @@ export class RemoteSession { priority: priority, express: express, attachment: opt_attachment, + timestamp: timestamp } }; this.send_ctrl_message(data_message); @@ -261,11 +274,11 @@ export class RemoteSession { this.ws.close(); } - async declare_remote_subscriber( + declare_remote_subscriber( key_expr: string, handler: HandlerChannel, - callback?: (sample: SampleWS) => Promise, - ): Promise { + callback?: (sample: SampleWS) => void, + ): RemoteSubscriber { let uuid = uuidv4(); let control_message: ControlMsg = { @@ -293,12 +306,13 @@ export class RemoteSession { key_expr: string, complete: boolean, reply_tx: SimpleChannel, + handler: HandlerChannel, callback?: (sample: QueryWS) => void, ): RemoteQueryable { let uuid = uuidv4(); let control_message: ControlMsg = { - DeclareQueryable: { key_expr: key_expr, complete: complete, id: uuid }, + DeclareQueryable: { key_expr: key_expr, complete: complete, id: uuid, handler: handler }, }; let query_rx: SimpleChannel = new SimpleChannel(); @@ -401,7 +415,7 @@ export class RemoteSession { declare_liveliness_subscriber( key_expr: string, history: boolean, - callback?: (sample: SampleWS) => Promise, + callback?: (sample: SampleWS) => void, ): RemoteSubscriber { let uuid = uuidv4(); @@ -450,6 +464,18 @@ export class RemoteSession { return channel; } + // Note: This method blocks until Timestamp has been created + // The correct way to do this would be with a request / response + async new_timestamp(): Promise { + let uuid = uuidv4(); + let control_message: ControlMsg = { "NewTimestamp": uuid }; + this._new_timestamp = null; + this.send_ctrl_message(control_message); + while (this._new_timestamp === null) { + await sleep(10); + } + return this._new_timestamp; + } // // Sending Messages @@ -496,7 +522,7 @@ export class RemoteSession { console.warn("Closed"); } - private async handle_control_message(control_msg: ControlMsg) { + private handle_control_message(control_msg: ControlMsg) { if (typeof control_msg === "string") { console.warn("unhandled Control Message:", control_msg); } else if (typeof control_msg === "object") { @@ -510,7 +536,7 @@ export class RemoteSession { } } - private async handle_data_message(data_msg: DataMsg) { + private handle_data_message(data_msg: DataMsg) { if ("Sample" in data_msg) { let subscription_uuid: UUIDv4 = data_msg["Sample"][1]; @@ -562,10 +588,11 @@ export class RemoteSession { console.warn("Queryable message Variant not recognized"); } } else if ("SessionInfo" in data_msg) { - let session_info: SessionInfoIface = data_msg["SessionInfo"]; this.session_info = session_info; - + } else if ("NewTimestamp" in data_msg) { + let new_timestamp: TimestampIface = data_msg["NewTimestamp"]; + this._new_timestamp = new_timestamp; } else { console.warn("Data Message not recognized Expected Variant", data_msg); } diff --git a/zenoh-ts/src/sample.ts b/zenoh-ts/src/sample.ts index 8cefd44..aab1653 100644 --- a/zenoh-ts/src/sample.ts +++ b/zenoh-ts/src/sample.ts @@ -327,7 +327,7 @@ export function SampleWS_from_Sample( attachement: ZBytes | undefined, ): SampleWS { let key_expr: OwnedKeyExprWrapper = sample.keyexpr().toString(); - let value: Array = Array.from(sample.payload().buffer()); + let value: Array = Array.from(sample.payload().to_bytes()); let sample_kind: SampleKindWS; if (sample.kind() == SampleKind.DELETE) { @@ -344,7 +344,7 @@ export function SampleWS_from_Sample( let attach = null; if (attachement != null) { - attach = b64_str_from_bytes(new Uint8Array(attachement.buffer())); + attach = b64_str_from_bytes(new Uint8Array(attachement.to_bytes())); } let sample_ws: SampleWS = { diff --git a/zenoh-ts/src/session.ts b/zenoh-ts/src/session.ts index 6cbee24..47127b2 100644 --- a/zenoh-ts/src/session.ts +++ b/zenoh-ts/src/session.ts @@ -15,6 +15,7 @@ import { RemoteRecvErr as GetChannelClose, RemoteSession, + TimestampIface as TimestampIface, } from "./remote_api/session.js"; import { ReplyWS } from "./remote_api/interface/ReplyWS.js"; import { RemotePublisher, RemoteSubscriber } from "./remote_api/pubsub.js"; @@ -56,7 +57,8 @@ import { SessionInfo as SessionInfoIface } from "./remote_api/interface/SessionI // External deps import { Duration, TimeDuration } from 'typed-duration' import { SimpleChannel } from "channel-ts"; -import { locality_to_int, Querier, QuerierOptions, query_target_to_int, reply_key_expr_to_int, ReplyKeyExpr } from "./querier.js"; +import { locality_to_int, Querier, QuerierOptions, query_target_to_int, QueryTarget, reply_key_expr_to_int, ReplyKeyExpr } from "./querier.js"; +import { Timestamp } from "./timestamp.js"; function executeAsync(func: any) { setTimeout(func, 0); @@ -77,6 +79,7 @@ export interface PutOptions { priority?: Priority, express?: boolean, attachment?: IntoZBytes + timestamp?: Timestamp, } /** @@ -91,6 +94,7 @@ export interface DeleteOptions { priority?: Priority, express?: boolean, attachment?: IntoZBytes + timestamp?: Timestamp } /** @@ -102,6 +106,8 @@ export interface DeleteOptions { * @prop {Encoding=} encoding - Encoding type of payload * @prop {IntoZBytes=} payload - Payload associated with getrequest * @prop {IntoZBytes=} attachment - Additional Data sent with the request + * @prop {TimeDuration=} timeout - Timeout value for a get request + * @prop {((sample: Reply) => Promise) | Handler} handler - either a callback or a polling handler with an underlying handling mechanism */ export interface GetOptions { consolidation?: ConsolidationMode, @@ -112,6 +118,15 @@ export interface GetOptions { payload?: IntoZBytes, attachment?: IntoZBytes timeout?: TimeDuration, + target?: QueryTarget, + handler?: ((sample: Reply) => Promise) | Handler, +} + +/** + * Options for a SubscriberOptions function +*/ +export interface SubscriberOptions { + handler?: ((sample: Sample) => Promise) | Handler, } /** @@ -121,7 +136,7 @@ export interface GetOptions { */ export interface QueryableOptions { complete?: boolean, - callback?: (query: Query) => void, + handler?: ((sample: Query) => Promise) | Handler, } /** @@ -130,13 +145,14 @@ export interface QueryableOptions { * @prop {CongestionControl} congestion_control - Optional, Type of Congestion control to be used (BLOCK / DROP) * @prop {Priority} priority - Optional, The Priority of zenoh messages * @prop {boolean} express - Optional, The Priority of zenoh messages - * @prop {Reliability} reliability - Optional, The Priority of zenoh messages + * @prop {Reliability} reliability - Optional, The Priority of zenoh messages : Note This is unstable in Zenoh */ export interface PublisherOptions { encoding?: Encoding, congestion_control?: CongestionControl, priority?: Priority, express?: boolean, + // Note realiability is unstable in Zenoh reliability?: Reliability, } @@ -193,6 +209,9 @@ export class Session { Session.registry.unregister(this); } + is_closed() { + return this.remote_session.ws.readyState == WebSocket.CLOSED; + } /** * Puts a value on the session, on a specific key expression KeyExpr * @@ -213,26 +232,30 @@ export class Session { let _express; let _attachment; let _encoding = put_opts?.encoding?.toString() - let _congestion_control = congestion_control_to_int(put_opts?.congestion_control); + let _timestamp; + if (put_opts?.timestamp != undefined) { + _timestamp = put_opts?.timestamp.get_resource_uuid() as string; + } if (put_opts?.priority != undefined) { _priority = priority_to_int(put_opts?.priority); } _express = put_opts?.express?.valueOf(); if (put_opts?.attachment != undefined) { - _attachment = Array.from(new ZBytes(put_opts?.attachment).buffer()) + _attachment = Array.from(new ZBytes(put_opts?.attachment).to_bytes()) } this.remote_session.put( key_expr.toString(), - Array.from(z_bytes.buffer()), + Array.from(z_bytes.to_bytes()), _encoding, _congestion_control, _priority, _express, _attachment, + _timestamp, ); } @@ -269,10 +292,15 @@ export class Session { let _congestion_control = congestion_control_to_int(delete_opts?.congestion_control); let _priority = priority_to_int(delete_opts?.priority); let _express = delete_opts?.express; - let _attachment + let _attachment; + let _timestamp; if (delete_opts?.attachment != undefined) { - _attachment = Array.from(new ZBytes(delete_opts?.attachment).buffer()) + _attachment = Array.from(new ZBytes(delete_opts?.attachment).to_bytes()) + } + + if (delete_opts?.timestamp != undefined) { + _timestamp = delete_opts?.timestamp.get_resource_uuid() as string; } this.remote_session.delete( @@ -281,6 +309,7 @@ export class Session { _priority, _express, _attachment, + _timestamp ); } @@ -320,11 +349,10 @@ export class Session { * * @returns Receiver */ - async get( + get( into_selector: IntoSelector, - handler: ((sample: Reply) => Promise) | Handler = new FifoChannel(256), get_options?: GetOptions - ): Promise { + ): Receiver | undefined { let selector: Selector; let key_expr: KeyExpr; @@ -345,6 +373,13 @@ export class Session { selector = new Selector(into_selector); } + let handler; + if (get_options?.handler !== undefined) { + handler = get_options?.handler; + } else { + handler = new FifoChannel(256); + } + let [callback, handler_type] = this.check_handler_or_callback(handler); // Optional Parameters @@ -354,6 +389,7 @@ export class Session { let _congestion_control = congestion_control_to_int(get_options?.congestion_control); let _priority = priority_to_int(get_options?.priority); let _express = get_options?.express; + let _target = query_target_to_int(get_options?.target); let _attachment; let _payload; let _timeout_millis: number | undefined = undefined; @@ -362,13 +398,13 @@ export class Session { _timeout_millis = Duration.milliseconds.from(get_options?.timeout); } if (get_options?.attachment != undefined) { - _attachment = Array.from(new ZBytes(get_options?.attachment).buffer()) + _attachment = Array.from(new ZBytes(get_options?.attachment).to_bytes()) } if (get_options?.payload != undefined) { - _payload = Array.from(new ZBytes(get_options?.payload).buffer()) + _payload = Array.from(new ZBytes(get_options?.payload).to_bytes()) } - let chan: SimpleChannel = await this.remote_session.get( + let chan: SimpleChannel = this.remote_session.get( selector.key_expr().toString(), selector.parameters().toString(), handler_type, @@ -376,6 +412,7 @@ export class Session { _congestion_control, _priority, _express, + _target, _encoding, _payload, _attachment, @@ -416,13 +453,20 @@ export class Session { * @returns Subscriber */ // Handler size : This is to match the API_DATA_RECEPTION_CHANNEL_SIZE of zenoh internally - async declare_subscriber( + declare_subscriber( key_expr: IntoKeyExpr, - handler: ((sample: Sample) => Promise) | Handler = new FifoChannel(256), - ): Promise { + subscriber_opts: SubscriberOptions + ): Subscriber { let _key_expr = new KeyExpr(key_expr); let remote_subscriber: RemoteSubscriber; + let callback_subscriber = false; + let handler; + if (subscriber_opts?.handler !== undefined) { + handler = subscriber_opts?.handler; + } else { + handler = new FifoChannel(256); + } let [callback, handler_type] = this.check_handler_or_callback(handler); if (callback !== undefined) { @@ -433,13 +477,13 @@ export class Session { callback(sample); } }; - remote_subscriber = await this.remote_session.declare_remote_subscriber( + remote_subscriber = this.remote_session.declare_remote_subscriber( _key_expr.toString(), handler_type, callback_conversion, ); } else { - remote_subscriber = await this.remote_session.declare_remote_subscriber( + remote_subscriber = this.remote_session.declare_remote_subscriber( _key_expr.toString(), handler_type, ); @@ -447,6 +491,7 @@ export class Session { let subscriber = Subscriber[NewSubscriber]( remote_subscriber, + _key_expr, callback_subscriber, ); @@ -458,10 +503,17 @@ export class Session { * * @returns Liveliness */ - liveliness() : Liveliness { + liveliness(): Liveliness { return new Liveliness(this.remote_session) } + async new_timestamp(): Promise { + + let ts_iface: TimestampIface = await this.remote_session.new_timestamp(); + + return new Timestamp(ts_iface.id, ts_iface.string_rep, ts_iface.millis_since_epoch); + } + /** * Declares a new Queryable * @@ -473,10 +525,10 @@ export class Session { * * @returns Queryable */ - async declare_queryable( + declare_queryable( key_expr: IntoKeyExpr, queryable_opts?: QueryableOptions - ): Promise { + ): Queryable { let _key_expr = new KeyExpr(key_expr); let remote_queryable: RemoteQueryable; let reply_tx: SimpleChannel = @@ -487,21 +539,31 @@ export class Session { _complete = queryable_opts?.complete; }; + let handler; + if (queryable_opts?.handler !== undefined) { + handler = queryable_opts?.handler; + } else { + handler = new FifoChannel(256); + } + let [callback, handler_type] = this.check_handler_or_callback(handler); + let callback_queryable = false; - if (queryable_opts?.callback != undefined) { + if (callback != undefined) { callback_queryable = true; - let callback = queryable_opts?.callback; + // Typescript cant figure out that calback!=undefined here, so this needs to be explicit + let defined_callback = callback; const callback_conversion = function ( query_ws: QueryWS, ): void { let query: Query = QueryWS_to_Query(query_ws, reply_tx); - callback(query); + defined_callback(query); }; remote_queryable = this.remote_session.declare_remote_queryable( _key_expr.toString(), _complete, reply_tx, + handler_type, callback_conversion, ); } else { @@ -509,6 +571,7 @@ export class Session { _key_expr.toString(), _complete, reply_tx, + handler_type ); } diff --git a/zenoh-ts/src/timestamp.ts b/zenoh-ts/src/timestamp.ts new file mode 100644 index 0000000..d011c86 --- /dev/null +++ b/zenoh-ts/src/timestamp.ts @@ -0,0 +1,38 @@ +import { UUIDv4 } from "./remote_api/session"; + +export class Timestamp { + + private timestamp_id: UUIDv4; + + private string_rep: string + + private ms_since_unix_epoch: bigint + + constructor(timestamp_id: UUIDv4, string_rep: string, ms_since_unix_epoch: bigint) { + this.timestamp_id = timestamp_id + this.string_rep = string_rep + this.ms_since_unix_epoch = ms_since_unix_epoch + } + + // Note: Developers Should not need to use this + get_resource_uuid(): UUIDv4 { + return this.timestamp_id; + } + + get_id(): string { + return this.string_rep.split("/")[1] as string; + } + + get_time(): string { + return this.string_rep.split("/")[0] as string; + } + + get_ms_since_unix_epoch(): bigint { + return this.ms_since_unix_epoch; + } + + as_date(): Date { + // Note: Values produced by this Bigint should fit into a number as they are ms since Unix Epoch + return new Date(this.ms_since_unix_epoch as unknown as number); + } +} \ No newline at end of file diff --git a/zenoh-ts/src/z_bytes.ts b/zenoh-ts/src/z_bytes.ts index 094c630..3b54444 100644 --- a/zenoh-ts/src/z_bytes.ts +++ b/zenoh-ts/src/z_bytes.ts @@ -58,6 +58,15 @@ export class ZBytes { return this._buffer.length; } + /** + * returns if the ZBytes Buffer is empty + * + * @returns boolean + */ + is_empty(): boolean { + return this._buffer.length == 0; + } + /** * returns an empty ZBytes buffer * @@ -72,7 +81,7 @@ export class ZBytes { * * @returns Uint8Array */ - buffer(): Uint8Array { + to_bytes(): Uint8Array { return this._buffer }