Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docsrc/typedoc-browser.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
"../lib/browser/mqtt5.ts",
"../lib/browser/ws.ts",
"../lib/common/event.ts",
"../lib/common/http.ts"
"../lib/common/http.ts",
"../lib/common/mqtt_shared.ts"
],
"tsconfig": "../tsconfig.browser.json",
"out": "../docs/browser",
Expand Down
3 changes: 2 additions & 1 deletion docsrc/typedoc-node.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"../lib/common/event.ts",
"../lib/common/http.ts",
"../lib/common/platform.ts",
"../lib/common/resource_safety.ts"
"../lib/common/resource_safety.ts",
"../lib/common/mqtt_shared.ts"
],
"out": "../docs/node",
"excludeExternals": true,
Expand Down
31 changes: 31 additions & 0 deletions lib/common/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,25 @@ import { EventEmitter } from 'events';
*/
export type EventKey = string | symbol;

export type EventEmissionCallback = () => void

/**
* @internal
*/
class BufferedEvent {
public callback?: EventEmissionCallback;
public next?: BufferedEvent;
public args: any[];
constructor(public event: EventKey, args: any[]) {
this.args = args;
}

static newWithEmissionCallback(key: EventKey, callback: EventEmissionCallback, args: any[]) : BufferedEvent {
let bufferedEvent : BufferedEvent = new BufferedEvent(key, args);
bufferedEvent.callback = callback;

return bufferedEvent;
}
}

/**
Expand Down Expand Up @@ -68,6 +78,9 @@ export class BufferedEventEmitter extends EventEmitter {
while (this.eventQueue) {
const event = this.eventQueue;
super.emit(event.event, ...event.args);
if (event.callback) {
event.callback();
}
this.eventQueue = this.eventQueue.next;
}
}
Expand All @@ -94,4 +107,22 @@ export class BufferedEventEmitter extends EventEmitter {

return super.emit(event, ...args);
}

emitWithCallback(event: EventKey, emissionCallback: EventEmissionCallback, ...args: any[]) : boolean {
if (this.corked) {
// queue requests in order
let last = this.lastQueuedEvent;
this.lastQueuedEvent = BufferedEvent.newWithEmissionCallback(event, emissionCallback, args);
if (last) {
last.next = this.lastQueuedEvent;
} else {
this.eventQueue = this.lastQueuedEvent;
}
return this.listeners(event).length > 0;
}

let result = super.emit(event, ...args);
emissionCallback();
return result;
}
}
10 changes: 10 additions & 0 deletions lib/common/mqtt5.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/

import * as mqtt5_packet from "./mqtt5_packet";
import * as mqtt_shared from "../common/mqtt_shared";
import { ICrtError } from "./error";

/**
Expand Down Expand Up @@ -339,6 +340,15 @@ export interface MessageReceivedEvent {
* PUBLISH packet received from the server
*/
message: mqtt5_packet.PublishPacket;

/**
* An object that allows the event recipient to take control of when the Publish packet's acknowledgement
* packet is sent. If the acknowledgement handle is not acquired by an event listener during the emission
* process, the client will automatically send the acknowledgement itself.
*
* Undefined if this publish is not acknowledgeable (QoS 0).
*/
acknowledgementControl?: mqtt_shared.PublishAcknowledgementHandleWrapper
}

/**
Expand Down
115 changes: 114 additions & 1 deletion lib/common/mqtt_shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
* @packageDocumentation
*/

import * as event from "./event";


