Skip to content

Commit

Permalink
Api alignment (#169)
Browse files Browse the repository at this point in the history
* feat(api alignment): replacing res() with wait()

* feat(api alignment): replacing `withValue(...)` with `.payload(...)` on get queries

* feat(api alignment): making 'value' attribute internal for Query, offering 'payload' and 'encoding' accessors instead.

* feat(api alignment): using ZBytes for Value instead of ByteArray

* feat(api alignment): deserialization for String and Int

* feat(api alignment): deserialization for Byte, Short & Long

* feat(api alignment): deserialization for Float & Double

* feat(api alignment): deserialization

* feat(api alignment): serialization - wip

* feat(api alignment): serialization - wip 2

* feat(api alignment): custom deserializers - wip

* feat(api_alignment): zbytes jni refactor

* feat(api alignment): fixing attachment api

* feat(api alignment): ZBytes example + custom serialization improvements

* feat(api_alignment): serializable maps

* feat(api alignment): serialization refactor

* feat(api alignment): deserialization refactor

* rebase fix

* feat(api alignment): deserialization refactor

* feat(api alignment): splitting Serializable interface into Serializable and Deserializable.

* feat(api alignment): marking DeserializationUtils with @PublishedApi tag

* feat(api alignment): fix clippy errors

* feat(api alignment): kdoc comment for ZBytes + removing Serializable for ZBytes

* feat(api alignment): fix imports

* feat(api alignment): Removing IntoZBytes interface (in favor of Serializable) + moving Into ZBytes extension functions under ZBytes.kt

* feat(api alignment): deserialization refactor

* feat(api alignment): moving Serializable and Deserializable under their own file.

* feat(api alignment): refactoring ZBytesTest.kt

* feat(api alignment): refactoring ZBytes and ZBytesTest

* feat(default args): subscriber - wip

* feat(default args): publisher

* feat(default args): subscriber - updating documentation

* feat(default args): key expressions - removing resolvables

* feat(default args): queryable

* feat(default args): get query - WIP

* feat(default args): put operation - WIP

* feat(default args): delete operation - WIP

* feat(default args): fixing examples

* feat(default args): changing receiver attribute from Queryable, Get and Subscriber to be non optional.

* fix(selector): setting selector parameters as a nullable argument instead of putting an empty string.

* feat(default arguments): replacing reply builders with default args

* feat(default arguments): removing QoS builder

* feat(default arguments): making ChannelHandler internal

* feat(default arguments): removing the Get class (unused after changes)

* feat(default arguments): making Encoding and QoS data classes

* feat(default arguments): making Sample a data class

* feat(default arguments): updating KDoc for session.get .

* feat(default arguments): updating KDoc for session.declareQueryable

* feat(default arguments): updating KDoc for session.put and session.delete

* feat(default arguments): updating KDoc for Reply, converting Reply.Success, Reply.Error & Reply.Delete into data classes.

* feat(api alignment): replacing `withValue(...)` with `.payload(...)` on get queries

* feat(api-alignment): removing Resolvable.kt

* Removing IntoZBytes.kt

* feat(api-alignment): KDoc fixes

* fix: invalid query pointer freed
  • Loading branch information
DariusIMP authored Aug 19, 2024
1 parent 47e3ad0 commit c0aa350
Show file tree
Hide file tree
Showing 54 changed files with 2,880 additions and 1,635 deletions.
1 change: 1 addition & 0 deletions examples/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {

tasks {
val examples = listOf(
"ZBytes",
"ZDelete",
"ZGet",
"ZPub",
Expand Down
237 changes: 237 additions & 0 deletions examples/src/main/kotlin/io.zenoh/ZBytes.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
package io.zenoh

import io.zenoh.protocol.*
import java.nio.ByteBuffer
import java.nio.ByteOrder
import kotlin.reflect.typeOf

fun main() {

/***********************************************
* Standard serialization and deserialization. *
***********************************************/

/** Numeric: byte, short, int, float, double */
val intInput = 1234
var payload = ZBytes.from(intInput)
var intOutput = payload.deserialize<Int>().getOrThrow()
check(intInput == intOutput)

// Alternatively you can serialize into the type.
payload = ZBytes.serialize(intInput).getOrThrow()
intOutput = payload.deserialize<Int>().getOrThrow()
check(intInput == intOutput)

// Alternatively, `Numeric.into()`: ZBytes can be used
payload = intInput.into()
intOutput = payload.deserialize<Int>().getOrThrow()
check(intInput == intOutput)

// Another example with float
val floatInput = 3.1415f
payload = ZBytes.from(floatInput)
val floatOutput = payload.deserialize<Float>().getOrThrow()
check(floatInput == floatOutput)

/** String serialization and deserialization. */
val stringInput = "example"
payload = ZBytes.from(stringInput)
// Alternatively, you can also call `String.into()` to convert
// a string into a ZBytes object:
// payload = stringInput.into()
var stringOutput = payload.deserialize<String>().getOrThrow()
check(stringInput == stringOutput)

// For the case of strings, ZBytes::toString() is equivalent:
stringOutput = payload.toString()
check(stringInput == stringOutput)

/** ByteArray serialization and deserialization. */
val byteArrayInput = "example".toByteArray()
payload = ZBytes.from(byteArrayInput) // Equivalent to `byteArrayInput.into()`
var byteArrayOutput = payload.deserialize<ByteArray>().getOrThrow()
check(byteArrayInput.contentEquals(byteArrayOutput))
// Alternatively, we can directly access the bytes of property of ZBytes:
byteArrayOutput = payload.toByteArray()
check(byteArrayInput.contentEquals(byteArrayOutput))

/** List serialization and deserialization.
*
* Supported types: String, ByteArray, ZBytes, Byte, Short, Int, Long, Float and Double.
*/
val inputList = listOf("sample1", "sample2", "sample3")
payload = ZBytes.serialize(inputList).getOrThrow()
val outputList = payload.deserialize<List<String>>().getOrThrow()
check(inputList == outputList)

val inputListZBytes = inputList.map { value -> value.into() }
payload = ZBytes.serialize(inputListZBytes).getOrThrow()
val outputListZBytes = payload.deserialize<List<ZBytes>>().getOrThrow()
check(inputListZBytes == outputListZBytes)

val inputListByteArray = inputList.map { value -> value.toByteArray() }
payload = ZBytes.serialize(inputListByteArray).getOrThrow()
val outputListByteArray = payload.deserialize<List<ByteArray>>().getOrThrow()
check(compareByteArrayLists(inputListByteArray, outputListByteArray))

/**
* Map serialization and deserialization.
*
* Maps with the following Type combinations are supported: String, ByteArray, ZBytes, Byte, Short, Int, Long, Float and Double.
*/
val inputMap = mapOf("key1" to "value1", "key2" to "value2", "key3" to "value3")
payload = ZBytes.serialize(inputMap).getOrThrow()
val outputMap = payload.deserialize<Map<String, String>>().getOrThrow()
check(inputMap == outputMap)

val combinedInputMap = mapOf("key1" to ZBytes.from("zbytes1"), "key2" to ZBytes.from("zbytes2"))
payload = ZBytes.serialize(combinedInputMap).getOrThrow()
val combinedOutputMap = payload.deserialize<Map<String, ZBytes>>().getOrThrow()
check(combinedInputMap == combinedOutputMap)

/*********************************************
* Custom serialization and deserialization. *
*********************************************/

/**
* The examples below use [MyZBytes], an example class consisting that implements the [Serializable] interface.
*
* In order for the serialization and deserialization to be successful on a custom class,
* the class itself must override the `into(): ZBytes` function, but also the companion
* object must implement the [Deserializable.From] interface.
*
* @see MyZBytes
*/
val inputMyZBytes = MyZBytes("example")
payload = ZBytes.serialize(inputMyZBytes).getOrThrow()
val outputMyZBytes = payload.deserialize<MyZBytes>().getOrThrow()
check(inputMyZBytes == outputMyZBytes)

/** List of MyZBytes. */
val inputListMyZBytes = inputList.map { value -> MyZBytes(value) }
payload = ZBytes.serialize<List<MyZBytes>>(inputListMyZBytes).getOrThrow()
val outputListMyZBytes = payload.deserialize<List<MyZBytes>>().getOrThrow()
check(inputListMyZBytes == outputListMyZBytes)

/** Map of MyZBytes. */
val inputMapMyZBytes = inputMap.map { (k, v) -> MyZBytes(k) to MyZBytes(v)}.toMap()
payload = ZBytes.serialize<Map<MyZBytes, MyZBytes>>(inputMapMyZBytes).getOrThrow()
val outputMapMyZBytes = payload.deserialize<Map<MyZBytes, MyZBytes>>().getOrThrow()
check(inputMapMyZBytes == outputMapMyZBytes)

val combinedMap = mapOf(MyZBytes("foo") to 1, MyZBytes("bar") to 2)
payload = ZBytes.serialize<Map<MyZBytes, Int>>(combinedMap).getOrThrow()
val combinedOutput = payload.deserialize<Map<MyZBytes, Int>>().getOrThrow()
check(combinedMap == combinedOutput)

/**
* Providing a map of deserializers.
*
* Alternatively, [ZBytes.deserialize] also accepts a deserializers parameter of type
* `Map<KType, KFunction1<ByteArray, Any>>`. That is, a map of types that is associated
* to a function receiving a ByteArray, that returns Any. This way, you can provide a series
* of deserializer functions that extend the deserialization mechanisms we provide by default.
*
* For example, let's say we have a custom map serializer, with its own deserializer:
*/
val fooMap = mapOf(Foo("foo1") to Foo("bar1"), Foo("foo2") to Foo("bar2"))
val fooMapSerialized = ZBytes.from(serializeFooMap(fooMap))
val deserializersMap = mapOf(typeOf<Map<Foo, Foo>>() to ::deserializeFooMap)
val deserializedFooMap = fooMapSerialized.deserialize<Map<Foo, Foo>>(deserializersMap).getOrThrow()
check(fooMap == deserializedFooMap)
}

class MyZBytes(val content: String) : Serializable, Deserializable {

override fun into(): ZBytes = content.into()

companion object : Deserializable.From {
override fun from(zbytes: ZBytes): MyZBytes {
return MyZBytes(zbytes.toString())
}
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as MyZBytes

return content == other.content
}

override fun hashCode(): Int {
return content.hashCode()
}
}

/** Example class for the deserialization map examples. */
class Foo(val content: String) {

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as Foo

return content == other.content
}

override fun hashCode(): Int {
return content.hashCode()
}
}

/** Example serializer and deserializer. */
private fun serializeFooMap(testMap: Map<Foo, Foo>): ByteArray {
return testMap.map {
val key = it.key.content.toByteArray()
val keyLength = ByteBuffer.allocate(Int.SIZE_BYTES).order(ByteOrder.LITTLE_ENDIAN).putInt(key.size).array()
val value = it.value.content.toByteArray()
val valueLength =
ByteBuffer.allocate(Int.SIZE_BYTES).order(ByteOrder.LITTLE_ENDIAN).putInt(value.size).array()
keyLength + key + valueLength + value
}.reduce { acc, bytes -> acc + bytes }
}

private fun deserializeFooMap(serializedMap: ZBytes): Map<Foo, Foo> {
var idx = 0
var sliceSize: Int
val bytes = serializedMap.toByteArray()
val decodedMap = mutableMapOf<Foo, Foo>()
while (idx < bytes.size) {
sliceSize = ByteBuffer.wrap(bytes.sliceArray(IntRange(idx, idx + Int.SIZE_BYTES - 1)))
.order(ByteOrder.LITTLE_ENDIAN).int
idx += Int.SIZE_BYTES

val key = bytes.sliceArray(IntRange(idx, idx + sliceSize - 1))
idx += sliceSize

sliceSize = ByteBuffer.wrap(bytes.sliceArray(IntRange(idx, idx + Int.SIZE_BYTES - 1))).order(
ByteOrder.LITTLE_ENDIAN
).int
idx += Int.SIZE_BYTES

val value = bytes.sliceArray(IntRange(idx, idx + sliceSize - 1))
idx += sliceSize

decodedMap[Foo(key.decodeToString())] = Foo(value.decodeToString())
}
return decodedMap
}

/** Utils for this example. */

private fun compareByteArrayLists(list1: List<ByteArray>, list2: List<ByteArray>): Boolean {
if (list1.size != list2.size) {
return false
}

for (i in list1.indices) {
if (!list1[i].contentEquals(list2[i])) {
return false
}
}

return true
}
6 changes: 2 additions & 4 deletions examples/src/main/kotlin/io.zenoh/ZDelete.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ class ZDelete(private val emptyArgs: Boolean) : CliktCommand(
Session.open(config).onSuccess { session ->
session.use {
key.intoKeyExpr().onSuccess { keyExpr ->
keyExpr.use {
println("Deleting resources matching '$keyExpr'...")
session.delete(keyExpr).res()
}
println("Deleting resources matching '$keyExpr'...")
session.delete(keyExpr)
}
}
}
Expand Down
48 changes: 20 additions & 28 deletions examples/src/main/kotlin/io.zenoh/ZGet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ package io.zenoh
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.long
import io.zenoh.query.ConsolidationMode
import io.zenoh.protocol.into
import io.zenoh.query.QueryTarget
import io.zenoh.query.Reply
import io.zenoh.selector.intoSelector
import io.zenoh.value.Value
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import java.time.Duration

Expand All @@ -29,38 +31,28 @@ class ZGet(private val emptyArgs: Boolean) : CliktCommand(
) {

override fun run() {
val config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting,mode)
val config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode)

Session.open(config).onSuccess { session ->
session.use {
selector.intoSelector().onSuccess { selector ->
selector.use {
session.get(selector)
.timeout(Duration.ofMillis(timeout))
.apply {
target?.let {
target(QueryTarget.valueOf(it.uppercase()))
}
attachment?.let {
withAttachment(it.toByteArray())
}
value?.let {
withValue(it)
}
}
.res()
.onSuccess { receiver ->
runBlocking {
for (reply in receiver!!) {
when (reply) {
is Reply.Success -> println("Received ('${reply.sample.keyExpr}': '${reply.sample.value}')")
is Reply.Error -> println("Received (ERROR: '${reply.error}')")
is Reply.Delete -> println("Received (DELETE '${reply.keyExpr}')")
}
session.get(selector,
channel = Channel(),
value = payload?.let { Value(it) },
target = target?.let { QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING,
attachment = attachment?.into(),
timeout = Duration.ofMillis(timeout))
.onSuccess { channelReceiver ->
runBlocking {
for (reply in channelReceiver) {
when (reply) {
is Reply.Success -> println("Received ('${reply.sample.keyExpr}': '${reply.sample.value}')")
is Reply.Error -> println("Received (ERROR: '${reply.error}')")
is Reply.Delete -> println("Received (DELETE '${reply.keyExpr}')")
}
}
}
}
}
}
}
}
Expand All @@ -72,8 +64,8 @@ class ZGet(private val emptyArgs: Boolean) : CliktCommand(
help = "The selection of resources to query [default: demo/example/**]",
metavar = "selector"
).default("demo/example/**")
private val value by option(
"-v", "--value", help = "An optional value to put in the query.", metavar = "value"
private val payload by option(
"-p", "--payload", help = "An optional payload to put in the query.", metavar = "payload"
)
private val target by option(
"-t",
Expand Down
39 changes: 18 additions & 21 deletions examples/src/main/kotlin/io.zenoh/ZPub.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package io.zenoh
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import io.zenoh.keyexpr.intoKeyExpr
import io.zenoh.protocol.into

class ZPub(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh Pub example"
Expand All @@ -28,27 +29,23 @@ class ZPub(private val emptyArgs: Boolean) : CliktCommand(
Session.open(config).onSuccess { session ->
session.use {
key.intoKeyExpr().onSuccess { keyExpr ->
keyExpr.use {
println("Declaring publisher on '$keyExpr'...")
session.declarePublisher(keyExpr).res().onSuccess { pub ->
pub.use {
println("Press CTRL-C to quit...")
val attachment = attachment?.toByteArray()
var idx = 0
while (true) {
Thread.sleep(1000)
val payload = "[${
idx.toString().padStart(4, ' ')
}] $value"
println(
"Putting Data ('$keyExpr': '$payload')..."
)
attachment?.let {
pub.put(payload).withAttachment(attachment).res()
} ?: let { pub.put(payload).res() }
idx++
}
}
println("Declaring publisher on '$keyExpr'...")
session.declarePublisher(keyExpr).onSuccess { pub ->
println("Press CTRL-C to quit...")
val attachment = attachment?.toByteArray()
var idx = 0
while (true) {
Thread.sleep(1000)
val payload = "[${
idx.toString().padStart(4, ' ')
}] $value"
println(
"Putting Data ('$keyExpr': '$payload')..."
)
attachment?.let {
pub.put(payload, attachment = it.into())
} ?: let { pub.put(payload) }
idx++
}
}
}
Expand Down
Loading

0 comments on commit c0aa350

Please sign in to comment.