Calling Flow.stateIn with a cancelled scope suspends forever #4322

francescotescari opened this issue Jan 6, 2025 · 2 comments

@francescotescari
Contributor

Describe the bug

Calling Flow.stateIn with a cancelled scope suspends forever. IMHO, the expected behavior is for stateIn to rethrow the scope's cancellation exception, similar to how scope.async { }.await() and CompletableDeferred(scope.job) behave on a cancelled scope. This happens not only when the scope is already cancelled, but also when it gets cancelled concurrently with the stateIn call.

The cause is that stateIn awaits a CompletableDeferred that is completed exclusively by a coroutine launched (non-atomically) in the collecting scope, which may be cancelled.

A possible fix is to bind that CompletableDeferred to the job of the collecting scope (with CompletableDeferred(scope.coroutineContext[Job])).
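To illustrate the idea behind the proposed fix, here is a minimal sketch that is not the actual stateIn code. It assumes a hypothetical helper, awaitBoundToScope, that awaits a CompletableDeferred created as a child of the scope's Job. The catch block exists only to make the outcome printable.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical helper (not part of kotlinx.coroutines): awaits a deferred
// that only a coroutine launched in `scope` can complete.
suspend fun awaitBoundToScope(scope: CoroutineScope): String {
    // Passing the scope's Job as parent cancels the deferred together with the scope.
    val deferred = CompletableDeferred<Int>(scope.coroutineContext[Job])
    // Non-atomic launch: the body never runs if the scope is already cancelled.
    scope.launch { deferred.complete(42) }
    return try {
        "value: ${deferred.await()}"
    } catch (e: CancellationException) {
        // Swallowed here only for demonstration; real code should rethrow.
        "cancelled"
    }
}

fun main() = runBlocking {
    val cancelled = CoroutineScope(EmptyCoroutineContext).apply { cancel() }
    println(awaitBoundToScope(cancelled)) // prints "cancelled" instead of hanging

    val active = CoroutineScope(EmptyCoroutineContext)
    println(awaitBoundToScope(active)) // prints "value: 42"
    active.cancel()
}
```

Without the parent argument, the first call would suspend forever, which matches the hang described in this issue.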

Provide a Reproducer

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

suspend fun main() {
    val flow = flowOf(1, 2, 3)
    val cancelledScope = CoroutineScope(EmptyCoroutineContext).apply { cancel() }
    println("Awaiting stateIn...")
    val stateFlow = flow.stateIn(cancelledScope) // Suspends forever
    println("Done!") // Never printed
}

// prints "Awaiting stateIn..." and hangs forever
@francescotescari changed the title from "Calling Flow.stateIn with a cancelled scope suspends forever." to "Calling Flow.stateIn with a cancelled scope suspends forever" on Jan 6, 2025
@dkhalanskyjb
Collaborator

I agree both that this is a bug and that your fix is the way to go. Would you like to make a PR? If so, could you also make stateIn fail with a NoSuchElementException("Expected at least one element") if the flow collection completes without emitting a single element?
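For comparison, the terminal operator first() already fails fast on an empty flow with a NoSuchElementException, which is the kind of behavior being requested for stateIn here. The sketch below only demonstrates first(); the helper describeFirst is hypothetical, and nothing here shows how stateIn would implement the check.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reports how first() behaves on the given flow.
suspend fun describeFirst(flow: Flow<Int>): String =
    try {
        "first = ${flow.first()}"
    } catch (e: NoSuchElementException) {
        // first() throws instead of suspending when the flow completes empty.
        "failed: no elements"
    }

fun main() = runBlocking {
    println(describeFirst(flowOf(1, 2, 3))) // prints "first = 1"
    println(describeFirst(emptyFlow()))     // prints "failed: no elements"
}
```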

This was a surprisingly difficult decision. Below are my thoughts.


There's one more bug/feature/behavior of the same form:

flow { }.stateIn(CoroutineScope(EmptyCoroutineContext)) // hangs

We can think of stateIn as shareIn that also keeps track of the "current" emission (the same correspondence as StateFlow itself has with SharedFlow). Let's see how shareIn deals with these issues.

If the scope is cancelled when we attempt to wait for the initial value, it hangs:

val scope = CoroutineScope(EmptyCoroutineContext).apply { cancel() }
val flow = flow {
  emit(1)
  yield()
  emit(2)
}
val result = MutableStateFlow<Int?>(null) // MutableStateFlow requires an initial value
val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly)
result.value = sharedFlow.first() // hangs

If the original flow is empty, attempting to wait for the first value hangs:

val scope = CoroutineScope(EmptyCoroutineContext)
val flow = flow<Int> { }
val result = MutableStateFlow<Int?>(null) // MutableStateFlow requires an initial value
val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly)
result.value = sharedFlow.first() // hangs

From this point of view, it makes sense that stateIn in a cancelled scope or on an empty flow hangs: the sharing has already started before the function even exits, and obtaining the initial value is the first thing that happens during the sharing. If obtaining the shared value hangs, the whole stateIn also hangs.

This is certainly unintuitive, but can be explained in terms of the provided abstractions.

However, there is a crucial detail: stateIn already exhibits behavior that can't be expressed through shareIn:

// shareIn: the failure is not reported to the caller
val scope = CoroutineScope(EmptyCoroutineContext)
val flow = flow<Int> { error("this flow can't be collected") }
val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly)
sharedFlow.first() // cancels `scope` and hangs!

// stateIn: the failure is rethrown to the caller
val scope = CoroutineScope(EmptyCoroutineContext)
val flow = flow<Int> { error("this flow can't be collected") }
flow.stateIn(scope) // throws an exception

This behavior was introduced in #2329 without much fanfare, but it already breaks the analogy between shareIn and stateIn in favor of propagating issues with the acquisition of the first element. Not detecting an empty flow or a cancelled scope is inconsistent with that decision.

Then the fix you're proposing does seem reasonable:

  • Launching the procedure of obtaining the value with CoroutineStart.ATOMIC is dangerous: the flow may contain some operations that can not be run after we've witnessed the corresponding scope being cancelled (for example, UI updates can't be performed after the corresponding UI component is destroyed).
  • Without collecting the flow, there is nowhere to obtain the StateFlow value from.
  • Throwing a CancellationException is consistent with what happens if cancellation happens after launch but before the first element is emitted.
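The first point above can be seen in a small sketch comparing the default start mode with CoroutineStart.ATOMIC in an already-cancelled scope. The function name bodiesRunInCancelledScope is hypothetical, and the opt-in annotations are included because ATOMIC has carried an opt-in marker that differs between library versions.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical demo: records which coroutine bodies get to run in a cancelled scope.
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
suspend fun bodiesRunInCancelledScope(): List<String> {
    val scope = CoroutineScope(EmptyCoroutineContext).apply { cancel() }
    val ran = CopyOnWriteArrayList<String>()
    // Default start: cancelled before it starts, so the body is skipped.
    val default = scope.launch { ran.add("DEFAULT") }
    // ATOMIC start: the body runs up to its first suspension point despite cancellation.
    val atomic = scope.launch(start = CoroutineStart.ATOMIC) { ran.add("ATOMIC") }
    joinAll(default, atomic)
    return ran.toList()
}

fun main() = runBlocking {
    println(bodiesRunInCancelledScope()) // prints [ATOMIC]
}
```

The ATOMIC body executing after cancellation is exactly what makes that start mode unsafe for flows with side effects such as UI updates.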

@francescotescari
Contributor Author

Good point, I completely overlooked the empty case, but completely agree with what you wrote.
I'll open a PR soon with the fix for both cases ⚙️
