Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docsrc/typedoc-node.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"../lib/native/binding.d.ts",
"../lib/common/event.ts",
"../lib/common/http.ts",
"../lib/common/mqtt_shared.ts",
"../lib/common/platform.ts",
"../lib/common/resource_safety.ts",
"../lib/common/mqtt_shared.ts"
Expand Down
139 changes: 139 additions & 0 deletions lib/browser/mqtt5.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import {v4 as uuid} from "uuid";
import url from "url";
import {HttpsProxyAgent} from "https-proxy-agent";
import * as auth from "./auth";
import {once} from "events";
import * as model from "./mqtt_internal/model";
import * as mqtt_server from "@test/mqtt_server";
import * as promise from "../common/promise";
import * as mqtt_shared from "../common/mqtt_shared";
import * as mqtt5_packet from "../common/mqtt5_packet";

jest.setTimeout(30000);

Expand Down Expand Up @@ -554,3 +560,136 @@ test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvir
await test_utils.doRetainTest(new mqtt5.Mqtt5Client(config), new mqtt5.Mqtt5Client(config), new mqtt5.Mqtt5Client(config));
})
});

class ClientTestFixture {

private server : mqtt_server.MqttServer;

constructor(config: mqtt_server.MqttServerConfig) {
this.server = new mqtt_server.MqttServer(config);
}

async start() {
await this.server.start();
}

getServer() : mqtt_server.MqttServer { return this.server; }
}

function buildDefaultClientConfig(fixture : ClientTestFixture) : mqtt5.Mqtt5ClientConfig {
return {
hostName: "localhost",
port: fixture.getServer().getPort(),
sessionBehavior: mqtt5.ClientSessionBehavior.Default,
connectProperties: {
keepAliveIntervalSeconds: 120
},
connectTimeoutMs: 10000,
websocketOptions: {
urlFactoryOptions: {
urlFactory: mqtt5.Mqtt5WebsocketUrlFactoryType.Ws
}
}
};
}

test('Manual Puback - Acquire', async () => {
let config: mqtt_server.MqttServerConfig = {
protocolVersion: model.ProtocolMode.Mqtt5
};

let fixture = new ClientTestFixture(config);
await fixture.start();

let clientConfig = buildDefaultClientConfig(fixture);
let client = new mqtt5.Mqtt5Client(clientConfig);

let connectionSuccess = once(client, "connectionSuccess");
let stopped = once(client, "stopped");

client.start();
await connectionSuccess;

let pubackReceived : boolean = false;
let ackHandlePromise : promise.LiftedPromise<mqtt_shared.PublishAcknowledgementHandle> = promise.newLiftedPromise<mqtt_shared.PublishAcknowledgementHandle>();

client.addListener(mqtt5.Mqtt5Client.MESSAGE_RECEIVED, (event : mqtt5.MessageReceivedEvent) => {
if (event.message.qos != mqtt5_packet.QoS.AtMostOnce) {
expect(event.acknowledgementControl).toBeDefined();

// @ts-ignore
ackHandlePromise.resolve(event.acknowledgementControl.acquireHandle());
}
});

let serverPuback= promise.newLiftedPromise<mqtt5_packet.PubackPacket>();
fixture.getServer().addListener('packetReceived', (packet : mqtt5_packet.IPacket) => {
if (packet.type == mqtt5_packet.PacketType.Puback) {
pubackReceived = true;
serverPuback.resolve(packet as mqtt5_packet.PubackPacket);
}
});

await client.publish({
topicName: "test/topic",
qos: mqtt5_packet.QoS.AtLeastOnce
});

let ackHandle : mqtt_shared.PublishAcknowledgementHandle = await ackHandlePromise.promise;

// Awkward way of trying to check that a puback didn't get automatically sent. We wait for a generous period of
// time and verify that the flag is still false (we can't check the promise for non-resolution).
await new Promise((resolve, reject) => setTimeout(resolve, 1000));

expect(pubackReceived).toBe(false);

ackHandle.invokeAcknowledgement();

await serverPuback.promise;
expect(pubackReceived).toBe(true);

client.stop();
await stopped;

fixture.getServer().stop();
});