/**
* Converts payload to Buffer or string regardless of the supplied type
Expand Down Expand Up @@ -121,4 +123,115 @@ export function isValidTopic(topic: any) : boolean {
let topicAsString = topic as string;

return isValidTopicInternal(topicAsString, false);
}
}

export type PublishAcknowledgementFunctor = () => void;

/**
* Wrapper class containing a one-use singleton handle that can be used to trigger sending the acknowledgement (Puback in
* QoS 1, Pubrec in QoS 2) packet for an incoming publish.
*/
export class PublishAcknowledgementHandleWrapper {

private ackHandle : PublishAcknowledgementHandle | null;

constructor(handle : PublishAcknowledgementHandle | null) {
this.ackHandle = handle;
}

/**
Comment thread
sbSteveK marked this conversation as resolved.
* Attempt to take the acknowledgement handle held by the wrapper. This will only succeed for the first caller;
* after the initial call, null will be returned. By taking the handle, the caller assumes responsibility
* for sending the acknowledgement packet associated with the incoming publish packet. Failing to trigger the
* acknowledgement will cause the broker to potentially re-send the publish.
*/
acquireHandle() : PublishAcknowledgementHandle | null {
let handle = this.ackHandle;
this.ackHandle = null;

return handle;
}
}

function movePublishAcknowledgementHandleWrapper(wrapper: PublishAcknowledgementHandleWrapper | undefined, compositionFunctor?: PublishAcknowledgementFunctor) : PublishAcknowledgementHandleWrapper | undefined {
if (wrapper) {
let handle = wrapper.acquireHandle();
if (compositionFunctor && handle) {
let interiorHandle = handle;
handle = new PublishAcknowledgementHandle(() => {
interiorHandle.invokeAcknowledgement();
compositionFunctor();
});
}

return new PublishAcknowledgementHandleWrapper(handle);
}

return undefined;
}

/** @internal */
export function emitAcknowledgeableEvent<T>(emitter: event.BufferedEventEmitter, ackEvent: string, ackEventPayload: T, wrapperFieldName: string, ackHandleWrapper?: PublishAcknowledgementHandleWrapper, compositionFunctor?: PublishAcknowledgementFunctor) : void {
ackHandleWrapper = movePublishAcknowledgementHandleWrapper(ackHandleWrapper, compositionFunctor);
if (ackHandleWrapper) {
(ackEventPayload as any)[wrapperFieldName] = ackHandleWrapper;
emitter.emitWithCallback(ackEvent, () => {
if (ackHandleWrapper) {
let handle = ackHandleWrapper.acquireHandle();
if (handle) {
// Even if corked, all listeners have had a chance to react to the event
// and acquire the acknowledgement handle if they wanted to. If no one did so, then we do it ourselves.
handle.invokeAcknowledgement();
}
}
}, ackEventPayload);
} else {
emitter.emit(ackEvent, ackEventPayload);
}
}

/** @internal */
export function queueAcknowledgeableEvent<T>(emitter: event.BufferedEventEmitter, ackEvent: string, ackEventPayload: T, wrapperFieldName: string, ackHandleWrapper?: PublishAcknowledgementHandleWrapper, compositionFunctor?: PublishAcknowledgementFunctor) : void {
let wrapper : PublishAcknowledgementHandleWrapper | undefined = movePublishAcknowledgementHandleWrapper(ackHandleWrapper, compositionFunctor);

queueMicrotask(() => {
if (wrapper) {
(ackEventPayload as any)[wrapperFieldName] = wrapper;
emitter.emitWithCallback(ackEvent, () => {
if (wrapper) {
let handle = wrapper.acquireHandle();
if (handle) {
// Even if corked, all listeners have had a chance to react to the event
// and acquire the acknowledgement handle if they wanted to. If no one did so, then we do it ourselves.
handle.invokeAcknowledgement();
}
}
}, ackEventPayload);
} else {
emitter.emit(ackEvent, ackEventPayload);
}
});
}

/**
* Object that allows the holder to trigger the acknowledgement for an associated publish packet.
*/
export class PublishAcknowledgementHandle {

private acknowledgementFunction? : PublishAcknowledgementFunctor;

constructor(acknowledgementFunction : PublishAcknowledgementFunctor) {
this.acknowledgementFunction = acknowledgementFunction;
}

/**
* trigger the acknowledgement for an associated Publish packet
*/
invokeAcknowledgement() : void {
let acknowledgementFunction = this.acknowledgementFunction;
this.acknowledgementFunction = undefined;
if (acknowledgementFunction) {
acknowledgementFunction();
}
}
}
6 changes: 5 additions & 1 deletion lib/native/binding.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { PublishCompletionResult } from "../common/mqtt5";
import * as eventstream from "./eventstream";
import { ConnectionStatistics } from "./mqtt";
import * as mqtt_request_response from "../native/mqtt_request_response";
import * as mqtt_shared from "../common/mqtt_shared";


