Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.4.0 experimentality #2316

Merged
merged 4 commits into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,22 @@ open class ChannelSinkBenchmark {
for (i in start until (start + count))
send(i)
}

// Migrated from deprecated operators, are good only for stressing channels

private fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
GlobalScope.produce(context, onCompletion = { cancel() }) {
for (e in this@filter) {
if (predicate(e)) send(e)
}
}

private suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R {
var accumulator = initial
consumeEach {
accumulator = operation(accumulator, it)
}
return accumulator
}
}

1 change: 0 additions & 1 deletion kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,6 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun merge ([Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final synthetic fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun onEmpty (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/Builders.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ public suspend fun <T> withContext(
*
* This inline function calls [withContext].
*/
@ExperimentalCoroutinesApi
public suspend inline operator fun <T> CoroutineDispatcher.invoke(
noinline block: suspend CoroutineScope.() -> T
): T = withContext(this, block)
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/CompletableDeferred.kt
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public interface CompletableDeferred<T> : Deferred<T> {
* This function transitions this deferred in the same ways described by [CompletableDeferred.complete] and
* [CompletableDeferred.completeExceptionally].
*/
@ExperimentalCoroutinesApi // since 1.3.2, tentatively until 1.4.0
public fun <T> CompletableDeferred<T>.completeWith(result: Result<T>): Boolean =
result.fold({ complete(it) }, { completeExceptionally(it) })

Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/CoroutineStart.kt
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public enum class CoroutineStart {
* Cancellability of coroutine at suspension points depends on the particular implementation details of
* suspending functions as in [DEFAULT].
*/
@ExperimentalCoroutinesApi
@ExperimentalCoroutinesApi // Since 1.0.0, no ETA on stability
ATOMIC,

/**
Expand All @@ -71,7 +71,7 @@ public enum class CoroutineStart {
*
* **Note: This is an experimental api.** Execution semantics of coroutines may change in the future when this mode is used.
*/
@ExperimentalCoroutinesApi
@ExperimentalCoroutinesApi // Since 1.0.0, no ETA on stability
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
UNDISPATCHED;

/**
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/Debug.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal expect fun assert(value: () -> Boolean)
* Copy mechanism is used only on JVM, but it might be convenient to implement it in common exceptions,
* so on JVM their stacktraces will be properly recovered.
*/
@ExperimentalCoroutinesApi
@ExperimentalCoroutinesApi // Since 1.2.0, no ETA on stability

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is needed for CopyableThrowable to become stable? Is there any issue to watch? Can we help somehow?

This came to my attention, because Retrofit now produces not very helpful stacktraces from suspending calls and the corresponding fix is blocked because this is experimental: square/retrofit#3474

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The best way is to let us know and file an issue. Filed #2367

public interface CopyableThrowable<T> where T : Throwable, T : CopyableThrowable<T> {

/**
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public interface Delay {
* }
* ```
*/
@ExperimentalCoroutinesApi
public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {}
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved

/**
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/channels/Broadcast.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public fun <E> ReceiveChannel<E>.broadcast(
val scope = GlobalScope + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> }
// We can run this coroutine in the context that ignores all exceptions, because of `onCompletion = consume()`
// which passes all exceptions upstream to the source ReceiveChannel
return scope.broadcast(capacity = capacity, start = start, onCompletion = consumes()) {
return scope.broadcast(capacity = capacity, start = start, onCompletion = { cancelConsumed(it) }) {
for (e in this@broadcast) {
send(e)
}
Expand Down
384 changes: 191 additions & 193 deletions kotlinx-coroutines-core/common/src/channels/Channels.common.kt

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/flow/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ public fun LongRange.asFlow(): Flow<Long> = flow {
@FlowPreview
@Deprecated(
message = "Use channelFlow with awaitClose { } instead of flowViaChannel and invokeOnClose { }.",
level = DeprecationLevel.WARNING
)
level = DeprecationLevel.ERROR
) // To be removed in 1.4.x
@Suppress("DeprecatedCallableAddReplaceWith")
public fun <T> flowViaChannel(
bufferSize: Int = BUFFERED,
Expand Down
3 changes: 1 addition & 2 deletions kotlinx-coroutines-core/common/src/flow/Migration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): F
message = "'scanReduce' was renamed to 'runningReduce' to be consistent with Kotlin standard library",
replaceWith = ReplaceWith("runningReduce(operation)")
)
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.scanReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = runningReduce(operation)

@Deprecated(
Expand Down Expand Up @@ -482,4 +481,4 @@ public fun <T> Flow<T>.replay(bufferSize: Int): Flow<T> = noImpl()
message = "Flow analogue of 'cache()' is 'shareIn' with unlimited replay and 'started = SharingStared.Lazily' argument'",
replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE, started = SharingStared.Lazily)")
)
public fun <T> Flow<T>.cache(): Flow<T> = noImpl()
public fun <T> Flow<T>.cache(): Flow<T> = noImpl()
9 changes: 0 additions & 9 deletions kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
* .collect { println(it) } // prints Begin, a, b, c
* ```
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.onStart(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
Expand Down Expand Up @@ -142,7 +141,6 @@ public fun <T> Flow<T>.onStart(
* In case of failure or cancellation, any attempt to emit additional elements throws the corresponding exception.
* Use [catch] if you need to suppress failure and replace it with emission of elements.
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action
Expand Down Expand Up @@ -173,7 +171,6 @@ public fun <T> Flow<T>.onCompletion(
* }.collect { println(it) } // prints 1, 2
* ```
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.onEmpty(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow {
Expand All @@ -198,12 +195,6 @@ private class ThrowingCollector(private val e: Throwable) : FlowCollector<Any?>
}
}

// It was only released in 1.3.0-M2, remove in 1.4.0
/** @suppress */
@Deprecated(level = DeprecationLevel.HIDDEN, message = "binary compatibility with a version w/o FlowCollector receiver")
public fun <T> Flow<T>.onCompletion(action: suspend (cause: Throwable?) -> Unit): Flow<T> =
onCompletion { action(it) }

private suspend fun <T> FlowCollector<T>.invokeSafely(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit,
cause: Throwable?
Expand Down
Loading