Skip to content

Commit

Permalink
Liveliness feature (#296)
Browse files Browse the repository at this point in the history
* Liveliness: adding liveliness - wip

* Liveliness: fix blocking get liveliness issue

* Liveliness: tidying up code, adding test, adding kdocs.

* Liveliness: fix test
  • Loading branch information
DariusIMP authored Nov 21, 2024
1 parent b38d9d8 commit 4f68561
Show file tree
Hide file tree
Showing 15 changed files with 971 additions and 6 deletions.
3 changes: 3 additions & 0 deletions examples/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ tasks {
"ZBytes",
"ZDelete",
"ZGet",
"ZGetLiveliness",
"ZInfo",
"ZLiveliness",
"ZPub",
"ZPubThr",
"ZPut",
"ZQueryable",
"ZScout",
"ZSub",
"ZSubLiveliness",
"ZSubThr"
)

Expand Down
82 changes: 82 additions & 0 deletions examples/src/main/kotlin/io.zenoh/ZGetLiveliness.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh

import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.long
import io.zenoh.keyexpr.intoKeyExpr
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import java.time.Duration

class ZGetLiveliness(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh Sub Liveliness example"
) {

override fun run() {
val config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode)

Zenoh.initLogFromEnvOr("error")

println("Opening session...")
Zenoh.open(config).onSuccess { session ->
session.use {
key.intoKeyExpr().onSuccess { keyExpr ->
session.liveliness().get(keyExpr, channel = Channel(), timeout = Duration.ofMillis(timeout))
.onSuccess { channel ->
runBlocking {
for (reply in channel) {
reply.result.onSuccess {
println(">> Alive token ('${it.keyExpr}')")
}.onFailure {
println(">> Received (ERROR: '${it.message}')")
}
}
}
}
}
}
}.onFailure { exception -> println(exception.message) }
}

private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config")
private val key by option(
"-k",
"--key",
help = "The key expression matching liveliness tokens to query. [default: group1/**]",
metavar = "key"
).default("group1/**")
private val timeout by option(
"-o", "--timeout", help = "The query timeout in milliseconds [default: 10000]", metavar = "timeout"
).long().default(10000)
private val connect: List<String> by option(
"-e", "--connect", help = "Endpoints to connect to.", metavar = "connect"
).multiple()
private val listen: List<String> by option(
"-l", "--listen", help = "Endpoints to listen on.", metavar = "listen"
).multiple()
private val mode by option(
"-m",
"--mode",
help = "The session mode. Default: peer. Possible values: [peer, client, router]",
metavar = "mode"
).default("peer")
private val noMulticastScouting: Boolean by option(
"--no-multicast-scouting", help = "Disable the multicast-based scouting mechanism."
).flag(default = false)
}

fun main(args: Array<String>) = ZGetLiveliness(args.isEmpty()).main(args)
62 changes: 62 additions & 0 deletions examples/src/main/kotlin/io.zenoh/ZLiveliness.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh

import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import io.zenoh.keyexpr.intoKeyExpr

class ZLiveliness(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh Liveliness example"
) {

override fun run() {
val config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode)

Zenoh.initLogFromEnvOr("error")

println("Opening session...")
Zenoh.open(config).onSuccess { session ->
key.intoKeyExpr().onSuccess { keyExpr ->
session.liveliness().declareToken(keyExpr)
while (true) {
Thread.sleep(1000)
}
}
}.onFailure { exception -> println(exception.message) }
}

private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config")
private val key by option(
"-k", "--key", help = "The key expression to subscribe to [default: group1/**]", metavar = "key"
).default("group1/**")
private val connect: List<String> by option(
"-e", "--connect", help = "Endpoints to connect to.", metavar = "connect"
).multiple()
private val listen: List<String> by option(
"-l", "--listen", help = "Endpoints to listen on.", metavar = "listen"
).multiple()
private val mode by option(
"-m",
"--mode",
help = "The session mode. Default: peer. Possible values: [peer, client, router]",
metavar = "mode"
).default("peer")
private val noMulticastScouting: Boolean by option(
"--no-multicast-scouting", help = "Disable the multicast-based scouting mechanism."
).flag(default = false)
}

fun main(args: Array<String>) = ZLiveliness(args.isEmpty()).main(args)
1 change: 0 additions & 1 deletion examples/src/main/kotlin/io.zenoh/ZSub.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package io.zenoh

import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import io.zenoh.ext.zDeserialize
import io.zenoh.keyexpr.intoKeyExpr
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
Expand Down
76 changes: 76 additions & 0 deletions examples/src/main/kotlin/io.zenoh/ZSubLiveliness.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh

import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import io.zenoh.keyexpr.intoKeyExpr
import io.zenoh.sample.SampleKind
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

class ZSubLiveliness(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh Sub Liveliness example"
) {

override fun run() {
val config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode)

Zenoh.initLogFromEnvOr("error")

println("Opening session...")
Zenoh.open(config).onSuccess { session ->
key.intoKeyExpr().onSuccess { keyExpr ->
session.liveliness().declareSubscriber(keyExpr, channel = Channel(), history = history)
.onSuccess { subscriber ->
runBlocking {
for (sample in subscriber.receiver) {
when (sample.kind) {
SampleKind.PUT -> println(">> [LivelinessSubscriber] New alive token ('${sample.keyExpr}')")
SampleKind.DELETE -> println(">> [LivelinessSubscriber] Dropped token ('${sample.keyExpr}')")
}
}
}
}
}
}.onFailure { exception -> println(exception.message) }
}

private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config")
private val key by option(
"-k", "--key", help = "The key expression to subscribe to [default: group1/**]", metavar = "key"
).default("group1/**")
private val connect: List<String> by option(
"-e", "--connect", help = "Endpoints to connect to.", metavar = "connect"
).multiple()
private val listen: List<String> by option(
"-l", "--listen", help = "Endpoints to listen on.", metavar = "listen"
).multiple()
private val mode by option(
"-m",
"--mode",
help = "The session mode. Default: peer. Possible values: [peer, client, router]",
metavar = "mode"
).default("peer")
private val history: Boolean by option(
"--history",
help = "Get historical liveliness tokens."
).flag(default = false)
private val noMulticastScouting: Boolean by option(
"--no-multicast-scouting", help = "Disable the multicast-based scouting mechanism."
).flag(default = false)
}

fun main(args: Array<String>) = ZSubLiveliness(args.isEmpty()).main(args)
1 change: 1 addition & 0 deletions zenoh-jni/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod config;
mod errors;
mod key_expr;
mod liveliness;
mod logger;
mod publisher;
mod query;
Expand Down
Loading

0 comments on commit 4f68561

Please sign in to comment.