diff --git a/examples/build.gradle.kts b/examples/build.gradle.kts index e3aa4aba3..7022be640 100644 --- a/examples/build.gradle.kts +++ b/examples/build.gradle.kts @@ -38,6 +38,7 @@ tasks { "ZPubThr", "ZPut", "ZQueryable", + "ZScout", "ZSub", "ZSubThr" ) diff --git a/examples/src/main/kotlin/io.zenoh/ZScout.kt b/examples/src/main/kotlin/io.zenoh/ZScout.kt new file mode 100644 index 000000000..bc8ff1288 --- /dev/null +++ b/examples/src/main/kotlin/io.zenoh/ZScout.kt @@ -0,0 +1,40 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh + +import com.github.ajalt.clikt.core.CliktCommand +import io.zenoh.scouting.WhatAmI +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.runBlocking + +class ZScout : CliktCommand( + help = "Zenoh Scouting example" +) { + override fun run() { + + println("Scouting...") + + val scout = Zenoh.scout(channel = Channel(), whatAmI = setOf(WhatAmI.Peer, WhatAmI.Router)) + runBlocking { + for (hello in scout.receiver) { + println(hello) + } + } + + scout.stop() + } +} + +fun main(args: Array) = ZScout().main(args) diff --git a/zenoh-jni/src/errors.rs b/zenoh-jni/src/errors.rs index a254c67f5..5afe57730 100644 --- a/zenoh-jni/src/errors.rs +++ b/zenoh-jni/src/errors.rs @@ -31,7 +31,7 @@ macro_rules! jni_error { $crate::errors::Error::Jni($arg.to_string()) }; ($fmt:expr, $($arg:tt)*) => { - Error::Jni(format!($fmt, $($arg)*)) + $crate::errors::Error::Jni(format!($fmt, $($arg)*)) }; } @@ -41,7 +41,7 @@ macro_rules! session_error { $crate::errors::Error::Session($arg.to_string()) }; ($fmt:expr, $($arg:tt)*) => { - Error::Session(format!($fmt, $($arg)*)) + $crate::errors::Error::Session(format!($fmt, $($arg)*)) }; } diff --git a/zenoh-jni/src/lib.rs b/zenoh-jni/src/lib.rs index a982babe4..406d20e22 100644 --- a/zenoh-jni/src/lib.rs +++ b/zenoh-jni/src/lib.rs @@ -18,6 +18,7 @@ mod logger; mod publisher; mod query; mod queryable; +mod scouting; mod session; mod subscriber; mod utils; diff --git a/zenoh-jni/src/scouting.rs b/zenoh-jni/src/scouting.rs new file mode 100644 index 000000000..356bfc5ab --- /dev/null +++ b/zenoh-jni/src/scouting.rs @@ -0,0 +1,130 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{ptr::null, sync::Arc}; + +use jni::{ + objects::{JClass, JList, JObject, JString, JValue}, + sys::jint, + JNIEnv, +}; +use zenoh::{config::WhatAmIMatcher, prelude::Wait}; +use zenoh::{scouting::Scout, Config}; + +use crate::{errors::Result, throw_exception, utils::decode_string}; +use crate::{ + session_error, + utils::{get_callback_global_ref, get_java_vm}, +}; + +/// Start a scout. +/// +/// # Params +/// - `whatAmI`: Ordinal value of the WhatAmI enum. +/// - `callback`: Callback to be executed whenever a hello message is received. +/// - `config_string`: Optional embedded configuration as a string. +/// - `format`: format of the `config_string` param. +/// - `config_path`: Optional path to a config file. +/// +/// Note: Either the config_string or the config_path or None can be provided. +/// If none is provided, then the default configuration is loaded. Otherwise +/// it's the config_string or the config_path that are loaded. This consistency +/// logic is granted by the kotlin layer. +/// +/// Returns a pointer to the scout, which must be freed afterwards. +/// If starting the scout fails, an exception is thrown on the JVM, and a null pointer is returned. +/// +#[no_mangle] +#[allow(non_snake_case)] +pub unsafe extern "C" fn Java_io_zenoh_jni_JNIScout_00024Companion_scoutViaJNI( + mut env: JNIEnv, + _class: JClass, + whatAmI: jint, + callback: JObject, + config_string: /*nullable=*/ JString, + format: jint, + config_path: /*nullable=*/ JString, +) -> *const Scout<()> { + || -> Result<*const Scout<()>> { + let callback_global_ref = get_callback_global_ref(&mut env, callback)?; + let java_vm = Arc::new(get_java_vm(&mut env)?); + let whatAmIMatcher: WhatAmIMatcher = (whatAmI as u8).try_into().unwrap(); // The validity of the operation is guaranteed on the kotlin layer. + let config = if config_string.is_null() && config_path.is_null() { + Config::default() + } else if !config_string.is_null() { + let string_config = decode_string(&mut env, &config_string)?; + match format { + 0 /*YAML*/ => { + let deserializer = serde_yaml::Deserializer::from_str(&string_config); + Config::from_deserializer(deserializer).map_err(|err| match err { + Ok(c) => session_error!("Invalid configuration: {}", c), + Err(e) => session_error!("YAML error: {}", e), + })? + } + 1 | 2 /*JSON | JSON5*/ => { + let mut deserializer = + json5::Deserializer::from_str(&string_config).map_err(|err| session_error!(err))?; + Config::from_deserializer(&mut deserializer).map_err(|err| match err { + Ok(c) => session_error!("Invalid configuration: {}", c), + Err(e) => session_error!("JSON error: {}", e), + })? + } + _ => { + // This can never happen unless the Config.Format enum on Kotlin is wrongly modified! + Err(session_error!("Unexpected error: attempting to decode a config with a format other than Json, + Json5 or Yaml. Check Config.Format for eventual modifications..."))? + } + } + } else { + let config_file_path = decode_string(&mut env, &config_path)?; + Config::from_file(config_file_path).map_err(|err| session_error!(err))? + }; + zenoh::scout(whatAmIMatcher, config) + .callback(move |hello| { + tracing::debug!("Received hello: {hello}"); + let _ = || -> jni::errors::Result<()> { + let mut env = java_vm.attach_current_thread_as_daemon()?; + let whatami = hello.whatami() as jint; + let zenohid = env.new_string(hello.zid().to_string())?; + let locators = env + .new_object("java/util/ArrayList", "()V", &[]) + .map(|it| env.auto_local(it))?; + let jlist = JList::from_env(&mut env, &locators)?; + for value in hello.locators() { + let locator = env.new_string(value.as_str())?; + jlist.add(&mut env, &locator)?; + } + env.call_method( + &callback_global_ref, + "run", + "(ILjava/lang/String;Ljava/util/List;)V", + &[ + JValue::from(whatami), + JValue::from(&zenohid), + JValue::from(&locators), + ], + )?; + Ok(()) + }() + .map_err(|err| tracing::error!("Error while scouting: ${err}")); + }) + .wait() + .map(|scout| Arc::into_raw(Arc::new(scout))) + .map_err(|err| session_error!(err)) + }() + .unwrap_or_else(|err| { + throw_exception!(env, err); + null() + }) +} diff --git a/zenoh-jni/src/session.rs b/zenoh-jni/src/session.rs index 5975c5f27..fd38aed41 100644 --- a/zenoh-jni/src/session.rs +++ b/zenoh-jni/src/session.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // -use crate::errors::{Error, Result}; +use crate::errors::Result; use crate::key_expr::process_kotlin_key_expr; use crate::{jni_error, utils::*}; use crate::{session_error, throw_exception}; @@ -165,7 +165,7 @@ fn open_session_with_yaml_config(env: &mut JNIEnv, yaml_config: JString) -> Resu let deserializer = serde_yaml::Deserializer::from_str(&yaml_config); let config = Config::from_deserializer(deserializer).map_err(|err| match err { Ok(c) => session_error!("Invalid configuration: {}", c), - Err(e) => session_error!("JSON error: {}", e), + Err(e) => session_error!("YAML error: {}", e), })?; zenoh::open(config) .wait() diff --git a/zenoh-jni/src/utils.rs b/zenoh-jni/src/utils.rs index 96c705f94..cee9794b9 100644 --- a/zenoh-jni/src/utils.rs +++ b/zenoh-jni/src/utils.rs @@ -14,10 +14,7 @@ use std::sync::Arc; -use crate::{ - errors::{Error, Result}, - jni_error, session_error, throw_exception, -}; +use crate::{errors::Result, jni_error, session_error, throw_exception}; use jni::{ objects::{JByteArray, JObject, JString}, sys::jint, diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Zenoh.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Zenoh.kt index a6861ba8f..d3cfcf518 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Zenoh.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Zenoh.kt @@ -14,8 +14,92 @@ package io.zenoh +import io.zenoh.handlers.Callback +import io.zenoh.handlers.ChannelHandler +import io.zenoh.handlers.Handler +import io.zenoh.jni.JNIScout +import io.zenoh.scouting.Hello +import io.zenoh.scouting.Scout +import io.zenoh.scouting.WhatAmI +import io.zenoh.scouting.WhatAmI.* +import kotlinx.coroutines.channels.Channel + +object Zenoh { + + /** + * Scout for routers and/or peers. + * + * Scout spawns a task that periodically sends scout messages and waits for Hello replies. + * Drop the returned Scout to stop the scouting task or explicitly call [Scout.stop] or [Scout.close]. + * + * @param callback [Callback] to be run when receiving a [Hello] message. + * @param whatAmI [WhatAmI] configuration: it indicates the role of the zenoh node sending the HELLO message. + * @param config Optional [Config] for the scout. + * @return A [Scout] object. + */ + fun scout( + callback: Callback, + whatAmI: Set = setOf(Peer, Router), + config: Config? = null + ): Scout { + ZenohLoad + return JNIScout.scout(whatAmI = whatAmI, callback = callback, receiver = Unit, config = config) + } + + /** + * Scout for routers and/or peers. + * + * Scout spawns a task that periodically sends scout messages and waits for Hello replies. + * Drop the returned Scout to stop the scouting task or explicitly call [Scout.stop] or [Scout.close]. + * + * @param handler [Handler] to handle incoming [Hello] messages. + * @param whatAmI [WhatAmI] configuration: it indicates the role of the zenoh node sending the HELLO message. + * @param config Optional [Config] for the scout. + * @return A [Scout] object. + */ + fun scout( + handler: Handler, + whatAmI: Set = setOf(Peer, Router), + config: Config? = null + ): Scout { + ZenohLoad + return JNIScout.scout( + whatAmI = whatAmI, + callback = { hello -> handler.handle(hello) }, + receiver = handler.receiver(), + config = config + ) + } + + /** + * Scout for routers and/or peers. + * + * Scout spawns a task that periodically sends scout messages and waits for Hello replies. + * Drop the returned Scout to stop the scouting task or explicitly call [Scout.stop] or [Scout.close]. + * + * @param channel [Channel] upon which the incoming [Hello] messages will be piped. + * @param whatAmI [WhatAmI] configuration: it indicates the role of the zenoh node sending the HELLO message. + * @param config Optional [Config] for the scout. + * @return A [Scout] object. + */ + fun scout( + channel: Channel, + whatAmI: Set = setOf(Peer, Router), + config: Config? = null + ): Scout> { + ZenohLoad + val handler = ChannelHandler(channel) + return JNIScout.scout( + whatAmI = whatAmI, + callback = { hello -> handler.handle(hello) }, + receiver = handler.receiver(), + config = config + ) + } +} + /** * Static singleton class to load the Zenoh native library once and only once, as well as the logger in function of the * log level configuration. */ -internal expect object ZenohLoad \ No newline at end of file +internal expect object ZenohLoad diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQueryable.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQueryable.kt index f17df8689..ff0e33c75 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQueryable.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIQueryable.kt @@ -15,7 +15,7 @@ package io.zenoh.jni /** - * Adapter class to handle the interactions with Zenoh through JNI for a [Queryable] + * Adapter class to handle the interactions with Zenoh through JNI for a [io.zenoh.queryable.Queryable] * * @property ptr: raw pointer to the underlying native Queryable. */ diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIScout.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIScout.kt new file mode 100644 index 000000000..67b75cab4 --- /dev/null +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIScout.kt @@ -0,0 +1,66 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.jni + +import io.zenoh.Config +import io.zenoh.handlers.Callback +import io.zenoh.jni.callbacks.JNIScoutCallback +import io.zenoh.protocol.ZenohID +import io.zenoh.scouting.Hello +import io.zenoh.scouting.Scout +import io.zenoh.scouting.WhatAmI + +/** + * Adapter class to handle the interactions with Zenoh through JNI for a [io.zenoh.scouting.Scout] + * + * @property ptr: raw pointer to the underlying native scout. + */ +class JNIScout(private val ptr: Long) { + + companion object { + fun scout( + whatAmI: Set, + callback: Callback, + config: Config?, + receiver: R + ): Scout { + val scoutCallback = JNIScoutCallback { whatAmI2: Int, id: String, locators: List -> + callback.run(Hello(WhatAmI.fromInt(whatAmI2), ZenohID(id), locators)) + } + val binaryWhatAmI: Int = whatAmI.map { it.value }.reduce { acc, it -> acc or it } + val ptr = scoutViaJNI( + binaryWhatAmI, scoutCallback, config?.config, config?.format?.ordinal ?: 0, + config?.path?.toString() + ) + return Scout(receiver, JNIScout(ptr)) + } + + @Throws(Exception::class) + private external fun scoutViaJNI( + whatAmI: Int, + callback: JNIScoutCallback, + config: String?, + format: Int, + path: String? + ): Long + + @Throws(Exception::class) + external fun freePtrViaJNI(ptr: Long) + } + + fun close() { + freePtrViaJNI(ptr) + } +} diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIScoutCallback.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIScoutCallback.kt new file mode 100644 index 000000000..795034a86 --- /dev/null +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIScoutCallback.kt @@ -0,0 +1,20 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.jni.callbacks + +internal fun interface JNIScoutCallback { + + fun run(whatAmI: Int, zid: String, locators: List) +} diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/protocol/ZenohID.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/protocol/ZenohID.kt index 953fb226a..855e4f2b8 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/protocol/ZenohID.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/protocol/ZenohID.kt @@ -17,4 +17,4 @@ package io.zenoh.protocol /** * The global unique id of a Zenoh peer. */ -class ZenohID(val id: String) +data class ZenohID(val id: String) diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/scouting/Hello.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/scouting/Hello.kt new file mode 100644 index 000000000..c3cbe20b8 --- /dev/null +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/scouting/Hello.kt @@ -0,0 +1,28 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.scouting + +import io.zenoh.ZenohType +import io.zenoh.protocol.ZenohID + +/** + * Hello message received while scouting. + * + * @property whatAmI [WhatAmI] configuration: it indicates the role of the zenoh node sending the HELLO message. + * @property zid [ZenohID] of the node sending the hello message. + * @property locators The locators of this hello message. + * @see Scout + */ +data class Hello(val whatAmI: WhatAmI, val zid: ZenohID, val locators: List): ZenohType diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/scouting/Scout.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/scouting/Scout.kt new file mode 100644 index 000000000..e7f311790 --- /dev/null +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/scouting/Scout.kt @@ -0,0 +1,51 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.scouting + +import io.zenoh.jni.JNIScout + +/** + * Scout for routers and/or peers. + * + * Scout spawns a task that periodically sends scout messages and waits for Hello replies. + * Drop the returned Scout to stop the scouting task. + * + * @param R The receiver type. + * @param receiver Receiver to handle incoming hello messages. + */ +class Scout internal constructor( + val receiver: R, + private var jniScout: JNIScout? +) : AutoCloseable { + + /** + * Stops the scouting. + */ + fun stop() { + jniScout?.close() + jniScout = null + } + + /** + * Equivalent to [stop]. + */ + override fun close() { + stop() + } + + protected fun finalize() { + stop() + } +} diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/scouting/WhatAmI.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/scouting/WhatAmI.kt new file mode 100644 index 000000000..6296ee7bb --- /dev/null +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/scouting/WhatAmI.kt @@ -0,0 +1,30 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.scouting + +/** + * WhatAmI + * + * The role of the node sending the `hello` message. + */ +enum class WhatAmI(internal val value: Int) { + Router(1), + Peer(2), + Client(4); + + companion object { + internal fun fromInt(value: Int) = entries.first { value == it.value } + } +}