Skip to content

Commit 89fb6ec

Browse files
authored
fix(signaling): guard Transaction against double-complete race (WT-1046) (#1110)
* fix(signaling): guard Transaction against double-complete race (WT-1046) Add _isDone flag and _finish() helper to Transaction so that all three terminal paths (handleResponse, terminateByDisconnect, _onTimeout) are symmetric. Previously only _onTimeout had the isCompleted guard; a late server response arriving after a timeout could call _completer.complete() on an already-completed Completer, throwing StateError and cascading into WebtritSignalingTransactionTimeoutException for all ICE transactions. Add unit tests covering all terminal paths and race scenarios using fake_async for deterministic timer control. * fix(signaling): log warning on duplicate Transaction completion attempt * fix(signaling): address Copilot review — update docs and wrap catchError with unawaited
1 parent f2d64b7 commit 89fb6ec

2 files changed

Lines changed: 193 additions & 3 deletions

File tree

packages/webtrit_signaling/lib/src/transaction.dart

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,25 @@
11
import 'dart:async';
22

3+
import 'package:logging/logging.dart';
4+
35
import 'exceptions.dart';
46

7+
final _logger = Logger('Transaction');
8+
9+
/// Represents a single in-flight signaling request and its expected response.
10+
///
11+
/// A [Transaction] is created for every request sent to the signaling server.
12+
/// It holds a [Completer] that resolves when one of three terminal events occurs:
13+
///
14+
/// - [handleResponse] — the server replied within the timeout window.
15+
/// - [terminateByDisconnect] — the WebSocket closed before a reply arrived.
16+
/// - The internal timeout timer fires (after [timeoutDuration]).
17+
///
18+
/// Only the **first** terminal event takes effect. Subsequent calls to any of
19+
/// the three completion paths are logged at [Level.WARNING] and ignored via
20+
/// the [_isDone] guard, preventing a `StateError: Future already completed`
21+
/// if, for example, a late server response arrives after the timeout has
22+
/// already fired.
523
class Transaction {
624
static int _createCounter = 0;
725

@@ -14,6 +32,10 @@ class Transaction {
1432
final _completer = Completer<Map<String, dynamic>>();
1533
late final Timer _timer;
1634

35+
/// `true` once any terminal path ([handleResponse], [terminateByDisconnect],
36+
/// or timeout) has run. Guards against double-completion of [_completer].
37+
var _isDone = false;
38+
1739
Transaction({required this.signalingClientId, String? id, required Duration timeoutDuration}) {
1840
if (id != null) {
1941
this.id = id;
@@ -28,22 +50,55 @@ class Transaction {
2850

2951
Future<Map<String, dynamic>> get future => _completer.future;
3052

53+
/// Called when the server sends a response matching this transaction's [id].
54+
///
55+
/// Completes [future] with [responseMessage]. No-op if the transaction has
56+
/// already been resolved by a timeout or disconnect.
3157
void handleResponse(Map<String, dynamic> responseMessage) {
32-
_timer.cancel();
58+
if (_isDone) {
59+
_logger.warning(
60+
'$signalingClientId handleResponse called on already-completed transaction $id — ignoring late response',
61+
);
62+
return;
63+
}
64+
_finish();
3365
_completer.complete(responseMessage);
3466
}
3567

68+
/// Called when the WebSocket disconnects before a response is received.
69+
///
70+
/// Completes [future] with a
71+
/// [WebtritSignalingTransactionTerminateByDisconnectException]. No-op if the
72+
/// transaction has already been resolved.
3673
void terminateByDisconnect([int? closeCode, String? closeReason]) {
37-
_timer.cancel();
74+
if (_isDone) {
75+
_logger.warning(
76+
'$signalingClientId terminateByDisconnect called on already-completed transaction $id — ignoring (code: $closeCode reason: $closeReason)',
77+
);
78+
return;
79+
}
80+
_finish();
3881
_completer.completeError(
3982
WebtritSignalingTransactionTerminateByDisconnectException(signalingClientId, id, closeCode, closeReason),
4083
);
4184
}
4285

4386
void _onTimeout() {
44-
if (_completer.isCompleted) {
87+
if (_isDone) {
88+
_logger.warning(
89+
'$signalingClientId _onTimeout fired on already-completed transaction $id — ignoring late timeout',
90+
);
4591
return;
4692
}
93+
_finish();
4794
_completer.completeError(WebtritSignalingTransactionTimeoutException(signalingClientId, id), StackTrace.current);
4895
}
96+
97+
/// Marks the transaction as done and cancels the timeout timer.
98+
///
99+
/// Must be called before completing [_completer] in every terminal path.
100+
void _finish() {
101+
_isDone = true;
102+
_timer.cancel();
103+
}
49104
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import 'dart:async';
2+
3+
import 'package:fake_async/fake_async.dart';
4+
import 'package:test/test.dart';
5+
6+
import 'package:webtrit_signaling/src/exceptions.dart';
7+
import 'package:webtrit_signaling/src/transaction.dart';
8+
9+
void main() {
10+
const clientId = 0;
11+
const timeout = Duration(seconds: 5);
12+
13+
// Helper to suppress unhandled future errors inside fakeAsync blocks.
14+
// catchError must return the same type as the future.
15+
void suppressError(Transaction tx) {
16+
unawaited(tx.future.catchError((_) => <String, dynamic>{}));
17+
}
18+
19+
group('Transaction.handleResponse', () {
20+
test('completes future with response value', () async {
21+
final tx = Transaction(signalingClientId: clientId, timeoutDuration: timeout);
22+
tx.handleResponse({'response': 'ack'});
23+
expect(await tx.future, {'response': 'ack'});
24+
});
25+
26+
test('cancels timeout — timer does not fire after response', () {
27+
fakeAsync((async) {
28+
final tx = Transaction(signalingClientId: clientId, timeoutDuration: timeout);
29+
tx.handleResponse({'response': 'ack'});
30+
31+
Object? error;
32+
unawaited(
33+
tx.future.catchError((e) {
34+
error = e;
35+
return <String, dynamic>{};
36+
}),
37+
);
38+
async.elapse(timeout * 2);
39+
40+
expect(error, isNull);
41+
});
42+
});
43+
44+
test('second call is silently ignored — no StateError', () async {
45+
final tx = Transaction(signalingClientId: clientId, timeoutDuration: timeout);
46+
tx.handleResponse({'first': true});
47+
expect(() => tx.handleResponse({'second': true}), returnsNormally);
48+
expect(await tx.future, {'first': true});
49+
});
50+
51+
test('late call after timeout does not throw StateError', () {
52+
fakeAsync((async) {
53+
final tx = Transaction(signalingClientId: clientId, timeoutDuration: timeout);
54+
suppressError(tx);
55+
async.elapse(timeout);
56+
expect(() => tx.handleResponse({'late': 'response'}), returnsNormally);
57+
});
58+
});
59+
});
60+
61+
group('Transaction.terminateByDisconnect', () {
62+
test('completes future with disconnect error', () {
63+
final tx = Transaction(signalingClientId: clientId, timeoutDuration: timeout);
64+
tx.terminateByDisconnect(1000, 'normal');
65+
expect(tx.future, throwsA(isA<WebtritSignalingTransactionTerminateByDisconnectException>()));
66+
});
67+
68+
test('cancels timeout — timer does not fire after disconnect', () {
69+
fakeAsync((async) {
70+
final tx = Transaction(signalingClientId: clientId, timeoutDuration: timeout);
71+
72+
Object? capturedError;
73+
unawaited(
74+
tx.future.catchError((e) {
75+
capturedError = e;
76+
return <String, dynamic>{};
77+
}),
78+
);
79+
80+
tx.terminateByDisconnect();
81+
async.flushMicrotasks();
82+
async.elapse(timeout * 2);
83+
84+
expect(capturedError, isA<WebtritSignalingTransactionTerminateByDisconnectException>());
85+
});
86+
});
87+
88+
test('late call after timeout does not throw StateError', () {
89+
fakeAsync((async) {
90+
final tx = Transaction(signalingClientId: clientId, timeoutDuration: timeout);
91+
suppressError(tx);
92+
async.elapse(timeout);
93+
expect(() => tx.terminateByDisconnect(1001, 'going away'), returnsNormally);
94+
});
95+
});
96+
});
97+
98+
group('Transaction timeout', () {
99+
test('completes future with timeout error after duration elapses', () {
100+
fakeAsync((async) {
101+
final tx = Transaction(signalingClientId: clientId, timeoutDuration: timeout);
102+
103+
Object? capturedError;
104+
unawaited(
105+
tx.future.catchError((e) {
106+
capturedError = e;
107+
return <String, dynamic>{};
108+
}),
109+
);
110+
111+
expect(capturedError, isNull);
112+
async.elapse(timeout);
113+
expect(capturedError, isA<WebtritSignalingTransactionTimeoutException>());
114+
});
115+
});
116+
117+
test('does not fire if handleResponse called first', () {
118+
fakeAsync((async) {
119+
final tx = Transaction(signalingClientId: clientId, timeoutDuration: timeout);
120+
tx.handleResponse({'ok': true});
121+
122+
Object? error;
123+
unawaited(
124+
tx.future.catchError((e) {
125+
error = e;
126+
return <String, dynamic>{};
127+
}),
128+
);
129+
async.elapse(timeout * 2);
130+
131+
expect(error, isNull);
132+
});
133+
});
134+
});
135+
}

0 commit comments

Comments
 (0)