@@ -12,31 +12,44 @@ class MockWebSocketChannel extends Mock implements WebSocketChannel {}
1212
1313class MockWebSocketSink extends Mock implements WebSocketSink {}
1414
15+ /// Helper to create a valid handshake state JSON string.
16+ String _handshakeJson ({int keepaliveInterval = 100 }) {
17+ return jsonEncode ({
18+ 'handshake' : 'state' ,
19+ 'timestamp' : 1705322000000 ,
20+ 'keepalive_interval' : keepaliveInterval,
21+ 'registration' : {'status' : 'registered' },
22+ 'lines' : [],
23+ 'user_active_calls' : [],
24+ 'presence_contacts_info' : {},
25+ 'guest_line' : null ,
26+ });
27+ }
28+
1529void main () {
16- group ('WebtritSignalingClient Keepalive Race Condition' , () {
17- late MockWebSocketChannel mockChannel;
18- late MockWebSocketSink mockSink;
19- late StreamController <dynamic > streamController;
20- late WebtritSignalingClient client;
21-
22- setUp (() {
23- mockChannel = MockWebSocketChannel ();
24- mockSink = MockWebSocketSink ();
25- streamController = StreamController <dynamic >();
26-
27- when (() => mockChannel.sink).thenReturn (mockSink);
28- when (() => mockChannel.stream).thenAnswer ((_) => streamController.stream);
29- when (() => mockChannel.closeCode).thenReturn (null );
30- when (() => mockSink.close (any (), any ())).thenAnswer ((_) => Future .value ());
31- });
30+ late MockWebSocketChannel mockChannel;
31+ late MockWebSocketSink mockSink;
32+ late StreamController <dynamic > streamController;
33+
34+ setUp (() {
35+ mockChannel = MockWebSocketChannel ();
36+ mockSink = MockWebSocketSink ();
37+ streamController = StreamController <dynamic >();
38+
39+ when (() => mockChannel.sink).thenReturn (mockSink);
40+ when (() => mockChannel.stream).thenAnswer ((_) => streamController.stream);
41+ when (() => mockChannel.closeCode).thenReturn (null );
42+ when (() => mockSink.close (any (), any ())).thenAnswer ((_) => Future .value ());
43+ });
3244
33- tearDown (() {
34- streamController.close ();
35- });
45+ tearDown (() {
46+ streamController.close ();
47+ });
3648
37- test ('should gracefully terminate keepalive loop without error when timer fires on a closed socket' , () {
49+ group ('Keepalive race condition' , () {
50+ test ('should gracefully stop keepalive loop when timer fires on a closed socket' , () {
3851 fakeAsync ((async ) {
39- client = WebtritSignalingClient .inner (mockChannel);
52+ final client = WebtritSignalingClient .inner (mockChannel);
4053
4154 var errorReported = false ;
4255 Object ? reportedError;
@@ -53,41 +66,128 @@ void main() {
5366
5467 var isPhysicalSocketClosed = false ;
5568
56- // Throws StateError to simulate writing to a closed sink.
57- when (() => mockSink.add (any ())).thenAnswer ((invocation) {
69+ when (() => mockSink.add (any ())).thenAnswer ((_) {
5870 if (isPhysicalSocketClosed) {
59- throw StateError ('Bad state: Cannot add event after closing.' );
71+ throw StateError ('Cannot add event after closing.' );
6072 }
6173 });
6274
63- // Initiates the Keepalive timer (100ms interval).
64- final handshakeData = jsonEncode ({
65- 'handshake' : 'state' ,
66- 'timestamp' : 1705322000000 ,
67- 'keepalive_interval' : 100 ,
68- 'registration' : {'status' : 'registered' },
69- 'lines' : [],
70- 'user_active_calls' : [],
71- 'presence_contacts_info' : {},
72- 'guest_line' : null ,
73- });
74-
75- streamController.add (handshakeData);
75+ streamController.add (_handshakeJson ());
7676 async .flushMicrotasks ();
7777
78- // Simulates a physical connection drop without explicit client disconnection.
79- // The client remains unaware of the transport failure, allowing the timer to persist.
78+ // Simulate physical connection drop.
8079 isPhysicalSocketClosed = true ;
8180
82- // Advances time to trigger the Keepalive timer (100ms + buffer) .
81+ // Advance past the keepalive interval .
8382 async .elapse (const Duration (milliseconds: 200 ));
8483
85- // Confirms the write was attempted (validating the race condition logic executed).
8684 verify (() => mockSink.add (any ())).called (greaterThan (0 ));
85+ expect (errorReported, isFalse, reason: 'Race condition leaked to onError: $reportedError ' );
86+ });
87+ });
88+
89+ test ('should not restart keepalive timer when closeCode is set after successful keepalive' , () {
90+ fakeAsync ((async ) {
91+ final client = WebtritSignalingClient .inner (mockChannel);
92+
93+ String ? capturedTransactionId;
94+ var keepaliveRequestCount = 0 ;
95+
96+ // Capture the transaction ID from outgoing keepalive requests.
97+ when (() => mockSink.add (any ())).thenAnswer ((invocation) {
98+ final data = invocation.positionalArguments[0 ] as String ;
99+ final json = jsonDecode (data) as Map <String , dynamic >;
100+ if (json['handshake' ] == 'keepalive' ) {
101+ capturedTransactionId = json['transaction' ] as String ? ;
102+ keepaliveRequestCount++ ;
103+ }
104+ });
105+
106+ client.listen (onStateHandshake: (_) {}, onEvent: (_) {}, onError: (_, _) {}, onDisconnect: (_, _) {});
107+
108+ // Start keepalive timer via handshake (100ms interval).
109+ streamController.add (_handshakeJson ());
110+ async .flushMicrotasks ();
111+
112+ // First keepalive fires.
113+ async .elapse (const Duration (milliseconds: 110 ));
114+ async .flushMicrotasks ();
115+ expect (keepaliveRequestCount, equals (1 ));
116+
117+ // Respond to first keepalive with matching transaction ID.
118+ streamController.add (jsonEncode ({'handshake' : 'keepalive' , 'transaction' : capturedTransactionId}));
119+ async .flushMicrotasks ();
87120
88- // Ensures the internal exception was handled silently and not reported.
89- expect (errorReported, isFalse, reason: 'Race condition caused an exception to leak to onError: $reportedError ' );
121+ // Simulate socket closed before next keepalive.
122+ when (() => mockChannel.closeCode).thenReturn (1000 );
123+
124+ // Wait well beyond second keepalive interval — timer should not restart.
125+ async .elapse (const Duration (milliseconds: 500 ));
126+ async .flushMicrotasks ();
127+
128+ // Only one keepalive request should have been sent.
129+ expect (keepaliveRequestCount, equals (1 ));
90130 });
91131 });
92132 });
133+
134+ group ('Transaction cleanup on send failure' , () {
135+ test ('should throw WebtritSignalingBadStateException when sink.add throws StateError' , () async {
136+ final client = WebtritSignalingClient .inner (mockChannel);
137+
138+ client.listen (onStateHandshake: (_) {}, onEvent: (_) {}, onError: (_, _) {}, onDisconnect: (_, _) {});
139+
140+ // Initialize client with handshake.
141+ streamController.add (_handshakeJson ());
142+ await Future .delayed (Duration .zero);
143+
144+ // Any sink.add throws StateError (socket broken).
145+ when (() => mockSink.add (any ())).thenThrow (StateError ('Cannot add event after closing.' ));
146+
147+ final request = HangupRequest (transaction: 'test-tx' , line: 0 , callId: 'test-call-id' );
148+
149+ await expectLater (() => client.execute (request), throwsA (isA <WebtritSignalingBadStateException >()));
150+ });
151+
152+ test ('should throw WebtritSignalingBadStateException when closeCode is already set' , () async {
153+ final client = WebtritSignalingClient .inner (mockChannel);
154+
155+ client.listen (onStateHandshake: (_) {}, onEvent: (_) {}, onError: (_, _) {}, onDisconnect: (_, _) {});
156+
157+ // Initialize client with handshake.
158+ streamController.add (_handshakeJson ());
159+ await Future .delayed (Duration .zero);
160+
161+ // Socket already closed.
162+ when (() => mockChannel.closeCode).thenReturn (1000 );
163+
164+ final request = HangupRequest (transaction: 'test-tx' , line: 0 , callId: 'test-call-id' );
165+
166+ await expectLater (() => client.execute (request), throwsA (isA <WebtritSignalingBadStateException >()));
167+ });
168+
169+ test ('should not call sink.add when closeCode is detected before write' , () async {
170+ final client = WebtritSignalingClient .inner (mockChannel);
171+
172+ client.listen (onStateHandshake: (_) {}, onEvent: (_) {}, onError: (_, _) {}, onDisconnect: (_, _) {});
173+
174+ // Initialize client with handshake.
175+ streamController.add (_handshakeJson ());
176+ await Future .delayed (Duration .zero);
177+
178+ // closeCode set — socket already closed.
179+ when (() => mockChannel.closeCode).thenReturn (1000 );
180+
181+ final request = HangupRequest (transaction: 'test-tx' , line: 0 , callId: 'test-call-id' );
182+
183+ try {
184+ await client.execute (request);
185+ } on WebtritSignalingBadStateException {
186+ // Expected.
187+ }
188+
189+ // sink.add should NOT have been called — pre-check caught it.
190+ verifyNever (() => mockSink.add (any ()));
191+ });
192+ });
93193}
0 commit comments