-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathmqtt5.ts
More file actions
700 lines (610 loc) · 25.9 KB
/
mqtt5.ts
File metadata and controls
700 lines (610 loc) · 25.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
/**
* Node.js specific MQTT5 client implementation
*
* [MQTT5 Client User Guide](https://www.github.com/awslabs/aws-crt-nodejs/blob/main/MQTT5-UserGuide.md)
*
* @packageDocumentation
* @module mqtt5
* @mergeTarget
*
*/
import crt_native from './binding';
import { NativeResourceMixin } from "./native_resource";
import { BufferedEventEmitter } from '../common/event';
import * as io from "./io";
import * as http from './http';
import * as mqtt5_packet from "../common/mqtt5_packet";
import * as mqtt5 from "../common/mqtt5";
import * as mqtt_shared from "../common/mqtt_shared";
import {CrtError} from "./error";
export { HttpProxyOptions } from './http';
export * from "../common/mqtt5";
export * from '../common/mqtt5_packet';
/**
 * Function signature for transforming the HTTP request that serves as the websocket handshake.
 *
 * @param request the handshake HTTP request handed to the transform
 * @param done completion callback; accepts an optional numeric error code
 */
export type WebsocketHandshakeTransform = (
    request: http.HttpRequest,
    done: (error_code?: number) => void
) => void;
/**
 * Snapshot of numeric information describing the client's operation queue.
 */
export interface ClientStatistics {
    /**
     * How many operations have been submitted to the client and are not yet complete. This count
     * includes the unacked operations.
     */
    incompleteOperationCount: number;

    /**
     * Combined packet size of the operations that have been submitted to the client and are not yet complete.
     * This size includes the unacked operations.
     */
    incompleteOperationSize: number;

    /**
     * How many operations have already been sent to the server and are still waiting on the corresponding ACK
     * that completes them.
     */
    unackedOperationCount: number;

    /**
     * Combined packet size of the operations that have already been sent to the server and are still waiting
     * on the corresponding ACK that completes them.
     */
    unackedOperationSize: number;
}
/**
 * Selects what happens to queued and in-progress operations when the client disconnects, and how operations
 * are treated while the client is offline. While not connected, any operation that these rules would fail
 * on disconnect is rejected.
 */
export enum ClientOperationQueueBehavior {

    /** Same as FailQos0PublishOnDisconnect */
    Default = 0,

    /**
     * On disconnect, QoS 1+ publishes are re-queued: un-acked ones move to the front, unprocessed ones keep
     * their position. Every other operation (QoS 0 publishes, subscribe, unsubscribe) is failed.
     */
    FailNonQos1PublishOnDisconnect = 1,

    /**
     * On disconnect, incomplete QoS 0 publishes are failed. Un-acked QoS 1+ publishes are re-queued at the
     * head of the line so they are retransmitted immediately on a session resumption. Everything else is
     * requeued in its original order behind any retransmissions.
     */
    FailQos0PublishOnDisconnect = 2,

    /**
     * On disconnect, every incomplete operation is failed, with the exception of operations the MQTT5 spec
     * requires to be retransmitted (un-acked QoS1+ publishes).
     */
    FailAllOnDisconnect = 3,
}
/**
 * Extra operation validation and flow control settings for the client. These checks exceed what the MQTT5
 * spec requires so that the limits of specific MQTT brokers are respected.
 */
export enum ClientExtendedValidationAndFlowControl {

    /**
     * No extra validation or flow control is applied
     */
    None = 0,

    /**
     * Adds client-side validation and operational flow control matching the default AWS IoT Core limits.
     *
     * Additional validation currently applied:
     *
     * 1. No more than 8 subscriptions per SUBSCRIBE packet
     * 1. Topics and topic filters have a maximum of 7 slashes (8 segments), not counting any AWS rules prefix
     * 1. Topics must be <= 256 bytes in length
     * 1. Client id must be <= 128 bytes in length
     *
     * Flow control currently applied:
     *
     * 1. Outbound throughput throttled to 512KB/s
     * 1. Outbound publish TPS throttled to 100
     */
    AwsIotCoreDefaults = 1,
}
/**
 * Options used when creating an mqtt5 client.
 */
