Skip to content

Commit 444fb51

Browse files
committed
refactor: extract RenegotiationHandler with stable-state and concurrency guards
- Extract renegotiation logic from CallBloc into a standalone RenegotiationHandler class - Add two stable-state guards: pre-offer check and TOCTOU guard after createOffer - Add _isHandling/_pendingRetry flags to serialize concurrent onRenegotiationNeeded firings - Catch WebtritSignalingErrorException for server-side error logging without swallowing - Catch plain String errors (flutter_webrtc native) separately from Dart exceptions - Add unit tests covering stable-state skip, concurrency serialization, and error paths - Document server-mediated vs P2P topology constraints and Perfect Negotiation limitation
1 parent 9a12065 commit 444fb51

3 files changed

Lines changed: 356 additions & 0 deletions

File tree

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import 'dart:async';
2+
3+
import 'package:flutter_webrtc/flutter_webrtc.dart';
4+
import 'package:logging/logging.dart';
5+
import 'package:webtrit_signaling/webtrit_signaling.dart';
6+
import 'call_error_reporter.dart';
7+
import 'sdp_munger.dart';
8+
9+
final _logger = Logger('RenegotiationHandler');
10+
11+
/// Callback responsible for sending the renegotiation offer to the remote peer
12+
/// via the signaling channel. The caller constructs the transport-specific
13+
/// request; this handler stays decoupled from the signaling layer.
14+
typedef RenegotiationExecutor = Future<void> Function(String callId, int? lineId, RTCSessionDescription jsep);
15+
16+
/// Handles WebRTC renegotiation triggered by [RTCPeerConnection.onRenegotiationNeeded].
17+
///
18+
/// ## Architecture constraints
19+
///
20+
/// This handler is designed for a **server-mediated** topology (e.g. Janus SFU)
21+
/// where offer/answer exchanges are serialised by the media server.
22+
/// Glare (simultaneous offers from both peers) cannot occur in this topology,
23+
/// so the simplified skip-on-non-stable strategy is sufficient and safe.
24+
///
25+
/// **P2P note:** in a direct peer-to-peer topology, simultaneous offers from
26+
/// both sides are possible. The current skip logic would silently drop one of
27+
/// the offers. Full [Perfect Negotiation](https://www.w3.org/TR/webrtc/#perfect-negotiation-example)
28+
/// with rollback must be implemented before removing the media server.
29+
///
30+
/// ## Stable-state guards
31+
///
32+
/// Two checks enforce the RTCPeerConnection state machine rules
33+
/// (RFC 8829 §4, W3C WebRTC §4.7):
34+
///
35+
/// 1. **Before `createOffer`** — skips if the current signaling state is not
36+
/// `stable`. In a server-mediated call the skipped renegotiation is safe:
37+
/// if the event was triggered by an incoming remote offer
38+
/// (e.g. [CalleeVideoOfferPolicy.includeInactiveTrack]), the pending track
39+
/// will already be included in the answer; if it was triggered by a genuine
40+
/// local change, libwebrtc will re-fire [onRenegotiationNeeded] once the
41+
/// peer connection returns to `stable`.
42+
///
43+
/// 2. **After `createOffer` (TOCTOU guard)** — re-checks the state before
44+
/// calling `setLocalDescription`. The `await createOffer()` yields control
45+
/// to the event loop; a concurrent native callback may update the Dart-side
46+
/// cached [RTCPeerConnection.signalingState] in that gap. If the check misses
47+
/// a concurrent state change (stale cache), the [on String catch] below is the
48+
/// authoritative fallback.
49+
///
50+
/// ## Concurrency guard
51+
///
52+
/// [onRenegotiationNeeded] can fire multiple times in rapid succession (e.g.
53+
/// libwebrtc re-fires it after a glare-rollback). Because [handle] is called
54+
/// with `unawaited`, two concurrent cycles can overlap and both pass the
55+
/// stable-state guard simultaneously — resulting in two competing offers and
56+
/// a new glare condition.
57+
///
58+
/// The [_isHandling] flag serialises concurrent calls. If a cycle is already
59+
/// in progress the new invocation sets [_pendingRetry] and returns immediately.
60+
/// When the active cycle finishes it checks [_pendingRetry] and re-runs [handle]
61+
/// with the latest parameters — ensuring no renegotiation is silently lost even
62+
/// when libwebrtc does not re-fire [onRenegotiationNeeded] after the first cycle.
63+
class RenegotiationHandler {
64+
RenegotiationHandler({required this.callErrorReporter, this.sdpMunger});
65+
66+
final CallErrorReporter callErrorReporter;
67+
final SDPMunger? sdpMunger;
68+
69+
bool _isHandling = false;
70+
bool _pendingRetry = false;
71+
72+
/// Executes a renegotiation cycle for [callId] on [peerConnection].
73+
///
74+
/// Serialises concurrent invocations via [_isHandling]/[_pendingRetry] (see
75+
/// class-level doc). Skips silently when the signaling state is not `stable`.
76+
/// Transient wrong-state errors from [RTCPeerConnection.setLocalDescription]
77+
/// are logged at WARNING and do not escalate to [callErrorReporter].
78+
/// All other errors are forwarded to [callErrorReporter].
79+
Future<void> handle(
80+
String callId,
81+
int? lineId,
82+
RTCPeerConnection peerConnection,
83+
RenegotiationExecutor execute,
84+
) async {
85+
if (_isHandling) {
86+
_logger.fine(() => 'onRenegotiationNeeded: queued retry — already handling a renegotiation cycle for $callId');
87+
_pendingRetry = true;
88+
return;
89+
}
90+
_isHandling = true;
91+
_pendingRetry = false;
92+
try {
93+
final stateBeforeOffer = peerConnection.signalingState;
94+
_logger.fine(() => 'onRenegotiationNeeded signalingState: $stateBeforeOffer');
95+
if (stateBeforeOffer != RTCSignalingState.RTCSignalingStateStable) {
96+
_logger.fine(() => 'onRenegotiationNeeded skipped: not in stable state ($stateBeforeOffer)');
97+
return;
98+
}
99+
100+
final localDescription = await peerConnection.createOffer({});
101+
sdpMunger?.apply(localDescription);
102+
_logger.info(() => 'onRenegotiationNeeded offer SDP (callId=$callId):\n${localDescription.sdp}');
103+
104+
final stateAfterOffer = peerConnection.signalingState;
105+
if (stateAfterOffer != RTCSignalingState.RTCSignalingStateStable) {
106+
_logger.fine(
107+
() =>
108+
'onRenegotiationNeeded: state changed to $stateAfterOffer after createOffer, skipping setLocalDescription',
109+
);
110+
return;
111+
}
112+
113+
// According to RFC 8829 5.6 (https://datatracker.ietf.org/doc/html/rfc8829#section-5.6),
114+
// localDescription should be set before sending the offer to transition into have-local-offer state.
115+
await peerConnection.setLocalDescription(localDescription);
116+
117+
await execute(callId, lineId, localDescription);
118+
} on WebtritSignalingErrorException catch (e) {
119+
_logger.warning(
120+
() => 'onRenegotiationNeeded: UpdateRequest rejected by server (callId=$callId, lineId=$lineId): $e',
121+
);
122+
callErrorReporter.handle(e, null, 'RenegotiationHandler.handle error (callId=$callId, lineId=$lineId)');
123+
} on String catch (e) {
124+
// flutter_webrtc surfaces native errors as plain strings. A "wrong state" failure
125+
// on setLocalDescription means a concurrent setRemoteDescription (e.g. from an
126+
// incoming updating_call) moved the PC out of stable between the TOCTOU guard and
127+
// the setLocalDescription call. This is a transient race — libwebrtc keeps the
128+
// [[NegotiationNeeded]] flag set and will re-fire onRenegotiationNeeded once the
129+
// PC returns to stable. No user notification is needed.
130+
if (e.contains('wrong state') || e.contains('have-remote-offer') || e.contains('have-local-offer')) {
131+
_logger.warning(
132+
() =>
133+
'onRenegotiationNeeded: setLocalDescription failed in wrong state ($e) '
134+
'— libwebrtc will re-fire onRenegotiationNeeded when stable',
135+
);
136+
} else {
137+
callErrorReporter.handle(e, null, 'RenegotiationHandler.handle error (callId=$callId, lineId=$lineId)');
138+
}
139+
} catch (e, s) {
140+
callErrorReporter.handle(e, s, 'RenegotiationHandler.handle error (callId=$callId, lineId=$lineId)');
141+
} finally {
142+
_isHandling = false;
143+
if (_pendingRetry) {
144+
_pendingRetry = false;
145+
_logger.fine(() => 'onRenegotiationNeeded: running pending retry for $callId');
146+
unawaited(handle(callId, lineId, peerConnection, execute));
147+
}
148+
}
149+
}
150+
}