test('Manual Puback - No Acquire', async () => {
let config: mqtt_server.MqttServerConfig = {
protocolVersion: model.ProtocolMode.Mqtt5
};

let fixture = new ClientTestFixture(config);
await fixture.start();

let clientConfig = buildDefaultClientConfig(fixture);
let client = new mqtt5.Mqtt5Client(clientConfig);

let connectionSuccess = once(client, "connectionSuccess");
let stopped = once(client, "stopped");

client.start();
await connectionSuccess;

let pubackReceived : boolean = false;
let serverPuback= promise.newLiftedPromise<mqtt5_packet.PubackPacket>();
fixture.getServer().addListener('packetReceived', (packet : mqtt5_packet.IPacket) => {
if (packet.type == mqtt5_packet.PacketType.Puback) {
pubackReceived = true;
serverPuback.resolve(packet as mqtt5_packet.PubackPacket);
}
});

await client.publish({
topicName: "test/topic",
qos: mqtt5_packet.QoS.AtLeastOnce
});

await serverPuback.promise;
expect(pubackReceived).toBe(true);

client.stop();
await stopped;

fixture.getServer().stop();
});
9 changes: 4 additions & 5 deletions lib/browser/mqtt5.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export * from "../common/mqtt5";
export * from '../common/mqtt5_packet';
export { Mqtt5ClientConfigBase } from "../common/mqtt_shared";


