From bde92f75a062957e173e250c71edc11894f7c4b7 Mon Sep 17 00:00:00 2001 From: Darius Maitia Date: Mon, 30 Dec 2024 16:20:19 -0300 Subject: [PATCH] refactor(examples): refactoring exmaples - reducing nesting - showing alternative implementations (with channel, callback and handlers) --- examples/src/main/kotlin/io.zenoh/ZDelete.kt | 11 +-- examples/src/main/kotlin/io.zenoh/ZGet.kt | 64 ++++++++----- .../main/kotlin/io.zenoh/ZGetLiveliness.kt | 67 ++++++++++---- examples/src/main/kotlin/io.zenoh/ZInfo.kt | 16 ++-- .../src/main/kotlin/io.zenoh/ZLiveliness.kt | 16 ++-- examples/src/main/kotlin/io.zenoh/ZPong.kt | 2 +- examples/src/main/kotlin/io.zenoh/ZPub.kt | 48 +++++----- examples/src/main/kotlin/io.zenoh/ZPubThr.kt | 41 ++++----- examples/src/main/kotlin/io.zenoh/ZPut.kt | 17 ++-- examples/src/main/kotlin/io.zenoh/ZQuerier.kt | 6 +- .../src/main/kotlin/io.zenoh/ZQueryable.kt | 92 +++++++++++++++---- examples/src/main/kotlin/io.zenoh/ZScout.kt | 41 +++++++++ examples/src/main/kotlin/io.zenoh/ZSub.kt | 80 ++++++++++++---- .../main/kotlin/io.zenoh/ZSubLiveliness.kt | 28 +++--- 14 files changed, 356 insertions(+), 173 deletions(-) diff --git a/examples/src/main/kotlin/io.zenoh/ZDelete.kt b/examples/src/main/kotlin/io.zenoh/ZDelete.kt index 628a86ab8..e538fb8bb 100644 --- a/examples/src/main/kotlin/io.zenoh/ZDelete.kt +++ b/examples/src/main/kotlin/io.zenoh/ZDelete.kt @@ -27,12 +27,11 @@ class ZDelete(private val emptyArgs: Boolean) : CliktCommand( Zenoh.initLogFromEnvOr("error") println("Opening session...") - Zenoh.open(config).onSuccess { session -> - session.use { - key.intoKeyExpr().onSuccess { keyExpr -> - println("Deleting resources matching '$keyExpr'...") - session.delete(keyExpr) - } + val session = Zenoh.open(config).getOrThrow() + session.use { + key.intoKeyExpr().onSuccess { keyExpr -> + println("Deleting resources matching '$keyExpr'...") + session.delete(keyExpr) } } } diff --git a/examples/src/main/kotlin/io.zenoh/ZGet.kt b/examples/src/main/kotlin/io.zenoh/ZGet.kt index af3330c02..107728e80 100644 --- a/examples/src/main/kotlin/io.zenoh/ZGet.kt +++ b/examples/src/main/kotlin/io.zenoh/ZGet.kt @@ -18,14 +18,15 @@ import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.options.* import com.github.ajalt.clikt.parameters.types.long import io.zenoh.bytes.ZBytes +import io.zenoh.handlers.Handler import io.zenoh.sample.SampleKind import io.zenoh.query.QueryTarget +import io.zenoh.query.Reply import io.zenoh.query.Selector import io.zenoh.query.intoSelector import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.runBlocking import java.time.Duration -import java.util.concurrent.CountDownLatch class ZGet(private val emptyArgs: Boolean) : CliktCommand( help = "Zenoh Get example" @@ -39,44 +40,43 @@ class ZGet(private val emptyArgs: Boolean) : CliktCommand( val session = Zenoh.open(config).getOrThrow() val selector = selector.intoSelector().getOrThrow() + // Run the Get query through one of the examples below: runChannelExample(session, selector) -// runCallbackExample(session, selector) + // runCallbackExample(session, selector) + // runHandlerExample(session, selector) session.close() } private fun runChannelExample(session: Session, selector: Selector) { - session.get(selector, + val channelReceiver = session.get(selector, channel = Channel(), payload = payload?.let { ZBytes.from(it) }, target = target?.let { QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING, attachment = attachment?.let { ZBytes.from(it) }, timeout = Duration.ofMillis(timeout) - ) - .onSuccess { channelReceiver -> - runBlocking { - for (reply in channelReceiver) { - reply.result.onSuccess { sample -> - when (sample.kind) { - SampleKind.PUT -> println("Received ('${sample.keyExpr}': '${sample.payload}')") - SampleKind.DELETE -> println("Received (DELETE '${sample.keyExpr}')") - } - }.onFailure { error -> - println("Received (ERROR: '${error.message}')") - } + ).getOrThrow() + runBlocking { + for (reply in channelReceiver) { + reply.result.onSuccess { sample -> + when (sample.kind) { + SampleKind.PUT -> println("Received ('${sample.keyExpr}': '${sample.payload}')") + SampleKind.DELETE -> println("Received (DELETE '${sample.keyExpr}')") } + }.onFailure { error -> + println("Received (ERROR: '${error.message}')") } } + } } private fun runCallbackExample(session: Session, selector: Selector) { - val countDownLatch = CountDownLatch(1) session.get(selector, payload = payload?.let { ZBytes.from(it) }, target = target?.let { QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING, attachment = attachment?.let { ZBytes.from(it) }, timeout = Duration.ofMillis(timeout), - callback = {reply -> + callback = { reply -> reply.result.onSuccess { sample -> when (sample.kind) { SampleKind.PUT -> println("Received ('${sample.keyExpr}': '${sample.payload}')") @@ -86,12 +86,34 @@ class ZGet(private val emptyArgs: Boolean) : CliktCommand( println("Received (ERROR: '${error.message}')") } }, - onClose = { - countDownLatch.countDown() - } ).getOrThrow() + } + + private fun runHandlerExample(session: Session, selector: Selector) { + // Create your own handler implementation + class ExampleHandler : Handler { + override fun handle(t: Reply) { + t.result.onSuccess { sample -> + when (sample.kind) { + SampleKind.PUT -> println("Received ('${sample.keyExpr}': '${sample.payload}')") + SampleKind.DELETE -> println("Received (DELETE '${sample.keyExpr}')") + } + }.onFailure { error -> + println("Received (ERROR: '${error.message}')") + } + } + + override fun receiver() {} + override fun onClose() {} + } - countDownLatch.await() + session.get(selector, + payload = payload?.let { ZBytes.from(it) }, + target = target?.let { QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING, + attachment = attachment?.let { ZBytes.from(it) }, + timeout = Duration.ofMillis(timeout), + handler = ExampleHandler(), // Provide a handler instance + ).getOrThrow() } private val selector by option( diff --git a/examples/src/main/kotlin/io.zenoh/ZGetLiveliness.kt b/examples/src/main/kotlin/io.zenoh/ZGetLiveliness.kt index 2e770c15d..e6e8d790f 100644 --- a/examples/src/main/kotlin/io.zenoh/ZGetLiveliness.kt +++ b/examples/src/main/kotlin/io.zenoh/ZGetLiveliness.kt @@ -17,13 +17,16 @@ package io.zenoh import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.options.* import com.github.ajalt.clikt.parameters.types.long +import io.zenoh.handlers.Handler +import io.zenoh.keyexpr.KeyExpr import io.zenoh.keyexpr.intoKeyExpr +import io.zenoh.query.Reply import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.runBlocking import java.time.Duration class ZGetLiveliness(private val emptyArgs: Boolean) : CliktCommand( - help = "Zenoh Sub Liveliness example" + help = "Zenoh Get Liveliness example" ) { override fun run() { @@ -32,24 +35,54 @@ class ZGetLiveliness(private val emptyArgs: Boolean) : CliktCommand( Zenoh.initLogFromEnvOr("error") println("Opening session...") - Zenoh.open(config).onSuccess { session -> - session.use { - key.intoKeyExpr().onSuccess { keyExpr -> - session.liveliness().get(keyExpr, channel = Channel(), timeout = Duration.ofMillis(timeout)) - .onSuccess { channel -> - runBlocking { - for (reply in channel) { - reply.result.onSuccess { - println(">> Alive token ('${it.keyExpr}')") - }.onFailure { - println(">> Received (ERROR: '${it.message}')") - } - } - } - } + val session = Zenoh.open(config).getOrThrow() + val keyExpr = key.intoKeyExpr().getOrThrow() + + runChannelExample(session, keyExpr) + + session.close() + } + + private fun runChannelExample(session: Session, keyExpr: KeyExpr) { + val channel = + session.liveliness().get(keyExpr, channel = Channel(), timeout = Duration.ofMillis(timeout)).getOrThrow() + runBlocking { + for (reply in channel) { + reply.result.onSuccess { + println(">> Alive token ('${it.keyExpr}')") + }.onFailure { + println(">> Received (ERROR: '${it.message}')") } } - }.onFailure { exception -> println(exception.message) } + } + } + + private fun runCallbackExample(session: Session, keyExpr: KeyExpr) { + session.liveliness().get(keyExpr, timeout = Duration.ofMillis(timeout), callback = { reply -> + reply.result.onSuccess { + println(">> Alive token ('${it.keyExpr}')") + }.onFailure { + println(">> Received (ERROR: '${it.message}')") + } + }).getOrThrow() + } + + private fun runHandlerExample(session: Session, keyExpr: KeyExpr) { + // Create your own handler implementation + class ExampleHandler : Handler { + override fun handle(t: Reply) { + t.result.onSuccess { + println(">> Alive token ('${it.keyExpr}')") + }.onFailure { + println(">> Received (ERROR: '${it.message}')") + } + } + + override fun receiver() {} + override fun onClose() {} + } + + session.liveliness().get(keyExpr, timeout = Duration.ofMillis(timeout), handler = ExampleHandler()).getOrThrow() } private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config") diff --git a/examples/src/main/kotlin/io.zenoh/ZInfo.kt b/examples/src/main/kotlin/io.zenoh/ZInfo.kt index 1bf3e7ea8..9804eee59 100644 --- a/examples/src/main/kotlin/io.zenoh/ZInfo.kt +++ b/examples/src/main/kotlin/io.zenoh/ZInfo.kt @@ -26,16 +26,12 @@ class ZInfo(private val emptyArgs: Boolean) : CliktCommand( Zenoh.initLogFromEnvOr("error") println("Opening session...") - Zenoh.open(config).onSuccess { session -> - session.use { - val info = session.info() - println("zid: ${info.zid().getOrThrow()}") - - println("routers zid: ${info.routersZid().getOrThrow()}") - - println("peers zid: ${info.peersZid().getOrThrow()}") - } - }.onFailure { exception -> println(exception.message) } + val session = Zenoh.open(config).getOrThrow() + val info = session.info() + println("zid: ${info.zid().getOrThrow()}") + println("routers zid: ${info.routersZid().getOrThrow()}") + println("peers zid: ${info.peersZid().getOrThrow()}") + session.close() } diff --git a/examples/src/main/kotlin/io.zenoh/ZLiveliness.kt b/examples/src/main/kotlin/io.zenoh/ZLiveliness.kt index 87b6e2b1e..8f02c0b14 100644 --- a/examples/src/main/kotlin/io.zenoh/ZLiveliness.kt +++ b/examples/src/main/kotlin/io.zenoh/ZLiveliness.kt @@ -17,6 +17,7 @@ package io.zenoh import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.options.* import io.zenoh.keyexpr.intoKeyExpr +import java.util.concurrent.CountDownLatch class ZLiveliness(private val emptyArgs: Boolean) : CliktCommand( help = "Zenoh Liveliness example" @@ -28,14 +29,13 @@ class ZLiveliness(private val emptyArgs: Boolean) : CliktCommand( Zenoh.initLogFromEnvOr("error") println("Opening session...") - Zenoh.open(config).onSuccess { session -> - key.intoKeyExpr().onSuccess { keyExpr -> - session.liveliness().declareToken(keyExpr) - while (true) { - Thread.sleep(1000) - } - } - }.onFailure { exception -> println(exception.message) } + val session = Zenoh.open(config).getOrThrow() + val keyExpr = key.intoKeyExpr().getOrThrow() + session.liveliness().declareToken(keyExpr) + + CountDownLatch(1).await() // A countdown latch is used here to block execution while the liveliness token + // is declared. Typically, this wouldn't be needed. + session.close() } private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config") diff --git a/examples/src/main/kotlin/io.zenoh/ZPong.kt b/examples/src/main/kotlin/io.zenoh/ZPong.kt index 9525c362f..64a028032 100644 --- a/examples/src/main/kotlin/io.zenoh/ZPong.kt +++ b/examples/src/main/kotlin/io.zenoh/ZPong.kt @@ -25,12 +25,12 @@ import java.util.concurrent.CountDownLatch class ZPong(private val emptyArgs: Boolean) : CliktCommand( help = "Zenoh ZPong example" ) { - val latch = CountDownLatch(1) override fun run() { val config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode) Zenoh.initLogFromEnvOr("error") + val latch = CountDownLatch(1) println("Opening session...") val session = Zenoh.open(config).getOrThrow() diff --git a/examples/src/main/kotlin/io.zenoh/ZPub.kt b/examples/src/main/kotlin/io.zenoh/ZPub.kt index ab00354f4..4bca136af 100644 --- a/examples/src/main/kotlin/io.zenoh/ZPub.kt +++ b/examples/src/main/kotlin/io.zenoh/ZPub.kt @@ -28,31 +28,29 @@ class ZPub(private val emptyArgs: Boolean) : CliktCommand( Zenoh.initLogFromEnvOr("error") println("Opening session...") - Zenoh.open(config).onSuccess { session -> - session.use { - key.intoKeyExpr().onSuccess { keyExpr -> - println("Declaring publisher on '$keyExpr'...") - session.declarePublisher(keyExpr).onSuccess { pub -> - println("Press CTRL-C to quit...") - val attachment = attachment?.toByteArray() - var idx = 0 - while (true) { - Thread.sleep(1000) - val payload = "[${ - idx.toString().padStart(4, ' ') - }] $value" - println( - "Putting Data ('$keyExpr': '$payload')..." - ) - attachment?.let { - pub.put(ZBytes.from(payload), attachment = ZBytes.from(it) ) - } ?: let { pub.put(ZBytes.from(payload)) } - idx++ - } - } - } - } - }.onFailure { exception -> println(exception.message) } + val session = Zenoh.open(config).getOrThrow() + val keyExpr = key.intoKeyExpr().getOrThrow() + + println("Declaring publisher on '$keyExpr'...") + val publisher = session.declarePublisher(keyExpr).getOrThrow() + + println("Press CTRL-C to quit...") + val attachment = attachment?.toByteArray() + + var idx = 0 + while (true) { + Thread.sleep(1000) + val payload = "[${ + idx.toString().padStart(4, ' ') + }] $value" + println( + "Putting Data ('$keyExpr': '$payload')..." + ) + attachment?.let { + publisher.put(ZBytes.from(payload), attachment = ZBytes.from(it)) + } ?: let { publisher.put(ZBytes.from(payload)) } + idx++ + } } diff --git a/examples/src/main/kotlin/io.zenoh/ZPubThr.kt b/examples/src/main/kotlin/io.zenoh/ZPubThr.kt index c2342624f..40bc95cc2 100644 --- a/examples/src/main/kotlin/io.zenoh/ZPubThr.kt +++ b/examples/src/main/kotlin/io.zenoh/ZPubThr.kt @@ -47,27 +47,26 @@ class ZPubThr(private val emptyArgs: Boolean) : CliktCommand( priority = priorityInput?.let { Priority.entries[it] } ?: Priority.DATA, ) - Zenoh.open(config).onSuccess { - it.use { session -> - session.declarePublisher("test/thr".intoKeyExpr().getOrThrow(), qos = qos).onSuccess { pub -> - println("Publisher declared on test/thr.") - var count: Long = 0 - var start = System.currentTimeMillis() - val number = number.toLong() - println("Press CTRL-C to quit...") - while (true) { - pub.put(payload).getOrThrow() - if (statsPrint) { - if (count < number) { - count++ - } else { - val throughput = count * 1000 / (System.currentTimeMillis() - start) - println("$throughput msgs/s") - count = 0 - start = System.currentTimeMillis() - } - } - } + val session = Zenoh.open(config).getOrThrow() + val keyExpr = "test/thr".intoKeyExpr().getOrThrow() + val publisher = session.declarePublisher(keyExpr, qos = qos).getOrThrow() + + println("Publisher declared on test/thr.") + var count: Long = 0 + var start = System.currentTimeMillis() + val number = number.toLong() + + println("Press CTRL-C to quit...") + while (true) { + publisher.put(payload).getOrThrow() + if (statsPrint) { + if (count < number) { + count++ + } else { + val throughput = count * 1000 / (System.currentTimeMillis() - start) + println("$throughput msgs/s") + count = 0 + start = System.currentTimeMillis() } } } diff --git a/examples/src/main/kotlin/io.zenoh/ZPut.kt b/examples/src/main/kotlin/io.zenoh/ZPut.kt index 2d498d23e..3fa3b1306 100644 --- a/examples/src/main/kotlin/io.zenoh/ZPut.kt +++ b/examples/src/main/kotlin/io.zenoh/ZPut.kt @@ -29,16 +29,13 @@ class ZPut(private val emptyArgs: Boolean) : CliktCommand( Zenoh.initLogFromEnvOr("error") println("Opening Session...") - Zenoh.open(config).onSuccess { session -> - session.use { - key.intoKeyExpr().onSuccess { keyExpr -> - keyExpr.use { - session.put(keyExpr, ZBytes.from(value), attachment = attachment?.let { ZBytes.from(it) }) - .onSuccess { println("Putting Data ('$keyExpr': '$value')...") } - } - } - } - }.onFailure { println(it.message) } + val session = Zenoh.open(config).getOrThrow() + val keyExpr = key.intoKeyExpr().getOrThrow() + + session.put(keyExpr, ZBytes.from(value), attachment = attachment?.let { ZBytes.from(it) }) + .onSuccess { println("Putting Data ('$keyExpr': '$value')...") }.getOrThrow() + + session.close() } private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config") diff --git a/examples/src/main/kotlin/io.zenoh/ZQuerier.kt b/examples/src/main/kotlin/io.zenoh/ZQuerier.kt index 704f93f87..01d02170c 100644 --- a/examples/src/main/kotlin/io.zenoh/ZQuerier.kt +++ b/examples/src/main/kotlin/io.zenoh/ZQuerier.kt @@ -52,6 +52,8 @@ class ZQuerier(private val emptyArgs: Boolean) : CliktCommand( } }, payload = ZBytes.from(payload), parameters = selector.parameters) } + + session.close() } private val selector by option( @@ -61,12 +63,12 @@ class ZQuerier(private val emptyArgs: Boolean) : CliktCommand( metavar = "selector" ).default("demo/example/**") private val payload by option( - "-p", "--payload", help = "An optional payload to put in the query.", metavar = "payload" + "-p", "--payload", help = "An optional payload to put in the queries.", metavar = "payload" ) private val target by option( "-t", "--target", - help = "The target queryables of the query. Default: BEST_MATCHING. " + "[possible values: BEST_MATCHING, ALL, ALL_COMPLETE]", + help = "The target queryables of the querier. Default: BEST_MATCHING. " + "[possible values: BEST_MATCHING, ALL, ALL_COMPLETE]", metavar = "target" ) private val timeout by option( diff --git a/examples/src/main/kotlin/io.zenoh/ZQueryable.kt b/examples/src/main/kotlin/io.zenoh/ZQueryable.kt index 630bb0e67..bf408f80d 100644 --- a/examples/src/main/kotlin/io.zenoh/ZQueryable.kt +++ b/examples/src/main/kotlin/io.zenoh/ZQueryable.kt @@ -17,10 +17,14 @@ package io.zenoh import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.options.* import io.zenoh.bytes.ZBytes +import io.zenoh.handlers.Handler +import io.zenoh.keyexpr.KeyExpr import io.zenoh.keyexpr.intoKeyExpr +import io.zenoh.query.Query import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.runBlocking import org.apache.commons.net.ntp.TimeStamp +import java.util.concurrent.CountDownLatch class ZQueryable(private val emptyArgs: Boolean) : CliktCommand( help = "Zenoh Queryable example" @@ -31,26 +35,78 @@ class ZQueryable(private val emptyArgs: Boolean) : CliktCommand( Zenoh.initLogFromEnvOr("error") - Zenoh.open(config).onSuccess { session -> - session.use { - key.intoKeyExpr().onSuccess { keyExpr -> - println("Declaring Queryable on $key...") - session.declareQueryable(keyExpr, Channel()).onSuccess { queryable -> - runBlocking { - for (query in queryable.receiver) { - val valueInfo = query.payload?.let { value -> " with value '$value'" } ?: "" - println(">> [Queryable] Received Query '${query.selector}' $valueInfo") - query.reply( - keyExpr, - payload = ZBytes.from(value), - timestamp = TimeStamp.getCurrentTime() - ).onFailure { println(">> [Queryable ] Error sending reply: $it") } - } - } - } - } + val session = Zenoh.open(config).getOrThrow() + val keyExpr = key.intoKeyExpr().getOrThrow() + + // Run the queryable example through one of the examples below: + runChannelExample(session, keyExpr) + // runCallbackExample(session, keyExpr) + // runHandlerExample(session, keyExpr) + + session.close() + } + + private fun runChannelExample(session: Session, keyExpr: KeyExpr) { + println("Declaring Queryable on $key...") + val queryable = session.declareQueryable(keyExpr, Channel()).getOrThrow() + runBlocking { + for (query in queryable.receiver) { + val valueInfo = query.payload?.let { value -> " with value '$value'" } ?: "" + println(">> [Queryable] Received Query '${query.selector}' $valueInfo") + query.reply( + keyExpr, + payload = ZBytes.from(value), + timestamp = TimeStamp.getCurrentTime() + ).onFailure { println(">> [Queryable ] Error sending reply: $it") } } } + queryable.close() + } + + private fun runCallbackExample(session: Session, keyExpr: KeyExpr) { + println("Declaring Queryable on $key...") + val queryable = session.declareQueryable(keyExpr, callback = { query -> + val valueInfo = query.payload?.let { value -> " with value '$value'" } ?: "" + println(">> [Queryable] Received Query '${query.selector}' $valueInfo") + query.reply( + keyExpr, + payload = ZBytes.from(value), + timestamp = TimeStamp.getCurrentTime() + ).onFailure { println(">> [Queryable ] Error sending reply: $it") } + }).getOrThrow() + + CountDownLatch(1).await() // A countdown latch is used here to block execution while queries are received. + // Typically, this wouldn't be needed. + + queryable.close() + } + + private fun runHandlerExample(session: Session, keyExpr: KeyExpr) { + + // Create your own handler implementation + class ExampleHandler : Handler { + override fun handle(query: Query) { + val valueInfo = query.payload?.let { value -> " with value '$value'" } ?: "" + println(">> [Queryable] Received Query '${query.selector}' $valueInfo") + query.reply( + keyExpr, + payload = ZBytes.from(value), + timestamp = TimeStamp.getCurrentTime() + ).onFailure { println(">> [Queryable ] Error sending reply: $it") } + } + + override fun receiver() {} + override fun onClose() {} + } + + // Declare the queryable, providing an instance of the handler + println("Declaring Queryable on $key...") + val queryable = session.declareQueryable(keyExpr, handler = ExampleHandler()).getOrThrow() + + CountDownLatch(1).await() // A countdown latch is used here to block execution while queries are received. + // Typically, this wouldn't be needed. + + queryable.close() } private val key by option( diff --git a/examples/src/main/kotlin/io.zenoh/ZScout.kt b/examples/src/main/kotlin/io.zenoh/ZScout.kt index f3fd0cbb6..4dc2669e9 100644 --- a/examples/src/main/kotlin/io.zenoh/ZScout.kt +++ b/examples/src/main/kotlin/io.zenoh/ZScout.kt @@ -16,8 +16,11 @@ package io.zenoh import com.github.ajalt.clikt.core.CliktCommand import io.zenoh.config.WhatAmI +import io.zenoh.handlers.Handler +import io.zenoh.scouting.Hello import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.runBlocking +import java.util.concurrent.CountDownLatch class ZScout : CliktCommand( help = "Zenoh Scouting example" @@ -26,6 +29,13 @@ class ZScout : CliktCommand( Zenoh.initLogFromEnvOr("error") + // Run the scout example with one of the implementations below: + runChannelExample() + // runCallbackExample() + // runHandlerExample() + } + + private fun runChannelExample() { println("Scouting...") val scout = Zenoh.scout(channel = Channel(), whatAmI = setOf(WhatAmI.Peer, WhatAmI.Router)).getOrThrow() @@ -37,6 +47,37 @@ class ZScout : CliktCommand( scout.stop() } + + private fun runCallbackExample() { + println("Scouting...") + + val scout = Zenoh.scout(whatAmI = setOf(WhatAmI.Peer, WhatAmI.Router), callback = ::println).getOrThrow() + + CountDownLatch(1).await() // A countdown latch is used here to block execution while queries are received. + // Typically, this wouldn't be needed. + scout.stop() + } + + private fun runHandlerExample() { + + // Create your own Handler implementation: + class ExampleHandler: Handler { + override fun handle(t: Hello) = println(t) + + override fun receiver() {} + + override fun onClose() {} + } + + println("Scouting...") + + // Declare the scout with the handler + val scout = Zenoh.scout(whatAmI = setOf(WhatAmI.Peer, WhatAmI.Router), handler = ExampleHandler()).getOrThrow() + + CountDownLatch(1).await() // A countdown latch is used here to block execution while queries are received. + // Typically, this wouldn't be needed. + scout.stop() + } } fun main(args: Array) = ZScout().main(args) diff --git a/examples/src/main/kotlin/io.zenoh/ZSub.kt b/examples/src/main/kotlin/io.zenoh/ZSub.kt index 4df36998e..8266054be 100644 --- a/examples/src/main/kotlin/io.zenoh/ZSub.kt +++ b/examples/src/main/kotlin/io.zenoh/ZSub.kt @@ -16,9 +16,13 @@ package io.zenoh import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.options.* +import io.zenoh.handlers.Handler +import io.zenoh.keyexpr.KeyExpr import io.zenoh.keyexpr.intoKeyExpr +import io.zenoh.sample.Sample import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.runBlocking +import java.util.concurrent.CountDownLatch class ZSub(private val emptyArgs: Boolean) : CliktCommand( help = "Zenoh Sub example" @@ -30,27 +34,63 @@ class ZSub(private val emptyArgs: Boolean) : CliktCommand( Zenoh.initLogFromEnvOr("error") println("Opening session...") - Zenoh.open(config).onSuccess { session -> - session.use { - key.intoKeyExpr().onSuccess { keyExpr -> - keyExpr.use { - println("Declaring Subscriber on '$keyExpr'...") - - session.declareSubscriber(keyExpr, Channel()).onSuccess { subscriber -> - runBlocking { - for (sample in subscriber.receiver) { - println(">> [Subscriber] Received ${sample.kind} ('${sample.keyExpr}': '${sample.payload}'" + "${ - sample.attachment?.let { - ", with attachment: $it" - } ?: "" - })") - } - } - } - } - } + val session = Zenoh.open(config).getOrThrow() + val keyExpr = key.intoKeyExpr().getOrThrow() + + println("Declaring Subscriber on '$keyExpr'...") + + runChannelExample(session, keyExpr) + } + + private fun runChannelExample(session: Session, keyExpr: KeyExpr) { + val subscriber = session.declareSubscriber(keyExpr, Channel()).getOrThrow() + runBlocking { + for (sample in subscriber.receiver) { + println(">> [Subscriber] Received ${sample.kind} ('${sample.keyExpr}': '${sample.payload}'" + "${ + sample.attachment?.let { + ", with attachment: $it" + } ?: "" + })") } - }.onFailure { exception -> println(exception.message) } + } + + subscriber.close() + } + + private fun runCallbackExample(session: Session, keyExpr: KeyExpr) { + val subscriber = session.declareSubscriber(keyExpr, callback = { sample -> + println(">> [Subscriber] Received ${sample.kind} ('${sample.keyExpr}': '${sample.payload}'" + "${ + sample.attachment?.let { + ", with attachment: $it" + } ?: "" + })") + }).getOrThrow() + + + CountDownLatch(1).await() // A countdown latch is used here to block execution while samples are received. + // Typically, this wouldn't be needed. + subscriber.close() + } + + private fun runHandlerExample(session: Session, keyExpr: KeyExpr) { + class ExampleHandler: Handler { + override fun handle(sample: Sample) { + println(">> [Subscriber] Received ${sample.kind} ('${sample.keyExpr}': '${sample.payload}'" + "${ + sample.attachment?.let { + ", with attachment: $it" + } ?: "" + })") + } + + override fun receiver() {} + override fun onClose() {} + } + + val subscriber = session.declareSubscriber(keyExpr, handler = ExampleHandler()).getOrThrow() + + CountDownLatch(1).await() // A countdown latch is used here to block execution while samples are received. + // Typically, this wouldn't be needed. + subscriber.close() } private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config") diff --git a/examples/src/main/kotlin/io.zenoh/ZSubLiveliness.kt b/examples/src/main/kotlin/io.zenoh/ZSubLiveliness.kt index ac5b777ab..91b5f829c 100644 --- a/examples/src/main/kotlin/io.zenoh/ZSubLiveliness.kt +++ b/examples/src/main/kotlin/io.zenoh/ZSubLiveliness.kt @@ -31,21 +31,21 @@ class ZSubLiveliness(private val emptyArgs: Boolean) : CliktCommand( Zenoh.initLogFromEnvOr("error") println("Opening session...") - Zenoh.open(config).onSuccess { session -> - key.intoKeyExpr().onSuccess { keyExpr -> - session.liveliness().declareSubscriber(keyExpr, channel = Channel(), history = history) - .onSuccess { subscriber -> - runBlocking { - for (sample in subscriber.receiver) { - when (sample.kind) { - SampleKind.PUT -> println(">> [LivelinessSubscriber] New alive token ('${sample.keyExpr}')") - SampleKind.DELETE -> println(">> [LivelinessSubscriber] Dropped token ('${sample.keyExpr}')") - } - } - } - } + val session = Zenoh.open(config).getOrThrow() + val keyExpr = key.intoKeyExpr().getOrThrow() + val subscriber = + session.liveliness().declareSubscriber(keyExpr, channel = Channel(), history = history).getOrThrow() + + runBlocking { + for (sample in subscriber.receiver) { + when (sample.kind) { + SampleKind.PUT -> println(">> [LivelinessSubscriber] New alive token ('${sample.keyExpr}')") + SampleKind.DELETE -> println(">> [LivelinessSubscriber] Dropped token ('${sample.keyExpr}')") + } } - }.onFailure { exception -> println(exception.message) } + } + + session.close() } private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config")