Skip to content

Commit 025dfa2

Browse files
committed
test: streamline, stop channels
1 parent a82b779 commit 025dfa2

File tree

2 files changed

+136
-90
lines changed

2 files changed

+136
-90
lines changed
Lines changed: 68 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
1-
import { TypedEventEmitter } from "@libp2p/interface";
21
import { createDecoder, createEncoder } from "@waku/core";
32
import {
43
AutoSharding,
54
IDecodedMessage,
65
IDecoder,
76
IEncoder
87
} from "@waku/interfaces";
9-
import {
10-
createRoutingInfo,
11-
delay,
12-
MockWakuEvents,
13-
MockWakuNode
14-
} from "@waku/utils";
8+
import { createRoutingInfo, delay, MockWakuNode } from "@waku/utils";
159
import { utf8ToBytes } from "@waku/utils/bytes";
1610
import { expect } from "chai";
1711
import { beforeEach, describe } from "mocha";
1812

13+
import {
14+
createMockNodes,
15+
sendAndWaitForEvent,
16+
TEST_CONSTANTS,
17+
waitFor
18+
} from "./test_utils.js";
19+
1920
import { ReliableChannel, StatusDetail } from "./index.js";
2021

2122
const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto";
@@ -30,126 +31,117 @@ const TEST_ROUTING_INFO = createRoutingInfo(TEST_NETWORK_CONFIG, {
3031
describe("Status", () => {
3132
let encoder: IEncoder;
3233
let decoder: IDecoder<IDecodedMessage>;
34+
let mockWakuNodeAlice: MockWakuNode;
35+
let mockWakuNodeBob: MockWakuNode;
36+
let reliableChannelAlice: ReliableChannel<any> | undefined;
37+
let reliableChannelBob: ReliableChannel<any> | undefined;
3338

3439
beforeEach(async () => {
3540
encoder = createEncoder({
3641
contentTopic: TEST_CONTENT_TOPIC,
3742
routingInfo: TEST_ROUTING_INFO
3843
});
3944
decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO);
45+
46+
const mockNodes = createMockNodes();
47+
mockWakuNodeAlice = mockNodes.alice;
48+
mockWakuNodeBob = mockNodes.bob;
4049
});
4150

42-
it("Synced status is emitted when a message is received", async () => {
43-
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
44-
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
45-
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
51+
afterEach(async () => {
52+
if (reliableChannelAlice) {
53+
await reliableChannelAlice.stop();
54+
reliableChannelAlice = undefined;
55+
}
56+
if (reliableChannelBob) {
57+
await reliableChannelBob.stop();
58+
reliableChannelBob = undefined;
59+
}
60+
});
4661

47-
const reliableChannelAlice = await ReliableChannel.create(
62+
it("Synced status is emitted when a message is received", async () => {
63+
reliableChannelAlice = await ReliableChannel.create(
4864
mockWakuNodeAlice,
4965
"MyChannel",
5066
"alice",
5167
encoder,
5268
decoder
5369
);
54-
const reliableChannelBob = await ReliableChannel.create(
70+
reliableChannelBob = await ReliableChannel.create(
5571
mockWakuNodeBob,
5672
"MyChannel",
5773
"bob",
5874
encoder,
5975
decoder
6076
);
6177

62-
let statusDetail: StatusDetail;
78+
let statusDetail: StatusDetail | undefined;
6379
reliableChannelBob.syncStatus.addEventListener("synced", (event) => {
6480
statusDetail = event.detail;
6581
});
6682

6783
const message = utf8ToBytes("message in channel");
6884

6985
reliableChannelAlice.send(message);
70-
while (!statusDetail!) {
71-
await delay(50);
72-
}
86+
await waitFor(() => statusDetail);
7387

74-
expect(statusDetail.received).to.eq(1);
88+
expect(statusDetail!.received).to.eq(1);
7589
});
7690

7791
it("Synced status is emitted when a missing message is received", async () => {
78-
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
79-
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
80-
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
81-
82-
const reliableChannelAlice = await ReliableChannel.create(
92+
reliableChannelAlice = await ReliableChannel.create(
8393
mockWakuNodeAlice,
8494
"MyChannel",
8595
"alice",
8696
encoder,
8797
decoder,
8898
{
89-
retryIntervalMs: 300 // shorter retry so that it resends message in test
99+
retryIntervalMs: TEST_CONSTANTS.RETRY_INTERVAL_MS
90100
}
91101
);
92102

93103
// Send a message before Bob goes online so it's marked as missing
94-
let messageSent = false;
95-
reliableChannelAlice.addEventListener("message-sent", (_event) => {
96-
messageSent = true;
97-
});
98-
reliableChannelAlice.send(utf8ToBytes("missing message"));
99-
while (!messageSent) {
100-
await delay(50);
101-
}
104+
await sendAndWaitForEvent(
105+
reliableChannelAlice,
106+
utf8ToBytes("missing message")
107+
);
102108

103-
const reliableChannelBob = await ReliableChannel.create(
109+
reliableChannelBob = await ReliableChannel.create(
104110
mockWakuNodeBob,
105111
"MyChannel",
106112
"bob",
107113
encoder,
108114
decoder
109115
);
110116

111-
let syncingStatusDetail: StatusDetail;
117+
let syncingStatusDetail: StatusDetail | undefined;
112118
reliableChannelBob.syncStatus.addEventListener("syncing", (event) => {
113119
syncingStatusDetail = event.detail;
114120
});
115121

116-
let syncedStatusDetail: StatusDetail;
122+
let syncedStatusDetail: StatusDetail | undefined;
117123
reliableChannelBob.syncStatus.addEventListener("synced", (event) => {
118124
syncedStatusDetail = event.detail;
119125
});
120126

121-
messageSent = false;
122-
reliableChannelAlice.addEventListener("message-sent", (_event) => {
123-
messageSent = true;
124-
});
125-
reliableChannelAlice.send(
127+
await sendAndWaitForEvent(
128+
reliableChannelAlice,
126129
utf8ToBytes("second message with missing message as dep")
127130
);
128-
while (!messageSent) {
129-
await delay(50);
130-
}
131131

132-
while (!syncingStatusDetail!) {
133-
await delay(50);
134-
}
132+
await waitFor(() => syncingStatusDetail);
135133

136-
expect(syncingStatusDetail.missing).to.eq(1);
137-
expect(syncingStatusDetail.received).to.eq(1);
134+
expect(syncingStatusDetail!.missing).to.eq(1);
135+
expect(syncingStatusDetail!.received).to.eq(1);
138136

139-
while (!syncedStatusDetail!) {
140-
await delay(50);
141-
}
137+
await waitFor(() => syncedStatusDetail);
142138

143-
expect(syncedStatusDetail.missing).to.eq(0);
144-
expect(syncedStatusDetail.received).to.eq(2);
139+
expect(syncedStatusDetail!.missing).to.eq(0);
140+
expect(syncedStatusDetail!.received).to.eq(2);
145141
});
146142

147-
it("Synced status is emitted when a missing message is mark as lost", async () => {
148-
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
149-
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
150-
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
151-
152-
const reliableChannelAlice = await ReliableChannel.create(
143+
it("Synced status is emitted when a missing message is marked as lost", async () => {
144+
reliableChannelAlice = await ReliableChannel.create(
153145
mockWakuNodeAlice,
154146
"MyChannel",
155147
"alice",
@@ -162,16 +154,12 @@ describe("Status", () => {
162154
);
163155

164156
// Send a message before Bob goes online so it's marked as missing
165-
let messageSent = false;
166-
reliableChannelAlice.addEventListener("message-sent", (_event) => {
167-
messageSent = true;
168-
});
169-
reliableChannelAlice.send(utf8ToBytes("missing message"));
170-
while (!messageSent) {
171-
await delay(50);
172-
}
157+
await sendAndWaitForEvent(
158+
reliableChannelAlice,
159+
utf8ToBytes("missing message")
160+
);
173161

174-
const reliableChannelBob = await ReliableChannel.create(
162+
reliableChannelBob = await ReliableChannel.create(
175163
mockWakuNodeBob,
176164
"MyChannel",
177165
"bob",
@@ -185,31 +173,23 @@ describe("Status", () => {
185173
}
186174
);
187175

188-
let syncingStatusDetail: StatusDetail;
176+
let syncingStatusDetail: StatusDetail | undefined;
189177
reliableChannelBob.syncStatus.addEventListener("syncing", (event) => {
190178
syncingStatusDetail = event.detail;
191179
});
192180

193-
messageSent = false;
194-
reliableChannelAlice.addEventListener("message-sent", (_event) => {
195-
messageSent = true;
196-
});
197-
reliableChannelAlice.send(
181+
await sendAndWaitForEvent(
182+
reliableChannelAlice,
198183
utf8ToBytes("second message with missing message as dep")
199184
);
200-
while (!messageSent) {
201-
await delay(50);
202-
}
203185

204-
while (!syncingStatusDetail!) {
205-
await delay(50);
206-
}
186+
await waitFor(() => syncingStatusDetail);
207187

208-
expect(syncingStatusDetail.missing).to.eq(1, "at first, one missing");
209-
expect(syncingStatusDetail.received).to.eq(1, "at first, one received");
210-
expect(syncingStatusDetail.lost).to.eq(0, "at first, no loss");
188+
expect(syncingStatusDetail!.missing).to.eq(1, "at first, one missing");
189+
expect(syncingStatusDetail!.received).to.eq(1, "at first, one received");
190+
expect(syncingStatusDetail!.lost).to.eq(0, "at first, no loss");
211191

212-
let syncedStatusDetail: StatusDetail;
192+
let syncedStatusDetail: StatusDetail | undefined;
213193
reliableChannelBob.syncStatus.addEventListener("synced", (event) => {
214194
syncedStatusDetail = event.detail;
215195
});
@@ -218,12 +198,10 @@ describe("Status", () => {
218198
await delay(200);
219199
reliableChannelBob.messageChannel["sweepIncomingBuffer"]();
220200

221-
while (!syncedStatusDetail!) {
222-
await delay(50);
223-
}
201+
await waitFor(() => syncedStatusDetail);
224202

225-
expect(syncedStatusDetail.missing).to.eq(0, "no more missing message");
226-
expect(syncedStatusDetail.received).to.eq(1, "still one received message");
227-
expect(syncedStatusDetail.lost).to.eq(1, "missing message is marked lost");
203+
expect(syncedStatusDetail!.missing).to.eq(0, "no more missing message");
204+
expect(syncedStatusDetail!.received).to.eq(1, "still one received message");
205+
expect(syncedStatusDetail!.lost).to.eq(1, "missing message is marked lost");
228206
});
229207
});
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import { TypedEventEmitter } from "@libp2p/interface";
2+
import { delay, MockWakuEvents, MockWakuNode } from "@waku/utils";
3+
4+
import { ReliableChannel } from "./reliable_channel.js";
5+
6+
export const TEST_CONSTANTS = {
7+
POLL_INTERVAL_MS: 50,
8+
RETRY_INTERVAL_MS: 300
9+
} as const;
10+
11+
/**
12+
* Wait for a condition to become truthy, with timeout
13+
* @param condition Function that returns the value when ready, or undefined while waiting
14+
* @param timeoutMs Maximum time to wait before throwing
15+
* @returns The value returned by condition
16+
* @throws Error if timeout is reached
17+
*/
18+
export async function waitFor<T>(
19+
condition: () => T | undefined,
20+
timeoutMs = 5000
21+
): Promise<T> {
22+
const start = Date.now();
23+
while (!condition()) {
24+
if (Date.now() - start > timeoutMs) {
25+
throw new Error(
26+
`Timeout after ${timeoutMs}ms waiting for condition to be met`
27+
);
28+
}
29+
await delay(TEST_CONSTANTS.POLL_INTERVAL_MS);
30+
}
31+
return condition()!;
32+
}
33+
34+
/**
35+
* Send a message and wait for the "message-sent" event
36+
* @param channel The ReliableChannel to send from
37+
* @param message The message payload to send
38+
*/
39+
export async function sendAndWaitForEvent(
40+
channel: ReliableChannel<any>,
41+
message: Uint8Array
42+
): Promise<void> {
43+
return new Promise((resolve) => {
44+
const handler = (): void => {
45+
channel.removeEventListener("message-sent", handler);
46+
resolve();
47+
};
48+
channel.addEventListener("message-sent", handler);
49+
channel.send(message);
50+
});
51+
}
52+
53+
/**
54+
* Create a common event emitter and two mock Waku nodes
55+
* @returns Object containing the emitter and two mock nodes (alice and bob)
56+
*/
57+
export function createMockNodes(): {
58+
emitter: TypedEventEmitter<MockWakuEvents>;
59+
alice: MockWakuNode;
60+
bob: MockWakuNode;
61+
} {
62+
const emitter = new TypedEventEmitter<MockWakuEvents>();
63+
return {
64+
emitter,
65+
alice: new MockWakuNode(emitter),
66+
bob: new MockWakuNode(emitter)
67+
};
68+
}

0 commit comments

Comments
 (0)