export interface Mqtt5ClientConfig {

    /**
     * Name of the MQTT server host the client connects to.
     */
    hostName: string;

    /**
     * Network port on the MQTT server the client connects to.
     */
    port: number;

    /**
     * Selects the MQTT5 client's behavior with respect to MQTT sessions.
     */
    sessionBehavior?: mqtt5.ClientSessionBehavior;

    /**
     * Selects how the reconnect delay is adjusted so that, for a large set of reconnecting clients, the
     * reconnection attempt timepoints are spread out more smoothly.
     */
    retryJitterMode?: mqtt5.RetryJitterType;

    /**
     * Lower bound on the wait before reconnecting after a disconnect. Each connection failure triggers
     * exponential backoff with jitter.
     */
    minReconnectDelayMs?: number;

    /**
     * Upper bound on the wait before reconnecting after a disconnect. Each connection failure triggers
     * exponential backoff with jitter.
     */
    maxReconnectDelayMs?: number;

    /**
     * How long a connection must stay established before the reconnect delay drops back to the minimum.
     * This reduces wasted bandwidth in fast reconnect cycles caused by permission failures on operations.
     */
    minConnectedTimeToResetReconnectDelayMs?: number;

    /**
     * How long to wait for a CONNACK after a CONNECT request has been sent. The connection is shut down if
     * none arrives in time.
     */
    connackTimeoutMs?: number;

    /**
     * Every configurable option of the CONNECT packet the client sends, the will included. The same
     * connect properties are used for each connection attempt the client makes.
     */
    connectProperties?: mqtt5_packet.ConnectPacket;

    /**
     * Selects what happens to queued and in-progress operations when the client disconnects, and how new
     * operations are treated while the client is offline. While not connected, any operation that these rules
     * would fail on disconnect is also rejected.
     *
     * @group Node-only
     */
    offlineQueueBehavior?: ClientOperationQueueBehavior;

    /**
     * How long to wait for a PINGRESP after a PINGREQ has been sent. The client closes the current connection
     * if none arrives in time.
     *
     * @group Node-only
     */
    pingTimeoutMs?: number;

    /**
     * How long to wait for an ack after a QoS 1+ PUBLISH, SUBSCRIBE, or UNSUBSCRIBE has been sent before the
     * operation is failed.
     *
     * @group Node-only
     */
    ackTimeoutSeconds?: number;

    /**
     * Extra settings governing how the client uses topic aliases.
     *
     * Topic aliasing behavior is disabled when this setting is left undefined.
     */
    topicAliasingOptions?: mqtt5.TopicAliasingOptions;

    /**
     * The client bootstrap to use. Leaving this undefined is fine in almost all cases.
     *
     * @group Node-only
     */
    clientBootstrap?: io.ClientBootstrap;

    /**
     * Socket properties for the MQTT connections the client makes underneath. When left undefined the
     * defaults apply (no TCP keep alive, 10 second socket timeout).
     *
     * @group Node-only
     */
    socketOptions?: io.SocketOptions;

    /**
     * TLS context used for secure socket connections.
     * A plaintext connection is used when this is undefined.
     *
     * @group Node-only
     */
    tlsCtx?: io.ClientTlsContext;

    /**
     * Callback that can apply a custom transformation to the HTTP request acting as the websocket handshake.
     * Setting it to a valid transformation callback makes the client use websockets. To use websockets
     * without transforming anything, supply a trivial completion callback. When undefined, the connection is
     * made with direct MQTT.
     *
     * @group Node-only
     */
    websocketHandshakeTransform?: WebsocketHandshakeTransform;

    /**
     * Settings for (tunneling) HTTP proxy usage while establishing MQTT connections
     *
     * @group Node-only
     */
    httpProxyOptions?: http.HttpProxyOptions;

