Skip to content

Commit 2453b03

Browse files
committed
feat: Add ping/pong to exporter.
1 parent c1fb1e9 commit 2453b03

File tree

3 files changed

+105
-2
lines changed

3 files changed

+105
-2
lines changed

jvb/src/main/kotlin/org/jitsi/videobridge/export/Exporter.kt

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import org.eclipse.jetty.websocket.client.ClientUpgradeRequest
2121
import org.eclipse.jetty.websocket.client.WebSocketClient
2222
import org.jitsi.config.JitsiConfig
2323
import org.jitsi.mediajson.Event
24+
import org.jitsi.mediajson.PingEvent
25+
import org.jitsi.mediajson.PongEvent
2426
import org.jitsi.mediajson.SessionEndEvent
2527
import org.jitsi.mediajson.TranscriptionResultEvent
2628
import org.jitsi.metaconfig.config
@@ -48,11 +50,21 @@ internal class Exporter(
4850
private val httpHeaders: Map<String, String>,
4951
val logger: Logger,
5052
private val handleTranscriptionResult: ((TranscriptionResultEvent) -> Unit),
53+
private val pingEnabled: Boolean = false,
54+
private val pingIntervalMs: Int = 0,
55+
private val pingTimeoutMs: Int = 0
5156
) {
5257
private val isShuttingDown = AtomicBoolean(false)
5358
private val reconnectAttempts = AtomicInteger(0)
5459
private var reconnectFuture: ScheduledFuture<*>? = null
5560

61+
// Ping/pong state
62+
private var pingScheduledFuture: ScheduledFuture<*>? = null
63+
private var pingTimeoutFuture: ScheduledFuture<*>? = null
64+
private val nextPingId = AtomicInteger(0)
65+
private val lastPingSentId = AtomicInteger(0)
66+
private val lastPongReceivedMs = AtomicLong(0)
67+
5668
// Instance-level counters for debugState
5769
private val instancePacketsSent = AtomicLong(0)
5870
private val instanceWebSocketFailures = AtomicLong(0)
@@ -74,6 +86,7 @@ internal class Exporter(
7486
override fun onWebSocketClose(statusCode: Int, reason: String?) =
7587
super.onWebSocketClose(statusCode, reason).also {
7688
logger.info("Websocket closed with status $statusCode, reason: $reason")
89+
stopPing()
7790
val internalError = statusCode == 1011
7891
if (internalError) {
7992
webSocketInternalErrors.inc()
@@ -90,6 +103,7 @@ internal class Exporter(
90103
serializer = initSerializer(this)
91104
reconnectAttempts.set(0)
92105
cancelReconnect()
106+
startPing()
93107
}
94108

95109
override fun onWebSocketError(cause: Throwable?) = super.onWebSocketError(cause).also {
@@ -130,6 +144,18 @@ internal class Exporter(
130144
instanceTranscriptsReceived.incrementAndGet()
131145
handleTranscriptionResult(event)
132146
}
147+
is PongEvent -> {
148+
val expectedId = lastPingSentId.get()
149+
if (event.id == expectedId) {
150+
logger.debug { "Received pong with matching id=${event.id}" }
151+
lastPongReceivedMs.set(System.currentTimeMillis())
152+
// Cancel any pending timeout
153+
pingTimeoutFuture?.cancel(false)
154+
pingTimeoutFuture = null
155+
} else {
156+
logger.warn("Received pong with id=${event.id}, expected id=$expectedId")
157+
}
158+
}
133159
else -> {
134160
otherMessagesReceivedCount.inc()
135161
instanceOtherMessagesReceived.incrementAndGet()
@@ -142,6 +168,61 @@ internal class Exporter(
142168
}
143169
}
144170

171+
private fun sendPing() {
172+
if (!recorderWebSocket.isConnected || isShuttingDown.get()) {
173+
return
174+
}
175+
176+
val pingId = nextPingId.incrementAndGet()
177+
val pingEvent = PingEvent(pingId)
178+
179+
try {
180+
recorderWebSocket.remote?.sendString(pingEvent.toJson())
181+
lastPingSentId.set(pingId)
182+
logger.debug { "Sent ping with id=$pingId" }
183+
184+
// Schedule timeout check
185+
pingTimeoutFuture = TaskPools.SCHEDULED_POOL.schedule({
186+
handlePingTimeout()
187+
}, pingTimeoutMs.toLong(), TimeUnit.MILLISECONDS)
188+
} catch (e: Exception) {
189+
logger.warn("Failed to send ping message", e)
190+
}
191+
}
192+
193+
private fun handlePingTimeout() {
194+
logger.warn("Ping timeout, reconnecting websocket")
195+
196+
// Force reconnect on timeout
197+
stopPing()
198+
recorderWebSocket.session?.close(1000, "Ping timeout")
199+
scheduleReconnect()
200+
}
201+
202+
private fun startPing() {
203+
if (!pingEnabled || pingIntervalMs <= 0) {
204+
return
205+
}
206+
207+
logger.info("Starting ping with interval=$pingIntervalMs ms, timeout=$pingTimeoutMs ms")
208+
stopPing()
209+
210+
// Schedule recurring ping
211+
pingScheduledFuture = TaskPools.SCHEDULED_POOL.scheduleAtFixedRate(
212+
{ sendPing() },
213+
pingIntervalMs.toLong(),
214+
pingIntervalMs.toLong(),
215+
TimeUnit.MILLISECONDS
216+
)
217+
}
218+
219+
private fun stopPing() {
220+
pingScheduledFuture?.cancel(false)
221+
pingScheduledFuture = null
222+
pingTimeoutFuture?.cancel(false)
223+
pingTimeoutFuture = null
224+
}
225+
145226
/** Run inside the queue thread, handle a packet. */
146227
private fun doHandlePacket(packet: PacketInfo): Boolean {
147228
if (recorderWebSocket.isConnected) {
@@ -214,6 +295,7 @@ internal class Exporter(
214295

215296
fun stop() {
216297
isShuttingDown.set(true)
298+
stopPing()
217299
cancelReconnect()
218300
if (recorderWebSocket.isConnected) {
219301
recorderWebSocket.remote?.sendString(SessionEndEvent().toJson())
@@ -236,6 +318,12 @@ internal class Exporter(
236318
put("other_messages_received", instanceOtherMessagesReceived.get())
237319
put("parse_failures", instanceParseFailures.get())
238320
put("queue_size", queue.size())
321+
put("ping_enabled", pingEnabled)
322+
if (pingEnabled) {
323+
put("ping_interval_ms", pingIntervalMs)
324+
put("ping_timeout_ms", pingTimeoutMs)
325+
put("last_pong_received_ms", lastPongReceivedMs.get())
326+
}
239327
}
240328

241329
companion object {

jvb/src/main/kotlin/org/jitsi/videobridge/export/ExporterWrapper.kt

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,22 @@ class ExporterWrapper(
8181
val httpHeaders = connect.getHttpHeaders().associate { header ->
8282
header.name to header.value
8383
}
84-
exporter = Exporter(connect.url, httpHeaders, logger, handleTranscriptionResult).apply {
84+
85+
// Extract ping configuration if present
86+
val ping = connect.getPing()
87+
val pingEnabled = ping != null
88+
val pingIntervalMs = ping?.interval ?: 0
89+
val pingTimeoutMs = ping?.timeout ?: 0
90+
91+
exporter = Exporter(
92+
connect.url,
93+
httpHeaders,
94+
logger,
95+
handleTranscriptionResult,
96+
pingEnabled,
97+
pingIntervalMs,
98+
pingTimeoutMs
99+
).apply {
85100
start()
86101
}
87102
started = true

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@
133133
<dependency>
134134
<groupId>${project.groupId}</groupId>
135135
<artifactId>jitsi-xmpp-extensions</artifactId>
136-
<version>1.0-101-g2c9a6d6</version>
136+
<version>1.0-105-g07af8a9</version>
137137
</dependency>
138138
</dependencies>
139139
</dependencyManagement>

0 commit comments

Comments
 (0)