Skip to content

Commit

Permalink
KTOR-7908 Add heartbeat to SSE (#4543)
Browse files Browse the repository at this point in the history
  • Loading branch information
marychatte authored and osipxd committed Jan 8, 2025
1 parent f597eb1 commit 4188d2a
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
public final class io/ktor/server/sse/Heartbeat {
public fun <init> ()V
public final fun getDuration-UwyO8pc ()J
public final fun getEvent ()Lio/ktor/sse/ServerSentEvent;
public final fun setDuration-LRDsOJo (J)V
public final fun setEvent (Lio/ktor/sse/ServerSentEvent;)V
}

public final class io/ktor/server/sse/RoutingKt {
public static final fun sse (Lio/ktor/server/routing/Route;Ljava/lang/String;Lkotlin/jvm/functions/Function2;)V
public static final fun sse (Lio/ktor/server/routing/Route;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V
Expand Down Expand Up @@ -33,6 +41,10 @@ public final class io/ktor/server/sse/ServerSSESession$DefaultImpls {
public static synthetic fun send$default (Lio/ktor/server/sse/ServerSSESession;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public final class io/ktor/server/sse/ServerSSESessionKt {
public static final fun heartbeat (Lio/ktor/server/sse/ServerSSESession;Lkotlin/jvm/functions/Function1;)V
}

public abstract interface class io/ktor/server/sse/ServerSSESessionWithSerialization : io/ktor/server/sse/ServerSSESession {
public abstract fun getSerializer ()Lkotlin/jvm/functions/Function2;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ abstract interface io.ktor.server.sse/ServerSSESessionWithSerialization : io.kto
abstract fun <get-serializer>(): kotlin/Function2<io.ktor.util.reflect/TypeInfo, kotlin/Any, kotlin/String> // io.ktor.server.sse/ServerSSESessionWithSerialization.serializer.<get-serializer>|<get-serializer>(){}[0]
}

final class io.ktor.server.sse/Heartbeat { // io.ktor.server.sse/Heartbeat|null[0]
constructor <init>() // io.ktor.server.sse/Heartbeat.<init>|<init>(){}[0]

final var duration // io.ktor.server.sse/Heartbeat.duration|{}duration[0]
final fun <get-duration>(): kotlin.time/Duration // io.ktor.server.sse/Heartbeat.duration.<get-duration>|<get-duration>(){}[0]
final fun <set-duration>(kotlin.time/Duration) // io.ktor.server.sse/Heartbeat.duration.<set-duration>|<set-duration>(kotlin.time.Duration){}[0]
final var event // io.ktor.server.sse/Heartbeat.event|{}event[0]
final fun <get-event>(): io.ktor.sse/ServerSentEvent // io.ktor.server.sse/Heartbeat.event.<get-event>|<get-event>(){}[0]
final fun <set-event>(io.ktor.sse/ServerSentEvent) // io.ktor.server.sse/Heartbeat.event.<set-event>|<set-event>(io.ktor.sse.ServerSentEvent){}[0]
}

final class io.ktor.server.sse/SSEServerContent : io.ktor.http.content/OutgoingContent.WriteChannelContent { // io.ktor.server.sse/SSEServerContent|null[0]
constructor <init>(io.ktor.server.application/ApplicationCall, kotlin.coroutines/SuspendFunction1<io.ktor.server.sse/ServerSSESession, kotlin/Unit>) // io.ktor.server.sse/SSEServerContent.<init>|<init>(io.ktor.server.application.ApplicationCall;kotlin.coroutines.SuspendFunction1<io.ktor.server.sse.ServerSSESession,kotlin.Unit>){}[0]
constructor <init>(io.ktor.server.application/ApplicationCall, kotlin.coroutines/SuspendFunction1<io.ktor.server.sse/ServerSSESession, kotlin/Unit>, kotlin/Function2<io.ktor.util.reflect/TypeInfo, kotlin/Any, kotlin/String>? = ...) // io.ktor.server.sse/SSEServerContent.<init>|<init>(io.ktor.server.application.ApplicationCall;kotlin.coroutines.SuspendFunction1<io.ktor.server.sse.ServerSSESession,kotlin.Unit>;kotlin.Function2<io.ktor.util.reflect.TypeInfo,kotlin.Any,kotlin.String>?){}[0]
Expand All @@ -44,6 +55,7 @@ final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin.coroutine
final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin/Function2<io.ktor.util.reflect/TypeInfo, kotlin/Any, kotlin/String>, kotlin.coroutines/SuspendFunction1<io.ktor.server.sse/ServerSSESessionWithSerialization, kotlin/Unit>) // io.ktor.server.sse/sse|[email protected](kotlin.Function2<io.ktor.util.reflect.TypeInfo,kotlin.Any,kotlin.String>;kotlin.coroutines.SuspendFunction1<io.ktor.server.sse.ServerSSESessionWithSerialization,kotlin.Unit>){}[0]
final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin/String, kotlin.coroutines/SuspendFunction1<io.ktor.server.sse/ServerSSESession, kotlin/Unit>) // io.ktor.server.sse/sse|[email protected](kotlin.String;kotlin.coroutines.SuspendFunction1<io.ktor.server.sse.ServerSSESession,kotlin.Unit>){}[0]
final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin/String, kotlin/Function2<io.ktor.util.reflect/TypeInfo, kotlin/Any, kotlin/String>, kotlin.coroutines/SuspendFunction1<io.ktor.server.sse/ServerSSESessionWithSerialization, kotlin/Unit>) // io.ktor.server.sse/sse|[email protected](kotlin.String;kotlin.Function2<io.ktor.util.reflect.TypeInfo,kotlin.Any,kotlin.String>;kotlin.coroutines.SuspendFunction1<io.ktor.server.sse.ServerSSESessionWithSerialization,kotlin.Unit>){}[0]
final fun (io.ktor.server.sse/ServerSSESession).io.ktor.server.sse/heartbeat(kotlin/Function1<io.ktor.server.sse/Heartbeat, kotlin/Unit>) // io.ktor.server.sse/heartbeat|[email protected](kotlin.Function1<io.ktor.server.sse.Heartbeat,kotlin.Unit>){}[0]
final suspend inline fun <#A: reified kotlin/Any> (io.ktor.server.sse/ServerSSESessionWithSerialization).io.ktor.server.sse/send(#A) // io.ktor.server.sse/send|[email protected](0:0){0§<kotlin.Any>}[0]
final suspend inline fun <#A: reified kotlin/Any> (io.ktor.server.sse/ServerSSESessionWithSerialization).io.ktor.server.sse/send(#A? = ..., kotlin/String? = ..., kotlin/String? = ..., kotlin/Long? = ..., kotlin/String? = ...) // io.ktor.server.sse/send|[email protected](0:0?;kotlin.String?;kotlin.String?;kotlin.Long?;kotlin.String?){0§<kotlin.Any>}[0]
final suspend inline fun <#A: reified kotlin/Any> (io.ktor.server.sse/ServerSSESessionWithSerialization).io.ktor.server.sse/send(io.ktor.sse/TypedServerSentEvent<#A>) // io.ktor.server.sse/send|[email protected](io.ktor.sse.TypedServerSentEvent<0:0>){0§<kotlin.Any>}[0]
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class SSEServerContent(
session?.handle()
}
} finally {
val heartbeatJob = call.attributes.getOrNull(heartbeatJobKey)
heartbeatJob?.cancel()
session?.close()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ package io.ktor.server.sse

import io.ktor.server.application.*
import io.ktor.sse.*
import io.ktor.util.*
import io.ktor.util.reflect.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

/**
* Represents a server-side Server-Sent Events (SSE) session.
Expand Down Expand Up @@ -130,3 +133,36 @@ public suspend inline fun <reified T : Any> ServerSSESessionWithSerialization.se
public suspend inline fun <reified T : Any> ServerSSESessionWithSerialization.send(data: T) {
send(ServerSentEvent(serializer(typeInfo<T>(), data)))
}

/**
* Starts a heartbeat for the ServerSSESession.
*
* The heartbeat will send the specified [Heartbeat.event] at the specified [Heartbeat.duration] interval
* as long as the session is active.
*
* @param heartbeatConfig a lambda that configures the [Heartbeat] object used for the heartbeat.
*/
public fun ServerSSESession.heartbeat(heartbeatConfig: Heartbeat.() -> Unit) {
val heartbeat = Heartbeat().apply(heartbeatConfig)
val heartbeatJob = Job(call.coroutineContext[Job])
launch(heartbeatJob + CoroutineName("sse-heartbeat")) {
while (true) {
send(heartbeat.event)
delay(heartbeat.duration)
}
}
call.attributes.put(heartbeatJobKey, heartbeatJob)
}

internal val heartbeatJobKey = AttributeKey<Job>("HeartbeatJobAttributeKey")

/**
* Represents a heartbeat configuration for a [ServerSSESession].
*
* @property duration the duration between heartbeat events, default is 30 seconds.
* @property event the [ServerSentEvent] to be sent as the heartbeat, default is a [ServerSentEvent] with the comment "heartbeat".
*/
public class Heartbeat {
public var duration: Duration = 30.seconds
public var event: ServerSentEvent = ServerSentEvent(comments = "heartbeat")
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright 2014-2023 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.server.sse

Expand All @@ -11,11 +11,19 @@ import io.ktor.client.statement.*
import io.ktor.http.*
import io.ktor.server.testing.*
import io.ktor.sse.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.serialization.*
import kotlinx.serialization.json.*
import kotlin.test.*
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectIndexed
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.serializer
import kotlin.test.Test
import kotlin.test.assertContains
import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.milliseconds

class ServerSentEventsTest {

Expand Down Expand Up @@ -291,6 +299,42 @@ class ServerSentEventsTest {
)
}

@Test
fun testHeartbeat() = testApplication {
install(SSE)
routing {
sse {
heartbeat {
duration = 10.milliseconds
event = ServerSentEvent("heartbeat")
}

repeat(4) {
send("Hello")
delay(10.milliseconds)
}
}
}

val client = createSseClient()

var hellos = 0
var heartbeats = 0
withTimeout(5_000) {
client.sse {
incoming.collect { event ->
when (event.data) {
"Hello" -> hellos++
"heartbeat" -> heartbeats++
}
if (hellos > 3 && heartbeats > 3) {
cancel()
}
}
}
}
}

private fun ApplicationTestBuilder.createSseClient(): HttpClient {
val client = createClient {
install(io.ktor.client.plugins.sse.SSE)
Expand Down

0 comments on commit 4188d2a

Please sign in to comment.