From 82f86f12f3e95abdac920b4b0cef445e5449513a Mon Sep 17 00:00:00 2001 From: Darius Maitia Date: Fri, 29 Nov 2024 17:01:32 -0300 Subject: [PATCH 1/2] Examples - Adding ping / pong examples --- examples/build.gradle.kts | 2 + .../src/main/kotlin/io.zenoh/ZLiveliness.kt | 4 +- examples/src/main/kotlin/io.zenoh/ZPing.kt | 113 ++++++++++++++++++ examples/src/main/kotlin/io.zenoh/ZPong.kt | 66 ++++++++++ 4 files changed, 183 insertions(+), 2 deletions(-) create mode 100644 examples/src/main/kotlin/io.zenoh/ZPing.kt create mode 100644 examples/src/main/kotlin/io.zenoh/ZPong.kt diff --git a/examples/build.gradle.kts b/examples/build.gradle.kts index 08f8eb398..c21d638c0 100644 --- a/examples/build.gradle.kts +++ b/examples/build.gradle.kts @@ -37,6 +37,8 @@ tasks { "ZGetLiveliness", "ZInfo", "ZLiveliness", + "ZPing", + "ZPong", "ZPub", "ZPubThr", "ZPut", diff --git a/examples/src/main/kotlin/io.zenoh/ZLiveliness.kt b/examples/src/main/kotlin/io.zenoh/ZLiveliness.kt index 43c54f9a5..87b6e2b1e 100644 --- a/examples/src/main/kotlin/io.zenoh/ZLiveliness.kt +++ b/examples/src/main/kotlin/io.zenoh/ZLiveliness.kt @@ -40,8 +40,8 @@ class ZLiveliness(private val emptyArgs: Boolean) : CliktCommand( private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config") private val key by option( - "-k", "--key", help = "The key expression to subscribe to [default: group1/**]", metavar = "key" - ).default("group1/**") + "-k", "--key", help = "The key expression to subscribe to [default: group1/zenoh-kotlin]", metavar = "key" + ).default("group1/zenoh-kotlin") private val connect: List by option( "-e", "--connect", help = "Endpoints to connect to.", metavar = "connect" ).multiple() diff --git a/examples/src/main/kotlin/io.zenoh/ZPing.kt b/examples/src/main/kotlin/io.zenoh/ZPing.kt new file mode 100644 index 000000000..4944b4fcd --- /dev/null +++ b/examples/src/main/kotlin/io.zenoh/ZPing.kt @@ -0,0 +1,113 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh + +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.arguments.argument +import com.github.ajalt.clikt.parameters.arguments.default +import com.github.ajalt.clikt.parameters.options.* +import com.github.ajalt.clikt.parameters.types.double +import com.github.ajalt.clikt.parameters.types.int +import io.zenoh.bytes.ZBytes +import io.zenoh.keyexpr.intoKeyExpr +import io.zenoh.qos.CongestionControl +import io.zenoh.qos.QoS +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.runBlocking + +class ZPing(private val emptyArgs: Boolean) : CliktCommand( + help = "Zenoh Ping example" +) { + override fun run() { + val config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode) + + Zenoh.initLogFromEnvOr("error") + + println("Opening session...") + val session = Zenoh.open(config).getOrThrow() + val keyExprPing = "test/ping".intoKeyExpr().getOrThrow() + val keyExprPong = "test/pong".intoKeyExpr().getOrThrow() + + val sub = session.declareSubscriber(keyExprPong, Channel()).getOrThrow() + val publisher = session.declarePublisher(keyExprPing, qos = QoS(CongestionControl.BLOCK, express = !noExpress)).getOrThrow() + + val data = ByteArray(payloadSize) + for (i in 0..() + + // -- warmup -- + println("Warming up for $warmup...") + val startTime = System.currentTimeMillis() + while (System.currentTimeMillis() - startTime < warmup) { + publisher.put(payload).getOrThrow() + runBlocking { sub.receiver.receive() } + } + + for (x in 0..n ) { + val writeTime = System.nanoTime() + publisher.put(payload).getOrThrow() + runBlocking { sub.receiver.receive() } + val ts = (System.nanoTime() - writeTime) / 1_000 //convert to microseconds + samples.add(ts) + } + + for (x in samples.withIndex()) { + println("$payloadSize bytes: seq=${x.index} rtt=${x.value}µs lat=${x.value / 2}µs") + } + } + + + private val payloadSize by argument( + "payload_size", + help = "Sets the size of the payload to publish [Default: 8]" + ).int().default(8) + private val noExpress: Boolean by option( + "--no-express", help = "Express for sending data." + ).flag(default = false) + private val warmup: Double by option( + "-w", + "--warmup", + metavar = "warmup", + help = "The number of seconds to warm up (double) [default: 1]" + ).double().default(1.0) + private val n: Int by option( + "-n", + "--samples", + metavar = "samples", + help = "The number of round-trips to measure [default: 100]" + ).int().default(100) + + private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config") + private val connect: List by option( + "-e", "--connect", help = "Endpoints to connect to.", metavar = "connect" + ).multiple() + private val listen: List by option( + "-l", "--listen", help = "Endpoints to listen on.", metavar = "listen" + ).multiple() + private val mode by option( + "-m", + "--mode", + help = "The session mode. Default: peer. Possible values: [peer, client, router]", + metavar = "mode" + ).default("peer") + private val noMulticastScouting: Boolean by option( + "--no-multicast-scouting", help = "Disable the multicast-based scouting mechanism." + ).flag(default = false) +} + +fun main(args: Array) = ZPing(args.isEmpty()).main(args) diff --git a/examples/src/main/kotlin/io.zenoh/ZPong.kt b/examples/src/main/kotlin/io.zenoh/ZPong.kt new file mode 100644 index 000000000..fbd73f5fc --- /dev/null +++ b/examples/src/main/kotlin/io.zenoh/ZPong.kt @@ -0,0 +1,66 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh + +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.options.* +import io.zenoh.keyexpr.intoKeyExpr +import io.zenoh.qos.CongestionControl +import io.zenoh.qos.QoS +import io.zenoh.sample.Sample + +class ZPong(private val emptyArgs: Boolean) : CliktCommand( + help = "Zenoh ZPong example" +) { + override fun run() { + val config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode) + + Zenoh.initLogFromEnvOr("error") + + println("Opening session...") + val session = Zenoh.open(config).getOrThrow() + val keyExprPing = "test/ping".intoKeyExpr().getOrThrow() + val keyExprPong = "test/pong".intoKeyExpr().getOrThrow() + + val publisher = session.declarePublisher(keyExprPong, qos = QoS(CongestionControl.BLOCK, express = !noExpress)).getOrThrow() + session.declareSubscriber(keyExprPing, callback = { sample: Sample -> publisher.put(sample.payload).getOrThrow() }).getOrThrow() + + while (true) { Thread.sleep(1000)} + } + + + private val noExpress: Boolean by option( + "--no-express", help = "Express for sending data." + ).flag(default = false) + + private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config") + private val connect: List by option( + "-e", "--connect", help = "Endpoints to connect to.", metavar = "connect" + ).multiple() + private val listen: List by option( + "-l", "--listen", help = "Endpoints to listen on.", metavar = "listen" + ).multiple() + private val mode by option( + "-m", + "--mode", + help = "The session mode. Default: peer. Possible values: [peer, client, router]", + metavar = "mode" + ).default("peer") + private val noMulticastScouting: Boolean by option( + "--no-multicast-scouting", help = "Disable the multicast-based scouting mechanism." + ).flag(default = false) +} + +fun main(args: Array) = ZPong(args.isEmpty()).main(args) From db637e6aa9488f1c34ce23c18492d72edd38ee1c Mon Sep 17 00:00:00 2001 From: Darius Maitia Date: Fri, 29 Nov 2024 18:00:20 -0300 Subject: [PATCH 2/2] Examples - using countdown latch on ZPong --- examples/src/main/kotlin/io.zenoh/ZPong.kt | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/examples/src/main/kotlin/io.zenoh/ZPong.kt b/examples/src/main/kotlin/io.zenoh/ZPong.kt index fbd73f5fc..9525c362f 100644 --- a/examples/src/main/kotlin/io.zenoh/ZPong.kt +++ b/examples/src/main/kotlin/io.zenoh/ZPong.kt @@ -20,10 +20,13 @@ import io.zenoh.keyexpr.intoKeyExpr import io.zenoh.qos.CongestionControl import io.zenoh.qos.QoS import io.zenoh.sample.Sample +import java.util.concurrent.CountDownLatch class ZPong(private val emptyArgs: Boolean) : CliktCommand( help = "Zenoh ZPong example" ) { + val latch = CountDownLatch(1) + override fun run() { val config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode) @@ -37,7 +40,7 @@ class ZPong(private val emptyArgs: Boolean) : CliktCommand( val publisher = session.declarePublisher(keyExprPong, qos = QoS(CongestionControl.BLOCK, express = !noExpress)).getOrThrow() session.declareSubscriber(keyExprPing, callback = { sample: Sample -> publisher.put(sample.payload).getOrThrow() }).getOrThrow() - while (true) { Thread.sleep(1000)} + latch.await() } @@ -63,4 +66,7 @@ class ZPong(private val emptyArgs: Boolean) : CliktCommand( ).flag(default = false) } -fun main(args: Array) = ZPong(args.isEmpty()).main(args) +fun main(args: Array) { + val zPong = ZPong(args.isEmpty()) + zPong.main(args) +}