/**
* Factory function that allows the user to completely control the url used to form the websocket handshake
* request.
Expand Down Expand Up @@ -144,7 +145,7 @@ export interface Mqtt5ClientConfig extends mqtt_shared.Mqtt5ClientConfigBase {
websocketOptions?: Mqtt5WebsocketConfig;
}

function convertSessionBehavuiorToSessionPolicy(behavior?: mqtt5.ClientSessionBehavior) : internal_mqtt_client.ResumeSessionPolicyType {
function convertSessionBehaviorToSessionPolicy(behavior?: mqtt5.ClientSessionBehavior) : internal_mqtt_client.ResumeSessionPolicyType {
switch (behavior) {
case mqtt5.ClientSessionBehavior.Default:
case mqtt5.ClientSessionBehavior.Clean:
Expand Down Expand Up @@ -235,7 +236,7 @@ export class Mqtt5Client extends BufferedEventEmitter implements mqtt5.IMqtt5Cli

let internalConnectOptions : internal_mqtt_client.ConnectOptions = {
keepAliveIntervalSeconds: this.config.connectProperties?.keepAliveIntervalSeconds ?? 1200,
resumeSessionPolicy: convertSessionBehavuiorToSessionPolicy(this.config.sessionBehavior),
resumeSessionPolicy: convertSessionBehaviorToSessionPolicy(this.config.sessionBehavior),
};

applyConnectPacketToInternalConnectOptions(internalConnectOptions, config.connectProperties);
Expand Down Expand Up @@ -623,9 +624,7 @@ export class Mqtt5Client extends BufferedEventEmitter implements mqtt5.IMqtt5Cli
message: event.publish
};

setTimeout(() => {
this.emit(Mqtt5Client.MESSAGE_RECEIVED, messageReceivedEvent);
}, 0);
mqtt_shared.queueAcknowledgeableEvent(this, Mqtt5Client.MESSAGE_RECEIVED, messageReceivedEvent, "acknowledgementControl", event.acknowledgementControl);
}
}

Expand Down
114 changes: 112 additions & 2 deletions lib/browser/mqtt_internal/client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import * as mqtt_server from "@test/mqtt_server";
import * as model from "./model";
import * as mqtt_client from "./client";
import * as mqtt5_packet from "../../common/mqtt5_packet";
import * as mqtt_shared from "../../common/mqtt_shared";
import * as promise from "../../common/promise";
import * as mqtt5 from "../../common/mqtt5";

import {once} from "events";
import * as ws from "../ws";
import {MqttServer} from "../../../test/mqtt_server";
import {PublishReceivedEvent} from "./client";

var websocket = require('@httptoolkit/websocket-stream')

Expand Down Expand Up @@ -1278,7 +1280,7 @@ async function doResubscribeTest(protocolVersion : model.ProtocolMode, resubscri
expect(reconnectSuccess1.connack.sessionPresent).toBeFalsy();

// seems easiest to use time rather than trying to reason about unresolved promises (negative events)
await new Promise<void>((resolve) => { setTimeout(() => { resolve(); }, 2000)});
await sleep( 2000);

// @ts-ignore
config.connackOverrides.sessionPresent = true;
Expand All @@ -1289,7 +1291,7 @@ async function doResubscribeTest(protocolVersion : model.ProtocolMode, resubscri
expect(reconnectSuccess2.connack.sessionPresent).toBeTruthy();

// seems easiest to use time rather than trying to reason about unresolved promises (negative events)
await new Promise<void>((resolve) => { setTimeout(() => { resolve(); }, 2000)});
await sleep(2000);

client.stop();
await stopped;
Expand Down Expand Up @@ -1321,3 +1323,111 @@ describe("Resubscribe - Enabled on session lost", () => {
await doResubscribeTest(protocolVersionToMode(protocolVersion), mqtt_client.ResubscribeModeType.EnabledOnSessionResumptionFail);
})
});

async function doManualAcknowledgementNoAcquireTest(protocolVersion : model.ProtocolMode, iterations: number) {
let config : mqtt_server.MqttServerConfig = {
protocolVersion: protocolVersion,
};

let fixture = new ClientTestFixture(config);
await fixture.start();

let client = new mqtt_client.Client(buildDefaultClientConfig(fixture, protocolVersion));

let connectionSuccess = once(client, "connectionSuccess");
let stopped = once(client, "stopped");

client.start();
await connectionSuccess;

let serverPuback= promise.newLiftedPromise<mqtt5_packet.PubackPacket>();
fixture.getServer().addListener('packetReceived', (packet : mqtt5_packet.IPacket) => {
if (packet.type == mqtt5_packet.PacketType.Puback) {
serverPuback.resolve(packet as mqtt5_packet.PubackPacket);
}
});

await client.publish({
topicName: "test/topic",
qos: mqtt5_packet.QoS.AtLeastOnce
});

await serverPuback.promise;

client.stop();
await stopped;

fixture.getServer().stop();
}

describe("Manual Acknowledgement - No Acquire", () => {
test.each(modes)("MQTT %p", async (protocolVersion) => {
await doManualAcknowledgementNoAcquireTest(protocolVersionToMode(protocolVersion), 4);
})
});

async function doManualAcknowledgementAcquireTest(protocolVersion : model.ProtocolMode, iterations: number) {
let config : mqtt_server.MqttServerConfig = {
protocolVersion: protocolVersion,
};

let fixture = new ClientTestFixture(config);
await fixture.start();

let client = new mqtt_client.Client(buildDefaultClientConfig(fixture, protocolVersion));

let connectionSuccess = once(client, "connectionSuccess");
let stopped = once(client, "stopped");

client.start();
await connectionSuccess;

let pubackReceived : boolean = false;
let ackHandlePromise : promise.LiftedPromise<mqtt_shared.PublishAcknowledgementHandle> = promise.newLiftedPromise<mqtt_shared.PublishAcknowledgementHandle>();

client.addListener('publishReceived', (event : PublishReceivedEvent) => {
if (event.publish.qos != mqtt5_packet.QoS.AtMostOnce) {
expect(event.acknowledgementControl).toBeDefined();

// @ts-ignore
ackHandlePromise.resolve(event.acknowledgementControl.acquireHandle());
}
});

let serverPuback= promise.newLiftedPromise<mqtt5_packet.PubackPacket>();
fixture.getServer().addListener('packetReceived', (packet : mqtt5_packet.IPacket) => {
if (packet.type == mqtt5_packet.PacketType.Puback) {
pubackReceived = true;
serverPuback.resolve(packet as mqtt5_packet.PubackPacket);
}
});

await client.publish({
topicName: "test/topic",
qos: mqtt5_packet.QoS.AtLeastOnce
});

let ackHandle : mqtt_shared.PublishAcknowledgementHandle = await ackHandlePromise.promise;

// Awkward way of trying to check that a puback didn't get automatically sent. We wait for a generous period of
// time and verify that the flag is still false (we can't check the promise for non-resolution).
await sleep(1000);

expect(pubackReceived).toBe(false);

ackHandle.invokeAcknowledgement();

await serverPuback.promise;
expect(pubackReceived).toBe(true);

client.stop();
await stopped;

fixture.getServer().stop();
}

describe("Manual Acknowledgement - Acquire", () => {
test.each(modes)("MQTT %p", async (protocolVersion) => {
await doManualAcknowledgementAcquireTest(protocolVersionToMode(protocolVersion), 4);
})
});
Loading
Loading