Skip to content

docs: Add documentation for SSE serialization, reconnection, and heartbeat #612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions codeSnippets/snippets/client-sse/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ val hamcrest_version: String by project
plugins {
application
kotlin("jvm")
kotlin("plugin.serialization").version("2.1.20")
}

application {
Expand All @@ -25,6 +26,7 @@ dependencies {
implementation("io.ktor:ktor-client-core:$ktor_version")
implementation("io.ktor:ktor-client-cio:$ktor_version")
implementation("io.ktor:ktor-client-logging:$ktor_version")
implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version")
implementation("ch.qos.logback:logback-classic:$logback_version")
testImplementation("junit:junit:$junit_version")
testImplementation("org.hamcrest:hamcrest:$hamcrest_version")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,18 @@ package com.example

import io.ktor.client.*
import io.ktor.client.plugins.sse.*
import io.ktor.client.request.*
import io.ktor.sse.*
import kotlinx.coroutines.*
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.serializer

@Serializable
data class Customer(val id: Int, val firstName: String, val lastName: String)

@Serializable
data class Product(val id: Int, val prices: List<Int>)

fun main() {
val client = HttpClient {
Expand All @@ -20,6 +31,26 @@ fun main() {
}
}
}

// example with deserialization
client.sse({
url("http://localhost:8080/serverSentEvents")
}, deserialize = {
typeInfo, jsonString ->
val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!)
Json.decodeFromString(serializer, jsonString)!!
}) { // `this` is `ClientSSESessionWithDeserialization`
incoming.collect { event: TypedServerSentEvent<String> ->
when (event.event) {
"customer" -> {
val customer: Customer? = deserialize<Customer>(event.data)
}
"product" -> {
val product: Product? = deserialize<Product>(event.data)
}
}
}
}
client.close()
Comment on lines +35 to +54
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Remove nullable operator (!!) in deserialization function

The use of !! in the deserialization function could cause a NullPointerException if deserialization fails. It's best to handle this case more gracefully.

        client.sse({
            url("http://localhost:8080/serverSentEvents")
        }, deserialize = {
                typeInfo, jsonString ->
            val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!)
-            Json.decodeFromString(serializer, jsonString)!!
+            Json.decodeFromString(serializer, jsonString)
        }) { // `this` is `ClientSSESessionWithDeserialization`

Also consider adding error handling for the deserialization process:

        client.sse({
            url("http://localhost:8080/serverSentEvents")
        }, deserialize = {
                typeInfo, jsonString ->
            val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!)
-            Json.decodeFromString(serializer, jsonString)!!
+            try {
+                Json.decodeFromString(serializer, jsonString)
+            } catch (e: Exception) {
+                println("Failed to deserialize: $jsonString")
+                null
+            }
        }) { // `this` is `ClientSSESessionWithDeserialization`

}
client.close()
}
2 changes: 2 additions & 0 deletions codeSnippets/snippets/server-sse/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ val logback_version: String by project
plugins {
application
kotlin("jvm")
kotlin("plugin.serialization").version("2.1.20")
}

application {
Expand All @@ -21,6 +22,7 @@ dependencies {
implementation("io.ktor:ktor-server-core:$ktor_version")
implementation("io.ktor:ktor-server-netty:$ktor_version")
implementation("io.ktor:ktor-server-sse:$ktor_version")
implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version")
implementation("ch.qos.logback:logback-classic:$logback_version")
testImplementation("io.ktor:ktor-server-test-host-jvm:$ktor_version")
testImplementation("org.jetbrains.kotlin:kotlin-test")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ import io.ktor.server.routing.*
import io.ktor.server.sse.*
import io.ktor.sse.*
import kotlinx.coroutines.*
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.serializer
import kotlin.time.Duration.Companion.milliseconds

@Serializable
data class Customer(val id: Int, val firstName: String, val lastName: String)

@Serializable
data class Product(val id: Int, val prices: List<Int>)

fun main(args: Array<String>): Unit = io.ktor.server.netty.EngineMain.main(args)

Expand All @@ -18,5 +28,27 @@ fun Application.module() {
delay(1000)
}
}

// example with serialization
sse("/json", serialize = { typeInfo, it ->
val serializer = Json.serializersModule.serializer(typeInfo.kotlinType!!)
Json.encodeToString(serializer, it)
}) {
send(Customer(0, "Jet", "Brains"))
send(Product(0, listOf(100, 200)))
}

// example with heartbeat
sse("/heartbeat") {
heartbeat {
period = 10.milliseconds
event = ServerSentEvent("heartbeat")
}
// ...
repeat(4) {
send(ServerSentEvent("Hello"))
delay(10.milliseconds)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package com.example

import io.ktor.client.plugins.sse.*
import io.ktor.server.testing.*
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.collectIndexed
import kotlinx.coroutines.withTimeout
import kotlin.test.Test
import kotlin.test.assertEquals

Expand All @@ -25,4 +27,55 @@ class ApplicationTest {
}
}
}
}

