Skip to content

Commit

Permalink
Session info (#230)
Browse files Browse the repository at this point in the history
* Session info: creating SessionInfo, implementing peersZid() and routersZid()

* Session info: adding id() function, returning Results.

* Session info: adding kdocs + cargo clippy

* Session info: renaming functions
  • Loading branch information
DariusIMP authored Sep 19, 2024
1 parent dbc6d2b commit d443d9e
Show file tree
Hide file tree
Showing 7 changed files with 333 additions and 2 deletions.
1 change: 1 addition & 0 deletions examples/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ tasks {
"ZBytes",
"ZDelete",
"ZGet",
"ZInfo",
"ZPub",
"ZPubThr",
"ZPut",
Expand Down
60 changes: 60 additions & 0 deletions examples/src/main/kotlin/io.zenoh/ZInfo.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh

import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*

class ZInfo(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh Info example"
) {
override fun run() {
val config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode)

Zenoh.initLogFromEnvOr("error")

println("Opening session...")
Zenoh.open(config).onSuccess { session ->
session.use {
val info = session.info()
println("zid: ${info.zid().getOrThrow()}")

println("routers zid: ${info.routersZid().getOrThrow()}")

println("peers zid: ${info.peersZid().getOrThrow()}")
}
}.onFailure { exception -> println(exception.message) }
}


private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config")
private val connect: List<String> by option(
"-e", "--connect", help = "Endpoints to connect to.", metavar = "connect"
).multiple()
private val listen: List<String> by option(
"-l", "--listen", help = "Endpoints to listen on.", metavar = "listen"
).multiple()
private val mode by option(
"-m",
"--mode",
help = "The session mode. Default: peer. Possible values: [peer, client, router]",
metavar = "mode"
).default("peer")
private val noMulticastScouting: Boolean by option(
"--no-multicast-scouting", help = "Disable the multicast-based scouting mechanism."
).flag(default = false)
}

fun main(args: Array<String>) = ZInfo(args.isEmpty()).main(args)
83 changes: 81 additions & 2 deletions zenoh-jni/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
use std::{mem, ops::Deref, ptr::null, sync::Arc, time::Duration};

use jni::{
objects::{GlobalRef, JByteArray, JClass, JObject, JString, JValue},
sys::{jboolean, jint, jlong},
objects::{GlobalRef, JByteArray, JClass, JList, JObject, JString, JValue},
sys::{jboolean, jbyteArray, jint, jlong, jobject},
JNIEnv,
};
use zenoh::{
Expand Down Expand Up @@ -1036,3 +1036,82 @@ fn on_reply_error(
};
result
}

/// Returns a list of zenoh ids as byte arrays corresponding to the peers connected to the session provided.
///
#[no_mangle]
#[allow(non_snake_case)]
pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getPeersZidViaJNI(
mut env: JNIEnv,
_class: JClass,
session_ptr: *const Session,
) -> jobject {
let session = Arc::from_raw(session_ptr);
let ids = {
let peers_zid = session.info().peers_zid().wait();
let ids = peers_zid.collect::<Vec<ZenohId>>();
ids_to_java_list(&mut env, ids).map_err(|err| jni_error!(err))
}
.unwrap_or_else(|err| {
throw_exception!(env, err);
JObject::default().as_raw()
});
std::mem::forget(session);
ids
}

/// Returns a list of zenoh ids as byte arrays corresponding to the routers connected to the session provided.
///
#[no_mangle]
#[allow(non_snake_case)]
pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getRoutersZidViaJNI(
mut env: JNIEnv,
_class: JClass,
session_ptr: *const Session,
) -> jobject {
let session = Arc::from_raw(session_ptr);
let ids = {
let peers_zid = session.info().routers_zid().wait();
let ids = peers_zid.collect::<Vec<ZenohId>>();
ids_to_java_list(&mut env, ids).map_err(|err| jni_error!(err))
}
.unwrap_or_else(|err| {
throw_exception!(env, err);
JObject::default().as_raw()
});
std::mem::forget(session);
ids
}

/// Returns the Zenoh ID as a byte array of the session.
#[no_mangle]
#[allow(non_snake_case)]
pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getZidViaJNI(
mut env: JNIEnv,
_class: JClass,
session_ptr: *const Session,
) -> jbyteArray {
let session = Arc::from_raw(session_ptr);
let ids = {
let zid = session.info().zid().wait();
env.byte_array_from_slice(&zid.to_le_bytes())
.map(|x| x.as_raw())
.map_err(|err| jni_error!(err))
}
.unwrap_or_else(|err| {
throw_exception!(env, err);
JByteArray::default().as_raw()
});
std::mem::forget(session);
ids
}

fn ids_to_java_list(env: &mut JNIEnv, ids: Vec<ZenohId>) -> jni::errors::Result<jobject> {
let array_list = env.new_object("java/util/ArrayList", "()V", &[])?;
let jlist = JList::from_env(env, &array_list)?;
for id in ids {
let value = &mut env.byte_array_from_slice(&id.to_le_bytes())?;
jlist.add(env, value)?;
}
Ok(array_list.as_raw())
}
17 changes: 17 additions & 0 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.zenoh.prelude.Encoding
import io.zenoh.prelude.QoS
import io.zenoh.protocol.IntoZBytes
import io.zenoh.protocol.ZBytes
import io.zenoh.protocol.ZenohID
import io.zenoh.publication.Delete
import io.zenoh.publication.Publisher
import io.zenoh.publication.Put
Expand Down Expand Up @@ -802,6 +803,10 @@ class Session private constructor(private val config: Config) : AutoCloseable {
return jniSession != null
}

fun info(): SessionInfo {
return SessionInfo(this)
}

private fun resolvePublisher(keyExpr: KeyExpr, qos: QoS, reliability: Reliability): Result<Publisher> {
return jniSession?.run {
declarePublisher(keyExpr, qos, reliability).onSuccess { declarations.add(it) }
Expand Down Expand Up @@ -867,6 +872,18 @@ class Session private constructor(private val config: Config) : AutoCloseable {
jniSession?.run { performDelete(keyExpr, delete) }
}

internal fun zid(): Result<ZenohID> {
return jniSession?.zid() ?: Result.failure(sessionClosedException)
}

internal fun getPeersId(): Result<List<ZenohID>> {
return jniSession?.peersZid() ?: Result.failure(sessionClosedException)
}

internal fun getRoutersId(): Result<List<ZenohID>> {
return jniSession?.routersZid() ?: Result.failure(sessionClosedException)
}

/** Launches the session through the jni session, returning the [Session] on success. */
private fun launch(): Result<Session> = runCatching {
jniSession = JNISession()
Expand Down
44 changes: 44 additions & 0 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/SessionInfo.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh

import io.zenoh.protocol.ZenohID

/**
* Class allowing to obtain the information of a [Session].
*/
class SessionInfo(private val session: Session) {

/**
* Return the [ZenohID] of the current Zenoh [Session]
*/
fun zid(): Result<ZenohID> {
return session.zid()
}

/**
* Return the [ZenohID] of the zenoh peers the session is currently connected to.
*/
fun peersZid(): Result<List<ZenohID>> {
return session.getPeersId()
}

/**
* Return the [ZenohID] of the zenoh routers the session is currently connected to.
*/
fun routersZid(): Result<List<ZenohID>> {
return session.getRoutersId()
}
}
21 changes: 21 additions & 0 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,27 @@ internal class JNISession {
)
}

fun zid(): Result<ZenohID> = runCatching {
ZenohID(getZidViaJNI(sessionPtr.get()))
}

fun peersZid(): Result<List<ZenohID>> = runCatching {
getPeersZidViaJNI(sessionPtr.get()).map { ZenohID(it) }
}

fun routersZid(): Result<List<ZenohID>> = runCatching {
getRoutersZidViaJNI(sessionPtr.get()).map { ZenohID(it) }
}

@Throws(Exception::class)
private external fun getZidViaJNI(ptr: Long): ByteArray

@Throws(Exception::class)
private external fun getPeersZidViaJNI(ptr: Long): List<ByteArray>

@Throws(Exception::class)
private external fun getRoutersZidViaJNI(ptr: Long): List<ByteArray>

@Throws(Exception::class)
private external fun openSessionViaJNI(configPtr: Long): Long

Expand Down
109 changes: 109 additions & 0 deletions zenoh-kotlin/src/commonTest/kotlin/io/zenoh/SessionInfoTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh

import org.junit.jupiter.api.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue

class SessionInfoTest {

@Test
fun `peersZid test`() {
val jsonConfig = """
{
mode: "peer",
connect: {
endpoints: ["tcp/localhost:7450"],
},
}
""".trimIndent()

val listenConfig = Config.fromJson("""
{
mode: "peer",
listen: {
endpoints: ["tcp/localhost:7450"],
},
}
""".trimIndent()).getOrThrow()

val sessionC = Zenoh.open(listenConfig).getOrThrow()
val sessionA = Zenoh.open(Config.fromJson(jsonConfig).getOrThrow()).getOrThrow()
val sessionB = Zenoh.open(Config.fromJson(jsonConfig).getOrThrow()).getOrThrow()

val idA = sessionA.info().zid().getOrThrow()
val idB = sessionB.info().zid().getOrThrow()
val peers = sessionC.info().peersZid().getOrThrow()
assertTrue(peers.contains(idA))
assertTrue(peers.contains(idB))

sessionA.close()
sessionB.close()
sessionC.close()
}


@Test
fun `routersZid test`() {
val jsonConfig = """
{
mode: "router",
connect: {
endpoints: ["tcp/localhost:7450"],
},
listen: {
endpoints: ["tcp/localhost:7452"],
},
}
""".trimIndent()

val listenConfig = Config.fromJson("""
{
mode: "router",
listen: {
endpoints: ["tcp/localhost:7450"],
},
}
""".trimIndent()).getOrThrow()

val sessionC = Zenoh.open(listenConfig).getOrThrow()
val sessionA = Zenoh.open(Config.fromJson(jsonConfig).getOrThrow()).getOrThrow()
val sessionB = Zenoh.open(Config.fromJson(jsonConfig).getOrThrow()).getOrThrow()

val idA = sessionA.info().zid().getOrThrow()
val idB = sessionB.info().zid().getOrThrow()
val routers = sessionC.info().routersZid().getOrThrow()
assertTrue(routers.contains(idA))
assertTrue(routers.contains(idB))

sessionA.close()
sessionB.close()
sessionC.close()
}

@Test
fun `zid test`() {
val jsonConfig = """
{
id: "123456",
}
""".trimIndent()

val session = Zenoh.open(Config.fromJson(jsonConfig).getOrThrow()).getOrThrow()
assertEquals("123456", session.info().zid().getOrThrow().toString())
session.close()
}
}

0 comments on commit d443d9e

Please sign in to comment.