Skip to content

Commit

Permalink
feat(background declarations): Allowing session declarations to stay …
Browse files Browse the repository at this point in the history
…alive for the lifespan of a session.

Zenoh declarations ran for as long as the Kotlin variable representing them was kept alive. This
meant that whenever the user lost track of the variable, it got garbage collected and undeclared
in the process. This behavior seems to be counterintuitive for programmers used to garbage collected
languages (see eclipse-zenoh#43).

Therefore in this PR we provide the following change: we keep track of session declarations in a list
inside the session, allowing users to keep running them despite losing their references.
When the session is finalized, the associated declarations are undeclared.
In case the user needs to close a declaration earlier, they need to keep the variable in order
to undeclare it.
  • Loading branch information
DariusIMP committed May 14, 2024
1 parent 6df78be commit e54d723
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 25 deletions.
17 changes: 10 additions & 7 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ import java.time.Duration
* A Zenoh Session, the core interaction point with a Zenoh network.
*
* A session is typically associated with declarations such as [Publisher]s, [Subscriber]s, or [Queryable]s, which are
* declared using [declarePublisher], [declareSubscriber], and [declareQueryable], respectively.
* declared using [declarePublisher], [declareSubscriber], and [declareQueryable], respectively. It is also possible to
* declare key expressions ([KeyExpr]) as well with [declareKeyExpr] for optimization purposes.
*
* Other operations such as simple Put, Get or Delete can be performed from a session using [put], [get] and [delete].
* Finally, it's possible to declare key expressions ([KeyExpr]) as well.
*
* Sessions are open upon creation and can be closed manually by calling [close]. Alternatively, the session will be
* automatically closed when used with Java's try-with-resources statement or its Kotlin counterpart, [use].
Expand All @@ -52,6 +53,8 @@ class Session private constructor(private val config: Config) : AutoCloseable {

private var jniSession: JNISession? = JNISession()

private var declarations = mutableListOf<SessionDeclaration>()

companion object {

private val sessionClosedException = SessionException("Session is closed.")
Expand Down Expand Up @@ -88,8 +91,8 @@ class Session private constructor(private val config: Config) : AutoCloseable {
jniSession = null
}

@Suppress("removal")
protected fun finalize() {
declarations.forEach { it.undeclare() }
jniSession?.close()
}

Expand Down Expand Up @@ -232,7 +235,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
*/
fun declareKeyExpr(keyExpr: String): Resolvable<KeyExpr> = Resolvable {
return@Resolvable jniSession?.run {
declareKeyExpr(keyExpr)
declareKeyExpr(keyExpr).onSuccess { declarations.add(it) }
} ?: Result.failure(sessionClosedException)
}

Expand Down Expand Up @@ -385,7 +388,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {

internal fun resolvePublisher(builder: Publisher.Builder): Result<Publisher> {
return jniSession?.run {
declarePublisher(builder)
declarePublisher(builder).onSuccess { declarations.add(it) }
} ?: Result.failure(sessionClosedException)
}

Expand All @@ -397,7 +400,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
reliability: Reliability
): Result<Subscriber<R>> {
return jniSession?.run {
declareSubscriber(keyExpr, callback, onClose, receiver, reliability)
declareSubscriber(keyExpr, callback, onClose, receiver, reliability).onSuccess { declarations.add(it) }
} ?: Result.failure(sessionClosedException)
}

Expand All @@ -409,7 +412,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
complete: Boolean
): Result<Queryable<R>> {
return jniSession?.run {
declareQueryable(keyExpr, callback, onClose, receiver, complete)
declareQueryable(keyExpr, callback, onClose, receiver, complete).onSuccess { declarations.add(it) }
} ?: Result.failure(sessionClosedException)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ package io.zenoh
* Session declaration.
*
* A session declaration is either a [io.zenoh.publication.Publisher],
* a [io.zenoh.subscriber.Subscriber] or a [io.zenoh.queryable.Queryable] declared from a [Session].
* a [io.zenoh.subscriber.Subscriber], a [io.zenoh.queryable.Queryable] or a [io.zenoh.keyexpr.KeyExpr] declared from a [Session].
*/
interface SessionDeclaration {

/** Returns true if the declaration has not been undeclared. */
fun isValid(): Boolean

/** Undeclare a declaration. No further operations should be performed after calling this function. */
fun undeclare()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package io.zenoh.keyexpr

import io.zenoh.Resolvable
import io.zenoh.Session
import io.zenoh.SessionDeclaration
import io.zenoh.jni.JNIKeyExpr

/**
Expand Down Expand Up @@ -58,7 +59,7 @@ import io.zenoh.jni.JNIKeyExpr
* includes, etc.) which are done natively. It keeps track of the underlying key expression instance. Once it is freed,
* the [KeyExpr] instance is considered to not be valid anymore.
*/
class KeyExpr internal constructor(internal var jniKeyExpr: JNIKeyExpr? = null): AutoCloseable {
class KeyExpr internal constructor(internal var jniKeyExpr: JNIKeyExpr? = null): AutoCloseable, SessionDeclaration {

companion object {

Expand Down Expand Up @@ -143,6 +144,11 @@ class KeyExpr internal constructor(internal var jniKeyExpr: JNIKeyExpr? = null):
jniKeyExpr?.close()
}

override fun undeclare() {
jniKeyExpr?.close()
jniKeyExpr = null
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import io.zenoh.sample.Attachment
import io.zenoh.value.Value

/**
* # Publisher
*
* A Zenoh Publisher.
*
* A publisher is automatically dropped when using it with the 'try-with-resources' statement (i.e. 'use' in Kotlin).
Expand Down Expand Up @@ -56,6 +58,12 @@ import io.zenoh.value.Value
*
* The publisher configuration parameters can be later changed using the setter functions.
*
* ## Lifespan
*
* Internally, the [Session] from which the [Publisher] was declared keeps a reference to it, therefore keeping it alive
* until the session is closed. For the cases where we want to stop the publisher earlier, it's necessary
* to keep a reference to it in order to undeclare it later.
*
* @property keyExpr The key expression the publisher will be associated to.
* @property jniPublisher Delegate class handling the communication with the native code.
* @property congestionControl The congestion control policy.
Expand Down Expand Up @@ -118,7 +126,7 @@ class Publisher internal constructor(
jniPublisher?.setPriority(priority)?.onSuccess { this.priority = priority }
}

override fun isValid(): Boolean {
fun isValid(): Boolean {
return jniPublisher != null
}

Expand All @@ -131,10 +139,6 @@ class Publisher internal constructor(
jniPublisher = null
}

protected fun finalize() {
jniPublisher?.close()
}

class Put internal constructor(
private var jniPublisher: JNIPublisher?,
val value: Value,
Expand Down
25 changes: 18 additions & 7 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/queryable/Queryable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import io.zenoh.keyexpr.KeyExpr
import kotlinx.coroutines.channels.Channel

/**
* # Queryable
*
* A queryable that allows to perform multiple queries on the specified [KeyExpr].
*
* Its main purpose is to keep the queryable active as long as it exists.
Expand Down Expand Up @@ -56,6 +58,11 @@ import kotlinx.coroutines.channels.Channel
* }
* }}
* ```
* ## Lifespan
*
* Internally, the [Session] from which the [Queryable] was declared keeps a reference to it, therefore keeping it alive
* until the session is closed. For the cases where we want to stop the queryable earlier, it's necessary
* to keep a reference to it in order to undeclare it later.
*
* @param R Receiver type of the [Handler] implementation. If no handler is provided to the builder, [R] will be [Unit].
* @property keyExpr The [KeyExpr] to which the subscriber is associated.
Expand All @@ -68,7 +75,7 @@ class Queryable<R> internal constructor(
val keyExpr: KeyExpr, val receiver: R?, private var jniQueryable: JNIQueryable?
) : AutoCloseable, SessionDeclaration {

override fun isValid(): Boolean {
fun isValid(): Boolean {
return jniQueryable != null
}

Expand All @@ -81,10 +88,6 @@ class Queryable<R> internal constructor(
undeclare()
}

protected fun finalize() {
jniQueryable?.close()
}

companion object {

/**
Expand Down Expand Up @@ -119,7 +122,7 @@ class Queryable<R> internal constructor(
private val keyExpr: KeyExpr,
private var callback: Callback<Query>? = null,
private var handler: Handler<Query, R>? = null
): Resolvable<Queryable<R>> {
) : Resolvable<Queryable<R>> {
private var complete: Boolean = false
private var onClose: (() -> Unit)? = null

Expand Down Expand Up @@ -165,7 +168,15 @@ class Queryable<R> internal constructor(
handler?.onClose()
onClose?.invoke()
}
return session.run { resolveQueryable(keyExpr, resolvedCallback, resolvedOnClose, handler?.receiver(), complete) }
return session.run {
resolveQueryable(
keyExpr,
resolvedCallback,
resolvedOnClose,
handler?.receiver(),
complete
)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import io.zenoh.sample.Sample
import kotlinx.coroutines.channels.Channel

/**
* # Subscriber
*
* A subscriber that allows listening to updates on a key expression and reacting to changes.
*
* Its main purpose is to keep the subscription active as long as it exists.
Expand Down Expand Up @@ -56,6 +58,12 @@ import kotlinx.coroutines.channels.Channel
* }
* ```
*
* ## Lifespan
*
* Internally, the [Session] from which the [Subscriber] was declared keeps a reference to it, therefore keeping it alive
* until the session is closed. For the cases where we want to stop the subscriber earlier, it's necessary
* to keep a reference to it in order to undeclare it later.
*
* @param R Receiver type of the [Handler] implementation. If no handler is provided to the builder, R will be [Unit].
* @property keyExpr The [KeyExpr] to which the subscriber is associated.
* @property receiver Optional [R] that is provided when specifying a [Handler] for the subscriber.
Expand All @@ -67,7 +75,7 @@ class Subscriber<R> internal constructor(
val keyExpr: KeyExpr, val receiver: R?, private var jniSubscriber: JNISubscriber?
) : AutoCloseable, SessionDeclaration {

override fun isValid(): Boolean {
fun isValid(): Boolean {
return jniSubscriber != null
}

Expand Down

0 comments on commit e54d723

Please sign in to comment.