Skip to content

Commit 4523f32

Browse files
committed
feat: Add MediaJsonEncoder and an option to export audio to a WS.
1 parent 96d6959 commit 4523f32

File tree

6 files changed

+245
-1
lines changed

6 files changed

+245
-1
lines changed

jvb/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,10 @@
143143
<groupId>${project.groupId}</groupId>
144144
<artifactId>jicoco-jetty</artifactId>
145145
</dependency>
146+
<dependency>
147+
<groupId>${project.groupId}</groupId>
148+
<artifactId>jicoco-mediajson</artifactId>
149+
</dependency>
146150
<dependency>
147151
<groupId>${project.groupId}</groupId>
148152
<artifactId>jicoco-metrics</artifactId>

jvb/src/main/java/org/jitsi/videobridge/Conference.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.jitsi.utils.logging2.*;
3030
import org.jitsi.utils.queue.*;
3131
import org.jitsi.videobridge.colibri2.*;
32+
import org.jitsi.videobridge.export.*;
3233
import org.jitsi.videobridge.message.*;
3334
import org.jitsi.videobridge.metrics.*;
3435
import org.jitsi.videobridge.relay.*;
@@ -40,7 +41,6 @@
4041
import org.json.simple.*;
4142
import org.jxmpp.jid.*;
4243

43-
import java.time.*;
4444
import java.util.*;
4545
import java.util.concurrent.*;
4646
import java.util.concurrent.atomic.*;
@@ -180,6 +180,9 @@ public long getLocalVideoSsrc()
180180
@Nullable
181181
private final String meetingId;
182182

183+
@NotNull
184+
private final Exporter exporter = new Exporter();
185+
183186
/**
184187
* A regex pattern to trim UUIDs to just their first 8 hex characters.
185188
*/
@@ -599,6 +602,7 @@ void expire()
599602
logger.debug(() -> "Expiring endpoints.");
600603
getEndpoints().forEach(AbstractEndpoint::expire);
601604
getRelays().forEach(Relay::expire);
605+
exporter.stop();
602606
speechActivity.expire();
603607

604608
updateStatisticsOnExpire();
@@ -1118,6 +1122,14 @@ private void sendOut(PacketInfo packetInfo)
11181122
prevHandler = relay;
11191123
}
11201124
}
1125+
if (exporter.wants(packetInfo))
1126+
{
1127+
if (prevHandler != null)
1128+
{
1129+
prevHandler.send(packetInfo.clone());
1130+
}
1131+
prevHandler = exporter;
1132+
}
11211133

11221134
if (prevHandler != null)
11231135
{
@@ -1130,6 +1142,11 @@ private void sendOut(PacketInfo packetInfo)
11301142
}
11311143
}
11321144

