Skip to content

Commit f4cd8f9

Browse files
authored
Manual Publish Acknowledgement (#710)
1 parent a201f63 commit f4cd8f9

13 files changed

Lines changed: 593 additions & 10 deletions

File tree

docsrc/typedoc-browser.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
"../lib/browser/mqtt5.ts",
1111
"../lib/browser/ws.ts",
1212
"../lib/common/event.ts",
13-
"../lib/common/http.ts"
13+
"../lib/common/http.ts",
14+
"../lib/common/mqtt_shared.ts"
1415
],
1516
"tsconfig": "../tsconfig.browser.json",
1617
"out": "../docs/browser",

docsrc/typedoc-node.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
"../lib/common/event.ts",
1515
"../lib/common/http.ts",
1616
"../lib/common/platform.ts",
17-
"../lib/common/resource_safety.ts"
17+
"../lib/common/resource_safety.ts",
18+
"../lib/common/mqtt_shared.ts"
1819
],
1920
"out": "../docs/node",
2021
"excludeExternals": true,

lib/common/event.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,25 @@ import { EventEmitter } from 'events';
1919
*/
2020
export type EventKey = string | symbol;
2121

22+
export type EventEmissionCallback = () => void
23+
2224
/**
2325
* @internal
2426
*/
2527
class BufferedEvent {
28+
public callback?: EventEmissionCallback;
2629
public next?: BufferedEvent;
2730
public args: any[];
2831
constructor(public event: EventKey, args: any[]) {
2932
this.args = args;
3033
}
34+
35+
static newWithEmissionCallback(key: EventKey, callback: EventEmissionCallback, args: any[]) : BufferedEvent {
36+
let bufferedEvent : BufferedEvent = new BufferedEvent(key, args);
37+
bufferedEvent.callback = callback;
38+
39+
return bufferedEvent;
40+
}
3141
}
3242

3343
/**
@@ -68,6 +78,9 @@ export class BufferedEventEmitter extends EventEmitter {
6878
while (this.eventQueue) {
6979
const event = this.eventQueue;
7080
super.emit(event.event, ...event.args);
81+
if (event.callback) {
82+
event.callback();
83+
}
7184
this.eventQueue = this.eventQueue.next;
7285
}
7386
}
@@ -94,4 +107,22 @@ export class BufferedEventEmitter extends EventEmitter {
94107

95108
return super.emit(event, ...args);
96109
}
110+
111+
emitWithCallback(event: EventKey, emissionCallback: EventEmissionCallback, ...args: any[]) : boolean {
112+
if (this.corked) {
113+
// queue requests in order
114+
let last = this.lastQueuedEvent;
115+
this.lastQueuedEvent = BufferedEvent.newWithEmissionCallback(event, emissionCallback, args);
116+
if (last) {
117+
last.next = this.lastQueuedEvent;
118+
} else {
119+
this.eventQueue = this.lastQueuedEvent;
120+
}
121+
return this.listeners(event).length > 0;
122+
}
123+
124+
let result = super.emit(event, ...args);
125+
emissionCallback();
126+
return result;
127+
}
97128
}

lib/common/mqtt5.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
*/
1010

1111
import * as mqtt5_packet from "./mqtt5_packet";
12+
import * as mqtt_shared from "../common/mqtt_shared";
1213
import { ICrtError } from "./error";
1314

1415
/**
@@ -339,6 +340,15 @@ export interface MessageReceivedEvent {
339340
* PUBLISH packet received from the server
340341
*/
341342
message: mqtt5_packet.PublishPacket;
343+
344+
/**
345+
* An object that allows the event recipient to take control of when the Publish packet's acknowledgement
346+
* packet is sent. If the acknowledgement handle is not acquired by an event listener during the emission
347+
* process, the client will automatically send the acknowledgement itself.
348+
*
349+
* Undefined if this publish is not acknowledgeable (QoS 0).
350+
*/
351+
acknowledgementControl?: mqtt_shared.PublishAcknowledgementHandleWrapper
342352
}
343353

344354
/**

lib/common/mqtt_shared.ts

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
* @packageDocumentation
88
*/
99

10+
import * as event from "./event";
11+
1012

