Skip to content

Commit

Permalink
Scouting (#171)
Browse files Browse the repository at this point in the history
* feat(scouting): scouting - wip

* feat(scouting): scouting - enabling 'whatAmI' parameter

* feature(scouting): enabling configuration parameter

* feature(scouting): Adding kdocs

* feature(scouting): small comment fixes

* cargo fmt

* feat(scouting): revert config param made optional on session.open()
  • Loading branch information
DariusIMP authored Aug 29, 2024
1 parent 6229300 commit a2ebeb2
Show file tree
Hide file tree
Showing 15 changed files with 459 additions and 11 deletions.
1 change: 1 addition & 0 deletions examples/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ tasks {
"ZPubThr",
"ZPut",
"ZQueryable",
"ZScout",
"ZSub",
"ZSubThr"
)
Expand Down
40 changes: 40 additions & 0 deletions examples/src/main/kotlin/io.zenoh/ZScout.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh

import com.github.ajalt.clikt.core.CliktCommand
import io.zenoh.scouting.WhatAmI
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

class ZScout : CliktCommand(
help = "Zenoh Scouting example"
) {
override fun run() {

println("Scouting...")

val scout = Zenoh.scout(channel = Channel(), whatAmI = setOf(WhatAmI.Peer, WhatAmI.Router))
runBlocking {
for (hello in scout.receiver) {
println(hello)
}
}

scout.stop()
}
}

fun main(args: Array<String>) = ZScout().main(args)
4 changes: 2 additions & 2 deletions zenoh-jni/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ macro_rules! jni_error {
$crate::errors::Error::Jni($arg.to_string())
};
($fmt:expr, $($arg:tt)*) => {
Error::Jni(format!($fmt, $($arg)*))
$crate::errors::Error::Jni(format!($fmt, $($arg)*))
};
}

Expand All @@ -41,7 +41,7 @@ macro_rules! session_error {
$crate::errors::Error::Session($arg.to_string())
};
($fmt:expr, $($arg:tt)*) => {
Error::Session(format!($fmt, $($arg)*))
$crate::errors::Error::Session(format!($fmt, $($arg)*))
};

}
Expand Down
1 change: 1 addition & 0 deletions zenoh-jni/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod logger;
mod publisher;
mod query;
mod queryable;
mod scouting;
mod session;
mod subscriber;
mod utils;
Expand Down
130 changes: 130 additions & 0 deletions zenoh-jni/src/scouting.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use std::{ptr::null, sync::Arc};

use jni::{
objects::{JClass, JList, JObject, JString, JValue},
sys::jint,
JNIEnv,
};
use zenoh::{config::WhatAmIMatcher, prelude::Wait};
use zenoh::{scouting::Scout, Config};

use crate::{errors::Result, throw_exception, utils::decode_string};
use crate::{
session_error,
utils::{get_callback_global_ref, get_java_vm},
};

