Skip to content

Commit fbba7ee

Browse files
authored
Batch source-add/source-removes. (#801)
* ref: Move logic to handle sources from Conference to Participant. * feat: Delay sending source-add/source-removes in order to batch them. * squash: Fix typo in function name.
1 parent db0453c commit fbba7ee

File tree

5 files changed

+248
-61
lines changed

5 files changed

+248
-61
lines changed

src/main/java/org/jitsi/jicofo/JitsiMeetConferenceImpl.java

Lines changed: 6 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.jitsi.impl.protocol.xmpp.*;
2424
import org.jitsi.jicofo.auth.*;
2525
import org.jitsi.jicofo.bridge.*;
26-
import org.jitsi.jicofo.conference.*;
2726
import org.jitsi.jicofo.conference.source.*;
2827
import org.jitsi.jicofo.lipsynchack.*;
2928
import org.jitsi.jicofo.util.*;
@@ -124,8 +123,7 @@ public class JitsiMeetConferenceImpl
124123
private volatile ChatRoom chatRoom;
125124

126125
/**
127-
* Operation set used to handle Jingle sessions with conference
128-
* participants.
126+
* Operation set used to handle Jingle sessions with conference participants.
129127
*/
130128
private OperationSetJingle jingle;
131129

@@ -1402,24 +1400,7 @@ private void propagateNewSources(Participant sourceOwner, ConferenceSourceMap so
14021400

14031401
participants.stream()
14041402
.filter(otherParticipant -> otherParticipant != sourceOwner)
1405-
.forEach(
1406-
participant ->
1407-
{
1408-
if (participant.isSessionEstablished())
1409-
{
1410-
jingle.sendAddSourceIQ(
1411-
finalSources,
1412-
participant.getJingleSession(),
1413-
ConferenceConfig.config.getUseJsonEncodedSources()
1414-
&& participant.supportsJsonEncodedSources());
1415-
}
1416-
else
1417-
{
1418-
logger.warn("No jingle session yet for " + participant.getChatMember().getName());
1419-
1420-
participant.queueRemoteSourcesToAdd(finalSources);
1421-
}
1422-
});
1403+
.forEach(participant -> participant.addRemoteSources(finalSources));
14231404
}
14241405

14251406
/**
@@ -1694,30 +1675,10 @@ private XMPPError onSessionAcceptInternal(
16941675
}
16951676
}
16961677

1697-
// Loop over current participant and send 'source-add' notification
1678+
// Propagate [participant]'s sources to the other participants.
16981679
propagateNewSources(participant, sourcesAccepted);
1699-
1700-
boolean encodeSourcesAsJson
1701-
= ConferenceConfig.config.getUseJsonEncodedSources() && participant.supportsJsonEncodedSources();
1702-
for (SourcesToAddOrRemove sourcesToAddOrRemove : participant.clearQueuedRemoteSourceChanges())
1703-
{
1704-
AddOrRemove action = sourcesToAddOrRemove.getAction();
1705-
ConferenceSourceMap sources = sourcesToAddOrRemove.getSources();
1706-
logger.info(
1707-
"Sending a queued source-" + action.toString().toLowerCase()
1708-
+ " to " + participantId + ", sources:" + sources);
1709-
if (action == AddOrRemove.Add)
1710-
{
1711-
jingle.sendAddSourceIQ(
1712-
sourcesToAddOrRemove.getSources(),
1713-
jingleSession,
1714-
encodeSourcesAsJson);
1715-
}
1716-
else if (action == AddOrRemove.Remove)
1717-
{
1718-
jingle.sendRemoveSourceIQ(sourcesToAddOrRemove.getSources(), jingleSession, encodeSourcesAsJson);
1719-
}
1720-
}
1680+
// Now that the Jingle session is ready, signal any sources from other participants to [participant].
1681+
participant.sendQueuedRemoteSources();
17211682

17221683
return null;
17231684
}
@@ -1817,23 +1778,7 @@ private void sendSourceRemove(ConferenceSourceMap sources, Participant except)
18171778

18181779
participants.stream()
18191780
.filter(participant -> participant != except)
1820-
.forEach(
1821-
participant ->
1822-
{
1823-
if (participant.isSessionEstablished())
1824-
{
1825-
jingle.sendRemoveSourceIQ(
1826-
finalSources,
1827-
participant.getJingleSession(),
1828-
ConferenceConfig.config.getUseJsonEncodedSources()
1829-
&& participant.supportsJsonEncodedSources());
1830-
}
1831-
else
1832-
{
1833-
logger.warn("Remove source: no jingle session for " + participant.getEndpointId());
1834-
participant.queueRemoteSourcesToRemove(finalSources);
1835-
}
1836-
});
1781+
.forEach(participant -> participant.removeRemoteSources(finalSources));
18371782
}
18381783

18391784
/**

src/main/java/org/jitsi/jicofo/Participant.java

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.jetbrains.annotations.*;
2121
import org.jitsi.impl.protocol.xmpp.*;
22+
import org.jitsi.jicofo.conference.*;
2223
import org.jitsi.jicofo.conference.source.*;
2324
import org.jitsi.jicofo.xmpp.muc.*;
2425
import org.jitsi.utils.*;
@@ -33,6 +34,7 @@
3334

3435
import java.time.*;
3536
import java.util.*;
37+
import java.util.concurrent.*;
3638

3739
import static java.time.temporal.ChronoUnit.SECONDS;
3840

@@ -110,6 +112,17 @@ public static String getEndpointId(ChatRoomMember chatRoomMember)
110112
*/
111113
private final JitsiMeetConferenceImpl conference;
112114

115+
/**
116+
* The task, if any, currently scheduled to signal queued remote sources.
117+
*/
118+
private ScheduledFuture<?> signalQueuedSourcesTask;
119+
120+
/**
121+
* The lock used when queueing remote sources to be signaled with a delay, i.e. when setting
122+
* {@link #signalQueuedSourcesTask}.
123+
*/
124+
private final Object signalQueuedSourcesTaskSyncRoot = new Object();
125+
113126
/**
114127
* Creates new {@link Participant} for given chat room member.
115128
*
@@ -126,6 +139,7 @@ public Participant(
126139
this.conference = conference;
127140
this.roomMember = roomMember;
128141
this.logger = parentLogger.createChildLogger(getClass().getName());
142+
logger.addContext("participant", getEndpointId());
129143
}
130144

131145
/**
@@ -521,6 +535,151 @@ public void setMuted(MediaType mediaType, boolean value)
521535
}
522536
}
523537

538+
/**
539+
* Add a set of remote sources, which are to be signaled to the remote side. The sources may be signaled
540+
* immediately, or queued to be signaled later.
541+
* @param sources the sources to add.
542+
*/
543+
public void addRemoteSources(ConferenceSourceMap sources)
544+
{
545+
if (!isSessionEstablished())
546+
{
547+
logger.debug("No Jingle session yet, queueing source-add.");
548+
queueRemoteSourcesToAdd(sources);
549+
// No need to schedule, the sources will be signaled when the session is established.
550+
return;
551+
}
552+
553+
int delayMs = ConferenceConfig.config.getSourceSignalingDelayMs(conference.getParticipantCount());
554+
if (delayMs > 0)
555+
{
556+
synchronized (signalQueuedSourcesTaskSyncRoot)
557+
{
558+
queueRemoteSourcesToAdd(sources);
559+
scheduleSignalingOfQueuedSources(delayMs);
560+
}
561+
}
562+
else
563+
{
564+
OperationSetJingle jingle = conference.getJingle();
565+
if (jingle == null)
566+
{
567+
logger.error("Can not send Jingle source-add, no Jingle API available.");
568+
return;
569+
}
570+
jingle.sendAddSourceIQ(
571+
sources,
572+
getJingleSession(),
573+
ConferenceConfig.config.getUseJsonEncodedSources() && supportsJsonEncodedSources());
574+
}
575+
}
576+
577+
/**
578+
* Schedule a task to signal all queued remote sources to the remote side. If a task is already scheduled, does
579+
* not schedule a new one (the existing task will send all latest queued sources).
580+
* @param delayMs the delay in milliseconds after which the task is to execute.
581+
*/
582+
private void scheduleSignalingOfQueuedSources(int delayMs)
583+
{
584+
synchronized (signalQueuedSourcesTaskSyncRoot)
585+
{
586+
if (signalQueuedSourcesTask == null)
587+
{
588+
logger.debug("Scheduling a task to signal queued remote sources after " + delayMs + " ms.");
589+
signalQueuedSourcesTask = TaskPools.getScheduledPool().schedule(() ->
590+
{
591+
synchronized (signalQueuedSourcesTaskSyncRoot)
592+
{
593+
sendQueuedRemoteSources();
594+
signalQueuedSourcesTask = null;
595+
}
596+
},
597+
delayMs,
598+
TimeUnit.MILLISECONDS);
599+
}
600+
}
601+
}
602+
603+
604+
/**
605+
* Removee a set of remote sources, which are to be signaled as removed to the remote side. The sources may be
606+
* signaled immediately, or queued to be signaled later.
607+
* @param sources the sources to remove.
608+
*/
609+
public void removeRemoteSources(ConferenceSourceMap sources)
610+
{
611+
if (!isSessionEstablished())
612+
{
613+
logger.debug("No Jingle session yet, queueing source-remove.");
614+
queueRemoteSourcesToRemove(sources);
615+
// No need to schedule, the sources will be signaled when the session is established.
616+
return;
617+
}
618+
619+
int delayMs = ConferenceConfig.config.getSourceSignalingDelayMs(conference.getParticipantCount());
620+
if (delayMs > 0)
621+
{
622+
synchronized (signalQueuedSourcesTaskSyncRoot)
623+
{
624+
queueRemoteSourcesToRemove(sources);
625+
scheduleSignalingOfQueuedSources(delayMs);
626+
}
627+
}
628+
else
629+
{
630+
OperationSetJingle jingle = conference.getJingle();
631+
if (jingle == null)
632+
{
633+
logger.error("Can not send Jingle source-remove, no Jingle API available.");
634+
return;
635+
}
636+
jingle.sendRemoveSourceIQ(
637+
sources,
638+
getJingleSession(),
639+
ConferenceConfig.config.getUseJsonEncodedSources() && supportsJsonEncodedSources());
640+
}
641+
}
642+
643+
/**
644+
* Signal any queued remote source modifications (either addition or removal) to the remote side.
645+
*/
646+
public void sendQueuedRemoteSources()
647+
{
648+
OperationSetJingle jingle = conference.getJingle();
649+
if (jingle == null)
650+
{
651+
logger.error("Can not signal remote sources, no Jingle API available");
652+
return;
653+
}
654+
655+
if (!isSessionEstablished())
656+
{
657+
logger.warn("Can not singal remote sources, Jingle session not established.");
658+
return;
659+
}
660+
661+
boolean encodeSourcesAsJson
662+
= ConferenceConfig.config.getUseJsonEncodedSources() && supportsJsonEncodedSources();
663+
664+
for (SourcesToAddOrRemove sourcesToAddOrRemove : clearQueuedRemoteSourceChanges())
665+
{
666+
AddOrRemove action = sourcesToAddOrRemove.getAction();
667+
ConferenceSourceMap sources = sourcesToAddOrRemove.getSources();
668+
logger.info("Sending a queued source-" + action.toString().toLowerCase() + ", sources:" + sources);
669+
if (action == AddOrRemove.Add)
670+
{
671+
jingle.sendAddSourceIQ(
672+
sourcesToAddOrRemove.getSources(),
673+
jingleSession,
674+
encodeSourcesAsJson);
675+
}
676+
else if (action == AddOrRemove.Remove)
677+
{
678+
jingle.sendRemoveSourceIQ(sourcesToAddOrRemove.getSources(), jingleSession, encodeSourcesAsJson);
679+
}
680+
}
681+
}
682+
524683
/**
525684
* Checks whether the participant is muted.
526685
* @param mediaType the media type to check.

src/main/kotlin/org/jitsi/jicofo/ConferenceConfig.kt

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
*/
1818
package org.jitsi.jicofo
1919

20+
import com.typesafe.config.ConfigObject
21+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
2022
import org.jitsi.config.JitsiConfig.Companion.legacyConfig
2123
import org.jitsi.config.JitsiConfig.Companion.newConfig
2224
import org.jitsi.metaconfig.config
2325
import java.time.Duration
26+
import java.util.TreeMap
2427

28+
@SuppressFBWarnings(value = ["BX_UNBOXING_IMMEDIATELY_REBOXED"], justification = "False positive.")
2529
class ConferenceConfig {
2630
val conferenceStartTimeout: Duration by config {
2731
"org.jitsi.focus.IDLE_TIMEOUT".from(legacyConfig)
@@ -72,6 +76,19 @@ class ConferenceConfig {
7276
}
7377
fun useRandomSharedDocumentName(): Boolean = useRandomSharedDocumentName
7478

79+
private val sourceSignalingDelays: TreeMap<Int, Int> by config {
80+
"jicofo.conference.source-signaling-delays".from(newConfig)
81+
.convertFrom<ConfigObject> { cfg ->
82+
TreeMap(cfg.entries.associate { it.key.toInt() to it.value.unwrapped() as Int })
83+
}
84+
}
85+
86+
/**
87+
* Get the number of milliseconds to delay signaling of Jingle sources given a certain [conferenceSize].
88+
*/
89+
fun getSourceSignalingDelayMs(conferenceSize: Int) =
90+
sourceSignalingDelays.floorEntry(conferenceSize)?.value ?: 0
91+
7592
/**
7693
* Whether to strip simulcast streams when signaling receivers. This option requires that jitsi-videobridge
7794
* uses the first SSRC in the SIM group as the target SSRC when rewriting streams, as this is the only SSRC

src/main/resources/reference.conf

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,16 @@ jicofo {
191191
// If `true` the shared document uses a random name. Otherwise, it uses the conference name.
192192
use-random-name = false
193193
}
194+
195+
// How much to delay signaling Jingle source-add and source-remove in order to batch them and reduce the number of
196+
// messages (based on conference size). The values are in milliseconds.
197+
source-signaling-delays = {
198+
// Conferences with size <50 will have delay=0.
199+
// Conferences with size between 50 and 99 will have delay=500 ms.
200+
#50 = 500
201+
// Conferences with size >=100 have delay=1000 ms.
202+
#100 = 1000
203+
}
194204
}
195205

196206
// Configuration for the internal health checks performed by jicofo.

0 commit comments

Comments
 (0)