Skip to content

Commit

Permalink
Add new_timestamp() to session, Add timestamp to Delete, and Put Opti…
Browse files Browse the repository at this point in the history
…ons, Tracking created timestamps on backend
  • Loading branch information
Charles-Schleich committed Jan 10, 2025
1 parent 2357228 commit d2bc32d
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 58 deletions.
46 changes: 30 additions & 16 deletions zenoh-plugin-remote-api/src/handle_control_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,27 @@ pub(crate) async fn handle_control_message(
warn!("State Map Does not contain SocketAddr");
}
}
ControlMsg::NewTimestamp => {
if let Some(ts) = state_map
.session
.new_timestamp()
.to_string_rfc3339_lossy()
.split("/")
.collect::<Vec<&str>>()
.get(0)
ControlMsg::NewTimestamp(uuid) => {
let ts = state_map.session.new_timestamp();
let ts_string = ts.to_string();
let _ = state_map.timestamps.insert(uuid, ts);

let since_the_epoch = ts
.get_time()
.to_system_time()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64; // JS numbers are F64, is the only way to get a Number that is similar to what is produced by Date.now() in Javascript

if let Err(e) = state_map
.websocket_tx
.send(RemoteAPIMsg::Data(DataMsg::NewTimestamp {
id: uuid,
string_rep: ts_string,
millis_since_epoch: since_the_epoch,
}))
{
if let Err(e) = state_map
.websocket_tx
.send(RemoteAPIMsg::Data(DataMsg::NewTimestamp(ts.to_string())))
{
error!("{}", e);
};
} else {
warn!("Could not get timestamp from Session");
error!("{}", e);
};
}
ControlMsg::Get {
Expand Down Expand Up @@ -208,6 +212,7 @@ pub(crate) async fn handle_control_message(
priority,
express,
attachment,
timestamp,
} => {
let mut put_builder = match payload.b64_to_bytes() {
Ok(payload) => state_map.session.put(key_expr, payload),
Expand All @@ -222,6 +227,10 @@ pub(crate) async fn handle_control_message(
add_if_some!(priority, put_builder);
add_if_some!(express, put_builder);

if let Some(ts) = timestamp.and_then(|k| state_map.timestamps.get(&k)) {
put_builder = put_builder.timestamp(*ts);
}

if let Some(attachment_b64) = attachment {
match attachment_b64.b64_to_bytes() {
Ok(attachment) => put_builder = put_builder.attachment(attachment),
Expand All @@ -237,11 +246,16 @@ pub(crate) async fn handle_control_message(
priority,
express,
attachment,
timestamp,
} => {
let mut delete_builder = state_map.session.delete(key_expr);
add_if_some!(congestion_control, delete_builder);
add_if_some!(priority, delete_builder);
add_if_some!(express, delete_builder);
if let Some(ts) = timestamp.and_then(|k| state_map.timestamps.get(&k)) {
delete_builder = delete_builder.timestamp(*ts);
}

if let Some(attachment_b64) = attachment {
match attachment_b64.b64_to_bytes() {
Ok(attachment) => delete_builder = delete_builder.attachment(attachment),
Expand Down
16 changes: 8 additions & 8 deletions zenoh-plugin-remote-api/src/handle_data_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::{error::Error, net::SocketAddr, str::FromStr};
use std::{error::Error, net::SocketAddr};

use tracing::{error, warn};
use zenoh::query::Query;
Expand Down Expand Up @@ -91,12 +91,8 @@ pub async fn handle_data_message(
None => {}
}

match timestamp.map(|x| uhlc::Timestamp::from_str(&x)) {
Some(Err(e)) => error!("{:?}", e),
Some(Ok(timestamp)) => {
publisher_builder = publisher_builder.timestamp(timestamp);
}
None => {} // let uhlc_ts = uhlc::Timestamp::from_str(&ts);
if let Some(ts) = timestamp.and_then(|k| state_map.timestamps.get(&k)) {
publisher_builder = publisher_builder.timestamp(*ts);
}

if let Err(e) = publisher_builder.await {
Expand Down Expand Up @@ -149,7 +145,11 @@ pub async fn handle_data_message(
},
DataMsg::Sample(_, _)
| DataMsg::GetReply(_)
| DataMsg::NewTimestamp(_)
| DataMsg::NewTimestamp {
id: _,
string_rep: _,
millis_since_epoch: _,
}
| DataMsg::SessionInfo(_) => {
error!("Server Should not recieved a {data_msg:?} Variant from client");
}
Expand Down
15 changes: 11 additions & 4 deletions zenoh-plugin-remote-api/src/interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,22 @@ pub enum DataMsg {
PublisherDelete {
id: Uuid,
attachment: Option<B64String>,
timestamp: Option<String>,
timestamp: Option<Uuid>,
},
// SVR -> Client
// Subscriber
Sample(SampleWS, Uuid),
// GetReply
GetReply(ReplyWS),
//
SessionInfo(SessionInfo),
NewTimestamp {
id: Uuid,
string_rep: String,
millis_since_epoch: u64,
},

// Bidirectional
Queryable(QueryableMsg),
NewTimestamp(String),
}

#[derive(TS)]
Expand Down Expand Up @@ -131,7 +134,7 @@ pub enum ControlMsg {
OpenSession,
CloseSession,
Session(Uuid),
NewTimestamp,
NewTimestamp(Uuid),

//
SessionInfo,
Expand Down Expand Up @@ -211,6 +214,8 @@ pub enum ControlMsg {
express: Option<bool>,
#[ts(type = "string | undefined")]
attachment: Option<B64String>,
#[ts(type = "string | undefined")]
timestamp: Option<Uuid>,
},
Delete {
#[ts(as = "OwnedKeyExprWrapper")]
Expand All @@ -234,6 +239,8 @@ pub enum ControlMsg {
express: Option<bool>,
#[ts(type = "string | undefined")]
attachment: Option<B64String>,
#[ts(type = "string | undefined")]
timestamp: Option<Uuid>,
},
// Subscriber
DeclareSubscriber {
Expand Down
6 changes: 6 additions & 0 deletions zenoh-plugin-remote-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use tokio_rustls::{
};
use tokio_tungstenite::tungstenite::protocol::Message;
use tracing::{debug, error};
use uhlc::Timestamp;
use uuid::Uuid;
use zenoh::{
bytes::{Encoding, ZBytes},
Expand Down Expand Up @@ -477,6 +478,9 @@ struct RemoteState {
websocket_tx: Sender<RemoteAPIMsg>,
session_id: Uuid,
session: Session,
// KeyExpr's + Timestamp
key_exprs: HashMap<Uuid, OwnedKeyExpr>,
timestamps: HashMap<Uuid, Timestamp>,
// PubSub
subscribers: HashMap<Uuid, (JoinHandle<()>, OwnedKeyExpr)>,
publishers: HashMap<Uuid, Publisher<'static>>,
Expand All @@ -496,6 +500,8 @@ impl RemoteState {
websocket_tx,
session_id,
session,
key_exprs: HashMap::new(),
timestamps: HashMap::new(),
subscribers: HashMap::new(),
publishers: HashMap::new(),
queryables: HashMap::new(),
Expand Down
5 changes: 3 additions & 2 deletions zenoh-ts/src/key_expr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
// ██ ██ ██ ██ ██ ██ ██ ██ ██ ██
// ██ ██ ███████ ██ ███████ ██ ██ ██ ██ ██

// import { UUIDv4 } from "./remote_api/session";

export type IntoKeyExpr = KeyExpr | String | string;

export class KeyExpr {
/**
* Class to represent a Key Expression in Zenoh
*/
private _inner: string;

// private key_expr_uuid: UUIDv4;

constructor(keyexpr: IntoKeyExpr) {
if (keyexpr instanceof KeyExpr) {
Expand All @@ -34,7 +36,6 @@ export class KeyExpr {
} else {
this._inner = keyexpr;
}

}

toString(): string {
Expand Down
5 changes: 3 additions & 2 deletions zenoh-ts/src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
Sample_from_SampleWS,
} from "./sample.js";
import { Encoding, IntoEncoding } from "./encoding.js";
import { Timestamp } from "./timestamp.js";


// ███████ ██ ██ ██████ ███████ ██████ ██████ ██ ██████ ███████ ██████
Expand Down Expand Up @@ -199,7 +200,7 @@ export interface PublisherPutOptions {
*/
export interface PublisherDeleteOptions {
attachment?: IntoZBytes,
timestamp?: string
timestamp?: Timestamp
}

export class Publisher {
Expand Down Expand Up @@ -350,7 +351,7 @@ export class Publisher {

let _timestamp = null;
if (delete_options.timestamp != null) {
_timestamp = delete_options.timestamp;
_timestamp = delete_options.timestamp.get_resource_uuid as unknown as string;
}

return this._remote_publisher.delete(
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ts/src/remote_api/interface/ControlMsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ import type { HandlerChannel } from "./HandlerChannel.js";
import type { LivelinessMsg } from "./LivelinessMsg.js";
import type { OwnedKeyExprWrapper } from "./OwnedKeyExprWrapper.js";

export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | "NewTimestamp" | "SessionInfo" | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, target: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, handler: HandlerChannel, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg };
export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "NewTimestamp": string } | "SessionInfo" | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, target: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, timestamp: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, timestamp: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, handler: HandlerChannel, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg };
2 changes: 1 addition & 1 deletion zenoh-ts/src/remote_api/interface/DataMsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ import type { ReplyWS } from "./ReplyWS.js";
import type { SampleWS } from "./SampleWS.js";
import type { SessionInfo } from "./SessionInfo.js";

export type DataMsg = { "PublisherPut": { id: string, payload: B64String, attachment: B64String | null, encoding: string | null, } } | { "PublisherDelete": { id: string, attachment: B64String | null, timestamp: string | null, } } | { "Sample": [SampleWS, string] } | { "GetReply": ReplyWS } | { "SessionInfo": SessionInfo } | { "Queryable": QueryableMsg } | { "NewTimestamp": string };
export type DataMsg = { "PublisherPut": { id: string, payload: B64String, attachment: B64String | null, encoding: string | null, } } | { "PublisherDelete": { id: string, attachment: B64String | null, timestamp: string | null, } } | { "Sample": [SampleWS, string] } | { "GetReply": ReplyWS } | { "SessionInfo": SessionInfo } | { "NewTimestamp": { id: string, string_rep: string, millis_since_epoch: bigint, } } | { "Queryable": QueryableMsg };
15 changes: 6 additions & 9 deletions zenoh-ts/src/remote_api/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
//

import { SimpleChannel } from "channel-ts";
import { v4 as uuidv4 } from "uuid";
import { encode as b64_str_from_bytes } from "base64-arraybuffer";

// Import interface
Expand All @@ -22,7 +21,7 @@ import { DataMsg } from "./interface/DataMsg.js";
import { ControlMsg } from "./interface/ControlMsg.js";

// Remote Api
import { RemoteSession } from "./session.js";
import { RemoteSession, UUIDv4 } from "./session.js";

function executeAsync(func: any) {
setTimeout(func, 0);
Expand All @@ -34,17 +33,15 @@ function executeAsync(func: any) {
// ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██
// ██ ██ ███████ ██ ██ ██████ ██ ███████ ██ ██████ ██████ ███████ ██ ███████ ██ ██ ███████ ██ ██

type UUID = typeof uuidv4 | string;

export class RemotePublisher {
private key_expr: String;
private publisher_id: UUID;
private publisher_id: UUIDv4;
private session_ref: RemoteSession;
private undeclared: boolean;

constructor(
key_expr: String,
publisher_id: UUID,
publisher_id: UUIDv4,
session_ref: RemoteSession,
) {
this.key_expr = key_expr;
Expand Down Expand Up @@ -135,7 +132,7 @@ export class RemotePublisher {
// else, must call receive on the
export class RemoteSubscriber {
private key_expr: String;
private subscriber_id: UUID;
private subscriber_id: UUIDv4;
private session_ref: RemoteSession;
private callback?: (sample: SampleWS) => void;
private rx: SimpleChannel<SampleWS>;
Expand All @@ -144,7 +141,7 @@ export class RemoteSubscriber {

private constructor(
key_expr: String,
subscriber_id: UUID,
subscriber_id: UUIDv4,
session_ref: RemoteSession,
rx: SimpleChannel<SampleWS>,
callback?: (sample: SampleWS) => void,
Expand All @@ -159,7 +156,7 @@ export class RemoteSubscriber {

static new(
key_expr: String,
subscriber_id: UUID,
subscriber_id: UUIDv4,
session_ref: RemoteSession,
rx: SimpleChannel<SampleWS>,
callback?: (sample: SampleWS) => void,
Expand Down
8 changes: 3 additions & 5 deletions zenoh-ts/src/remote_api/querier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,18 @@
//

import { v4 as uuidv4 } from "uuid";
import { RemoteSession } from "./session.js";
import { RemoteSession, UUIDv4 } from "./session.js";
import { ControlMsg } from "./interface/ControlMsg.js"
import { SimpleChannel } from "channel-ts";
import { ReplyWS } from "./interface/ReplyWS.js";
import { encode as b64_str_from_bytes } from "base64-arraybuffer";

type UUID = typeof uuidv4 | string;

export class RemoteQuerier {
private querier_id: UUID;
private querier_id: UUIDv4;
private session_ref: RemoteSession;

constructor(
querier_id: UUID,
querier_id: UUIDv4,
session_ref: RemoteSession,
) {
this.querier_id = querier_id;
Expand Down
Loading

0 comments on commit d2bc32d

Please sign in to comment.