Fencing pre-request fetches #4313

Open
CLOVIS-AI opened this issue Dec 24, 2024 · 4 comments

@CLOVIS-AI
Contributor

Use case

A pattern I see very often in codebases is a set of independent read-only operations that are prerequisites to one write operation. Since the read-only operations are independent, we want them to execute concurrently. However, we don't want to execute the write operation if any of the read-only operations fails.

Without coroutines, this is typically written:

fun foo(u: UserId, b: BarId) {
    val user = userService.load(u)
    val bar = barService.load(b)
    
    doFoo(user, bar)
}

private fun doFoo(user: User, bar: Bar) { … }

Of course, in the real world, we see examples with more than 10 such read-only prerequisites.

The goal of this issue is to find a canonical way of implementing this pattern with coroutines. The following are a few patterns I considered.

coroutineScope {}

Mentally, launching the read-only operations in a specific coroutineScope {} seems to be exactly what I'm searching for: they all execute concurrently before the write operation can start, and the Coroutines library takes care of cancellation and exceptions. However, since coroutineScope {} introduces a lexical scope, it isn't possible to easily extract the data into local variables, and tricks are necessary.

suspend fun foo(u: UserId, b: BarId) {
    var user: User by notNull()
    var bar: Bar by notNull()

    coroutineScope {
        launch {
            user = userService.load(u)
        }

        launch {
            bar = barService.load(b)
        }
    }

    doFoo(user, bar)
}

This works, but it is not fun to write, and having to declare the variables as var is not great.

joinAll

Another option is to start a bunch of jobs and use joinAll followed by getCompleted to access the values:

suspend fun foo(u: UserId, b: BarId) = coroutineScope {
    val user = async { userService.load(u) }
    val bar = async { barService.load(b) }

    joinAll(user, bar)

    doFoo(user.getCompleted(), bar.getCompleted())
}

To me, using getCompleted() in application code is a code smell, and in fact it is common to forget to add one of the deferreds to the joinAll call, leading to runtime errors.

await

A variation of the previous example, using await() instead of getCompleted() to avoid the need for joinAll():

suspend fun foo(u: UserId, b: BarId) = coroutineScope {
    val user = async { userService.load(u) }
    val bar = async { barService.load(b) }

    doFoo(user.await(), bar.await()) 
}

This is definitely the easiest version to read and write, but it has a few downsides:

  • In the real world, the write operation is often inside some kind of loop, so we end up .await()'ing the same value many times. Looking at the code, it seems .await() spin-locks, so probably not a great idea?
  • The nature of "read-only operations that must finish before the write operation starts" is lost. If the write operation is written in multiple function calls, we risk starting it without awaiting all values.

Arrow parZip

Using Arrow Fx Coroutines, we can use parZip:

suspend fun foo(u: UserId, b: BarId) {
    parZip(
        { userService.load(u) },
        { barService.load(b) }
    ) { user, bar ->
        doFoo(user, bar)
    }
}

I guess this is exactly what I'm asking for, but it doesn't really feel like idiomatic Kotlin code: it uses non-trailing lambdas, declares the names in a different place than their contents, and adds indentation to the primary function code.

Another idea?

I don't really feel satisfied with any of the above options (though I have seen them all in production code), but I can't really come up with a satisfying design either. My best idea is something like:

suspend fun foo(u: UserId, b: BarId) {
    val fence = prerequisites()

    val user by fence { userService.load(u) }
    val bar by fence { barService.load(b) }

    fence.close() // suspends

    doFoo(user, bar)
}

where prerequisites() creates a CoroutineScope similar to the one created by coroutineScope {}, which is joined by the call of close(), and the delegates use .getCompleted() under the hood.

Still, this solution will compile even if fence.close() isn't called, breaking at runtime. It also breaks at runtime if any of the read operations attempts to use the result of another read operation (which only the .await() example above can handle).
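
A minimal sketch of how such a fence could be built, under some assumptions: `Fence` is a hypothetical class, and `prerequisites()` is written here as an extension on `CoroutineScope` (so it has to be called inside `coroutineScope {}`), which differs from the free-standing call shown above. The loads in `foo` are stand-ins for the real services. The sketch relies on `getCompleted()`, which kotlinx.coroutines marks as experimental.

```kotlin
import kotlinx.coroutines.*
import kotlin.properties.ReadOnlyProperty

// Hypothetical helper, not part of kotlinx.coroutines.
class Fence(private val scope: CoroutineScope) {
    private val pending = mutableListOf<Deferred<*>>()

    // Starts the read right away; the returned delegate reads the finished value.
    @OptIn(ExperimentalCoroutinesApi::class)
    operator fun <T> invoke(block: suspend () -> T): ReadOnlyProperty<Any?, T> {
        val deferred = scope.async { block() }
        pending += deferred
        // getCompleted() throws IllegalStateException if the read has not finished yet.
        return ReadOnlyProperty { _, _ -> deferred.getCompleted() }
    }

    // Suspends until every registered read has succeeded; rethrows the first failure.
    suspend fun close() {
        pending.awaitAll()
    }
}

// Assumption: the fence borrows the caller's scope instead of creating its own.
fun CoroutineScope.prerequisites(): Fence = Fence(this)

suspend fun foo(): String = coroutineScope {
    val fence = prerequisites()

    val user by fence { delay(10); "alice" } // stand-in for userService.load(u)
    val bar by fence { delay(10); 42 }       // stand-in for barService.load(b)

    fence.close()

    "$user:$bar" // stand-in for doFoo(user, bar)
}

fun main(): Unit = runBlocking {
    println(foo()) // alice:42
}
```

The caveats described above still apply to this sketch: nothing forces the caller to invoke `close()`, and a read that touches another delegated value before `close()` fails at runtime.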


What do you think? Is there another pattern I missed? Could this be improved somehow?

@fvasco
Contributor

fvasco commented Dec 24, 2024

In the real world, the write operation is often inside some kind of loop, so we end up .await()'ing the same value many times. Looking at the code, it seems .await() spin-locks, so probably not a great idea?

For a completed Deferred, I believe that both await and getCompleted offer the same performance. They use the same code, but note that await executes the while loop only once.

The complexity of await is necessary to handle the join operation internally. However, in the case of getCompleted, you need to invoke join externally.
So I prefer await to joinAll.
parZip looks error prone (positional argument).


Instead of using joinAll(...), you can create a barrier defining:

suspend fun CoroutineScope.joinChildren() {
    coroutineContext.job.children.forEach { it.join() }
    ensureActive() // don't forget!
}

So my tip is:

suspend fun foo(u: UserId, b: BarId) = coroutineScope {
    val user = async { userService.load(u) }
    val bar = async { barService.load(b) }

    joinChildren()
    doFoo(user.getCompleted(), bar.getCompleted())
}
  1. You must always await both user and bar
  2. A bit verbose...

Another idea?

I sometimes used:

suspend fun foo(u: UserId, b: BarId) = coroutineScope {
    val userDeferred = async { userService.load(u) }
    val barDeferred = async { barService.load(b) }

    val user = userDeferred.await()
    val bar = barDeferred.await()
    doFoo(user, bar)
}
  1. Verbose
  2. Copy-paste is error prone

@dkhalanskyjb
Collaborator

Thanks for the comment, @fvasco, I do agree that await is the idiomatic option here. I don't think ensureActive is needed, though? https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html says: "This suspending function is cancellable and always checks for a cancellation of the invoking coroutine's Job," and this agrees with the code AFAIK.

In the real world, the write operation is often inside some kind of loop, so we end up .await()'ing the same value many times. Looking at the code, it seems .await() spin-locks, so probably not a great idea?

Nope, .await() doesn't spin-lock, and almost nothing in the library does, especially not something so extensively used. #3635 contains a list of the known spin locks in the library.

The nature of "read-only operations that must finish before the write operation starts" is lost. If the write operation is written in multiple function calls, we risk starting it without awaiting all values.

I'm not sure what you mean. If the write operation requires a value, then how can it be invoked without the value being available? If some processes need to happen before other processes, but the two processes don't exchange data, what I would do would be to create phantom data ("class Process1Completed()") and pass that, but the joinChildren function suggested by @fvasco also seems good.

@CLOVIS-AI
Contributor Author

I'm not sure what you mean. If the write operation requires a value, then how can it be invoked without the value being available? If some processes need to happen before other processes, but the two processes don't exchange data, what I would do would be to create phantom data ("class Process1Completed()") and pass that, but the joinChildren function suggested by @fvasco also seems good.

val read1 = async { … }
val read2 = async { … }
val read3 = async { … }

write1(read1.await(), read2.await())
write2(read1.await(), read3.await())

In this situation, write1 may be executed before read3 has had a chance to finish. If read3 fails (thus not all read-operations are successful), write1 may have already happened, which breaks the invariants of the system: the operation is partially done (write1 is executed but write2 isn't) even though it shouldn't have (read3 failed).

I'm not saying the KotlinX.Coroutines library behaves incorrectly in this situation—I'm saying that the code above is buggy because read3.await() should have been executed before write1 to ensure all read operations are successful before starting to write data. I'm searching for a pattern that could eliminate these kinds of bugs.
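
One way this could be sketched is to combine the "phantom data" idea quoted above with `awaitAll`: every write takes a token that can only be obtained after all reads have succeeded. `ReadsCompleted`, `awaitReads`, the `log` parameter, and the `failThird` switch are hypothetical names introduced for illustration; the reads are stand-ins.

```kotlin
import kotlinx.coroutines.*

// Hypothetical marker type: holding one means every read finished successfully.
class ReadsCompleted

// Hypothetical helper: suspends until all reads succeed, rethrowing the first failure.
suspend fun awaitReads(vararg reads: Deferred<*>): ReadsCompleted {
    reads.asList().awaitAll()
    return ReadsCompleted()
}

// The writes demand the token, so they cannot be called before the barrier.
fun write1(proof: ReadsCompleted, a: Int, b: Int, log: MutableList<String>) {
    log += "write1(${a + b})"
}

fun write2(proof: ReadsCompleted, a: Int, c: Int, log: MutableList<String>) {
    log += "write2(${a + c})"
}

suspend fun foo(log: MutableList<String>, failThird: Boolean): Unit = coroutineScope {
    val read1 = async { 1 }
    val read2 = async { 2 }
    val read3 = async { if (failThird) error("read3 failed") else 3 }

    val proof = awaitReads(read1, read2, read3)
    write1(proof, read1.await(), read2.await(), log)
    write2(proof, read1.await(), read3.await(), log)
}

fun main(): Unit = runBlocking {
    val ok = mutableListOf<String>()
    foo(ok, failThird = false)
    println(ok) // [write1(3), write2(4)]

    val failed = mutableListOf<String>()
    val outcome = runCatching { foo(failed, failThird = true) }
    println(failed)            // []
    println(outcome.isFailure) // true
}
```

This only moves the problem: forgetting to pass a deferred to `awaitReads` still compiles, so the token proves that the barrier was crossed, not that the barrier was complete.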

@fvasco
Contributor

fvasco commented Jan 7, 2025

I don't think ensureActive is needed, though?

suspend fun CoroutineScope.joinChildren() {
    for (c in coroutineContext.job.children) c.join()
    // ensureActive() // don't forget!
}

fun main() = runBlocking<Unit>(Dispatchers.Default) {
    val a = async { "a" }
    val b = async<String> { error("b missing") }

    joinChildren()
    println("coroutineContext.isActive = ${coroutineContext.isActive}")
    println("a = " + a.getCompleted())
    println("b = " + b.getCompleted())
}

often fails with

coroutineContext.isActive = false
a = a
Exception in thread "main" java.lang.IllegalStateException: b missing
 at FileKt$main$1$b$1.invokeSuspend (File.kt:10) 
 at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith (ContinuationImpl.kt:33) 
 at kotlinx.coroutines.DispatchedTask.run (DispatchedTask.kt:108) 

Playground: https://play.kotlinlang.org/#eyJ2ZXJzaW9uIjoiMi4xLjAiLCJwbGF0Zm9ybSI6ImphdmEiLCJhcmdzIjoiIiwibm9uZU1hcmtlcnMiOnRydWUsInRoZW1lIjoiaWRlYSIsImNvZGUiOiJpbXBvcnQga290bGlueC5jb3JvdXRpbmVzLipcblxuc3VzcGVuZCBmdW4gQ29yb3V0aW5lU2NvcGUuam9pbkNoaWxkcmVuKCkge1xuICAgIGZvciAoYyBpbiBjb3JvdXRpbmVDb250ZXh0LmpvYi5jaGlsZHJlbikgYy5qb2luKClcbiAgICAvLyBlbnN1cmVBY3RpdmUoKSAvLyBkb24ndCBmb3JnZXQhXG59XG5cbmZ1biBtYWluKCkgPSBydW5CbG9ja2luZzxVbml0PihEaXNwYXRjaGVycy5EZWZhdWx0KSB7XG4gICAgdmFsIGEgPSBhc3luYyB7IFwiYVwiIH1cbiAgICB2YWwgYiA9IGFzeW5jPFN0cmluZz4geyBlcnJvcihcImIgbWlzc2luZ1wiKSB9XG5cbiAgICBqb2luQ2hpbGRyZW4oKVxuICAgIHByaW50bG4oXCJjb3JvdXRpbmVDb250ZXh0LmlzQWN0aXZlID0gJHtjb3JvdXRpbmVDb250ZXh0LmlzQWN0aXZlfVwiKVxuICAgIHByaW50bG4oXCJhID0gXCIgKyBhLmdldENvbXBsZXRlZCgpKVxuICAgIHByaW50bG4oXCJiID0gXCIgKyBiLmdldENvbXBsZXRlZCgpKVxufVxuIn0=
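
For comparison, a sketch of the same experiment with `ensureActive()` restored. The reads are wrapped in `coroutineScope {}` and `runCatching` here (an assumption made only so the outcome can be inspected), and `reachedWrite` is a hypothetical flag standing in for the write operation. Whether `join()` itself throws or `ensureActive()` does, the code after the barrier should not run.

```kotlin
import kotlinx.coroutines.*

suspend fun CoroutineScope.joinChildren() {
    for (child in coroutineContext.job.children) child.join()
    ensureActive() // throws CancellationException if a child failure cancelled this scope
}

@OptIn(ExperimentalCoroutinesApi::class)
fun main(): Unit = runBlocking {
    var reachedWrite = false
    val outcome = runCatching {
        coroutineScope {
            val a = async { "a" }
            val b = async<String> { error("b missing") }

            joinChildren()
            reachedWrite = true // stand-in for the write operation
            println(a.getCompleted() + b.getCompleted())
        }
    }
    println("reachedWrite = $reachedWrite")  // reachedWrite = false
    println("failed = ${outcome.isFailure}") // failed = true
}
```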
