From 7b1c9406252ec1d571f8040b4872eeeb1bc014d8 Mon Sep 17 00:00:00 2001 From: Darius Maitia Date: Mon, 9 Dec 2024 06:32:06 -0300 Subject: [PATCH] Adding Querier (unstable) to Zenoh-Kotlin (#316) * feat(querier): Adding Querier (missing docs) * feat(querier): Adding docs and fix undeclare querier on drop * Cargo fmt * Setting get consolidation mode default to AUTO * feat(querier): Adding 'Unstable' annotation to the querier feature. --- examples/build.gradle.kts | 1 + examples/src/main/kotlin/io.zenoh/ZQuerier.kt | 93 +++++++++ zenoh-jni/src/lib.rs | 1 + zenoh-jni/src/querier.rs | 137 ++++++++++++++ zenoh-jni/src/session.rs | 65 ++++++- .../src/commonMain/kotlin/io/zenoh/Session.kt | 52 ++++- .../io/zenoh/annotations/Annotations.kt | 23 +++ .../kotlin/io/zenoh/jni/JNIQuerier.kt | 121 ++++++++++++ .../kotlin/io/zenoh/jni/JNISession.kt | 29 +++ .../kotlin/io/zenoh/query/Querier.kt | 178 ++++++++++++++++++ .../commonTest/kotlin/io/zenoh/QuerierTest.kt | 88 +++++++++ 11 files changed, 785 insertions(+), 3 deletions(-) create mode 100644 examples/src/main/kotlin/io.zenoh/ZQuerier.kt create mode 100644 zenoh-jni/src/querier.rs create mode 100644 zenoh-kotlin/src/commonMain/kotlin/io/zenoh/annotations/Annotations.kt create mode 100644 zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuerier.kt create mode 100644 zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Querier.kt create mode 100644 zenoh-kotlin/src/commonTest/kotlin/io/zenoh/QuerierTest.kt diff --git a/examples/build.gradle.kts b/examples/build.gradle.kts index 08f8eb398..fd48bf30e 100644 --- a/examples/build.gradle.kts +++ b/examples/build.gradle.kts @@ -40,6 +40,7 @@ tasks { "ZPub", "ZPubThr", "ZPut", + "ZQuerier", "ZQueryable", "ZScout", "ZSub", diff --git a/examples/src/main/kotlin/io.zenoh/ZQuerier.kt b/examples/src/main/kotlin/io.zenoh/ZQuerier.kt new file mode 100644 index 000000000..704f93f87 --- /dev/null +++ b/examples/src/main/kotlin/io.zenoh/ZQuerier.kt @@ -0,0 +1,93 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh + +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.options.* +import com.github.ajalt.clikt.parameters.types.long +import io.zenoh.annotations.Unstable +import io.zenoh.bytes.ZBytes +import io.zenoh.query.QueryTarget +import io.zenoh.query.intoSelector +import java.time.Duration + +class ZQuerier(private val emptyArgs: Boolean) : CliktCommand( + help = "Zenoh Querier example" +) { + + @OptIn(Unstable::class) + override fun run() { + val config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode) + + Zenoh.initLogFromEnvOr("error") + + val session = Zenoh.open(config).getOrThrow() + val selector = selector.intoSelector().getOrThrow() + + val target = target ?.let{ QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING + val timeout = Duration.ofMillis(timeout) + val querier = session.declareQuerier(selector.keyExpr, target, timeout = timeout).getOrThrow() + + for (idx in 0..Int.MAX_VALUE) { + Thread.sleep(1000) + val payload = "[${idx.toString().padStart(4, ' ')}] ${payload ?: ""}" + println("Querying '$selector' with payload: '$payload'...") + querier.get(callback = { + it.result.onSuccess { sample -> + println(">> Received ('${sample.keyExpr}': '${sample.payload}')") + }.onFailure { error -> + println(">> Received (ERROR: '${error.message}')") + } + }, payload = ZBytes.from(payload), parameters = selector.parameters) + } + } + + private val selector by option( + "-s", + "--selector", + help = "The selection of resources to query [default: demo/example/**]", + metavar = "selector" + ).default("demo/example/**") + private val payload by option( + "-p", "--payload", help = "An optional payload to put in the query.", metavar = "payload" + ) + private val target by option( + "-t", + "--target", + help = "The target queryables of the query. Default: BEST_MATCHING. " + "[possible values: BEST_MATCHING, ALL, ALL_COMPLETE]", + metavar = "target" + ) + private val timeout by option( + "-o", "--timeout", help = "The query timeout in milliseconds [default: 10000]", metavar = "timeout" + ).long().default(10000) + private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config") + private val mode by option( + "-m", + "--mode", + help = "The session mode. Default: peer. Possible values: [peer, client, router]", + metavar = "mode" + ).default("peer") + private val connect: List by option( + "-e", "--connect", help = "Endpoints to connect to.", metavar = "connect" + ).multiple() + private val listen: List by option( + "-l", "--listen", help = "Endpoints to listen on.", metavar = "listen" + ).multiple() + private val noMulticastScouting: Boolean by option( + "--no-multicast-scouting", help = "Disable the multicast-based scouting mechanism." + ).flag(default = false) +} + +fun main(args: Array) = ZQuerier(args.isEmpty()).main(args) diff --git a/zenoh-jni/src/lib.rs b/zenoh-jni/src/lib.rs index 77138bdfc..ff3981a4f 100644 --- a/zenoh-jni/src/lib.rs +++ b/zenoh-jni/src/lib.rs @@ -18,6 +18,7 @@ mod key_expr; mod liveliness; mod logger; mod publisher; +mod querier; mod query; mod queryable; mod scouting; diff --git a/zenoh-jni/src/querier.rs b/zenoh-jni/src/querier.rs new file mode 100644 index 000000000..f8af486bd --- /dev/null +++ b/zenoh-jni/src/querier.rs @@ -0,0 +1,137 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::sync::Arc; + +use jni::{ + objects::{JByteArray, JClass, JObject, JString}, + sys::jint, + JNIEnv, +}; +use zenoh::{key_expr::KeyExpr, query::Querier, Wait}; + +use crate::{ + errors::ZResult, + key_expr::process_kotlin_key_expr, + session::{on_reply_error, on_reply_success}, + throw_exception, + utils::{ + decode_byte_array, decode_encoding, decode_string, get_callback_global_ref, get_java_vm, + load_on_close, + }, + zerror, +}; + +/// Perform a Zenoh GET through a querier. +/// +/// This function is meant to be called from Java/Kotlin code through JNI. +/// +/// Parameters: +/// - `env`: The JNI environment. +/// - `_class`: The JNI class. +/// - `querier_ptr`: The raw pointer to the querier. +/// - `key_expr_ptr`: A raw pointer to the [KeyExpr] provided to the kotlin querier. May be null in case of using an +/// undeclared key expression. +/// - `key_expr_str`: String representation of the key expression used during the querier declaration. +/// It won't be considered in case a key_expr_ptr to a declared key expression is provided. +/// - `selector_params`: Optional selector parameters for the query. +/// - `callback`: Reference to the Kotlin callback to be run upon receiving a reply. +/// - `on_close`: Reference to a kotlin callback to be run upon finishing the get operation, mostly used for closing a provided channel. +/// - `attachment`: Optional attachment. +/// - `payload`: Optional payload for the query. +/// - `encoding_id`: Encoding id of the payload provided. +/// - `encoding_schema`: Encoding schema of the payload provided. +/// +#[no_mangle] +#[allow(non_snake_case)] +pub unsafe extern "C" fn Java_io_zenoh_jni_JNIQuerier_getViaJNI( + mut env: JNIEnv, + _class: JClass, + querier_ptr: *const Querier, + key_expr_ptr: /*nullable*/ *const KeyExpr<'static>, + key_expr_str: JString, + selector_params: /*nullable*/ JString, + callback: JObject, + on_close: JObject, + attachment: /*nullable*/ JByteArray, + payload: /*nullable*/ JByteArray, + encoding_id: jint, + encoding_schema: /*nullable*/ JString, +) { + let querier = Arc::from_raw(querier_ptr); + let _ = || -> ZResult<()> { + let key_expr = process_kotlin_key_expr(&mut env, &key_expr_str, key_expr_ptr)?; + let java_vm = Arc::new(get_java_vm(&mut env)?); + let callback_global_ref = get_callback_global_ref(&mut env, callback)?; + let on_close_global_ref = get_callback_global_ref(&mut env, on_close)?; + let on_close = load_on_close(&java_vm, on_close_global_ref); + let mut get_builder = querier.get().callback(move |reply| { + || -> ZResult<()> { + on_close.noop(); // Does nothing, but moves `on_close` inside the closure so it gets destroyed with the closure + tracing::debug!("Receiving reply through JNI: {:?}", reply); + let mut env = java_vm.attach_current_thread_as_daemon().map_err(|err| { + zerror!("Unable to attach thread for GET query callback: {}.", err) + })?; + + match reply.result() { + Ok(sample) => { + on_reply_success(&mut env, reply.replier_id(), sample, &callback_global_ref) + } + Err(error) => { + on_reply_error(&mut env, reply.replier_id(), error, &callback_global_ref) + } + } + }() + .unwrap_or_else(|err| tracing::error!("Error on get callback: {err}")); + }); + + if !selector_params.is_null() { + let params = decode_string(&mut env, &selector_params)?; + get_builder = get_builder.parameters(params) + }; + + if !payload.is_null() { + let encoding = decode_encoding(&mut env, encoding_id, &encoding_schema)?; + get_builder = get_builder.encoding(encoding); + get_builder = get_builder.payload(decode_byte_array(&env, payload)?); + } + + if !attachment.is_null() { + let attachment = decode_byte_array(&env, attachment)?; + get_builder = get_builder.attachment::>(attachment); + } + + get_builder + .wait() + .map(|_| tracing::trace!("Performing get on '{key_expr}'.",)) + .map_err(|err| zerror!(err)) + }() + .map_err(|err| throw_exception!(env, err)); + std::mem::forget(querier); +} + +/// +/// Frees the pointer of the querier. +/// +/// After a call to this function, no further jni operations should be performed using the querier associated to the raw pointer provided. +/// +#[no_mangle] +#[allow(non_snake_case)] +pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuerier_freePtrViaJNI( + _env: JNIEnv, + _: JClass, + querier_ptr: *const Querier<'static>, +) { + Arc::from_raw(querier_ptr); +} diff --git a/zenoh-jni/src/session.rs b/zenoh-jni/src/session.rs index 43017c0d8..831810256 100644 --- a/zenoh-jni/src/session.rs +++ b/zenoh-jni/src/session.rs @@ -23,7 +23,7 @@ use zenoh::{ config::Config, key_expr::KeyExpr, pubsub::{Publisher, Subscriber}, - query::{Query, Queryable, ReplyError, Selector}, + query::{Querier, Query, Queryable, ReplyError, Selector}, sample::Sample, session::{Session, ZenohId}, Wait, @@ -514,6 +514,69 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_declareSubscriberViaJNI( }) } +/// Declare a Zenoh querier via JNI. +/// +/// This function is meant to be called from Java/Kotlin code through JNI. +/// +/// Parameters: +/// - `env`: The JNI environment. +/// - `_class`: The JNI class. +/// - `key_expr_ptr`: A raw pointer to the [KeyExpr] to be used for the querier. May be null in case of using an +/// undeclared key expression. +/// - `key_expr_str`: String representation of the key expression to be used to declare the querier. +/// It won't be considered in case a key_expr_ptr to a declared key expression is provided. +/// - `target`: The ordinal value of the query target enum value. +/// - `consolidation`: The ordinal value of the consolidation enum value. +/// - `congestion_control`: The ordinal value of the congestion control enum value. +/// - `priority`: The ordinal value of the priority enum value. +/// - `is_express`: The boolean express value of the QoS provided. +/// - `timeout_ms`: The timeout in milliseconds. +#[no_mangle] +#[allow(non_snake_case)] +pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_declareQuerierViaJNI( + mut env: JNIEnv, + _class: JClass, + key_expr_ptr: /*nullable*/ *const KeyExpr<'static>, + key_expr_str: JString, + session_ptr: *const Session, + target: jint, + consolidation: jint, + congestion_control: jint, + priority: jint, + is_express: jboolean, + timeout_ms: jlong, +) -> *const Querier<'static> { + let session = Arc::from_raw(session_ptr); + || -> ZResult<*const Querier<'static>> { + let key_expr = process_kotlin_key_expr(&mut env, &key_expr_str, key_expr_ptr)?; + let query_target = decode_query_target(target)?; + let consolidation = decode_consolidation(consolidation)?; + let congestion_control = decode_congestion_control(congestion_control)?; + let timeout = Duration::from_millis(timeout_ms as u64); + let priority = decode_priority(priority)?; + tracing::debug!("Declaring querier on '{}'...", key_expr); + + let querier = session + .declare_querier(key_expr.to_owned()) + .congestion_control(congestion_control) + .consolidation(consolidation) + .express(is_express != 0) + .target(query_target) + .priority(priority) + .timeout(timeout) + .wait() + .map_err(|err| zerror!(err))?; + + tracing::debug!("Querier declared on '{}'.", key_expr); + std::mem::forget(session); + Ok(Arc::into_raw(Arc::new(querier))) + }() + .unwrap_or_else(|err| { + throw_exception!(env, err); + null() + }) +} + /// Declare a Zenoh queryable via JNI. /// /// This function is meant to be called from Java/Kotlin code through JNI. diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt index b82e69aca..4ad182514 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt @@ -14,6 +14,7 @@ package io.zenoh +import io.zenoh.annotations.Unstable import io.zenoh.exceptions.ZError import io.zenoh.handlers.Callback import io.zenoh.handlers.ChannelHandler @@ -378,6 +379,40 @@ class Session private constructor(private val config: Config) : AutoCloseable { }, handler.receiver(), complete) } + /** + * Declare a [Querier]. + * + * A querier allows to send queries to a queryable. + * + * Queriers are automatically undeclared when dropped. + * + * Example: + * ```kotlin + * val session = Zenoh.open(config).getOrThrow(); + * val keyExpr = "a/b/c".intoKeyExpr().getOrThrow(); + * + * val querier = session.declareQuerier(keyExpr).getOrThrow(); + * querier.get(callback = { + * it.result.onSuccess { sample -> + * println(">> Received ('${sample.keyExpr}': '${sample.payload}')") + * }.onFailure { error -> + * println(">> Received (ERROR: '${error.message}')") + * } + * } + * ) + * ``` + */ + @Unstable + fun declareQuerier( + keyExpr: KeyExpr, + target: QueryTarget = QueryTarget.BEST_MATCHING, + qos: QoS = QoS.default(), + consolidation: ConsolidationMode = ConsolidationMode.AUTO, + timeout: Duration = Duration.ofMillis(10000) + ): Result { + return resolveQuerier(keyExpr, target, consolidation, qos, timeout) + } + /** * Declare a [KeyExpr]. * @@ -467,7 +502,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { attachment: IntoZBytes? = null, timeout: Duration = Duration.ofMillis(10000), target: QueryTarget = QueryTarget.BEST_MATCHING, - consolidation: ConsolidationMode = ConsolidationMode.NONE, + consolidation: ConsolidationMode = ConsolidationMode.AUTO, onClose: (() -> Unit)? = null ): Result { return resolveGet( @@ -544,7 +579,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { attachment: IntoZBytes? = null, timeout: Duration = Duration.ofMillis(10000), target: QueryTarget = QueryTarget.BEST_MATCHING, - consolidation: ConsolidationMode = ConsolidationMode.NONE, + consolidation: ConsolidationMode = ConsolidationMode.AUTO, onClose: (() -> Unit)? = null ): Result { return resolveGet( @@ -743,6 +778,19 @@ class Session private constructor(private val config: Config) : AutoCloseable { } ?: Result.failure(sessionClosedException) } + @OptIn(Unstable::class) + private fun resolveQuerier( + keyExpr: KeyExpr, + target: QueryTarget, + consolidation: ConsolidationMode, + qos: QoS, + timeout: Duration + ): Result { + return jniSession?.run { + declareQuerier(keyExpr, target, consolidation, qos, timeout) + } ?: Result.failure(sessionClosedException) + } + private fun resolveGet( selector: Selector, callback: Callback, diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/annotations/Annotations.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/annotations/Annotations.kt new file mode 100644 index 000000000..b296e0b0b --- /dev/null +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/annotations/Annotations.kt @@ -0,0 +1,23 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.annotations + +@RequiresOptIn( + level = RequiresOptIn.Level.WARNING, + message = "This feature is unstable and may change in future releases." +) +@Retention(AnnotationRetention.BINARY) +@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION) +annotation class Unstable diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuerier.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuerier.kt new file mode 100644 index 000000000..9d12b70ec --- /dev/null +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQuerier.kt @@ -0,0 +1,121 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.jni + +import io.zenoh.bytes.Encoding +import io.zenoh.bytes.IntoZBytes +import io.zenoh.bytes.into +import io.zenoh.config.ZenohId +import io.zenoh.exceptions.ZError +import io.zenoh.handlers.Callback +import io.zenoh.jni.callbacks.JNIGetCallback +import io.zenoh.jni.callbacks.JNIOnCloseCallback +import io.zenoh.keyexpr.KeyExpr +import io.zenoh.qos.CongestionControl +import io.zenoh.qos.Priority +import io.zenoh.qos.QoS +import io.zenoh.query.Parameters +import io.zenoh.query.Reply +import io.zenoh.query.ReplyError +import io.zenoh.sample.Sample +import io.zenoh.sample.SampleKind +import org.apache.commons.net.ntp.TimeStamp + +internal class JNIQuerier(val ptr: Long) { + + fun performGet( + keyExpr: KeyExpr, + parameters: Parameters?, + callback: Callback, + onClose: () -> Unit, + receiver: R, + attachment: IntoZBytes?, + payload: IntoZBytes?, + encoding: Encoding? + ): Result = runCatching { + val getCallback = JNIGetCallback { + replierId: ByteArray?, + success: Boolean, + keyExpr: String?, + payload: ByteArray, + encodingId: Int, + encodingSchema: String?, + kind: Int, + timestampNTP64: Long, + timestampIsValid: Boolean, + attachmentBytes: ByteArray?, + express: Boolean, + priority: Int, + congestionControl: Int, + -> + val reply: Reply + if (success) { + val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null + val sample = Sample( + KeyExpr(keyExpr!!, null), + payload.into(), + Encoding(encodingId, schema = encodingSchema), + SampleKind.fromInt(kind), + timestamp, + QoS(CongestionControl.fromInt(congestionControl), Priority.fromInt(priority), express), + attachmentBytes?.into() + ) + reply = Reply(replierId?.let { ZenohId(it) }, Result.success(sample)) + } else { + reply = Reply( + replierId?.let { ZenohId(it) }, Result.failure( + ReplyError( + payload.into(), + Encoding(encodingId, schema = encodingSchema) + ) + ) + ) + } + callback.run(reply) + } + getViaJNI(this.ptr, + keyExpr.jniKeyExpr?.ptr ?: 0, + keyExpr.keyExpr, + parameters?.toString(), + getCallback, + onClose, + attachment?.into()?.bytes, + payload?.into()?.bytes, + encoding?.id ?: Encoding.default().id, + encoding?.schema + ) + receiver + } + + fun close() { + freePtrViaJNI(ptr) + } + + @Throws(ZError::class) + private external fun getViaJNI( + querierPtr: Long, + keyExprPtr: Long, + keyExprString: String, + parameters: String?, + callback: JNIGetCallback, + onClose: JNIOnCloseCallback, + attachmentBytes: ByteArray?, + payload: ByteArray?, + encodingId: Int, + encodingSchema: String?, + ) + + private external fun freePtrViaJNI(ptr: Long) +} diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt index d3550b4ea..8ab4d6dce 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt @@ -27,6 +27,7 @@ import io.zenoh.bytes.IntoZBytes import io.zenoh.config.ZenohId import io.zenoh.bytes.into import io.zenoh.Config +import io.zenoh.annotations.Unstable import io.zenoh.pubsub.Delete import io.zenoh.pubsub.Publisher import io.zenoh.pubsub.Put @@ -136,6 +137,21 @@ internal class JNISession { Queryable(keyExpr, receiver, JNIQueryable(queryableRawPtr)) } + @OptIn(Unstable::class) + fun declareQuerier( + keyExpr: KeyExpr, + target: QueryTarget, + consolidation: ConsolidationMode, + qos: QoS, + timeout: Duration + ): Result = runCatching { + val querierRawPtr = declareQuerierViaJNI( + keyExpr.jniKeyExpr?.ptr ?: 0, keyExpr.keyExpr, sessionPtr.get(), target.ordinal, consolidation.ordinal, + qos.congestionControl.ordinal, qos.priority.ordinal, qos.express, timeout.toMillis() + ) + Querier(keyExpr, qos, JNIQuerier(querierRawPtr)) + } + fun performGet( selector: Selector, callback: Callback, @@ -313,6 +329,19 @@ internal class JNISession { complete: Boolean ): Long + @Throws(ZError::class) + private external fun declareQuerierViaJNI( + keyExprPtr: Long, + keyExprString: String, + sessionPtr: Long, + target: Int, + consolidation: Int, + congestionControl: Int, + priority: Int, + express: Boolean, + timeoutMs: Long + ): Long + @Throws(ZError::class) private external fun declareKeyExprViaJNI(sessionPtr: Long, keyExpr: String): Long diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Querier.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Querier.kt new file mode 100644 index 000000000..a8503865e --- /dev/null +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Querier.kt @@ -0,0 +1,178 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.query + +import io.zenoh.annotations.Unstable +import io.zenoh.bytes.Encoding +import io.zenoh.bytes.IntoZBytes +import io.zenoh.exceptions.ZError +import io.zenoh.handlers.Callback +import io.zenoh.handlers.ChannelHandler +import io.zenoh.handlers.Handler +import io.zenoh.jni.JNIQuerier +import io.zenoh.keyexpr.KeyExpr +import io.zenoh.qos.QoS +import io.zenoh.session.SessionDeclaration +import kotlinx.coroutines.channels.Channel + +/** + * A querier that allows to send queries to a [Queryable]. + * + * Queriers are automatically undeclared when dropped. + * + * Example: + * ```kotlin + * val session = Zenoh.open(config).getOrThrow(); + * val keyExpr = "a/b/c".intoKeyExpr().getOrThrow(); + * + * val querier = session.declareQuerier(keyExpr).getOrThrow(); + * querier.get(callback = { + * it.result.onSuccess { sample -> + * println(">> Received ('${sample.keyExpr}': '${sample.payload}')") + * }.onFailure { error -> + * println(">> Received (ERROR: '${error.message}')") + * } + * } + * ) + * ``` + * + */ +@Unstable +class Querier internal constructor(val keyExpr: KeyExpr, val qos: QoS, private var jniQuerier: JNIQuerier?) : + SessionDeclaration, AutoCloseable { + + /** + * Perform a get operation to the [keyExpr] from the Querier and pipe the incoming + * replies into the [channel] provided. + * + * @param channel The [Channel] that will receive the replies. + * @param parameters Optional [Parameters] for the query. + * @param payload Optional payload for the query. + * @param encoding Optional encoding for the payload of the query. + * @param attachment Optional attachment for the query. + * @return A result with the provided channel. + */ + fun get( + channel: Channel, + parameters: Parameters? = null, + payload: IntoZBytes? = null, + encoding: Encoding? = null, + attachment: IntoZBytes? = null + ): Result> { + val handler = ChannelHandler(channel) + return jniQuerier?.performGet( + keyExpr, + parameters, + handler::handle, + handler::onClose, + handler.receiver(), + attachment, + payload, + encoding + ) ?: throw ZError("Querier is not valid.") + } + + /** + * Perform a get operation to the [keyExpr] from the Querier and handle the incoming replies + * with the [callback] provided. + * + * @param callback [Callback] to be run upon receiving a [Reply] to the query. + * @param parameters Optional [Parameters] for the query. + * @param payload Optional payload for the query. + * @param encoding Optional encoding for the payload of the query. + * @param attachment Optional attachment for the query. + * @return A result with the status of the operation. + */ + fun get( + callback: Callback, + parameters: Parameters? = null, + payload: IntoZBytes? = null, + encoding: Encoding? = null, + attachment: IntoZBytes? = null + ): Result { + return jniQuerier?.performGet( + keyExpr, + parameters, + callback, + {}, + Unit, + attachment, + payload, + encoding + ) ?: throw ZError("Querier is not valid.") + } + + /** + * Perform a get operation to the [keyExpr] from the Querier and handle the incoming replies + * with the [handler] provided. + * + * @param handler [Handler] to handle the receiving replies to the query. + * @param parameters Optional [Parameters] for the query. + * @param payload Optional payload for the query. + * @param encoding Optional encoding for the payload of the query. + * @param attachment Optional attachment for the query. + * @return A result with the status of the operation. + */ + fun get( + handler: Handler, + parameters: Parameters? = null, + payload: IntoZBytes? = null, + encoding: Encoding? = null, + attachment: IntoZBytes? = null + ): Result { + return jniQuerier?.performGet( + keyExpr, + parameters, + handler::handle, + handler::onClose, + handler.receiver(), + attachment, + payload, + encoding + ) ?: throw ZError("Querier is not valid.") + } + + /** + * Get the [QoS.congestionControl] of the querier. + */ + fun congestionControl() = qos.congestionControl + + /** + * Get the [QoS.priority] of the querier. + */ + fun priority() = qos.priority + + /** + * Undeclares the querier. After calling this function, the querier won't be valid anymore and get operations + * performed on it will fail. + */ + override fun undeclare() { + jniQuerier?.close() + jniQuerier = null + } + + /** + * Closes the querier. Equivalent to [undeclare], this function is automatically called when using + * try-with-resources. + */ + override fun close() { + undeclare() + } + + protected fun finalize() { + undeclare() + } + +} diff --git a/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/QuerierTest.kt b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/QuerierTest.kt new file mode 100644 index 000000000..4e768a82f --- /dev/null +++ b/zenoh-kotlin/src/commonTest/kotlin/io/zenoh/QuerierTest.kt @@ -0,0 +1,88 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh + +import io.zenoh.annotations.Unstable +import io.zenoh.bytes.Encoding +import io.zenoh.bytes.ZBytes +import io.zenoh.keyexpr.KeyExpr +import io.zenoh.keyexpr.intoKeyExpr +import io.zenoh.qos.QoS +import io.zenoh.query.Reply +import io.zenoh.sample.Sample +import io.zenoh.sample.SampleKind +import kotlinx.coroutines.runBlocking +import org.apache.commons.net.ntp.TimeStamp +import java.time.Instant +import java.util.* +import kotlin.test.* + +class QuerierTest { + + companion object { + val testPayload = ZBytes.from("Hello queryable") + } + + private lateinit var session: Session + private lateinit var testKeyExpr: KeyExpr + + @BeforeTest + fun setUp() { + session = Session.open(Config.default()).getOrThrow() + testKeyExpr = "example/testing/keyexpr".intoKeyExpr().getOrThrow() + } + + @AfterTest + fun tearDown() { + session.close() + testKeyExpr.close() + } + + /** Test validating both Queryable and get operations. */ + @OptIn(Unstable::class) + @Test + fun querier_runsWithCallback() = runBlocking { + val sample = Sample( + testKeyExpr, + testPayload, + Encoding.default(), + SampleKind.PUT, + TimeStamp(Date.from(Instant.now())), + QoS() + ) + val examplePayload = ZBytes.from("Example payload") + val exampleAttachment = ZBytes.from("Example attachment") + + val queryable = session.declareQueryable(testKeyExpr, callback = { query -> + assertEquals(exampleAttachment, query.attachment) + assertEquals(examplePayload, query.payload) + query.reply(testKeyExpr, payload = sample.payload, timestamp = sample.timestamp) + }).getOrThrow() + + val querier = session.declareQuerier(testKeyExpr).getOrThrow() + + var receivedReply: Reply? = null + querier.get( + callback = { reply -> receivedReply = reply}, + payload = examplePayload, + attachment = exampleAttachment + ) + + assertEquals(sample, receivedReply?.result?.getOrThrow()) + + queryable.close() + querier.close() + } +}