@@ -66,13 +66,9 @@ class Manager extends EventEmitter {
66
66
_Backoff ? backoff;
67
67
String readyState = 'closed' ;
68
68
late String uri;
69
- List connecting = [];
70
- num ? lastPing;
71
- bool encoding = false ;
72
- List packetBuffer = [];
73
69
bool reconnecting = false ;
74
70
75
- late engine_socket.Socket engine;
71
+ engine_socket.Socket ? engine;
76
72
Encoder encoder = Encoder ();
77
73
Decoder decoder = Decoder ();
78
74
late bool autoConnect;
@@ -98,41 +94,6 @@ class Manager extends EventEmitter {
98
94
if (autoConnect) open ();
99
95
}
100
96
101
- ///
102
- /// Propagate given event to sockets and emit on `this`
103
- ///
104
- /// @api private
105
- ///
106
- void emitAll (String event, [data]) {
107
- emit (event, data);
108
- for (var nsp in nsps.keys) {
109
- nsps[nsp]! .emit (event, data);
110
- }
111
- }
112
-
113
- ///
114
- /// Update `socket.id` of all sockets
115
- ///
116
- /// @api private
117
- ///
118
- void updateSocketIds () {
119
- for (var nsp in nsps.keys) {
120
- nsps[nsp]! .id = generateId (nsp);
121
- }
122
- }
123
-
124
- ///
125
- /// generate `socket.id` for the given `nsp`
126
- ///
127
- /// @param {String} nsp
128
- /// @return {String}
129
- /// @api private
130
- ///
131
- String generateId (String nsp) {
132
- if (nsp.startsWith ('/' )) nsp = nsp.substring (1 );
133
- return (nsp.isEmpty ? '' : (nsp + '#' )) + (engine.id ?? '' );
134
- }
135
-
136
97
num ? get randomizationFactor => _randomizationFactor;
137
98
set randomizationFactor (num ? v) {
138
99
_randomizationFactor = v;
@@ -182,12 +143,12 @@ class Manager extends EventEmitter {
182
143
183
144
_logger.fine ('opening $uri ' );
184
145
engine = engine_socket.Socket (uri, options);
185
- var socket = engine;
146
+ var socket = engine! ;
186
147
readyState = 'opening' ;
187
148
skipReconnect = false ;
188
149
189
150
// emit `open`
190
- var openSub = util.on (socket, 'open' , (_) {
151
+ var openSubDestroy = util.on (socket, 'open' , (_) {
191
152
onopen ();
192
153
if (callback != null ) callback ();
193
154
});
@@ -197,7 +158,7 @@ class Manager extends EventEmitter {
197
158
_logger.fine ('connect_error' );
198
159
cleanup ();
199
160
readyState = 'closed' ;
200
- emitAll ( 'connect_error ' , data);
161
+ super . emit ( 'error ' , data);
201
162
if (callback != null ) {
202
163
callback ({'error' : 'Connection error' , 'data' : data});
203
164
} else {
@@ -210,19 +171,22 @@ class Manager extends EventEmitter {
210
171
if (timeout != null ) {
211
172
_logger.fine ('connect attempt will timeout after $timeout ' );
212
173
174
+ if (timeout == 0 ) {
175
+ openSubDestroy
176
+ .destroy (); // prevents a race condition with the 'open' event
177
+ }
213
178
// set timer
214
179
var timer = Timer (Duration (milliseconds: timeout! .toInt ()), () {
215
180
_logger.fine ('connect attempt timed out after $timeout ' );
216
- openSub .destroy ();
181
+ openSubDestroy .destroy ();
217
182
socket.close ();
218
183
socket.emit ('error' , 'timeout' );
219
- emitAll ('connect_timeout' , timeout);
220
184
});
221
185
222
186
subs.add (Destroyable (() => timer.cancel ()));
223
187
}
224
188
225
- subs.add (openSub );
189
+ subs.add (openSubDestroy );
226
190
subs.add (errorSub);
227
191
228
192
return this ;
@@ -244,7 +208,7 @@ class Manager extends EventEmitter {
244
208
emit ('open' );
245
209
246
210
// add subs
247
- var socket = engine;
211
+ var socket = engine! ;
248
212
subs.add (util.on (socket, 'data' , ondata));
249
213
subs.add (util.on (socket, 'ping' , onping));
250
214
// subs.add(util.on(socket, 'pong', onpong));
@@ -259,8 +223,7 @@ class Manager extends EventEmitter {
259
223
/// @api private
260
224
///
261
225
void onping ([_]) {
262
- lastPing = DateTime .now ().millisecondsSinceEpoch;
263
- emitAll ('ping' );
226
+ emit ('ping' );
264
227
}
265
228
266
229
///
@@ -297,7 +260,7 @@ class Manager extends EventEmitter {
297
260
///
298
261
void onerror (err) {
299
262
_logger.fine ('error $err ' );
300
- emitAll ('error' , err);
263
+ emit ('error' , err);
301
264
}
302
265
303
266
///
@@ -309,24 +272,9 @@ class Manager extends EventEmitter {
309
272
Socket socket (String nsp, Map opts) {
310
273
var socket = nsps[nsp];
311
274
312
- var onConnecting = ([_]) {
313
- if (! connecting.contains (socket)) {
314
- connecting.add (socket);
315
- }
316
- };
317
-
318
275
if (socket == null ) {
319
276
socket = Socket (this , nsp, opts);
320
277
nsps[nsp] = socket;
321
- socket.on ('connecting' , onConnecting);
322
- socket.on ('connect' , (_) {
323
- socket! .id = generateId (nsp);
324
- });
325
-
326
- if (autoConnect) {
327
- // manually call here since connecting event is fired before listening
328
- onConnecting ();
329
- }
330
278
}
331
279
332
280
return socket;
@@ -338,8 +286,16 @@ class Manager extends EventEmitter {
338
286
/// @param {Socket} socket
339
287
///
340
288
void destroy (socket) {
341
- connecting.remove (socket);
342
- if (connecting.isNotEmpty) return ;
289
+ final nsps = this .nsps.keys;
290
+
291
+ for (var nsp in nsps) {
292
+ final socket = this .nsps[nsp];
293
+
294
+ if (socket! .active) {
295
+ _logger.fine ('socket $nsp is still active, skipping close' );
296
+ return ;
297
+ }
298
+ }
343
299
344
300
close ();
345
301
}
@@ -352,37 +308,21 @@ class Manager extends EventEmitter {
352
308
///
353
309
void packet (Map packet) {
354
310
_logger.fine ('writing packet $packet ' );
355
- if (packet.containsKey ('query' ) && packet['type' ] == 0 ) {
356
- packet['nsp' ] += '''?${packet ['query' ]}''' ;
357
- }
358
311
359
312
// if (encoding != true) {
360
313
// encode, then write to engine with result
361
314
// encoding = true;
362
315
var encodedPackets = encoder.encode (packet);
363
316
364
317
for (var i = 0 ; i < encodedPackets.length; i++ ) {
365
- engine.write (encodedPackets[i], packet['options' ]);
318
+ engine! .write (encodedPackets[i], packet['options' ]);
366
319
}
367
320
// } else {
368
321
// add packet to the queue
369
322
// packetBuffer.add(packet);
370
323
// }
371
324
}
372
325
373
- ///
374
- /// If packet buffer is non-empty, begins encoding the
375
- /// next packet in line.
376
- ///
377
- /// @api private
378
- ///
379
- void processPacketQueue () {
380
- if (packetBuffer.isNotEmpty && encoding != true ) {
381
- var pack = packetBuffer.removeAt (0 );
382
- packet (pack);
383
- }
384
- }
385
-
386
326
///
387
327
/// Clean up transport subscriptions and packet buffer.
388
328
///
@@ -397,10 +337,6 @@ class Manager extends EventEmitter {
397
337
sub.destroy ();
398
338
}
399
339
400
- packetBuffer = [];
401
- encoding = false ;
402
- lastPing = null ;
403
-
404
340
decoder.destroy ();
405
341
}
406
342
@@ -422,7 +358,7 @@ class Manager extends EventEmitter {
422
358
}
423
359
backoff! .reset ();
424
360
readyState = 'closed' ;
425
- engine.close ();
361
+ engine? .close ();
426
362
}
427
363
428
364
///
@@ -454,7 +390,7 @@ class Manager extends EventEmitter {
454
390
if (backoff! .attempts >= reconnectionAttempts! ) {
455
391
_logger.fine ('reconnect failed' );
456
392
backoff! .reset ();
457
- emitAll ('reconnect_failed' );
393
+ emit ('reconnect_failed' );
458
394
reconnecting = false ;
459
395
} else {
460
396
var delay = backoff! .duration;
@@ -465,8 +401,7 @@ class Manager extends EventEmitter {
465
401
if (skipReconnect! ) return ;
466
402
467
403
_logger.fine ('attempting reconnect' );
468
- emitAll ('reconnect_attempt' , backoff! .attempts);
469
- emitAll ('reconnecting' , backoff! .attempts);
404
+ emit ('reconnect_attempt' , backoff! .attempts);
470
405
471
406
// check again for the case socket closed in above events
472
407
if (skipReconnect! ) return ;
@@ -476,7 +411,7 @@ class Manager extends EventEmitter {
476
411
_logger.fine ('reconnect attempt error' );
477
412
reconnecting = false ;
478
413
reconnect ();
479
- emitAll ('reconnect_error' , err['data' ]);
414
+ emit ('reconnect_error' , err['data' ]);
480
415
} else {
481
416
_logger.fine ('reconnect success' );
482
417
onreconnect ();
@@ -498,8 +433,7 @@ class Manager extends EventEmitter {
498
433
var attempt = backoff! .attempts;
499
434
reconnecting = false ;
500
435
backoff! .reset ();
501
- updateSocketIds ();
502
- emitAll ('reconnect' , attempt);
436
+ emit ('reconnect' , attempt);
503
437
}
504
438
}
505
439
0 commit comments