1113
/**
1214
* Converts payload to Buffer or string regardless of the supplied type
@@ -121,4 +123,115 @@ export function isValidTopic(topic: any) : boolean {
121123
let topicAsString = topic as string;
122124

123125
return isValidTopicInternal(topicAsString, false);
124-
}
126+
}
127+
128+
export type PublishAcknowledgementFunctor = () => void;
129+
130+
/**
131+
* Wrapper class containing a one-use singleton handle that can be used to trigger sending the acknowledgement (Puback in
132+
* QoS 1, Pubrec in QoS 2) packet for an incoming publish.
133+
*/
134+
export class PublishAcknowledgementHandleWrapper {
135+
136+
private ackHandle : PublishAcknowledgementHandle | null;
137+
138+
constructor(handle : PublishAcknowledgementHandle | null) {
139+
this.ackHandle = handle;
140+
}
141+
142+
/**
143+
* Attempt to take the acknowledgement handle held by the wrapper. This will only succeed for the first caller;
144+
* after the initial call, null will be returned. By taking the handle, the caller assumes responsibility
145+
* for sending the acknowledgement packet associated with the incoming publish packet. Failing to trigger the
146+
* acknowledgement will cause the broker to potentially re-send the publish.
147+
*/
148+
acquireHandle() : PublishAcknowledgementHandle | null {
149+
let handle = this.ackHandle;
150+
this.ackHandle = null;
151+
152+
return handle;
153+
}
154+
}
155+
156+
function movePublishAcknowledgementHandleWrapper(wrapper: PublishAcknowledgementHandleWrapper | undefined, compositionFunctor?: PublishAcknowledgementFunctor) : PublishAcknowledgementHandleWrapper | undefined {
157+
if (wrapper) {
158+
let handle = wrapper.acquireHandle();
159+
if (compositionFunctor && handle) {
160+
let interiorHandle = handle;
161+
handle = new PublishAcknowledgementHandle(() => {
162+
interiorHandle.invokeAcknowledgement();
163+
compositionFunctor();
164+
});
165+
}
166+
167+
return new PublishAcknowledgementHandleWrapper(handle);
168+
}
169+
170+
return undefined;
171+
}
172+
173+
/** @internal */
174+
export function emitAcknowledgeableEvent<T>(emitter: event.BufferedEventEmitter, ackEvent: string, ackEventPayload: T, wrapperFieldName: string, ackHandleWrapper?: PublishAcknowledgementHandleWrapper, compositionFunctor?: PublishAcknowledgementFunctor) : void {
175+
ackHandleWrapper = movePublishAcknowledgementHandleWrapper(ackHandleWrapper, compositionFunctor);
176+
if (ackHandleWrapper) {
177+
(ackEventPayload as any)[wrapperFieldName] = ackHandleWrapper;
178+
emitter.emitWithCallback(ackEvent, () => {
179+
if (ackHandleWrapper) {
180+
let handle = ackHandleWrapper.acquireHandle();
181+
if (handle) {
182+
// Even if corked, all listeners have had a chance to react to the event
183+
// and acquire the acknowledgement handle if they wanted to. If no one did so, then we do it ourselves.
184+
handle.invokeAcknowledgement();
185+
}
186+
}
187+
}, ackEventPayload);
188+
} else {
189+
emitter.emit(ackEvent, ackEventPayload);
190+
}
191+
}
192+
193+
/** @internal */
194+
export function queueAcknowledgeableEvent<T>(emitter: event.BufferedEventEmitter, ackEvent: string, ackEventPayload: T, wrapperFieldName: string, ackHandleWrapper?: PublishAcknowledgementHandleWrapper, compositionFunctor?: PublishAcknowledgementFunctor) : void {
195+
let wrapper : PublishAcknowledgementHandleWrapper | undefined = movePublishAcknowledgementHandleWrapper(ackHandleWrapper, compositionFunctor);
196+
197+
queueMicrotask(() => {
198+
if (wrapper) {
199+
(ackEventPayload as any)[wrapperFieldName] = wrapper;
200+
emitter.emitWithCallback(ackEvent, () => {
201+
if (wrapper) {
202+
let handle = wrapper.acquireHandle();
203+
if (handle) {
204+
// Even if corked, all listeners have had a chance to react to the event
205+
// and acquire the acknowledgement handle if they wanted to. If no one did so, then we do it ourselves.
206+
handle.invokeAcknowledgement();
207+
}
208+
}
209+
}, ackEventPayload);
210+
} else {
211+
emitter.emit(ackEvent, ackEventPayload);
212+
}
213+
});
214+
}
215+
216+
/**
217+
* Object that allows the holder to trigger the acknowledgement for an associated publish packet.
218+
*/
219+
export class PublishAcknowledgementHandle {
220+
221+
private acknowledgementFunction? : PublishAcknowledgementFunctor;
222+
223+
constructor(acknowledgementFunction : PublishAcknowledgementFunctor) {
224+
this.acknowledgementFunction = acknowledgementFunction;
225+
}
226+
227+
/**
228+
* trigger the acknowledgement for an associated Publish packet
229+
*/
230+
invokeAcknowledgement() : void {
231+
let acknowledgementFunction = this.acknowledgementFunction;
232+
this.acknowledgementFunction = undefined;
233+
if (acknowledgementFunction) {
234+
acknowledgementFunction();
235+
}
236+
}
237+
}