/**
Expand Down Expand Up @@ -199,7 +200,7 @@ export function mqtt5_client_new(
on_connection_success_handler: (client: Mqtt5Client, connack: mqtt5_packet.ConnackPacket, settings: NegotiatedSettings) => void,
on_connection_failure_handler: (client: Mqtt5Client, errorCode: number, connack?: mqtt5_packet.ConnackPacket) => void,
on_disconnection_handler: (client: Mqtt5Client, errorCode: number, disconnect?: mqtt5_packet.DisconnectPacket) => void,
on_message_received_handler: (client: Mqtt5Client, message: mqtt5_packet.PublishPacket) => void,
on_message_received_handler: (client: Mqtt5Client, message: mqtt5_packet.PublishPacket, pubackControlId?: any) => void,
client_bootstrap?: NativeHandle,
socket_options?: NativeHandle,
tls_ctx?: NativeHandle,
Expand All @@ -224,6 +225,9 @@ export function mqtt5_client_publish(client: NativeHandle, publish_packet: mqtt5
/** @internal */
export function mqtt5_client_get_queue_statistics(client: NativeHandle) : ClientStatistics;

/** @internal */
export function mqtt5_client_invoke_publish_acknowledgement(client: NativeHandle, pubackControlId: NativeHandle) : void;

/** @internal */
export function mqtt5_client_close(client: NativeHandle) : void;

Expand Down
54 changes: 53 additions & 1 deletion lib/native/mqtt5.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {v4 as uuid} from "uuid";
import * as io from "./io";
import {once} from "events";

jest.setTimeout(30000);
jest.setTimeout(45000);

function createNodeSpecificTestConfig (testType: test_utils.SuccessfulConnectionTestType) : mqtt5.Mqtt5ClientConfig {

Expand Down Expand Up @@ -592,6 +592,58 @@ test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvir
})
});

test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Manual publish acknowledgement - acknowledgment hold', async() =>{
// hold publish acknowledgement and verify broker re-delivers the message
await retry.networkTimeoutRetryWrapper( async () => {
let topic: string = `test-${uuid()}`;
let testPayload: Buffer = Buffer.from(`redrive-${uuid()}`, "utf-8");

let config: mqtt5.Mqtt5ClientConfig = createDirectIotCoreClientConfig();
let client: mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);

await test_utils.subPubAcquireControlTest(client, topic, testPayload);
})
});

test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Manual publish acknowledgement - acknowledgment invoke', async() =>{
// invoke publish acknowledgement, verify no re-delivery after 35 seconds
await retry.networkTimeoutRetryWrapper( async () => {
let topic: string = `test-${uuid()}`;
let testPayload: Buffer = Buffer.from(`redrive-${uuid()}`, "utf-8");

let config: mqtt5.Mqtt5ClientConfig = createDirectIotCoreClientConfig();
let client: mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);

await test_utils.subPubAcquireInvokeControlTest(client, topic, testPayload);
})
});

test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Manual publish acknowledgement - double-call', async() =>{
// acquireHandle() twice: first call returns a handle, second call returns null
await retry.networkTimeoutRetryWrapper( async () => {
let topic: string = `test-${uuid()}`;
let testPayload: Buffer = Buffer.from(`redrive-${uuid()}`, "utf-8");

let config: mqtt5.Mqtt5ClientConfig = createDirectIotCoreClientConfig();
let client: mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);

await test_utils.subPubDoubleAcquireControlTest(client, topic, testPayload);
})
});

test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Manual publish acknowledgement - post-calback acquire', async() =>{
// acquireHandle() after the messageReceived callback has returned results in null
await retry.networkTimeoutRetryWrapper( async () => {
let topic: string = `test-${uuid()}`;
let testPayload: Buffer = Buffer.from(`redrive-${uuid()}`, "utf-8");

let config: mqtt5.Mqtt5ClientConfig = createDirectIotCoreClientConfig();
let client: mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);

await test_utils.subPubPostCallbackAcquireControlTest(client, topic, testPayload);
})
});