@Test
fun testJsonEvents() {
testApplication {
application {
module()
}

val client = createClient {
install(SSE)
}

client.sse("/json") {
incoming.collectIndexed { i, event ->
when (i) {
0 -> assertEquals("""{"id":0,"firstName":"Jet","lastName":"Brains"}""", event.data)
1 -> assertEquals("""{"id":0,"prices":[100,200]}""", event.data)
}
}
}
}
}

@Test
fun testHeartbeat() {
testApplication {
application {
module()
}

val client = createClient {
install(SSE)
}

var hellos = 0
var heartbeats = 0
withTimeout(5_000) {
client.sse("/heartbeat") {
incoming.collect { event ->
when (event.data) {
"Hello" -> hellos++
"heartbeat" -> heartbeats++
}
if (hellos > 3 && heartbeats > 3) {
cancel()
}
}
}
}
}
}
}
95 changes: 64 additions & 31 deletions topics/client-server-sent-events.topic
Original file line number Diff line number Diff line change
Expand Up @@ -47,49 +47,63 @@
You can optionally configure the SSE plugin within the <code>install</code> block by setting the supported
properties of the
<a href="https://api.ktor.io/ktor-client/ktor-client-core/io.ktor.client.plugins.sse/-s-s-e-config/index.html">SSEConfig</a>
class:
class.
</p>
<deflist>
<def id="reconnectionTime">
<title><code>reconnectionTime</code></title>
Sets the reconnection delay. If the connection to the server is lost, the
client will wait for the specified time before attempting to reconnect.
</def>
<def id="showCommentEvents">
<title><code>showCommentEvents()</code></title>
Adds events that contain only comments in the incoming flow.
</def>
<def id="showRetryEvents">
<title><code>showRetryEvents()</code></title>
Adds events that contain only the <code>retry</code> field in the incoming flow.
</def>
</deflist>
<p>
In the following example, the SSE plugin is installed into the HTTP client and configured to include events
that only contain comments and those that contain only the <code>retry</code> field in the incoming flow:
</p>
<code-block lang="kotlin" src="snippets/client-sse/src/main/kotlin/com.example/Application.kt"
include-lines="8-13"/>
<chapter title="SSE reconnect" id="sse-reconnect">
<tldr>
<p>️️Not supported on: <code>OkHttp</code></p>
</tldr>
<p>
To enable automatic reconnection with supported engines, set
<code>maxReconnectionAttempts</code> to a value greater than <code>0</code>. You can also configure the
delay between attempts using <code>reconnectionTime</code>:
</p>
<code-block lang="kotlin">
install(SSE) {
maxReconnectionAttempts = 4
reconnectionTime = 2.seconds
}
</code-block>
<p>
If the connection to the server is lost, the client will wait for the specified
<code>reconnectionTime</code> before attempting to reconnect. It will make up to
the specified number of <code>maxReconnectionAttempts</code> attempts to reestablish the connection.
</p>
</chapter>
<chapter title="Filter events" id="filter-events">
<p>
In the following example, the SSE plugin is installed into the HTTP client and configured to include events
that only contain comments and those that contain only the <code>retry</code> field in the incoming flow:
</p>
<code-block lang="kotlin" src="snippets/client-sse/src/main/kotlin/com.example/Application.kt"
include-lines="20-23"/>
</chapter>
</chapter>
<chapter title="Handle SSE sessions" id="handle-sse-sessions">
<p>
A client's SSE session is represented by the
<a href="https://api.ktor.io/ktor-client/ktor-client-core/io.ktor.client.plugins.sse/-default-client-s-s-e-session/index.html">DefaultClientSSESession</a>
<a href="https://api.ktor.io/ktor-client/ktor-client-core/io.ktor.client.plugins.sse/-client-s-s-e-session/index.html">
<code>ClientSSESession</code>
</a>
interface. This interface exposes the API that allows you to receive server-sent events from a server.
</p>
<chapter title="Access an SSE session" id="access-sse-session">
<p>The <code>HttpClient</code> allows you to get access to an SSE session in one of the following ways:</p>
<list>
<li>The <a href="https://api.ktor.io/ktor-client/ktor-client-core/io.ktor.client.plugins.sse/sse.html">sse()</a>
<li>The
<a href="https://api.ktor.io/ktor-client/ktor-client-core/io.ktor.client.plugins.sse/sse.html">
<code>sse()</code>
</a>
function creates the SSE session and allows you to act on it.
</li>
<li>The
<a href="https://api.ktor.io/ktor-client/ktor-client-core/io.ktor.client.plugins.sse/sse-session.html">sseSession()</a>
<a href="https://api.ktor.io/ktor-client/ktor-client-core/io.ktor.client.plugins.sse/sse-session.html">
<code>sseSession()</code>
</a>
function allows you to open an SSE session.
</li>
</list>
<p>The following parameters are available to both functions. To specify the URL endpoint, choose from two
options:</p>
<p>To specify the URL endpoint, you can choose from two options:</p>
<list>
<li>Use the <code>urlString</code> parameter to specify the whole URL as a string.</li>
<li>Use the <code>schema</code>, <code>host</code>, <code>port</code>, and <code>path</code> parameters
Expand Down Expand Up @@ -138,20 +152,39 @@
An incoming server-sent events flow.
</def>
</deflist>
</chapter>
<chapter title="Example" id="example">
<p>
The example below creates a new SSE session with the <code>events</code> endpoint,
reads events through the <code>incoming</code> property and prints the received
<a href="https://api.ktor.io/ktor-shared/ktor-sse/io.ktor.sse/-server-sent-event/index.html"><code>ServerSentEvent</code></a>
.
</p>
<code-block lang="kotlin" src="snippets/client-sse/src/main/kotlin/com.example/Application.kt"
include-symbol="main"/>
include-lines="18-33,55-56"/>
<p>For the full example, see
<a href="https://github.com/ktorio/ktor-documentation/tree/%ktor_version%/codeSnippets/snippets/client-sse">client-sse</a>.
</p>
</chapter>
<chapter title="Deserialization" id="deserialization">
<p>
The SSE plugin supports deserialization of server-sent events into type-safe Kotlin objects. This
feature is particularly useful when working with structured data from the server.
</p>
<p>
To enable deserialization, provide a custom deserialization function using the <code>deserialize</code>
parameter on an SSE access function and use the
<a href="https://api.ktor.io/ktor-client/ktor-client-core/io.ktor.client.plugins.sse/-client-s-s-e-session-with-deserialization/index.html">
<code>ClientSSESessionWithDeserialization</code>
</a>
class to handle the deserialized events.
</p>
<p>
Here's an example using <code>kotlinx.serialization</code> to deserialize JSON data:
</p>
<code-block lang="Kotlin" src="snippets/client-sse/src/main/kotlin/com.example/Application.kt"
include-lines="36-53"/>
<p>For the full example, see
<a href="https://github.com/ktorio/ktor-documentation/tree/%ktor_version%/codeSnippets/snippets/client-sse">client-sse</a>.
</p>
</chapter>
</chapter>