lib/native/binding.d.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { PublishCompletionResult } from "../common/mqtt5";
2020
import * as eventstream from "./eventstream";
2121
import { ConnectionStatistics } from "./mqtt";
2222
import * as mqtt_request_response from "../native/mqtt_request_response";
23+
import * as mqtt_shared from "../common/mqtt_shared";
2324

2425

2526
/**
@@ -199,7 +200,7 @@ export function mqtt5_client_new(
199200
on_connection_success_handler: (client: Mqtt5Client, connack: mqtt5_packet.ConnackPacket, settings: NegotiatedSettings) => void,
200201
on_connection_failure_handler: (client: Mqtt5Client, errorCode: number, connack?: mqtt5_packet.ConnackPacket) => void,
201202
on_disconnection_handler: (client: Mqtt5Client, errorCode: number, disconnect?: mqtt5_packet.DisconnectPacket) => void,
202-
on_message_received_handler: (client: Mqtt5Client, message: mqtt5_packet.PublishPacket) => void,
203+
on_message_received_handler: (client: Mqtt5Client, message: mqtt5_packet.PublishPacket, pubackControlId?: any) => void,
203204
client_bootstrap?: NativeHandle,
204205
socket_options?: NativeHandle,
205206
tls_ctx?: NativeHandle,
@@ -224,6 +225,9 @@ export function mqtt5_client_publish(client: NativeHandle, publish_packet: mqtt5
224225
/** @internal */
225226
export function mqtt5_client_get_queue_statistics(client: NativeHandle) : ClientStatistics;
226227

228+
/** @internal */
229+
export function mqtt5_client_invoke_publish_acknowledgement(client: NativeHandle, pubackControlId: NativeHandle) : void;
230+
227231
/** @internal */
228232
export function mqtt5_client_close(client: NativeHandle) : void;
229233

lib/native/mqtt5.spec.ts

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {v4 as uuid} from "uuid";
1212
import * as io from "./io";
1313
import {once} from "events";
1414

15-
jest.setTimeout(30000);
15+
jest.setTimeout(45000);
1616

1717
function createNodeSpecificTestConfig (testType: test_utils.SuccessfulConnectionTestType) : mqtt5.Mqtt5ClientConfig {
1818

@@ -592,6 +592,58 @@ test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvir
592592
})
593593
});
594594

