Skip to content

Commit a94fb91

Browse files
marychattee5l
authored andcommitted
KTOR-7908 Add heartbeat to SSE (#4543)
1 parent 639fd51 commit a94fb91

File tree

5 files changed

+113
-7
lines changed

5 files changed

+113
-7
lines changed

ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.api

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
public final class io/ktor/server/sse/Heartbeat {
2+
public fun <init> ()V
3+
public final fun getDuration-UwyO8pc ()J
4+
public final fun getEvent ()Lio/ktor/sse/ServerSentEvent;
5+
public final fun setDuration-LRDsOJo (J)V
6+
public final fun setEvent (Lio/ktor/sse/ServerSentEvent;)V
7+
}
8+
19
public final class io/ktor/server/sse/RoutingKt {
210
public static final fun sse (Lio/ktor/server/routing/Route;Ljava/lang/String;Lkotlin/jvm/functions/Function2;)V
311
public static final fun sse (Lio/ktor/server/routing/Route;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V
@@ -33,6 +41,10 @@ public final class io/ktor/server/sse/ServerSSESession$DefaultImpls {
3341
public static synthetic fun send$default (Lio/ktor/server/sse/ServerSSESession;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
3442
}
3543

44+
public final class io/ktor/server/sse/ServerSSESessionKt {
45+
public static final fun heartbeat (Lio/ktor/server/sse/ServerSSESession;Lkotlin/jvm/functions/Function1;)V
46+
}
47+
3648
public abstract interface class io/ktor/server/sse/ServerSSESessionWithSerialization : io/ktor/server/sse/ServerSSESession {
3749
public abstract fun getSerializer ()Lkotlin/jvm/functions/Function2;
3850
}

ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.klib.api

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,17 @@ abstract interface io.ktor.server.sse/ServerSSESessionWithSerialization : io.kto
2020
abstract fun <get-serializer>(): kotlin/Function2<io.ktor.util.reflect/TypeInfo, kotlin/Any, kotlin/String> // io.ktor.server.sse/ServerSSESessionWithSerialization.serializer.<get-serializer>|<get-serializer>(){}[0]
2121
}
2222

23+
final class io.ktor.server.sse/Heartbeat { // io.ktor.server.sse/Heartbeat|null[0]
24+
constructor <init>() // io.ktor.server.sse/Heartbeat.<init>|<init>(){}[0]
25+
26+
final var duration // io.ktor.server.sse/Heartbeat.duration|{}duration[0]
27+
final fun <get-duration>(): kotlin.time/Duration // io.ktor.server.sse/Heartbeat.duration.<get-duration>|<get-duration>(){}[0]
28+
final fun <set-duration>(kotlin.time/Duration) // io.ktor.server.sse/Heartbeat.duration.<set-duration>|<set-duration>(kotlin.time.Duration){}[0]
29+
final var event // io.ktor.server.sse/Heartbeat.event|{}event[0]
30+
final fun <get-event>(): io.ktor.sse/ServerSentEvent // io.ktor.server.sse/Heartbeat.event.<get-event>|<get-event>(){}[0]
31+
final fun <set-event>(io.ktor.sse/ServerSentEvent) // io.ktor.server.sse/Heartbeat.event.<set-event>|<set-event>(io.ktor.sse.ServerSentEvent){}[0]
32+
}
33+
2334
final class io.ktor.server.sse/SSEServerContent : io.ktor.http.content/OutgoingContent.WriteChannelContent { // io.ktor.server.sse/SSEServerContent|null[0]
2435
constructor <init>(io.ktor.server.application/ApplicationCall, kotlin.coroutines/SuspendFunction1<io.ktor.server.sse/ServerSSESession, kotlin/Unit>) // io.ktor.server.sse/SSEServerContent.<init>|<init>(io.ktor.server.application.ApplicationCall;kotlin.coroutines.SuspendFunction1<io.ktor.server.sse.ServerSSESession,kotlin.Unit>){}[0]
2536
constructor <init>(io.ktor.server.application/ApplicationCall, kotlin.coroutines/SuspendFunction1<io.ktor.server.sse/ServerSSESession, kotlin/Unit>, kotlin/Function2<io.ktor.util.reflect/TypeInfo, kotlin/Any, kotlin/String>? = ...) // io.ktor.server.sse/SSEServerContent.<init>|<init>(io.ktor.server.application.ApplicationCall;kotlin.coroutines.SuspendFunction1<io.ktor.server.sse.ServerSSESession,kotlin.Unit>;kotlin.Function2<io.ktor.util.reflect.TypeInfo,kotlin.Any,kotlin.String>?){}[0]
@@ -44,6 +55,7 @@ final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin.coroutine
4455
final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin/Function2<io.ktor.util.reflect/TypeInfo, kotlin/Any, kotlin/String>, kotlin.coroutines/SuspendFunction1<io.ktor.server.sse/ServerSSESessionWithSerialization, kotlin/Unit>) // io.ktor.server.sse/sse|[email protected](kotlin.Function2<io.ktor.util.reflect.TypeInfo,kotlin.Any,kotlin.String>;kotlin.coroutines.SuspendFunction1<io.ktor.server.sse.ServerSSESessionWithSerialization,kotlin.Unit>){}[0]
4556
final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin/String, kotlin.coroutines/SuspendFunction1<io.ktor.server.sse/ServerSSESession, kotlin/Unit>) // io.ktor.server.sse/sse|[email protected](kotlin.String;kotlin.coroutines.SuspendFunction1<io.ktor.server.sse.ServerSSESession,kotlin.Unit>){}[0]
4657
final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin/String, kotlin/Function2<io.ktor.util.reflect/TypeInfo, kotlin/Any, kotlin/String>, kotlin.coroutines/SuspendFunction1<io.ktor.server.sse/ServerSSESessionWithSerialization, kotlin/Unit>) // io.ktor.server.sse/sse|[email protected](kotlin.String;kotlin.Function2<io.ktor.util.reflect.TypeInfo,kotlin.Any,kotlin.String>;kotlin.coroutines.SuspendFunction1<io.ktor.server.sse.ServerSSESessionWithSerialization,kotlin.Unit>){}[0]
58+
final fun (io.ktor.server.sse/ServerSSESession).io.ktor.server.sse/heartbeat(kotlin/Function1<io.ktor.server.sse/Heartbeat, kotlin/Unit>) // io.ktor.server.sse/heartbeat|[email protected](kotlin.Function1<io.ktor.server.sse.Heartbeat,kotlin.Unit>){}[0]
4759
final suspend inline fun <#A: reified kotlin/Any> (io.ktor.server.sse/ServerSSESessionWithSerialization).io.ktor.server.sse/send(#A) // io.ktor.server.sse/send|[email protected](0:0){0§<kotlin.Any>}[0]
4860
final suspend inline fun <#A: reified kotlin/Any> (io.ktor.server.sse/ServerSSESessionWithSerialization).io.ktor.server.sse/send(#A? = ..., kotlin/String? = ..., kotlin/String? = ..., kotlin/Long? = ..., kotlin/String? = ...) // io.ktor.server.sse/send|[email protected](0:0?;kotlin.String?;kotlin.String?;kotlin.Long?;kotlin.String?){0§<kotlin.Any>}[0]
4961
final suspend inline fun <#A: reified kotlin/Any> (io.ktor.server.sse/ServerSSESessionWithSerialization).io.ktor.server.sse/send(io.ktor.sse/TypedServerSentEvent<#A>) // io.ktor.server.sse/send|[email protected](io.ktor.sse.TypedServerSentEvent<0:0>){0§<kotlin.Any>}[0]

ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/SSEServerContent.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public class SSEServerContent(
5353
session?.handle()
5454
}
5555
} finally {
56+
val heartbeatJob = call.attributes.getOrNull(heartbeatJobKey)
57+
heartbeatJob?.cancel()
5658
session?.close()
5759
}
5860
}

ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/ServerSSESession.kt

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ package io.ktor.server.sse
66

