Skip to content

Commit e996731

Browse files
committed
fix: prevent keepalive write-after-close and add regression test
1 parent 5e85c88 commit e996731

4 files changed

Lines changed: 128 additions & 2 deletions

File tree

packages/webtrit_signaling/lib/src/exceptions.dart

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,15 @@ class WebtritSignalingTransactionTimeoutException extends WebtritSignalingTransa
5252
const WebtritSignalingTransactionTimeoutException(super.id, super.transactionId);
5353
}
5454

55+
class WebtritSignalingBadStateException extends WebtritSignalingException {
56+
const WebtritSignalingBadStateException(super.id, this.error);
57+
58+
final StateError error;
59+
60+
@override
61+
String toString() => '${super.toString()}, stateError: $error';
62+
}
63+
5564
class WebtritSignalingKeepaliveTransactionTimeoutException extends WebtritSignalingTransactionTimeoutException {
5665
const WebtritSignalingKeepaliveTransactionTimeoutException(super.id, super.transactionId);
5766
}

packages/webtrit_signaling/lib/src/webtrit_signaling_client.dart

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,15 @@ class WebtritSignalingClient {
271271
void _addData(dynamic data) {
272272
_logger.fine(() => '_addData add: $data');
273273

274-
_wsc.sink.add(data);
274+
try {
275+
if (_wsc.closeCode != null) {
276+
throw StateError('Cannot add event after closing (detected via closeCode)');
277+
}
278+
279+
_wsc.sink.add(data);
280+
} on StateError catch (e) {
281+
throw WebtritSignalingBadStateException(_id, e);
282+
}
275283
}
276284

277285
void _cleanupTransactions(int? code, String? reason) {
@@ -301,9 +309,23 @@ class WebtritSignalingClient {
301309
final elapsed = await _executeKeepaliveTransaction(defaultExecuteTransactionTimeoutDuration);
302310
_logger.finest('handshake keepalive latency: $elapsed');
303311

304-
_startKeepaliveTimer();
312+
if (_wsc.closeCode == null) {
313+
_startKeepaliveTimer();
314+
}
305315
} on WebtritSignalingTransactionTimeoutException catch (e, stackTrace) {
306316
_onError(WebtritSignalingKeepaliveTransactionTimeoutException(e.id, e.transactionId), stackTrace);
317+
} on WebtritSignalingBadStateException {
318+
_logger.fine('Keepalive stopped gracefully due to closed socket.');
319+
// Catches the specific exception thrown when attempting to write to a closed socket.
320+
// This indicates a race condition where the Keepalive timer triggered shortly after
321+
// the socket was closed but before the timer could be cancelled.
322+
//
323+
// Since the socket is already in a terminal state, the standard disconnection
324+
// handlers (onDone/onDisconnect) are responsible for the lifecycle management.
325+
// Reporting this as an error would be redundant and potentially misleading.
326+
//
327+
// Gracefully terminate the recursive Keepalive loop here.
328+
return;
307329
} catch (error, stackTrace) {
308330
_onError(error, stackTrace);
309331
}

packages/webtrit_signaling/pubspec.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,5 @@ dependencies:
1919
dev_dependencies:
2020
lints: ^6.0.0
2121
test: ^1.26.3
22+
mocktail: ^1.0.4
23+
fake_async: ^1.3.1
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import 'dart:async';
2+
import 'dart:convert';
3+
4+
import 'package:fake_async/fake_async.dart';
5+
import 'package:mocktail/mocktail.dart';
6+
import 'package:test/test.dart';
7+
import 'package:web_socket_channel/web_socket_channel.dart';
8+
9+
import 'package:webtrit_signaling/webtrit_signaling.dart';
10+
11+
class MockWebSocketChannel extends Mock implements WebSocketChannel {}
12+
13+
class MockWebSocketSink extends Mock implements WebSocketSink {}
14+
15+
void main() {
16+
group('WebtritSignalingClient Keepalive Race Condition', () {
17+
late MockWebSocketChannel mockChannel;
18+
late MockWebSocketSink mockSink;
19+
late StreamController<dynamic> streamController;
20+
late WebtritSignalingClient client;
21+
22+
setUp(() {
23+
mockChannel = MockWebSocketChannel();
24+
mockSink = MockWebSocketSink();
25+
streamController = StreamController<dynamic>();
26+
27+
when(() => mockChannel.sink).thenReturn(mockSink);
28+
when(() => mockChannel.stream).thenAnswer((_) => streamController.stream);
29+
when(() => mockChannel.closeCode).thenReturn(null);
30+
when(() => mockSink.close(any(), any())).thenAnswer((_) => Future.value());
31+
});
32+
33+
tearDown(() {
34+
streamController.close();
35+
});
36+
37+
test('should gracefully terminate keepalive loop without error when timer fires on a closed socket', () {
38+
fakeAsync((async) {
39+
client = WebtritSignalingClient.inner(mockChannel);
40+
41+
var errorReported = false;
42+
Object? reportedError;
43+
44+
client.listen(
45+
onStateHandshake: (_) {},
46+
onEvent: (_) {},
47+
onError: (e, s) {
48+
errorReported = true;
49+
reportedError = e;
50+
},
51+
onDisconnect: (_, _) {},
52+
);
53+
54+
var isPhysicalSocketClosed = false;
55+
56+
// Throws StateError to simulate writing to a closed sink.
57+
when(() => mockSink.add(any())).thenAnswer((invocation) {
58+
if (isPhysicalSocketClosed) {
59+
throw StateError('Bad state: Cannot add event after closing.');
60+
}
61+
});
62+
63+
// Initiates the Keepalive timer (100ms interval).
64+
final handshakeData = jsonEncode({
65+
'handshake': 'state',
66+
'timestamp': 1705322000000,
67+
'keepalive_interval': 100,
68+
'registration': {'status': 'registered'},
69+
'lines': [],
70+
'user_active_calls': [],
71+
'presence_contacts_info': {},
72+
'guest_line': null,
73+
});
74+
75+
streamController.add(handshakeData);
76+
async.flushMicrotasks();
77+
78+
// Simulates a physical connection drop without explicit client disconnection.
79+
// The client remains unaware of the transport failure, allowing the timer to persist.
80+
isPhysicalSocketClosed = true;
81+
82+
// Advances time to trigger the Keepalive timer (100ms + buffer).
83+
async.elapse(const Duration(milliseconds: 200));
84+
85+
// Confirms the write was attempted (validating the race condition logic executed).
86+
verify(() => mockSink.add(any())).called(greaterThan(0));
87+
88+
// Ensures the internal exception was handled silently and not reported.
89+
expect(errorReported, isFalse, reason: 'Race condition caused an exception to leak to onError: $reportedError');
90+
});
91+
});
92+
});
93+
}

0 commit comments

Comments
 (0)