    /**
     * Extra operation validation and flow control settings for the client. These checks exceed what the base
     * MQTT5 spec requires so that the limits of specific MQTT brokers are respected.
     *
     * @group Node-only
     */
    extendedValidationAndFlowControlOptions?: ClientExtendedValidationAndFlowControl;
}
/**
 * Node.js specific MQTT5 client implementation
 *
 * Not all parts of the MQTT5 spec are supported. We currently do not support:
 *
 * * AUTH packets and the authentication fields in the CONNECT packet
 * * QoS 2
 *
 * [MQTT5 Client User Guide](https://www.github.com/awslabs/aws-crt-nodejs/blob/main/MQTT5-UserGuide.md)
 *
 * This client is based on native resources. When finished with the client, you must call close() to dispose of
 * them or they will leak.
 *
 */
export class Mqtt5Client extends NativeResourceMixin(BufferedEventEmitter) implements mqtt5.IMqtt5Client {

    /**
     * Client constructor
     *
     * Creates the native client, handing it this object, the configuration, one callback per lifecycle
     * event (each forwarding to a static handler), and the native handles of the optional bootstrap, socket
     * options, TLS context and proxy options (null when the option is not configured).
     *
     * @param config The configuration for this client
     */
    constructor(config: Mqtt5ClientConfig) {
        super();

        this._super(crt_native.mqtt5_client_new(
            this,
            config,
            (client: Mqtt5Client) => { Mqtt5Client._s_on_stopped(client); },
            (client: Mqtt5Client) => { Mqtt5Client._s_on_attempting_connect(client); },
            (client: Mqtt5Client, connack : mqtt5_packet.ConnackPacket, settings: mqtt5.NegotiatedSettings) => { Mqtt5Client._s_on_connection_success(client, connack, settings); },
            (client: Mqtt5Client, errorCode: number, connack? : mqtt5_packet.ConnackPacket) => { Mqtt5Client._s_on_connection_failure(client, new CrtError(errorCode), connack); },
            (client: Mqtt5Client, errorCode: number, disconnect? : mqtt5_packet.DisconnectPacket) => { Mqtt5Client._s_on_disconnection(client, new CrtError(errorCode), disconnect); },
            (client: Mqtt5Client, message : mqtt5_packet.PublishPacket, pubackControlId?: any) => { Mqtt5Client._s_on_message_received(client, message, pubackControlId); },
            config.clientBootstrap ? config.clientBootstrap.native_handle() : null,
            config.socketOptions ? config.socketOptions.native_handle() : null,
            config.tlsCtx ? config.tlsCtx.native_handle() : null,
            config.httpProxyOptions ? config.httpProxyOptions.create_native_handle() : null
        ));
    }

    /**
     * Triggers cleanup of native resources associated with the MQTT5 client. Once this has been invoked, callbacks
     * and events are not guaranteed to be received.
     *
     * This must be called when finished with a client; otherwise, native resources will leak. It is not safe
     * to invoke any further operations on the client after close() has been called.
     *
     * For a running client, safe and proper shutdown can be accomplished by
     *
     * ```ts
     * const stopped = once(client, "stopped");
     * client.stop();
     * await stopped;
     * client.close();
     * ```
     *
     * This is an asynchronous operation.
     *
     * @group Node-only
     */
    close() {
        crt_native.mqtt5_client_close(this.native_handle());
    }

    /**
     * Notifies the MQTT5 client that you want it to maintain connectivity to the configured endpoint.
     * The client will attempt to stay connected using the properties of the reconnect-related parameters
     * in the mqtt5 client configuration.
     *
     * This is an asynchronous operation.
     */
    start() {
        crt_native.mqtt5_client_start(this.native_handle());
    }

    /**
     * Notifies the MQTT5 client that you want it to end connectivity to the configured endpoint, disconnecting any
     * existing connection and halting reconnection attempts.
     *
     * This is an asynchronous operation. Once the process completes, no further events will be emitted until the client
     * has {@link start} invoked. Invoking {@link start start()} after a {@link stop stop()} will always result in a
     * new MQTT session.
     *
     * @param disconnectPacket (optional) properties of a DISCONNECT packet to send as part of the shutdown process
     */
    stop(disconnectPacket?: mqtt5_packet.DisconnectPacket) {
        crt_native.mqtt5_client_stop(this.native_handle(), disconnectPacket);
    }

