Skip to content

Commit 0f5a3a5

Browse files
author
Bret Ambrose
committed
Move browser-only mqtt functionality to browser
1 parent a0bb400 commit 0f5a3a5

8 files changed

Lines changed: 259 additions & 241 deletions

File tree

lib/browser/mqtt.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ import {
3939
Payload,
4040
QoS
4141
} from "../common/mqtt";
42-
import {normalize_payload, normalize_payload_to_buffer} from "../common/mqtt_shared";
42+
import {normalize_payload} from "../common/mqtt_shared";
43+
import {normalize_payload_to_buffer} from "./mqtt_shared_browser";
4344
import {once} from "events";
4445

4546
export {

lib/browser/mqtt_internal/client.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import * as mqtt5 from "../../common/mqtt5"
88
import * as protocol from "./protocol"
99
import * as model from "./model"
1010
import * as mqtt_shared from "../../common/mqtt_shared";
11+
import * as mqtt_shared_browser from "../mqtt_shared_browser";
1112
import * as promise from "../../common/promise"
1213
import * as ws from "../ws"
1314
import {flogError, flogDebug, flogInfo, logDebug, logInfo} from "../../common/io";
@@ -1005,7 +1006,7 @@ export class Client extends BufferedEventEmitter {
10051006
lastReconnectDelay : this.lastReconnectDelay,
10061007
connectionFailureCount : this.connectionFailureCount,
10071008
};
1008-
let nextDelay : number = mqtt_shared.calculateNextReconnectDelay(reconnectContext);
1009+
let nextDelay : number = mqtt_shared_browser.calculateNextReconnectDelay(reconnectContext);
10091010

10101011
this.lastReconnectDelay = nextDelay;
10111012
this.connectionFailureCount += 1;

lib/browser/mqtt_internal/validate.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {CrtError} from "../error";
77
import * as encoder from "./encoder";
88
import * as mqtt5_packet from '../../common/mqtt5_packet';
99
import * as mqtt5_common from "../../common/mqtt5";
10-
import * as mqtt_shared from '../../common/mqtt_shared';
10+
import * as mqtt_shared_browser from "../mqtt_shared_browser";
1111
import * as model from "./model";
1212

1313
/**
@@ -387,7 +387,7 @@ export function validateOptionalBinaryData(value: mqtt5_packet.BinaryData | unde
387387

388388
function validateTopic(value: string, fieldName:string) {
389389
validateString(value, fieldName);
390-
if (!mqtt_shared.isValidTopic(value)) {
390+
if (!mqtt_shared_browser.isValidTopic(value)) {
391391
throw new CrtError(`value "${value}" of field "${fieldName}" is not a valid topic`);
392392
}
393393
}
@@ -402,7 +402,7 @@ function validateOptionalTopic(value: string | undefined, fieldName:string) {
402402

403403
function validateTopicFilter(value: string, fieldName:string) {
404404
validateString(value, fieldName);
405-
if (!mqtt_shared.isValidTopicFilter(value)) {
405+
if (!mqtt_shared_browser.isValidTopicFilter(value)) {
406406
throw new CrtError(`value "${value}" of field "${fieldName}" is not a valid topic filter`);
407407
}
408408
}
@@ -574,7 +574,7 @@ function validateBinaryPuback(packet: model.PubackPacketBinary, mode: model.Prot
574574
}
575575

576576
function validateSubscription(subscription: model.SubscriptionBinary, mode: model.ProtocolMode, settings: mqtt5_common.NegotiatedSettings) {
577-
let properties = mqtt_shared.computeTopicProperties(subscription.topicFilterAsString, true);
577+
let properties = mqtt_shared_browser.computeTopicProperties(subscription.topicFilterAsString, true);
578578
if (properties.isShared) {
579579
if (!settings.sharedSubscriptionsAvailable) {
580580
throw new CrtError("Shared subscriptions are not supported by the server");

lib/browser/mqtt_request_response.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import {BufferedEventEmitter} from "../common/event";
2121
import {CrtError} from "./error";
2222
import {LiftedPromise, newLiftedPromise} from "../common/promise";
2323
import * as io from "../common/io";
24-
import * as mqtt_shared from "../common/mqtt_shared";
24+
import * as mqtt_shared_browser from "./mqtt_shared_browser";
2525

2626
export * from "../common/mqtt_request_response";
2727

@@ -1075,7 +1075,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_
10751075
}
10761076

10771077
function validateResponsePath(responsePath: mqtt_request_response.ResponsePath) {
1078-
if (!mqtt_shared.isValidTopic(responsePath.topic)) {
1078+
if (!mqtt_shared_browser.isValidTopic(responsePath.topic)) {
10791079
throw new CrtError(`"${JSON.stringify(responsePath.topic)})" is not a valid topic`);
10801080
}
10811081

@@ -1104,7 +1104,7 @@ function validateRequestOptions(requestOptions: mqtt_request_response.RequestRes
11041104
}
11051105

11061106
for (const topicFilter of requestOptions.subscriptionTopicFilters) {
1107-
if (!mqtt_shared.isValidTopicFilter(topicFilter)) {
1107+
if (!mqtt_shared_browser.isValidTopicFilter(topicFilter)) {
11081108
throw new CrtError(`Invalid request options - "${JSON.stringify(topicFilter)}" is not a valid topic filter`);
11091109
}
11101110
}
@@ -1133,7 +1133,7 @@ function validateRequestOptions(requestOptions: mqtt_request_response.RequestRes
11331133
throw new CrtError("Invalid request options - null publishTopic");
11341134
}
11351135

1136-
if (!mqtt_shared.isValidTopic(requestOptions.publishTopic)) {
1136+
if (!mqtt_shared_browser.isValidTopic(requestOptions.publishTopic)) {
11371137
throw new CrtError(`Invalid request options - "${JSON.stringify(requestOptions.publishTopic)}" is not a valid topic`);
11381138
}
11391139

@@ -1167,7 +1167,7 @@ function validateStreamingOptions(streamOptions: mqtt_request_response.Streaming
11671167
throw new CrtError("Invalid streaming options - subscriptionTopicFilter not a string");
11681168
}
11691169

1170-
if (!mqtt_shared.isValidTopicFilter(streamOptions.subscriptionTopicFilter)) {
1170+
if (!mqtt_shared_browser.isValidTopicFilter(streamOptions.subscriptionTopicFilter)) {
11711171
throw new CrtError("Invalid streaming options - subscriptionTopicFilter not a valid topic filter");
11721172
}
11731173
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
import * as mqtt_shared_browser from "./mqtt_shared_browser";
7+
8+
test('MQTT topic properties - valid topic filter', async () => {
9+
expect(mqtt_shared_browser.computeTopicProperties("a/b/c", true).isValid).toEqual(true);
10+
expect(mqtt_shared_browser.computeTopicProperties("#", true).isValid).toEqual(true);
11+
expect(mqtt_shared_browser.computeTopicProperties("/#", true).isValid).toEqual(true);
12+
expect(mqtt_shared_browser.computeTopicProperties("sports/basketball/#", true).isValid).toEqual(true);
13+
expect(mqtt_shared_browser.computeTopicProperties("+", true).isValid).toEqual(true);
14+
expect(mqtt_shared_browser.computeTopicProperties("/+", true).isValid).toEqual(true);
15+
expect(mqtt_shared_browser.computeTopicProperties("+/a", true).isValid).toEqual(true);
16+
expect(mqtt_shared_browser.computeTopicProperties("+/basketball/#", true).isValid).toEqual(true);
17+
expect(mqtt_shared_browser.computeTopicProperties("washington/+/player1", true).isValid).toEqual(true);
18+
});
19+
20+
test('MQTT topic properties - invalid topic filter', async () => {
21+
expect(mqtt_shared_browser.computeTopicProperties("", true).isValid).toEqual(false);
22+
expect(mqtt_shared_browser.computeTopicProperties("derp+", true).isValid).toEqual(false);
23+
expect(mqtt_shared_browser.computeTopicProperties("derp+/", true).isValid).toEqual(false);
24+
expect(mqtt_shared_browser.computeTopicProperties("derp#/", true).isValid).toEqual(false);
25+
expect(mqtt_shared_browser.computeTopicProperties("#/a", true).isValid).toEqual(false);
26+
expect(mqtt_shared_browser.computeTopicProperties("sport/basketball#", true).isValid).toEqual(false);
27+
expect(mqtt_shared_browser.computeTopicProperties("sport/basketball/#/ranking", true).isValid).toEqual(false);
28+
});
29+
30+
test('MQTT topic properties - shared filter', async () => {
31+
expect(mqtt_shared_browser.computeTopicProperties("$share/b//", true).isShared).toEqual(true);
32+
expect(mqtt_shared_browser.computeTopicProperties("$share/a/b", true).isShared).toEqual(true);
33+
expect(mqtt_shared_browser.computeTopicProperties("$share/a/b/c", true).isShared).toEqual(true);
34+
});
35+
36+
test('MQTT topic properties - not shared filter', async () => {
37+
expect(mqtt_shared_browser.computeTopicProperties("a/b/c", true).isShared).toEqual(false);
38+
expect(mqtt_shared_browser.computeTopicProperties("$share//c", true).isShared).toEqual(false);
39+
expect(mqtt_shared_browser.computeTopicProperties("$share/a", true).isShared).toEqual(false);
40+
expect(mqtt_shared_browser.computeTopicProperties("$share/+/a", true).isShared).toEqual(false);
41+
expect(mqtt_shared_browser.computeTopicProperties("$share/#/a", true).isShared).toEqual(false);
42+
expect(mqtt_shared_browser.computeTopicProperties("$share/b/", true).isShared).toEqual(false);
43+
});
44+
45+
test('MQTT topic properties - has wildcard', async () => {
46+
expect(mqtt_shared_browser.computeTopicProperties("#", true).hasWildcard).toEqual(true);
47+
expect(mqtt_shared_browser.computeTopicProperties("+", true).hasWildcard).toEqual(true);
48+
expect(mqtt_shared_browser.computeTopicProperties("a/+/+", true).hasWildcard).toEqual(true);
49+
expect(mqtt_shared_browser.computeTopicProperties("a/b/#", true).hasWildcard).toEqual(true);
50+
});
51+
52+
test('MQTT topic properties - does not have wildcard', async () => {
53+
expect(mqtt_shared_browser.computeTopicProperties("a/b/c", true).hasWildcard).toEqual(false);
54+
expect(mqtt_shared_browser.computeTopicProperties("/", true).hasWildcard).toEqual(false);
55+
});
56+
57+
test('MQTT topic properties - valid topic', async () => {
58+
expect(mqtt_shared_browser.computeTopicProperties("a/b/c", false).isValid).toEqual(true);
59+
expect(mqtt_shared_browser.computeTopicProperties("/", false).isValid).toEqual(true);
60+
expect(mqtt_shared_browser.computeTopicProperties("///a", false).isValid).toEqual(true);
61+
});
62+
63+
test('MQTT topic properties - invalid topic', async () => {
64+
expect(mqtt_shared_browser.computeTopicProperties("", false).isValid).toEqual(false);
65+
expect(mqtt_shared_browser.computeTopicProperties("#", false).isValid).toEqual(false);
66+
expect(mqtt_shared_browser.computeTopicProperties("/#", false).isValid).toEqual(false);
67+
expect(mqtt_shared_browser.computeTopicProperties("sports/basketball/#", false).isValid).toEqual(false);
68+
expect(mqtt_shared_browser.computeTopicProperties("+", false).isValid).toEqual(false);
69+
expect(mqtt_shared_browser.computeTopicProperties("/+", false).isValid).toEqual(false);
70+
expect(mqtt_shared_browser.computeTopicProperties("+/a", false).isValid).toEqual(false);
71+
expect(mqtt_shared_browser.computeTopicProperties("+/basketball/#", false).isValid).toEqual(false);
72+
expect(mqtt_shared_browser.computeTopicProperties("washington/+/player1", false).isValid).toEqual(false);
73+
});

lib/browser/mqtt_shared_browser.ts

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
/**
7+
* @packageDocumentation
8+
* @module mqtt_shared
9+
*/
10+
11+
import * as mqtt_shared from "../common/mqtt_shared";
12+
import * as mqtt5 from "../common/mqtt5";
13+
14+
/**
15+
* Converts payload to Buffer only, regardless of the supplied type
16+
* @param payload The payload to convert
17+
* @internal
18+
*/
19+
export function normalize_payload_to_buffer(payload: any): Buffer {
20+
let normalized = mqtt_shared.normalize_payload(payload);
21+
if (typeof normalized === 'string') {
22+
// pass string through
23+
return Buffer.from(new TextEncoder().encode(normalized).buffer);
24+
}
25+
26+
return normalized;
27+
}
28+
29+
/** @internal */
30+
export interface TopicProperties {
31+
isValid: boolean;
32+
isShared: boolean;
33+
hasWildcard: boolean;
34+
}
35+
36+
/** @internal */
37+
export function computeTopicProperties(topic: string, isFilter: boolean) : TopicProperties {
38+
let properties : TopicProperties = {
39+
isValid: false,
40+
isShared: false,
41+
hasWildcard: false
42+
};
43+
44+
if (topic.length === 0) {
45+
return properties;
46+
}
47+
48+
let hasSharePrefix : boolean = false;
49+
let hasShareName : boolean = false;
50+
let sawHash : boolean = false;
51+
let index : number = 0;
52+
for (let segment of topic.split('/')) {
53+
if (sawHash) {
54+
return properties;
55+
}
56+
57+
if (segment.includes("+")) {
58+
if (!isFilter) {
59+
return properties;
60+
}
61+
62+
if (segment.length > 1) {
63+
return properties;
64+
}
65+
66+
properties.hasWildcard = true;
67+
}
68+
69+
if (segment.includes("#")) {
70+
if (!isFilter) {
71+
return properties;
72+
}
73+
74+
if (segment.length > 1) {
75+
return properties;
76+
}
77+
78+
properties.hasWildcard = true;
79+
sawHash = true;
80+
}
81+
82+
if (index == 0 && segment === "$share") {
83+
hasSharePrefix = true;
84+
}
85+
86+
if (index == 1 && hasSharePrefix && segment.length > 0 && !properties.hasWildcard) {
87+
hasShareName = true;
88+
}
89+
90+
if (hasShareName && ((index == 2 && segment.length > 0) || index > 2)) {
91+
properties.isShared = true;
92+
}
93+
94+
index += 1;
95+
}
96+
97+
properties.isValid = true;
98+
99+
return properties;
100+
}
101+
102+
/** @internal */
103+
export function isValidTopicFilter(topicFilter: any) : boolean {
104+
if (typeof(topicFilter) !== 'string') {
105+
return false;
106+
}
107+
108+
let properties = computeTopicProperties(topicFilter as string, true);
109+
return properties.isValid;
110+
}
111+
112+
/** @internal */
113+
export function isValidTopic(topic: any) : boolean {
114+
if (typeof(topic) !== 'string') {
115+
return false;
116+
}
117+
118+
let properties = computeTopicProperties(topic as string, false);
119+
return properties.isValid;
120+
}
121+
122+
123+
124+
function randomInRange(min: number, max: number) : number {
125+
return min + (max - min) * Math.random();
126+
}
127+
128+
/** @internal */
129+
export interface ReconnectDelayContext {
130+
retryJitterMode?: mqtt5.RetryJitterType,
131+
minReconnectDelayMs? : number,
132+
maxReconnectDelayMs? : number,
133+
lastReconnectDelay? : number,
134+
connectionFailureCount : number,
135+
}
136+
137+
const DEFAULT_MIN_RECONNECT_DELAY_MS : number = 1000;
138+
const DEFAULT_MAX_RECONNECT_DELAY_MS : number = 120000;
139+
140+
141+
function getOrderedReconnectDelayBounds(configMin: number | undefined, configMax: number | undefined) : [number, number] {
142+
const minDelay : number = Math.max(1, configMin ?? DEFAULT_MIN_RECONNECT_DELAY_MS);
143+
const maxDelay : number = Math.max(1, configMax ?? DEFAULT_MAX_RECONNECT_DELAY_MS);
144+
if (minDelay > maxDelay) {
145+
return [maxDelay, minDelay];
146+
} else {
147+
return [minDelay, maxDelay];
148+
}
149+
}
150+
151+
/**
152+
* Computes the next reconnect delay based on the Jitter/Retry configuration.
153+
* Implements jitter calculations in https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
154+
* @internal
155+
*/
156+
export function calculateNextReconnectDelay(context: ReconnectDelayContext) : number {
157+
const jitterType : mqtt5.RetryJitterType = context.retryJitterMode ?? mqtt5.RetryJitterType.Default;
158+
const [minDelay, maxDelay] : [number, number] = getOrderedReconnectDelayBounds(context.minReconnectDelayMs, context.maxReconnectDelayMs);
159+
const clampedFailureCount : number = Math.min(52, context.connectionFailureCount);
160+
let delay : number = 0;
161+
162+
if (jitterType == mqtt5.RetryJitterType.None) {
163+
delay = minDelay * Math.pow(2, clampedFailureCount);
164+
} else if (jitterType == mqtt5.RetryJitterType.Decorrelated && context.lastReconnectDelay) {
165+
delay = randomInRange(minDelay, 3 * context.lastReconnectDelay);
166+
} else {
167+
delay = randomInRange(minDelay, Math.min(maxDelay, minDelay * Math.pow(2, clampedFailureCount)));
168+
}
169+
170+
delay = Math.min(maxDelay, delay);
171+
172+
return delay;
173+
}

0 commit comments

Comments
 (0)