1145+
public void setConnects(List<Connect> exports)
1146+
{
1147+
exporter.setConnects(exports);
1148+
}
1149+
11331150
public boolean hasRelays()
11341151
{
11351152
return !relaysById.isEmpty();

jvb/src/main/kotlin/org/jitsi/videobridge/colibri2/Colibri2ConferenceHandler.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ class Colibri2ConferenceHandler(
7676
for (e in conferenceModifyIQ.endpoints) {
7777
responseBuilder.addEndpoint(handleColibri2Endpoint(e, ignoreUnknownEndpoints))
7878
}
79+
conferenceModifyIQ.connects?.let { conference.setConnects(it.getConnects()) }
7980
for (r in conferenceModifyIQ.relays) {
8081
if (!RelayConfig.config.enabled) {
8182
throw IqProcessingException(Condition.feature_not_implemented, "Octo is disabled in configuration.")
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright @ 2024 - Present, 8x8 Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.jitsi.videobridge.export
17+
18+
import org.eclipse.jetty.websocket.api.WebSocketAdapter
19+
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest
20+
import org.eclipse.jetty.websocket.client.WebSocketClient
21+
import org.jitsi.nlj.PacketInfo
22+
import org.jitsi.nlj.format.OpusPayloadType
23+
import org.jitsi.nlj.rtp.AudioRtpPacket
24+
import org.jitsi.nlj.util.PacketInfoQueue
25+
import org.jitsi.utils.logging2.createLogger
26+
import org.jitsi.videobridge.PotentialPacketHandler
27+
import org.jitsi.videobridge.colibri2.FeatureNotImplementedException
28+
import org.jitsi.videobridge.util.ByteBufferPool
29+
import org.jitsi.videobridge.util.TaskPools
30+
import org.jitsi.videobridge.websocket.config.WebsocketServiceConfig
31+
import org.jitsi.xmpp.extensions.colibri2.Connect
32+
33+
class Exporter : PotentialPacketHandler {
34+
val logger = createLogger()
35+
var started = false
36+
val queue = PacketInfoQueue(
37+
"${javaClass.simpleName}-packet-queue",
38+
TaskPools.IO_POOL,
39+
this::doHandlePacket,
40+
1024
41+
)
42+
43+
private var wsNotConnectedErrors = 0
44+
private fun logWsNotConnectedError(): Boolean = (wsNotConnectedErrors++ % 1000) == 0
45+
private val encoder = MediaJsonEncoder {
46+
if (recorderWebSocket.isConnected) {
47+
recorderWebSocket.remote?.sendString(it.toJson())
48+
?: logger.info("Websocket is connected, but remote is null")
49+
} else if (logWsNotConnectedError()) {
50+
logger.info("Can not send packet, websocket is not connected (count=$wsNotConnectedErrors).")
51+
}
52+
}
53+
private var recorderWebSocket = WebSocketAdapter()
54+
55+
fun setConnects(connects: List<Connect>) {
56+
when {
57+
started && connects.isNotEmpty() -> throw FeatureNotImplementedException("Changing connects once enabled.")
58+
connects.isEmpty() -> stop()
59+
connects.size > 1 -> throw FeatureNotImplementedException("Multiple connects")
60+
connects[0].video -> throw FeatureNotImplementedException("Video")
61+
else -> start(connects[0])
62+
}
63+
}
64+
65+
/** Run inside the queue thread, handle a packet. */
66+
private fun doHandlePacket(packet: PacketInfo): Boolean {
67+
if (started) {
68+
encoder.encode(packet.packetAs(), packet.endpointId!!)
69+
}
70+
ByteBufferPool.returnBuffer(packet.packet.buffer)
71+
return true
72+
}
73+
74+
/** Whether we want to accept a packet. */
75+
override fun wants(packet: PacketInfo): Boolean {
76+
if (!started || packet.packet is AudioRtpPacket) return false
77+
if (packet.payloadType !is OpusPayloadType) {
78+
logger.warn("Ignore audio with unsupported payload type: ${packet.payloadType}")
79+
return false
80+
}
81+
return true
82+
}
83+
84+
/** Accept a packet, add it to the queue. */
85+
override fun send(packet: PacketInfo) {
86+
if (started) {
87+
queue.add(packet)
88+
} else {
89+
ByteBufferPool.returnBuffer(packet.packet.buffer)
90+
}
91+
}
92+
93+
fun stop() {
94+
started = false
95+
logger.info("Stopping.")
96+
recorderWebSocket.session?.close(org.eclipse.jetty.websocket.core.CloseStatus.SHUTDOWN, "closing")
97+
}
98+
99+
fun start(connect: Connect) {
100+
if (connect.video) throw FeatureNotImplementedException("Video")
101+
if (connect.protocol != Connect.Protocols.MEDIAJSON) {
102+
throw FeatureNotImplementedException("Protocol ${connect.protocol}")
103+
}
104+
if (connect.type != Connect.Types.RECORDER) {
105+
throw FeatureNotImplementedException("Type ${connect.type}")
106+
}
107+
108+
logger.info("Starting with url=${connect.url}")
109+
webSocketClient.connect(recorderWebSocket, connect.url, ClientUpgradeRequest())
110+
started = true
111+
}
112+
113+
companion object {
114+
val webSocketClient = WebSocketClient().apply {
115+
idleTimeout = WebsocketServiceConfig.config.idleTimeout
116+
start()
117+
}
118+
}
119+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright @ 2024 - Present, 8x8 Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.jitsi.videobridge.export
17+
18+
import org.jitsi.mediajson.CustomParameters
19+
import org.jitsi.mediajson.Event
20+
import org.jitsi.mediajson.Media
21+
import org.jitsi.mediajson.MediaEvent
22+
import org.jitsi.mediajson.MediaFormat
23+
import org.jitsi.mediajson.Start
24+
import org.jitsi.mediajson.StartEvent
25+
import org.jitsi.nlj.rtp.AudioRtpPacket
26+
import org.jitsi.nlj.util.Rfc3711IndexTracker
27+
import java.time.Clock
28+
import java.time.Duration
29+
import java.time.Instant
30+
import kotlin.io.encoding.Base64
31+
import kotlin.io.encoding.ExperimentalEncodingApi
32+
33+
/**
34+
* Encodes the media in a conference into a mediajson format. Maintains state for each SSRC in order to maintain a
35+
* common space for timestamps.
36+
*
37+
* Note this supports only OPUS and assumes a common clock with a rate of 48000 for all SSRCs (which is required by
38+
* OPUS/RTP).
39+
*/
40+
class MediaJsonEncoder(
41+
/** Encoded mediajson events are sent to this function */
42+
private val handleEvent: (Event) -> Unit
43+
) {
44+
/** Reference time, timestamps are set relative to this instant. **/
45+
private val ref: Instant = Clock.systemUTC().instant()
46+
private val ssrcsStarted = mutableMapOf<Long, SsrcState>()
47+
48+
/** Global sequence number for all events */
49+
var seq = 0
50+
51+
fun encode(p: AudioRtpPacket, epId: String) = synchronized(ssrcsStarted) {
52+
val state = ssrcsStarted.computeIfAbsent(p.ssrc) { ssrc ->
53+
SsrcState(
54+
p.timestamp,
55+
(Duration.between(ref, Clock.systemUTC().instant()).toNanos() * 48.0e-6).toLong()
56+
).also {
57+
handleEvent(createStart(epId, ssrc))
58+
}
59+
}
60+
61+
handleEvent(encodeMedia(p, state, epId))
62+
}
63+
64+
private fun createStart(epId: String, ssrc: Long) = StartEvent(
65+
++seq,
66+
Start(
67+
"$epId-$ssrc",
68+
MediaFormat(
69+
"opus",
70+
48000,
71+
2
72+
),
73+
CustomParameters(endpointId = epId)
74+
)
75+
)
76+
77+
@OptIn(ExperimentalEncodingApi::class)
78+
private fun encodeMedia(p: AudioRtpPacket, state: SsrcState, epId: String): Event {
79+
++seq
80+
val elapsedRtpTime = p.timestamp - state.initialRtpTs
81+
return MediaEvent(
82+
seq,
83+
media = Media(
84+
"$epId-${p.ssrc}",
85+
state.indexTracker.update(p.sequenceNumber),
86+
state.offset + elapsedRtpTime,
87+
Base64.encode(p.buffer, p.payloadOffset, p.payloadOffset + p.payloadLength)
88+
)
89+
)
90+
}
91+
92+
private class SsrcState(
93+
val initialRtpTs: Long,
94+
// Offset of this SSRC since the start time in RTP units
95+
val offset: Long,
96+
val indexTracker: Rfc3711IndexTracker = Rfc3711IndexTracker()
97+
)
98+
}

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@
9898
<artifactId>jicoco-mucclient</artifactId>
9999
<version>${jicoco.version}</version>
100100
</dependency>
101+
<dependency>
102+
<groupId>${project.groupId}</groupId>
103+
<artifactId>jicoco-mediajson</artifactId>
104+
<version>${jicoco.version}</version>
105+
</dependency>
101106
<dependency>
102107
<groupId>${project.groupId}</groupId>
103108
<artifactId>jicoco-test-kotlin</artifactId>

0 commit comments

Comments
 (0)