Skip to content

Commit e15bd6b

Browse files
authored
fix(signaling): cancel in-flight connect on disconnect (#1093)
* fix(signaling): cancel in-flight connect on disconnect via generation counter

  disconnect() returned early when _client==null (connect still awaiting factory), leaving _connecting=true and the in-flight _connectAsync running. The next connect() was silently dropped, and if the stale factory eventually resolved, the module set _client and emitted SignalingConnected even though disconnect() had been called.

  Root cause: the guard in disconnect() (`if (client == null) return`) prevented it from resetting _connecting or invalidating the async task.

  Fix: introduce a monotonically increasing _generation counter.
  - connect() captures the current generation before starting _connectAsync
  - disconnect() increments _generation and resets _connecting=false
  - _connectAsync checks its generation at every suspension point; if it no longer matches, it cleans up its client without emitting events
  - the finally block only resets _connecting for the current generation

  Adds four tests reproducing the race: two document the broken behavior (expected to pass after the fix), two verify correct behavior.

* fix(signaling): add diagnostic logs for cancelled in-flight connect
* refactor(signaling): replace generation counter with Object identity token
* fix(signaling): address Copilot review comments
* fix(signaling): restore library directive to silence dangling doc comment warning
1 parent 07b3a66 commit e15bd6b

2 files changed

Lines changed: 241 additions & 12 deletions

File tree

packages/webtrit_signaling_service/webtrit_signaling_service_platform_interface/lib/src/signaling_module_impl.dart

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,14 @@ class SignalingModuleImpl implements SignalingModule {
111111

112112
WebtritSignalingClient? _client;
113113
bool _disposed = false;
114-
bool _connecting = false;
114+
115+
/// Identity token for the active connect attempt.
116+
///
117+
/// Non-null while a [_connectAsync] attempt is still the active one; null when idle or once that attempt has been cancelled.
118+
/// [connect] creates a fresh [Object] and passes it to [_connectAsync].
119+
/// [disconnect] sets it to null, which [_connectAsync] detects after each
120+
/// suspension point — the "latest wins" pattern for non-cancellable Futures.
121+
Object? _connectToken;
115122

116123
/// Completer resolved by [_onDisconnect] after a graceful [disconnect] call.
117124
Completer<void>? _disconnectAck;
@@ -181,15 +188,22 @@ class SignalingModuleImpl implements SignalingModule {
181188
/// Clears the session buffer on each call.
182189
@override
183190
void connect() {
184-
if (_disposed || _connecting) return;
191+
if (_disposed || _connectToken != null) return;
192+
final token = _connectToken = Object();
185193
_eventBuffer.clear();
186-
unawaited(_connectAsync());
194+
unawaited(_connectAsync(token));
187195
}
188196

189197
@override
190198
Future<void> disconnect() async {
199+
final hadInFlightConnect = _connectToken != null;
200+
_connectToken = null; // invalidate any in-flight _connectAsync
201+
191202
final client = _client;
192-
if (client == null) return;
203+
if (client == null) {
204+
if (hadInFlightConnect) _logger.fine('disconnect: in-flight connect cancelled');
205+
return;
206+
}
193207
_client = null;
194208
_intentionalDisconnect = true;
195209
_disconnectAck = Completer<void>();
@@ -220,9 +234,7 @@ class SignalingModuleImpl implements SignalingModule {
220234
// Internal
221235
// ---------------------------------------------------------------------------
222236

223-
Future<void> _connectAsync() async {
224-
if (_connecting) return;
225-
_connecting = true;
237+
Future<void> _connectAsync(Object connectToken) async {
226238
try {
227239
final existing = _client;
228240
if (existing != null) {
@@ -234,7 +246,7 @@ class SignalingModuleImpl implements SignalingModule {
234246
}
235247
}
236248

237-
if (_disposed) return;
249+
if (_connectToken != connectToken || _disposed) return;
238250

239251
_emit(SignalingConnecting());
240252

@@ -249,11 +261,14 @@ class SignalingModuleImpl implements SignalingModule {
249261
force: true,
250262
);
251263

252-
if (_disposed) {
264+
if (_connectToken != connectToken || _disposed) {
265+
// This connect was superseded by disconnect() or a newer connect(), or the module was disposed —
266+
// clean up the client without emitting events.
267+
_logger.fine('_connectAsync: stale connect discarded');
253268
try {
254269
await client.disconnect(SignalingDisconnectCode.normalClosure.code);
255270
} catch (e, s) {
256-
_logger.warning('_connectAsync dispose-disconnect error', e, s);
271+
_logger.warning('_connectAsync stale-disconnect error', e, s);
257272
}
258273
return;
259274
}
@@ -270,7 +285,7 @@ class SignalingModuleImpl implements SignalingModule {
270285
_emit(SignalingConnected());
271286
unawaited(_requestQueue.flush(execute: client.execute, isActive: () => identical(_client, client)));
272287
} catch (e, s) {
273-
if (_disposed) return;
288+
if (_connectToken != connectToken || _disposed) return;
274289
_logger.warning('_connectAsync failed', e, s);
275290

276291
final errorString = e.toString();
@@ -280,7 +295,9 @@ class SignalingModuleImpl implements SignalingModule {
280295
_emit(SignalingConnectionFailed(error: e, isRepeated: isRepeated, recommendedReconnectDelay: reconnectDelay));
281296
}
282297
} finally {
283-
_connecting = false;
298+
// Only clear the token if this is still the active connect — a superseded
299+
// _connectAsync must not remove the token owned by the active one.
300+
if (_connectToken == connectToken) _connectToken = null;
284301
}
285302
}
286303

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/// Unit tests for [SignalingModuleImpl].
2+
///
3+
/// Reproduces the disconnect-during-connect race condition:
4+
///
5+
/// When [SignalingModuleImpl.disconnect] is called while [_connectAsync] is
6+
/// still awaiting the client factory (_client == null, _connectToken != null),
7+
/// disconnect() must cancel the in-flight attempt by clearing _connectToken.
8+
/// The next connect() call must then be allowed to start a fresh attempt
9+
/// instead of being blocked by the stale in-flight operation.
10+
///
11+
/// Tests labelled "BUG:" reproduce the broken behavior on the unfixed code.
12+
/// Tests labelled "after fix:" verify the correct behavior.
13+
library;
14+
15+
import 'dart:async';
16+
17+
import 'package:flutter_test/flutter_test.dart';
18+
import 'package:ssl_certificates/ssl_certificates.dart';
19+
import 'package:webtrit_signaling/webtrit_signaling.dart';
20+
import 'package:webtrit_signaling_service_platform_interface/webtrit_signaling_service_platform_interface.dart';
21+
22+
// ---------------------------------------------------------------------------
23+
// Fake client
24+
// ---------------------------------------------------------------------------
25+
26+
class _FakeClient extends Fake implements WebtritSignalingClient {
27+
DisconnectHandler? _onDisconnect;
28+
29+
@override
30+
void listen({
31+
required StateHandshakeHandler onStateHandshake,
32+
required EventHandler onEvent,
33+
required ErrorHandler onError,
34+
required DisconnectHandler onDisconnect,
35+
}) {
36+
_onDisconnect = onDisconnect;
37+
}
38+
39+
@override
40+
Future<void> disconnect([int? code, String? reason]) async {
41+
_onDisconnect?.call(code, reason);
42+
}
43+
44+
@override
45+
Future<void> execute(Request request, [Duration? timeout]) async {}
46+
}
47+
48+
// ---------------------------------------------------------------------------
49+
// Controlled factory
50+
// ---------------------------------------------------------------------------
51+
52+
/// A factory where each call gets its own [Completer].
53+
/// Tracks how many times the factory was called and allows releasing each
54+
/// call independently.
55+
class _ControlledFactory {
56+
final _calls = <Completer<WebtritSignalingClient>>[];
57+
int get callCount => _calls.length;
58+
59+
Completer<WebtritSignalingClient> get lastCall => _calls.last;
60+
61+
SignalingClientFactory get factory =>
62+
({
63+
required Uri url,
64+
required String tenantId,
65+
required String token,
66+
required Duration connectionTimeout,
67+
required TrustedCertificates certs,
68+
required bool force,
69+
}) {
70+
final c = Completer<WebtritSignalingClient>();
71+
_calls.add(c);
72+
return c.future;
73+
};
74+
75+
void release(int callIndex, WebtritSignalingClient client) => _calls[callIndex].complete(client);
76+
}
77+
78+
// ---------------------------------------------------------------------------
79+
// Helper
80+
// ---------------------------------------------------------------------------
81+
82+
SignalingModuleImpl _buildModule(SignalingClientFactory factory) => SignalingModuleImpl(
83+
coreUrl: 'https://example.com',
84+
tenantId: 'tenant',
85+
token: 'token',
86+
trustedCertificates: TrustedCertificates.empty,
87+
clientFactory: factory,
88+
);
89+
90+
// ---------------------------------------------------------------------------
91+
// Tests
92+
// ---------------------------------------------------------------------------
93+
94+
void main() {
95+
group('SignalingModuleImpl — disconnect() during in-flight connect()', () {
96+
// -----------------------------------------------------------------------
97+
// Core bug: disconnect while _client==null leaves _connecting=true
98+
// -----------------------------------------------------------------------
99+
100+
test('BUG: second connect() is silently dropped because _connecting stays true after disconnect()', () async {
101+
// Arrange
102+
final factory = _ControlledFactory();
103+
final module = _buildModule(factory.factory);
104+
105+
// Act: connect #1 → factory called, hangs
106+
module.connect();
107+
await Future<void>.delayed(Duration.zero);
108+
expect(factory.callCount, 1, reason: 'first connect() must call the factory');
109+
110+
// disconnect() while _client==null — the bug: does not reset _connecting
111+
await module.disconnect();
112+
113+
// connect #2 — must call the factory again; currently it is silently dropped
114+
module.connect();
115+
await Future<void>.delayed(Duration.zero);
116+
117+
// BUG: factory was not called a second time — second connect() was dropped
118+
expect(
119+
factory.callCount,
120+
2,
121+
reason: 'second connect() was silently dropped: factory call count did not increase',
122+
);
123+
});
124+
125+
// -----------------------------------------------------------------------
126+
// Consequence: stale connect completes and leaves module in wrong state
127+
// -----------------------------------------------------------------------
128+
129+
test('BUG: stale in-flight connect completes after disconnect() and leaves module connected', () async {
130+
// Arrange
131+
final factory = _ControlledFactory();
132+
final module = _buildModule(factory.factory);
133+
final events = <SignalingModuleEvent>[];
134+
module.events.listen(events.add);
135+
136+
// connect #1 → factory hangs
137+
module.connect();
138+
await Future<void>.delayed(Duration.zero);
139+
140+
// disconnect() — no-op when _client==null (the bug)
141+
await module.disconnect();
142+
143+
// Release the stale factory (simulates slow WebSocket that eventually connects)
144+
factory.release(0, _FakeClient());
145+
await Future<void>.delayed(Duration.zero);
146+
147+
// BUG: module is now "connected" even though we explicitly disconnected
148+
expect(
149+
module.isConnected,
150+
isFalse,
151+
reason: 'stale connect() completed after disconnect() and incorrectly set _client',
152+
);
153+
expect(
154+
events.whereType<SignalingConnected>(),
155+
isEmpty,
156+
reason: 'SignalingConnected must not fire for a connect() superseded by disconnect()',
157+
);
158+
});
159+
160+
// -----------------------------------------------------------------------
161+
// Expected correct behavior after the fix
162+
// -----------------------------------------------------------------------
163+
164+
test('after fix: disconnect() cancels in-flight connect so second connect() reaches the factory', () async {
165+
final factory = _ControlledFactory();
166+
final module = _buildModule(factory.factory);
167+
final events = <SignalingModuleEvent>[];
168+
module.events.listen(events.add);
169+
170+
// connect #1 → hangs
171+
module.connect();
172+
await Future<void>.delayed(Duration.zero);
173+
174+
// disconnect() → must cancel connect #1 and clear _connectToken
175+
await module.disconnect();
176+
177+
// connect #2 → must be accepted (factory called again)
178+
module.connect();
179+
await Future<void>.delayed(Duration.zero);
180+
expect(factory.callCount, 2, reason: 'second connect() must call the factory');
181+
182+
// Release connect #2 — module should become connected
183+
factory.release(1, _FakeClient());
184+
await Future<void>.delayed(Duration.zero);
185+
186+
expect(module.isConnected, isTrue);
187+
expect(events.whereType<SignalingConnected>(), isNotEmpty);
188+
});
189+
190+
test('after fix: stale factory result is discarded, SignalingConnected not emitted', () async {
191+
final factory = _ControlledFactory();
192+
final module = _buildModule(factory.factory);
193+
final events = <SignalingModuleEvent>[];
194+
module.events.listen(events.add);
195+
196+
// connect #1 → hangs
197+
module.connect();
198+
await Future<void>.delayed(Duration.zero);
199+
200+
// disconnect() cancels connect #1
201+
await module.disconnect();
202+
203+
// Release the stale factory call AFTER disconnect
204+
factory.release(0, _FakeClient());
205+
await Future<void>.delayed(Duration.zero);
206+
207+
// Stale result must be discarded
208+
expect(module.isConnected, isFalse);
209+
expect(events.whereType<SignalingConnected>(), isEmpty);
210+
});
211+
});
212+
}

0 commit comments

Comments
 (0)