    /**
     * Subscribe to one or more topic filters by queuing a SUBSCRIBE packet to be sent to the server.
     *
     * An exception thrown synchronously while submitting the operation is turned into a rejection of the
     * returned promise.
     *
     * @param packet SUBSCRIBE packet to send to the server
     * @returns a promise that will be rejected with an error or resolved with the SUBACK response
     */
    async subscribe(packet: mqtt5_packet.SubscribePacket) : Promise<mqtt5_packet.SubackPacket> {
        return new Promise<mqtt5_packet.SubackPacket>((resolve, reject) => {
            // Bind this promise's resolve/reject into the completion callback given to native code.
            const onCompletion = (client: Mqtt5Client, errorCode: number, suback?: mqtt5_packet.SubackPacket) => {
                Mqtt5Client._s_on_suback_callback(resolve, reject, client, errorCode, suback);
            };

            try {
                crt_native.mqtt5_client_subscribe(this.native_handle(), packet, onCompletion);
            } catch (e) {
                reject(e);
            }
        });
    }

    /**
     * Unsubscribe from one or more topic filters by queuing an UNSUBSCRIBE packet to be sent to the server.
     *
     * An exception thrown synchronously while submitting the operation is turned into a rejection of the
     * returned promise.
     *
     * @param packet UNSUBSCRIBE packet to send to the server
     * @returns a promise that will be rejected with an error or resolved with the UNSUBACK response
     */
    async unsubscribe(packet: mqtt5_packet.UnsubscribePacket) : Promise<mqtt5_packet.UnsubackPacket> {
        return new Promise<mqtt5_packet.UnsubackPacket>((resolve, reject) => {
            // Bind this promise's resolve/reject into the completion callback given to native code.
            const onCompletion = (client: Mqtt5Client, errorCode: number, unsuback?: mqtt5_packet.UnsubackPacket) => {
                Mqtt5Client._s_on_unsuback_callback(resolve, reject, client, errorCode, unsuback);
            };

            try {
                crt_native.mqtt5_client_unsubscribe(this.native_handle(), packet, onCompletion);
            } catch (e) {
                reject(e);
            }
        });
    }

    /**
     * Send a message to subscribing clients by queuing a PUBLISH packet to be sent to the server.
     *
     * Note: when the packet carries a payload, the payload is normalized in place, i.e. the `payload` field of
     * the packet object passed in by the caller is overwritten with the normalized value.
     *
     * An exception thrown synchronously while submitting the operation is turned into a rejection of the
     * returned promise.
     *
     * @param packet PUBLISH packet to send to the server
     * @returns a promise that will be rejected with an error or resolved with the PUBACK response (QoS 1) or
     * undefined (QoS 0)
     */
    async publish(packet: mqtt5_packet.PublishPacket) : Promise<mqtt5.PublishCompletionResult> {
        return new Promise<mqtt5.PublishCompletionResult>((resolve, reject) => {
            // Normalization intentionally writes back to the caller's packet (pre-existing behavior).
            if (packet && packet.payload) {
                packet.payload = mqtt_shared.normalize_payload(packet.payload);
            }

            // Bind this promise's resolve/reject into the completion callback given to native code.
            const onCompletion = (client: Mqtt5Client, errorCode: number, result: mqtt5.PublishCompletionResult) => {
                Mqtt5Client._s_on_puback_callback(resolve, reject, client, errorCode, result);
            };

            try {
                crt_native.mqtt5_client_publish(this.native_handle(), packet, onCompletion);
            } catch (e) {
                reject(e);
            }
        });
    }

    /**
     * Queries a small set of numerical statistics about the current state of the client's operation queue
     *
     * @group Node-only
     */
    getOperationalStatistics() : ClientStatistics {
        return crt_native.mqtt5_client_get_queue_statistics(this.native_handle());
    }

