Skip to content

Commit

Permalink
ZGet refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
DariusIMP committed Dec 27, 2024
1 parent 267ad5e commit d5d3b12
Showing 1 changed file with 54 additions and 22 deletions.
76 changes: 54 additions & 22 deletions examples/src/main/kotlin/io.zenoh/ZGet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import com.github.ajalt.clikt.parameters.types.long
import io.zenoh.bytes.ZBytes
import io.zenoh.sample.SampleKind
import io.zenoh.query.QueryTarget
import io.zenoh.query.Selector
import io.zenoh.query.intoSelector
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import java.time.Duration
import java.util.concurrent.CountDownLatch

class ZGet(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh Get example"
Expand All @@ -34,32 +36,62 @@ class ZGet(private val emptyArgs: Boolean) : CliktCommand(

Zenoh.initLogFromEnvOr("error")

Zenoh.open(config).onSuccess { session ->
session.use {
selector.intoSelector().onSuccess { selector ->
session.get(selector,
channel = Channel(),
payload = payload?.let { ZBytes.from(it) },
target = target?.let { QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING,
attachment = attachment?.let { ZBytes.from(it) },
timeout = Duration.ofMillis(timeout))
.onSuccess { channelReceiver ->
runBlocking {
for (reply in channelReceiver) {
reply.result.onSuccess { sample ->
when (sample.kind) {
SampleKind.PUT -> println("Received ('${sample.keyExpr}': '${sample.payload}')")
SampleKind.DELETE -> println("Received (DELETE '${sample.keyExpr}')")
}
}.onFailure { error ->
println("Received (ERROR: '${error.message}')")
}
}
val session = Zenoh.open(config).getOrThrow()
val selector = selector.intoSelector().getOrThrow()

runChannelExample(session, selector)
// runCallbackExample(session, selector)

session.close()
}

private fun runChannelExample(session: Session, selector: Selector) {
session.get(selector,
channel = Channel(),
payload = payload?.let { ZBytes.from(it) },
target = target?.let { QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING,
attachment = attachment?.let { ZBytes.from(it) },
timeout = Duration.ofMillis(timeout)
)
.onSuccess { channelReceiver ->
runBlocking {
for (reply in channelReceiver) {
reply.result.onSuccess { sample ->
when (sample.kind) {
SampleKind.PUT -> println("Received ('${sample.keyExpr}': '${sample.payload}')")
SampleKind.DELETE -> println("Received (DELETE '${sample.keyExpr}')")
}
}.onFailure { error ->
println("Received (ERROR: '${error.message}')")
}
}
}
}
}
}

private fun runCallbackExample(session: Session, selector: Selector) {
val countDownLatch = CountDownLatch(1)
session.get(selector,
payload = payload?.let { ZBytes.from(it) },
target = target?.let { QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING,
attachment = attachment?.let { ZBytes.from(it) },
timeout = Duration.ofMillis(timeout),
callback = {reply ->
reply.result.onSuccess { sample ->
when (sample.kind) {
SampleKind.PUT -> println("Received ('${sample.keyExpr}': '${sample.payload}')")
SampleKind.DELETE -> println("Received (DELETE '${sample.keyExpr}')")
}
}.onFailure { error ->
println("Received (ERROR: '${error.message}')")
}
},
onClose = {
countDownLatch.countDown()
}
).getOrThrow()

countDownLatch.await()
}

private val selector by option(
Expand Down

0 comments on commit d5d3b12

Please sign in to comment.