diff --git a/.github/workflows/release-ts.yml b/.github/workflows/release-ts.yml deleted file mode 100644 index 1d8260d..0000000 --- a/.github/workflows/release-ts.yml +++ /dev/null @@ -1,61 +0,0 @@ -name: Release zenoh-ts - -on: - push: - tags: - - '[0-9]+.*' - -defaults: - run: - shell: bash - working-directory: ./zenoh-ts - -jobs: - package: - name: Package app for ${{ matrix.job.name }} - strategy: - fail-fast: false - runs-on: ubuntu-latest - permissions: - contents: read - packages: write - steps: - - name: Checkout - uses: actions/checkout@v4 - - - name: git config - run: | - git config --global user.name "${GITHUB_ACTOR}" - git config --global user.email "${GITHUB_ACTOR}@users.noreply.github.com" - - - name: Setup Node - uses: actions/setup-node@v4 - with: - node-version: 20 - registry-url: https://npm.pkg.github.com/ - - - name: Run install - uses: borales/actions-yarn@v4 - with: - cmd: install - dir: ./zenoh-ts - - - name: Transpile Code - run: yarn run build - - - name: Publish - id: publish - shell: bash - env: - YARN_NPM_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }} - NODE_AUTH_TOKEN: ${{secrets.GITHUB_TOKEN}} - run: | - npm install - npx release-it - - - name: Cleanup - if: always() - run: | - rm -rf node_modules - rm -rf dist - rm -rf package-lock.json \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 8d5c13e..2375161 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -60,7 +60,7 @@ jobs: - name: Bump and tag project run: bash ci/scripts/bump-and-tag.bash env: - LIVE_RUN: ${{ inputs.live_run }} + LIVE_RUN: ${{ inputs.live-run }} VERSION: ${{ steps.create-release-branch.outputs.version }} BUMP_DEPS_VERSION: ${{ inputs.zenoh-version }} BUMP_DEPS_PATTERN: ${{ inputs.zenoh-version && 'zenoh.*' || '' }} @@ -68,6 +68,46 @@ jobs: GIT_USER_NAME: eclipse-zenoh-bot GIT_USER_EMAIL: eclipse-zenoh-bot@users.noreply.github.com + build-ts: + name: Build Typescript + runs-on: ubuntu-latest + needs: tag + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + ref: ${{ needs.tag.outputs.branch }} + + - name: Install Dependencies + uses: borales/actions-yarn@v4 + with: + cmd: install + dir: ./zenoh-ts + + - name: Transpile Code + working-directory: ./zenoh-ts + run: | + yarn run build + + - name: Upload zenoh-ts-build + uses: actions/upload-artifact@v4 + with: + name: zenoh-ts-build + include-hidden-files: true + path: | + zenoh-ts/dist/ + zenoh-ts/package.json + zenoh-ts/LICENSE + zenoh-ts/README.md + + - name: Cleanup + working-directory: ./zenoh-ts + if: always() + run: | + rm -rf node_modules + rm -rf dist + rm -rf package-lock.json + build-debian: name: Build Debian packages needs: tag @@ -91,8 +131,35 @@ jobs: ^zenoh_plugin_remote_api(2)?\.dll$ secrets: inherit + npm: + name: Release Zenoh-ts to NPM + runs-on: ubuntu-latest + needs: [tag, build-ts] + steps: + - name: Download zenoh-ts-build + uses: actions/download-artifact@v4 + with: + name: zenoh-ts-build + + - name: Publish Typescript to NPM + id: publish + shell: bash + env: + ORG_NPMJS_TOKEN: ${{ secrets.ORG_NPMJS_TOKEN }} + LIVE_RUN: ${{ inputs.live-run || false }} + run: | + readonly live_run=${LIVE_RUN:-false} + npm config set //registry.npmjs.org/:_authToken=\${ORG_NPMJS_TOKEN} + if [ ${live_run} = true ]; then + echo "Releasing to NPM" + npm publish --access public + else + echo "Dry Run" + npm publish --dry-run + fi + cargo: - needs: tag + needs: [tag, build-standalone, build-ts] name: Publish Cargo crates uses: eclipse-zenoh/ci/.github/workflows/release-crates-cargo.yml@main with: @@ -110,7 +177,7 @@ jobs: debian: name: Publish Debian packages - needs: [tag, build-debian, cargo] + needs: [tag, build-debian, build-ts, cargo] uses: eclipse-zenoh/ci/.github/workflows/release-crates-debian.yml@main with: no-build: true @@ -123,7 +190,7 @@ jobs: homebrew: name: Publish Homebrew formulae - needs: [tag, build-standalone, cargo] + needs: [tag, build-standalone, build-ts, cargo] uses: eclipse-zenoh/ci/.github/workflows/release-crates-homebrew.yml@main with: no-build: true @@ -139,7 +206,7 @@ jobs: eclipse: name: Publish artifacts to Eclipse downloads - needs: [tag, build-standalone, cargo] + needs: [tag, build-standalone, build-ts, cargo] uses: eclipse-zenoh/ci/.github/workflows/release-crates-eclipse.yml@main with: no-build: true @@ -155,7 +222,7 @@ jobs: github: name: Publish artifacts to GitHub Releases - needs: [tag, build-standalone, cargo] + needs: [tag, build-standalone, build-ts, cargo] uses: eclipse-zenoh/ci/.github/workflows/release-crates-github.yml@main with: no-build: true diff --git a/Cargo.lock b/Cargo.lock index 9eeaedf..e15db37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1543,6 +1543,26 @@ dependencies = [ "thiserror", ] +[[package]] +name = "ref-cast" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf0a6f84d5f1d581da8b41b47ec8600871962f2a528115b542b362d4b744931" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcc303e793d3734489387d205e9b186fac9c6cfacedd98cbb2e8a5943595f3e6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "regex" version = "1.10.6" @@ -2832,7 +2852,7 @@ dependencies = [ [[package]] name = "zenoh" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "ahash", "async-trait", @@ -2848,6 +2868,7 @@ dependencies = [ "petgraph", "phf", "rand", + "ref-cast", "rustc_version", "serde", "serde_json", @@ -2878,7 +2899,7 @@ dependencies = [ [[package]] name = "zenoh-buffers" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "zenoh-collections", ] @@ -2886,7 +2907,7 @@ dependencies = [ [[package]] name = "zenoh-codec" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "tracing", "uhlc", @@ -2897,12 +2918,12 @@ dependencies = [ [[package]] name = "zenoh-collections" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" [[package]] name = "zenoh-config" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "json5", "num_cpus", @@ -2914,6 +2935,7 @@ dependencies = [ "uhlc", "validated_struct", "zenoh-core", + "zenoh-keyexpr", "zenoh-macros", "zenoh-protocol", "zenoh-result", @@ -2923,7 +2945,7 @@ dependencies = [ [[package]] name = "zenoh-core" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "lazy_static", "tokio", @@ -2934,7 +2956,7 @@ dependencies = [ [[package]] name = "zenoh-crypto" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "aes", "hmac", @@ -2947,8 +2969,9 @@ dependencies = [ [[package]] name = "zenoh-ext" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ + "async-trait", "bincode", "flume", "futures", @@ -2956,6 +2979,7 @@ dependencies = [ "serde", "tokio", "tracing", + "uhlc", "zenoh", "zenoh-macros", "zenoh-util", @@ -2964,7 +2988,7 @@ dependencies = [ [[package]] name = "zenoh-keyexpr" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "hashbrown", "keyed-set", @@ -2978,7 +3002,7 @@ dependencies = [ [[package]] name = "zenoh-link" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "zenoh-config", "zenoh-link-commons", @@ -2995,7 +3019,7 @@ dependencies = [ [[package]] name = "zenoh-link-commons" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "async-trait", "flume", @@ -3019,7 +3043,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "async-trait", "base64 0.22.1", @@ -3040,29 +3064,30 @@ dependencies = [ "zenoh-link-commons", "zenoh-protocol", "zenoh-result", + "zenoh-util", ] [[package]] name = "zenoh-link-tcp" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "async-trait", "socket2", "tokio", "tokio-util", "tracing", + "zenoh-config", "zenoh-core", "zenoh-link-commons", "zenoh-protocol", "zenoh-result", - "zenoh-util", ] [[package]] name = "zenoh-link-tls" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "async-trait", "base64 0.22.1", @@ -3091,7 +3116,7 @@ dependencies = [ [[package]] name = "zenoh-link-udp" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "async-trait", "socket2", @@ -3110,7 +3135,7 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "async-trait", "nix", @@ -3128,7 +3153,7 @@ dependencies = [ [[package]] name = "zenoh-link-ws" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "async-trait", "futures-util", @@ -3148,7 +3173,7 @@ dependencies = [ [[package]] name = "zenoh-macros" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "proc-macro2", "quote", @@ -3190,7 +3215,7 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "git-version", "libloading", @@ -3206,7 +3231,7 @@ dependencies = [ [[package]] name = "zenoh-protocol" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "const_format", "rand", @@ -3220,7 +3245,7 @@ dependencies = [ [[package]] name = "zenoh-result" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "anyhow", ] @@ -3228,7 +3253,7 @@ dependencies = [ [[package]] name = "zenoh-runtime" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "lazy_static", "ron", @@ -3241,7 +3266,7 @@ dependencies = [ [[package]] name = "zenoh-sync" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "event-listener", "futures", @@ -3254,7 +3279,7 @@ dependencies = [ [[package]] name = "zenoh-task" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "futures", "tokio", @@ -3267,7 +3292,7 @@ dependencies = [ [[package]] name = "zenoh-transport" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "async-trait", "crossbeam-utils", @@ -3300,7 +3325,7 @@ dependencies = [ [[package]] name = "zenoh-util" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "async-trait", "const_format", @@ -3325,7 +3350,7 @@ dependencies = [ [[package]] name = "zenoh_backend_traits" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#c764bf9be0423b7d90f534209b0be2b8017b71ed" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#cc201aa1f2c0e9ec780bb6e55fdcc750434a9b2f" dependencies = [ "async-trait", "const_format", diff --git a/README.md b/README.md index 5661b76..7171346 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,9 @@ The `zenohd` router and its plugins should be built with the same Zenoh sources, same set of features. This requirement exists because the router and plugins share common Rust structures, and Rust doesn't guarantee ABI compatibility of the memory representation of these structures. -Therefore one of these methods is recommended: +Therefore one of the methods below is recommended to ensure that plugin and router are compatible. + +The file `EXAMPLE_CONFIG.json5` references the `zenoh-plugin-remote-api\EXAMPLE_CONFIG.json5` with minimal necessary set of options to run the plugin. See also full set of available options, like ssl certificate settings in `zenoh-plugin-remote-api\config.json5`. 1. Install the latest release of `zenohd` and `zenoh-plugin-remote-api` @@ -68,7 +70,7 @@ Therefore one of these methods is recommended: zenohd --config EXAMPLE_CONFIG.json5 ``` - Expected output is like: + Expected output should be something similar to: ```txt zenohd: zenohd v1.0.3 built with rustc 1.75.0 (82e1608df 2023-12-21) @@ -80,7 +82,7 @@ Therefore one of these methods is recommended: zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/.... ``` -1. Build the plugin and the router from the sources: +2. Build both the plugin and the router from the sources: Build the plugin `zenoh-plugin-remote-api` @@ -96,7 +98,7 @@ Therefore one of these methods is recommended: cargo bin zenohd --config EXAMPLE_CONFIG.json5 ``` - Expected output is like: + Expected output should be something similar to: ```txt zenohd: zenohd vc764bf9b built with rustc 1.75.0 (82e1608df 2023-12-21) @@ -131,19 +133,19 @@ This library is currently compatible with browsers, but not with NodeJS due to w To run the command line examples use the javascript runtime [deno](https://deno.com/) which is expected to be consistent with the browser. 1. Install [deno](https://deno.com/) -1. Navigate to the `zenoh-ts/examples/deno` directory -1. Install the `zenoh-ts` library by running `yarn install` -1. Run zenohd with the remote_api plugin, configured to websocket port 10000, as described above -1. Run the examples by running `yarn example `, i.e. `yarn example src/z_sub.ts` +2. Navigate to the `zenoh-ts/examples/deno` directory +3. Install the `zenoh-ts` library by running `yarn install` +4. Run zenohd with the remote_api plugin, configured to websocket port 10000, as described above +5. Run the examples by running `yarn example `, i.e. `yarn example src/z_sub.ts` To run publisher and subscriber examples: ```sh -yarn example src/pub.rs +yarn example src/z_pub.rs ``` ```sh -yarn example src/sub.rs +yarn example src/z_sub.rs ``` The subscriber should start to receive messages from publisher: diff --git a/zenoh-plugin-remote-api/src/handle_control_message.rs b/zenoh-plugin-remote-api/src/handle_control_message.rs index bb41588..8b24acd 100644 --- a/zenoh-plugin-remote-api/src/handle_control_message.rs +++ b/zenoh-plugin-remote-api/src/handle_control_message.rs @@ -14,9 +14,11 @@ use std::{error::Error, net::SocketAddr, time::Duration}; +use base64::{prelude::BASE64_STANDARD, Engine}; use tracing::{error, warn}; use uuid::Uuid; use zenoh::{ + bytes::ZBytes, handlers::{FifoChannel, RingChannel}, key_expr::KeyExpr, query::Selector, @@ -24,8 +26,8 @@ use zenoh::{ use crate::{ interface::{ - ControlMsg, DataMsg, HandlerChannel, LivelinessMsg, QueryWS, QueryableMsg, RemoteAPIMsg, - ReplyWS, SampleWS, + B64String, ControlMsg, DataMsg, HandlerChannel, LivelinessMsg, QueryWS, QueryableMsg, + RemoteAPIMsg, ReplyWS, SampleWS, }, spawn_future, RemoteState, StateMap, }; @@ -333,7 +335,94 @@ pub(crate) async fn handle_control_message( ControlMsg::Liveliness(liveliness_msg) => { return handle_liveliness(liveliness_msg, state_map).await; } + ControlMsg::DeclareQuerier { + id, + key_expr, + target, + timeout, + accept_replies, + congestion_control, + priority, + consolidation, + allowed_destination, + express, + } => { + let mut querier_builder = state_map.session.declare_querier(key_expr); + let timeout = timeout.map(Duration::from_millis); + + add_if_some!(target, querier_builder); + add_if_some!(timeout, querier_builder); + add_if_some!(accept_replies, querier_builder); + add_if_some!(accept_replies, querier_builder); + add_if_some!(congestion_control, querier_builder); + add_if_some!(priority, querier_builder); + add_if_some!(consolidation, querier_builder); + add_if_some!(allowed_destination, querier_builder); + add_if_some!(express, querier_builder); + + let querier = querier_builder.await?; + state_map.queriers.insert(id, querier); + } + ControlMsg::UndeclareQuerier(uuid) => { + if let Some(querier) = state_map.queriers.remove(&uuid) { + querier.undeclare().await?; + } else { + warn!("No Querier Found with UUID {}", uuid); + }; + } + ControlMsg::QuerierGet { + get_id, + querier_id, + encoding, + payload, + attachment, + } => { + if let Some(querier) = state_map.queriers.get(&querier_id) { + let mut get_builder = querier.get(); + + let payload = payload + .map(|B64String(x)| BASE64_STANDARD.decode(x)) + .and_then(|res_vec_bytes| { + if let Ok(vec_bytes) = res_vec_bytes { + Some(ZBytes::from(vec_bytes)) + } else { + None + } + }); + + let attachment: Option = attachment + .map(|B64String(x)| BASE64_STANDARD.decode(x)) + .and_then(|res_vec_bytes| { + if let Ok(vec_bytes) = res_vec_bytes { + Some(ZBytes::from(vec_bytes)) + } else { + None + } + }); + add_if_some!(encoding, get_builder); + add_if_some!(payload, get_builder); + add_if_some!(attachment, get_builder); + let receiver = get_builder.await?; + let ws_tx = state_map.websocket_tx.clone(); + let finish_msg = RemoteAPIMsg::Control(ControlMsg::GetFinished { id: get_id }); + spawn_future(async move { + while let Ok(reply) = receiver.recv_async().await { + let reply_ws = ReplyWS::from((reply, get_id)); + let remote_api_msg = RemoteAPIMsg::Data(DataMsg::GetReply(reply_ws)); + if let Err(err) = ws_tx.send(remote_api_msg) { + tracing::error!("{}", err); + } + } + if let Err(err) = ws_tx.send(finish_msg) { + tracing::error!("{}", err); + } + }); + } else { + // TODO: Do we want to add an error here ? + warn!("No Querier With ID {querier_id} found") + } + } msg @ (ControlMsg::GetFinished { id: _ } | ControlMsg::Session(_) | ControlMsg::Subscriber(_)) => { diff --git a/zenoh-plugin-remote-api/src/interface.rs b/zenoh-plugin-remote-api/src/interface/mod.rs similarity index 80% rename from zenoh-plugin-remote-api/src/interface.rs rename to zenoh-plugin-remote-api/src/interface/mod.rs index deb9f3f..e205057 100644 --- a/zenoh-plugin-remote-api/src/interface.rs +++ b/zenoh-plugin-remote-api/src/interface/mod.rs @@ -14,15 +14,24 @@ use std::sync::Arc; +// mod interface::ser_de; +pub(crate) mod ser_de; use base64::{prelude::BASE64_STANDARD, Engine}; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use ser_de::{ + deserialize_congestion_control, deserialize_consolidation_mode, deserialize_locality, + deserialize_priority, deserialize_query_target, deserialize_reliability, + deserialize_reply_key_expr, serialize_congestion_control, serialize_consolidation_mode, + serialize_locality, serialize_priority, serialize_query_target, serialize_reliability, + serialize_reply_key_expr, +}; +use serde::{Deserialize, Serialize}; use ts_rs::TS; use uuid::Uuid; use zenoh::{ key_expr::OwnedKeyExpr, qos::{CongestionControl, Priority, Reliability}, - query::{ConsolidationMode, Query, Reply, ReplyError}, - sample::{Sample, SampleKind}, + query::{ConsolidationMode, Query, QueryTarget, Reply, ReplyError, ReplyKeyExpr}, + sample::{Locality, Sample, SampleKind}, }; // ██████ ███████ ███ ███ ██████ ████████ ███████ █████ ██████ ██ ███ ███ ███████ ███████ ███████ █████ ██████ ███████ @@ -34,7 +43,7 @@ use zenoh::{ #[derive(TS)] #[ts(export)] #[derive(Debug, Serialize, Deserialize)] -pub(crate) struct B64String(String); +pub(crate) struct B64String(pub String); impl From for B64String { fn from(value: String) -> Self { B64String(value) @@ -247,6 +256,70 @@ pub enum ControlMsg { complete: bool, }, UndeclareQueryable(Uuid), + // Quierer + DeclareQuerier { + id: Uuid, + #[ts(as = "OwnedKeyExprWrapper")] + key_expr: OwnedKeyExpr, + #[serde( + deserialize_with = "deserialize_query_target", + serialize_with = "serialize_query_target", + default + )] + #[ts(type = "number | undefined")] + target: Option, + #[ts(type = "number | undefined")] + timeout: Option, + #[serde( + deserialize_with = "deserialize_reply_key_expr", + serialize_with = "serialize_reply_key_expr", + default + )] + #[ts(type = "number | undefined")] + accept_replies: Option, + #[serde( + deserialize_with = "deserialize_locality", + serialize_with = "serialize_locality", + default + )] + #[ts(type = "number | undefined")] + allowed_destination: Option, + #[serde( + deserialize_with = "deserialize_congestion_control", + serialize_with = "serialize_congestion_control", + default + )] + #[ts(type = "number | undefined")] + congestion_control: Option, + #[serde( + deserialize_with = "deserialize_priority", + serialize_with = "serialize_priority", + default + )] + #[ts(type = "number | undefined")] + priority: Option, + #[serde( + deserialize_with = "deserialize_consolidation_mode", + serialize_with = "serialize_consolidation_mode", + default + )] + #[ts(type = "number | undefined")] + consolidation: Option, + #[ts(type = "boolean | undefined")] + express: Option, + }, + UndeclareQuerier(Uuid), + // Querier + QuerierGet { + querier_id: Uuid, + get_id: Uuid, + #[ts(type = "string | undefined")] + encoding: Option, + #[ts(type = "string | undefined")] + payload: Option, + #[ts(type = "string | undefined")] + attachment: Option, + }, // Liveliness Liveliness(LivelinessMsg), @@ -278,151 +351,6 @@ pub enum LivelinessMsg { }, } -fn deserialize_consolidation_mode<'de, D>(d: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - match Option::::deserialize(d) { - Ok(Some(value)) => Ok(Some(match value { - 0u8 => ConsolidationMode::Auto, - 1u8 => ConsolidationMode::None, - 2u8 => ConsolidationMode::Monotonic, - 3u8 => ConsolidationMode::Latest, - _ => { - return Err(serde::de::Error::custom(format!( - "Value not valid for ConsolidationMode Enum {:?}", - value - ))) - } - })), - Ok(None) => Ok(None), - Err(err) => Err(serde::de::Error::custom(format!( - "Value not valid for ConsolidationMode Enum {:?}", - err - ))), - } -} - -fn serialize_consolidation_mode( - consolidation_mode: &Option, - s: S, -) -> Result -where - S: Serializer, -{ - match consolidation_mode { - Some(c_mode) => s.serialize_u8(*c_mode as u8), - None => s.serialize_none(), - } -} - -fn deserialize_congestion_control<'de, D>(d: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - match Option::::deserialize(d) { - Ok(Some(value)) => Ok(Some(match value { - 0u8 => CongestionControl::Drop, - 1u8 => CongestionControl::Block, - val => { - return Err(serde::de::Error::custom(format!( - "Value not valid for CongestionControl Enum {:?}", - val - ))) - } - })), - Ok(None) => Ok(None), - val => Err(serde::de::Error::custom(format!( - "Value not valid for CongestionControl Enum {:?}", - val - ))), - } -} - -fn serialize_congestion_control( - congestion_control: &Option, - s: S, -) -> Result -where - S: Serializer, -{ - match congestion_control { - Some(c_ctrl) => s.serialize_u8(*c_ctrl as u8), - None => s.serialize_none(), - } -} - -fn deserialize_priority<'de, D>(d: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - match Option::::deserialize(d) { - Ok(Some(value)) => Ok(Some(match value { - 1u8 => Priority::RealTime, - 2u8 => Priority::InteractiveHigh, - 3u8 => Priority::InteractiveLow, - 4u8 => Priority::DataHigh, - 5u8 => Priority::Data, - 6u8 => Priority::DataLow, - 7u8 => Priority::Background, - val => { - return Err(serde::de::Error::custom(format!( - "Value not valid for Priority Enum {:?}", - val - ))) - } - })), - Ok(None) => Ok(None), - val => Err(serde::de::Error::custom(format!( - "Value not valid for Priority Enum {:?}", - val - ))), - } -} - -fn serialize_priority(priority: &Option, s: S) -> Result -where - S: Serializer, -{ - match priority { - Some(prio) => s.serialize_u8(*prio as u8), - None => s.serialize_none(), - } -} - -fn deserialize_reliability<'de, D>(d: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - match Option::::deserialize(d) { - Ok(Some(value)) => Ok(Some(match value { - 0u8 => Reliability::Reliable, - 1u8 => Reliability::BestEffort, - val => { - return Err(serde::de::Error::custom(format!( - "Value not valid for Reliability Enum {:?}", - val - ))) - } - })), - Ok(None) => Ok(None), - val => Err(serde::de::Error::custom(format!( - "Value not valid for Reliability Enum {:?}", - val - ))), - } -} - -fn serialize_reliability(reliability: &Option, s: S) -> Result -where - S: Serializer, -{ - match reliability { - Some(prio) => s.serialize_u8(*prio as u8), - None => s.serialize_none(), - } -} - #[derive(Debug, Serialize, Deserialize, TS)] pub(crate) enum HandlerChannel { Fifo(usize), diff --git a/zenoh-plugin-remote-api/src/interface/ser_de.rs b/zenoh-plugin-remote-api/src/interface/ser_de.rs new file mode 100644 index 0000000..964d2e7 --- /dev/null +++ b/zenoh-plugin-remote-api/src/interface/ser_de.rs @@ -0,0 +1,258 @@ +use serde::{Deserialize, Deserializer, Serializer}; +use zenoh::{ + qos::{CongestionControl, Priority, Reliability}, + query::{ConsolidationMode, QueryTarget, ReplyKeyExpr}, + sample::Locality, +}; + +pub fn deserialize_consolidation_mode<'de, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(d) { + Ok(Some(value)) => Ok(Some(match value { + 0u8 => ConsolidationMode::Auto, + 1u8 => ConsolidationMode::None, + 2u8 => ConsolidationMode::Monotonic, + 3u8 => ConsolidationMode::Latest, + _ => { + return Err(serde::de::Error::custom(format!( + "Value not valid for ConsolidationMode Enum {:?}", + value + ))) + } + })), + Ok(None) => Ok(None), + Err(err) => Err(serde::de::Error::custom(format!( + "Value not valid for ConsolidationMode Enum {:?}", + err + ))), + } +} + +pub fn serialize_consolidation_mode( + consolidation_mode: &Option, + s: S, +) -> Result +where + S: Serializer, +{ + match consolidation_mode { + Some(c_mode) => s.serialize_u8(*c_mode as u8), + None => s.serialize_none(), + } +} + +pub fn deserialize_reply_key_expr<'de, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(d) { + Ok(Some(value)) => Ok(Some(match value { + 0u8 => ReplyKeyExpr::Any, + 1u8 => ReplyKeyExpr::MatchingQuery, + _ => { + return Err(serde::de::Error::custom(format!( + "Value not valid for ReplyKeyExpr Enum {:?}", + value + ))) + } + })), + Ok(None) => Ok(None), + Err(err) => Err(serde::de::Error::custom(format!( + "Value not valid for ReplyKeyExpr Enum {:?}", + err + ))), + } +} + +pub fn serialize_reply_key_expr( + consolidation_mode: &Option, + s: S, +) -> Result +where + S: Serializer, +{ + match consolidation_mode { + Some(c_mode) => s.serialize_u8(*c_mode as u8), + None => s.serialize_none(), + } +} + +pub fn deserialize_congestion_control<'de, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(d) { + Ok(Some(value)) => Ok(Some(match value { + 0u8 => CongestionControl::Drop, + 1u8 => CongestionControl::Block, + val => { + return Err(serde::de::Error::custom(format!( + "Value not valid for CongestionControl Enum {:?}", + val + ))) + } + })), + Ok(None) => Ok(None), + val => Err(serde::de::Error::custom(format!( + "Value not valid for CongestionControl Enum {:?}", + val + ))), + } +} + +pub fn serialize_congestion_control( + congestion_control: &Option, + s: S, +) -> Result +where + S: Serializer, +{ + match congestion_control { + Some(c_ctrl) => s.serialize_u8(*c_ctrl as u8), + None => s.serialize_none(), + } +} + +pub fn deserialize_priority<'de, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(d) { + Ok(Some(value)) => Ok(Some(match value { + 1u8 => Priority::RealTime, + 2u8 => Priority::InteractiveHigh, + 3u8 => Priority::InteractiveLow, + 4u8 => Priority::DataHigh, + 5u8 => Priority::Data, + 6u8 => Priority::DataLow, + 7u8 => Priority::Background, + val => { + return Err(serde::de::Error::custom(format!( + "Value not valid for Priority Enum {:?}", + val + ))) + } + })), + Ok(None) => Ok(None), + val => Err(serde::de::Error::custom(format!( + "Value not valid for Priority Enum {:?}", + val + ))), + } +} + +pub fn serialize_priority(priority: &Option, s: S) -> Result +where + S: Serializer, +{ + match priority { + Some(prio) => s.serialize_u8(*prio as u8), + None => s.serialize_none(), + } +} + +pub fn deserialize_reliability<'de, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(d) { + Ok(Some(value)) => Ok(Some(match value { + 0u8 => Reliability::BestEffort, + 1u8 => Reliability::Reliable, + val => { + return Err(serde::de::Error::custom(format!( + "Value not valid for Reliability Enum {:?}", + val + ))) + } + })), + Ok(None) => Ok(None), + val => Err(serde::de::Error::custom(format!( + "Value not valid for Reliability Enum {:?}", + val + ))), + } +} + +pub fn serialize_reliability(reliability: &Option, s: S) -> Result +where + S: Serializer, +{ + match reliability { + Some(rel) => s.serialize_u8(*rel as u8), + None => s.serialize_none(), + } +} + +pub fn deserialize_locality<'de, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(d) { + Ok(Some(value)) => Ok(Some(match value { + 0u8 => Locality::SessionLocal, + 1u8 => Locality::Remote, + 2u8 => Locality::Any, + val => { + return Err(serde::de::Error::custom(format!( + "Value not valid for Locality Enum {:?}", + val + ))) + } + })), + Ok(None) => Ok(None), + val => Err(serde::de::Error::custom(format!( + "Value not valid for Locality Enum {:?}", + val + ))), + } +} + +pub fn serialize_locality(locality: &Option, s: S) -> Result +where + S: Serializer, +{ + match locality { + Some(rel) => s.serialize_u8(*rel as u8), + None => s.serialize_none(), + } +} + +pub fn deserialize_query_target<'de, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + match Option::::deserialize(d) { + Ok(Some(value)) => Ok(Some(match value { + 0u8 => QueryTarget::BestMatching, + 1u8 => QueryTarget::All, + 2u8 => QueryTarget::AllComplete, + val => { + return Err(serde::de::Error::custom(format!( + "Value not valid for QueryTarget Enum {:?}", + val + ))) + } + })), + Ok(None) => Ok(None), + val => Err(serde::de::Error::custom(format!( + "Value not valid for QueryTarget Enum {:?}", + val + ))), + } +} + +pub fn serialize_query_target( + query_target: &Option, + s: S, +) -> Result +where + S: Serializer, +{ + match query_target { + Some(rel) => s.serialize_u8(*rel as u8), + None => s.serialize_none(), + } +} diff --git a/zenoh-plugin-remote-api/src/lib.rs b/zenoh-plugin-remote-api/src/lib.rs index f1e5fe3..0cb9363 100644 --- a/zenoh-plugin-remote-api/src/lib.rs +++ b/zenoh-plugin-remote-api/src/lib.rs @@ -62,7 +62,7 @@ use zenoh::{ }, liveliness::LivelinessToken, pubsub::Publisher, - query::{Query, Queryable}, + query::{Querier, Query, Queryable}, Session, }; use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl}; @@ -486,6 +486,8 @@ struct RemoteState { // Liveliness liveliness_tokens: HashMap, liveliness_subscribers: HashMap, OwnedKeyExpr)>, + // Querier + queriers: HashMap>, } impl RemoteState { @@ -500,6 +502,7 @@ impl RemoteState { unanswered_queries: Arc::new(std::sync::RwLock::new(HashMap::new())), liveliness_tokens: HashMap::new(), liveliness_subscribers: HashMap::new(), + queriers: HashMap::new(), } } diff --git a/zenoh-ts/examples/deno/src/z_querier.ts b/zenoh-ts/examples/deno/src/z_querier.ts new file mode 100644 index 0000000..5b9fd5e --- /dev/null +++ b/zenoh-ts/examples/deno/src/z_querier.ts @@ -0,0 +1,59 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +import { Duration, deserialize_string, ReplyError, Config, Receiver, RecvErr, Reply, Sample, Session, QueryTarget, Locality } from "@eclipse-zenoh/zenoh-ts"; + + +export async function main() { + const session = await Session.open(new Config("ws/127.0.0.1:10000")); + + + let querier = session.declare_querier("demo/example/**", + { + target: QueryTarget.BestMatching, + timeout: Duration.milliseconds.of(10000), + } + ); + + for(let i =0; i<1000; i++) { + await sleep(1000) + let payload = "["+i+"] Querier Get from Zenoh-ts!"; + let receiver = querier.get({payload:payload}) as Receiver; + + let reply = await receiver.receive(); + + while (reply != RecvErr.Disconnected) { + if (reply == RecvErr.MalformedReply) { + console.warn("MalformedReply"); + } else { + let resp = reply.result(); + if (resp instanceof Sample) { + let sample: Sample = resp; + console.warn(">> Received ('", sample.keyexpr(), ":", sample.payload().deserialize(deserialize_string), "')"); + } else { + let reply_error: ReplyError = resp; + console.warn(">> Received (ERROR: '{", reply_error.payload().deserialize(deserialize_string), "}')"); + } + } + reply = await receiver.receive(); + } + console.warn("Get Finished"); + } +} + +function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +main() \ No newline at end of file diff --git a/zenoh-ts/src/index.ts b/zenoh-ts/src/index.ts index a2eaa42..68a20f0 100644 --- a/zenoh-ts/src/index.ts +++ b/zenoh-ts/src/index.ts @@ -22,8 +22,10 @@ import { Session, RecvErr, Receiver, DeleteOptions, PutOptions, GetOptions, Quer import { Config } from "./config.js"; import { Encoding, IntoEncoding } from "./encoding.js"; import { Liveliness, LivelinessToken } from "./liveliness.js"; +import { Querier, QueryTarget, Locality, ReplyKeyExpr, QuerierOptions, QuerierGetOptions } from './querier.js' + // Re-export duration external library -import { TimeDuration as Duration } from 'typed-duration' +import { Duration } from 'typed-duration' // Exports export { KeyExpr, IntoKeyExpr }; @@ -36,3 +38,4 @@ export { Config }; export { Encoding, IntoEncoding }; export { Liveliness, LivelinessToken }; export { Duration }; +export { Querier, QueryTarget, Locality, ReplyKeyExpr, QuerierOptions, QuerierGetOptions } \ No newline at end of file diff --git a/zenoh-ts/src/liveliness.ts b/zenoh-ts/src/liveliness.ts index a1a2e59..d415981 100644 --- a/zenoh-ts/src/liveliness.ts +++ b/zenoh-ts/src/liveliness.ts @@ -50,7 +50,6 @@ export class Liveliness { } declare_subscriber(key_expr: IntoKeyExpr, options?: LivelinessSubscriberOptions): Subscriber { - console.log(key_expr, options) let _key_expr = new KeyExpr(key_expr); @@ -86,7 +85,7 @@ export class Liveliness { } get(key_expr: IntoKeyExpr, options?: LivelinessGetOptions): Receiver | undefined { - console.log(key_expr, options) + let _key_expr = new KeyExpr(key_expr); let _timeout_millis: number | undefined = undefined; diff --git a/zenoh-ts/src/querier.ts b/zenoh-ts/src/querier.ts new file mode 100644 index 0000000..9fd019e --- /dev/null +++ b/zenoh-ts/src/querier.ts @@ -0,0 +1,259 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +// External +import { SimpleChannel } from "channel-ts"; +// Remote API +import { ReplyWS } from "./remote_api/interface/ReplyWS.js"; +// API +import { IntoZBytes, ZBytes } from "./z_bytes.js"; +import { CongestionControl, ConsolidationMode, Priority, } from "./sample.js"; +import { TimeDuration } from "typed-duration"; +import { RemoteQuerier } from "./remote_api/querier.js"; +import { KeyExpr } from "./key_expr.js"; +import { Encoding } from "crypto"; +import { Receiver } from "./session.js"; + +export enum QueryTarget { + /// Let Zenoh find the BestMatching queryable capabale of serving the query. + BestMatching, + /// Deliver the query to all queryables matching the query's key expression. + All, + /// Deliver the query to all queryables matching the query's key expression that are declared as complete. + AllComplete, +} + +/** + * Convenience function to convert between QueryTarget and int + * @internal + */ +export function query_target_to_int(query_target?: QueryTarget): number { + switch (query_target) { + case QueryTarget.BestMatching: + return 0; + case QueryTarget.All: + return 1; + case QueryTarget.AllComplete: + return 2; + default: + // Default is QueryTarget.BestMatching + return 1; + } +} + +export enum Locality { + SessionLocal, + Remote, + Any, +} + +/** + * Convenience function to convert between Locality and int + * @internal + */ +export function locality_to_int(query_target?: Locality): number { + switch (query_target) { + case Locality.SessionLocal: + return 0; + case Locality.Remote: + return 1; + case Locality.Any: + return 2; + default: + // Default is Locality.Any + return 2; + } +} + +export enum ReplyKeyExpr { + /// Accept replies whose key expressions may not match the query key expression. + Any, + /// Accept replies whose key expressions match the query key expression. + MatchingQuery, +} + +/** + * Convenience function to convert between QueryTarget function and int + * @internal + */ +export function reply_key_expr_to_int(query_target?: ReplyKeyExpr): number { + switch (query_target) { + case ReplyKeyExpr.Any: + return 0; + case ReplyKeyExpr.MatchingQuery: + return 1; + default: + // Default is ReplyKeyExpr.MatchingQuery + return 1; + } +} + + +export interface QuerierOptions { + congestion_control?: CongestionControl, + consolidation?: ConsolidationMode, + priority?: Priority, + express?: boolean, + target: QueryTarget + timeout?: TimeDuration, + allowed_destination?: Locality + // + accept_replies?: ReplyKeyExpr +} + +export interface QuerierGetOptions { + encoding?: Encoding, + payload?: IntoZBytes, + attachment?: IntoZBytes, + parameters?: string +} + +/** + * Queryable class used to receive Query's from the network and handle Reply's + * created by Session.declare_queryable + */ +export class Querier { + private _remote_querier: RemoteQuerier; + private _key_expr: KeyExpr; + private _congestion_control: CongestionControl; + private _priority: Priority; + private _accept_replies: ReplyKeyExpr; + private undeclared: boolean; + /** + * @ignore + */ + dispose() { + this.undeclare(); + } + + /** + * Returns a Queryable + * Note! : user must use declare_queryable on a session + */ + constructor( + remote_querier: RemoteQuerier, + key_expr: KeyExpr, + congestion_control: CongestionControl, + priority: Priority, + accept_replies: ReplyKeyExpr, + ) { + this._remote_querier = remote_querier; + this._key_expr = key_expr; + this._congestion_control = congestion_control; + this._priority = priority; + this._accept_replies = accept_replies; + this.undeclared = false; + // TODO: Look at finalization registry + // Queryable.registry.register(this, remote_queryable, this) + } + + /** + * Undeclares Queryable + * @returns void + */ + undeclare() { + this.undeclared = true; + // Finalization registry + // Queryable.registry.unregister(this); + this._remote_querier.undeclare() + } + + /** + * returns key expression for this Querier + * @returns KeyExpr + */ + key_expr() { + return this._key_expr; + } + + /** + * returns Congestion Control for this Querier + * @returns CongestionControl + */ + congestion_control() { + return this._congestion_control; + } + + /** + * returns Priority for this Querier + * @returns Priority + */ + priority() { + return this._priority; + } + + /** + * returns ReplyKeyExpr for this Querier + * @returns ReplyKeyExpr + */ + accept_replies() { + return this._accept_replies; + } + + /** + * Issue a Get request on this querier + * @returns Promise + */ + get(get_options?: QuerierGetOptions): Receiver | undefined { + if (this.undeclared == true) { + return undefined; + } + let _payload; + let _attachment; + let _parameters; + let _encoding = get_options?.encoding?.toString() + + if (get_options?.attachment != undefined) { + _attachment = Array.from(new ZBytes(get_options?.attachment).buffer()) + } + if (get_options?.payload != undefined) { + _payload = Array.from(new ZBytes(get_options?.payload).buffer()) + } + if (get_options?.parameters != undefined) { + _parameters = get_options?.parameters; + } + + let chan: SimpleChannel = this._remote_querier.get( + _encoding, + _parameters, + _attachment, + _payload, + ); + + let receiver = new Receiver(chan); + + return receiver; + // if (callback != undefined) { + // executeAsync(async () => { + // for await (const message of chan) { + // // This horribleness comes from SimpleChannel sending a 0 when the channel is closed + // if (message != undefined && (message as unknown as number) != 0) { + // let reply = new Reply(message); + // if (callback != undefined) { + // callback(reply); + // } + // } else { + // break + // } + // } + // }); + // return undefined; + // } else { + // return receiver; + // } + + } + + +} diff --git a/zenoh-ts/src/remote_api/interface/ControlMsg.ts b/zenoh-ts/src/remote_api/interface/ControlMsg.ts index 9959d53..07fa815 100644 --- a/zenoh-ts/src/remote_api/interface/ControlMsg.ts +++ b/zenoh-ts/src/remote_api/interface/ControlMsg.ts @@ -4,4 +4,4 @@ import type { HandlerChannel } from "./HandlerChannel.js"; import type { LivelinessMsg } from "./LivelinessMsg.js"; import type { OwnedKeyExprWrapper } from "./OwnedKeyExprWrapper.js"; -export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, } } | { "UndeclareQueryable": string } | { "Liveliness": LivelinessMsg }; +export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg }; diff --git a/zenoh-ts/src/remote_api/querier.ts b/zenoh-ts/src/remote_api/querier.ts new file mode 100644 index 0000000..47bec58 --- /dev/null +++ b/zenoh-ts/src/remote_api/querier.ts @@ -0,0 +1,80 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +import { v4 as uuidv4 } from "uuid"; +import { RemoteSession } from "./session.js"; +import { ControlMsg } from "./interface/ControlMsg.js" +import { SimpleChannel } from "channel-ts"; +import { ReplyWS } from "./interface/ReplyWS.js"; +import { encode as b64_str_from_bytes } from "base64-arraybuffer"; + +type UUID = typeof uuidv4 | string; + +export class RemoteQuerier { + private querier_id: UUID; + private session_ref: RemoteSession; + + constructor( + querier_id: UUID, + session_ref: RemoteSession, + ) { + this.querier_id = querier_id; + this.session_ref = session_ref; + } + + undeclare() { + + let control_msg: ControlMsg = { + UndeclareQuerier: this.querier_id as string + }; + + this.session_ref.send_ctrl_message(control_msg); + } + + get( + _encoding?: string, + _parameters?: string, + _attachment?: Array, + _payload?: Array, + ): SimpleChannel { + let get_id = uuidv4(); + let channel: SimpleChannel = new SimpleChannel(); + this.session_ref.get_receiver.set(get_id, channel); + + let payload = undefined; + if (_payload != undefined) { + payload = b64_str_from_bytes(new Uint8Array(_payload)) + } + let attachment = undefined; + if (_attachment != undefined) { + attachment = b64_str_from_bytes(new Uint8Array(_attachment)) + } + + let control_msg: ControlMsg = { + QuerierGet: { + querier_id: this.querier_id as string, + get_id: get_id, + encoding: _encoding, + payload: payload, + attachment: attachment, + } + }; + + this.session_ref.send_ctrl_message(control_msg); + return channel; + } + +} + + diff --git a/zenoh-ts/src/remote_api/session.ts b/zenoh-ts/src/remote_api/session.ts index 8828d54..7906007 100644 --- a/zenoh-ts/src/remote_api/session.ts +++ b/zenoh-ts/src/remote_api/session.ts @@ -32,6 +32,7 @@ import { ReplyWS } from "./interface/ReplyWS.js"; import { QueryableMsg } from "./interface/QueryableMsg.js"; import { QueryReplyWS } from "./interface/QueryReplyWS.js"; import { HandlerChannel } from "./interface/HandlerChannel.js"; +import { RemoteQuerier } from "./querier.js" // ██████ ███████ ███ ███ ██████ ████████ ███████ ███████ ███████ ███████ ███████ ██ ██████ ███ ██ // ██ ██ ██ ████ ████ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ████ ██ @@ -326,6 +327,45 @@ export class RemoteSession { return publisher; } + declare_remote_querier( + key_expr: string, + consolidation?: number, + congestion_control?: number, + priority?: number, + express?: boolean, + target?: number, + allowed_destination?: number, + accept_replies?: number, + timeout_milliseconds?: number, + ): RemoteQuerier { + let timeout = undefined; + if (timeout_milliseconds !== undefined) { + timeout = timeout_milliseconds; + } + + let uuid: string = uuidv4(); + let querier = new RemoteQuerier(uuid, this); + + let control_message: ControlMsg = { + DeclareQuerier: { + id: uuid, + key_expr: key_expr, + congestion_control: congestion_control, + priority: priority, + express: express, + target: target, + timeout: timeout, + accept_replies: accept_replies, + allowed_destination: allowed_destination, + consolidation: consolidation, + }, + }; + + this.send_ctrl_message(control_message); + return querier; + } + + // Liveliness declare_liveliness_token( key_expr: string, diff --git a/zenoh-ts/src/sample.ts b/zenoh-ts/src/sample.ts index 0487d92..8cefd44 100644 --- a/zenoh-ts/src/sample.ts +++ b/zenoh-ts/src/sample.ts @@ -107,6 +107,7 @@ export function congestion_control_to_int( return 0; case CongestionControl.BLOCK: return 1; + // Default is Drop default: return 0; } diff --git a/zenoh-ts/src/session.ts b/zenoh-ts/src/session.ts index 2ae8a47..15ca72c 100644 --- a/zenoh-ts/src/session.ts +++ b/zenoh-ts/src/session.ts @@ -55,6 +55,7 @@ import { HandlerChannel } from "./remote_api/interface/HandlerChannel.js"; // External deps import { Duration, TimeDuration } from 'typed-duration' import { SimpleChannel } from "channel-ts"; +import { locality_to_int, Querier, QuerierOptions, query_target_to_int, reply_key_expr_to_int, ReplyKeyExpr } from "./querier.js"; function executeAsync(func: any) { setTimeout(func, 0); @@ -210,7 +211,6 @@ export class Session { let _priority; let _express; let _attachment; - let _encoding = put_opts?.encoding?.toString() let _congestion_control = congestion_control_to_int(put_opts?.congestion_control); @@ -563,8 +563,75 @@ export class Session { ); return publisher; } + + /** + * Declares a Querier + * + * @param {IntoKeyExpr} keyexpr - string of key_expression + * @param {QuerierOptions} publisher_opts - Optional, set of options to be used when declaring a publisher + * @returns Publisher + */ + declare_querier( + into_keyexpr: IntoKeyExpr, + querier_opts: QuerierOptions, + ): Querier { + const key_expr = new KeyExpr(into_keyexpr); + + // Optional Parameters + let _priority; + let priority = Priority.DATA; + if (querier_opts?.priority != null) { + _priority = priority_to_int(querier_opts?.priority); + priority = querier_opts?.priority; + } + + let _congestion_control; + let congestion_control = CongestionControl.DROP; + if (querier_opts?.congestion_control != null) { + _congestion_control = congestion_control_to_int(querier_opts?.congestion_control); + congestion_control = querier_opts?.congestion_control; + } + + let _accept_replies; + let accept_replies = ReplyKeyExpr.Any; + if (querier_opts?.accept_replies != null) { + _accept_replies = reply_key_expr_to_int(querier_opts?.accept_replies); + accept_replies = querier_opts?.accept_replies; + } + + let _consolidation = consolidation_mode_to_int(querier_opts?.consolidation); + let _target = query_target_to_int(querier_opts?.target); + let _allowed_destination = locality_to_int(querier_opts?.allowed_destination); + let _express = querier_opts?.express; + let _timeout_millis: number | undefined = undefined; + + if (querier_opts?.timeout !== undefined) { + _timeout_millis = Duration.milliseconds.from(querier_opts?.timeout); + } + + let remote_querier = this.remote_session.declare_remote_querier( + key_expr.toString(), + _consolidation, + _congestion_control, + _priority, + _express, + _target, + _allowed_destination, + _accept_replies, + _timeout_millis, + ); + + return new Querier( + remote_querier, + key_expr, + congestion_control, + priority, + accept_replies, + ); + } } + function isGetChannelClose(msg: any): msg is GetChannelClose { return msg === GetChannelClose.Disconnected; } @@ -595,7 +662,7 @@ export class Receiver { /** * @ignore */ - private constructor(receiver: SimpleChannel) { + constructor(receiver: SimpleChannel) { this.receiver = receiver; }