    /**
     * Queries a small set of numerical statistics about the current state of the client's operation queue
     * @deprecated use getOperationalStatistics instead
     *
     * @group Node-only
     */
    getQueueStatistics() : ClientStatistics {
        return this.getOperationalStatistics();
    }

    /**
     * Event emitted when the client encounters a serious error condition, such as invalid input, napi failures, and
     * other potentially unrecoverable situations.
     *
     * Listener type: {@link ErrorEventListener}
     *
     * @event
     */
    static ERROR : string = 'error';

    /**
     * Event emitted when an MQTT PUBLISH packet is received by the client.
     *
     * Listener type: {@link MessageReceivedEventListener}
     *
     * @event
     */
    static MESSAGE_RECEIVED : string = 'messageReceived';

    /**
     * Event emitted when the client begins a connection attempt.
     *
     * Listener type: {@link AttemptingConnectEventListener}
     *
     * @event
     */
    static ATTEMPTING_CONNECT : string = 'attemptingConnect';

    /**
     * Event emitted when the client successfully establishes an MQTT connection.  Only emitted after
     * an {@link ATTEMPTING_CONNECT attemptingConnect} event.
     *
     * Listener type: {@link ConnectionSuccessEventListener}
     *
     * @event
     */
    static CONNECTION_SUCCESS : string = 'connectionSuccess';

    /**
     * Event emitted when the client fails to establish an MQTT connection.  Only emitted after
     * an {@link ATTEMPTING_CONNECT attemptingConnect} event.
     *
     * Listener type: {@link ConnectionFailureEventListener}
     *
     * @event
     */
    static CONNECTION_FAILURE : string = 'connectionFailure';

    /**
     * Event emitted when the client's current connection is closed for any reason.  Only emitted after
     * a {@link CONNECTION_SUCCESS connectionSuccess} event.
     *
     * Listener type: {@link DisconnectionEventListener}
     *
     * @event
     */
    static DISCONNECTION : string = 'disconnection';

    /**
     * Event emitted when the client finishes shutdown as a result of the user invoking {@link stop}.
     *
     * Listener type: {@link StoppedEventListener}
     *
     * @event
     */
    static STOPPED : string = 'stopped';

    /**
     * Registers a listener for the client's {@link ERROR error} event.  An {@link ERROR error} event is emitted when
     * the client encounters a serious error condition, such as invalid input, napi failures, and other potentially
     * unrecoverable situations.
     *
     * @param event the type of event to listen to
     * @param listener the event listener to add
     */
    on(event: 'error', listener: mqtt5.ErrorEventListener): this;

    /**
     * Registers a listener for the client's {@link MESSAGE_RECEIVED messageReceived} event.  A
     * {@link MESSAGE_RECEIVED messageReceived} event is emitted when an MQTT PUBLISH packet is received by the
     * client.
     *
     * @param event the type of event to listen to
     * @param listener the event listener to add
     */
    on(event: 'messageReceived', listener: mqtt5.MessageReceivedEventListener): this;

    /**
     * Registers a listener for the client's {@link ATTEMPTING_CONNECT attemptingConnect} event.  A
     * {@link ATTEMPTING_CONNECT attemptingConnect} event is emitted every time the client begins a connection attempt.
     *
     * @param event the type of event to listen to
     * @param listener the event listener to add
     */
    on(event: 'attemptingConnect', listener: mqtt5.AttemptingConnectEventListener): this;

    /**
     * Registers a listener for the client's {@link CONNECTION_SUCCESS connectionSuccess} event.  A
     * {@link CONNECTION_SUCCESS connectionSuccess} event is emitted every time the client successfully establishes
     * an MQTT connection.
     *
     * @param event the type of event to listen to
     * @param listener the event listener to add
     */
    on(event: 'connectionSuccess', listener: mqtt5.ConnectionSuccessEventListener): this;

    /**
     * Registers a listener for the client's {@link CONNECTION_FAILURE connectionFailure} event.  A
     * {@link CONNECTION_FAILURE connectionFailure} event is emitted every time the client fails to establish an
     * MQTT connection.
     *
     * @param event the type of event to listen to
     * @param listener the event listener to add
     */
    on(event: 'connectionFailure', listener: mqtt5.ConnectionFailureEventListener): this;

