-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathmqtt.ts
More file actions
722 lines (631 loc) · 25.9 KB
/
mqtt.ts
File metadata and controls
722 lines (631 loc) · 25.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
/**
*
* A module containing support for mqtt connection establishment and operations.
*
* @packageDocumentation
* @module mqtt
* @mergeTarget
*/
import crt_native from './binding';
import { NativeResource, NativeResourceMixin } from "./native_resource";
import { BufferedEventEmitter } from '../common/event';
import * as crt from "../common/mqtt_shared";
import { CrtError } from './error';
import * as io from "./io";
import { HttpProxyOptions, HttpRequest } from './http';
export { HttpProxyOptions } from './http';
import {
QoS,
Payload,
MqttRequest,
MqttSubscribeRequest,
MqttWill,
OnMessageCallback,
MqttConnectionConnected,
MqttConnectionDisconnected,
MqttConnectionResumed,
DEFAULT_RECONNECT_MIN_SEC,
DEFAULT_RECONNECT_MAX_SEC,
OnConnectionSuccessResult,
OnConnectionFailedResult,
OnConnectionClosedResult
} from "../common/mqtt";
export {
QoS, Payload, MqttRequest, MqttSubscribeRequest, MqttWill, OnMessageCallback, MqttConnectionConnected, MqttConnectionDisconnected,
MqttConnectionResumed, OnConnectionSuccessResult, OnConnectionFailedResult, OnConnectionClosedResult
} from "../common/mqtt";
/**
* Listener signature for the 'error' event emitted from an {@link MqttClientConnection} when an error occurs.
*
* Register with `connection.on('error', listener)`.
*
* @param error the error that occurred
*
* @category MQTT
*/
export type MqttConnectionError = (error: CrtError) => void;
/**
* Listener signature for the 'interrupt' event emitted from an {@link MqttClientConnection} when the
* connection has been interrupted unexpectedly.
*
* Register with `connection.on('interrupt', listener)`.
*
* @param error description of the error that occurred
*
* @category MQTT
*/
export type MqttConnectionInterrupted = (error: CrtError) => void;
/**
* Listener signature for the 'connection_success' event emitted from an {@link MqttClientConnection} when the
* connection has been connected successfully.
*
* This listener is invoked for every successful connect and every successful reconnect.
*
* NOTE: the spelling of this type's name (`Sucess`) is part of the exported API.
*
* @param callback_data Data returned containing information about the successful connection.
*
* @category MQTT
*/
export type MqttConnectionSucess = (callback_data: OnConnectionSuccessResult) => void;
/**
* Listener signature for the 'connection_failure' event emitted from an {@link MqttClientConnection} when a
* connection attempt has failed.
*
* This listener is invoked for every failed connect and every failed reconnect.
*
* @param callback_data Data returned containing information about the failed connection.
*
* @category MQTT
*/
export type MqttConnectionFailure = (callback_data: OnConnectionFailedResult) => void;
/**
* Listener signature for the 'closed' event emitted from an {@link MqttClientConnection} when the connection
* has been disconnected and shutdown successfully.
*
* Register with `connection.on('closed', listener)`.
*
* @param callback_data Data returned containing information about the closed/disconnected connection.
* Currently empty, but may contain data in the future.
*
* @category MQTT
*/
export type MqttConnectionClosed = (callback_data: OnConnectionClosedResult) => void;
/**
 * MQTT client. Holds the native client handle and serves as the factory for
 * {@link MqttClientConnection} objects.
 *
 * @category MQTT
 */
export class MqttClient extends NativeResource {
    /**
     * @param bootstrap The {@link io.ClientBootstrap} to use for socket connections. Leave undefined to use the
     *          default system-wide bootstrap (recommended).
     */
    constructor(readonly bootstrap: io.ClientBootstrap | undefined = undefined) {
        // A missing bootstrap is passed to the native layer as null
        super(crt_native.mqtt_client_new(bootstrap == null ? null : bootstrap.native_handle()));
    }

