Skip to content

Commit 2f72367

Browse files
authored
Merge queued remote source changes. (#784)
* fix: Synchronize access to queuedRemoteSourceChanges. * fix: Reduce the number queued of source-add/source-removes Merges consecutive source-add or source-remove operations to reduce the number of messages sent. In a large conference in which participants join at a high rate there can be a lot of source-adds queued and it's more efficient to merge them in one message.
1 parent 274d8b4 commit 2f72367

File tree

2 files changed

+161
-7
lines changed

2 files changed

+161
-7
lines changed

src/main/java/org/jitsi/jicofo/AbstractParticipant.java

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import java.util.*;
2121

22-
import org.jetbrains.annotations.*;
22+
import com.google.common.collect.*;
2323
import org.jitsi.jicofo.conference.*;
2424
import org.jitsi.jicofo.conference.source.*;
2525
import org.jitsi.xmpp.extensions.colibri.*;
@@ -52,7 +52,7 @@ public abstract class AbstractParticipant
5252
/**
5353
* List of remote source addition or removal operations that have not yet been signaled to this participant.
5454
*/
55-
private List<SourcesToAddOrRemove> queuedRemoteSourceChanges = new ArrayList<>();
55+
private final List<SourcesToAddOrRemove> queuedRemoteSourceChanges = new ArrayList<>();
5656

5757
/**
5858
* Returns currently stored map of RTP description to Colibri content name.
@@ -118,9 +118,23 @@ public void setRTPDescription(List<ContentPacketExtension> jingleContents)
118118
*/
119119
public List<SourcesToAddOrRemove> clearQueuedRemoteSourceChanges()
120120
{
121-
List<SourcesToAddOrRemove> ret = queuedRemoteSourceChanges;
122-
queuedRemoteSourceChanges = new ArrayList<>();
123-
return ret;
121+
synchronized (queuedRemoteSourceChanges)
122+
{
123+
List<SourcesToAddOrRemove> ret = new ArrayList<>(queuedRemoteSourceChanges);
124+
queuedRemoteSourceChanges.clear();
125+
return ret;
126+
}
127+
}
128+
129+
/**
130+
* Gets the list of pending remote sources, without clearing them. For testing.
131+
*/
132+
public List<SourcesToAddOrRemove> getQueuedRemoteSourceChanges()
133+
{
134+
synchronized (queuedRemoteSourceChanges)
135+
{
136+
return new ArrayList<>(queuedRemoteSourceChanges);
137+
}
124138
}
125139

126140
/**
@@ -130,7 +144,20 @@ public List<SourcesToAddOrRemove> clearQueuedRemoteSourceChanges()
130144
*/
131145
public void queueRemoteSourcesToAdd(ConferenceSourceMap sourcesToAdd)
132146
{
133-
queuedRemoteSourceChanges.add(new SourcesToAddOrRemove(AddOrRemove.Add, sourcesToAdd));
147+
synchronized (queuedRemoteSourceChanges)
148+
{
149+
SourcesToAddOrRemove previous = Iterables.getLast(queuedRemoteSourceChanges, null);
150+
if (previous != null && previous.getAction() == AddOrRemove.Add)
151+
{
152+
// We merge sourcesToAdd with the previous sources queued to be added to reduce the number of
153+
// source-add messages that need to be sent.
154+
queuedRemoteSourceChanges.remove(queuedRemoteSourceChanges.size() - 1);
155+
sourcesToAdd = sourcesToAdd.copy();
156+
sourcesToAdd.add(previous.getSources());
157+
}
158+
159+
queuedRemoteSourceChanges.add(new SourcesToAddOrRemove(AddOrRemove.Add, sourcesToAdd));
160+
}
134161
}
135162

136163
/**
@@ -140,7 +167,20 @@ public void queueRemoteSourcesToAdd(ConferenceSourceMap sourcesToAdd)
140167
*/
141168
public void queueRemoteSourcesToRemove(ConferenceSourceMap sourcesToRemove)
142169
{
143-
queuedRemoteSourceChanges.add(new SourcesToAddOrRemove(AddOrRemove.Remove, sourcesToRemove));
170+
synchronized (queuedRemoteSourceChanges)
171+
{
172+
SourcesToAddOrRemove previous = Iterables.getLast(queuedRemoteSourceChanges, null);
173+
if (previous != null && previous.getAction() == AddOrRemove.Remove)
174+
{
175+
// We merge sourcesToRemove with the previous sources queued to be remove to reduce the number of
176+
// source-remove messages that need to be sent.
177+
queuedRemoteSourceChanges.remove(queuedRemoteSourceChanges.size() - 1);
178+
sourcesToRemove = sourcesToRemove.copy();
179+
sourcesToRemove.add(previous.getSources());
180+
}
181+
182+
queuedRemoteSourceChanges.add(new SourcesToAddOrRemove(AddOrRemove.Remove, sourcesToRemove));
183+
}
144184
}
145185

146186
/**
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Jicofo, the Jitsi Conference Focus.
3+
*
4+
* Copyright @ 2021-Present 8x8, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.jitsi.jicofo.conference
19+
20+
import io.kotest.core.spec.style.ShouldSpec
21+
import io.kotest.matchers.shouldBe
22+
import org.jitsi.jicofo.AbstractParticipant
23+
import org.jitsi.jicofo.conference.AddOrRemove.Add
24+
import org.jitsi.jicofo.conference.AddOrRemove.Remove
25+
import org.jitsi.jicofo.conference.source.ConferenceSourceMap
26+
import org.jitsi.jicofo.conference.source.EndpointSourceSet
27+
import org.jitsi.jicofo.conference.source.Source
28+
import org.jitsi.utils.MediaType.AUDIO
29+
import org.jitsi.utils.logging2.LoggerImpl
30+
import org.jxmpp.jid.impl.JidCreate
31+
32+
class AbstractParticipantTest : ShouldSpec() {
33+
init {
34+
val jid1 = JidCreate.from("jid1")
35+
val s1 = ConferenceSourceMap(jid1 to EndpointSourceSet(Source(1, AUDIO))).unmodifiable
36+
37+
val jid2 = JidCreate.from("jid2")
38+
val s2 = ConferenceSourceMap(jid2 to EndpointSourceSet(Source(2, AUDIO))).unmodifiable
39+
val s2new = ConferenceSourceMap(
40+
jid2 to EndpointSourceSet(
41+
setOf(
42+
Source(2, AUDIO),
43+
Source(222, AUDIO)
44+
)
45+
)
46+
).unmodifiable
47+
48+
val jid3 = JidCreate.from("jid3")
49+
val s3 = ConferenceSourceMap(jid3 to EndpointSourceSet(Source(3, AUDIO))).unmodifiable
50+
51+
val participant = TestParticipant()
52+
53+
context("Queueing remote sources") {
54+
participant.queueRemoteSourcesToAdd(s1)
55+
participant.queuedRemoteSourceChanges.size shouldBe 1
56+
participant.queuedRemoteSourceChanges[0].action shouldBe Add
57+
participant.queuedRemoteSourceChanges[0].sources.toMap() shouldBe s1.toMap()
58+
59+
// Consecutive source-adds should be merged.
60+
participant.queueRemoteSourcesToAdd(s2)
61+
participant.queuedRemoteSourceChanges.size shouldBe 1
62+
participant.queuedRemoteSourceChanges[0].action shouldBe Add
63+
participant.queuedRemoteSourceChanges[0].sources.toMap() shouldBe
64+
s1.copy().apply { add(s2) }.toMap()
65+
66+
// Consecutive source-adds should be merged.
67+
participant.queueRemoteSourcesToAdd(s2new)
68+
participant.queuedRemoteSourceChanges.size shouldBe 1
69+
participant.queuedRemoteSourceChanges[0].action shouldBe Add
70+
participant.queuedRemoteSourceChanges[0].sources.toMap() shouldBe
71+
s1.copy().apply { add(s2); add(s2new) }.toMap()
72+
73+
// A source-remove after a series of source-adds should be a new entry.
74+
participant.queueRemoteSourcesToRemove(s2new)
75+
participant.queuedRemoteSourceChanges.size shouldBe 2
76+
participant.queuedRemoteSourceChanges[0].action shouldBe Add
77+
participant.queuedRemoteSourceChanges[0].sources.toMap() shouldBe
78+
s1.copy().apply { add(s2); add(s2new) }.toMap()
79+
participant.queuedRemoteSourceChanges[1].action shouldBe Remove
80+
participant.queuedRemoteSourceChanges[1].sources.toMap() shouldBe s2new.toMap()
81+
82+
// A source-add following source-remove should be a new entry.
83+
participant.queueRemoteSourcesToAdd(s3)
84+
participant.queuedRemoteSourceChanges.size shouldBe 3
85+
participant.queuedRemoteSourceChanges[0].action shouldBe Add
86+
participant.queuedRemoteSourceChanges[0].sources.toMap() shouldBe
87+
s1.copy().apply { add(s2); add(s2new) }.toMap()
88+
participant.queuedRemoteSourceChanges[1].action shouldBe Remove
89+
participant.queuedRemoteSourceChanges[1].sources.toMap() shouldBe s2new.toMap()
90+
participant.queuedRemoteSourceChanges[2].action shouldBe Add
91+
participant.queuedRemoteSourceChanges[2].sources.toMap() shouldBe s3.toMap()
92+
93+
// Consecutive source-adds should be merged.
94+
participant.queueRemoteSourcesToAdd(s1)
95+
participant.queuedRemoteSourceChanges.size shouldBe 3
96+
participant.queuedRemoteSourceChanges[0].action shouldBe Add
97+
participant.queuedRemoteSourceChanges[0].sources.toMap() shouldBe
98+
s1.copy().apply { add(s2); add(s2new) }.toMap()
99+
participant.queuedRemoteSourceChanges[1].action shouldBe Remove
100+
participant.queuedRemoteSourceChanges[1].sources.toMap() shouldBe s2new.toMap()
101+
participant.queuedRemoteSourceChanges[2].action shouldBe Add
102+
participant.queuedRemoteSourceChanges[2].sources.toMap() shouldBe
103+
s3.copy().apply { add(s1) }.toMap()
104+
105+
participant.clearQueuedRemoteSourceChanges()
106+
participant.queuedRemoteSourceChanges shouldBe emptyList()
107+
}
108+
}
109+
}
110+
111+
class TestParticipant : AbstractParticipant(LoggerImpl("AbstractParticipantTest")) {
112+
override fun getSources() = ConferenceSourceMap()
113+
override fun isSessionEstablished() = true
114+
}

0 commit comments

Comments
 (0)