Skip to content

Commit

Permalink
KTOR-7845 Fix for threading issue in flushAndClose for reader job cha…
Browse files Browse the repository at this point in the history
…nnels (#4503)
  • Loading branch information
bjhham authored Nov 27, 2024
1 parent 4031c50 commit bc68267
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 2 deletions.
4 changes: 4 additions & 0 deletions ktor-io/api/ktor-io.api
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ public abstract interface class io/ktor/utils/io/ChannelJob {
public abstract fun getJob ()Lkotlinx/coroutines/Job;
}

public final class io/ktor/utils/io/CloseHookByteWriteChannelKt {
public static final fun onClose (Lio/ktor/utils/io/ByteWriteChannel;Lkotlin/jvm/functions/Function1;)Lio/ktor/utils/io/ByteWriteChannel;
}

public final class io/ktor/utils/io/ConcurrentIOException : java/lang/IllegalStateException {
public fun <init> (Ljava/lang/String;Ljava/lang/Throwable;)V
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
Expand Down
1 change: 1 addition & 0 deletions ktor-io/api/ktor-io.klib.api
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/cancel() // io.kt
final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/close() // io.ktor.utils.io/close|[email protected](){}[0]
final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/close(kotlin/Throwable?) // io.ktor.utils.io/close|[email protected](kotlin.Throwable?){}[0]
final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/counted(): io.ktor.utils.io/CountedByteWriteChannel // io.ktor.utils.io/counted|[email protected](){}[0]
final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/onClose(kotlin.coroutines/SuspendFunction0<kotlin/Unit>): io.ktor.utils.io/ByteWriteChannel // io.ktor.utils.io/onClose|[email protected](kotlin.coroutines.SuspendFunction0<kotlin.Unit>){}[0]
final fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/rethrowCloseCauseIfNeeded() // io.ktor.utils.io/rethrowCloseCauseIfNeeded|[email protected](){}[0]
final fun (io.ktor.utils.io/ChannelJob).io.ktor.utils.io/cancel() // io.ktor.utils.io/cancel|[email protected](){}[0]
final fun (io.ktor.utils.io/ChannelJob).io.ktor.utils.io/getCancellationException(): kotlin.coroutines.cancellation/CancellationException // io.ktor.utils.io/getCancellationException|[email protected](){}[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import kotlinx.io.Buffer
import kotlinx.io.bytestring.*
import kotlinx.io.unsafe.*
import kotlin.coroutines.*
import kotlin.jvm.*
import kotlin.math.*

@OptIn(InternalAPI::class)
Expand Down Expand Up @@ -309,7 +308,7 @@ public fun CoroutineScope.reader(
}
}

return ReaderJob(channel, job)
return ReaderJob(channel.onClose { job.join() }, job)
}

/**
Expand Down
24 changes: 24 additions & 0 deletions ktor-io/common/src/io/ktor/utils/io/CloseHookByteWriteChannel.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.utils.io

/**
* Wraps this channel to execute the provided action when closed using `flushAndClose()`.
*
* @param onClose The action to execute when the channel is closed.
* @return A new `ByteWriteChannel` that executes the given action upon closure.
*/
public fun ByteWriteChannel.onClose(onClose: suspend () -> Unit): ByteWriteChannel =
CloseHookByteWriteChannel(this, onClose)

internal class CloseHookByteWriteChannel(
private val delegate: ByteWriteChannel,
private val onClose: suspend () -> Unit
) : ByteWriteChannel by delegate {
override suspend fun flushAndClose() {
delegate.flushAndClose()
onClose()
}
}
21 changes: 21 additions & 0 deletions ktor-utils/jvm/test/io/ktor/tests/utils/FileChannelTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import io.ktor.util.cio.*
import io.ktor.utils.io.*
import io.ktor.utils.io.jvm.javaio.*
import kotlinx.coroutines.*
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.*
import org.junit.jupiter.api.extension.*
import java.io.*
import java.nio.file.Files
import kotlin.test.*
import kotlin.test.Test

Expand Down Expand Up @@ -117,4 +119,23 @@ class FileChannelTest {
// Assert (we cannot delete if there is a file handle open on it)
assertTrue(temp.delete())
}

@Test
fun `writeChannel finishes on close`() = runTest {
val file = Files.createTempFile("file", "txt").toFile()
val ch = file.writeChannel()
ch.writeStringUtf8("Hello")
ch.flushAndClose()
assertEquals(5, file.length())
assertEquals("Hello", file.readText())
}

@Test
fun `writeChannel writes to file on flush`() = runTest {
val file = Files.createTempFile("file", "txt").toFile()
val ch = file.writeChannel()
ch.writeStringUtf8("Hello")
ch.flush()
assertEquals("Hello", file.readText())
}
}

0 comments on commit bc68267

Please sign in to comment.