    /**
     * Creates a new {@link MqttClientConnection} that is owned by this client.
     *
     * @param config Configuration for the mqtt connection
     * @returns A new connection
     */
    new_connection(config: MqttConnectionConfig) {
        const connection = new MqttClientConnection(this, config);
        return connection;
    }
}
/**
* Configuration options for an MQTT connection
*
* @category MQTT
*/
export interface MqttConnectionConfig {
/**
* ID to place in CONNECT packet. Must be unique across all devices/clients.
* If an ID is already in use, the other client will be disconnected.
*/
client_id: string;
/** Server name to connect to */
host_name: string;
/** Server port to connect to */
port: number;
/** Socket options. Required: the connection constructor throws if this is not defined. */
socket_options: io.SocketOptions;
/** If true, connect to MQTT over websockets */
use_websocket?: boolean;
/**
* Whether or not to start a clean session with each reconnect.
* If true, the server will forget all subscriptions with each reconnect.
* Set to false to request that the server resume an existing session
* or start a new session that may be resumed after a connection loss.
* The `session_present` bool in the connection callback informs
* whether an existing session was successfully resumed.
* If an existing session is resumed, the server remembers previous subscriptions
* and sends messages (with QoS1 or higher) that were published while the client was offline.
*/
clean_session?: boolean;
/**
* The keep alive value, in seconds, to send in CONNECT packet.
* A PING will automatically be sent at this interval.
* The server will assume the connection is lost if no PING is received after 1.5X this value.
* This duration must be longer than {@link ping_timeout} (note that {@link ping_timeout} is expressed in
* milliseconds).
*/
keep_alive?: number;
/**
* Milliseconds to wait for ping response before client assumes
* the connection is invalid and attempts to reconnect.
* This duration must be shorter than {@link keep_alive} (note that {@link keep_alive} is expressed in seconds).
* Alternatively, TCP keep-alive configured through {@link socket_options}
* may accomplish this in a more efficient (low-power) scenario,
* but keep-alive options may not work the same way on every platform and OS version.
*/
ping_timeout?: number;
/**
* Milliseconds to wait for the response to an operation that requires a response by protocol.
* Set to zero to disable timeout. Otherwise, the operation will fail if no response is
* received within this amount of time after the packet is written to the socket.
* This currently applies to PUBLISH (QoS>0) and UNSUBSCRIBE.
*/
protocol_operation_timeout?: number;
/**
* Minimum seconds to wait between reconnect attempts.
* Must be <= {@link reconnect_max_sec}.
* Wait starts at min and doubles with each attempt until max is reached.
* When omitted (or zero), DEFAULT_RECONNECT_MIN_SEC is used.
*/
reconnect_min_sec?: number;
/**
* Maximum seconds to wait between reconnect attempts.
* Must be >= {@link reconnect_min_sec}.
* Wait starts at min and doubles with each attempt until max is reached.
* When omitted (or zero), DEFAULT_RECONNECT_MAX_SEC is used.
*/
reconnect_max_sec?: number;
/**
* Will to send with CONNECT packet. The will is
* published by the server when its connection to the client is unexpectedly lost.
*/
will?: MqttWill;
/** Username to connect with */
username?: string;
/** Password to connect with */
password?: string;
/**
* TLS context for secure socket connections.
* If none is provided, then an unencrypted connection is used.
*/
tls_ctx?: io.ClientTlsContext;
/** Optional proxy options */
proxy_options?: HttpProxyOptions;
/**
* Optionally enables AWS IoT SDK metrics. The metrics include SDK name, version, and platform.
* Metrics are treated as enabled unless this is explicitly set to false.
*
* @group Node-only
*/
enable_metrics?: boolean;
/**
* Optional function to transform websocket handshake request.
* If provided, function is called each time a websocket connection is attempted.
* The function may modify the HTTP request before it is sent to the server.
*/
websocket_handshake_transform?: (request: HttpRequest, done: (error_code?: number) => void) => void;
}
/**
* Information about the connection's queue of operations, as returned by
* MqttClientConnection.getOperationalStatistics().
*/
export interface ConnectionStatistics {
/**
* Total number of operations submitted to the connection that have not yet been completed. Unacked operations
* are a subset of this.
*/
incompleteOperationCount: number;
/**
* Total packet size of operations submitted to the connection that have not yet been completed. Unacked operations
* are a subset of this.
*/
incompleteOperationSize: number;
/**
* Total number of operations that have been sent to the server and are waiting for a corresponding ACK before
* they can be completed.
*/
unackedOperationCount: number;
/**
* Total packet size of operations that have been sent to the server and are waiting for a corresponding ACK before
* they can be completed.
*/
unackedOperationSize: number;
};
/**
* MQTT client connection
*
* @category MQTT
*/
export class MqttClientConnection extends NativeResourceMixin(BufferedEventEmitter) {
readonly tls_ctx?: io.ClientTlsContext; // this reference keeps the tls_ctx alive beyond the life of the connection
/**
 * Validates the configuration, creates the native connection and registers the native message/closed
 * callbacks along with a default no-op 'error' listener.
 *
 * @param client The client that owns this connection
 * @param config The configuration for this connection
 */
constructor(readonly client: MqttClient, private config: MqttConnectionConfig) {
    super();

    // `== null` matches both null and undefined
    if (config == null) {
        throw new CrtError("MqttClientConnection constructor: config not defined");
    }

    // If there is a will, ensure that its payload is normalized to a DataView
    const configured_will = config.will;
    const will = configured_will
        ? {
            topic: configured_will.topic,
            qos: configured_will.qos,
            payload: crt.normalize_payload(configured_will.payload),
            retain: configured_will.retain
        }
        : undefined;

    // Start from the default reconnect bounds, then apply and clamp whatever the caller supplied
    let reconnect_min = DEFAULT_RECONNECT_MIN_SEC;
    let reconnect_max = DEFAULT_RECONNECT_MAX_SEC;

    if (config.reconnect_min_sec) {
        reconnect_min = config.reconnect_min_sec;
        // raise max if needed, in case only min was passed in
        reconnect_max = Math.max(reconnect_min, reconnect_max);
    }

    if (config.reconnect_max_sec) {
        reconnect_max = config.reconnect_max_sec;
        // lower min if needed, in case only max was passed in (or min > max)
        reconnect_min = Math.min(reconnect_min, reconnect_max);
    }

    if (client == null) {
        throw new CrtError("MqttClientConnection constructor: client not defined");
    }

    if (config.socket_options == null) {
        throw new CrtError("MqttClientConnection constructor: socket_options in configuration not defined");
    }

    // Argument order is dictated by the native binding
    this._super(crt_native.mqtt_client_connection_new(
        client.native_handle(),
        (error_code: number) => { this._on_connection_interrupted(error_code); },
        (return_code: number, session_present: boolean) => { this._on_connection_resumed(return_code, session_present); },
        (return_code: number, session_present: boolean) => { this._on_connection_success(return_code, session_present); },
        (error_code: number) => { this._on_connection_failure(error_code); },
        config.tls_ctx ? config.tls_ctx.native_handle() : null,
        will,
        config.username,
        config.password,
        config.use_websocket,
        config.proxy_options ? config.proxy_options.create_native_handle() : undefined,
        config.websocket_handshake_transform,
        reconnect_min,
        reconnect_max,
        // metrics are only omitted when explicitly disabled
        config.enable_metrics == false ? undefined : new crt.AwsIoTDeviceSDKMetrics()
    ));

    this.tls_ctx = config.tls_ctx;

    const handle = this.native_handle();
    crt_native.mqtt_client_connection_on_message(handle, this._on_any_publish.bind(this));
    crt_native.mqtt_client_connection_on_closed(this.native_handle(), this._on_connection_closed.bind(this));

    /*
     * Failed mqtt operations (which is normal) emit error events as well as rejecting the original promise.
     * Installing a default no-op error listener here prevents an operation failure from bringing the whole
     * program to an end merely because no listener was installed. Programs that install their own listener
     * are unaffected.
     */
    this.on('error', () => { });
}
/**
* Closes the native connection object. Invoked from the native 'closed' callback handler
* after the 'closed' event has been emitted.
*/
private close() {
crt_native.mqtt_client_connection_close(this.native_handle());
}
/**
* Emitted, with the `session_present` flag, when a {@link connect} (or {@link reconnect}) call completes
* successfully.
*
* @event
*/
static CONNECT = 'connect';
/**
* Emitted when connection has disconnected successfully, i.e. when the promise returned by
* {@link disconnect} resolves.
*
* @event
*/
static DISCONNECT = 'disconnect';
/**
* Emitted when an error occurs. The error will contain the error
* code and message. Rejected operation promises also surface here, on the tick after the rejection.
*
* @event
*/
static ERROR = 'error';
/**
* Emitted when the connection is dropped unexpectedly. The error will contain the error
* code and message. The underlying mqtt implementation will attempt to reconnect.
*
* @event
*/
static INTERRUPT = 'interrupt';
/**
* Emitted when the connection reconnects (after an interrupt). Only triggers on connections after the initial one.
* Listeners receive the return code and the `session_present` flag.
*
* @event
*/
static RESUME = 'resume';
/**
* Emitted when any MQTT publish message arrives. Listeners receive the topic, payload, dup flag, QoS and
* retain flag of the message.
*
* @event
*/
static MESSAGE = 'message';
/**
* Emitted on every successful connect and reconnect.
* Will contain a number with the connection reason code and
* a boolean indicating whether the connection resumed a session.
*
* @event
*/
static CONNECTION_SUCCESS = 'connection_success';
/**
* Emitted on every unsuccessful connect and reconnect.
* Will contain an error indicating the reason for the unsuccessful connection.
*
* @event
*/
static CONNECTION_FAILURE = 'connection_failure';
/**
* Emitted when the MQTT connection was disconnected and shutdown successfully. The native connection is
* closed right after this event has been emitted.
*
* @event
*/
static CLOSED = 'closed'
on(event: 'connect', listener: MqttConnectionConnected): this;
on(event: 'disconnect', listener: MqttConnectionDisconnected): this;
on(event: 'error', listener: MqttConnectionError): this;
on(event: 'interrupt', listener: MqttConnectionInterrupted): this;
on(event: 'resume', listener: MqttConnectionResumed): this;
on(event: 'message', listener: OnMessageCallback): this;
on(event: 'connection_success', listener: MqttConnectionSucess): this;
on(event: 'connection_failure', listener: MqttConnectionFailure): this;
on(event: 'closed', listener: MqttConnectionClosed): this;

// Overridden so that registering a 'connect' listener uncorks the emitter on the next tick
on(event: string | symbol, listener: (...args: any[]) => void): this {
    super.on(event, listener);

    if (event != 'connect') {
        return this;
    }

    process.nextTick(() => {
        this.uncork();
    });
    return this;
}
/**
 * Open the actual connection to the server (async).
 *
 * Every failure, including a missing `socket_options` in the configuration, both rejects the returned
 * promise and is emitted as an 'error' event on the next tick.
 *
 * @returns A Promise which completes whether the connection succeeds or fails.
 *          If connection fails, the Promise will reject with an exception.
 *          If connection succeeds, the Promise will return a boolean that is
 *          true for resuming an existing session, or false if the session is new
 */
async connect(): Promise<boolean> {
    return new Promise<boolean>((resolve, reject) => {
        // From here on `reject` also emits the reason as an 'error' event
        reject = this._reject(reject);

        try {
            // Validate inside the try block: a throw directly in the promise executor would reject the
            // promise through its original reject function and skip the 'error' event that every other
            // failure path produces.
            if (this.config.socket_options == null) {
                throw new CrtError("MqttClientConnection connect: socket_options in configuration not defined");
            }

            crt_native.mqtt_client_connection_connect(
                this.native_handle(),
                this.config.client_id,
                this.config.host_name,
                this.config.port,
                this.config.socket_options.native_handle(),
                this.config.keep_alive,
                this.config.ping_timeout,
                this.config.protocol_operation_timeout,
                this.config.clean_session,
                this._on_connect_callback.bind(this, resolve, reject),
            );
        } catch (e) {
            reject(e);
        }
    });
}
/**
 * The connection will automatically reconnect when disconnected, removing the need for this function.
 * To cease automatic reconnection attempts, call {@link disconnect}.
 *
 * @returns A Promise that resolves with the `session_present` flag on success and rejects on failure.
 * @deprecated
 */
async reconnect() {
    return new Promise<boolean>((resolve, reject) => {
        // Rejections through `fail` are also emitted as 'error' events
        const fail = this._reject(reject);
        try {
            const on_connected = this._on_connect_callback.bind(this, resolve, fail);
            crt_native.mqtt_client_connection_reconnect(this.native_handle(), on_connected);
        } catch (e) {
            fail(e);
        }
    });
}
/**
 * Publish message (async).
 * If the device is offline, the PUBLISH packet will be sent once the connection resumes.
 *
 * @param topic Topic name
 * @param payload Contents of message
 * @param qos Quality of Service for delivering this message
 * @param retain If true, the server will store the message and its QoS so that it can be
 *         delivered to future subscribers whose subscriptions match the topic name
 * @returns Promise which returns a {@link MqttRequest} which will contain the packet id of
 *          the PUBLISH packet. The promise rejects immediately if `topic`, `qos` or `retain` has
 *          the wrong runtime type.
 *
 * * For QoS 0, completes as soon as the packet is sent.
 * * For QoS 1, completes when PUBACK is received.
 * * For QoS 2, completes when PUBCOMP is received.
 */
async publish(topic: string, payload: Payload, qos: QoS, retain: boolean = false) {
    // Runtime argument checks; payload is skipped since it can be several different types
    let invalid_argument: string | undefined = undefined;
    if (typeof (topic) !== 'string') {
        invalid_argument = "topic is not a string";
    } else if (typeof (qos) !== 'number') {
        invalid_argument = "qos is not a number";
    } else if (typeof (retain) !== 'boolean') {
        invalid_argument = "retain is not a boolean";
    }
    if (invalid_argument !== undefined) {
        return Promise.reject(invalid_argument);
    }

    return new Promise<MqttRequest>((resolve, reject) => {
        // Rejections through `fail` are also emitted as 'error' events
        const fail = this._reject(reject);
        try {
            const on_puback = this._on_puback_callback.bind(this, resolve, fail);
            crt_native.mqtt_client_connection_publish(
                this.native_handle(),
                topic,
                crt.normalize_payload(payload),
                qos,
                retain,
                on_puback
            );
        } catch (e) {
            fail(e);
        }
    });
}
/**
 * Subscribe to a topic filter (async).
 * The client sends a SUBSCRIBE packet and the server responds with a SUBACK.
 *
 * subscribe() may be called while the device is offline, though the async
 * operation cannot complete successfully until the connection resumes.
 *
 * Once subscribed, `callback` is invoked each time a message matching
 * the `topic` is received. It is possible for such messages to arrive before
 * the SUBACK is received.
 *
 * @param topic Subscribe to this topic filter, which may include wildcards
 * @param qos Maximum requested QoS that server may use when sending messages to the client.
 *         The server may grant a lower QoS in the SUBACK
 * @param on_message Optional callback invoked when message received.
 * @returns Promise which returns a {@link MqttSubscribeRequest} which will contain the
 *          result of the SUBSCRIBE. The Promise resolves when a SUBACK is returned
 *          from the server or is rejected when an exception occurs. It rejects immediately if
 *          `topic` or `qos` has the wrong runtime type.
 */
async subscribe(topic: string, qos: QoS, on_message?: OnMessageCallback) {
    // Runtime argument checks
    let invalid_argument: string | undefined = undefined;
    if (typeof (topic) !== 'string') {
        invalid_argument = "topic is not a string";
    } else if (typeof (qos) !== 'number') {
        invalid_argument = "qos is not a number";
    }
    if (invalid_argument !== undefined) {
        return Promise.reject(invalid_argument);
    }

    return new Promise<MqttSubscribeRequest>((resolve, reject) => {
        // Rejections through `fail` are also emitted as 'error' events
        const fail = this._reject(reject);
        try {
            const on_suback = this._on_suback_callback.bind(this, resolve, fail);
            crt_native.mqtt_client_connection_subscribe(this.native_handle(), topic, qos, on_message, on_suback);
        } catch (e) {
            fail(e);
        }
    });
}
/**
 * Unsubscribe from a topic filter (async).
 * The client sends an UNSUBSCRIBE packet, and the server responds with an UNSUBACK.
 *
 * @param topic The topic filter to unsubscribe from. May contain wildcards.
 * @returns Promise which returns a {@link MqttRequest} which will contain the packet id
 *          of the UNSUBSCRIBE packet being acknowledged. Promise is resolved when an
 *          UNSUBACK is received from the server or is rejected when an exception occurs.
 *          It rejects immediately if `topic` is not a string.
 */
async unsubscribe(topic: string) {
    const topic_is_string = typeof (topic) === 'string';
    if (!topic_is_string) {
        return Promise.reject("topic is not a string");
    }

    return new Promise<MqttRequest>((resolve, reject) => {
        // Rejections through `fail` are also emitted as 'error' events
        const fail = this._reject(reject);
        try {
            const on_unsuback = this._on_unsuback_callback.bind(this, resolve, fail);
            crt_native.mqtt_client_connection_unsubscribe(this.native_handle(), topic, on_unsuback);
        } catch (e) {
            fail(e);
        }
    });
}
/**
 * Close the connection (async).
 *
 * Will free all native resources, rendering the connection unusable after the disconnect() call.
 *
 * @returns Promise which completes when the connection is closed. A 'disconnect' event is emitted
 *          right after the promise is resolved.
 */
async disconnect() {
    return new Promise<void>((resolve, reject) => {
        // Rejections through `fail` are also emitted as 'error' events
        const fail = this._reject(reject);
        try {
            const on_disconnected = this._on_disconnect_callback.bind(this, resolve);
            crt_native.mqtt_client_connection_disconnect(this.native_handle(), on_disconnected);
        } catch (e) {
            fail(e);
        }
    });
}
/**
* Queries a small set of numerical statistics about the current state of the connection's operation queue
*
* @returns the statistics reported by the native connection for its operation queue
*
* @group Node-only
*/
getOperationalStatistics(): ConnectionStatistics {
return crt_native.mqtt_client_connection_get_queue_statistics(this.native_handle());
}
/**
* Queries a small set of numerical statistics about the current state of the connection's operation queue.
* Simply forwards to {@link getOperationalStatistics}.
*
* @returns the same value as {@link getOperationalStatistics}
* @deprecated use getOperationalStatistics instead
*
* @group Node-only
*/
getQueueStatistics(): ConnectionStatistics {
return this.getOperationalStatistics();
}
// Wraps a promise's reject function. The returned function rejects the promise first and then, on the
// next tick, emits the same reason (wrapped in a CrtError) as an 'error' event.
private _reject(reject: (reason: any) => void) {
    const emit_error = (reason: any) => {
        this.emit('error', new CrtError(reason));
    };

    return (reason: any) => {
        reject(reason);
        process.nextTick(emit_error, reason);
    };
}
// Native callback for a failed connection attempt: emits 'connection_failure' carrying the error code
// wrapped in a CrtError.
private _on_connection_failure(error_code: number) {
    const error = new CrtError(error_code);
    this.emit('connection_failure', { error: error } as OnConnectionFailedResult);
}
// Native callback for a successful connection attempt: emits 'connection_success' with the
// session-present flag and the return code (exposed as `reason_code`).
private _on_connection_success(return_code: number, session_present: boolean) {
    this.emit(
        'connection_success',
        { session_present: session_present, reason_code: return_code } as OnConnectionSuccessResult
    );
}
// Native callback for an interrupted connection: emits 'interrupt' with the error code wrapped in a CrtError.
private _on_connection_interrupted(error_code: number) {
this.emit('interrupt', new CrtError(error_code));
}
// Native callback for a resumed connection: emits 'resume' with the return code and session-present flag.
private _on_connection_resumed(return_code: number, session_present: boolean) {
this.emit('resume', return_code, session_present);
}
// Native callback registered for incoming publishes: re-emits every message as a 'message' event with
// its arguments unchanged.
private _on_any_publish(topic: string, payload: ArrayBuffer, dup: boolean, qos: QoS, retain: boolean) {
this.emit('message', topic, payload, dup, qos, retain);
}
// Native callback for a closed connection: emits 'closed' with an (currently empty) result object and
// then closes the native connection.
private _on_connection_closed() {
    this.emit('closed', {} as OnConnectionClosedResult);

    /**
     * close() is called here rather than on disconnect: on_close always fires AFTER disconnect, and
     * calling close() any earlier would make it impossible to emit the closed callback.
     */
    this.close();
}
// Completion callback for connect()/reconnect(). A non-zero error_code or return_code rejects the promise;
// otherwise the promise resolves with session_present and a 'connect' event is emitted.
private _on_connect_callback(resolve: (value: (boolean | PromiseLike<boolean>)) => void, reject: (reason?: any) => void, error_code: number, return_code: number, session_present: boolean) {
    if (error_code != 0) {
        reject("Failed to connect: " + io.error_code_to_string(error_code));
        return;
    }

    if (return_code != 0) {
        reject("Server rejected connection.");
        return;
    }

    resolve(session_present);
    this.emit('connect', session_present);
}
// Completion callback for publish(): resolves with the packet id, or rejects when error_code is non-zero.
private _on_puback_callback(resolve: (value: (MqttRequest | PromiseLike<MqttRequest>)) => void, reject: (reason?: any) => void, packet_id: number, error_code: number) {
    if (error_code != 0) {
        reject("Failed to publish: " + io.error_code_to_string(error_code));
        return;
    }

    resolve({ packet_id });
}
// Completion callback for subscribe(): resolves with the packet id, topic, qos and error code, or rejects
// when error_code is non-zero.
private _on_suback_callback(resolve: (value: (MqttSubscribeRequest | PromiseLike<MqttSubscribeRequest>)) => void, reject: (reason?: any) => void, packet_id: number, topic: string, qos: QoS, error_code: number) {
    if (error_code != 0) {
        reject("Failed to subscribe: " + io.error_code_to_string(error_code));
        return;
    }

    resolve({ packet_id, topic, qos, error_code });
}
// Completion callback for unsubscribe(): resolves with the packet id, or rejects when error_code is non-zero.
private _on_unsuback_callback(resolve: (value: (MqttRequest | PromiseLike<MqttRequest>)) => void, reject: (reason?: any) => void, packet_id: number, error_code: number) {
    if (error_code != 0) {
        reject("Failed to unsubscribe: " + io.error_code_to_string(error_code));
        return;
    }

    resolve({ packet_id });
}
// Completion callback for disconnect(): resolves the pending promise first, then emits 'disconnect'.
private _on_disconnect_callback(resolve: (value?: (void | PromiseLike<void>)) => void) {
resolve();
this.emit('disconnect');
/** NOTE: We are NOT calling close() here but instead calling it at
* on_closed because it is always called after disconnect */
}
}