</topic>
45 changes: 43 additions & 2 deletions topics/server-server-sent-events.topic
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@
<chapter title="SSE session block" id="session-block">
<p>
Within the <code>sse</code> block, you define the handler for the specified path, represented by the
<a href="https://api.ktor.io/ktor-server/ktor-server-plugins/ktor-server-sse/io.ktor.server.sse/-server-s-s-e-session/index.html">ServerSSESession</a>
<a href="https://api.ktor.io/ktor-server/ktor-server-plugins/ktor-server-sse/io.ktor.server.sse/-server-s-s-e-session/index.html">
<code>ServerSSESession</code>
</a>
class. The following functions and properties are available within the block:</p>
<deflist>
<def id="send">
Expand Down Expand Up @@ -121,7 +123,46 @@
</p>

<code-block lang="kotlin" src="snippets/server-sse/src/main/kotlin/com/example/Application.kt"
include-lines="14-21"/>
include-lines="23-30,53"/>
<p>For the full example, see
<a href="https://github.com/ktorio/ktor-documentation/tree/%ktor_version%/codeSnippets/snippets/server-sse">server-sse</a>.
</p>
</chapter>
<chapter title="SSE Heartbeat" id="heartbeat">
<p>
Heartbeats ensure the SSE connection stays active during periods of inactivity by periodically sending
events. As long as the session remains active, the server will send the specified event at the
configured interval.
</p>
<p>
To enable and configure a heartbeat, use the
<a href="https://api.ktor.io/ktor-server/ktor-server-plugins/ktor-server-sse/io.ktor.server.sse/heartbeat.html">
<code>.heartbeat()</code>
</a>
function within an SSE route handler:
</p>
<code-block lang="kotlin" src="snippets/server-sse/src/main/kotlin/com/example/Application.kt"
include-lines="24,42-47,52-53"/>
<p>
In this example, a heartbeat event is sent every 10 milliseconds to maintain the
connection.
</p>
</chapter>
<chapter title="Serialization" id="serialization">
<p>
To enable serialization, provide a custom serialization function using the <code>serialize</code>
parameter on an SSE route. Inside the handler, you can use the
<a href="https://api.ktor.io/ktor-server/ktor-server-plugins/ktor-server-sse/io.ktor.server.sse/-server-s-s-e-session-with-serialization/index.html">
<code>ServerSSESessionWithSerialization</code>
</a>
class to send serialized events:
</p>
<code-block lang="kotlin" src="snippets/server-sse/src/main/kotlin/com/example/Application.kt"
include-lines="12-17,20-24,32-39,53-54"/>
<p>
The <code>serialize</code> function is responsible for converting data objects into JSON, which is then
placed in the <code>data</code> field of a <code>ServerSentEvent</code>.
</p>
<p>For the full example, see
<a href="https://github.com/ktorio/ktor-documentation/tree/%ktor_version%/codeSnippets/snippets/server-sse">server-sse</a>.
</p>
Expand Down