77
import io.ktor.server.application.*
88
import io.ktor.sse.*
9+
import io.ktor.util.*
910
import io.ktor.util.reflect.*
1011
import io.ktor.websocket.*
1112
import kotlinx.coroutines.*
13+
import kotlin.time.Duration
14+
import kotlin.time.Duration.Companion.seconds
1215

1316
/**
1417
* Represents a server-side Server-Sent Events (SSE) session.
@@ -130,3 +133,36 @@ public suspend inline fun <reified T : Any> ServerSSESessionWithSerialization.se
130133
public suspend inline fun <reified T : Any> ServerSSESessionWithSerialization.send(data: T) {
131134
send(ServerSentEvent(serializer(typeInfo<T>(), data)))
132135
}
136+
137+
/**
138+
* Starts a heartbeat for the ServerSSESession.
139+
*
140+
* The heartbeat will send the specified [Heartbeat.event] at the specified [Heartbeat.duration] interval
141+
* as long as the session is active.
142+
*
143+
* @param heartbeatConfig a lambda that configures the [Heartbeat] object used for the heartbeat.
144+
*/
145+
public fun ServerSSESession.heartbeat(heartbeatConfig: Heartbeat.() -> Unit) {
146+
val heartbeat = Heartbeat().apply(heartbeatConfig)
147+
val heartbeatJob = Job(call.coroutineContext[Job])
148+
launch(heartbeatJob + CoroutineName("sse-heartbeat")) {
149+
while (true) {
150+
send(heartbeat.event)
151+
delay(heartbeat.duration)
152+
}
153+
}
154+
call.attributes.put(heartbeatJobKey, heartbeatJob)
155+
}
156+
157+
internal val heartbeatJobKey = AttributeKey<Job>("HeartbeatJobAttributeKey")
158+
159+
/**
160+
* Represents a heartbeat configuration for a [ServerSSESession].
161+
*
162+
* @property duration the duration between heartbeat events, default is 30 seconds.
163+
* @property event the [ServerSentEvent] to be sent as the heartbeat, default is a [ServerSentEvent] with the comment "heartbeat".
164+
*/
165+
public class Heartbeat {
166+
public var duration: Duration = 30.seconds
167+
public var event: ServerSentEvent = ServerSentEvent(comments = "heartbeat")
168+
}