    /**
     * Registers a listener for the client's {@link DISCONNECTION disconnection} event.  A
     * {@link DISCONNECTION disconnection} event is emitted when the client's current MQTT connection is closed
     * for any reason.
     *
     * @param event the type of event to listen to
     * @param listener the event listener to add
     */
    on(event: 'disconnection', listener: mqtt5.DisconnectionEventListener): this;

    /**
     * Registers a listener for the client's {@link STOPPED stopped} event.  A
     * {@link STOPPED stopped} event is emitted when the client finishes shutdown as a
     * result of the user invoking {@link stop}.
     *
     * @param event the type of event to listen to
     * @param listener the event listener to add
     */
    on(event: 'stopped', listener: mqtt5.StoppedEventListener): this;

    on(event: string | symbol, listener: (...args: any[]) => void): this {
        super.on(event, listener);
        return this;
    }

    /*
     * Private helper functions
     *
     * Callbacks come through static functions so that the native threadsafe function objects do not
     * capture the client object itself, simplifying the number of strong references to the client floating around.
     *
     * Every lifecycle handler defers its emit to the next tick via process.nextTick.
     */

    /** Emits an (empty) {@link STOPPED stopped} event on the next tick. */
    private static _s_on_stopped(client: Mqtt5Client) {
        process.nextTick(() => {
            const stoppedEvent: mqtt5.StoppedEvent = {};
            client.emit(Mqtt5Client.STOPPED, stoppedEvent);
        });
    }

    /** Emits an (empty) {@link ATTEMPTING_CONNECT attemptingConnect} event on the next tick. */
    private static _s_on_attempting_connect(client: Mqtt5Client) {
        process.nextTick(() => {
            const attemptingConnectEvent: mqtt5.AttemptingConnectEvent = {};
            client.emit(Mqtt5Client.ATTEMPTING_CONNECT, attemptingConnectEvent);
        });
    }

    /**
     * Emits a {@link CONNECTION_SUCCESS connectionSuccess} event, carrying the CONNACK and the negotiated
     * settings, on the next tick.
     */
    private static _s_on_connection_success(client: Mqtt5Client, connack: mqtt5_packet.ConnackPacket, settings: mqtt5.NegotiatedSettings) {
        const connectionSuccessEvent: mqtt5.ConnectionSuccessEvent = {
            connack: connack,
            settings: settings
        };

        process.nextTick(() => {
            client.emit(Mqtt5Client.CONNECTION_SUCCESS, connectionSuccessEvent);
        });
    }

    /**
     * Emits a {@link CONNECTION_FAILURE connectionFailure} event on the next tick. The event always carries
     * the error; the CONNACK is attached only when one was supplied (neither null nor undefined).
     */
    private static _s_on_connection_failure(client: Mqtt5Client, error: CrtError, connack?: mqtt5_packet.ConnackPacket) {
        const connectionFailureEvent: mqtt5.ConnectionFailureEvent = {
            error: error
        };

        if (connack !== null && connack !== undefined) {
            connectionFailureEvent.connack = connack;
        }

        process.nextTick(() => {
            client.emit(Mqtt5Client.CONNECTION_FAILURE, connectionFailureEvent);
        });
    }

    /**
     * Emits a {@link DISCONNECTION disconnection} event on the next tick. The event always carries the error;
     * the DISCONNECT packet is attached only when one was supplied (neither null nor undefined).
     */
    private static _s_on_disconnection(client: Mqtt5Client, error: CrtError, disconnect?: mqtt5_packet.DisconnectPacket) {
        const disconnectionEvent: mqtt5.DisconnectionEvent = {
            error: error
        };

        if (disconnect !== null && disconnect !== undefined) {
            disconnectionEvent.disconnect = disconnect;
        }

        process.nextTick(() => {
            client.emit(Mqtt5Client.DISCONNECTION, disconnectionEvent);
        });
    }