lib/features/call/utils/utils.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export 'ice_filter.dart';
77
export 'logging_rtp_traffic_monitor_delegate.dart';
88
export 'peer_connection_factory.dart';
99
export 'peer_connection_manager.dart';
10+
export 'renegotiation_handler.dart';
1011
export 'peer_connection_policy_applier.dart';
1112
export 'rtp_traffic_monitor.dart';
1213
export 'sdp_mod_builder.dart';
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
import 'package:flutter_test/flutter_test.dart';
2+
import 'package:flutter_webrtc/flutter_webrtc.dart';
3+
import 'package:mocktail/mocktail.dart';
4+
5+
import 'package:webtrit_phone/features/call/utils/call_error_reporter.dart';
6+
import 'package:webtrit_phone/features/call/utils/renegotiation_handler.dart';
7+
import 'package:webtrit_phone/features/call/utils/sdp_munger.dart';
8+
9+
class MockRTCPeerConnection extends Mock implements RTCPeerConnection {}
10+
11+
class MockCallErrorReporter extends Mock implements CallErrorReporter {}
12+
13+
class MockSDPMunger extends Mock implements SDPMunger {}
14+
15+
void main() {
16+
late MockRTCPeerConnection mockPC;
17+
late MockCallErrorReporter mockErrorReporter;
18+
late MockSDPMunger mockMunger;
19+
late RenegotiationHandler handler;
20+
21+
const kCallId = 'call-1';
22+
const kLineId = 0;
23+
final kOffer = RTCSessionDescription('v=0\r\n', 'offer');
24+
25+
setUpAll(() {
26+
registerFallbackValue(RTCSessionDescription('', ''));
27+
registerFallbackValue(<String, dynamic>{});
28+
});
29+
30+
setUp(() {
31+
mockPC = MockRTCPeerConnection();
32+
mockErrorReporter = MockCallErrorReporter();
33+
mockMunger = MockSDPMunger();
34+
});
35+
36+
group('RenegotiationHandler — state guard (before offer)', () {
37+
test('skips entirely when state is have-remote-offer', () async {
38+
handler = RenegotiationHandler(callErrorReporter: mockErrorReporter);
39+
when(() => mockPC.signalingState).thenReturn(RTCSignalingState.RTCSignalingStateHaveRemoteOffer);
40+
41+
var executeCalled = false;
42+
await handler.handle(kCallId, kLineId, mockPC, (_, _, _) async => executeCalled = true);
43+
44+
verifyNever(() => mockPC.createOffer(any()));
45+
expect(executeCalled, isFalse);
46+
});
47+
48+
test('skips entirely when state is have-local-offer', () async {
49+
handler = RenegotiationHandler(callErrorReporter: mockErrorReporter);
50+
when(() => mockPC.signalingState).thenReturn(RTCSignalingState.RTCSignalingStateHaveLocalOffer);
51+
52+
var executeCalled = false;
53+
await handler.handle(kCallId, kLineId, mockPC, (_, _, _) async => executeCalled = true);
54+
55+
verifyNever(() => mockPC.createOffer(any()));
56+
expect(executeCalled, isFalse);
57+
});
58+
59+
test('skips entirely when state is closed', () async {
60+
handler = RenegotiationHandler(callErrorReporter: mockErrorReporter);
61+
when(() => mockPC.signalingState).thenReturn(RTCSignalingState.RTCSignalingStateClosed);
62+
63+
var executeCalled = false;
64+
await handler.handle(kCallId, kLineId, mockPC, (_, _, _) async => executeCalled = true);
65+
66+
verifyNever(() => mockPC.createOffer(any()));
67+
expect(executeCalled, isFalse);
68+
});
69+
70+
test('skips entirely when state is null', () async {
71+
handler = RenegotiationHandler(callErrorReporter: mockErrorReporter);
72+
when(() => mockPC.signalingState).thenReturn(null);
73+
74+
var executeCalled = false;
75+
await handler.handle(kCallId, kLineId, mockPC, (_, _, _) async => executeCalled = true);
76+
77+
verifyNever(() => mockPC.createOffer(any()));
78+
expect(executeCalled, isFalse);
79+
});
80+
});
81+
82+
group('RenegotiationHandler — TOCTOU guard (after offer)', () {
83+
test('skips setLocalDescription when state changed to have-remote-offer after createOffer', () async {
84+
handler = RenegotiationHandler(callErrorReporter: mockErrorReporter);
85+
86+
var callCount = 0;
87+
when(() => mockPC.signalingState).thenAnswer(
88+
(_) => callCount++ == 0
89+
? RTCSignalingState.RTCSignalingStateStable
90+
: RTCSignalingState.RTCSignalingStateHaveRemoteOffer,
91+
);
92+
when(() => mockPC.createOffer(any())).thenAnswer((_) async => kOffer);
93+
94+
var executeCalled = false;
95+
await handler.handle(kCallId, kLineId, mockPC, (_, _, _) async => executeCalled = true);
96+
97+
verifyNever(() => mockPC.setLocalDescription(any()));
98+
expect(executeCalled, isFalse);
99+
});
100+
});
101+
102+
group('RenegotiationHandler — happy path', () {
103+
setUp(() {
104+
var callCount = 0;
105+
when(
106+
() => mockPC.signalingState,
107+
).thenAnswer((_) => callCount++ == 0 ? RTCSignalingState.RTCSignalingStateStable : null);
108+
when(() => mockPC.createOffer(any())).thenAnswer((_) async => kOffer);
109+
when(() => mockPC.setLocalDescription(any())).thenAnswer((_) async {});
110+
});
111+
112+
test('calls setLocalDescription and execute when both states are stable', () async {
113+
when(() => mockPC.signalingState).thenReturn(RTCSignalingState.RTCSignalingStateStable);
114+
handler = RenegotiationHandler(callErrorReporter: mockErrorReporter);
115+
116+
RTCSessionDescription? capturedJsep;
117+
String? capturedCallId;
118+
int? capturedLineId;
119+
120+
await handler.handle(kCallId, kLineId, mockPC, (callId, lineId, jsep) async {
121+
capturedCallId = callId;
122+
capturedLineId = lineId;
123+
capturedJsep = jsep;
124+
});
125+
126+
verify(() => mockPC.setLocalDescription(kOffer)).called(1);
127+
expect(capturedCallId, kCallId);
128+
expect(capturedLineId, kLineId);
129+
expect(capturedJsep, kOffer);
130+
});
131+
132+
test('applies sdpMunger before setLocalDescription', () async {
133+
when(() => mockPC.signalingState).thenReturn(RTCSignalingState.RTCSignalingStateStable);
134+
when(() => mockMunger.apply(any())).thenReturn(null);
135+
handler = RenegotiationHandler(callErrorReporter: mockErrorReporter, sdpMunger: mockMunger);
136+
137+
await handler.handle(kCallId, kLineId, mockPC, (_, _, _) async {});
138+
139+
verify(() => mockMunger.apply(kOffer)).called(1);
140+
verify(() => mockPC.setLocalDescription(kOffer)).called(1);
141+
});
142+
143+
test('does not call sdpMunger when it is null', () async {
144+
when(() => mockPC.signalingState).thenReturn(RTCSignalingState.RTCSignalingStateStable);
145+
handler = RenegotiationHandler(callErrorReporter: mockErrorReporter);
146+
147+
await handler.handle(kCallId, kLineId, mockPC, (_, _, _) async {});
148+
149+
verifyNever(() => mockMunger.apply(any()));
150+
});
151+
});
152+
153+
group('RenegotiationHandler — execute error handling', () {
154+
setUp(() {
155+
when(() => mockErrorReporter.handle(any(), any(), any())).thenReturn(null);
156+
});
157+
158+
test('reports error via callErrorReporter when execute throws', () async {
159+
when(() => mockPC.signalingState).thenReturn(RTCSignalingState.RTCSignalingStateStable);
160+
when(() => mockPC.createOffer(any())).thenAnswer((_) async => kOffer);
161+
when(() => mockPC.setLocalDescription(any())).thenAnswer((_) async {});
162+
handler = RenegotiationHandler(callErrorReporter: mockErrorReporter);
163+
final exception = Exception('signaling error');
164+
165+
await handler.handle(kCallId, kLineId, mockPC, (_, _, _) async => throw exception);
166+
167+
verify(() => mockErrorReporter.handle(exception, any(), any())).called(1);
168+
});
169+
170+
test('reports error via callErrorReporter when createOffer throws', () async {
171+
when(() => mockPC.signalingState).thenReturn(RTCSignalingState.RTCSignalingStateStable);
172+
final exception = Exception('createOffer error');
173+
when(() => mockPC.createOffer(any())).thenThrow(exception);
174+
handler = RenegotiationHandler(callErrorReporter: mockErrorReporter);
175+
176+
await handler.handle(kCallId, kLineId, mockPC, (_, _, _) async {});
177+
178+
verify(() => mockErrorReporter.handle(exception, any(), any())).called(1);
179+
});
180+
181+
test('reports error via callErrorReporter when setLocalDescription throws', () async {
182+
when(() => mockPC.signalingState).thenReturn(RTCSignalingState.RTCSignalingStateStable);
183+
when(() => mockPC.createOffer(any())).thenAnswer((_) async => kOffer);
184+
final exception = Exception('setLocalDescription error');
185+
when(() => mockPC.setLocalDescription(any())).thenThrow(exception);
186+
handler = RenegotiationHandler(callErrorReporter: mockErrorReporter);
187+
188+
await handler.handle(kCallId, kLineId, mockPC, (_, _, _) async {});
189+
190+
verify(() => mockErrorReporter.handle(exception, any(), any())).called(1);
191+
});
192+
193+
test('does not rethrow when execute throws', () async {
194+
when(() => mockPC.signalingState).thenReturn(RTCSignalingState.RTCSignalingStateStable);
195+
when(() => mockPC.createOffer(any())).thenAnswer((_) async => kOffer);
196+
when(() => mockPC.setLocalDescription(any())).thenAnswer((_) async {});
197+
handler = RenegotiationHandler(callErrorReporter: mockErrorReporter);
198+
199+
await expectLater(
200+
handler.handle(kCallId, kLineId, mockPC, (_, _, _) async => throw Exception('error')),
201+
completes,
202+
);
203+
});
204+
});
205+
}

0 commit comments

Comments
 (0)