1515 */
1616package org.jitsi.videobridge.export
1717
18- import com.fasterxml.jackson.databind.JsonNode
19- import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
2018import org.eclipse.jetty.websocket.api.Session
2119import org.eclipse.jetty.websocket.api.WebSocketAdapter
2220import org.eclipse.jetty.websocket.client.ClientUpgradeRequest
2321import org.eclipse.jetty.websocket.client.WebSocketClient
2422import org.jitsi.config.JitsiConfig
23+ import org.jitsi.mediajson.Event
24+ import org.jitsi.mediajson.TranscriptionResultEvent
2525import org.jitsi.metaconfig.config
2626import org.jitsi.metaconfig.optionalconfig
2727import org.jitsi.nlj.PacketInfo
@@ -46,7 +46,7 @@ internal class Exporter(
4646 private val url : URI ,
4747 private val httpHeaders : Map <String , String >,
4848 val logger : Logger ,
49- private val handleTranscriptionResult : ((JsonNode ) -> Unit )
49+ private val handleTranscriptionResult : ((TranscriptionResultEvent ) -> Unit ),
5050) {
5151 private val isShuttingDown = AtomicBoolean (false )
5252 private val reconnectAttempts = AtomicInteger (0 )
@@ -59,6 +59,7 @@ internal class Exporter(
5959 private val instanceStarts = AtomicLong (0 )
6060 private val instanceTranscriptsReceived = AtomicLong (0 )
6161 private val instanceOtherMessagesReceived = AtomicLong (0 )
62+ private val instanceParseFailures = AtomicLong (0 )
6263
6364 val queue: PacketInfoQueue by lazy {
6465 PacketInfoQueue (
@@ -119,19 +120,24 @@ internal class Exporter(
119120
120121 private fun handleIncomingMessage (message : String ) {
121122 try {
122- val jsonNode = objectMapper.readTree(message)
123- logger.debug { " Received message from websocket: $jsonNode " }
124-
125- if (jsonNode.get(" type" )?.asText() == " transcription-result" ) {
126- transcriptsReceivedCount.inc()
127- instanceTranscriptsReceived.incrementAndGet()
128- handleTranscriptionResult(jsonNode)
129- } else {
130- otherMessagesReceivedCount.inc()
131- instanceOtherMessagesReceived.incrementAndGet()
123+ val event = Event .parse(message)
124+ logger.debug { " Received message from websocket: ${event.toJson()} " }
125+
126+ when (event) {
127+ is TranscriptionResultEvent -> {
128+ transcriptsReceivedCount.inc()
129+ instanceTranscriptsReceived.incrementAndGet()
130+ handleTranscriptionResult(event)
131+ }
132+ else -> {
133+ otherMessagesReceivedCount.inc()
134+ instanceOtherMessagesReceived.incrementAndGet()
135+ }
132136 }
133137 } catch (e: Exception ) {
134138 logger.warn(" Failed to parse incoming websocket message: $message " , e)
139+ parseFailuresCount.inc()
140+ instanceParseFailures.incrementAndGet()
135141 }
136142 }
137143
@@ -224,6 +230,7 @@ internal class Exporter(
224230 put(" starts" , instanceStarts.get())
225231 put(" transcripts_received" , instanceTranscriptsReceived.get())
226232 put(" other_messages_received" , instanceOtherMessagesReceived.get())
233+ put(" parse_failures" , instanceParseFailures.get())
227234 put(" queue_size" , queue.size())
228235 }
229236
@@ -263,7 +270,10 @@ internal class Exporter(
263270 " Number of non-transcription messages received by Exporter"
264271 )
265272
266- private val objectMapper = jacksonObjectMapper()
273+ private val parseFailuresCount = VideobridgeMetricsContainer .instance.registerCounter(
274+ " exporter_parse_failures" ,
275+ " Number of messages that failed to parse"
276+ )
267277
268278 private val maxReconnectAttempts: Int? by optionalconfig {
269279 " videobridge.exporter.max-reconnect-attempts" .from(JitsiConfig .newConfig)
0 commit comments