Skip to content

Commit aed84b2

Browse files
sonnypsingpolyma
andauthored
stream-management: Implement requesting ACKs (#1054)
--- Co-authored-by: Stephen Paul Weber <[email protected]>
1 parent 80c6a4d commit aed84b2

File tree

9 files changed

+531
-14
lines changed

9 files changed

+531
-14
lines changed

package-lock.json

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/client-core/src/bind2/bind2.test.js

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ test("with function resource returning string", async () => {
6666
test("with function resource throwing", async () => {
6767
const error = new Error("foo");
6868

69-
7069
function resource() {
7170
throw error;
7271
}
@@ -102,7 +101,6 @@ test("with function resource returning resolved promise", async () => {
102101
test("with function resource returning rejected promise", async () => {
103102
const error = new Error("foo");
104103

105-
106104
async function resource() {
107105
throw error;
108106
}

packages/stream-management/README.md

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,50 @@ When the session is resumed the `online` event is not emitted as session resumpt
1010
However `entity.status` is set to `online`.
1111
If the session fails to resume, entity will fallback to regular session establishment in which case `online` event will be emitted.
1212

13-
Automatically responds to acks but does not support requesting acks yet.
13+
- Automatically responds to acks.
14+
- Periodically request acks.
15+
- If server fails to respond, triggers a reconnect.
16+
17+
## Events
18+
19+
### resumed
20+
21+
Indicates that the connection was resumed. When that happens the `online` event is not emitted but `xmpp.status` will be `online`.
22+
23+
```js
24+
const xmpp = client(...);
25+
const {streamManagement} = xmpp;
26+
27+
streamManagement.on('resumed', () => {
28+
console.log("session resumed");
29+
});
30+
```
31+
32+
### fail
33+
34+
Indicates that a stanza failed to send to the server and will not be retried.
35+
36+
```js
37+
const xmpp = client(...);
38+
const {streamManagement} = xmpp;
39+
40+
streamManagement.on('fail', (stanza) => {
41+
console.log("fail to send", stanza.toString());
42+
});
43+
```
44+
45+
### ack
46+
47+
Indicates that a stanza has been acknowledged by the server.
48+
49+
```js
50+
const xmpp = client(...);
51+
const {streamManagement} = xmpp;
52+
53+
streamManagement.on('ack', (stanza) => {
54+
console.log("stanza acknowledge by the server", stanza.toString());
55+
});
56+
```
1457

1558
## References
1659

packages/stream-management/index.js

Lines changed: 102 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import XMPPError from "@xmpp/error";
2-
import { procedure } from "@xmpp/events";
2+
import { EventEmitter, procedure } from "@xmpp/events";
33
import xml from "@xmpp/xml";
4+
import { datetime } from "@xmpp/time";
45

56
// https://xmpp.org/extensions/xep-0198.html
67

@@ -45,24 +46,52 @@ export default function streamManagement({
4546
bind2,
4647
sasl2,
4748
}) {
48-
const sm = {
49+
let timeoutTimeout = null;
50+
let requestAckTimeout = null;
51+
52+
const sm = new EventEmitter();
53+
Object.assign(sm, {
4954
allowResume: true,
5055
preferredMaximum: null,
5156
enabled: false,
5257
id: "",
58+
outbound_q: [],
5359
outbound: 0,
5460
inbound: 0,
5561
max: null,
56-
};
62+
timeout: 60_000,
63+
requestAckInterval: 300_000,
64+
debounceAckRequest: 100,
65+
});
66+
67+
entity.on("disconnect", () => {
68+
clearTimeout(timeoutTimeout);
69+
clearTimeout(requestAckTimeout);
70+
});
5771

58-
function resumed() {
72+
async function resumed(resumed) {
5973
sm.enabled = true;
74+
const oldOutbound = sm.outbound;
75+
for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
76+
let item = sm.outbound_q.shift();
77+
sm.outbound++;
78+
sm.emit("ack", item.stanza);
79+
}
80+
let q = sm.outbound_q;
81+
sm.outbound_q = [];
82+
// This will trigger the middleware and re-add to the queue
83+
await entity.sendMany(q.map((item) => queueToStanza({ entity, item })));
84+
sm.emit("resumed");
6085
entity._ready(true);
6186
}
6287

6388
function failed() {
6489
sm.enabled = false;
6590
sm.id = "";
91+
let item;
92+
while ((item = sm.outbound_q.shift())) {
93+
sm.emit("fail", item.stanza);
94+
}
6695
sm.outbound = 0;
6796
}
6897

@@ -73,11 +102,20 @@ export default function streamManagement({
73102
}
74103

75104
entity.on("online", () => {
105+
if (sm.outbound_q.length > 0) {
106+
throw new Error(
107+
"Stream Management assertion failure, queue should be empty during online",
108+
);
109+
}
76110
sm.outbound = 0;
77111
sm.inbound = 0;
78112
});
79113

80114
entity.on("offline", () => {
115+
let item;
116+
while ((item = sm.outbound_q.shift())) {
117+
sm.emit("fail", item.stanza);
118+
}
81119
sm.outbound = 0;
82120
sm.inbound = 0;
83121
sm.enabled = false;
@@ -86,14 +124,20 @@ export default function streamManagement({
86124

87125
middleware.use((context, next) => {
88126
const { stanza } = context;
127+
clearTimeout(timeoutTimeout);
89128
if (["presence", "message", "iq"].includes(stanza.name)) {
90129
sm.inbound += 1;
91130
} else if (stanza.is("r", NS)) {
92131
// > When an <r/> element ("request") is received, the recipient MUST acknowledge it by sending an <a/> element to the sender containing a value of 'h' that is equal to the number of stanzas handled by the recipient of the <r/> element.
93132
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
94133
} else if (stanza.is("a", NS)) {
95134
// > When a party receives an <a/> element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
96-
sm.outbound = stanza.attrs.h;
135+
const oldOutbound = sm.outbound;
136+
for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) {
137+
let item = sm.outbound_q.shift();
138+
sm.outbound++;
139+
sm.emit("ack", item.stanza);
140+
}
97141
}
98142

99143
return next();
@@ -105,6 +149,33 @@ export default function streamManagement({
105149
if (sasl2) {
106150
setupSasl2({ sasl2, sm, failed, resumed });
107151
}
152+
153+
function requestAck() {
154+
clearTimeout(timeoutTimeout);
155+
if (sm.timeout) {
156+
timeoutTimeout = setTimeout(
157+
() => entity.disconnect().catch(() => {}),
158+
sm.timeout,
159+
);
160+
}
161+
entity.send(xml("r", { xmlns: NS })).catch(() => {});
162+
// Periodically send r to check the connection
163+
// If a stanza goes out it will cancel this and set a sooner timer
164+
requestAckTimeout = setTimeout(requestAck, sm.requestAckInterval);
165+
}
166+
167+
middleware.filter((context, next) => {
168+
if (!sm.enabled) return next();
169+
const { stanza } = context;
170+
if (!["presence", "message", "iq"].includes(stanza.name)) return next();
171+
172+
sm.outbound_q.push({ stanza, stamp: datetime() });
173+
// Debounce requests so we send only one after a big run of stanza together
174+
clearTimeout(requestAckTimeout);
175+
requestAckTimeout = setTimeout(requestAck, sm.debounceAckRequest);
176+
return next();
177+
});
178+
108179
if (streamFeatures) {
109180
setupStreamFeature({
110181
streamFeatures,
@@ -133,8 +204,8 @@ function setupStreamFeature({
133204
// Resuming
134205
if (sm.id) {
135206
try {
136-
await resume(entity, sm);
137-
resumed();
207+
const element = await resume(entity, sm);
208+
await resumed(element);
138209
return;
139210
// If resumption fails, continue with session establishment
140211
} catch {
@@ -149,6 +220,12 @@ function setupStreamFeature({
149220

150221
const promiseEnable = enable(entity, sm);
151222

223+
if (sm.outbound_q.length > 0) {
224+
throw new Error(
225+
"Stream Management assertion failure, queue should be empty after enable",
226+
);
227+
}
228+
152229
// > The counter for an entity's own sent stanzas is set to zero and started after sending either <enable/> or <enabled/>.
153230
sm.outbound = 0;
154231

@@ -172,7 +249,7 @@ function setupSasl2({ sasl2, sm, failed, resumed }) {
172249
},
173250
(element) => {
174251
if (element.is("resumed")) {
175-
resumed();
252+
resumed(element);
176253
} else if (element.is(failed)) {
177254
// const error = StreamError.fromElement(element)
178255
failed();
@@ -198,3 +275,20 @@ function setupBind2({ bind2, sm, failed, enabled }) {
198275
},
199276
);
200277
}
278+
279+
function queueToStanza({ entity, item }) {
280+
const { stanza, stamp } = item;
281+
if (
282+
stanza.name === "message" &&
283+
!stanza.getChild("delay", "urn:xmpp:delay")
284+
) {
285+
stanza.append(
286+
xml("delay", {
287+
xmlns: "urn:xmpp:delay",
288+
from: entity.jid.toString(),
289+
stamp,
290+
}),
291+
);
292+
}
293+
return stanza;
294+
}

packages/stream-management/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
"dependencies": {
1717
"@xmpp/error": "^0.14.0",
1818
"@xmpp/events": "^0.14.0",
19-
"@xmpp/xml": "^0.14.0"
19+
"@xmpp/xml": "^0.14.0",
20+
"@xmpp/time": "^0.14.0"
2021
},
2122
"engines": {
2223
"node": ">= 20.10"

0 commit comments

Comments
 (0)