    /**
     * Completes a subscribe promise. Resolves with the SUBACK only when the error code is zero AND a SUBACK
     * was delivered; in every other case (including a zero error code without a SUBACK) the promise is
     * rejected with the string form of the error code.
     */
    private static _s_on_suback_callback(resolve : (value: (mqtt5_packet.SubackPacket | PromiseLike<mqtt5_packet.SubackPacket>)) => void, reject : (reason?: any) => void, client: Mqtt5Client, errorCode: number, suback?: mqtt5_packet.SubackPacket) {
        if (errorCode === 0 && suback !== undefined) {
            resolve(suback);
        } else {
            reject(io.error_code_to_string(errorCode));
        }
    }

    /**
     * Completes an unsubscribe promise. Resolves with the UNSUBACK only when the error code is zero AND an
     * UNSUBACK was delivered; in every other case (including a zero error code without an UNSUBACK) the
     * promise is rejected with the string form of the error code.
     */
    private static _s_on_unsuback_callback(resolve : (value: (mqtt5_packet.UnsubackPacket | PromiseLike<mqtt5_packet.UnsubackPacket>)) => void, reject : (reason?: any) => void, client: Mqtt5Client, errorCode: number, unsuback?: mqtt5_packet.UnsubackPacket) {
        if (errorCode === 0 && unsuback !== undefined) {
            resolve(unsuback);
        } else {
            reject(io.error_code_to_string(errorCode));
        }
    }

    /**
     * Completes a publish promise. Resolves with the completion result when the error code is zero; otherwise
     * rejects with the string form of the error code.
     */
    private static _s_on_puback_callback(resolve : (value: (mqtt5.PublishCompletionResult | PromiseLike<mqtt5.PublishCompletionResult>)) => void, reject : (reason?: any) => void, client: Mqtt5Client, errorCode: number, result:mqtt5.PublishCompletionResult) {
        if (errorCode === 0) {
            resolve(result);
        } else {
            reject(io.error_code_to_string(errorCode));
        }
    }

    /**
     * Emits a {@link MESSAGE_RECEIVED messageReceived} event for an inbound PUBLISH on the next tick.
     *
     * When native code supplies an acknowledgement control id, the event additionally carries an
     * acknowledgement control wrapping a handle that, when invoked, asks the native client to acknowledge
     * the publish identified by that id. After the event has been emitted, the handle is acquired and
     * invoked here if it can still be acquired.
     *
     * @param client client that received the message
     * @param message the received PUBLISH packet
     * @param pubackControlId (optional) opaque id used to acknowledge the publish through the native client
     */
    private static _s_on_message_received(client: Mqtt5Client, message : mqtt5_packet.PublishPacket, pubackControlId? : any) {
        let acknowledgementControl : mqtt_shared.PublishAcknowledgementHandleWrapper | undefined = undefined;

        // Treat null like undefined, matching the guards used for the other optional native arguments
        // (connack, disconnect); a null id must never be wrapped and sent back to native code.
        if (pubackControlId !== undefined && pubackControlId !== null) {
            const controlId = pubackControlId;
            acknowledgementControl = new mqtt_shared.PublishAcknowledgementHandleWrapper(
                new mqtt_shared.PublishAcknowledgementHandle(() => {
                    crt_native.mqtt5_client_invoke_publish_acknowledgement(client.native_handle(), controlId);
                })
            );
        }

        const messageReceivedEvent: mqtt5.MessageReceivedEvent = {
            message: message
        };

        if (acknowledgementControl) {
            messageReceivedEvent.acknowledgementControl = acknowledgementControl;
        }

        process.nextTick(() => {
            client.emitWithCallback(Mqtt5Client.MESSAGE_RECEIVED, () => {
                /*
                 * Even if corked, all listeners have had a chance to react to the event
                 * and acquire the acknowledgement handle if they wanted to.  If no one did so, then we do it ourselves.
                 */
                if (acknowledgementControl) {
                    const handle = acknowledgementControl.acquireHandle();
                    if (handle) {
                        handle.invokeAcknowledgement();
                    }
                }
            }, messageReceivedEvent);
        });
    }
}