Skip to content

Commit c2f221d

Browse files
authored
fix: Handle Jibri errors on initial request (#603)
* add addInstance helper method * handle 'transient' XMPP error from a jibri * retry a jibri session in the event of an initial request failure * add jibrisession unit test * add jibridetector unit test
1 parent 5936ddb commit c2f221d

File tree

4 files changed

+216
-5
lines changed

4 files changed

+216
-5
lines changed

src/main/java/org/jitsi/jicofo/recording/jibri/JibriSession.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,8 +372,15 @@ private void startInternal()
372372
catch (Exception e)
373373
{
374374
logger.error("Failed to send start Jibri IQ: " + e, e);
375-
376-
throw new StartException(StartException.INTERNAL_SERVER_ERROR);
375+
jibriDetector.memberHadTransientError(jibriJid);
376+
if (!maxRetriesExceeded())
377+
{
378+
retryRequestWithAnotherJibri();
379+
}
380+
else
381+
{
382+
throw new StartException(StartException.INTERNAL_SERVER_ERROR);
383+
}
377384
}
378385
}
379386

src/main/java/org/jitsi/jicofo/xmpp/BaseBrewery.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,25 @@ synchronized public void chatRoomPropertyChanged(
265265
processMemberPresence(member);
266266
}
267267

268+
/**
269+
* Notify this {@link BaseBrewery} that the member with jid {@code member} experienced
270+
* an error which is expected to be transient.
271+
*
272+
* @param member the {@link Jid} of the member who experienced the error
273+
*/
274+
synchronized public void memberHadTransientError(Jid member)
275+
{
276+
BrewInstance instance = find(member);
277+
if (instance != null)
278+
{
279+
logger.info("Jid member " + member + " had a transient error, moving to the back" +
280+
"of the queue");
281+
// Move the instance to the back of the list
282+
removeInstance(instance);
283+
addInstance(instance);
284+
}
285+
}
286+
268287
/**
269288
* Process chat room member status changed. Extract the appropriate
270289
* presence extension and use it to process it further.
@@ -325,9 +344,7 @@ protected void processInstanceStatusChanged(
325344
if (instance == null)
326345
{
327346
instance = new BrewInstance(jid, extension);
328-
instances.add(instance);
329-
330-
logger.info("Added brewery instance: " + jid);
347+
addInstance(instance);
331348
}
332349
else
333350
{
@@ -373,6 +390,13 @@ private BrewInstance find(Jid jid)
373390
.orElse(null);
374391
}
375392

393+
private void addInstance(BrewInstance i)
394+
{
395+
instances.add(i);
396+
397+
logger.info("Added brewery instance: " + i.jid);
398+
}
399+
376400
/**
377401
* Removes an instance from the cache and notifies that instance is going
378402
* offline.
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package org.jitsi.jicofo.recording.jibri
2+
3+
import io.kotest.core.spec.style.ShouldSpec
4+
import io.kotest.matchers.shouldBe
5+
import io.mockk.every
6+
import io.mockk.mockk
7+
import net.java.sip.communicator.service.protocol.event.ChatRoomMemberPresenceChangeEvent
8+
import org.jitsi.protocol.xmpp.XmppChatMember
9+
import org.jitsi.xmpp.extensions.health.HealthStatusPacketExt
10+
import org.jitsi.xmpp.extensions.jibri.JibriBusyStatusPacketExt
11+
import org.jitsi.xmpp.extensions.jibri.JibriStatusPacketExt
12+
import org.jivesoftware.smack.packet.ExtensionElement
13+
import org.jivesoftware.smack.packet.Presence
14+
import org.jxmpp.jid.EntityFullJid
15+
import org.jxmpp.jid.impl.JidCreate
16+
17+
class JibriDetectorTest : ShouldSpec({
18+
val detector = JibriDetector(mockk(), "brewery_name", false)
19+
val jibriJids = listOf(
20+
JidCreate.entityFullFrom("[email protected]/nick"),
21+
JidCreate.entityFullFrom("[email protected]/nick")
22+
)
23+
24+
jibriJids
25+
.map { createJibriMember(it) }
26+
.forEach {
27+
detector.memberPresenceChanged(ChatRoomMemberPresenceChangeEvent(
28+
mockk(),
29+
it,
30+
ChatRoomMemberPresenceChangeEvent.MEMBER_JOINED,
31+
"reason"
32+
))
33+
}
34+
35+
context("When selecting a Jibri, JibriDetector") {
36+
should("pick the first one from the list") {
37+
detector.selectJibri() shouldBe jibriJids[0]
38+
}
39+
context("and a jibri has had a transient error") {
40+
detector.memberHadTransientError(jibriJids[0])
41+
should("pick the next one") {
42+
detector.selectJibri() shouldBe jibriJids[1]
43+
}
44+
context("and the next member has a transient error") {
45+
detector.memberHadTransientError(jibriJids[1])
46+
should("pick the next one") {
47+
// We will have rolled around to the first Jibri again here
48+
detector.selectJibri() shouldBe jibriJids[0]
49+
}
50+
}
51+
}
52+
}
53+
54+
})
55+
56+
private fun createJibriMember(jid: EntityFullJid): XmppChatMember {
57+
return mockk {
58+
every { occupantJid } returns jid
59+
every { presence } returns mockk<Presence> {
60+
every { getExtension<ExtensionElement>(JibriStatusPacketExt.ELEMENT_NAME, JibriStatusPacketExt.NAMESPACE)} answers {
61+
JibriStatusPacketExt().apply {
62+
healthStatus = HealthStatusPacketExt().apply { status = HealthStatusPacketExt.Health.HEALTHY }
63+
busyStatus = JibriBusyStatusPacketExt().apply { setAttribute("status", "idle") }
64+
}
65+
}
66+
}
67+
}
68+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package org.jitsi.jicofo.recording.jibri
2+
3+
import io.kotest.assertions.throwables.shouldThrow
4+
import io.kotest.core.spec.IsolationMode
5+
import io.kotest.core.spec.style.ShouldSpec
6+
import io.kotest.matchers.collections.shouldHaveSize
7+
import io.kotest.matchers.shouldNotBe
8+
import io.mockk.every
9+
import io.mockk.mockk
10+
import io.mockk.mockkStatic
11+
import io.mockk.spyk
12+
import io.mockk.verify
13+
import org.jitsi.eventadmin.EventAdmin
14+
import org.jitsi.impl.osgi.framework.BundleContextImpl
15+
import org.jitsi.jicofo.FocusBundleActivator
16+
import org.jitsi.osgi.ServiceUtils2
17+
import org.jitsi.protocol.xmpp.XmppConnection
18+
import org.jitsi.test.concurrent.FakeScheduledExecutorService
19+
import org.jitsi.utils.logging.Logger
20+
import org.jitsi.xmpp.extensions.jibri.JibriIq
21+
import org.jivesoftware.smack.packet.IQ
22+
import org.jivesoftware.smack.packet.XMPPError
23+
import org.jxmpp.jid.impl.JidCreate
24+
25+
class JibriSessionTest : ShouldSpec({
26+
isolationMode = IsolationMode.InstancePerLeaf
27+
28+
mockkStatic(ServiceUtils2::class)
29+
every { ServiceUtils2.getService(any(), EventAdmin::class.java)} returns mockk(relaxed = true)
30+
val bundleContext: BundleContextImpl = mockk(relaxed = true)
31+
val owner: JibriSession.Owner = mockk(relaxed = true)
32+
val roomName = JidCreate.entityBareFrom("[email protected]/baz")
33+
val initiator = JidCreate.bareFrom("[email protected]/baz")
34+
val pendingTimeout = 60L
35+
val maxNumRetries = 2
36+
val xmppConnection: XmppConnection = mockk()
37+
val executor: FakeScheduledExecutorService = spyk()
38+
val jibriList = mutableListOf(
39+
JidCreate.bareFrom("[email protected]"),
40+
JidCreate.bareFrom("[email protected]"),
41+
JidCreate.bareFrom("[email protected]")
42+
)
43+
val detector: JibriDetector = mockk {
44+
every { selectJibri() } returnsMany(jibriList)
45+
every { isAnyInstanceConnected } returns true
46+
every { memberHadTransientError(any()) } answers {
47+
// Simulate the real JibriDetector logic and put the Jibri at the back of the list
48+
jibriList.remove(arg(0))
49+
jibriList.add(arg(0))
50+
}
51+
}
52+
val logger: Logger = mockk(relaxed = true)
53+
54+
val jibriSession = JibriSession(
55+
bundleContext,
56+
owner,
57+
roomName,
58+
initiator,
59+
pendingTimeout,
60+
maxNumRetries,
61+
xmppConnection,
62+
executor,
63+
detector,
64+
false /* isSIP */,
65+
null /* sipAddress */,
66+
"displayName",
67+
"streamID",
68+
"youTubeBroadcastId",
69+
"sessionId",
70+
"applicationData",
71+
logger
72+
)
73+
74+
FocusBundleActivator.bundleContext = bundleContext
75+
76+
context("When sending a request to a Jibri to start a session throws an error") {
77+
val iqRequests = mutableListOf<IQ>()
78+
every { xmppConnection.sendPacketAndGetReply(capture(iqRequests)) } answers {
79+
// First return error
80+
IQ.createErrorResponse(arg(0), XMPPError.Condition.service_unavailable)
81+
} andThen {
82+
// Then return a successful response
83+
JibriIq().apply {
84+
status = JibriIq.Status.PENDING
85+
from = (arg(0) as IQ).to
86+
}
87+
}
88+
context("Trying to start a Jibri session") {
89+
should("retry with another jibri") {
90+
jibriSession.start()
91+
verify(exactly = 2) { xmppConnection.sendPacketAndGetReply(any()) }
92+
iqRequests shouldHaveSize 2
93+
iqRequests[0].to shouldNotBe iqRequests[1].to
94+
}
95+
}
96+
context("and that's the only Jibri") {
97+
every { detector.selectJibri() } returns JidCreate.bareFrom("[email protected]")
98+
every { xmppConnection.sendPacketAndGetReply(capture(iqRequests)) } answers {
99+
// First return error
100+
IQ.createErrorResponse(arg(0), XMPPError.Condition.service_unavailable)
101+
}
102+
context("trying to start a jibri session") {
103+
should("give up after exceeding the retry count") {
104+
shouldThrow<JibriSession.StartException> {
105+
jibriSession.start()
106+
}
107+
verify(exactly = maxNumRetries + 1) { xmppConnection.sendPacketAndGetReply(any())}
108+
}
109+
}
110+
}
111+
}
112+
})

0 commit comments

Comments
 (0)