test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Will test', async () => {
await retry.networkTimeoutRetryWrapper( async () => {
let willPayload: Buffer = Buffer.from("ToMyChildrenIBequeathNothing", "utf-8");
Expand Down
32 changes: 29 additions & 3 deletions lib/native/mqtt5.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ export class Mqtt5Client extends NativeResourceMixin(BufferedEventEmitter) imple
(client: Mqtt5Client, connack : mqtt5_packet.ConnackPacket, settings: mqtt5.NegotiatedSettings) => { Mqtt5Client._s_on_connection_success(client, connack, settings); },
(client: Mqtt5Client, errorCode: number, connack? : mqtt5_packet.ConnackPacket) => { Mqtt5Client._s_on_connection_failure(client, new CrtError(errorCode), connack); },
(client: Mqtt5Client, errorCode: number, disconnect? : mqtt5_packet.DisconnectPacket) => { Mqtt5Client._s_on_disconnection(client, new CrtError(errorCode), disconnect); },
(client: Mqtt5Client, message : mqtt5_packet.PublishPacket) => { Mqtt5Client._s_on_message_received(client, message); },
(client: Mqtt5Client, message : mqtt5_packet.PublishPacket, pubackControlId?: any) => { Mqtt5Client._s_on_message_received(client, message, pubackControlId); },
config.clientBootstrap ? config.clientBootstrap.native_handle() : null,
config.socketOptions ? config.socketOptions.native_handle() : null,
config.tlsCtx ? config.tlsCtx.native_handle() : null,
Expand Down Expand Up @@ -662,13 +662,39 @@ export class Mqtt5Client extends NativeResourceMixin(BufferedEventEmitter) imple
}
}

private static _s_on_message_received(client: Mqtt5Client, message : mqtt5_packet.PublishPacket) {
private static _s_on_message_received(client: Mqtt5Client, message : mqtt5_packet.PublishPacket, pubackControlId? : any) {
let acknowledgementControl : mqtt_shared.PublishAcknowledgementHandleWrapper | undefined = undefined;

if (pubackControlId !== undefined) {
const controlId = pubackControlId;
acknowledgementControl = new mqtt_shared.PublishAcknowledgementHandleWrapper(
new mqtt_shared.PublishAcknowledgementHandle(() => {
crt_native.mqtt5_client_invoke_publish_acknowledgement(client.native_handle(), controlId);
})
);
}

let messageReceivedEvent: mqtt5.MessageReceivedEvent = {
message: message
};

if (acknowledgementControl) {
Comment thread
sbSteveK marked this conversation as resolved.
messageReceivedEvent.acknowledgementControl = acknowledgementControl;
}

process.nextTick(() => {
client.emit(Mqtt5Client.MESSAGE_RECEIVED, messageReceivedEvent);
client.emitWithCallback(Mqtt5Client.MESSAGE_RECEIVED, () => {
/*
* Even if corked, all listeners have had a chance to react to the event
* and acquire the acknowledgement handle if they wanted to. If no one did so, then we do it ourselves.
*/
if (acknowledgementControl) {
const handle = acknowledgementControl.acquireHandle();
if (handle) {
handle.invokeAcknowledgement();
}
}
}, messageReceivedEvent);
});
}
}
1 change: 1 addition & 0 deletions source/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -1549,6 +1549,7 @@ static bool s_create_and_register_function(
CREATE_AND_REGISTER_FN(mqtt5_client_unsubscribe)
CREATE_AND_REGISTER_FN(mqtt5_client_publish)
CREATE_AND_REGISTER_FN(mqtt5_client_get_queue_statistics)
CREATE_AND_REGISTER_FN(mqtt5_client_invoke_publish_acknowledgement)
CREATE_AND_REGISTER_FN(mqtt5_client_close)

/* MQTT Client */
Expand Down
Loading
Loading