Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion lib/browser/mqtt.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@

import * as test_env from "@test/test_env";
import * as retry from "@test/retry";
import * as mqtt_server from "@test/mqtt_server";
import * as test_metrics from "@test/metrics";
import { ClientBootstrap, SocketOptions } from './io';
import { MqttClient, MqttConnectionConfig } from './mqtt';
import { MqttClient, MqttConnectionConfig, MqttClientConnection } from './mqtt';
import { v4 as uuid } from 'uuid';
import * as mqtt_shared from "../common/mqtt_shared";

jest.setTimeout(30000);

Expand Down Expand Up @@ -61,3 +64,38 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt311_is_valid_ws_auth_mqtt())(
await test_connection(config, new MqttClient(new ClientBootstrap()));
})
});

async function doMetricsTestConnect311(server: mqtt_server.MqttServer, disableMetrics: boolean, username?: string) {
let clientConfig : MqttConnectionConfig = {
client_id: "irrelevant",
host_name: "localhost",
port: server.getPort(),
socket_options: new SocketOptions(),
disable_metrics: disableMetrics,
};

if (username !== undefined) {
clientConfig.username = username;
}

let client = new MqttClient();
let connection = new MqttClientConnection(client, clientConfig);

await connection.connect();
}

test('mqtt311 metrics - enabled, undefined username', async () => {
await test_metrics.doMetricsUsernameTest(mqtt_shared.ProtocolMode.Mqtt311, doMetricsTestConnect311, false);
});

test('mqtt311 metrics - disabled, undefined username', async () => {
await test_metrics.doMetricsUsernameTest(mqtt_shared.ProtocolMode.Mqtt311, doMetricsTestConnect311, true);
});

test('mqtt311 metrics - enabled, non-empty username', async () => {
await test_metrics.doMetricsUsernameTest(mqtt_shared.ProtocolMode.Mqtt311, doMetricsTestConnect311, false, "squidward");
});

test('mqtt311 metrics - disabled, non-empty username', async () => {
await test_metrics.doMetricsUsernameTest(mqtt_shared.ProtocolMode.Mqtt311, doMetricsTestConnect311, true, "krustykrab");
});
22 changes: 13 additions & 9 deletions lib/browser/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import * as ws from "./ws"
import * as auth from "./auth";
import * as promise from "../common/promise";
import * as model from "./mqtt_internal/model";
import * as mqtt_shared from "../common/mqtt_shared";
import * as mqtt_shared_browser from "./mqtt_shared_browser";

import {Node as TrieNode, Trie, TrieOp} from "./trie";

Expand All @@ -37,8 +39,6 @@ import {
Payload,
QoS
} from "../common/mqtt";

import {normalize_payload, normalize_payload_to_buffer, MqttConnectionConfigBase} from "../common/mqtt_shared";
import {once} from "events";

