Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Examples refactor #334

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions examples/src/main/kotlin/io.zenoh/ZDelete.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ class ZDelete(private val emptyArgs: Boolean) : CliktCommand(
Zenoh.initLogFromEnvOr("error")

println("Opening session...")
Zenoh.open(config).onSuccess { session ->
session.use {
key.intoKeyExpr().onSuccess { keyExpr ->
println("Deleting resources matching '$keyExpr'...")
session.delete(keyExpr)
}
val session = Zenoh.open(config).getOrThrow()
session.use {
key.intoKeyExpr().onSuccess { keyExpr ->
println("Deleting resources matching '$keyExpr'...")
session.delete(keyExpr)
}
}
}
Expand Down
100 changes: 77 additions & 23 deletions examples/src/main/kotlin/io.zenoh/ZGet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.long
import io.zenoh.bytes.ZBytes
import io.zenoh.handlers.Handler
import io.zenoh.sample.SampleKind
import io.zenoh.query.QueryTarget
import io.zenoh.query.Reply
import io.zenoh.query.Selector
import io.zenoh.query.intoSelector
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
Expand All @@ -34,32 +37,83 @@ class ZGet(private val emptyArgs: Boolean) : CliktCommand(

Zenoh.initLogFromEnvOr("error")

Zenoh.open(config).onSuccess { session ->
session.use {
selector.intoSelector().onSuccess { selector ->
session.get(selector,
channel = Channel(),
payload = payload?.let { ZBytes.from(it) },
target = target?.let { QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING,
attachment = attachment?.let { ZBytes.from(it) },
timeout = Duration.ofMillis(timeout))
.onSuccess { channelReceiver ->
runBlocking {
for (reply in channelReceiver) {
reply.result.onSuccess { sample ->
when (sample.kind) {
SampleKind.PUT -> println("Received ('${sample.keyExpr}': '${sample.payload}')")
SampleKind.DELETE -> println("Received (DELETE '${sample.keyExpr}')")
}
}.onFailure { error ->
println("Received (ERROR: '${error.message}')")
}
}
}
}
val session = Zenoh.open(config).getOrThrow()
val selector = selector.intoSelector().getOrThrow()

// Run the Get query through one of the examples below:
runChannelExample(session, selector)
// runCallbackExample(session, selector)
// runHandlerExample(session, selector)

session.close()
}

private fun runChannelExample(session: Session, selector: Selector) {
val channelReceiver = session.get(selector,
channel = Channel(),
payload = payload?.let { ZBytes.from(it) },
target = target?.let { QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING,
attachment = attachment?.let { ZBytes.from(it) },
timeout = Duration.ofMillis(timeout)
).getOrThrow()
runBlocking {
for (reply in channelReceiver) {
reply.result.onSuccess { sample ->
when (sample.kind) {
SampleKind.PUT -> println("Received ('${sample.keyExpr}': '${sample.payload}')")
SampleKind.DELETE -> println("Received (DELETE '${sample.keyExpr}')")
}
}.onFailure { error ->
println("Received (ERROR: '${error.message}')")
}
}
}
}

private fun runCallbackExample(session: Session, selector: Selector) {
session.get(selector,
payload = payload?.let { ZBytes.from(it) },
target = target?.let { QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING,
attachment = attachment?.let { ZBytes.from(it) },
timeout = Duration.ofMillis(timeout),
callback = { reply ->
reply.result.onSuccess { sample ->
when (sample.kind) {
SampleKind.PUT -> println("Received ('${sample.keyExpr}': '${sample.payload}')")
SampleKind.DELETE -> println("Received (DELETE '${sample.keyExpr}')")
}
}.onFailure { error ->
println("Received (ERROR: '${error.message}')")
}
},
).getOrThrow()
}

private fun runHandlerExample(session: Session, selector: Selector) {
// Create your own handler implementation
class ExampleHandler : Handler<Reply, Unit> {
override fun handle(t: Reply) {
t.result.onSuccess { sample ->
when (sample.kind) {
SampleKind.PUT -> println("Received ('${sample.keyExpr}': '${sample.payload}')")
SampleKind.DELETE -> println("Received (DELETE '${sample.keyExpr}')")
}
}.onFailure { error ->
println("Received (ERROR: '${error.message}')")
}
}

override fun receiver() {}
override fun onClose() {}
}

session.get(selector,
payload = payload?.let { ZBytes.from(it) },
target = target?.let { QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING,
attachment = attachment?.let { ZBytes.from(it) },
timeout = Duration.ofMillis(timeout),
handler = ExampleHandler(), // Provide a handler instance
).getOrThrow()
}

private val selector by option(
Expand Down
67 changes: 50 additions & 17 deletions examples/src/main/kotlin/io.zenoh/ZGetLiveliness.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ package io.zenoh
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.long
import io.zenoh.handlers.Handler
import io.zenoh.keyexpr.KeyExpr
import io.zenoh.keyexpr.intoKeyExpr
import io.zenoh.query.Reply
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import java.time.Duration

class ZGetLiveliness(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh Sub Liveliness example"
help = "Zenoh Get Liveliness example"
) {

override fun run() {
Expand All @@ -32,24 +35,54 @@ class ZGetLiveliness(private val emptyArgs: Boolean) : CliktCommand(
Zenoh.initLogFromEnvOr("error")

println("Opening session...")
Zenoh.open(config).onSuccess { session ->
session.use {
key.intoKeyExpr().onSuccess { keyExpr ->
session.liveliness().get(keyExpr, channel = Channel(), timeout = Duration.ofMillis(timeout))
.onSuccess { channel ->
runBlocking {
for (reply in channel) {
reply.result.onSuccess {
println(">> Alive token ('${it.keyExpr}')")
}.onFailure {
println(">> Received (ERROR: '${it.message}')")
}
}
}
}
val session = Zenoh.open(config).getOrThrow()
val keyExpr = key.intoKeyExpr().getOrThrow()

runChannelExample(session, keyExpr)

session.close()
}

private fun runChannelExample(session: Session, keyExpr: KeyExpr) {
val channel =
session.liveliness().get(keyExpr, channel = Channel(), timeout = Duration.ofMillis(timeout)).getOrThrow()
runBlocking {
for (reply in channel) {
reply.result.onSuccess {
println(">> Alive token ('${it.keyExpr}')")
}.onFailure {
println(">> Received (ERROR: '${it.message}')")
}
}
}.onFailure { exception -> println(exception.message) }
}
}

private fun runCallbackExample(session: Session, keyExpr: KeyExpr) {
session.liveliness().get(keyExpr, timeout = Duration.ofMillis(timeout), callback = { reply ->
reply.result.onSuccess {
println(">> Alive token ('${it.keyExpr}')")
}.onFailure {
println(">> Received (ERROR: '${it.message}')")
}
}).getOrThrow()
}

private fun runHandlerExample(session: Session, keyExpr: KeyExpr) {
// Create your own handler implementation
class ExampleHandler : Handler<Reply, Unit> {
override fun handle(t: Reply) {
t.result.onSuccess {
println(">> Alive token ('${it.keyExpr}')")
}.onFailure {
println(">> Received (ERROR: '${it.message}')")
}
}

override fun receiver() {}
override fun onClose() {}
}

session.liveliness().get(keyExpr, timeout = Duration.ofMillis(timeout), handler = ExampleHandler()).getOrThrow()
}

private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config")
Expand Down
16 changes: 6 additions & 10 deletions examples/src/main/kotlin/io.zenoh/ZInfo.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,12 @@ class ZInfo(private val emptyArgs: Boolean) : CliktCommand(
Zenoh.initLogFromEnvOr("error")

println("Opening session...")
Zenoh.open(config).onSuccess { session ->
session.use {
val info = session.info()
println("zid: ${info.zid().getOrThrow()}")

println("routers zid: ${info.routersZid().getOrThrow()}")

println("peers zid: ${info.peersZid().getOrThrow()}")
}
}.onFailure { exception -> println(exception.message) }
val session = Zenoh.open(config).getOrThrow()
val info = session.info()
println("zid: ${info.zid().getOrThrow()}")
println("routers zid: ${info.routersZid().getOrThrow()}")
println("peers zid: ${info.peersZid().getOrThrow()}")
session.close()
}


Expand Down
16 changes: 8 additions & 8 deletions examples/src/main/kotlin/io.zenoh/ZLiveliness.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package io.zenoh
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import io.zenoh.keyexpr.intoKeyExpr
import java.util.concurrent.CountDownLatch

class ZLiveliness(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh Liveliness example"
Expand All @@ -28,14 +29,13 @@ class ZLiveliness(private val emptyArgs: Boolean) : CliktCommand(
Zenoh.initLogFromEnvOr("error")

println("Opening session...")
Zenoh.open(config).onSuccess { session ->
key.intoKeyExpr().onSuccess { keyExpr ->
session.liveliness().declareToken(keyExpr)
while (true) {
Thread.sleep(1000)
}
}
}.onFailure { exception -> println(exception.message) }
val session = Zenoh.open(config).getOrThrow()
val keyExpr = key.intoKeyExpr().getOrThrow()
session.liveliness().declareToken(keyExpr)

CountDownLatch(1).await() // A countdown latch is used here to block execution while the liveliness token
// is declared. Typically, this wouldn't be needed.
session.close()
}

private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config")
Expand Down
2 changes: 1 addition & 1 deletion examples/src/main/kotlin/io.zenoh/ZPong.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import java.util.concurrent.CountDownLatch
class ZPong(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh ZPong example"
) {
val latch = CountDownLatch(1)

override fun run() {
val config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode)

Zenoh.initLogFromEnvOr("error")
val latch = CountDownLatch(1)

println("Opening session...")
val session = Zenoh.open(config).getOrThrow()
Expand Down
48 changes: 23 additions & 25 deletions examples/src/main/kotlin/io.zenoh/ZPub.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,29 @@ class ZPub(private val emptyArgs: Boolean) : CliktCommand(
Zenoh.initLogFromEnvOr("error")

println("Opening session...")
Zenoh.open(config).onSuccess { session ->
session.use {
key.intoKeyExpr().onSuccess { keyExpr ->
println("Declaring publisher on '$keyExpr'...")
session.declarePublisher(keyExpr).onSuccess { pub ->
println("Press CTRL-C to quit...")
val attachment = attachment?.toByteArray()
var idx = 0
while (true) {
Thread.sleep(1000)
val payload = "[${
idx.toString().padStart(4, ' ')
}] $value"
println(
"Putting Data ('$keyExpr': '$payload')..."
)
attachment?.let {
pub.put(ZBytes.from(payload), attachment = ZBytes.from(it) )
} ?: let { pub.put(ZBytes.from(payload)) }
idx++
}
}
}
}
}.onFailure { exception -> println(exception.message) }
val session = Zenoh.open(config).getOrThrow()
val keyExpr = key.intoKeyExpr().getOrThrow()

println("Declaring publisher on '$keyExpr'...")
val publisher = session.declarePublisher(keyExpr).getOrThrow()

println("Press CTRL-C to quit...")
val attachment = attachment?.toByteArray()

var idx = 0
while (true) {
Thread.sleep(1000)
val payload = "[${
idx.toString().padStart(4, ' ')
}] $value"
println(
"Putting Data ('$keyExpr': '$payload')..."
)
attachment?.let {
publisher.put(ZBytes.from(payload), attachment = ZBytes.from(it))
} ?: let { publisher.put(ZBytes.from(payload)) }
idx++
}
}


Expand Down
41 changes: 20 additions & 21 deletions examples/src/main/kotlin/io.zenoh/ZPubThr.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,27 +47,26 @@ class ZPubThr(private val emptyArgs: Boolean) : CliktCommand(
priority = priorityInput?.let { Priority.entries[it] } ?: Priority.DATA,
)

Zenoh.open(config).onSuccess {
it.use { session ->
session.declarePublisher("test/thr".intoKeyExpr().getOrThrow(), qos = qos).onSuccess { pub ->
println("Publisher declared on test/thr.")
var count: Long = 0
var start = System.currentTimeMillis()
val number = number.toLong()
println("Press CTRL-C to quit...")
while (true) {
pub.put(payload).getOrThrow()
if (statsPrint) {
if (count < number) {
count++
} else {
val throughput = count * 1000 / (System.currentTimeMillis() - start)
println("$throughput msgs/s")
count = 0
start = System.currentTimeMillis()
}
}
}
val session = Zenoh.open(config).getOrThrow()
val keyExpr = "test/thr".intoKeyExpr().getOrThrow()
val publisher = session.declarePublisher(keyExpr, qos = qos).getOrThrow()

println("Publisher declared on test/thr.")
var count: Long = 0
var start = System.currentTimeMillis()
val number = number.toLong()

println("Press CTRL-C to quit...")
while (true) {
publisher.put(payload).getOrThrow()
if (statsPrint) {
if (count < number) {
count++
} else {
val throughput = count * 1000 / (System.currentTimeMillis() - start)
println("$throughput msgs/s")
count = 0
start = System.currentTimeMillis()
}
}
}
Expand Down
Loading
Loading