ktor-server/ktor-server-plugins/ktor-server-sse/common/test/io/ktor/server/sse/ServerSentEventsTest.kt

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
2-
* Copyright 2014-2023 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3-
*/
2+
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
44

55
package io.ktor.server.sse
66

@@ -11,11 +11,19 @@ import io.ktor.client.statement.*
1111
import io.ktor.http.*
1212
import io.ktor.server.testing.*
1313
import io.ktor.sse.*
14-
import kotlinx.coroutines.*
15-
import kotlinx.coroutines.flow.*
16-
import kotlinx.serialization.*
17-
import kotlinx.serialization.json.*
18-
import kotlin.test.*
14+
import kotlinx.coroutines.cancel
15+
import kotlinx.coroutines.delay
16+
import kotlinx.coroutines.flow.collectIndexed
17+
import kotlinx.coroutines.flow.single
18+
import kotlinx.coroutines.launch
19+
import kotlinx.coroutines.withTimeout
20+
import kotlinx.serialization.Serializable
21+
import kotlinx.serialization.json.Json
22+
import kotlinx.serialization.serializer
23+
import kotlin.test.Test
24+
import kotlin.test.assertContains
25+
import kotlin.test.assertEquals
26+
import kotlin.time.Duration.Companion.milliseconds
1927

2028
class ServerSentEventsTest {
2129

@@ -291,6 +299,42 @@ class ServerSentEventsTest {
291299
)
292300
}
293301

302+
@Test
303+
fun testHeartbeat() = testApplication {
304+
install(SSE)
305+
routing {
306+
sse {
307+
heartbeat {
308+
duration = 10.milliseconds
309+
event = ServerSentEvent("heartbeat")
310+
}
311+
312+
repeat(4) {
313+
send("Hello")
314+
delay(10.milliseconds)
315+
}
316+
}
317+
}
318+
319+
val client = createSseClient()
320+
321+
var hellos = 0
322+
var heartbeats = 0
323+
withTimeout(5_000) {
324+
client.sse {
325+
incoming.collect { event ->
326+
when (event.data) {
327+
"Hello" -> hellos++
328+
"heartbeat" -> heartbeats++
329+
}
330+
if (hellos > 3 && heartbeats > 3) {
331+
cancel()
332+
}
333+
}
334+
}
335+
}
336+
}
337+
294338
private fun ApplicationTestBuilder.createSseClient(): HttpClient {
295339
val client = createClient {
296340
install(io.ktor.client.plugins.sse.SSE)

0 commit comments

Comments
 (0)