export {
Expand Down Expand Up @@ -118,7 +118,7 @@ export type AWSCredentials = auth.AWSCredentials;
*
* @category MQTT
*/
export interface MqttConnectionConfig extends MqttConnectionConfigBase {
export interface MqttConnectionConfig extends mqtt_shared.MqttConnectionConfigBase {
/** Socket options, ignored in browser */
socket_options: SocketOptions;

Expand Down Expand Up @@ -244,18 +244,22 @@ export class MqttClientConnection extends BufferedEventEmitter {
internalConnectOptions.clientId = this.config.client_id;
}

if (this.config.username) {
internalConnectOptions.username = this.config.username;
if (this.config.disable_metrics) {
if (this.config.username) {
internalConnectOptions.username = this.config.username;
}
} else {
internalConnectOptions.username = mqtt_shared_browser.buildFinalUsernameFromMetrics(new mqtt_shared.AwsIoTDeviceSDKMetrics(), this.config.username);
}

if (this.config.password) {
internalConnectOptions.password = normalize_payload_to_buffer(this.config.password);
internalConnectOptions.password = mqtt_shared_browser.normalize_payload_to_buffer(this.config.password);
}

if (this.config.will) {
internalConnectOptions.will = {
topicName: this.config.will.topic,
payload: normalize_payload_to_buffer(this.config.will.payload),
payload: mqtt_shared_browser.normalize_payload_to_buffer(this.config.will.payload),
qos: this.config.will.qos,
retain: this.config.will.retain,
};
Expand Down Expand Up @@ -291,7 +295,7 @@ export class MqttClientConnection extends BufferedEventEmitter {
}

let internalConfig : internal_mqtt_client.ClientConfig = {
protocolVersion: internal_mqtt_client.ProtocolMode.Mqtt311,
protocolVersion: mqtt_shared.ProtocolMode.Mqtt311,
offlineQueuePolicy: internal_mqtt_client.OfflineQueuePolicy.PreserveQos1PlusPublishes,
connectOptions: internalConnectOptions,
pingTimeoutMillis: this.config.ping_timeout ? this.config.ping_timeout : 30 * 1000,
Expand Down Expand Up @@ -496,7 +500,7 @@ export class MqttClientConnection extends BufferedEventEmitter {
* * For QoS 2, completes when PUBCOMP is received.
*/
async publish(topic: string, payload: Payload, qos: QoS, retain: boolean = false): Promise<MqttRequest> {
let payload_data = normalize_payload(payload);
let payload_data = mqtt_shared.normalize_payload(payload);

let options : internal_mqtt_client.PublishOptions = {};
if (this.config.protocol_operation_timeout) {
Expand Down
49 changes: 46 additions & 3 deletions lib/browser/mqtt5.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import url from "url";
import {HttpsProxyAgent} from "https-proxy-agent";
import * as auth from "./auth";
import {once} from "events";
import * as model from "./mqtt_internal/model";
import * as mqtt_server from "@test/mqtt_server";
import * as test_metrics from "@test/metrics";
import * as promise from "../common/promise";
import * as mqtt_shared from "../common/mqtt_shared";
import * as mqtt5_packet from "../common/mqtt5_packet";
Expand Down Expand Up @@ -595,7 +595,7 @@ function buildDefaultClientConfig(fixture : ClientTestFixture) : mqtt5.Mqtt5Clie

test('Manual Puback - Acquire', async () => {
let config: mqtt_server.MqttServerConfig = {
protocolVersion: model.ProtocolMode.Mqtt5
protocolVersion: mqtt_shared.ProtocolMode.Mqtt5
};

let fixture = new ClientTestFixture(config);
Expand Down Expand Up @@ -656,7 +656,7 @@ test('Manual Puback - Acquire', async () => {

test('Manual Puback - No Acquire', async () => {
let config: mqtt_server.MqttServerConfig = {
protocolVersion: model.ProtocolMode.Mqtt5
protocolVersion: mqtt_shared.ProtocolMode.Mqtt5
};

let fixture = new ClientTestFixture(config);
Expand Down Expand Up @@ -692,4 +692,47 @@ test('Manual Puback - No Acquire', async () => {
await stopped;

fixture.getServer().stop();
});

async function doMetricsTestConnect5(server: mqtt_server.MqttServer, disableMetrics: boolean, username?: string) {
let clientConfig : mqtt5.Mqtt5ClientConfig = {
hostName: "localhost",
port: server.getPort(),
disableMetrics: disableMetrics,
websocketOptions: {
urlFactoryOptions: {
urlFactory: mqtt5.Mqtt5WebsocketUrlFactoryType.Ws
}
}
};

if (username !== undefined) {
clientConfig.connectProperties = {
keepAliveIntervalSeconds: 1200,
username: username
};
}

let client = new mqtt5.Mqtt5Client(clientConfig);

let connectionSuccess = once(client, 'connectionSuccess');
client.start();

await connectionSuccess;
}

test('mqtt5 metrics - enabled, undefined username', async () => {
await test_metrics.doMetricsUsernameTest(mqtt_shared.ProtocolMode.Mqtt5, doMetricsTestConnect5, false);
});

test('mqtt5 metrics - disabled, undefined username', async () => {
await test_metrics.doMetricsUsernameTest(mqtt_shared.ProtocolMode.Mqtt5, doMetricsTestConnect5, true);
});

test('mqtt5 metrics - enabled, non-empty username', async () => {
await test_metrics.doMetricsUsernameTest(mqtt_shared.ProtocolMode.Mqtt5, doMetricsTestConnect5, false, "hello");
});

test('mqtt5 metrics - disabled, non-empty username', async () => {
await test_metrics.doMetricsUsernameTest(mqtt_shared.ProtocolMode.Mqtt5, doMetricsTestConnect5, true, "world");
});
19 changes: 15 additions & 4 deletions lib/browser/mqtt5.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {CrtError} from "./error";
import * as ws from "./ws";
import * as mqtt_shared from "../common/mqtt_shared";
import * as auth from "./auth";
import * as mqtt_shared_browser from "./mqtt_shared_browser";
import * as validate from "./mqtt_internal/validate";

export * from "../common/mqtt5";
Expand Down Expand Up @@ -162,7 +163,17 @@ function convertSessionBehaviorToSessionPolicy(behavior?: mqtt5.ClientSessionBeh
}
}

function applyConnectPacketToInternalConnectOptions(internalConnectOptions : internal_mqtt_client.ConnectOptions, connectProperties?: mqtt5_packet.ConnectPacket) {
function buildInternalConnectOptions(internalConnectOptions : internal_mqtt_client.ConnectOptions, clientConfig: Mqtt5ClientConfig, connectProperties?: mqtt5_packet.ConnectPacket) {
if (clientConfig.disableMetrics) {
if (connectProperties?.username) {
internalConnectOptions.username = connectProperties.username;
}
} else {
internalConnectOptions.username = mqtt_shared_browser.buildFinalUsernameFromMetrics(new mqtt_shared.AwsIoTDeviceSDKMetrics(), connectProperties?.username);
}

internalConnectOptions.username = mqtt_shared_browser.buildFinalUsernameFromMetrics(new mqtt_shared.AwsIoTDeviceSDKMetrics(), connectProperties?.username);

if (!connectProperties) {
return;
}
Expand Down Expand Up @@ -239,7 +250,7 @@ export class Mqtt5Client extends BufferedEventEmitter implements mqtt5.IMqtt5Cli
resumeSessionPolicy: convertSessionBehaviorToSessionPolicy(this.config.sessionBehavior),
};

applyConnectPacketToInternalConnectOptions(internalConnectOptions, config.connectProperties);
buildInternalConnectOptions(internalConnectOptions, config, config.connectProperties);

let provider : auth.CredentialsProvider | undefined = undefined;
if (this.config.websocketOptions) {
Expand Down Expand Up @@ -267,7 +278,7 @@ export class Mqtt5Client extends BufferedEventEmitter implements mqtt5.IMqtt5Cli
};

let internalConfig : internal_mqtt_client.ClientConfig = {
protocolVersion: internal_mqtt_client.ProtocolMode.Mqtt5,
protocolVersion: mqtt_shared.ProtocolMode.Mqtt5,
offlineQueuePolicy: internal_mqtt_client.OfflineQueuePolicy.PreserveQos1PlusPublishes,
connectOptions: internalConnectOptions,
pingTimeoutMillis: DEFAULT_MQTT_PING_TIMEOUT_MS,
Expand Down Expand Up @@ -330,7 +341,7 @@ export class Mqtt5Client extends BufferedEventEmitter implements mqtt5.IMqtt5Cli
// trump the stop request
if (disconnectPacket) {
disconnectPacket.type = mqtt5_packet.PacketType.Disconnect;
validate.validateInitialOutboundPacket(disconnectPacket, internal_mqtt_client.ProtocolMode.Mqtt5);
validate.validateInitialOutboundPacket(disconnectPacket, mqtt_shared.ProtocolMode.Mqtt5);
}

this.internalClient.stop(disconnectPacket);
Expand Down
Loading
Loading