/// Start a scout.
///
/// # Params
/// - `whatAmI`: Ordinal value of the WhatAmI enum.
/// - `callback`: Callback to be executed whenever a hello message is received.
/// - `config_string`: Optional embedded configuration as a string.
/// - `format`: format of the `config_string` param.
/// - `config_path`: Optional path to a config file.
///
/// Note: Either the config_string or the config_path or None can be provided.
/// If none is provided, then the default configuration is loaded. Otherwise
/// it's the config_string or the config_path that are loaded. This consistency
/// logic is granted by the kotlin layer.
///
/// Returns a pointer to the scout, which must be freed afterwards.
/// If starting the scout fails, an exception is thrown on the JVM, and a null pointer is returned.
///
#[no_mangle]
#[allow(non_snake_case)]
pub unsafe extern "C" fn Java_io_zenoh_jni_JNIScout_00024Companion_scoutViaJNI(
mut env: JNIEnv,
_class: JClass,
whatAmI: jint,
callback: JObject,
config_string: /*nullable=*/ JString,
format: jint,
config_path: /*nullable=*/ JString,
) -> *const Scout<()> {
|| -> Result<*const Scout<()>> {
let callback_global_ref = get_callback_global_ref(&mut env, callback)?;
let java_vm = Arc::new(get_java_vm(&mut env)?);
let whatAmIMatcher: WhatAmIMatcher = (whatAmI as u8).try_into().unwrap(); // The validity of the operation is guaranteed on the kotlin layer.
let config = if config_string.is_null() && config_path.is_null() {
Config::default()
} else if !config_string.is_null() {
let string_config = decode_string(&mut env, &config_string)?;
match format {
0 /*YAML*/ => {
let deserializer = serde_yaml::Deserializer::from_str(&string_config);
Config::from_deserializer(deserializer).map_err(|err| match err {
Ok(c) => session_error!("Invalid configuration: {}", c),
Err(e) => session_error!("YAML error: {}", e),
})?
}
1 | 2 /*JSON | JSON5*/ => {
let mut deserializer =
json5::Deserializer::from_str(&string_config).map_err(|err| session_error!(err))?;
Config::from_deserializer(&mut deserializer).map_err(|err| match err {
Ok(c) => session_error!("Invalid configuration: {}", c),
Err(e) => session_error!("JSON error: {}", e),
})?
}
_ => {
// This can never happen unless the Config.Format enum on Kotlin is wrongly modified!
Err(session_error!("Unexpected error: attempting to decode a config with a format other than Json,
Json5 or Yaml. Check Config.Format for eventual modifications..."))?
}
}
} else {
let config_file_path = decode_string(&mut env, &config_path)?;
Config::from_file(config_file_path).map_err(|err| session_error!(err))?
};
zenoh::scout(whatAmIMatcher, config)
.callback(move |hello| {
tracing::debug!("Received hello: {hello}");
let _ = || -> jni::errors::Result<()> {
let mut env = java_vm.attach_current_thread_as_daemon()?;
let whatami = hello.whatami() as jint;
let zenohid = env.new_string(hello.zid().to_string())?;
let locators = env
.new_object("java/util/ArrayList", "()V", &[])
.map(|it| env.auto_local(it))?;
let jlist = JList::from_env(&mut env, &locators)?;
for value in hello.locators() {
let locator = env.new_string(value.as_str())?;
jlist.add(&mut env, &locator)?;
}
env.call_method(
&callback_global_ref,
"run",
"(ILjava/lang/String;Ljava/util/List;)V",
&[
JValue::from(whatami),
JValue::from(&zenohid),
JValue::from(&locators),
],
)?;
Ok(())
}()
.map_err(|err| tracing::error!("Error while scouting: ${err}"));
})
.wait()
.map(|scout| Arc::into_raw(Arc::new(scout)))
.map_err(|err| session_error!(err))
}()
.unwrap_or_else(|err| {
throw_exception!(env, err);
null()
})
}
4 changes: 2 additions & 2 deletions zenoh-jni/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//

use crate::errors::{Error, Result};
use crate::errors::Result;
use crate::key_expr::process_kotlin_key_expr;
use crate::{jni_error, utils::*};
use crate::{session_error, throw_exception};
Expand Down Expand Up @@ -165,7 +165,7 @@ fn open_session_with_yaml_config(env: &mut JNIEnv, yaml_config: JString) -> Resu
let deserializer = serde_yaml::Deserializer::from_str(&yaml_config);
let config = Config::from_deserializer(deserializer).map_err(|err| match err {
Ok(c) => session_error!("Invalid configuration: {}", c),
Err(e) => session_error!("JSON error: {}", e),
Err(e) => session_error!("YAML error: {}", e),
})?;
zenoh::open(config)
.wait()
Expand Down
5 changes: 1 addition & 4 deletions zenoh-jni/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@

use std::sync::Arc;

use crate::{
errors::{Error, Result},
jni_error, session_error, throw_exception,
};
use crate::{errors::Result, jni_error, session_error, throw_exception};
use jni::{
objects::{JByteArray, JObject, JString},
sys::jint,
Expand Down
86 changes: 85 additions & 1 deletion zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Zenoh.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,92 @@

package io.zenoh

import io.zenoh.handlers.Callback
import io.zenoh.handlers.ChannelHandler
import io.zenoh.handlers.Handler
import io.zenoh.jni.JNIScout
import io.zenoh.scouting.Hello
import io.zenoh.scouting.Scout
import io.zenoh.scouting.WhatAmI
import io.zenoh.scouting.WhatAmI.*
import kotlinx.coroutines.channels.Channel

object Zenoh {

/**
* Scout for routers and/or peers.
*
* Scout spawns a task that periodically sends scout messages and waits for Hello replies.
* Drop the returned Scout to stop the scouting task or explicitly call [Scout.stop] or [Scout.close].
*
* @param callback [Callback] to be run when receiving a [Hello] message.
* @param whatAmI [WhatAmI] configuration: it indicates the role of the zenoh node sending the HELLO message.
* @param config Optional [Config] for the scout.
* @return A [Scout] object.
*/
fun scout(
callback: Callback<Hello>,
whatAmI: Set<WhatAmI> = setOf(Peer, Router),
config: Config? = null
): Scout<Unit> {
ZenohLoad
return JNIScout.scout(whatAmI = whatAmI, callback = callback, receiver = Unit, config = config)
}

/**
* Scout for routers and/or peers.
*
* Scout spawns a task that periodically sends scout messages and waits for Hello replies.
* Drop the returned Scout to stop the scouting task or explicitly call [Scout.stop] or [Scout.close].
*
* @param handler [Handler] to handle incoming [Hello] messages.
* @param whatAmI [WhatAmI] configuration: it indicates the role of the zenoh node sending the HELLO message.
* @param config Optional [Config] for the scout.
* @return A [Scout] object.
*/
fun <R> scout(
handler: Handler<Hello, R>,
whatAmI: Set<WhatAmI> = setOf(Peer, Router),
config: Config? = null
): Scout<R> {
ZenohLoad
return JNIScout.scout(
whatAmI = whatAmI,
callback = { hello -> handler.handle(hello) },
receiver = handler.receiver(),
config = config
)
}

/**
* Scout for routers and/or peers.
*
* Scout spawns a task that periodically sends scout messages and waits for Hello replies.
* Drop the returned Scout to stop the scouting task or explicitly call [Scout.stop] or [Scout.close].
*
* @param channel [Channel] upon which the incoming [Hello] messages will be piped.
* @param whatAmI [WhatAmI] configuration: it indicates the role of the zenoh node sending the HELLO message.
* @param config Optional [Config] for the scout.
* @return A [Scout] object.
*/
fun scout(
channel: Channel<Hello>,
whatAmI: Set<WhatAmI> = setOf(Peer, Router),
config: Config? = null
): Scout<Channel<Hello>> {
ZenohLoad
val handler = ChannelHandler(channel)
return JNIScout.scout(
whatAmI = whatAmI,
callback = { hello -> handler.handle(hello) },
receiver = handler.receiver(),
config = config
)
}
}

/**
* Static singleton class to load the Zenoh native library once and only once, as well as the logger in function of the
* log level configuration.
*/
internal expect object ZenohLoad
internal expect object ZenohLoad
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package io.zenoh.jni

/**
* Adapter class to handle the interactions with Zenoh through JNI for a [Queryable]
* Adapter class to handle the interactions with Zenoh through JNI for a [io.zenoh.queryable.Queryable]
*
* @property ptr: raw pointer to the underlying native Queryable.
*/
Expand Down
Loading

0 comments on commit a2ebeb2

Please sign in to comment.