595+
test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Manual publish acknowledgement - acknowledgment hold', async() =>{
596+
// hold publish acknowledgement and verify broker re-delivers the message
597+
await retry.networkTimeoutRetryWrapper( async () => {
598+
let topic: string = `test-${uuid()}`;
599+
let testPayload: Buffer = Buffer.from(`redrive-${uuid()}`, "utf-8");
600+
601+
let config: mqtt5.Mqtt5ClientConfig = createDirectIotCoreClientConfig();
602+
let client: mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);
603+
604+
await test_utils.subPubAcquireControlTest(client, topic, testPayload);
605+
})
606+
});
607+
608+
test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Manual publish acknowledgement - acknowledgment invoke', async() =>{
609+
// invoke publish acknowledgement, verify no re-delivery after 35 seconds
610+
await retry.networkTimeoutRetryWrapper( async () => {
611+
let topic: string = `test-${uuid()}`;
612+
let testPayload: Buffer = Buffer.from(`redrive-${uuid()}`, "utf-8");
613+
614+
let config: mqtt5.Mqtt5ClientConfig = createDirectIotCoreClientConfig();
615+
let client: mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);
616+
617+
await test_utils.subPubAcquireInvokeControlTest(client, topic, testPayload);
618+
})
619+
});
620+
621+
test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Manual publish acknowledgement - double-call', async() =>{
622+
// acquireHandle() twice: first call returns a handle, second call returns null
623+
await retry.networkTimeoutRetryWrapper( async () => {
624+
let topic: string = `test-${uuid()}`;
625+
let testPayload: Buffer = Buffer.from(`redrive-${uuid()}`, "utf-8");
626+
627+
let config: mqtt5.Mqtt5ClientConfig = createDirectIotCoreClientConfig();
628+
let client: mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);
629+
630+
await test_utils.subPubDoubleAcquireControlTest(client, topic, testPayload);
631+
})
632+
});
633+
634+
test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Manual publish acknowledgement - post-calback acquire', async() =>{
635+
// acquireHandle() after the messageReceived callback has returned results in null
636+
await retry.networkTimeoutRetryWrapper( async () => {
637+
let topic: string = `test-${uuid()}`;
638+
let testPayload: Buffer = Buffer.from(`redrive-${uuid()}`, "utf-8");
639+
640+
let config: mqtt5.Mqtt5ClientConfig = createDirectIotCoreClientConfig();
641+
let client: mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);
642+
643+
await test_utils.subPubPostCallbackAcquireControlTest(client, topic, testPayload);
644+
})
645+
});
646+
595647
test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Will test', async () => {
596648
await retry.networkTimeoutRetryWrapper( async () => {
597649
let willPayload: Buffer = Buffer.from("ToMyChildrenIBequeathNothing", "utf-8");

lib/native/mqtt5.ts

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ export class Mqtt5Client extends NativeResourceMixin(BufferedEventEmitter) imple
291291
(client: Mqtt5Client, connack : mqtt5_packet.ConnackPacket, settings: mqtt5.NegotiatedSettings) => { Mqtt5Client._s_on_connection_success(client, connack, settings); },
292292
(client: Mqtt5Client, errorCode: number, connack? : mqtt5_packet.ConnackPacket) => { Mqtt5Client._s_on_connection_failure(client, new CrtError(errorCode), connack); },
293293
(client: Mqtt5Client, errorCode: number, disconnect? : mqtt5_packet.DisconnectPacket) => { Mqtt5Client._s_on_disconnection(client, new CrtError(errorCode), disconnect); },
294-
(client: Mqtt5Client, message : mqtt5_packet.PublishPacket) => { Mqtt5Client._s_on_message_received(client, message); },
294+
(client: Mqtt5Client, message : mqtt5_packet.PublishPacket, pubackControlId?: any) => { Mqtt5Client._s_on_message_received(client, message, pubackControlId); },
295295
config.clientBootstrap ? config.clientBootstrap.native_handle() : null,
296296
config.socketOptions ? config.socketOptions.native_handle() : null,
297297
config.tlsCtx ? config.tlsCtx.native_handle() : null,
@@ -662,13 +662,39 @@ export class Mqtt5Client extends NativeResourceMixin(BufferedEventEmitter) imple
662662
}
663663
}
664664

665-
private static _s_on_message_received(client: Mqtt5Client, message : mqtt5_packet.PublishPacket) {
665+
private static _s_on_message_received(client: Mqtt5Client, message : mqtt5_packet.PublishPacket, pubackControlId? : any) {
666+
let acknowledgementControl : mqtt_shared.PublishAcknowledgementHandleWrapper | undefined = undefined;
667+
668+
if (pubackControlId !== undefined) {
669+
const controlId = pubackControlId;
670+
acknowledgementControl = new mqtt_shared.PublishAcknowledgementHandleWrapper(
671+
new mqtt_shared.PublishAcknowledgementHandle(() => {
672+
crt_native.mqtt5_client_invoke_publish_acknowledgement(client.native_handle(), controlId);
673+
})
674+
);
675+
}
676+
666677
let messageReceivedEvent: mqtt5.MessageReceivedEvent = {
667678
message: message
668679
};
669680

681+
if (acknowledgementControl) {
682+
messageReceivedEvent.acknowledgementControl = acknowledgementControl;
683+
}
684+
670685
process.nextTick(() => {
671-
client.emit(Mqtt5Client.MESSAGE_RECEIVED, messageReceivedEvent);
686+
client.emitWithCallback(Mqtt5Client.MESSAGE_RECEIVED, () => {
687+
/*
688+
* Even if corked, all listeners have had a chance to react to the event
689+
* and acquire the acknowledgement handle if they wanted to. If no one did so, then we do it ourselves.
690+
*/
691+
if (acknowledgementControl) {
692+
const handle = acknowledgementControl.acquireHandle();
693+
if (handle) {
694+
handle.invokeAcknowledgement();
695+
}
696+
}
697+
}, messageReceivedEvent);
672698
});
673699
}
674700
}

source/module.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1549,6 +1549,7 @@ static bool s_create_and_register_function(
15491549
CREATE_AND_REGISTER_FN(mqtt5_client_unsubscribe)
15501550
CREATE_AND_REGISTER_FN(mqtt5_client_publish)
15511551
CREATE_AND_REGISTER_FN(mqtt5_client_get_queue_statistics)
1552+
CREATE_AND_REGISTER_FN(mqtt5_client_invoke_publish_acknowledgement)
15521553
CREATE_AND_REGISTER_FN(mqtt5_client_close)
15531554

15541555
/* MQTT Client */

0 commit comments

Comments
 (0)