From e16eb9d315cbee42bcadb438a8d62b10f65a9aa4 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 26 Oct 2020 07:05:06 -0700 Subject: [PATCH] Update experimental declarations (#2316) * Gracefully increase deprecation level on Channel operators instead of removing them, a warning was not strict enough * Remove hidden onCompletion from -M release * Promote StateFlow and SharedFlow to stable API * Lift out experimentality where it is applicable * CoroutineDispatcher.invoke * ReceiveChannel.consume and ReceiveChannel.consumeEach * Flow core operators: onStart, onCompletion, onEmpty * CompletableDeferred.completeWith * awaitCancellation * Add experimentality notes where applicable --- .../kotlin/benchmarks/ChannelSinkBenchmark.kt | 18 + .../api/kotlinx-coroutines-core.api | 1 - .../common/src/Builders.common.kt | 1 - .../common/src/CompletableDeferred.kt | 1 - .../common/src/CoroutineStart.kt | 4 +- .../common/src/Debug.common.kt | 2 +- kotlinx-coroutines-core/common/src/Delay.kt | 1 - .../common/src/channels/Broadcast.kt | 2 +- .../common/src/channels/BufferOverflow.kt | 1 - .../common/src/channels/Channels.common.kt | 384 ++++---- .../common/src/flow/Builders.kt | 4 +- .../common/src/flow/Migration.kt | 1 - .../common/src/flow/SharedFlow.kt | 3 - .../common/src/flow/SharingStarted.kt | 6 - .../common/src/flow/StateFlow.kt | 5 +- .../common/src/flow/operators/Emitters.kt | 9 - .../common/src/flow/operators/Share.kt | 6 - .../common/test/channels/ChannelsTest.kt | 524 ---------- .../common/test/channels/ProduceTest.kt | 2 +- .../jvm/test/channels/ChannelsConsumeTest.kt | 908 ------------------ .../jvm/test/channels/ChannelsJvmTest.kt | 4 +- .../kotlinx-coroutines-jdk9/src/Publish.kt | 2 +- .../src/JavaFxConvert.kt | 2 +- 23 files changed, 222 insertions(+), 1669 deletions(-) delete mode 100644 kotlinx-coroutines-core/jvm/test/channels/ChannelsConsumeTest.kt diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt index 9c7f38a6f9..6c5b623191 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt @@ -50,4 +50,22 @@ open class ChannelSinkBenchmark { for (i in start until (start + count)) send(i) } + + // Migrated from deprecated operators, are good only for stressing channels + + private fun ReceiveChannel.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel = + GlobalScope.produce(context, onCompletion = { cancel() }) { + for (e in this@filter) { + if (predicate(e)) send(e) + } + } + + private suspend inline fun ReceiveChannel.fold(initial: R, operation: (acc: R, E) -> R): R { + var accumulator = initial + consumeEach { + accumulator = operation(accumulator, it) + } + return accumulator + } } + diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 06f4396778..b86076fca1 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -992,7 +992,6 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun merge ([Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow; - public static final synthetic fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun onEmpty (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; diff --git a/kotlinx-coroutines-core/common/src/Builders.common.kt b/kotlinx-coroutines-core/common/src/Builders.common.kt index c0924a0238..b7deaccb72 100644 --- a/kotlinx-coroutines-core/common/src/Builders.common.kt +++ b/kotlinx-coroutines-core/common/src/Builders.common.kt @@ -175,7 +175,6 @@ public suspend fun withContext( * * This inline function calls [withContext]. */ -@ExperimentalCoroutinesApi public suspend inline operator fun CoroutineDispatcher.invoke( noinline block: suspend CoroutineScope.() -> T ): T = withContext(this, block) diff --git a/kotlinx-coroutines-core/common/src/CompletableDeferred.kt b/kotlinx-coroutines-core/common/src/CompletableDeferred.kt index 0605817afa..2f00847298 100644 --- a/kotlinx-coroutines-core/common/src/CompletableDeferred.kt +++ b/kotlinx-coroutines-core/common/src/CompletableDeferred.kt @@ -57,7 +57,6 @@ public interface CompletableDeferred : Deferred { * This function transitions this deferred in the same ways described by [CompletableDeferred.complete] and * [CompletableDeferred.completeExceptionally]. */ -@ExperimentalCoroutinesApi // since 1.3.2, tentatively until 1.4.0 public fun CompletableDeferred.completeWith(result: Result): Boolean = result.fold({ complete(it) }, { completeExceptionally(it) }) diff --git a/kotlinx-coroutines-core/common/src/CoroutineStart.kt b/kotlinx-coroutines-core/common/src/CoroutineStart.kt index 05e80e3ed7..d5791c79fe 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineStart.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineStart.kt @@ -55,7 +55,7 @@ public enum class CoroutineStart { * Cancellability of coroutine at suspension points depends on the particular implementation details of * suspending functions as in [DEFAULT]. */ - @ExperimentalCoroutinesApi + @ExperimentalCoroutinesApi // Since 1.0.0, no ETA on stability ATOMIC, /** @@ -71,7 +71,7 @@ public enum class CoroutineStart { * * **Note: This is an experimental api.** Execution semantics of coroutines may change in the future when this mode is used. */ - @ExperimentalCoroutinesApi + @ExperimentalCoroutinesApi // Since 1.0.0, no ETA on stability UNDISPATCHED; /** diff --git a/kotlinx-coroutines-core/common/src/Debug.common.kt b/kotlinx-coroutines-core/common/src/Debug.common.kt index 013b983a74..949b05c63f 100644 --- a/kotlinx-coroutines-core/common/src/Debug.common.kt +++ b/kotlinx-coroutines-core/common/src/Debug.common.kt @@ -27,7 +27,7 @@ internal expect fun assert(value: () -> Boolean) * Copy mechanism is used only on JVM, but it might be convenient to implement it in common exceptions, * so on JVM their stacktraces will be properly recovered. */ -@ExperimentalCoroutinesApi +@ExperimentalCoroutinesApi // Since 1.2.0, no ETA on stability public interface CopyableThrowable where T : Throwable, T : CopyableThrowable { /** diff --git a/kotlinx-coroutines-core/common/src/Delay.kt b/kotlinx-coroutines-core/common/src/Delay.kt index f7948443fa..aae623d5df 100644 --- a/kotlinx-coroutines-core/common/src/Delay.kt +++ b/kotlinx-coroutines-core/common/src/Delay.kt @@ -95,7 +95,6 @@ public interface Delay { * } * ``` */ -@ExperimentalCoroutinesApi public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {} /** diff --git a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt index 790580e0a3..0193ed06b2 100644 --- a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt +++ b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt @@ -47,7 +47,7 @@ public fun ReceiveChannel.broadcast( val scope = GlobalScope + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> } // We can run this coroutine in the context that ignores all exceptions, because of `onCompletion = consume()` // which passes all exceptions upstream to the source ReceiveChannel - return scope.broadcast(capacity = capacity, start = start, onCompletion = consumes()) { + return scope.broadcast(capacity = capacity, start = start, onCompletion = { cancelConsumed(it) }) { for (e in this@broadcast) { send(e) } diff --git a/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt b/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt index 99994ea81b..a89c633fe6 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt @@ -16,7 +16,6 @@ import kotlinx.coroutines.* * * [DROP_LATEST] — drop **the latest** value that is being added to the buffer right now on buffer overflow * (so that buffer contents stay the same), do not suspend. */ -@ExperimentalCoroutinesApi public enum class BufferOverflow { /** * Suspend on buffer overflow. diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt index d19028bf63..398d5ca44b 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt @@ -3,7 +3,7 @@ */ @file:JvmMultifileClass @file:JvmName("ChannelsKt") -@file:Suppress("DEPRECATION") +@file:Suppress("DEPRECATION_ERROR") package kotlinx.coroutines.channels @@ -52,7 +52,7 @@ public inline fun BroadcastChannel.consume(block: ReceiveChannel.() * to find bugs. */ @Suppress("EXTENSION_SHADOWED_BY_MEMBER") -@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.0 +@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x public suspend fun ReceiveChannel.receiveOrNull(): E? { @Suppress("DEPRECATION", "UNCHECKED_CAST") return (this as ReceiveChannel).receiveOrNull() @@ -68,7 +68,7 @@ public suspend fun ReceiveChannel.receiveOrNull(): E? { * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard * to find bugs. **/ -@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.0 +@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x public fun ReceiveChannel.onReceiveOrNull(): SelectClause1 { @Suppress("DEPRECATION", "UNCHECKED_CAST") return (this as ReceiveChannel).onReceiveOrNull @@ -102,8 +102,8 @@ public suspend inline fun BroadcastChannel.consumeEach(action: (E) -> Uni * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel<*>.consumes(): CompletionHandler = { cause: Throwable? -> cancelConsumed(cause) @@ -125,8 +125,8 @@ internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler = { cause: Throwable? -> @@ -150,7 +150,6 @@ public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler = * * The operation is _terminal_. */ -@ExperimentalCoroutinesApi // since 1.3.0, tentatively graduates in 1.4.0 public inline fun ReceiveChannel.consume(block: ReceiveChannel.() -> R): R { var cause: Throwable? = null try { @@ -171,7 +170,6 @@ public inline fun ReceiveChannel.consume(block: ReceiveChannel.() - * The operation is _terminal_. * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. */ -@ExperimentalCoroutinesApi // since 1.3.0, tentatively graduates in 1.4.0 public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit): Unit = consume { for (e in this) action(e) @@ -187,8 +185,8 @@ public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit) * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.consumeEachIndexed(action: (IndexedValue) -> Unit) { var index = 0 @@ -207,8 +205,8 @@ public suspend inline fun ReceiveChannel.consumeEachIndexed(action: (Inde * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.elementAt(index: Int): E = elementAtOrElse(index) { throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") } @@ -223,8 +221,8 @@ public suspend fun ReceiveChannel.elementAt(index: Int): E = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.elementAtOrElse(index: Int, defaultValue: (Int) -> E): E = consume { @@ -248,8 +246,8 @@ public suspend inline fun ReceiveChannel.elementAtOrElse(index: Int, defa * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.elementAtOrNull(index: Int): E? = consume { @@ -273,8 +271,8 @@ public suspend fun ReceiveChannel.elementAtOrNull(index: Int): E? = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.find(predicate: (E) -> Boolean): E? = firstOrNull(predicate) @@ -289,8 +287,8 @@ public suspend inline fun ReceiveChannel.find(predicate: (E) -> Boolean): * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.findLast(predicate: (E) -> Boolean): E? = lastOrNull(predicate) @@ -306,8 +304,8 @@ public suspend inline fun ReceiveChannel.findLast(predicate: (E) -> Boole * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.first(): E = consume { @@ -328,8 +326,8 @@ public suspend fun ReceiveChannel.first(): E = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.first(predicate: (E) -> Boolean): E { consumeEach { @@ -348,8 +346,8 @@ public suspend inline fun ReceiveChannel.first(predicate: (E) -> Boolean) * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.firstOrNull(): E? = consume { @@ -369,8 +367,8 @@ public suspend fun ReceiveChannel.firstOrNull(): E? = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.firstOrNull(predicate: (E) -> Boolean): E? { consumeEach { @@ -389,8 +387,8 @@ public suspend inline fun ReceiveChannel.firstOrNull(predicate: (E) -> Bo * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.indexOf(element: E): Int { var index = 0 @@ -412,8 +410,8 @@ public suspend fun ReceiveChannel.indexOf(element: E): Int { * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.indexOfFirst(predicate: (E) -> Boolean): Int { var index = 0 @@ -435,8 +433,8 @@ public suspend inline fun ReceiveChannel.indexOfFirst(predicate: (E) -> B * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.indexOfLast(predicate: (E) -> Boolean): Int { var lastIndex = -1 @@ -460,8 +458,8 @@ public suspend inline fun ReceiveChannel.indexOfLast(predicate: (E) -> Bo * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.last(): E = consume { @@ -485,8 +483,8 @@ public suspend fun ReceiveChannel.last(): E = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.last(predicate: (E) -> Boolean): E { var last: E? = null @@ -512,8 +510,8 @@ public suspend inline fun ReceiveChannel.last(predicate: (E) -> Boolean): * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.lastIndexOf(element: E): Int { var lastIndex = -1 @@ -536,8 +534,8 @@ public suspend fun ReceiveChannel.lastIndexOf(element: E): Int { * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.lastOrNull(): E? = consume { @@ -560,8 +558,8 @@ public suspend fun ReceiveChannel.lastOrNull(): E? = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.lastOrNull(predicate: (E) -> Boolean): E? { var last: E? = null @@ -583,8 +581,8 @@ public suspend inline fun ReceiveChannel.lastOrNull(predicate: (E) -> Boo * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.single(): E = consume { @@ -607,8 +605,8 @@ public suspend fun ReceiveChannel.single(): E = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.single(predicate: (E) -> Boolean): E { var single: E? = null @@ -635,8 +633,8 @@ public suspend inline fun ReceiveChannel.single(predicate: (E) -> Boolean * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.singleOrNull(): E? = consume { @@ -659,8 +657,8 @@ public suspend fun ReceiveChannel.singleOrNull(): E? = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.singleOrNull(predicate: (E) -> Boolean): E? { var single: E? = null @@ -686,8 +684,8 @@ public suspend inline fun ReceiveChannel.singleOrNull(predicate: (E) -> B * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { @@ -714,8 +712,8 @@ public fun ReceiveChannel.drop(n: Int, context: CoroutineContext = Dispat * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.dropWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { @@ -740,8 +738,8 @@ public fun ReceiveChannel.dropWhile(context: CoroutineContext = Dispatche * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { @@ -762,8 +760,8 @@ public fun ReceiveChannel.filter(context: CoroutineContext = Dispatchers. * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.filterIndexed(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (index: Int, E) -> Boolean): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { @@ -785,8 +783,8 @@ public fun ReceiveChannel.filterIndexed(context: CoroutineContext = Dispa * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C { consumeEachIndexed { (index, element) -> @@ -807,8 +805,8 @@ public suspend inline fun > ReceiveChannel.fil * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C { consumeEachIndexed { (index, element) -> @@ -827,8 +825,8 @@ public suspend inline fun > ReceiveChannel.filterIndexe * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.filterNot(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel = filter(context) { !predicate(it) } @@ -843,8 +841,8 @@ public fun ReceiveChannel.filterNot(context: CoroutineContext = Dispatche * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) @Suppress("UNCHECKED_CAST") public fun ReceiveChannel.filterNotNull(): ReceiveChannel = @@ -860,8 +858,8 @@ public fun ReceiveChannel.filterNotNull(): ReceiveChannel = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun > ReceiveChannel.filterNotNullTo(destination: C): C { consumeEach { @@ -880,8 +878,8 @@ public suspend fun > ReceiveChannel.fil * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun > ReceiveChannel.filterNotNullTo(destination: C): C { consumeEach { @@ -900,8 +898,8 @@ public suspend fun > ReceiveChannel.filterNotNul * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.filterNotTo(destination: C, predicate: (E) -> Boolean): C { consumeEach { @@ -920,8 +918,8 @@ public suspend inline fun > ReceiveChannel.fil * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.filterNotTo(destination: C, predicate: (E) -> Boolean): C { consumeEach { @@ -940,8 +938,8 @@ public suspend inline fun > ReceiveChannel.filterNotTo( * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.filterTo(destination: C, predicate: (E) -> Boolean): C { consumeEach { @@ -960,8 +958,8 @@ public suspend inline fun > ReceiveChannel.fil * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.filterTo(destination: C, predicate: (E) -> Boolean): C { consumeEach { @@ -980,8 +978,8 @@ public suspend inline fun > ReceiveChannel.filterTo(des * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { @@ -1006,8 +1004,8 @@ public fun ReceiveChannel.take(n: Int, context: CoroutineContext = Dispat * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.takeWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { @@ -1032,8 +1030,8 @@ public fun ReceiveChannel.takeWhile(context: CoroutineContext = Dispatche * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.associate(transform: (E) -> Pair): Map = associateTo(LinkedHashMap(), transform) @@ -1053,8 +1051,8 @@ public suspend inline fun ReceiveChannel.associate(transform: (E) - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.associateBy(keySelector: (E) -> K): Map = associateByTo(LinkedHashMap(), keySelector) @@ -1073,8 +1071,8 @@ public suspend inline fun ReceiveChannel.associateBy(keySelector: (E) * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.associateBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map = associateByTo(LinkedHashMap(), keySelector, valueTransform) @@ -1093,8 +1091,8 @@ public suspend inline fun ReceiveChannel.associateBy(keySelector: ( * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.associateByTo(destination: M, keySelector: (E) -> K): M { consumeEach { @@ -1117,8 +1115,8 @@ public suspend inline fun > ReceiveChannel.a * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.associateByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M { consumeEach { @@ -1140,8 +1138,8 @@ public suspend inline fun > ReceiveChannel> ReceiveChannel.associateTo(destination: M, transform: (E) -> Pair): M { consumeEach { @@ -1161,8 +1159,8 @@ public suspend inline fun > ReceiveChannel> ReceiveChannel.toChannel(destination: C): C { consumeEach { @@ -1181,8 +1179,8 @@ public suspend fun > ReceiveChannel.toChannel(destinati * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun > ReceiveChannel.toCollection(destination: C): C { consumeEach { @@ -1210,8 +1208,8 @@ public suspend fun ReceiveChannel.toList(): List = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel>.toMap(): Map = toMap(LinkedHashMap()) @@ -1226,8 +1224,8 @@ public suspend fun ReceiveChannel>.toMap(): Map = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun > ReceiveChannel>.toMap(destination: M): M { consumeEach { @@ -1246,8 +1244,8 @@ public suspend fun > ReceiveChannel> * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.toMutableList(): MutableList = toCollection(ArrayList()) @@ -1264,8 +1262,8 @@ public suspend fun ReceiveChannel.toMutableList(): MutableList = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.toSet(): Set = this.toMutableSet() @@ -1280,8 +1278,8 @@ public suspend fun ReceiveChannel.toSet(): Set = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.flatMap(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> ReceiveChannel): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { @@ -1303,8 +1301,8 @@ public fun ReceiveChannel.flatMap(context: CoroutineContext = Dispatch * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.groupBy(keySelector: (E) -> K): Map> = groupByTo(LinkedHashMap(), keySelector) @@ -1323,8 +1321,8 @@ public suspend inline fun ReceiveChannel.groupBy(keySelector: (E) -> K * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.groupBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map> = groupByTo(LinkedHashMap(), keySelector, valueTransform) @@ -1342,8 +1340,8 @@ public suspend inline fun ReceiveChannel.groupBy(keySelector: (E) - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun >> ReceiveChannel.groupByTo(destination: M, keySelector: (E) -> K): M { consumeEach { @@ -1368,8 +1366,8 @@ public suspend inline fun >> ReceiveCh * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun >> ReceiveChannel.groupByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M { consumeEach { @@ -1388,8 +1386,8 @@ public suspend inline fun >> Receiv * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.map(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { @@ -1411,8 +1409,8 @@ public fun ReceiveChannel.map(context: CoroutineContext = Dispatchers. * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.mapIndexed(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { @@ -1435,8 +1433,8 @@ public fun ReceiveChannel.mapIndexed(context: CoroutineContext = Dispa * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.mapIndexedNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R?): ReceiveChannel = mapIndexed(context, transform).filterNotNull() @@ -1454,8 +1452,8 @@ public fun ReceiveChannel.mapIndexedNotNull(context: CoroutineCo * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C { consumeEachIndexed { (index, element) -> @@ -1477,8 +1475,8 @@ public suspend inline fun > ReceiveChann * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C { consumeEachIndexed { (index, element) -> @@ -1500,8 +1498,8 @@ public suspend inline fun > ReceiveChannel.map * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C { var index = 0 @@ -1524,8 +1522,8 @@ public suspend inline fun > ReceiveChannel. * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C { var index = 0 @@ -1546,8 +1544,8 @@ public suspend inline fun > ReceiveChannel.mapIndexe * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.mapNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R?): ReceiveChannel = map(context, transform).filterNotNull() @@ -1563,8 +1561,8 @@ public fun ReceiveChannel.mapNotNull(context: CoroutineContext = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.mapNotNullTo(destination: C, transform: (E) -> R?): C { consumeEach { @@ -1584,8 +1582,8 @@ public suspend inline fun > ReceiveChann * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.mapNotNullTo(destination: C, transform: (E) -> R?): C { consumeEach { @@ -1605,8 +1603,8 @@ public suspend inline fun > ReceiveChannel.map * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.mapTo(destination: C, transform: (E) -> R): C { consumeEach { @@ -1626,8 +1624,8 @@ public suspend inline fun > ReceiveChannel. * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.mapTo(destination: C, transform: (E) -> R): C { consumeEach { @@ -1646,8 +1644,8 @@ public suspend inline fun > ReceiveChannel.mapTo(des * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel> = GlobalScope.produce(context, onCompletion = consumes()) { @@ -1669,8 +1667,8 @@ public fun ReceiveChannel.withIndex(context: CoroutineContext = Dispatche * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.distinct(): ReceiveChannel = this.distinctBy { it } @@ -1688,8 +1686,8 @@ public fun ReceiveChannel.distinct(): ReceiveChannel = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.distinctBy(context: CoroutineContext = Dispatchers.Unconfined, selector: suspend (E) -> K): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumes()) { @@ -1715,8 +1713,8 @@ public fun ReceiveChannel.distinctBy(context: CoroutineContext = Dispa * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.toMutableSet(): MutableSet = toCollection(LinkedHashSet()) @@ -1731,8 +1729,8 @@ public suspend fun ReceiveChannel.toMutableSet(): MutableSet = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.all(predicate: (E) -> Boolean): Boolean { consumeEach { @@ -1751,8 +1749,8 @@ public suspend inline fun ReceiveChannel.all(predicate: (E) -> Boolean): * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.any(): Boolean = consume { @@ -1769,8 +1767,8 @@ public suspend fun ReceiveChannel.any(): Boolean = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.any(predicate: (E) -> Boolean): Boolean { consumeEach { @@ -1789,8 +1787,8 @@ public suspend inline fun ReceiveChannel.any(predicate: (E) -> Boolean): * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.count(): Int { var count = 0 @@ -1808,8 +1806,8 @@ public suspend fun ReceiveChannel.count(): Int { * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.count(predicate: (E) -> Boolean): Int { var count = 0 @@ -1829,8 +1827,8 @@ public suspend inline fun ReceiveChannel.count(predicate: (E) -> Boolean) * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.fold(initial: R, operation: (acc: R, E) -> R): R { var accumulator = initial @@ -1853,8 +1851,8 @@ public suspend inline fun ReceiveChannel.fold(initial: R, operation: ( * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.foldIndexed(initial: R, operation: (index: Int, acc: R, E) -> R): R { var index = 0 @@ -1875,8 +1873,8 @@ public suspend inline fun ReceiveChannel.foldIndexed(initial: R, opera * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.maxBy(selector: (E) -> R): E? = consume { @@ -1905,8 +1903,8 @@ public suspend inline fun > ReceiveChannel.maxBy(selecto * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.maxWith(comparator: Comparator): E? = consume { @@ -1930,8 +1928,8 @@ public suspend fun ReceiveChannel.maxWith(comparator: Comparator): * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun > ReceiveChannel.minBy(selector: (E) -> R): E? = consume { @@ -1960,8 +1958,8 @@ public suspend inline fun > ReceiveChannel.minBy(selecto * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.minWith(comparator: Comparator): E? = consume { @@ -1985,8 +1983,8 @@ public suspend fun ReceiveChannel.minWith(comparator: Comparator): * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend fun ReceiveChannel.none(): Boolean = consume { @@ -2003,8 +2001,8 @@ public suspend fun ReceiveChannel.none(): Boolean = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.none(predicate: (E) -> Boolean): Boolean { consumeEach { @@ -2023,8 +2021,8 @@ public suspend inline fun ReceiveChannel.none(predicate: (E) -> Boolean): * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.reduce(operation: (acc: S, E) -> S): S = consume { @@ -2050,8 +2048,8 @@ public suspend inline fun ReceiveChannel.reduce(operation: (acc: S * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.reduceIndexed(operation: (index: Int, acc: S, E) -> S): S = consume { @@ -2075,8 +2073,8 @@ public suspend inline fun ReceiveChannel.reduceIndexed(operation: * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.sumBy(selector: (E) -> Int): Int { var sum = 0 @@ -2096,8 +2094,8 @@ public suspend inline fun ReceiveChannel.sumBy(selector: (E) -> Int): Int * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.sumByDouble(selector: (E) -> Double): Double { var sum = 0.0 @@ -2117,8 +2115,8 @@ public suspend inline fun ReceiveChannel.sumByDouble(selector: (E) -> Dou * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.requireNoNulls(): ReceiveChannel = map { it ?: throw IllegalArgumentException("null element found in $this.") } @@ -2135,8 +2133,8 @@ public fun ReceiveChannel.requireNoNulls(): ReceiveChannel = * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public suspend inline fun ReceiveChannel.partition(predicate: (E) -> Boolean): Pair, List> { val first = ArrayList() @@ -2162,8 +2160,8 @@ public suspend inline fun ReceiveChannel.partition(predicate: (E) -> Bool * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public infix fun ReceiveChannel.zip(other: ReceiveChannel): ReceiveChannel> = zip(other) { t1, t2 -> t1 to t2 } @@ -2178,8 +2176,8 @@ public infix fun ReceiveChannel.zip(other: ReceiveChannel): Receive * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4", - level = DeprecationLevel.WARNING + message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", + level = DeprecationLevel.ERROR ) public fun ReceiveChannel.zip(other: ReceiveChannel, context: CoroutineContext = Dispatchers.Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel = GlobalScope.produce(context, onCompletion = consumesAll(this, other)) { diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 7e47e6947a..7d84cd2105 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -204,8 +204,8 @@ public fun LongRange.asFlow(): Flow = flow { @FlowPreview @Deprecated( message = "Use channelFlow with awaitClose { } instead of flowViaChannel and invokeOnClose { }.", - level = DeprecationLevel.WARNING -) + level = DeprecationLevel.ERROR +) // To be removed in 1.4.x @Suppress("DeprecatedCallableAddReplaceWith") public fun flowViaChannel( bufferSize: Int = BUFFERED, diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt index 490e88265c..11969a48fa 100644 --- a/kotlinx-coroutines-core/common/src/flow/Migration.kt +++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt @@ -434,7 +434,6 @@ public fun Flow.switchMap(transform: suspend (value: T) -> Flow): F message = "'scanReduce' was renamed to 'runningReduce' to be consistent with Kotlin standard library", replaceWith = ReplaceWith("runningReduce(operation)") ) -@ExperimentalCoroutinesApi public fun Flow.scanReduce(operation: suspend (accumulator: T, value: T) -> T): Flow = runningReduce(operation) @Deprecated( diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt index 7167971429..427041a7bb 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt @@ -108,7 +108,6 @@ import kotlin.native.concurrent.* * might be added to this interface in the future, but is stable for use. * Use the `MutableSharedFlow(replay, ...)` constructor function to create an implementation. */ -@ExperimentalCoroutinesApi public interface SharedFlow : Flow { /** * A snapshot of the replay cache. @@ -138,7 +137,6 @@ public interface SharedFlow : Flow { * might be added to this interface in the future, but is stable for use. * Use the `MutableSharedFlow(...)` constructor function to create an implementation. */ -@ExperimentalCoroutinesApi public interface MutableSharedFlow : SharedFlow, FlowCollector { /** * Tries to emit a [value] to this shared flow without suspending. It returns `true` if the value was @@ -202,7 +200,6 @@ public interface MutableSharedFlow : SharedFlow, FlowCollector { * supported only when `replay > 0` or `extraBufferCapacity > 0`). */ @Suppress("FunctionName", "UNCHECKED_CAST") -@ExperimentalCoroutinesApi public fun MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, diff --git a/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt b/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt index 935efdae2b..19e5fa36c7 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt @@ -12,7 +12,6 @@ import kotlin.time.* * A command emitted by [SharingStarted] implementations to control the sharing coroutine in * the [shareIn] and [stateIn] operators. */ -@ExperimentalCoroutinesApi public enum class SharingCommand { /** * Starts sharing, launching collection of the upstream flow. @@ -75,19 +74,16 @@ public enum class SharingCommand { * The completion of the `command` flow normally has no effect (the upstream flow keeps running if it was running). * The failure of the `command` flow cancels the sharing coroutine and the upstream flow. */ -@ExperimentalCoroutinesApi public interface SharingStarted { public companion object { /** * Sharing is started immediately and never stops. */ - @ExperimentalCoroutinesApi public val Eagerly: SharingStarted = StartedEagerly() /** * Sharing is started when the first subscriber appears and never stops. */ - @ExperimentalCoroutinesApi public val Lazily: SharingStarted = StartedLazily() /** @@ -108,7 +104,6 @@ public interface SharingStarted { * are negative. */ @Suppress("FunctionName") - @ExperimentalCoroutinesApi public fun WhileSubscribed( stopTimeoutMillis: Long = 0, replayExpirationMillis: Long = Long.MAX_VALUE @@ -143,7 +138,6 @@ public interface SharingStarted { */ @Suppress("FunctionName") @ExperimentalTime -@ExperimentalCoroutinesApi public fun SharingStarted.Companion.WhileSubscribed( stopTimeout: Duration = Duration.ZERO, replayExpiration: Duration = Duration.INFINITE diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt index 8587606633..a9a4ed3d24 100644 --- a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt @@ -135,7 +135,6 @@ import kotlin.native.concurrent.* * might be added to this interface in the future, but is stable for use. * Use the `MutableStateFlow(value)` constructor function to create an implementation. */ -@ExperimentalCoroutinesApi public interface StateFlow : SharedFlow { /** * The current value of this state flow. @@ -156,7 +155,6 @@ public interface StateFlow : SharedFlow { * might be added to this interface in the future, but is stable for use. * Use the `MutableStateFlow()` constructor function to create an implementation. */ -@ExperimentalCoroutinesApi public interface MutableStateFlow : StateFlow, MutableSharedFlow { /** * The current value of this state flow. @@ -180,7 +178,6 @@ public interface MutableStateFlow : StateFlow, MutableSharedFlow { * Creates a [MutableStateFlow] with the given initial [value]. */ @Suppress("FunctionName") -@ExperimentalCoroutinesApi public fun MutableStateFlow(value: T): MutableStateFlow = StateFlowImpl(value ?: NULL) // ------------------------------------ Implementation ------------------------------------ @@ -380,4 +377,4 @@ internal fun StateFlow.fuseStateFlow( return this } return fuseSharedFlow(context, capacity, onBufferOverflow) -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt index 8be19f08e0..244af9a7f5 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt @@ -71,7 +71,6 @@ internal inline fun Flow.unsafeTransform( * .collect { println(it) } // prints Begin, a, b, c * ``` */ -@ExperimentalCoroutinesApi public fun Flow.onStart( action: suspend FlowCollector.() -> Unit ): Flow = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action @@ -142,7 +141,6 @@ public fun Flow.onStart( * In case of failure or cancellation, any attempt to emit additional elements throws the corresponding exception. * Use [catch] if you need to suppress failure and replace it with emission of elements. */ -@ExperimentalCoroutinesApi public fun Flow.onCompletion( action: suspend FlowCollector.(cause: Throwable?) -> Unit ): Flow = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action @@ -178,7 +176,6 @@ public fun Flow.onCompletion( * }.collect { println(it) } // prints 1, 2 * ``` */ -@ExperimentalCoroutinesApi public fun Flow.onEmpty( action: suspend FlowCollector.() -> Unit ): Flow = unsafeFlow { @@ -203,12 +200,6 @@ private class ThrowingCollector(private val e: Throwable) : FlowCollector } } -// It was only released in 1.3.0-M2, remove in 1.4.0 -/** @suppress */ -@Deprecated(level = DeprecationLevel.HIDDEN, message = "binary compatibility with a version w/o FlowCollector receiver") -public fun Flow.onCompletion(action: suspend (cause: Throwable?) -> Unit): Flow = - onCompletion { action(it) } - private suspend fun FlowCollector.invokeSafely( action: suspend FlowCollector.(cause: Throwable?) -> Unit, cause: Throwable? diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt index 6351d4a89b..fe737a5bd1 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt @@ -132,7 +132,6 @@ import kotlin.jvm.* * @param started the strategy that controls when sharing is started and stopped. * @param replay the number of values replayed to new subscribers (cannot be negative, defaults to zero). */ -@ExperimentalCoroutinesApi public fun Flow.shareIn( scope: CoroutineScope, started: SharingStarted, @@ -297,7 +296,6 @@ private fun CoroutineScope.launchSharing( * This value is also used when the state flow is reset using the [SharingStarted.WhileSubscribed] strategy * with the `replayExpirationMillis` parameter. */ -@ExperimentalCoroutinesApi public fun Flow.stateIn( scope: CoroutineScope, started: SharingStarted, @@ -316,7 +314,6 @@ public fun Flow.stateIn( * * @param scope the coroutine scope in which sharing is started. */ -@ExperimentalCoroutinesApi public suspend fun Flow.stateIn(scope: CoroutineScope): StateFlow { val config = configureSharing(1) val result = CompletableDeferred>() @@ -353,14 +350,12 @@ private fun CoroutineScope.launchSharingDeferred( /** * Represents this mutable shared flow as a read-only shared flow. */ -@ExperimentalCoroutinesApi public fun MutableSharedFlow.asSharedFlow(): SharedFlow = ReadonlySharedFlow(this) /** * Represents this mutable state flow as a read-only state flow. */ -@ExperimentalCoroutinesApi public fun MutableStateFlow.asStateFlow(): StateFlow = ReadonlyStateFlow(this) @@ -391,7 +386,6 @@ private class ReadonlyStateFlow( * * The receiver of the [action] is [FlowCollector], so `onSubscription` can emit additional elements. */ -@ExperimentalCoroutinesApi public fun SharedFlow.onSubscription(action: suspend FlowCollector.() -> Unit): SharedFlow = SubscribedSharedFlow(this, action) diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt index ba786d53cc..fb704c5b86 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt @@ -50,123 +50,6 @@ class ChannelsTest: TestBase() { finish(7) } - @Test - fun testAssociate() = runTest { - assertEquals(testList.associate { it * 2 to it * 3 }, - testList.asReceiveChannel().associate { it * 2 to it * 3 }.toMap()) - } - - @Test - fun testAssociateBy() = runTest { - assertEquals(testList.associateBy { it % 2 }, testList.asReceiveChannel().associateBy { it % 2 }) - } - - @Test - fun testAssociateBy2() = runTest { - assertEquals(testList.associateBy({ it * 2}, { it * 3 }), - testList.asReceiveChannel().associateBy({ it * 2}, { it * 3 }).toMap()) - } - - @Test - fun testDistinct() = runTest { - assertEquals(testList.map { it % 2 }.distinct(), testList.asReceiveChannel().map { it % 2 }.distinct().toList()) - } - - @Test - fun testDistinctBy() = runTest { - assertEquals(testList.distinctBy { it % 2 }.toList(), testList.asReceiveChannel().distinctBy { it % 2 }.toList()) - } - - @Test - fun testToCollection() = runTest { - val target = mutableListOf() - testList.asReceiveChannel().toCollection(target) - assertEquals(testList, target) - } - - @Test - fun testDrop() = runTest { - for (i in 0..testList.size) { - assertEquals(testList.drop(i), testList.asReceiveChannel().drop(i).toList(), "Drop $i") - } - } - - @Test - fun testElementAtOrElse() = runTest { - assertEquals(testList.elementAtOrElse(2) { 42 }, testList.asReceiveChannel().elementAtOrElse(2) { 42 }) - assertEquals(testList.elementAtOrElse(9) { 42 }, testList.asReceiveChannel().elementAtOrElse(9) { 42 }) - } - - @Test - fun testFirst() = runTest { - assertEquals(testList.first(), testList.asReceiveChannel().first()) - for (i in testList) { - assertEquals(testList.first { it == i }, testList.asReceiveChannel().first { it == i }) - } - try { - testList.asReceiveChannel().first { it == 9 } - fail() - } catch (nse: NoSuchElementException) { - } - } - - @Test - fun testFirstOrNull() = runTest { - assertEquals(testList.firstOrNull(), testList.asReceiveChannel().firstOrNull()) - assertEquals(testList.firstOrNull { it == 2 }, testList.asReceiveChannel().firstOrNull { it == 2 }) - assertEquals(testList.firstOrNull { it == 9 }, testList.asReceiveChannel().firstOrNull { it == 9 }) - } - - @Test - fun testFlatMap() = runTest { - assertEquals(testList.flatMap { (0..it).toList() }, testList.asReceiveChannel().flatMap { (0..it).asReceiveChannel() }.toList()) - - } - - @Test - fun testFold() = runTest { - assertEquals(testList.fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } }, - testList.asReceiveChannel().fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } }.toList()) - } - - @Test - fun testFoldIndexed() = runTest { - assertEquals(testList.foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } }, - testList.asReceiveChannel().foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } }.toList()) - } - - @Test - fun testGroupBy() = runTest { - assertEquals(testList.groupBy { it % 2 }, testList.asReceiveChannel().groupBy { it % 2 }) - } - - @Test - fun testGroupBy2() = runTest { - assertEquals(testList.groupBy({ -it }, { it + 100 }), testList.asReceiveChannel().groupBy({ -it }, { it + 100 }).toMap()) - - } - - @Test - fun testMap() = runTest { - assertEquals(testList.map { it + 10 }, testList.asReceiveChannel().map { it + 10 }.toList()) - - } - - @Test - fun testMapToCollection() = runTest { - val c = mutableListOf() - testList.asReceiveChannel().mapTo(c) { it + 10 } - assertEquals(testList.map { it + 10 }, c) - } - - @Test - fun testMapToSendChannel() = runTest { - val c = produce { - testList.asReceiveChannel().mapTo(channel) { it + 10 } - } - assertEquals(testList.map { it + 10 }, c.toList()) - } - @Test fun testEmptyList() = runTest { assertTrue(emptyList().asReceiveChannel().toList().isEmpty()) @@ -178,413 +61,6 @@ class ChannelsTest: TestBase() { } - @Test - fun testEmptySet() = runTest { - assertTrue(emptyList().asReceiveChannel().toSet().isEmpty()) - - } - - @Test - fun testToSet() = runTest { - assertEquals(testList.toSet(), testList.asReceiveChannel().toSet()) - } - - @Test - fun testToMutableSet() = runTest { - assertEquals(testList.toMutableSet(), testList.asReceiveChannel().toMutableSet()) - } - - @Test - fun testEmptySequence() = runTest { - val channel = Channel() - channel.close() - - assertEquals(emptyList().asReceiveChannel().count(), 0) - } - - @Test - fun testEmptyMap() = runTest { - val channel = Channel>() - channel.close() - - assertTrue(channel.toMap().isEmpty()) - } - - @Test - fun testToMap() = runTest { - val values = testList.map { it to it.toString() } - assertEquals(values.toMap(), values.asReceiveChannel().toMap()) - } - - @Test - fun testReduce() = runTest { - assertEquals(testList.reduce { acc, e -> acc * e }, - testList.asReceiveChannel().reduce { acc, e -> acc * e }) - } - - @Test - fun testReduceIndexed() = runTest { - assertEquals(testList.reduceIndexed { index, acc, e -> index + acc * e }, - testList.asReceiveChannel().reduceIndexed { index, acc, e -> index + acc * e }) - } - - @Test - fun testTake() = runTest { - for (i in 0..testList.size) { - assertEquals(testList.take(i), testList.asReceiveChannel().take(i).toList()) - } - } - - @Test - fun testPartition() = runTest { - assertEquals(testList.partition { it % 2 == 0 }, testList.asReceiveChannel().partition { it % 2 == 0 }) - } - - @Test - fun testZip() = runTest { - val other = listOf("a", "b") - assertEquals(testList.zip(other), testList.asReceiveChannel().zip(other.asReceiveChannel()).toList()) - } - - @Test - fun testElementAt() = runTest { - testList.indices.forEach { i -> - assertEquals(testList[i], testList.asReceiveChannel().elementAt(i)) - } - } - - @Test - fun testElementAtOrNull() = runTest { - testList.indices.forEach { i -> - assertEquals(testList[i], testList.asReceiveChannel().elementAtOrNull(i)) - } - assertNull(testList.asReceiveChannel().elementAtOrNull(-1)) - assertNull(testList.asReceiveChannel().elementAtOrNull(testList.size)) - } - - @Test - fun testFind() = runTest { - repeat(3) { mod -> - assertEquals(testList.find { it % 2 == mod }, - testList.asReceiveChannel().find { it % 2 == mod }) - } - } - - @Test - fun testFindLast() = runTest { - repeat(3) { mod -> - assertEquals(testList.findLast { it % 2 == mod }, testList.asReceiveChannel().findLast { it % 2 == mod }) - } - } - - @Test - fun testIndexOf() = runTest { - repeat(testList.size + 1) { i -> - assertEquals(testList.indexOf(i), testList.asReceiveChannel().indexOf(i)) - } - } - - @Test - fun testLastIndexOf() = runTest { - repeat(testList.size + 1) { i -> - assertEquals(testList.lastIndexOf(i), testList.asReceiveChannel().lastIndexOf(i)) - } - } - - @Test - fun testIndexOfFirst() = runTest { - repeat(3) { mod -> - assertEquals(testList.indexOfFirst { it % 2 == mod }, - testList.asReceiveChannel().indexOfFirst { it % 2 == mod }) - } - } - - @Test - fun testIndexOfLast() = runTest { - repeat(3) { mod -> - assertEquals(testList.indexOfLast { it % 2 != mod }, - testList.asReceiveChannel().indexOfLast { it % 2 != mod }) - } - } - - @Test - fun testLastOrNull() = runTest { - assertEquals(testList.lastOrNull(), testList.asReceiveChannel().lastOrNull()) - assertNull(emptyList().asReceiveChannel().lastOrNull()) - } - - @Test - fun testSingleOrNull() = runTest { - assertEquals(1, listOf(1).asReceiveChannel().singleOrNull()) - assertNull(listOf(1, 2).asReceiveChannel().singleOrNull()) - assertNull(emptyList().asReceiveChannel().singleOrNull()) - repeat(testList.size + 1) { i -> - assertEquals(testList.singleOrNull { it == i }, - testList.asReceiveChannel().singleOrNull { it == i }) - } - repeat(3) { mod -> - assertEquals(testList.singleOrNull { it % 2 == mod }, - testList.asReceiveChannel().singleOrNull { it % 2 == mod }) - } - } - - @Test - fun testDropWhile() = runTest { - repeat(3) { mod -> - assertEquals(testList.dropWhile { it % 2 == mod }, - testList.asReceiveChannel().dropWhile { it % 2 == mod }.toList()) - } - } - - @Test - fun testFilter() = runTest { - repeat(3) { mod -> - assertEquals(testList.filter { it % 2 == mod }, - testList.asReceiveChannel().filter { it % 2 == mod }.toList()) - } - } - - @Test - fun testFilterToCollection() = runTest { - repeat(3) { mod -> - val c = mutableListOf() - testList.asReceiveChannel().filterTo(c) { it % 2 == mod } - assertEquals(testList.filter { it % 2 == mod }, c) - } - } - - @Test - fun testFilterToSendChannel() = runTest { - repeat(3) { mod -> - val c = produce { - testList.asReceiveChannel().filterTo(channel) { it % 2 == mod } - } - assertEquals(testList.filter { it % 2 == mod }, c.toList()) - } - } - - @Test - fun testFilterNot() = runTest { - repeat(3) { mod -> - assertEquals(testList.filterNot { it % 2 == mod }, - testList.asReceiveChannel().filterNot { it % 2 == mod }.toList()) - } - } - - @Test - fun testFilterNotToCollection() = runTest { - repeat(3) { mod -> - val c = mutableListOf() - testList.asReceiveChannel().filterNotTo(c) { it % 2 == mod } - assertEquals(testList.filterNot { it % 2 == mod }, c) - } - } - - @Test - fun testFilterNotToSendChannel() = runTest { - repeat(3) { mod -> - val c = produce { - testList.asReceiveChannel().filterNotTo(channel) { it % 2 == mod } - } - assertEquals(testList.filterNot { it % 2 == mod }, c.toList()) - } - } - - @Test - fun testFilterNotNull() = runTest { - repeat(3) { mod -> - assertEquals( - testList.mapNotNull { it.takeIf { it % 2 == mod } }, - testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNull().toList()) - } - } - - @Test - fun testFilterNotNullToCollection() = runTest { - repeat(3) { mod -> - val c = mutableListOf() - testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(c) - assertEquals(testList.mapNotNull { it.takeIf { it % 2 == mod } }, c) - } - } - - @Test - fun testFilterNotNullToSendChannel() = runTest { - repeat(3) { mod -> - val c = produce { - testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(channel) - } - assertEquals(testList.mapNotNull { it.takeIf { it % 2 == mod } }, c.toList()) - } - } - - @Test - fun testFilterIndexed() = runTest { - repeat(3) { mod -> - assertEquals(testList.filterIndexed { index, _ -> index % 2 == mod }, - testList.asReceiveChannel().filterIndexed { index, _ -> index % 2 == mod }.toList()) - } - } - - @Test - fun testFilterIndexedToCollection() = runTest { - repeat(3) { mod -> - val c = mutableListOf() - testList.asReceiveChannel().filterIndexedTo(c) { index, _ -> index % 2 == mod } - assertEquals(testList.filterIndexed { index, _ -> index % 2 == mod }, c) - } - } - - @Test - fun testFilterIndexedToChannel() = runTest { - repeat(3) { mod -> - val c = produce { - testList.asReceiveChannel().filterIndexedTo(channel) { index, _ -> index % 2 == mod } - } - assertEquals(testList.filterIndexed { index, _ -> index % 2 == mod }, c.toList()) - } - } - - @Test - fun testTakeWhile() = runTest { - repeat(3) { mod -> - assertEquals(testList.takeWhile { it % 2 != mod }, - testList.asReceiveChannel().takeWhile { it % 2 != mod }.toList()) - } - } - - @Test - fun testToChannel() = runTest { - val c = produce { - testList.asReceiveChannel().toChannel(channel) - } - assertEquals(testList, c.toList()) - } - - @Test - fun testMapIndexed() = runTest { - assertEquals(testList.mapIndexed { index, i -> index + i }, - testList.asReceiveChannel().mapIndexed { index, i -> index + i }.toList()) - } - - @Test - fun testMapIndexedToCollection() = runTest { - val c = mutableListOf() - testList.asReceiveChannel().mapIndexedTo(c) { index, i -> index + i } - assertEquals(testList.mapIndexed { index, i -> index + i }, c) - } - - @Test - fun testMapIndexedToSendChannel() = runTest { - val c = produce { - testList.asReceiveChannel().mapIndexedTo(channel) { index, i -> index + i } - } - assertEquals(testList.mapIndexed { index, i -> index + i }, c.toList()) - } - - @Test - fun testMapNotNull() = runTest { - repeat(3) { mod -> - assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } }, - testList.asReceiveChannel().mapNotNull { i -> i.takeIf { i % 2 == mod } }.toList()) - } - } - - @Test - fun testMapNotNullToCollection() = runTest { - repeat(3) { mod -> - val c = mutableListOf() - testList.asReceiveChannel().mapNotNullTo(c) { i -> i.takeIf { i % 2 == mod } } - assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } }, c) - } - } - - @Test - fun testMapNotNullToSendChannel() = runTest { - repeat(3) { mod -> - val c = produce { - testList.asReceiveChannel().mapNotNullTo(channel) { i -> i.takeIf { i % 2 == mod } } - } - assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } }, c.toList()) - } - } - - @Test - fun testMapIndexedNotNull() = runTest { - repeat(3) { mod -> - assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }, - testList.asReceiveChannel().mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }.toList()) - } - } - - @Test - fun testMapIndexedNotNullToCollection() = runTest { - repeat(3) { mod -> - val c = mutableListOf() - testList.asReceiveChannel().mapIndexedNotNullTo(c) { index, i -> index.takeIf { i % 2 == mod } } - assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }, c) - } - } - - @Test - fun testMapIndexedNotNullToSendChannel() = runTest { - repeat(3) { mod -> - val c = produce { - testList.asReceiveChannel().mapIndexedNotNullTo(channel) { index, i -> index.takeIf { i % 2 == mod } } - } - assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }, c.toList()) - } - } - - @Test - fun testWithIndex() = runTest { - assertEquals(testList.withIndex().toList(), testList.asReceiveChannel().withIndex().toList()) - } - - @Test - fun testMaxBy() = runTest { - assertEquals(testList.maxBy { 10 - abs(it - 2) }, - testList.asReceiveChannel().maxBy { 10 - abs(it - 2) }) - } - - @Test - fun testMaxWith() = runTest { - val cmp = compareBy { 10 - abs(it - 2) } - assertEquals(testList.maxWith(cmp), - testList.asReceiveChannel().maxWith(cmp)) - } - - @Test - fun testMinBy() = runTest { - assertEquals(testList.minBy { abs(it - 2) }, - testList.asReceiveChannel().minBy { abs(it - 2) }) - } - - @Test - fun testMinWith() = runTest { - val cmp = compareBy { abs(it - 2) } - assertEquals(testList.minWith(cmp), - testList.asReceiveChannel().minWith(cmp)) - } - - @Test - fun testSumBy() = runTest { - assertEquals(testList.sumBy { it * 3 }, - testList.asReceiveChannel().sumBy { it * 3 }) - } - - @Test - fun testSumByDouble() = runTest { - val expected = testList.sumByDouble { it * 3.0 } - val actual = testList.asReceiveChannel().sumByDouble { it * 3.0 } - assertEquals(expected, actual) - } - - @Test - fun testRequireNoNulls() = runTest { - assertEquals(testList.requireNoNulls(), testList.asReceiveChannel().requireNoNulls().toList()) - } - private fun Iterable.asReceiveChannel(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel = GlobalScope.produce(context) { for (element in this@asReceiveChannel) diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt index 885f1d6c8f..6ddde001e2 100644 --- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt @@ -163,7 +163,7 @@ class ProduceTest : TestBase() { private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply { val source = Channel() expect(1) - val produced = produce(coroutineContext, onCompletion = source.consumes()) { + val produced = produce(coroutineContext, onCompletion = { source.cancelConsumed(it) }) { expect(2) source.receive() } diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelsConsumeTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelsConsumeTest.kt deleted file mode 100644 index cb19b36a13..0000000000 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelsConsumeTest.kt +++ /dev/null @@ -1,908 +0,0 @@ -/* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -@file:Suppress("DEPRECATION") - -package kotlinx.coroutines.channels - -import kotlinx.coroutines.* -import kotlin.coroutines.* -import kotlin.test.* - -/** - * Tests that various operators on channels properly consume (close) their source channels. - */ -class ChannelsConsumeTest : TestBase() { - private val sourceList = (1..10).toList() - - // test source with numbers 1..10 - private fun CoroutineScope.testSource() = produce(NonCancellable) { - for (i in sourceList) { - send(i) - } - } - - @Test - fun testConsume() { - checkTerminal { - consume { - assertEquals(1, receive()) - } - } - } - - @Test - fun testConsumeEach() { - checkTerminal { - var sum = 0 - consumeEach { sum += it } - assertEquals(55, sum) - } - } - - @Test - fun testConsumeEachIndexed() { - checkTerminal { - var sum = 0 - consumeEachIndexed { (index, i) -> sum += index * i } - assertEquals(330, sum) - } - } - - @Test - fun testElementAt() { - checkTerminal { - assertEquals(2, elementAt(1)) - } - checkTerminal(expected = { it is IndexOutOfBoundsException }) { - elementAt(10) - } - } - - @Test - fun testElementAtOrElse() { - checkTerminal { - assertEquals(3, elementAtOrElse(2) { error("Cannot happen") }) - } - checkTerminal { - assertEquals(-23, elementAtOrElse(10) { -23 }) - } - } - - @Test - fun testElementOrNull() { - checkTerminal { - assertEquals(4, elementAtOrNull(3)) - } - checkTerminal { - assertNull(elementAtOrNull(10)) - } - } - - @Test - fun testFind() { - checkTerminal { - assertEquals(3, find { it % 3 == 0 }) - } - } - - @Test - fun testFindLast() { - checkTerminal { - assertEquals(9, findLast { it % 3 == 0 }) - } - } - - @Test - fun testFirst() { - checkTerminal { - assertEquals(1, first()) - } - } - - @Test - fun testFirstPredicate() { - checkTerminal { - assertEquals(3, first { it % 3 == 0 }) - } - checkTerminal(expected = { it is NoSuchElementException }) { - first { it > 10 } - } - } - - @Test - fun testFirstOrNull() { - checkTerminal { - assertEquals(1, firstOrNull()) - } - } - - @Test - fun testFirstOrNullPredicate() { - checkTerminal { - assertEquals(3, firstOrNull { it % 3 == 0 }) - } - checkTerminal { - assertNull(firstOrNull { it > 10 }) - } - } - - @Test - fun testIndexOf() { - checkTerminal { - assertEquals(2, indexOf(3)) - } - checkTerminal { - assertEquals(-1, indexOf(11)) - } - } - - @Test - fun testIndexOfFirst() { - checkTerminal { - assertEquals(2, indexOfFirst { it % 3 == 0 }) - } - checkTerminal { - assertEquals(-1, indexOfFirst { it > 10 }) - } - } - - @Test - fun testIndexOfLast() { - checkTerminal { - assertEquals(8, indexOfLast { it % 3 == 0 }) - } - checkTerminal { - assertEquals(-1, indexOfLast { it > 10 }) - } - } - - @Test - fun testLast() { - checkTerminal { - assertEquals(10, last()) - } - } - - @Test - fun testLastPredicate() { - checkTerminal { - assertEquals(9, last { it % 3 == 0 }) - } - checkTerminal(expected = { it is NoSuchElementException }) { - last { it > 10 } - } - } - - @Test - fun testLastIndexOf() { - checkTerminal { - assertEquals(8, lastIndexOf(9)) - } - } - - @Test - fun testLastOrNull() { - checkTerminal { - assertEquals(10, lastOrNull()) - } - } - - @Test - fun testLastOrNullPredicate() { - checkTerminal { - assertEquals(9, lastOrNull { it % 3 == 0 }) - } - checkTerminal { - assertNull(lastOrNull { it > 10 }) - } - } - - @Test - fun testSingle() { - checkTerminal(expected = { it is IllegalArgumentException }) { - single() - } - } - - @Test - fun testSinglePredicate() { - checkTerminal { - assertEquals(7, single { it % 7 == 0 }) - } - checkTerminal(expected = { it is IllegalArgumentException }) { - single { it % 3 == 0 } - } - checkTerminal(expected = { it is NoSuchElementException }) { - single { it > 10 } - } - } - - @Test - fun testSingleOrNull() { - checkTerminal { - assertNull(singleOrNull()) - } - } - - @Test - fun testSingleOrNullPredicate() { - checkTerminal { - assertEquals(7, singleOrNull { it % 7 == 0 }) - } - checkTerminal { - assertNull(singleOrNull { it % 3 == 0 }) - } - checkTerminal { - assertNull(singleOrNull { it > 10 }) - } - } - - @Test - fun testDrop() { - checkTransform(sourceList.drop(3)) { - drop(3) - } - } - - @Test - fun testDropWhile() { - checkTransform(sourceList.dropWhile { it < 4}) { - dropWhile { it < 4 } - } - } - - @Test - fun testFilter() { - checkTransform(sourceList.filter { it % 2 == 0 }) { - filter { it % 2 == 0 } - } - } - - @Test - fun testFilterIndexed() { - checkTransform(sourceList.filterIndexed { index, _ -> index % 2 == 0 }) { - filterIndexed { index, _ -> index % 2 == 0 } - } - } - - @Test - fun testFilterIndexedToCollection() { - checkTerminal { - val list = mutableListOf() - filterIndexedTo(list) { index, _ -> index % 2 == 0 } - assertEquals(listOf(1, 3, 5, 7, 9), list) - } - } - - @Test - fun testFilterIndexedToChannel() { - checkTerminal { - val channel = Channel() - val result = GlobalScope.async { channel.toList() } - filterIndexedTo(channel) { index, _ -> index % 2 == 0 } - channel.close() - assertEquals(listOf(1, 3, 5, 7, 9), result.await()) - } - } - - @Test - fun testFilterNot() { - checkTransform(sourceList.filterNot { it % 2 == 0 }) { - filterNot { it % 2 == 0 } - } - } - - @Test - fun testFilterNotNullToCollection() { - checkTerminal { - val list = mutableListOf() - filterNotNullTo(list) - assertEquals((1..10).toList(), list) - } - } - - @Test - fun testFilterNotNullToChannel() { - checkTerminal { - val channel = Channel() - val result = GlobalScope.async { channel.toList() } - filterNotNullTo(channel) - channel.close() - assertEquals((1..10).toList(), result.await()) - } - } - - @Test - fun testFilterNotToCollection() { - checkTerminal { - val list = mutableListOf() - filterNotTo(list) { it % 2 == 0 } - assertEquals(listOf(1, 3, 5, 7, 9), list) - } - } - - @Test - fun testFilterNotToChannel() { - checkTerminal { - val channel = Channel() - val result = GlobalScope.async { channel.toList() } - filterNotTo(channel) { it % 2 == 0 } - channel.close() - assertEquals(listOf(1, 3, 5, 7, 9), result.await()) - } - } - - @Test - fun testFilterToCollection() { - checkTerminal { - val list = mutableListOf() - filterTo(list) { it % 2 == 0 } - assertEquals(listOf(2, 4, 6, 8, 10), list) - } - } - - @Test - fun testFilterToChannel() { - checkTerminal { - val channel = Channel() - val result = GlobalScope.async { channel.toList() } - filterTo(channel) { it % 2 == 0 } - channel.close() - assertEquals(listOf(2, 4, 6, 8, 10), result.await()) - } - } - - @Test - fun testTake() { - checkTransform(sourceList.take(3)) { - take(3) - } - } - - @Test - fun testTakeWhile() { - checkTransform(sourceList.takeWhile { it < 4 }) { - takeWhile { it < 4 } - } - } - - @Test - fun testAssociate() { - checkTerminal { - assertEquals(sourceList.associate { it to it.toString() }, associate { it to it.toString() }) - } - } - - @Test - fun testAssociateBy() { - checkTerminal { - assertEquals(sourceList.associateBy { it.toString() }, associateBy { it.toString() }) - } - } - - @Test - fun testAssociateByTwo() { - checkTerminal { - assertEquals(sourceList.associateBy({ it.toString() }, { it + 1}), associateBy({ it.toString() }, { it + 1})) - } - } - - @Test - fun testAssociateByToMap() { - checkTerminal { - val map = mutableMapOf() - associateByTo(map) { it.toString() } - assertEquals(sourceList.associateBy { it.toString() }, map) - } - } - - @Test - fun testAssociateByTwoToMap() { - checkTerminal { - val map = mutableMapOf() - associateByTo(map, { it.toString() }, { it + 1}) - assertEquals(sourceList.associateBy({ it.toString() }, { it + 1}), map) - } - } - - @Test - fun testAssociateToMap() { - checkTerminal { - val map = mutableMapOf() - associateTo(map) { it to it.toString() } - assertEquals(sourceList.associate { it to it.toString() }, map) - } - } - - @Test - fun testToChannel() { - checkTerminal { - val channel = Channel() - val result = GlobalScope.async { channel.toList() } - toChannel(channel) - channel.close() - assertEquals(sourceList, result.await()) - } - } - - @Test - fun testToCollection() { - checkTerminal { - val list = mutableListOf() - toCollection(list) - assertEquals(sourceList, list) - } - } - - @Test - fun testToList() { - checkTerminal { - val list = toList() - assertEquals(sourceList, list) - } - } - - @Test - fun testToMap() { - checkTerminal { - val map = map { it to it.toString() }.toMap() - assertEquals(sourceList.map { it to it.toString() }.toMap(), map) - } - } - - @Test - fun testToMapWithMap() { - checkTerminal { - val map = mutableMapOf() - map { it to it.toString() }.toMap(map) - assertEquals(sourceList.map { it to it.toString() }.toMap(), map) - } - } - - @Test - fun testToMutableList() { - checkTerminal { - val list = toMutableList() - assertEquals(sourceList, list) - } - } - - @Test - fun testToSet() { - checkTerminal { - val set = toSet() - assertEquals(sourceList.toSet(), set) - } - } - - @Test - fun testFlatMap() { - checkTransform(sourceList.flatMap { listOf("A$it", "B$it") }) { - flatMap { - GlobalScope.produce(coroutineContext) { - send("A$it") - send("B$it") - } - } - } - } - - @Test - fun testGroupBy() { - checkTerminal { - val map = groupBy { it % 2 } - assertEquals(sourceList.groupBy { it % 2 }, map) - } - } - - @Test - fun testGroupByTwo() { - checkTerminal { - val map = groupBy({ it % 2 }, { it.toString() }) - assertEquals(sourceList.groupBy({ it % 2 }, { it.toString() }), map) - } - } - - @Test - fun testGroupByTo() { - checkTerminal { - val map = mutableMapOf>() - groupByTo(map) { it % 2 } - assertEquals(sourceList.groupBy { it % 2 }, map) - } - } - - @Test - fun testGroupByToTwo() { - checkTerminal { - val map = mutableMapOf>() - groupByTo(map, { it % 2 }, { it.toString() }) - assertEquals(sourceList.groupBy({ it % 2 }, { it.toString() }), map) - } - } - - @Test - fun testMap() { - checkTransform(sourceList.map { it.toString() }) { - map { it.toString() } - } - } - - @Test - fun testMapIndexed() { - checkTransform(sourceList.mapIndexed { index, v -> "$index$v" }) { - mapIndexed { index, v -> "$index$v" } - } - } - - @Test - fun testMapIndexedNotNull() { - checkTransform(sourceList.mapIndexedNotNull { index, v -> "$index$v".takeIf { v % 2 == 0 } }) { - mapIndexedNotNull { index, v -> "$index$v".takeIf { v % 2 == 0 } } - } - } - - @Test - fun testMapIndexedNotNullToCollection() { - checkTerminal { - val list = mutableListOf() - mapIndexedNotNullTo(list) { index, v -> "$index$v".takeIf { v % 2 == 0 } } - assertEquals(sourceList.mapIndexedNotNull { index, v -> "$index$v".takeIf { v % 2 == 0 } }, list) - } - } - - @Test - fun testMapIndexedNotNullToChannel() { - checkTerminal { - val channel = Channel() - val result = GlobalScope.async { channel.toList() } - mapIndexedNotNullTo(channel) { index, v -> "$index$v".takeIf { v % 2 == 0 } } - channel.close() - assertEquals(sourceList.mapIndexedNotNull { index, v -> "$index$v".takeIf { v % 2 == 0 } }, result.await()) - } - } - - @Test - fun testMapIndexedToCollection() { - checkTerminal { - val list = mutableListOf() - mapIndexedTo(list) { index, v -> "$index$v" } - assertEquals(sourceList.mapIndexed { index, v -> "$index$v" }, list) - } - } - - @Test - fun testMapIndexedToChannel() { - checkTerminal { - val channel = Channel() - val result = GlobalScope.async { channel.toList() } - mapIndexedTo(channel) { index, v -> "$index$v" } - channel.close() - assertEquals(sourceList.mapIndexed { index, v -> "$index$v" }, result.await()) - } - } - - @Test - fun testMapNotNull() { - checkTransform(sourceList.mapNotNull { (it + 3).takeIf { it % 2 == 0 } }) { - mapNotNull { (it + 3).takeIf { it % 2 == 0 } } - } - } - - @Test - fun testMapNotNullToCollection() { - checkTerminal { - val list = mutableListOf() - mapNotNullTo(list) { (it + 3).takeIf { it % 2 == 0 } } - assertEquals(sourceList.mapNotNull { (it + 3).takeIf { it % 2 == 0 } }, list) - } - } - - @Test - fun testMapNotNullToChannel() { - checkTerminal { - val channel = Channel() - val result = GlobalScope.async { channel.toList() } - mapNotNullTo(channel) { (it + 3).takeIf { it % 2 == 0 } } - channel.close() - assertEquals(sourceList.mapNotNull { (it + 3).takeIf { it % 2 == 0 } }, result.await()) - } - } - - @Test - fun testMapToCollection() { - checkTerminal { - val list = mutableListOf() - mapTo(list) { it + 3 } - assertEquals(sourceList.map { it + 3 }, list) - } - } - - @Test - fun testMapToChannel() { - checkTerminal { - val channel = Channel() - val result = GlobalScope.async { channel.toList() } - mapTo(channel) { it + 3 } - channel.close() - assertEquals(sourceList.map { it + 3 }, result.await()) - } - } - - @Test - fun testWithIndex() { - checkTransform(sourceList.asSequence().withIndex().toList()) { - withIndex() - } - } - - @Test - fun testDistinctBy() { - checkTransform(sourceList.distinctBy { it / 2 }) { - distinctBy { it / 2 } - } - } - - @Test - fun testToMutableSet() { - checkTerminal { - val set = toMutableSet() - assertEquals(sourceList.toSet(), set) - } - } - - @Test - fun testAll() { - checkTerminal { - val all = all { it < 11 } - assertEquals(sourceList.all { it < 11 }, all) - } - } - - @Test - fun testAny() { - checkTerminal { - val any = any() - assertEquals(sourceList.any(), any) - } - } - - @Test - fun testAnyPredicate() { - checkTerminal { - val any = any { it % 3 == 0 } - assertEquals(sourceList.any { it % 3 == 0 }, any) - } - } - - @Test - fun testCount() { - checkTerminal { - val c = count() - assertEquals(sourceList.count(), c) - } - } - - @Test - fun testCountPredicate() { - checkTerminal { - val c = count { it % 3 == 0 } - assertEquals(sourceList.count { it % 3 == 0 }, c) - } - } - - @Test - fun testFold() { - checkTerminal { - val c = fold(1) { a, b -> a + b } - assertEquals(sourceList.fold(1) { a, b -> a + b }, c) - } - } - - @Test - fun testFoldIndexed() { - checkTerminal { - val c = foldIndexed(1) { i, a, b -> i * a + b } - assertEquals(sourceList.foldIndexed(1) { i, a, b -> i * a + b }, c) - } - } - - @Test - fun testMaxBy() { - checkTerminal { - val c = maxBy { it % 3 } - assertEquals(sourceList.maxBy { it % 3 }, c) - } - } - - @Test - fun testMaxWith() { - checkTerminal { - val c = maxWith(compareBy { it % 3 }) - assertEquals(sourceList.maxWith(compareBy { it % 3 }), c) - } - } - - @Test - fun testMinBy() { - checkTerminal { - val c = maxBy { it % 3 } - assertEquals(sourceList.maxBy { it % 3 }, c) - } - } - - @Test - fun testMinWith() { - checkTerminal { - val c = maxWith(compareBy { it % 3 }) - assertEquals(sourceList.maxWith(compareBy { it % 3 }), c) - } - } - - @Test - fun testNone() { - checkTerminal { - val none = none() - assertEquals(sourceList.none(), none) - } - } - - @Test - fun testNonePredicate() { - checkTerminal { - val none = none { it > 10 } - assertEquals(sourceList.none { it > 10 }, none) - } - } - - @Test - fun testReduce() { - checkTerminal { - val c = reduce { a, b -> a + b } - assertEquals(sourceList.reduce { a, b -> a + b }, c) - } - } - - @Test - fun testReduceIndexed() { - checkTerminal { - val c = reduceIndexed { i, a, b -> i * a + b } - assertEquals(sourceList.reduceIndexed { i, a, b -> i * a + b }, c) - } - } - - @Test - fun testSubBy() { - checkTerminal { - val c = sumBy { it } - assertEquals(sourceList.sumBy { it }, c) - } - } - - @Test - fun testSubByDouble() { - checkTerminal { - val c = sumByDouble { it.toDouble() } - assertEquals(sourceList.sumByDouble { it.toDouble() }, c) - } - } - - @Test - fun testPartition() { - checkTerminal { - val pair = partition { it % 2 == 0 } - assertEquals(sourceList.partition { it % 2 == 0 }, pair) - } - } - - @Test - fun testZip() { - val expect = sourceList.zip(sourceList) { a, b -> a + 2 * b } - checkTransform(expect) { - with(CoroutineScope(coroutineContext)) { - zip(testSource()) { a, b -> a + 2*b } - } - } - checkTransform(expect) { - with(CoroutineScope(coroutineContext)) { - testSource().zip(this@checkTransform) { a, b -> a + 2*b } - } - } - } - - // ------------------ - - private fun checkTerminal( - expected: ((Throwable?) -> Unit)? = null, - terminal: suspend ReceiveChannel.() -> Unit - ) { - checkTerminalCompletion(expected, terminal) - checkTerminalCancellation(expected, terminal) - } - - private fun checkTerminalCompletion( - expected: ((Throwable?) -> Unit)? = null, - terminal: suspend ReceiveChannel.() -> Unit - ) { - val src = runBlocking { - val src = testSource() - try { - // terminal operation - terminal(src) - // source must be cancelled at the end of terminal op - if (expected != null) error("Exception was expected") - } catch (e: Throwable) { - if (expected == null) throw e - expected(e) - } - src - } - assertTrue(src.isClosedForReceive, "Source must be closed") - } - - private fun checkTerminalCancellation( - expected: ((Throwable?) -> Unit)? = null, - terminal: suspend ReceiveChannel.() -> Unit - ) { - val src = runBlocking { - val src = testSource() - // terminal operation in a separate async context started until the first suspension - val d = async(NonCancellable, start = CoroutineStart.UNDISPATCHED) { - terminal(src) - } - // then cancel it - d.cancel() - // and try to get it's result - try { - d.await() - } catch (e: CancellationException) { - // ok -- was cancelled - } catch (e: Throwable) { - // if threw a different exception -- must be an expected one - if (expected == null) throw e - expected(e) - } - src - } - // source must be cancelled at the end of terminal op even if it was cancelled while in process - assertTrue(src.isClosedForReceive, "Source must be closed") - } - - private fun checkTransform( - expect: List, - transform: suspend ReceiveChannel.() -> ReceiveChannel - ) { - // check for varying number of received elements from the channel - for (nReceive in 0..expect.size) { - checkTransform(nReceive, expect, transform) - } - } - - private fun checkTransform( - nReceive: Int, - expect: List, - transform: suspend ReceiveChannel.() -> ReceiveChannel - ) { - val src = runBlocking { - val src = testSource() - // transform - val res = transform(src) - // receive nReceive elements from the result - repeat(nReceive) { i -> - assertEquals(expect[i], res.receive()) - } - if (nReceive < expect.size) { - // then cancel - res.cancel() - } else { - // then check that result is closed - assertNull(res.receiveOrNull(), "Result has unexpected values") - } - src - } - // source must be cancelled when runBlocking processes all the scheduled stuff - assertTrue(src.isClosedForReceive, "Source must be closed") - } -} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt index 7613f04d29..da20f0c5ee 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt @@ -14,7 +14,9 @@ class ChannelsJvmTest : TestBase() { fun testBlocking() { val ch = Channel() val sum = GlobalScope.async { - ch.sumBy { it } + var sum = 0 + ch.consumeEach { sum += it } + sum } repeat(10) { ch.sendBlocking(it) diff --git a/reactive/kotlinx-coroutines-jdk9/src/Publish.kt b/reactive/kotlinx-coroutines-jdk9/src/Publish.kt index d274083668..6fd9a5e75b 100644 --- a/reactive/kotlinx-coroutines-jdk9/src/Publish.kt +++ b/reactive/kotlinx-coroutines-jdk9/src/Publish.kt @@ -28,7 +28,7 @@ import org.reactivestreams.FlowAdapters * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect * to cancellation and error handling may change in the future. */ -@ExperimentalCoroutinesApi +@ExperimentalCoroutinesApi // Since 1.3.x public fun flowPublish( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt index 903b60a2cf..c7fcb1c2b6 100644 --- a/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt +++ b/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt @@ -24,7 +24,7 @@ import kotlinx.coroutines.flow.* * Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused. * [conflate] has no effect, as this flow is already conflated; one can use [buffer] to change that instead. */ -@ExperimentalCoroutinesApi +@ExperimentalCoroutinesApi // Since 1.3.x public fun ObservableValue.asFlow(): Flow = callbackFlow { val listener = ChangeListener { _, _, newValue -> try {