From 0fd28fae4e2289747ce42f07a384cafef3ba4f32 Mon Sep 17 00:00:00 2001 From: Mariia Skripchenko <61115099+marychatte@users.noreply.github.com> Date: Fri, 15 Nov 2024 13:28:42 +0100 Subject: [PATCH] KTOR-7435 Add serialization for SSE (#4363) --- .../test/server/tests/ServerSentEvents.kt | 30 +- .../ktor-client-core/api/ktor-client-core.api | 37 + .../api/ktor-client-core.klib.api | 34 + .../client/plugins/sse/ClientSSESession.kt | 197 +++- .../src/io/ktor/client/plugins/sse/SSE.kt | 64 +- .../io/ktor/client/plugins/sse/SSEConfig.kt | 4 +- .../io/ktor/client/plugins/sse/builders.kt | 862 +++++++++++++++++- .../src/io/ktor/client/request/HttpRequest.kt | 3 +- .../tests/plugins/SSESessionParserTest.kt | Bin 6887 -> 8040 bytes .../tests/plugins/ServerSentEventsTest.kt | 114 ++- .../common/src/io/ktor/http/cio/Multipart.kt | 8 +- .../src/io/ktor/utils/io/ByteChannel.kt | 3 +- .../ktor-server-sse/api/ktor-server-sse.api | 13 + .../api/ktor-server-sse.klib.api | 13 + .../ktor-server-sse/build.gradle.kts | 10 + .../common/src/io/ktor/server/sse/Routing.kt | 141 ++- .../common/src/io/ktor/server/sse/SSE.kt | 22 +- .../io/ktor/server/sse/SSEServerContent.kt | 16 +- .../io/ktor/server/sse/ServerSSESession.kt | 81 +- .../ktor/server/sse/ServerSentEventsTest.kt | 99 ++ ktor-shared/ktor-sse/api/ktor-sse.api | 52 +- ktor-shared/ktor-sse/api/ktor-sse.klib.api | 49 +- .../common/src/io/ktor/sse/ServerSentEvent.kt | 77 +- 23 files changed, 1829 insertions(+), 100 deletions(-) diff --git a/buildSrc/src/main/kotlin/test/server/tests/ServerSentEvents.kt b/buildSrc/src/main/kotlin/test/server/tests/ServerSentEvents.kt index 33589b9d9e9..002a106bc99 100644 --- a/buildSrc/src/main/kotlin/test/server/tests/ServerSentEvents.kt +++ b/buildSrc/src/main/kotlin/test/server/tests/ServerSentEvents.kt @@ -84,13 +84,41 @@ internal fun Application.serverSentEvents() { emit(SseEvent("Hello")) } ) - } get("/content-type-text-plain") { call.response.header(HttpHeaders.ContentType, ContentType.Text.Plain.toString()) call.respond(HttpStatusCode.OK) } + + get("/person") { + val times = call.parameters["times"]?.toInt() ?: 1 + call.respondSseEvents( + flow { + repeat(times) { + emit(SseEvent(data = "Name $it", event = "event $it", id = "$it")) + } + } + ) + } + + get("/json") { + val customer = """ + { "id": 1, + "firstName": "Jet", + "lastName": "Brains" + }""".trimIndent() + val product = """ + { "name": "Milk", + "price": "100" + }""".trimIndent() + call.respondSseEvents( + flow { + emit(SseEvent(data = customer)) + emit(SseEvent(data = product)) + } + ) + } } } } diff --git a/ktor-client/ktor-client-core/api/ktor-client-core.api b/ktor-client/ktor-client-core/api/ktor-client-core.api index 99aafdb303f..c5f07b8bf65 100644 --- a/ktor-client/ktor-client-core/api/ktor-client-core.api +++ b/ktor-client/ktor-client-core/api/ktor-client-core.api @@ -767,24 +767,48 @@ public final class io/ktor/client/plugins/sse/BuildersKt { public static synthetic fun serverSentEvents-1wIb-0I$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun serverSentEvents-3bFjkrY (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun serverSentEvents-3bFjkrY$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun serverSentEvents-BqdlHlk (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun serverSentEvents-BqdlHlk$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun serverSentEvents-Mswn-_c (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun serverSentEvents-Mswn-_c$default (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun serverSentEvents-mY9Nd3A (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun serverSentEvents-mY9Nd3A$default (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun serverSentEvents-pTj2aPc (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun serverSentEvents-pTj2aPc$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun serverSentEventsSession-Mswn-_c (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun serverSentEventsSession-Mswn-_c$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun serverSentEventsSession-i8z2VEo (Lio/ktor/client/HttpClient;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun serverSentEventsSession-i8z2VEo$default (Lio/ktor/client/HttpClient;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun serverSentEventsSession-mY9Nd3A (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun serverSentEventsSession-mY9Nd3A (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun serverSentEventsSession-mY9Nd3A$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static synthetic fun serverSentEventsSession-mY9Nd3A$default (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun serverSentEventsSession-tL6_L-A (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun serverSentEventsSession-tL6_L-A$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun serverSentEventsSession-xEWcMm4 (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun serverSentEventsSession-xEWcMm4$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun sse-BAHpl2s (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun sse-BAHpl2s$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun sse-Mswn-_c (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun sse-Mswn-_c (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun sse-Mswn-_c$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static synthetic fun sse-Mswn-_c$default (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun sse-Q9yt8Vw (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun sse-Q9yt8Vw$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun sse-mY9Nd3A (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun sse-mY9Nd3A$default (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun sse-tL6_L-A (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun sse-tL6_L-A$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun sseSession-Mswn-_c (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun sseSession-Mswn-_c$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun sseSession-i8z2VEo (Lio/ktor/client/HttpClient;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun sseSession-i8z2VEo$default (Lio/ktor/client/HttpClient;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun sseSession-mY9Nd3A (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun sseSession-mY9Nd3A (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun sseSession-mY9Nd3A$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static synthetic fun sseSession-mY9Nd3A$default (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun sseSession-tL6_L-A (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun sseSession-tL6_L-A$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun sseSession-xEWcMm4 (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun sseSession-xEWcMm4$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/time/Duration;Ljava/lang/Boolean;Ljava/lang/Boolean;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; } @@ -796,6 +820,14 @@ public final class io/ktor/client/plugins/sse/ClientSSESession : io/ktor/client/ public fun getIncoming ()Lkotlinx/coroutines/flow/Flow; } +public final class io/ktor/client/plugins/sse/ClientSSESessionWithDeserialization : io/ktor/client/plugins/sse/SSESessionWithDeserialization { + public fun (Lio/ktor/client/call/HttpClientCall;Lio/ktor/client/plugins/sse/SSESessionWithDeserialization;)V + public final fun getCall ()Lio/ktor/client/call/HttpClientCall; + public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext; + public fun getDeserializer ()Lkotlin/jvm/functions/Function2; + public fun getIncoming ()Lkotlinx/coroutines/flow/Flow; +} + public final class io/ktor/client/plugins/sse/DefaultClientSSESession : io/ktor/client/plugins/sse/SSESession { public fun (Lio/ktor/client/plugins/sse/SSEClientContent;Lio/ktor/utils/io/ByteReadChannel;Lkotlin/coroutines/CoroutineContext;)V public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext; @@ -845,6 +877,11 @@ public abstract interface class io/ktor/client/plugins/sse/SSESession : kotlinx/ public abstract fun getIncoming ()Lkotlinx/coroutines/flow/Flow; } +public abstract interface class io/ktor/client/plugins/sse/SSESessionWithDeserialization : kotlinx/coroutines/CoroutineScope { + public abstract fun getDeserializer ()Lkotlin/jvm/functions/Function2; + public abstract fun getIncoming ()Lkotlinx/coroutines/flow/Flow; +} + public final class io/ktor/client/plugins/websocket/BuildersKt { public static final fun WebSockets (Lio/ktor/client/HttpClientConfig;Lkotlin/jvm/functions/Function1;)V public static final fun webSocket (Lio/ktor/client/HttpClient;Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/ktor-client/ktor-client-core/api/ktor-client-core.klib.api b/ktor-client/ktor-client-core/api/ktor-client-core.klib.api index 0ea6d0e80bc..01db08dcc68 100644 --- a/ktor-client/ktor-client-core/api/ktor-client-core.klib.api +++ b/ktor-client/ktor-client-core/api/ktor-client-core.klib.api @@ -82,6 +82,13 @@ abstract interface io.ktor.client.plugins.sse/SSESession : kotlinx.coroutines/Co abstract fun (): kotlinx.coroutines.flow/Flow // io.ktor.client.plugins.sse/SSESession.incoming.|(){}[0] } +abstract interface io.ktor.client.plugins.sse/SSESessionWithDeserialization : kotlinx.coroutines/CoroutineScope { // io.ktor.client.plugins.sse/SSESessionWithDeserialization|null[0] + abstract val deserializer // io.ktor.client.plugins.sse/SSESessionWithDeserialization.deserializer|{}deserializer[0] + abstract fun (): kotlin/Function2 // io.ktor.client.plugins.sse/SSESessionWithDeserialization.deserializer.|(){}[0] + abstract val incoming // io.ktor.client.plugins.sse/SSESessionWithDeserialization.incoming|{}incoming[0] + abstract fun (): kotlinx.coroutines.flow/Flow> // io.ktor.client.plugins.sse/SSESessionWithDeserialization.incoming.|(){}[0] +} + abstract interface io.ktor.client.plugins.websocket/ClientWebSocketSession : io.ktor.websocket/WebSocketSession { // io.ktor.client.plugins.websocket/ClientWebSocketSession|null[0] abstract val call // io.ktor.client.plugins.websocket/ClientWebSocketSession.call|{}call[0] abstract fun (): io.ktor.client.call/HttpClientCall // io.ktor.client.plugins.websocket/ClientWebSocketSession.call.|(){}[0] @@ -437,6 +444,19 @@ final class io.ktor.client.plugins.sse/ClientSSESession : io.ktor.client.plugins final fun (): kotlinx.coroutines.flow/Flow // io.ktor.client.plugins.sse/ClientSSESession.incoming.|(){}[0] } +final class io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization : io.ktor.client.plugins.sse/SSESessionWithDeserialization { // io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization|null[0] + constructor (io.ktor.client.call/HttpClientCall, io.ktor.client.plugins.sse/SSESessionWithDeserialization) // io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization.|(io.ktor.client.call.HttpClientCall;io.ktor.client.plugins.sse.SSESessionWithDeserialization){}[0] + + final val call // io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization.call|{}call[0] + final fun (): io.ktor.client.call/HttpClientCall // io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization.call.|(){}[0] + final val coroutineContext // io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization.coroutineContext|{}coroutineContext[0] + final fun (): kotlin.coroutines/CoroutineContext // io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization.coroutineContext.|(){}[0] + final val deserializer // io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization.deserializer|{}deserializer[0] + final fun (): kotlin/Function2 // io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization.deserializer.|(){}[0] + final val incoming // io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization.incoming|{}incoming[0] + final fun (): kotlinx.coroutines.flow/Flow> // io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization.incoming.|(){}[0] +} + final class io.ktor.client.plugins.sse/DefaultClientSSESession : io.ktor.client.plugins.sse/SSESession { // io.ktor.client.plugins.sse/DefaultClientSSESession|null[0] constructor (io.ktor.client.plugins.sse/SSEClientContent, io.ktor.utils.io/ByteReadChannel, kotlin.coroutines/CoroutineContext) // io.ktor.client.plugins.sse/DefaultClientSSESession.|(io.ktor.client.plugins.sse.SSEClientContent;io.ktor.utils.io.ByteReadChannel;kotlin.coroutines.CoroutineContext){}[0] @@ -1398,6 +1418,8 @@ final fun io.ktor.client/HttpClient(io.ktor.client.engine/HttpClientEngine, kotl final fun io.ktor.client/HttpClient(kotlin/Function1, kotlin/Unit> = ...): io.ktor.client/HttpClient // io.ktor.client/HttpClient|HttpClient(kotlin.Function1,kotlin.Unit>){}[0] final inline fun (io.ktor.client.request.forms/FormBuilder).io.ktor.client.request.forms/append(kotlin/String, io.ktor.http/Headers = ..., kotlin/Long? = ..., crossinline kotlin/Function1) // io.ktor.client.request.forms/append|append@io.ktor.client.request.forms.FormBuilder(kotlin.String;io.ktor.http.Headers;kotlin.Long?;kotlin.Function1){}[0] final inline fun <#A: kotlin/Any?> io.ktor.client.plugins/unwrapRequestTimeoutException(kotlin/Function0<#A>): #A // io.ktor.client.plugins/unwrapRequestTimeoutException|unwrapRequestTimeoutException(kotlin.Function0<0:0>){0§}[0] +final inline fun <#A: reified kotlin/Any?> (io.ktor.client.plugins.sse/SSESessionWithDeserialization).io.ktor.client.plugins.sse/deserialize(io.ktor.sse/TypedServerSentEvent): #A? // io.ktor.client.plugins.sse/deserialize|deserialize@io.ktor.client.plugins.sse.SSESessionWithDeserialization(io.ktor.sse.TypedServerSentEvent){0§}[0] +final inline fun <#A: reified kotlin/Any?> (io.ktor.client.plugins.sse/SSESessionWithDeserialization).io.ktor.client.plugins.sse/deserialize(kotlin/String?): #A? // io.ktor.client.plugins.sse/deserialize|deserialize@io.ktor.client.plugins.sse.SSESessionWithDeserialization(kotlin.String?){0§}[0] final inline fun <#A: reified kotlin/Any?> (io.ktor.client.request/HttpRequestBuilder).io.ktor.client.request/setBody(#A) // io.ktor.client.request/setBody|setBody@io.ktor.client.request.HttpRequestBuilder(0:0){0§}[0] final suspend fun (io.ktor.client.call/HttpClientCall).io.ktor.client.call/save(): io.ktor.client.call/HttpClientCall // io.ktor.client.call/save|save@io.ktor.client.call.HttpClientCall(){}[0] final suspend fun (io.ktor.client.plugins.cache.storage/CacheStorage).io.ktor.client.plugins.cache.storage/store(io.ktor.client.statement/HttpResponse): io.ktor.client.plugins.cache.storage/CachedResponseData // io.ktor.client.plugins.cache.storage/store|store@io.ktor.client.plugins.cache.storage.CacheStorage(io.ktor.client.statement.HttpResponse){}[0] @@ -1414,17 +1436,29 @@ final suspend fun (io.ktor.client.statement/HttpResponse).io.ktor.client.stateme final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.cookies/cookies(io.ktor.http/Url): kotlin.collections/List // io.ktor.client.plugins.cookies/cookies|cookies@io.ktor.client.HttpClient(io.ktor.http.Url){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.cookies/cookies(kotlin/String): kotlin.collections/List // io.ktor.client.plugins.cookies/cookies|cookies@io.ktor.client.HttpClient(kotlin.String){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/serverSentEvents(kotlin/Function1, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin.coroutines/SuspendFunction1) // io.ktor.client.plugins.sse/serverSentEvents|serverSentEvents@io.ktor.client.HttpClient(kotlin.Function1;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.coroutines.SuspendFunction1){}[0] +final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/serverSentEvents(kotlin/Function1, kotlin/Function2, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin.coroutines/SuspendFunction1) // io.ktor.client.plugins.sse/serverSentEvents|serverSentEvents@io.ktor.client.HttpClient(kotlin.Function1;kotlin.Function2;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.coroutines.SuspendFunction1){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/serverSentEvents(kotlin/String, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1 = ..., kotlin.coroutines/SuspendFunction1) // io.ktor.client.plugins.sse/serverSentEvents|serverSentEvents@io.ktor.client.HttpClient(kotlin.String;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1;kotlin.coroutines.SuspendFunction1){}[0] +final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/serverSentEvents(kotlin/String, kotlin/Function2, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1 = ..., kotlin.coroutines/SuspendFunction1) // io.ktor.client.plugins.sse/serverSentEvents|serverSentEvents@io.ktor.client.HttpClient(kotlin.String;kotlin.Function2;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1;kotlin.coroutines.SuspendFunction1){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/serverSentEvents(kotlin/String? = ..., kotlin/String? = ..., kotlin/Int? = ..., kotlin/String? = ..., kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1 = ..., kotlin.coroutines/SuspendFunction1) // io.ktor.client.plugins.sse/serverSentEvents|serverSentEvents@io.ktor.client.HttpClient(kotlin.String?;kotlin.String?;kotlin.Int?;kotlin.String?;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1;kotlin.coroutines.SuspendFunction1){}[0] +final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/serverSentEvents(kotlin/String? = ..., kotlin/String? = ..., kotlin/Int? = ..., kotlin/String? = ..., kotlin/Function2, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1 = ..., kotlin.coroutines/SuspendFunction1) // io.ktor.client.plugins.sse/serverSentEvents|serverSentEvents@io.ktor.client.HttpClient(kotlin.String?;kotlin.String?;kotlin.Int?;kotlin.String?;kotlin.Function2;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1;kotlin.coroutines.SuspendFunction1){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/serverSentEventsSession(kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1): io.ktor.client.plugins.sse/ClientSSESession // io.ktor.client.plugins.sse/serverSentEventsSession|serverSentEventsSession@io.ktor.client.HttpClient(kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1){}[0] +final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/serverSentEventsSession(kotlin/Function2, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1): io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization // io.ktor.client.plugins.sse/serverSentEventsSession|serverSentEventsSession@io.ktor.client.HttpClient(kotlin.Function2;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/serverSentEventsSession(kotlin/String, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1 = ...): io.ktor.client.plugins.sse/ClientSSESession // io.ktor.client.plugins.sse/serverSentEventsSession|serverSentEventsSession@io.ktor.client.HttpClient(kotlin.String;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1){}[0] +final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/serverSentEventsSession(kotlin/String, kotlin/Function2, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1 = ...): io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization // io.ktor.client.plugins.sse/serverSentEventsSession|serverSentEventsSession@io.ktor.client.HttpClient(kotlin.String;kotlin.Function2;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/serverSentEventsSession(kotlin/String? = ..., kotlin/String? = ..., kotlin/Int? = ..., kotlin/String? = ..., kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1 = ...): io.ktor.client.plugins.sse/ClientSSESession // io.ktor.client.plugins.sse/serverSentEventsSession|serverSentEventsSession@io.ktor.client.HttpClient(kotlin.String?;kotlin.String?;kotlin.Int?;kotlin.String?;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1){}[0] +final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/serverSentEventsSession(kotlin/String? = ..., kotlin/String? = ..., kotlin/Int? = ..., kotlin/String? = ..., kotlin/Function2, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1 = ...): io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization // io.ktor.client.plugins.sse/serverSentEventsSession|serverSentEventsSession@io.ktor.client.HttpClient(kotlin.String?;kotlin.String?;kotlin.Int?;kotlin.String?;kotlin.Function2;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/sse(kotlin/Function1, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin.coroutines/SuspendFunction1) // io.ktor.client.plugins.sse/sse|sse@io.ktor.client.HttpClient(kotlin.Function1;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.coroutines.SuspendFunction1){}[0] +final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/sse(kotlin/Function1, kotlin/Function2, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin.coroutines/SuspendFunction1) // io.ktor.client.plugins.sse/sse|sse@io.ktor.client.HttpClient(kotlin.Function1;kotlin.Function2;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.coroutines.SuspendFunction1){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/sse(kotlin/String, kotlin/Function1 = ..., kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin.coroutines/SuspendFunction1) // io.ktor.client.plugins.sse/sse|sse@io.ktor.client.HttpClient(kotlin.String;kotlin.Function1;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.coroutines.SuspendFunction1){}[0] +final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/sse(kotlin/String, kotlin/Function1 = ..., kotlin/Function2, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin.coroutines/SuspendFunction1) // io.ktor.client.plugins.sse/sse|sse@io.ktor.client.HttpClient(kotlin.String;kotlin.Function1;kotlin.Function2;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.coroutines.SuspendFunction1){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/sse(kotlin/String? = ..., kotlin/String? = ..., kotlin/Int? = ..., kotlin/String? = ..., kotlin/Function1 = ..., kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin.coroutines/SuspendFunction1) // io.ktor.client.plugins.sse/sse|sse@io.ktor.client.HttpClient(kotlin.String?;kotlin.String?;kotlin.Int?;kotlin.String?;kotlin.Function1;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.coroutines.SuspendFunction1){}[0] +final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/sse(kotlin/String? = ..., kotlin/String? = ..., kotlin/Int? = ..., kotlin/String? = ..., kotlin/Function1 = ..., kotlin/Function2, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin.coroutines/SuspendFunction1) // io.ktor.client.plugins.sse/sse|sse@io.ktor.client.HttpClient(kotlin.String?;kotlin.String?;kotlin.Int?;kotlin.String?;kotlin.Function1;kotlin.Function2;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.coroutines.SuspendFunction1){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/sseSession(kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1): io.ktor.client.plugins.sse/ClientSSESession // io.ktor.client.plugins.sse/sseSession|sseSession@io.ktor.client.HttpClient(kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1){}[0] +final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/sseSession(kotlin/Function2, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1): io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization // io.ktor.client.plugins.sse/sseSession|sseSession@io.ktor.client.HttpClient(kotlin.Function2;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/sseSession(kotlin/String, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1 = ...): io.ktor.client.plugins.sse/ClientSSESession // io.ktor.client.plugins.sse/sseSession|sseSession@io.ktor.client.HttpClient(kotlin.String;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1){}[0] +final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/sseSession(kotlin/String, kotlin/Function2, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1 = ...): io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization // io.ktor.client.plugins.sse/sseSession|sseSession@io.ktor.client.HttpClient(kotlin.String;kotlin.Function2;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/sseSession(kotlin/String? = ..., kotlin/String? = ..., kotlin/Int? = ..., kotlin/String? = ..., kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1 = ...): io.ktor.client.plugins.sse/ClientSSESession // io.ktor.client.plugins.sse/sseSession|sseSession@io.ktor.client.HttpClient(kotlin.String?;kotlin.String?;kotlin.Int?;kotlin.String?;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1){}[0] +final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.sse/sseSession(kotlin/String? = ..., kotlin/String? = ..., kotlin/Int? = ..., kotlin/String? = ..., kotlin/Function2, kotlin.time/Duration? = ..., kotlin/Boolean? = ..., kotlin/Boolean? = ..., kotlin/Function1 = ...): io.ktor.client.plugins.sse/ClientSSESessionWithDeserialization // io.ktor.client.plugins.sse/sseSession|sseSession@io.ktor.client.HttpClient(kotlin.String?;kotlin.String?;kotlin.Int?;kotlin.String?;kotlin.Function2;kotlin.time.Duration?;kotlin.Boolean?;kotlin.Boolean?;kotlin.Function1){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.websocket/webSocket(io.ktor.http/HttpMethod = ..., kotlin/String? = ..., kotlin/Int? = ..., kotlin/String? = ..., kotlin/Function1 = ..., kotlin.coroutines/SuspendFunction1) // io.ktor.client.plugins.websocket/webSocket|webSocket@io.ktor.client.HttpClient(io.ktor.http.HttpMethod;kotlin.String?;kotlin.Int?;kotlin.String?;kotlin.Function1;kotlin.coroutines.SuspendFunction1){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.websocket/webSocket(kotlin/Function1, kotlin.coroutines/SuspendFunction1) // io.ktor.client.plugins.websocket/webSocket|webSocket@io.ktor.client.HttpClient(kotlin.Function1;kotlin.coroutines.SuspendFunction1){}[0] final suspend fun (io.ktor.client/HttpClient).io.ktor.client.plugins.websocket/webSocket(kotlin/String, kotlin/Function1 = ..., kotlin.coroutines/SuspendFunction1) // io.ktor.client.plugins.websocket/webSocket|webSocket@io.ktor.client.HttpClient(kotlin.String;kotlin.Function1;kotlin.coroutines.SuspendFunction1){}[0] diff --git a/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/ClientSSESession.kt b/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/ClientSSESession.kt index e554ee5619b..43287d5f361 100644 --- a/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/ClientSSESession.kt +++ b/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/ClientSSESession.kt @@ -6,22 +6,209 @@ package io.ktor.client.plugins.sse import io.ktor.client.call.* import io.ktor.sse.* +import io.ktor.util.reflect.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* /** - * A Server-sent events session. - */ + * A session for handling Server-Sent Events (SSE) from a server. + * + * Example of usage: + * ```kotlin + * client.sse("http://localhost:8080/sse") { // `this` is `ClientSSESession` + * incoming.collect { event -> + * println("Id: ${event.id}") + * println("Event: ${event.event}") + * println("Data: ${event.data}") + * } + * } + * ``` + * + * To learn more, see [the SSE](https://en.wikipedia.org/wiki/Server-sent_events) + * and [the SSE specification](https://html.spec.whatwg.org/multipage/server-sent-events.html). +*/ public interface SSESession : CoroutineScope { /** - * An incoming server-sent events flow. + * An incoming Server-Sent Events (SSE) flow. + * + * Each [ServerSentEvent] can contain following fields: + * - [ServerSentEvent.data] data field of the event. + * - [ServerSentEvent.event] string identifying the type of event. + * - [ServerSentEvent.id] event ID. + * - [ServerSentEvent.retry] reconnection time, in milliseconds to wait before reconnecting. + * - [ServerSentEvent.comments] comment lines starting with a ':' character. */ public val incoming: Flow } /** - * A client Server-sent events session. + * A session with deserialization support for handling Server-Sent Events (SSE) from a server. * - * @property call associated with session. + * Example of usage: + * ```kotlin + * client.sse({ + * url("http://localhost:8080/serverSentEvents") + * }, deserialize = { + * typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) { // `this` is `ClientSSESessionWithDeserialization` + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + * + * To learn more, see [the SSE](https://en.wikipedia.org/wiki/Server-sent_events) + * and [the SSE specification](https://html.spec.whatwg.org/multipage/server-sent-events.html). + */ +public interface SSESessionWithDeserialization : CoroutineScope { + /** + * An incoming Server-Sent Events (SSE) flow. + * + * Each [TypedServerSentEvent] can contain following fields: + * - [TypedServerSentEvent.data] data field of the event. It can be deserialized into an object + * of desired type using the [deserialize] function + * - [TypedServerSentEvent.event] string identifying the type of event. + * - [TypedServerSentEvent.id] event ID. + * - [TypedServerSentEvent.retry] reconnection time, in milliseconds to wait before reconnecting. + * - [TypedServerSentEvent.comments] comment lines starting with a ':' character. + */ + public val incoming: Flow> + + /** + * Deserializer for transforming the `data` field of a `ServerSentEvent` into a desired data object. + */ + public val deserializer: (TypeInfo, String) -> Any? +} + +/** + * Deserialize the provided [data] into an object of type [T] using the deserializer function + * defined in the [SSESessionWithDeserialization] interface. + * + * @param data The string data to deserialize. + * @return The deserialized object of type [T], or null if deserialization is not successful. + * + * Example of usage: + * ```kotlin + * client.sse({ + * url("http://localhost:8080/serverSentEvents") + * }, deserialize = { + * typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) { // `this` is `ClientSSESessionWithDeserialization` + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + */ +public inline fun SSESessionWithDeserialization.deserialize(data: String?): T? { + return data?.let { + deserializer(typeInfo(), data) as? T + } +} + +/** + * Deserialize the provided [event] data into an object of type [T] using the deserializer function + * defined in the [SSESessionWithDeserialization] interface. + * + * @param event The Server-sent event containing data to deserialize. + * @return The deserialized object of type [T], or null if deserialization is not successful. + * + * Example of usage: + * ```kotlin + * client.sse({ + * url("http://localhost:8080/serverSentEvents") + * }, deserialize = { + * typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) { // `this` is `ClientSSESessionWithDeserialization` + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + */ +public inline fun SSESessionWithDeserialization.deserialize( + event: TypedServerSentEvent +): T? = deserialize(event.data) + +/** + * A client session for handling Server-Sent Events (SSE) from a server. + * + * @property call The HTTP call associated with the session. + * + * Example of usage: + * ```kotlin + * client.sse("http://localhost:8080/sse") { // `this` is `ClientSSESession` + * incoming.collect { event -> + * println("Id: ${event.id}") + * println("Event: ${event.event}") + * println("Data: ${event.data}") + * } + * } + * ``` + * + * To learn more, see [the SSE](https://en.wikipedia.org/wiki/Server-sent_events) + * and [the SSE specification](https://html.spec.whatwg.org/multipage/server-sent-events.html). */ public class ClientSSESession(public val call: HttpClientCall, delegate: SSESession) : SSESession by delegate + +/** + * A client session with deserialization support for handling Server-Sent Events (SSE) from a server. + * + * @property call The HTTP call associated with the session. + * + * Example of usage: + * ```kotlin + * client.sse({ + * url("http://localhost:8080/serverSentEvents") + * }, deserialize = { + * typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) { // `this` is `ClientSSESessionWithDeserialization` + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + * + * To learn more, see [the SSE](https://en.wikipedia.org/wiki/Server-sent_events) + * and [the SSE specification](https://html.spec.whatwg.org/multipage/server-sent-events.html). + */ +public class ClientSSESessionWithDeserialization( + public val call: HttpClientCall, + delegate: SSESessionWithDeserialization +) : SSESessionWithDeserialization by delegate diff --git a/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/SSE.kt b/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/SSE.kt index ecb623f9d37..e88b58c61ee 100644 --- a/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/SSE.kt +++ b/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/SSE.kt @@ -11,30 +11,65 @@ import io.ktor.client.request.* import io.ktor.client.statement.* import io.ktor.http.* import io.ktor.http.content.* +import io.ktor.sse.* import io.ktor.util.* import io.ktor.util.logging.* import io.ktor.util.pipeline.* +import io.ktor.util.reflect.* import io.ktor.utils.io.* +import kotlinx.coroutines.flow.* +import kotlin.coroutines.* internal val LOGGER = KtorSimpleLogger("io.ktor.client.plugins.sse.SSE") /** - * Indicates if a client engine supports Server-sent events. + * Indicates if a client engine supports Server-Sent Events (SSE). */ public data object SSECapability : HttpClientEngineCapability /** - * Client Server-sent events plugin that allows you to establish an SSE connection to a server - * and receive Server-sent events from it. + * Client Server-Sent Events (SSE) plugin that allows you to establish an SSE connection to a server + * and receive Server-Sent Events from it. + * For a simple session, use [ClientSSESession]. + * For a session with deserialization, use [ClientSSESessionWithDeserialization]. * * ```kotlin * val client = HttpClient { * install(SSE) * } - * client.sse { - * val event = incoming.receive() + * + * // SSE request + * client.serverSentEvents("http://localhost:8080/sse") { // `this` is `ClientSSESession` + * incoming.collect { event -> + * println("Id: ${event.id}") + * println("Event: ${event.event}") + * println("Data: ${event.data}") + * } + * } + * + * // SSE request with deserialization + * client.sse({ + * url("http://localhost:8080/serverSentEvents") + * }, deserialize = { + * typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) { // `this` is `ClientSSESessionWithDeserialization` + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } * } * ``` + * + * To learn more, see [the SSE](https://en.wikipedia.org/wiki/Server-sent_events) + * and [the SSE specification](https://html.spec.whatwg.org/multipage/server-sent-events.html). */ @OptIn(InternalAPI::class) public val SSE: ClientPlugin = createClientPlugin( @@ -96,7 +131,24 @@ public val SSE: ClientPlugin = createClientPlugin( } LOGGER.trace("Receive SSE session from ${response.request.url}: $session") - proceedWith(HttpResponseContainer(info, ClientSSESession(context, session))) + + val deserializer = response.request.attributes.getOrNull(deserializerAttr) + val clientSSESession = deserializer?.let { + ClientSSESessionWithDeserialization( + context, + object : SSESessionWithDeserialization { + override val incoming: Flow> = + session.incoming.map { event: ServerSentEvent -> + TypedServerSentEvent(event.data, event.event, event.id, event.retry, event.comments) + } + + override val deserializer: (TypeInfo, String) -> Any? = deserializer + + override val coroutineContext: CoroutineContext = session.coroutineContext + } + ) + } ?: ClientSSESession(context, session) + proceedWith(HttpResponseContainer(info, clientSSESession)) } } diff --git a/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/SSEConfig.kt b/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/SSEConfig.kt index e52f296a547..b84098f4f8d 100644 --- a/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/SSEConfig.kt +++ b/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/SSEConfig.kt @@ -23,14 +23,14 @@ public class SSEConfig { public var reconnectionTime: Duration = 3000.milliseconds /** - * Add events consisting only of comments in the incoming flow. + * Adds events consisting only of comments in the incoming flow. */ public fun showCommentEvents() { showCommentEvents = true } /** - * Add events consisting only of the retry field in the incoming flow. + * Adds events consisting only of the retry field in the incoming flow. */ public fun showRetryEvents() { showRetryEvents = true diff --git a/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/builders.kt b/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/builders.kt index e8c4e8f82bc..0c27d308628 100644 --- a/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/builders.kt +++ b/ktor-client/ktor-client-core/common/src/io/ktor/client/plugins/sse/builders.kt @@ -10,6 +10,7 @@ import io.ktor.client.request.* import io.ktor.client.statement.* import io.ktor.http.* import io.ktor.util.* +import io.ktor.util.reflect.* import kotlinx.coroutines.* import kotlin.time.* @@ -17,9 +18,20 @@ internal val sseRequestAttr = AttributeKey("SSERequestFlag") internal val reconnectionTimeAttr = AttributeKey("SSEReconnectionTime") internal val showCommentEventsAttr = AttributeKey("SSEShowCommentEvents") internal val showRetryEventsAttr = AttributeKey("SSEShowRetryEvents") +internal val deserializerAttr = AttributeKey<(TypeInfo, String) -> Any?>("SSEDeserializer") /** * Installs the [SSE] plugin using the [config] as configuration. + * + * Example of usage: + * ```kotlin + * val client = HttpClient() { + * SSE { + * showCommentEvents() + * showRetryEvents() + * } + * } + * ``` */ public fun HttpClientConfig<*>.SSE(config: SSEConfig.() -> Unit) { install(SSE) { @@ -27,42 +39,52 @@ public fun HttpClientConfig<*>.SSE(config: SSEConfig.() -> Unit) { } } +// Builders for the `ClientSSESession` + /** - * Opens a [ClientSSESession]. + * Opens a [ClientSSESession] to receive Server-Sent Events (SSE) from a server. + * + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * val session = client.serverSentEventsSession { + * url("http://localhost:8080/sse") + * } + * session.incoming.collect { event -> + * println("Id: ${event.id}") + * println("Event: ${event.event}") + * println("Data: ${event.data}") + * } + * ``` */ public suspend fun HttpClient.serverSentEventsSession( reconnectionTime: Duration? = null, showCommentEvents: Boolean? = null, showRetryEvents: Boolean? = null, block: HttpRequestBuilder.() -> Unit -): ClientSSESession { - plugin(SSE) - - val sessionDeferred = CompletableDeferred() - val statement = prepareRequest { - block() - addAttribute(sseRequestAttr, true) - addAttribute(reconnectionTimeAttr, reconnectionTime) - addAttribute(showCommentEventsAttr, showCommentEvents) - addAttribute(showRetryEventsAttr, showRetryEvents) - } - @Suppress("SuspendFunctionOnCoroutineScope") - launch { - try { - statement.body { session -> - sessionDeferred.complete(session) - } - } catch (cause: CancellationException) { - sessionDeferred.cancel(cause) - } catch (cause: Throwable) { - sessionDeferred.completeExceptionally(mapToSSEException(response = null, cause)) - } - } - return sessionDeferred.await() -} +): ClientSSESession = processSession(reconnectionTime, showCommentEvents, showRetryEvents, block) {} /** - * Opens a [ClientSSESession]. + * Opens a [ClientSSESession] to receive Server-Sent Events (SSE) from a server. + * + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * val session = client.serverSentEventsSession { + * url("http://localhost:8080/sse") + * } + * session.incoming.collect { event -> + * println("Id: ${event.id}") + * println("Event: ${event.event}") + * println("Data: ${event.data}") + * } + * ``` */ public suspend fun HttpClient.serverSentEventsSession( scheme: String? = null, @@ -79,7 +101,23 @@ public suspend fun HttpClient.serverSentEventsSession( } /** - * Opens a [ClientSSESession]. + * Opens a [ClientSSESession] to receive Server-Sent Events (SSE) from a server. + * + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * val session = client.serverSentEventsSession { + * url("http://localhost:8080/sse") + * } + * session.incoming.collect { event -> + * println("Id: ${event.id}") + * println("Event: ${event.event}") + * println("Data: ${event.data}") + * } + * ``` */ public suspend fun HttpClient.serverSentEventsSession( urlString: String, @@ -93,7 +131,22 @@ public suspend fun HttpClient.serverSentEventsSession( } /** - * Opens a [block] with [ClientSSESession]. + * Opens a [ClientSSESession] to receive Server-Sent Events (SSE) from a server and performs [block]. + * + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * client.serverSentEvents("http://localhost:8080/sse") { // `this` is `ClientSSESession` + * incoming.collect { event -> + * println("Id: ${event.id}") + * println("Event: ${event.event}") + * println("Data: ${event.data}") + * } + * } + * ``` */ public suspend fun HttpClient.serverSentEvents( request: HttpRequestBuilder.() -> Unit, @@ -115,7 +168,22 @@ public suspend fun HttpClient.serverSentEvents( } /** - * Opens a [block] with [ClientSSESession]. + * Opens a [ClientSSESession] to receive Server-Sent Events (SSE) from a server and performs [block]. + * + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * client.serverSentEvents("http://localhost:8080/sse") { // `this` is `ClientSSESession` + * incoming.collect { event -> + * println("Id: ${event.id}") + * println("Event: ${event.event}") + * println("Data: ${event.data}") + * } + * } + * ``` */ public suspend fun HttpClient.serverSentEvents( scheme: String? = null, @@ -141,7 +209,22 @@ public suspend fun HttpClient.serverSentEvents( } /** - * Opens a [block] with [ClientSSESession]. + * Opens a [ClientSSESession] to receive Server-Sent Events (SSE) from a server and performs [block]. + * + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * client.serverSentEvents("http://localhost:8080/sse") { // `this` is `ClientSSESession` + * incoming.collect { event -> + * println("Id: ${event.id}") + * println("Event: ${event.event}") + * println("Data: ${event.data}") + * } + * } + * ``` */ public suspend fun HttpClient.serverSentEvents( urlString: String, @@ -164,7 +247,23 @@ public suspend fun HttpClient.serverSentEvents( } /** - * Opens a [ClientSSESession]. + * Opens a [ClientSSESession] to receive Server-Sent Events (SSE) from a server. + * + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * val session = client.sseSession { + * url("http://localhost:8080/sse") + * } + * session.incoming.collect { event -> + * println("Id: ${event.id}") + * println("Event: ${event.event}") + * println("Data: ${event.data}") + * } + * ``` */ public suspend fun HttpClient.sseSession( reconnectionTime: Duration? = null, @@ -174,7 +273,23 @@ public suspend fun HttpClient.sseSession( ): ClientSSESession = serverSentEventsSession(reconnectionTime, showCommentEvents, showRetryEvents, block) /** - * Opens a [ClientSSESession]. + * Opens a [ClientSSESession] to receive Server-Sent Events (SSE) from a server. + * + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * val session = client.sseSession { + * url("http://localhost:8080/sse") + * } + * session.incoming.collect { event -> + * println("Id: ${event.id}") + * println("Event: ${event.event}") + * println("Data: ${event.data}") + * } + * ``` */ public suspend fun HttpClient.sseSession( scheme: String? = null, @@ -189,7 +304,23 @@ public suspend fun HttpClient.sseSession( serverSentEventsSession(scheme, host, port, path, reconnectionTime, showCommentEvents, showRetryEvents, block) /** - * Opens a [ClientSSESession]. + * Opens a [ClientSSESession] to receive Server-Sent Events (SSE) from a server. + * + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * val session = client.sseSession { + * url("http://localhost:8080/sse") + * } + * session.incoming.collect { event -> + * println("Id: ${event.id}") + * println("Event: ${event.event}") + * println("Data: ${event.data}") + * } + * ``` */ public suspend fun HttpClient.sseSession( urlString: String, @@ -200,7 +331,22 @@ public suspend fun HttpClient.sseSession( ): ClientSSESession = serverSentEventsSession(urlString, reconnectionTime, showCommentEvents, showRetryEvents, block) /** - * Opens a [block] with [ClientSSESession]. + * Opens a [ClientSSESession] to receive Server-Sent Events (SSE) from a server and performs [block]. + * + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * client.sse("http://localhost:8080/sse") { // `this` is `ClientSSESession` + * incoming.collect { event -> + * println("Id: ${event.id}") + * println("Event: ${event.event}") + * println("Data: ${event.data}") + * } + * } + * ``` */ public suspend fun HttpClient.sse( request: HttpRequestBuilder.() -> Unit, @@ -211,7 +357,22 @@ public suspend fun HttpClient.sse( ): Unit = serverSentEvents(request, reconnectionTime, showCommentEvents, showRetryEvents, block) /** - * Opens a [block] with [ClientSSESession]. + * Opens a [ClientSSESession] to receive Server-Sent Events (SSE) from a server and performs [block]. + * + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * client.sse("http://localhost:8080/sse") { // `this` is `ClientSSESession` + * incoming.collect { event -> + * println("Id: ${event.id}") + * println("Event: ${event.event}") + * println("Data: ${event.data}") + * } + * } + * ``` */ public suspend fun HttpClient.sse( scheme: String? = null, @@ -227,7 +388,22 @@ public suspend fun HttpClient.sse( serverSentEvents(scheme, host, port, path, reconnectionTime, showCommentEvents, showRetryEvents, request, block) /** - * Opens a [block] with [ClientSSESession]. + * Opens a [ClientSSESession] to receive Server-Sent Events (SSE) from a server and performs [block]. + * + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * client.sse("http://localhost:8080/sse") { // `this` is `ClientSSESession` + * incoming.collect { event -> + * println("Id: ${event.id}") + * println("Event: ${event.event}") + * println("Data: ${event.data}") + * } + * } + * ``` */ public suspend fun HttpClient.sse( urlString: String, @@ -238,6 +414,618 @@ public suspend fun HttpClient.sse( block: suspend ClientSSESession.() -> Unit ): Unit = serverSentEvents(urlString, reconnectionTime, showCommentEvents, showRetryEvents, request, block) +// Builders for the `ClientSSESessionWithDeserialization` + +/** + * Opens a [ClientSSESessionWithDeserialization] to receive Server-Sent Events (SSE) from a server with ability to + * deserialize the `data` field of the `TypedServerSentEvent`. + * + * @param deserialize The deserializer function to transform the `data` field of the `TypedServerSentEvent` + * into an object + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * val session = client.serverSentEventsSession("http://localhost:8080/sse", deserialize = { typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) + * + * session.apply { + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + */ +public suspend fun HttpClient.serverSentEventsSession( + deserialize: (TypeInfo, String) -> Any?, + reconnectionTime: Duration? = null, + showCommentEvents: Boolean? = null, + showRetryEvents: Boolean? = null, + block: HttpRequestBuilder.() -> Unit +): ClientSSESessionWithDeserialization = processSession(reconnectionTime, showCommentEvents, showRetryEvents, block) { + addAttribute( + deserializerAttr, + deserialize + ) +} + +/** + * Opens a [ClientSSESessionWithDeserialization] to receive Server-Sent Events (SSE) from a server with ability to + * deserialize the `data` field of the `TypedServerSentEvent`. + * + * @param deserialize The deserializer function to transform the `data` field of the `TypedServerSentEvent` + * into an object + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * val session = client.serverSentEventsSession("http://localhost:8080/sse", deserialize = { typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) + * + * session.apply { + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + */ +public suspend fun HttpClient.serverSentEventsSession( + scheme: String? = null, + host: String? = null, + port: Int? = null, + path: String? = null, + deserialize: (TypeInfo, String) -> Any?, + reconnectionTime: Duration? = null, + showCommentEvents: Boolean? = null, + showRetryEvents: Boolean? = null, + block: HttpRequestBuilder.() -> Unit = {} +): ClientSSESessionWithDeserialization = + serverSentEventsSession(deserialize, reconnectionTime, showCommentEvents, showRetryEvents) { + url(scheme, host, port, path) + block() + } + +/** + * Opens a [ClientSSESessionWithDeserialization] to receive Server-Sent Events (SSE) from a server with ability to + * deserialize the `data` field of the `TypedServerSentEvent`. + * + * @param deserialize The deserializer function to transform the `data` field of the `TypedServerSentEvent` + * into an object + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * val session = client.serverSentEventsSession("http://localhost:8080/sse", deserialize = { typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) + * + * session.apply { + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + */ +public suspend fun HttpClient.serverSentEventsSession( + urlString: String, + deserialize: (TypeInfo, String) -> Any?, + reconnectionTime: Duration? = null, + showCommentEvents: Boolean? = null, + showRetryEvents: Boolean? = null, + block: HttpRequestBuilder.() -> Unit = {} +): ClientSSESessionWithDeserialization = + serverSentEventsSession(deserialize, reconnectionTime, showCommentEvents, showRetryEvents) { + url.takeFrom(urlString) + block() + } + +/** + * Opens a [ClientSSESessionWithDeserialization] to receive Server-Sent Events (SSE) from a server with ability to + * deserialize the `data` field of the `TypedServerSentEvent`. + * + * @param deserialize The deserializer function to transform the `data` field of the `TypedServerSentEvent` + * into an object + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * client.serverSentEvents({ + * url("http://localhost:8080/sse") + * }, deserialize = { + * typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) { // `this` is `ClientSSESessionWithDeserialization` + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + */ +public suspend fun HttpClient.serverSentEvents( + request: HttpRequestBuilder.() -> Unit, + deserialize: (TypeInfo, String) -> Any?, + reconnectionTime: Duration? = null, + showCommentEvents: Boolean? = null, + showRetryEvents: Boolean? = null, + block: suspend ClientSSESessionWithDeserialization.() -> Unit +) { + val session = serverSentEventsSession(deserialize, reconnectionTime, showCommentEvents, showRetryEvents, request) + try { + block(session) + } catch (cause: CancellationException) { + throw cause + } catch (cause: Throwable) { + throw mapToSSEException(session.call.response, cause) + } finally { + session.cancel() + } +} + +/** + * Opens a [ClientSSESessionWithDeserialization] to receive Server-Sent Events (SSE) from a server with ability to + * deserialize the `data` field of the `TypedServerSentEvent` and performs [block]. + * + * @param deserialize The deserializer function to transform the `data` field of the `TypedServerSentEvent` + * into an object + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * client.serverSentEvents({ + * url("http://localhost:8080/sse") + * }, deserialize = { + * typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) { // `this` is `ClientSSESessionWithDeserialization` + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + */ +public suspend fun HttpClient.serverSentEvents( + scheme: String? = null, + host: String? = null, + port: Int? = null, + path: String? = null, + deserialize: (TypeInfo, String) -> Any?, + reconnectionTime: Duration? = null, + showCommentEvents: Boolean? = null, + showRetryEvents: Boolean? = null, + request: HttpRequestBuilder.() -> Unit = {}, + block: suspend ClientSSESessionWithDeserialization.() -> Unit +) { + serverSentEvents( + { + url(scheme, host, port, path) + request() + }, + deserialize, + reconnectionTime, + showCommentEvents, + showRetryEvents, + block + ) +} + +/** + * Opens a [ClientSSESessionWithDeserialization] to receive Server-Sent Events (SSE) from a server with ability to + * deserialize the `data` field of the `TypedServerSentEvent` and performs [block]. + * + * @param deserialize The deserializer function to transform the `data` field of the `TypedServerSentEvent` + * into an object + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * client.sse({ + * url("http://localhost:8080/serverSentEvents") + * }, deserialize = { + * typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) { // `this` is `ClientSSESessionWithDeserialization` + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + */ +public suspend fun HttpClient.serverSentEvents( + urlString: String, + deserialize: (TypeInfo, String) -> Any?, + reconnectionTime: Duration? = null, + showCommentEvents: Boolean? = null, + showRetryEvents: Boolean? = null, + request: HttpRequestBuilder.() -> Unit = {}, + block: suspend ClientSSESessionWithDeserialization.() -> Unit +) { + serverSentEvents( + { + url.takeFrom(urlString) + request() + }, + deserialize, + reconnectionTime, + showCommentEvents, + showRetryEvents, + block + ) +} + +/** + * Opens a [ClientSSESessionWithDeserialization] to receive Server-Sent Events (SSE) from a server with ability to + * deserialize the `data` field of the `TypedServerSentEvent` and performs [block]. + * + * @param deserialize The deserializer function to transform the `data` field of the `TypedServerSentEvent` + * into an object + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * val session = client.sseSession("http://localhost:8080/sse", deserialize = { typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) + * + * session.apply { + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + */ +public suspend fun HttpClient.sseSession( + deserialize: (TypeInfo, String) -> Any?, + reconnectionTime: Duration? = null, + showCommentEvents: Boolean? = null, + showRetryEvents: Boolean? = null, + block: HttpRequestBuilder.() -> Unit +): ClientSSESessionWithDeserialization = + serverSentEventsSession(deserialize, reconnectionTime, showCommentEvents, showRetryEvents, block) + +/** + * Opens a [ClientSSESessionWithDeserialization] to receive Server-Sent Events (SSE) from a server with ability to + * deserialize the `data` field of the `TypedServerSentEvent` and performs [block]. + * + * @param deserialize The deserializer function to transform the `data` field of the `TypedServerSentEvent` + * into an object + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * val session = client.sseSession("http://localhost:8080/sse", deserialize = { typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) + * + * session.apply { + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + */ +public suspend fun HttpClient.sseSession( + scheme: String? = null, + host: String? = null, + port: Int? = null, + path: String? = null, + deserialize: (TypeInfo, String) -> Any?, + reconnectionTime: Duration? = null, + showCommentEvents: Boolean? = null, + showRetryEvents: Boolean? = null, + block: HttpRequestBuilder.() -> Unit = {} +): ClientSSESessionWithDeserialization = serverSentEventsSession( + scheme, + host, + port, + path, + deserialize, + reconnectionTime, + showCommentEvents, + showRetryEvents, + block +) + +/** + * Opens a [ClientSSESessionWithDeserialization] to receive Server-Sent Events (SSE) from a server with ability to + * deserialize the `data` field of the `TypedServerSentEvent` and performs [block]. + * + * @param deserialize The deserializer function to transform the `data` field of the `TypedServerSentEvent` + * into an object + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * val session = client.sseSession("http://localhost:8080/sse", deserialize = { typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) + * + * session.apply { + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + */ +public suspend fun HttpClient.sseSession( + urlString: String, + deserialize: (TypeInfo, String) -> Any?, + reconnectionTime: Duration? = null, + showCommentEvents: Boolean? = null, + showRetryEvents: Boolean? = null, + block: HttpRequestBuilder.() -> Unit = {} +): ClientSSESessionWithDeserialization = + serverSentEventsSession(urlString, deserialize, reconnectionTime, showCommentEvents, showRetryEvents, block) + +/** + * Opens a [ClientSSESessionWithDeserialization] to receive Server-Sent Events (SSE) from a server with ability to + * deserialize the `data` field of the `TypedServerSentEvent` and performs [block]. + * + * @param deserialize The deserializer function to transform the `data` field of the `TypedServerSentEvent` + * into an object + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * client.sse({ + * url("http://localhost:8080/sse") + * }, deserialize = { + * typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) { // `this` is `ClientSSESessionWithDeserialization` + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + */ +public suspend fun HttpClient.sse( + request: HttpRequestBuilder.() -> Unit, + deserialize: (TypeInfo, String) -> Any?, + reconnectionTime: Duration? = null, + showCommentEvents: Boolean? = null, + showRetryEvents: Boolean? = null, + block: suspend ClientSSESessionWithDeserialization.() -> Unit +): Unit = serverSentEvents(request, deserialize, reconnectionTime, showCommentEvents, showRetryEvents, block) + +/** + * Opens a [ClientSSESessionWithDeserialization] to receive Server-Sent Events (SSE) from a server with ability to + * deserialize the `data` field of the `TypedServerSentEvent` and performs [block]. + * + * @param deserialize The deserializer function to transform the `data` field of the `TypedServerSentEvent` + * into an object + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * client.sse({ + * url("http://localhost:8080/sse") + * }, deserialize = { + * typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) { // `this` is `ClientSSESessionWithDeserialization` + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + */ +public suspend fun HttpClient.sse( + scheme: String? = null, + host: String? = null, + port: Int? = null, + path: String? = null, + request: HttpRequestBuilder.() -> Unit = {}, + deserialize: (TypeInfo, String) -> Any?, + reconnectionTime: Duration? = null, + showCommentEvents: Boolean? = null, + showRetryEvents: Boolean? = null, + block: suspend ClientSSESessionWithDeserialization.() -> Unit +): Unit = serverSentEvents( + scheme, + host, + port, + path, + deserialize, + reconnectionTime, + showCommentEvents, + showRetryEvents, + request, + block +) + +/** + * Opens a [ClientSSESessionWithDeserialization] to receive Server-Sent Events (SSE) from a server with ability to + * deserialize the `data` field of the `TypedServerSentEvent` and performs [block]. + * + * @param deserialize The deserializer function to transform the `data` field of the `TypedServerSentEvent` + * into an object + * @param reconnectionTime The time duration to wait before attempting reconnection in case of connection loss + * @param showCommentEvents When enabled, events containing only comments field will be presented in the incoming flow + * @param showRetryEvents When enabled, events containing only comments field will be presented in the incoming flow + * + * Example of usage: + * ```kotlin + * client.sse({ + * url("http://localhost:8080/sse") + * }, deserialize = { + * typeInfo, jsonString -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.decodeFromString(serializer, jsonString)!! + * }) { // `this` is `ClientSSESessionWithDeserialization` + * incoming.collect { event: TypedServerSentEvent -> + * when (event.event) { + * "customer" -> { + * val customer: Customer? = deserialize(event.data) + * } + * "product" -> { + * val product: Product? = deserialize(event.data) + * } + * } + * } + * } + * ``` + */ +public suspend fun HttpClient.sse( + urlString: String, + request: HttpRequestBuilder.() -> Unit = {}, + deserialize: (TypeInfo, String) -> Any?, + reconnectionTime: Duration? = null, + showCommentEvents: Boolean? = null, + showRetryEvents: Boolean? = null, + block: suspend ClientSSESessionWithDeserialization.() -> Unit +): Unit = serverSentEvents(urlString, deserialize, reconnectionTime, showCommentEvents, showRetryEvents, request, block) + +private suspend inline fun HttpClient.processSession( + reconnectionTime: Duration?, + showCommentEvents: Boolean?, + showRetryEvents: Boolean?, + block: HttpRequestBuilder.() -> Unit, + additionalAttributes: HttpRequestBuilder.() -> Unit +): T { + plugin(SSE) + + val sessionDeferred = CompletableDeferred() + val statement = prepareRequest { + block() + addAttribute(sseRequestAttr, true) + addAttribute(reconnectionTimeAttr, reconnectionTime) + addAttribute(showCommentEventsAttr, showCommentEvents) + addAttribute(showRetryEventsAttr, showRetryEvents) + additionalAttributes() + } + @Suppress("SuspendFunctionOnCoroutineScope") + launch { + try { + statement.body { session -> + sessionDeferred.complete(session) + } + } catch (cause: CancellationException) { + sessionDeferred.cancel(cause) + } catch (cause: Throwable) { + sessionDeferred.completeExceptionally(mapToSSEException(response = null, cause)) + } + } + return sessionDeferred.await() +} + private fun HttpRequestBuilder.addAttribute(attributeKey: AttributeKey, value: T?) { if (value != null) { attributes.put(attributeKey, value) diff --git a/ktor-client/ktor-client-core/common/src/io/ktor/client/request/HttpRequest.kt b/ktor-client/ktor-client-core/common/src/io/ktor/client/request/HttpRequest.kt index a55c6b4395d..e4f283b97a3 100644 --- a/ktor-client/ktor-client-core/common/src/io/ktor/client/request/HttpRequest.kt +++ b/ktor-client/ktor-client-core/common/src/io/ktor/client/request/HttpRequest.kt @@ -339,8 +339,9 @@ public class SSEClientResponseAdapter : ResponseAdapter { status == HttpStatusCode.OK && contentType?.withoutParameters() == ContentType.Text.EventStream ) { + outgoingContent as SSEClientContent DefaultClientSSESession( - outgoingContent as SSEClientContent, + outgoingContent, responseBody, callContext ) diff --git a/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/plugins/SSESessionParserTest.kt b/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/plugins/SSESessionParserTest.kt index 1715926351fa3dbaa6eb3b6b30043c7653089bac..68d906b26c8dc691259c8b252084de55de370b30 100644 GIT binary patch delta 988 zcmb7?!EVz)5Qc9Q<<clED$$3`j5dviwSOAx#Es3LW<6<685{ z=_tArNC#LL=Efnne0RpKSi?!t+X!WeVa?O-xa0IjqV$mM<~clhYjk+rW}nspx3dN( z-9|~M@iy_Y!rbDJI=I1?o4id2r%jZ3mN{1!Jyn({ZTitK#;1*VVX)mkI7X`{|Kd)| z%1(MpT3>jDIj&D|U&C#89m0%=;!eZHjX`bG4`UD1d=+F~?&2z~L1XCZLguOz@+>ia zDKLr?8p3{}u_}0u5o9`(e)c_pcv;QN79>^;@C+}bxXXZ1+-gxqaRYL(o4PXiUNnNl i`Tv9Y+fb|?ug=6(Ven+-z}B|AFX4N49X$N}=;k+P>-Js% delta 62 zcmaE1_uO { - client.serverSentEventsSession() + client.serverSentEventsSession {} }.apply { assertContains(message!!, SSE.key.name) } @@ -431,4 +433,112 @@ class ServerSentEventsTest : ClientLoader(2.minutes) { } } } + + class Person(val name: String) + class Data(val value: String) + + @Test + fun testDeserializer() = clientTests { + config { + install(SSE) + } + + test { client -> + val count = 10 + var size = 0 + client.sse( + { + url("$TEST_SERVER/sse/person") + parameter("times", count) + }, + deserialize = { _, it -> Person(it) } + ) { + incoming.collectIndexed { i, event -> + val person = deserialize(event) + assertEquals("Name $i", person?.name) + assertEquals("$i", event.id) + size++ + } + } + assertEquals(count, size) + } + } + + @Test + fun testExceptionIfWrongDeserializerProvided() = clientTests { + config { + install(SSE) + } + + test { client -> + assertFailsWith { + client.sse({ url("$TEST_SERVER/sse/person") }, { _, it -> Data(it) }) { + incoming.single().apply { + val data = deserialize(data) + assertEquals("Name 0", data?.name) + } + } + } + } + } + + class Person1(val name: String) + class Person2(val middleName: String) + + @Test + fun testDifferentDeserializers() = clientTests { + config { + install(SSE) + } + + test { client -> + client.sse({ url("$TEST_SERVER/sse/person") }, deserialize = { _, str -> Person1(str) }) { + incoming.single().apply { + assertEquals("Name 0", deserialize(data)?.name) + } + } + client.sse({ url("$TEST_SERVER/sse/person") }, deserialize = { _, str -> Person2(str) }) { + incoming.single().apply { + assertEquals("Name 0", deserialize(data)?.middleName) + } + } + } + } + + @Serializable + data class Customer(val id: Int, val firstName: String, val lastName: String) + + @Serializable + data class Product(val name: String, val price: Int) + + @Test + fun testJsonDeserializer() = clientTests { + config { + install(SSE) + } + + test { client -> + client.sse({ + url("$TEST_SERVER/sse/json") + }, deserialize = { typeInfo, jsonString -> + val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + Json.decodeFromString(serializer, jsonString) ?: Exception() + }) { + var firstIsCustomer = true + incoming.collect { event: TypedServerSentEvent -> + if (firstIsCustomer) { + val customer = deserialize(event.data) + assertEquals(1, customer?.id) + assertEquals("Jet", customer?.firstName) + assertEquals("Brains", customer?.lastName) + firstIsCustomer = false + } else { + val product = deserialize(event.data) + assertEquals("Milk", product?.name) + assertEquals(100, product?.price) + } + } + } + } + } } diff --git a/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/Multipart.kt b/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/Multipart.kt index 5c2b5c0530d..670eb13fcea 100644 --- a/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/Multipart.kt +++ b/ktor-http/ktor-http-cio/common/src/io/ktor/http/cio/Multipart.kt @@ -305,9 +305,11 @@ internal fun parseBoundaryInternal(contentType: CharSequence): ByteArray { var position = 0 fun put(value: Byte) { - if (position >= boundaryBytes.size) throw IOException( - "Failed to parse multipart: boundary shouldn't be longer than 70 characters" - ) + if (position >= boundaryBytes.size) { + throw IOException( + "Failed to parse multipart: boundary shouldn't be longer than 70 characters" + ) + } boundaryBytes[position++] = value } diff --git a/ktor-io/common/src/io/ktor/utils/io/ByteChannel.kt b/ktor-io/common/src/io/ktor/utils/io/ByteChannel.kt index 822cf1542b1..816ed2c176b 100644 --- a/ktor-io/common/src/io/ktor/utils/io/ByteChannel.kt +++ b/ktor-io/common/src/io/ktor/utils/io/ByteChannel.kt @@ -272,5 +272,4 @@ public class ByteChannel(public val autoFlush: Boolean = false) : ByteReadChanne public class ConcurrentIOException( taskName: String, cause: Throwable? = null -) : IllegalStateException("Concurrent $taskName attempts", cause) { -} +) : IllegalStateException("Concurrent $taskName attempts", cause) diff --git a/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.api b/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.api index 1cc44ada840..ccc9395d0d5 100644 --- a/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.api +++ b/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.api @@ -1,6 +1,8 @@ public final class io/ktor/server/sse/RoutingKt { public static final fun sse (Lio/ktor/server/routing/Route;Ljava/lang/String;Lkotlin/jvm/functions/Function2;)V + public static final fun sse (Lio/ktor/server/routing/Route;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V public static final fun sse (Lio/ktor/server/routing/Route;Lkotlin/jvm/functions/Function2;)V + public static final fun sse (Lio/ktor/server/routing/Route;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V } public final class io/ktor/server/sse/SSEKt { @@ -9,9 +11,12 @@ public final class io/ktor/server/sse/SSEKt { public final class io/ktor/server/sse/SSEServerContent : io/ktor/http/content/OutgoingContent$WriteChannelContent { public fun (Lio/ktor/server/application/ApplicationCall;Lkotlin/jvm/functions/Function2;)V + public fun (Lio/ktor/server/application/ApplicationCall;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V + public synthetic fun (Lio/ktor/server/application/ApplicationCall;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun getCall ()Lio/ktor/server/application/ApplicationCall; public fun getContentType ()Lio/ktor/http/ContentType; public final fun getHandle ()Lkotlin/jvm/functions/Function2; + public final fun getSerialize ()Lkotlin/jvm/functions/Function2; public fun toString ()Ljava/lang/String; public fun writeTo (Lio/ktor/utils/io/ByteWriteChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } @@ -28,3 +33,11 @@ public final class io/ktor/server/sse/ServerSSESession$DefaultImpls { public static synthetic fun send$default (Lio/ktor/server/sse/ServerSSESession;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; } +public abstract interface class io/ktor/server/sse/ServerSSESessionWithSerialization : io/ktor/server/sse/ServerSSESession { + public abstract fun getSerializer ()Lkotlin/jvm/functions/Function2; +} + +public final class io/ktor/server/sse/ServerSSESessionWithSerialization$DefaultImpls { + public static fun send (Lio/ktor/server/sse/ServerSSESessionWithSerialization;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + diff --git a/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.klib.api b/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.klib.api index e49e67192ea..93d8bbf2b19 100644 --- a/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.klib.api +++ b/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.klib.api @@ -15,8 +15,14 @@ abstract interface io.ktor.server.sse/ServerSSESession : kotlinx.coroutines/Coro open suspend fun send(kotlin/String? = ..., kotlin/String? = ..., kotlin/String? = ..., kotlin/Long? = ..., kotlin/String? = ...) // io.ktor.server.sse/ServerSSESession.send|send(kotlin.String?;kotlin.String?;kotlin.String?;kotlin.Long?;kotlin.String?){}[0] } +abstract interface io.ktor.server.sse/ServerSSESessionWithSerialization : io.ktor.server.sse/ServerSSESession { // io.ktor.server.sse/ServerSSESessionWithSerialization|null[0] + abstract val serializer // io.ktor.server.sse/ServerSSESessionWithSerialization.serializer|{}serializer[0] + abstract fun (): kotlin/Function2 // io.ktor.server.sse/ServerSSESessionWithSerialization.serializer.|(){}[0] +} + final class io.ktor.server.sse/SSEServerContent : io.ktor.http.content/OutgoingContent.WriteChannelContent { // io.ktor.server.sse/SSEServerContent|null[0] constructor (io.ktor.server.application/ApplicationCall, kotlin.coroutines/SuspendFunction1) // io.ktor.server.sse/SSEServerContent.|(io.ktor.server.application.ApplicationCall;kotlin.coroutines.SuspendFunction1){}[0] + constructor (io.ktor.server.application/ApplicationCall, kotlin.coroutines/SuspendFunction1, kotlin/Function2? = ...) // io.ktor.server.sse/SSEServerContent.|(io.ktor.server.application.ApplicationCall;kotlin.coroutines.SuspendFunction1;kotlin.Function2?){}[0] final val call // io.ktor.server.sse/SSEServerContent.call|{}call[0] final fun (): io.ktor.server.application/ApplicationCall // io.ktor.server.sse/SSEServerContent.call.|(){}[0] @@ -24,6 +30,8 @@ final class io.ktor.server.sse/SSEServerContent : io.ktor.http.content/OutgoingC final fun (): io.ktor.http/ContentType // io.ktor.server.sse/SSEServerContent.contentType.|(){}[0] final val handle // io.ktor.server.sse/SSEServerContent.handle|{}handle[0] final fun (): kotlin.coroutines/SuspendFunction1 // io.ktor.server.sse/SSEServerContent.handle.|(){}[0] + final val serialize // io.ktor.server.sse/SSEServerContent.serialize|{}serialize[0] + final fun (): kotlin/Function2? // io.ktor.server.sse/SSEServerContent.serialize.|(){}[0] final fun toString(): kotlin/String // io.ktor.server.sse/SSEServerContent.toString|toString(){}[0] final suspend fun writeTo(io.ktor.utils.io/ByteWriteChannel) // io.ktor.server.sse/SSEServerContent.writeTo|writeTo(io.ktor.utils.io.ByteWriteChannel){}[0] @@ -33,4 +41,9 @@ final val io.ktor.server.sse/SSE // io.ktor.server.sse/SSE|{}SSE[0] final fun (): io.ktor.server.application/ApplicationPlugin // io.ktor.server.sse/SSE.|(){}[0] final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin.coroutines/SuspendFunction1) // io.ktor.server.sse/sse|sse@io.ktor.server.routing.Route(kotlin.coroutines.SuspendFunction1){}[0] +final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin/Function2, kotlin.coroutines/SuspendFunction1) // io.ktor.server.sse/sse|sse@io.ktor.server.routing.Route(kotlin.Function2;kotlin.coroutines.SuspendFunction1){}[0] final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin/String, kotlin.coroutines/SuspendFunction1) // io.ktor.server.sse/sse|sse@io.ktor.server.routing.Route(kotlin.String;kotlin.coroutines.SuspendFunction1){}[0] +final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin/String, kotlin/Function2, kotlin.coroutines/SuspendFunction1) // io.ktor.server.sse/sse|sse@io.ktor.server.routing.Route(kotlin.String;kotlin.Function2;kotlin.coroutines.SuspendFunction1){}[0] +final suspend inline fun <#A: reified kotlin/Any> (io.ktor.server.sse/ServerSSESessionWithSerialization).io.ktor.server.sse/send(#A) // io.ktor.server.sse/send|send@io.ktor.server.sse.ServerSSESessionWithSerialization(0:0){0§}[0] +final suspend inline fun <#A: reified kotlin/Any> (io.ktor.server.sse/ServerSSESessionWithSerialization).io.ktor.server.sse/send(#A? = ..., kotlin/String? = ..., kotlin/String? = ..., kotlin/Long? = ..., kotlin/String? = ...) // io.ktor.server.sse/send|send@io.ktor.server.sse.ServerSSESessionWithSerialization(0:0?;kotlin.String?;kotlin.String?;kotlin.Long?;kotlin.String?){0§}[0] +final suspend inline fun <#A: reified kotlin/Any> (io.ktor.server.sse/ServerSSESessionWithSerialization).io.ktor.server.sse/send(io.ktor.sse/TypedServerSentEvent<#A>) // io.ktor.server.sse/send|send@io.ktor.server.sse.ServerSSESessionWithSerialization(io.ktor.sse.TypedServerSentEvent<0:0>){0§}[0] diff --git a/ktor-server/ktor-server-plugins/ktor-server-sse/build.gradle.kts b/ktor-server/ktor-server-plugins/ktor-server-sse/build.gradle.kts index 5148437ca11..f1ad9f6b9fc 100644 --- a/ktor-server/ktor-server-plugins/ktor-server-sse/build.gradle.kts +++ b/ktor-server/ktor-server-plugins/ktor-server-sse/build.gradle.kts @@ -1,9 +1,19 @@ description = "Server-sent events (SSE) support" +plugins { + id("kotlinx-serialization") +} + kotlin.sourceSets { commonMain { dependencies { api(project(":ktor-shared:ktor-sse")) } } + commonTest { + dependencies { + api(project(":ktor-shared:ktor-serialization:ktor-serialization-kotlinx")) + api(project(":ktor-shared:ktor-serialization:ktor-serialization-kotlinx:ktor-serialization-kotlinx-json")) + } + } } diff --git a/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/Routing.kt b/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/Routing.kt index 4ed05d3988a..8665c97bb2e 100644 --- a/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/Routing.kt +++ b/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/Routing.kt @@ -7,21 +7,35 @@ package io.ktor.server.sse import io.ktor.http.* import io.ktor.server.response.* import io.ktor.server.routing.* +import io.ktor.util.reflect.* /** * Adds a route to handle Server-Sent Events (SSE) at the specified [path] using the provided [handler]. * Requires [SSE] plugin to be installed. * - * @param path URL path at which to handle SSE requests. - * @param handler function that defines the behavior of the SSE session. It is invoked when a client connects to the SSE - * endpoint. Inside the handler, you can use the functions provided by [ServerSSESession] + * @param path A URL path at which to handle Server-Sent Events (SSE) requests. + * @param handler A function that defines the behavior of the SSE session. It is invoked when a client connects to the SSE + * endpoint. Inside the handler, you can use the functions provided by [ServerSSESessionWithSerialization] * to send events to the connected clients. * + * Example of usage: + * ```kotlin + * install(SSE) + * routing { + * sse("/events") { + * repeat(100) { + * send(ServerSentEvent("event $it")) + * } + * } + * } + * ``` + * + * To learn more, see [the SSE](https://en.wikipedia.org/wiki/Server-sent_events) + * and [the SSE specification](https://html.spec.whatwg.org/multipage/server-sent-events.html). + * * @see ServerSSESession */ public fun Route.sse(path: String, handler: suspend ServerSSESession.() -> Unit) { - plugin(SSE) - route(path, HttpMethod.Get) { sse(handler) } @@ -31,13 +45,122 @@ public fun Route.sse(path: String, handler: suspend ServerSSESession.() -> Unit) * Adds a route to handle Server-Sent Events (SSE) using the provided [handler]. * Requires [SSE] plugin to be installed. * - * @param handler function that defines the behavior of the SSE session. It is invoked when a client connects to the SSE - * endpoint. Inside the handler, you can use the functions provided by [ServerSSESession] + * @param handler A function that defines the behavior of the SSE session. It is invoked when a client connects to the SSE + * endpoint. Inside the handler, you can use the functions provided by [ServerSSESessionWithSerialization] * to send events to the connected clients. * + * Example of usage: + * ```kotlin + * install(SSE) + * routing { + * sse { + * repeat(100) { + * send(ServerSentEvent("event $it")) + * } + * } + * } + * ``` + * + * To learn more, see [the SSE](https://en.wikipedia.org/wiki/Server-sent_events) + * and [the SSE specification](https://html.spec.whatwg.org/multipage/server-sent-events.html). + * * @see ServerSSESession */ -public fun Route.sse(handler: suspend ServerSSESession.() -> Unit) { +public fun Route.sse(handler: suspend ServerSSESession.() -> Unit): Unit = processSSEWithoutSerialization(handler) + +/** + * Adds a route to handle Server-Sent Events (SSE) at the specified [path] using the provided [handler]. + * Requires [SSE] plugin to be installed. + * + * @param path A URL path at which to handle Server-Sent Events (SSE) requests. + * @param serialize A function to serialize data objects into the `data` field of a `ServerSentEvent`. + * @param handler A function that defines the behavior of the SSE session. It is invoked when a client connects to the SSE + * endpoint. Inside the handler, you can use the functions provided by [ServerSSESessionWithSerialization] + * to send events to the connected clients. + * + * Example of usage: + * ```kotlin + * install(SSE) + * routing { + * sse("/json", serialize = { typeInfo, it -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.encodeToString(serializer, it) + * }) { + * send(Customer(0, "Jet", "Brains")) + * send(Product(0, listOf(100, 200))) + * } + * } + * ``` + * + * To learn more, see [the SSE](https://en.wikipedia.org/wiki/Server-sent_events) + * and [the SSE specification](https://html.spec.whatwg.org/multipage/server-sent-events.html). + * + * @see ServerSSESessionWithSerialization + */ +public fun Route.sse( + path: String, + serialize: (TypeInfo, Any) -> String, + handler: suspend ServerSSESessionWithSerialization.() -> Unit +) { + route(path, HttpMethod.Get) { + sse(serialize, handler) + } +} + +/** + * Adds a route to handle Server-Sent Events (SSE) using the provided [handler]. + * Requires [SSE] plugin to be installed. + * + * @param serialize A function to serialize data objects into the `data` field of a `ServerSentEvent`. + * @param handler A function that defines the behavior of the SSE session. It is invoked when a client connects to the SSE + * endpoint. Inside the handler, you can use the functions provided by [ServerSSESessionWithSerialization] + * to send events to the connected clients. + * + * Example of usage: + * ```kotlin + * install(SSE) + * routing { + * sse(serialize = { typeInfo, it -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.encodeToString(serializer, it) + * }) { + * send(Customer(0, "Jet", "Brains")) + * send(Product(0, listOf(100, 200))) + * } + * } + * ``` + * + * To learn more, see [the SSE](https://en.wikipedia.org/wiki/Server-sent_events) + * and [the SSE specification](https://html.spec.whatwg.org/multipage/server-sent-events.html). + * + * @see ServerSSESessionWithSerialization + */ +public fun Route.sse( + serialize: (TypeInfo, Any) -> String, + handler: suspend ServerSSESessionWithSerialization.() -> Unit +): Unit = processSSEWithSerialization(serialize, handler) + +private fun Route.processSSEWithoutSerialization( + handler: suspend ServerSSESession.() -> Unit +) = processSSE(null, handler) + +private fun Route.processSSEWithSerialization( + serialize: ((TypeInfo, Any) -> String), + handler: suspend ServerSSESessionWithSerialization.() -> Unit +) { + val sessionHandler: suspend ServerSSESession.() -> Unit = { + check(this is ServerSSESessionWithSerialization) { + "Impossible state. Please report this bug: https://youtrack.jetbrains.com/newIssue?project=KTOR" + } + handler() + } + processSSE(serialize, sessionHandler) +} + +private fun Route.processSSE( + serialize: ((TypeInfo, Any) -> String)?, + handler: suspend ServerSSESession.() -> Unit +) { plugin(SSE) handle { @@ -45,6 +168,6 @@ public fun Route.sse(handler: suspend ServerSSESession.() -> Unit) { call.response.header(HttpHeaders.CacheControl, "no-store") call.response.header(HttpHeaders.Connection, "keep-alive") call.response.header("X-Accel-Buffering", "no") - call.respond(SSEServerContent(call, handler)) + call.respond(SSEServerContent(call, handler, serialize)) } } diff --git a/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/SSE.kt b/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/SSE.kt index efbc9129de6..bf8b558f8fd 100644 --- a/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/SSE.kt +++ b/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/SSE.kt @@ -12,17 +12,27 @@ internal val LOGGER = KtorSimpleLogger("io.ktor.server.plugins.sse.SSE") /** * Server-Sent Events (SSE) support plugin. It is required to be installed first before binding any sse endpoints. * - * To learn more, see [specification](https://html.spec.whatwg.org/multipage/server-sent-events.html). - * - * Example: + * Example of usage: * ```kotlin * install(SSE) + * routing { + * sse("/default") { + * repeat(100) { + * send(ServerSentEvent("event $it")) + * } + * } * - * install(Routing) { - * sse { - * send(ServerSentEvent("Hello")) + * sse("/serialization", serialize = { typeInfo, it -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.encodeToString(serializer, it) + * }) { + * send(Customer(0, "Jet", "Brains")) + * send(Product(0, listOf(100, 200))) * } * } * ``` + * + * To learn more, see [the SSE](https://en.wikipedia.org/wiki/Server-sent_events) + * and [the SSE specification](https://html.spec.whatwg.org/multipage/server-sent-events.html). */ public val SSE: ApplicationPlugin = createApplicationPlugin("SSE") {} diff --git a/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/SSEServerContent.kt b/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/SSEServerContent.kt index 8dd94e6ddf4..45b260c7411 100644 --- a/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/SSEServerContent.kt +++ b/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/SSEServerContent.kt @@ -8,6 +8,7 @@ import io.ktor.http.* import io.ktor.http.content.* import io.ktor.server.application.* import io.ktor.server.request.* +import io.ktor.util.reflect.* import io.ktor.utils.io.* import kotlinx.coroutines.* @@ -25,8 +26,14 @@ import kotlinx.coroutines.* */ public class SSEServerContent( public val call: ApplicationCall, - public val handle: suspend ServerSSESession.() -> Unit + public val handle: suspend ServerSSESession.() -> Unit, + public val serialize: ((TypeInfo, Any) -> String)? = null ) : OutgoingContent.WriteChannelContent() { + public constructor( + call: ApplicationCall, + handle: suspend ServerSSESession.() -> Unit, + ) : this(call, handle, null) + override val contentType: ContentType = ContentType.Text.EventStream override suspend fun writeTo(channel: ByteWriteChannel) { @@ -36,6 +43,13 @@ public class SSEServerContent( try { coroutineScope { session = DefaultServerSSESession(channel, call, coroutineContext) + if (serialize != null) { + session = object : + ServerSSESessionWithSerialization, + ServerSSESession by session as DefaultServerSSESession { + override val serializer: (TypeInfo, Any) -> String = serialize + } + } session?.handle() } } finally { diff --git a/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/ServerSSESession.kt b/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/ServerSSESession.kt index 2d281ee243d..46627989635 100644 --- a/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/ServerSSESession.kt +++ b/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/ServerSSESession.kt @@ -6,17 +6,34 @@ package io.ktor.server.sse import io.ktor.server.application.* import io.ktor.sse.* +import io.ktor.util.reflect.* +import io.ktor.websocket.* import kotlinx.coroutines.* /** - * Represents a server-side server-sent events session. + * Represents a server-side Server-Sent Events (SSE) session. * An [ServerSSESession] allows the server to send [ServerSentEvent] to the client over a single HTTP connection. * - * @see [SSE] + * Example of usage: + * ```kotlin + * install(SSE) + * routing { + * sse("/default") { + * repeat(100) { + * send(ServerSentEvent("event $it")) + * } + * } + * } + * ``` + * + * To learn more, see [the SSE](https://en.wikipedia.org/wiki/Server-sent_events) + * and [the SSE specification](https://html.spec.whatwg.org/multipage/server-sent-events.html). + * + * @see SSE */ public interface ServerSSESession : CoroutineScope { /** - * Associated received [call] that originating this session. + * The received [call] that originated this session. */ public val call: ApplicationCall @@ -55,3 +72,61 @@ public interface ServerSSESession : CoroutineScope { */ public suspend fun close() } + +/** + * Represents a server-side Server-Sent Events (SSE) session with serialization support. + * An [ServerSSESessionWithSerialization] allows the server to send [ServerSentEvent] to the client over a single HTTP connection. + * + * Example of usage: + * ```kotlin + * install(SSE) + * routing { + * sse("/serialization", serialize = { typeInfo, it -> + * val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + * Json.encodeToString(serializer, it) + * }) { + * send(Customer(0, "Jet", "Brains")) + * send(Product(0, listOf(100, 200))) + * } + * } + * ``` + * + * To learn more, see [the SSE](https://en.wikipedia.org/wiki/Server-sent_events) + * and [the SSE specification](https://html.spec.whatwg.org/multipage/server-sent-events.html). + * + * @see SSE + */ +public interface ServerSSESessionWithSerialization : ServerSSESession { + /** + * Serializer for transforming data object into field `data` of `ServerSentEvent`. + */ + public val serializer: (TypeInfo, Any) -> String +} + +public suspend inline fun ServerSSESessionWithSerialization.send(event: TypedServerSentEvent) { + send( + ServerSentEvent( + event.data?.let { + serializer(typeInfo(), it) + }, + event.event, + event.id, + event.retry, + event.comments + ) + ) +} + +public suspend inline fun ServerSSESessionWithSerialization.send( + data: T? = null, + event: String? = null, + id: String? = null, + retry: Long? = null, + comments: String? = null +) { + send(TypedServerSentEvent(data, event, id, retry, comments)) +} + +public suspend inline fun ServerSSESessionWithSerialization.send(data: T) { + send(ServerSentEvent(serializer(typeInfo(), data))) +} diff --git a/ktor-server/ktor-server-plugins/ktor-server-sse/common/test/io/ktor/server/sse/ServerSentEventsTest.kt b/ktor-server/ktor-server-plugins/ktor-server-sse/common/test/io/ktor/server/sse/ServerSentEventsTest.kt index 3f82c477c78..f9200fd40ca 100644 --- a/ktor-server/ktor-server-plugins/ktor-server-sse/common/test/io/ktor/server/sse/ServerSentEventsTest.kt +++ b/ktor-server/ktor-server-plugins/ktor-server-sse/common/test/io/ktor/server/sse/ServerSentEventsTest.kt @@ -13,6 +13,8 @@ import io.ktor.server.testing.* import io.ktor.sse.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* +import kotlinx.serialization.* +import kotlinx.serialization.json.* import kotlin.test.* class ServerSentEventsTest { @@ -192,6 +194,103 @@ class ServerSentEventsTest { assertEquals(expectedOneEventData.lines(), actualOneEventData.toString().lines()) } + class Person1(val age: Int) + class Person2(val number: Int) + + @Test + fun testSerializerInRoute() = testApplication { + install(SSE) + routing { + sse("/person", serialize = { typeInfo, data -> + when (typeInfo.type) { + Person1::class -> { + "Age ${(data as Person1).age}" + } + + else -> { + data.toString() + } + } + }) { + repeat(10) { + send(Person1(it)) + } + } + } + + val client = createSseClient() + + client.sse("/person") { + incoming.collectIndexed { i, person -> + assertEquals("Age $i", person.data) + } + } + } + + @Test + fun testDifferentSerializers() = testApplication { + install(SSE) + routing { + sse(serialize = { typeInfo, data -> + when (typeInfo.type) { + Person1::class -> { + "Age ${(data as Person1).age}" + } + + Person2::class -> { + "Number ${(data as Person2).number}" + } + + else -> { + data.toString() + } + } + }) { + send(Person1(22)) + send(Person2(123456)) + } + } + + val client = createSseClient() + client.sse { + var first = true + incoming.collect { + if (first) { + assertEquals("Age 22", it.data) + first = false + } else { + assertEquals("Number 123456", it.data) + } + } + } + } + + @Serializable + data class Customer(val id: Int, val firstName: String, val lastName: String) + + @Serializable + data class Product(val id: Int, val prices: List) + + @Test + fun testJsonSerializer() = testApplication { + install(SSE) + routing { + sse("/json", serialize = { typeInfo, it -> + val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!) + Json.encodeToString(serializer, it) + }) { + send(Customer(0, "Jet", "Brains")) + send(Product(0, listOf(100, 200))) + } + } + + assertEquals( + "data: {\"id\":0,\"firstName\":\"Jet\",\"lastName\":\"Brains\"}\r\n\r\n" + + "data: {\"id\":0,\"prices\":[100,200]}", + client.get("/json").bodyAsText().trim() + ) + } + private fun ApplicationTestBuilder.createSseClient(): HttpClient { val client = createClient { install(io.ktor.client.plugins.sse.SSE) diff --git a/ktor-shared/ktor-sse/api/ktor-sse.api b/ktor-shared/ktor-sse/api/ktor-sse.api index 4d9751075f9..7c6231a033e 100644 --- a/ktor-shared/ktor-sse/api/ktor-sse.api +++ b/ktor-shared/ktor-sse/api/ktor-sse.api @@ -1,12 +1,22 @@ -public final class io/ktor/sse/ServerSentEvent { +public final class io/ktor/sse/ServerSentEvent : io/ktor/sse/ServerSentEventMetadata { public fun ()V public fun (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;)V public synthetic fun (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V - public final fun getComments ()Ljava/lang/String; - public final fun getData ()Ljava/lang/String; - public final fun getEvent ()Ljava/lang/String; - public final fun getId ()Ljava/lang/String; - public final fun getRetry ()Ljava/lang/Long; + public final fun component1 ()Ljava/lang/String; + public final fun component2 ()Ljava/lang/String; + public final fun component3 ()Ljava/lang/String; + public final fun component4 ()Ljava/lang/Long; + public final fun component5 ()Ljava/lang/String; + public final fun copy (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;)Lio/ktor/sse/ServerSentEvent; + public static synthetic fun copy$default (Lio/ktor/sse/ServerSentEvent;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;ILjava/lang/Object;)Lio/ktor/sse/ServerSentEvent; + public fun equals (Ljava/lang/Object;)Z + public fun getComments ()Ljava/lang/String; + public synthetic fun getData ()Ljava/lang/Object; + public fun getData ()Ljava/lang/String; + public fun getEvent ()Ljava/lang/String; + public fun getId ()Ljava/lang/String; + public fun getRetry ()Ljava/lang/Long; + public fun hashCode ()I public fun toString ()Ljava/lang/String; } @@ -17,3 +27,33 @@ public final class io/ktor/sse/ServerSentEventKt { public static final fun getEND_OF_LINE_VARIANTS ()Lkotlin/text/Regex; } +public abstract interface class io/ktor/sse/ServerSentEventMetadata { + public abstract fun getComments ()Ljava/lang/String; + public abstract fun getData ()Ljava/lang/Object; + public abstract fun getEvent ()Ljava/lang/String; + public abstract fun getId ()Ljava/lang/String; + public abstract fun getRetry ()Ljava/lang/Long; +} + +public final class io/ktor/sse/TypedServerSentEvent : io/ktor/sse/ServerSentEventMetadata { + public fun ()V + public fun (Ljava/lang/Object;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;)V + public synthetic fun (Ljava/lang/Object;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public final fun component1 ()Ljava/lang/Object; + public final fun component2 ()Ljava/lang/String; + public final fun component3 ()Ljava/lang/String; + public final fun component4 ()Ljava/lang/Long; + public final fun component5 ()Ljava/lang/String; + public final fun copy (Ljava/lang/Object;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;)Lio/ktor/sse/TypedServerSentEvent; + public static synthetic fun copy$default (Lio/ktor/sse/TypedServerSentEvent;Ljava/lang/Object;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;ILjava/lang/Object;)Lio/ktor/sse/TypedServerSentEvent; + public fun equals (Ljava/lang/Object;)Z + public fun getComments ()Ljava/lang/String; + public fun getData ()Ljava/lang/Object; + public fun getEvent ()Ljava/lang/String; + public fun getId ()Ljava/lang/String; + public fun getRetry ()Ljava/lang/Long; + public fun hashCode ()I + public fun toString ()Ljava/lang/String; + public final fun toString (Lkotlin/jvm/functions/Function1;)Ljava/lang/String; +} + diff --git a/ktor-shared/ktor-sse/api/ktor-sse.klib.api b/ktor-shared/ktor-sse/api/ktor-sse.klib.api index 9aa81adf36b..b53022fb7bd 100644 --- a/ktor-shared/ktor-sse/api/ktor-sse.klib.api +++ b/ktor-shared/ktor-sse/api/ktor-sse.klib.api @@ -6,7 +6,46 @@ // - Show declarations: true // Library unique name: -final class io.ktor.sse/ServerSentEvent { // io.ktor.sse/ServerSentEvent|null[0] +sealed interface <#A: kotlin/Any?> io.ktor.sse/ServerSentEventMetadata { // io.ktor.sse/ServerSentEventMetadata|null[0] + abstract val comments // io.ktor.sse/ServerSentEventMetadata.comments|{}comments[0] + abstract fun (): kotlin/String? // io.ktor.sse/ServerSentEventMetadata.comments.|(){}[0] + abstract val data // io.ktor.sse/ServerSentEventMetadata.data|{}data[0] + abstract fun (): #A? // io.ktor.sse/ServerSentEventMetadata.data.|(){}[0] + abstract val event // io.ktor.sse/ServerSentEventMetadata.event|{}event[0] + abstract fun (): kotlin/String? // io.ktor.sse/ServerSentEventMetadata.event.|(){}[0] + abstract val id // io.ktor.sse/ServerSentEventMetadata.id|{}id[0] + abstract fun (): kotlin/String? // io.ktor.sse/ServerSentEventMetadata.id.|(){}[0] + abstract val retry // io.ktor.sse/ServerSentEventMetadata.retry|{}retry[0] + abstract fun (): kotlin/Long? // io.ktor.sse/ServerSentEventMetadata.retry.|(){}[0] +} + +final class <#A: kotlin/Any?> io.ktor.sse/TypedServerSentEvent : io.ktor.sse/ServerSentEventMetadata<#A> { // io.ktor.sse/TypedServerSentEvent|null[0] + constructor (#A? = ..., kotlin/String? = ..., kotlin/String? = ..., kotlin/Long? = ..., kotlin/String? = ...) // io.ktor.sse/TypedServerSentEvent.|(1:0?;kotlin.String?;kotlin.String?;kotlin.Long?;kotlin.String?){}[0] + + final val comments // io.ktor.sse/TypedServerSentEvent.comments|{}comments[0] + final fun (): kotlin/String? // io.ktor.sse/TypedServerSentEvent.comments.|(){}[0] + final val data // io.ktor.sse/TypedServerSentEvent.data|{}data[0] + final fun (): #A? // io.ktor.sse/TypedServerSentEvent.data.|(){}[0] + final val event // io.ktor.sse/TypedServerSentEvent.event|{}event[0] + final fun (): kotlin/String? // io.ktor.sse/TypedServerSentEvent.event.|(){}[0] + final val id // io.ktor.sse/TypedServerSentEvent.id|{}id[0] + final fun (): kotlin/String? // io.ktor.sse/TypedServerSentEvent.id.|(){}[0] + final val retry // io.ktor.sse/TypedServerSentEvent.retry|{}retry[0] + final fun (): kotlin/Long? // io.ktor.sse/TypedServerSentEvent.retry.|(){}[0] + + final fun component1(): #A? // io.ktor.sse/TypedServerSentEvent.component1|component1(){}[0] + final fun component2(): kotlin/String? // io.ktor.sse/TypedServerSentEvent.component2|component2(){}[0] + final fun component3(): kotlin/String? // io.ktor.sse/TypedServerSentEvent.component3|component3(){}[0] + final fun component4(): kotlin/Long? // io.ktor.sse/TypedServerSentEvent.component4|component4(){}[0] + final fun component5(): kotlin/String? // io.ktor.sse/TypedServerSentEvent.component5|component5(){}[0] + final fun copy(#A? = ..., kotlin/String? = ..., kotlin/String? = ..., kotlin/Long? = ..., kotlin/String? = ...): io.ktor.sse/TypedServerSentEvent<#A> // io.ktor.sse/TypedServerSentEvent.copy|copy(1:0?;kotlin.String?;kotlin.String?;kotlin.Long?;kotlin.String?){}[0] + final fun equals(kotlin/Any?): kotlin/Boolean // io.ktor.sse/TypedServerSentEvent.equals|equals(kotlin.Any?){}[0] + final fun hashCode(): kotlin/Int // io.ktor.sse/TypedServerSentEvent.hashCode|hashCode(){}[0] + final fun toString(): kotlin/String // io.ktor.sse/TypedServerSentEvent.toString|toString(){}[0] + final fun toString(kotlin/Function1<#A, kotlin/String>): kotlin/String // io.ktor.sse/TypedServerSentEvent.toString|toString(kotlin.Function1<1:0,kotlin.String>){}[0] +} + +final class io.ktor.sse/ServerSentEvent : io.ktor.sse/ServerSentEventMetadata { // io.ktor.sse/ServerSentEvent|null[0] constructor (kotlin/String? = ..., kotlin/String? = ..., kotlin/String? = ..., kotlin/Long? = ..., kotlin/String? = ...) // io.ktor.sse/ServerSentEvent.|(kotlin.String?;kotlin.String?;kotlin.String?;kotlin.Long?;kotlin.String?){}[0] final val comments // io.ktor.sse/ServerSentEvent.comments|{}comments[0] @@ -20,6 +59,14 @@ final class io.ktor.sse/ServerSentEvent { // io.ktor.sse/ServerSentEvent|null[0] final val retry // io.ktor.sse/ServerSentEvent.retry|{}retry[0] final fun (): kotlin/Long? // io.ktor.sse/ServerSentEvent.retry.|(){}[0] + final fun component1(): kotlin/String? // io.ktor.sse/ServerSentEvent.component1|component1(){}[0] + final fun component2(): kotlin/String? // io.ktor.sse/ServerSentEvent.component2|component2(){}[0] + final fun component3(): kotlin/String? // io.ktor.sse/ServerSentEvent.component3|component3(){}[0] + final fun component4(): kotlin/Long? // io.ktor.sse/ServerSentEvent.component4|component4(){}[0] + final fun component5(): kotlin/String? // io.ktor.sse/ServerSentEvent.component5|component5(){}[0] + final fun copy(kotlin/String? = ..., kotlin/String? = ..., kotlin/String? = ..., kotlin/Long? = ..., kotlin/String? = ...): io.ktor.sse/ServerSentEvent // io.ktor.sse/ServerSentEvent.copy|copy(kotlin.String?;kotlin.String?;kotlin.String?;kotlin.Long?;kotlin.String?){}[0] + final fun equals(kotlin/Any?): kotlin/Boolean // io.ktor.sse/ServerSentEvent.equals|equals(kotlin.Any?){}[0] + final fun hashCode(): kotlin/Int // io.ktor.sse/ServerSentEvent.hashCode|hashCode(){}[0] final fun toString(): kotlin/String // io.ktor.sse/ServerSentEvent.toString|toString(){}[0] } diff --git a/ktor-shared/ktor-sse/common/src/io/ktor/sse/ServerSentEvent.kt b/ktor-shared/ktor-sse/common/src/io/ktor/sse/ServerSentEvent.kt index ad1cf960b56..fe064d38ced 100644 --- a/ktor-shared/ktor-sse/common/src/io/ktor/sse/ServerSentEvent.kt +++ b/ktor-shared/ktor-sse/common/src/io/ktor/sse/ServerSentEvent.kt @@ -6,6 +6,26 @@ package io.ktor.sse import io.ktor.utils.io.* +/** + * Server-sent event interface. + * + * @property data data field of the event. + * @property event string identifying the type of event. + * @property id event ID. + * @property retry reconnection time, in milliseconds to wait before reconnecting. + * @property comments comment lines starting with a ':' character. + * + * @see ServerSentEvent with default String parameter `data` + * @see TypedServerSentEvent with parameterized parameter `data` + */ +public sealed interface ServerSentEventMetadata { + public val data: T? + public val event: String? + public val id: String? + public val retry: Long? + public val comments: String? +} + /** * Server-sent event. * @@ -14,22 +34,49 @@ import io.ktor.utils.io.* * @property id event ID. * @property retry reconnection time, in milliseconds to wait before reconnecting. * @property comments comment lines starting with a ':' character. + * + * @see TypedServerSentEvent with parameterized parameter `data` */ -public class ServerSentEvent( - public val data: String? = null, - public val event: String? = null, - public val id: String? = null, - public val retry: Long? = null, - public val comments: String? = null -) { - override fun toString(): String { - return buildString { - appendField("data", data) - appendField("event", event) - appendField("id", id) - appendField("retry", retry) - appendField("", comments) - } +public data class ServerSentEvent( + override val data: String? = null, + override val event: String? = null, + override val id: String? = null, + override val retry: Long? = null, + override val comments: String? = null +) : ServerSentEventMetadata { + override fun toString(): String = eventToString(data, event, id, retry, comments) +} + +/** + * Server-sent event with generic parameter [data]. + * + * @property data data field of the event. + * @property event string identifying the type of event. + * @property id event ID. + * @property retry reconnection time, in milliseconds to wait before reconnecting. + * @property comments comment lines starting with a ':' character. + * + * @see ServerSentEvent with default String parameter `data` + */ +public data class TypedServerSentEvent( + override val data: T? = null, + override val event: String? = null, + override val id: String? = null, + override val retry: Long? = null, + override val comments: String? = null +) : ServerSentEventMetadata { + @InternalAPI + public fun toString(serializer: (T) -> String): String = + eventToString(data?.let { serializer(it) }, event, id, retry, comments) +} + +private fun eventToString(data: String?, event: String?, id: String?, retry: Long?, comments: String?): String { + return buildString { + appendField("data", data) + appendField("event", event) + appendField("id", id) + appendField("retry", retry) + appendField("", comments) } }