diff --git a/packages/sdk/src/reliable_channel/reliable_channel.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts index ad69d35009..d586d4409b 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.spec.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts @@ -25,6 +25,8 @@ import { expect } from "chai"; import { beforeEach, describe } from "mocha"; import sinon from "sinon"; +import { waitForEvent } from "./test_utils.js"; + import { ReliableChannel } from "./index.js"; const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto"; @@ -63,18 +65,12 @@ describe("Reliable Channel", () => { // Setting up message tracking const messageId = reliableChannel.send(message); - let messageSending = false; - reliableChannel.addEventListener("sending-message", (event) => { - if (event.detail === messageId) { - messageSending = true; - } - }); - while (!messageSending) { - await delay(50); - } - - expect(messageSending).to.be.true; + await waitForEvent( + reliableChannel, + "sending-message", + (id) => id === messageId + ); }); it("Outgoing message is emitted as sent", async () => { @@ -90,19 +86,11 @@ describe("Reliable Channel", () => { const messageId = reliableChannel.send(message); - // Setting up message tracking - let messageSent = false; - reliableChannel.addEventListener("message-sent", (event) => { - if (event.detail === messageId) { - messageSent = true; - } - }); - - while (!messageSent) { - await delay(50); - } - - expect(messageSent).to.be.true; + await waitForEvent( + reliableChannel, + "message-sent", + (id) => id === messageId + ); }); it("Encoder error raises irrecoverable error", async () => { @@ -130,22 +118,11 @@ describe("Reliable Channel", () => { encoder.contentTopic = "..."; const messageId = reliableChannel.send(message); - // Setting up message tracking - let irrecoverableError = false; - reliableChannel.addEventListener( + await waitForEvent<{ messageId: string; error: any }>( + reliableChannel, "sending-message-irrecoverable-error", - (event) => { - if (event.detail.messageId === messageId) { - irrecoverableError = true; - } - } + (detail) => detail.messageId === messageId ); - - while (!irrecoverableError) { - await delay(50); - } - - expect(irrecoverableError).to.be.true; }); it("Outgoing message is not emitted as acknowledged from own outgoing messages", async () => { @@ -205,39 +182,32 @@ describe("Reliable Channel", () => { // Alice sets up message tracking for first message const firstMessageId = ReliableChannel.getMessageId(messages[0]); - let firstMessagePossiblyAcknowledged = false; - reliableChannelAlice.addEventListener( - "message-possibly-acknowledged", - (event) => { - if (event.detail.messageId === firstMessageId) { - firstMessagePossiblyAcknowledged = true; - } - } + + const bobReceivedThirdPromise = waitForEvent( + reliableChannelBob, + "message-received", + (msg) => bytesToUtf8(msg.payload) === "third" ); - let messageReceived = false; - reliableChannelBob.addEventListener("message-received", (event) => { - if (bytesToUtf8(event.detail.payload) === "third") { - messageReceived = true; - } - }); + const firstMessagePossiblyAckPromise = waitForEvent<{ + messageId: string; + possibleAckCount: number; + }>( + reliableChannelAlice, + "message-possibly-acknowledged", + (detail) => detail.messageId === firstMessageId + ); for (const m of messages) { reliableChannelAlice.send(m); } // Wait for Bob to receive last message to ensure it is all included in filter - while (!messageReceived) { - await delay(50); - } + await bobReceivedThirdPromise; - // Bobs sends a message now, it should include first one in bloom filter + // Bob sends a message now, it should include first one in bloom filter reliableChannelBob.send(utf8ToBytes("message back")); - while (!firstMessagePossiblyAcknowledged) { - await delay(50); - } - - expect(firstMessagePossiblyAcknowledged).to.be.true; + await firstMessagePossiblyAckPromise; }); it("Outgoing message is acknowledged", async () => { @@ -264,31 +234,23 @@ describe("Reliable Channel", () => { const messageId = reliableChannelAlice.send(message); - // Alice sets up message tracking - let messageAcknowledged = false; - reliableChannelAlice.addEventListener("message-acknowledged", (event) => { - if (event.detail === messageId) { - messageAcknowledged = true; - } - }); + const bobReceivedPromise = waitForEvent( + reliableChannelBob, + "message-received" + ); - let bobReceivedMessage = false; - reliableChannelBob.addEventListener("message-received", () => { - bobReceivedMessage = true; - }); + const messageAcknowledgedPromise = waitForEvent( + reliableChannelAlice, + "message-acknowledged", + (id) => id === messageId + ); // Wait for bob to receive the message to ensure it's included in causal history - while (!bobReceivedMessage) { - await delay(50); - } + await bobReceivedPromise; - // Bobs sends a message now, it should include first one in causal history + // Bob sends a message now, it should include first one in causal history reliableChannelBob.send(utf8ToBytes("second message in channel")); - while (!messageAcknowledged) { - await delay(50); - } - - expect(messageAcknowledged).to.be.true; + await messageAcknowledgedPromise; }); it("Incoming message is emitted as received", async () => { @@ -300,19 +262,17 @@ describe("Reliable Channel", () => { decoder ); - let receivedMessage: IDecodedMessage; - reliableChannel.addEventListener("message-received", (event) => { - receivedMessage = event.detail; - }); - const message = utf8ToBytes("message in channel"); + const receivedPromise = waitForEvent( + reliableChannel, + "message-received" + ); + reliableChannel.send(message); - while (!receivedMessage!) { - await delay(50); - } + const receivedMessage = await receivedPromise; - expect(bytesToUtf8(receivedMessage!.payload)).to.eq(bytesToUtf8(message)); + expect(bytesToUtf8(receivedMessage.payload)).to.eq(bytesToUtf8(message)); }); describe("Retries", () => { @@ -355,20 +315,29 @@ describe("Reliable Channel", () => { } }); - reliableChannelAlice.send(message); + // Wait for first message + const firstMessagePromise = waitForEvent( + reliableChannelBob, + "message-received", + (msg) => bytesToUtf8(msg.payload) === msgTxt + ); - while (messageCount < 1) { - await delay(10); - } + reliableChannelAlice.send(message); + await firstMessagePromise; expect(messageCount).to.equal(1, "Bob received Alice's message once"); + // Wait for retry - Bob should receive the same message again + const retryMessagePromise = waitForEvent( + reliableChannelBob, + "message-received", + (msg) => bytesToUtf8(msg.payload) === msgTxt + ); + // No response from Bob should trigger a retry from Alice - while (messageCount < 2) { - await delay(10); - } + await retryMessagePromise; expect(messageCount).to.equal(2, "retried once"); - // Bobs sends a message now, it should include first one in causal history + // Bob sends a message now, it should include first one in causal history reliableChannelBob.send(utf8ToBytes("second message in channel")); // Wait long enough to confirm no retry is executed diff --git a/packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts index 9cce421c59..0c66f75c6c 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts @@ -6,16 +6,13 @@ import { IDecoder, IEncoder } from "@waku/interfaces"; -import { - createRoutingInfo, - delay, - MockWakuEvents, - MockWakuNode -} from "@waku/utils"; +import { createRoutingInfo, MockWakuEvents, MockWakuNode } from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; import { beforeEach, describe } from "mocha"; +import { waitForEvent } from "./test_utils.js"; + import { ReliableChannel } from "./index.js"; const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto"; @@ -64,33 +61,29 @@ describe("Reliable Channel: Acks", () => { // Alice sets up message tracking const messageId = ReliableChannel.getMessageId(message); - let messageReceived = false; - reliableChannelBob.addEventListener("message-received", (event) => { - if (bytesToUtf8(event.detail.payload) === "first message in channel") { - messageReceived = true; - } - }); + const messageReceivedPromise = waitForEvent( + reliableChannelBob, + "message-received", + (msg) => bytesToUtf8(msg.payload) === "first message in channel" + ); - let messageAcknowledged = false; - reliableChannelAlice.addEventListener("message-acknowledged", (event) => { - if (event.detail === messageId) { - messageAcknowledged = true; - } - }); + const messageAcknowledgedPromise = waitForEvent( + reliableChannelAlice, + "message-acknowledged", + (id) => id === messageId + ); + // Alice sends the message reliableChannelAlice.send(message); // Wait for Bob to receive the message to ensure it uses it in causal history - while (!messageReceived) { - await delay(50); - } - // Bobs sends a message now, it should include first one in causal history + await messageReceivedPromise; + + // Bob sends a message now, it should include first one in causal history reliableChannelBob.send(utf8ToBytes("second message in channel")); - while (!messageAcknowledged) { - await delay(50); - } - expect(messageAcknowledged).to.be.true; + // Wait for Alice to receive acknowledgment + await messageAcknowledgedPromise; }); it("Re-sent message is acknowledged once other parties join.", async () => { @@ -115,18 +108,17 @@ describe("Reliable Channel: Acks", () => { // acknowledged in this test. const message = utf8ToBytes("message to be acknowledged"); const messageId = ReliableChannel.getMessageId(message); + let messageAcknowledged = false; reliableChannelAlice.addEventListener("message-acknowledged", (event) => { if (event.detail === messageId) { messageAcknowledged = true; } }); - reliableChannelAlice.send(message); - // Wait a bit to ensure Bob does not receive the message - await delay(100); + reliableChannelAlice.send(message); - // Now Bob goes online + // Now Bob goes online (no need to wait since Bob wasn't online to receive it) const mockWakuNodeBob = new MockWakuNode(commonEventEmitter); const reliableChannelBob = await ReliableChannel.create( mockWakuNodeBob, @@ -141,47 +133,51 @@ describe("Reliable Channel: Acks", () => { } ); - // Track when Bob receives the message - let bobReceivedMessage = false; - reliableChannelBob.addEventListener("message-received", (event) => { - if (bytesToUtf8(event.detail.payload!) === "message to be acknowledged") { - bobReceivedMessage = true; - } - }); - // Some sync messages are exchanged await reliableChannelAlice["sendSyncMessage"](); await reliableChannelBob["sendSyncMessage"](); - // wait a bit to ensure messages are processed - await delay(100); + // Wait for Bob to receive "some message" to ensure sync messages were processed + const bobReceivedSomeMessagePromise = waitForEvent( + reliableChannelBob, + "message-received", + (msg) => bytesToUtf8(msg.payload) === "some message" + ); // Some content messages are exchanged too reliableChannelAlice.send(utf8ToBytes("some message")); reliableChannelBob.send(utf8ToBytes("some other message")); - // wait a bit to ensure messages are processed - await delay(100); + // Wait for the "some message" to be received to ensure messages are processed + await bobReceivedSomeMessagePromise; // At this point, the message shouldn't be acknowledged yet as Bob // does not have a complete log expect(messageAcknowledged).to.be.false; // Now Alice resends the message + const bobReceivedMessagePromise = waitForEvent( + reliableChannelBob, + "message-received", + (msg) => bytesToUtf8(msg.payload) === "message to be acknowledged" + ); + reliableChannelAlice.send(message); // Wait for Bob to receive the message - while (!bobReceivedMessage) { - await delay(50); - } + await bobReceivedMessagePromise; + + // Set up promise waiter for acknowledgment before Bob sends sync + const messageAcknowledgedPromise = waitForEvent( + reliableChannelAlice, + "message-acknowledged", + (id) => id === messageId + ); // Bob receives it, and should include it in its sync await reliableChannelBob["sendSyncMessage"](); - while (!messageAcknowledged) { - await delay(50); - } - // The sync should acknowledge the message - expect(messageAcknowledged).to.be.true; + // Wait for acknowledgment + await messageAcknowledgedPromise; }); }); diff --git a/packages/sdk/src/reliable_channel/reliable_channel_encryption.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel_encryption.spec.ts index 628c99bfaa..24770f1fce 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel_encryption.spec.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel_encryption.spec.ts @@ -25,6 +25,8 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; import { beforeEach, describe } from "mocha"; +import { waitForEvent } from "./test_utils.js"; + import { ReliableChannel } from "./index.js"; const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto"; @@ -70,19 +72,15 @@ describe("Reliable Channel: Encryption", () => { // Setting up message tracking const messageId = ReliableChannel.getMessageId(message); - let messageSending = false; - reliableChannel.addEventListener("sending-message", (event) => { - if (event.detail === messageId) { - messageSending = true; - } - }); - reliableChannel.send(message); - while (!messageSending) { - await delay(50); - } + const sendingPromise = waitForEvent( + reliableChannel, + "sending-message", + (id) => id === messageId + ); - expect(messageSending).to.be.true; + reliableChannel.send(message); + await sendingPromise; }); it("Outgoing message is emitted as sent", async () => { @@ -98,19 +96,15 @@ describe("Reliable Channel: Encryption", () => { // Setting up message tracking const messageId = ReliableChannel.getMessageId(message); - let messageSent = false; - reliableChannel.addEventListener("message-sent", (event) => { - if (event.detail === messageId) { - messageSent = true; - } - }); - reliableChannel.send(message); - while (!messageSent) { - await delay(50); - } + const sentPromise = waitForEvent( + reliableChannel, + "message-sent", + (id) => id === messageId + ); - expect(messageSent).to.be.true; + reliableChannel.send(message); + await sentPromise; }); it("Encoder error raises irrecoverable error", async () => { @@ -137,23 +131,16 @@ describe("Reliable Channel: Encryption", () => { // Setting up message tracking const messageId = ReliableChannel.getMessageId(message); - let irrecoverableError = false; - reliableChannel.addEventListener( + + const errorPromise = waitForEvent<{ messageId: string; error: any }>( + reliableChannel, "sending-message-irrecoverable-error", - (event) => { - if (event.detail.messageId === messageId) { - irrecoverableError = true; - } - } + (detail) => detail.messageId === messageId ); encoder.contentTopic = "..."; reliableChannel.send(message); - while (!irrecoverableError) { - await delay(50); - } - - expect(irrecoverableError).to.be.true; + await errorPromise; }); it("Outgoing message is not emitted as acknowledged from own outgoing messages", async () => { @@ -216,21 +203,21 @@ describe("Reliable Channel: Encryption", () => { // Alice sets up message tracking for first message const firstMessageId = ReliableChannel.getMessageId(messages[0]); - let firstMessagePossiblyAcknowledged = false; - reliableChannelAlice.addEventListener( - "message-possibly-acknowledged", - (event) => { - if (event.detail.messageId === firstMessageId) { - firstMessagePossiblyAcknowledged = true; - } - } - ); let bobMessageReceived = 0; reliableChannelAlice.addEventListener("message-received", () => { bobMessageReceived++; }); + const firstMessagePossiblyAckPromise = waitForEvent<{ + messageId: string; + possibleAckCount: number; + }>( + reliableChannelAlice, + "message-possibly-acknowledged", + (detail) => detail.messageId === firstMessageId + ); + for (const m of messages) { reliableChannelAlice.send(m); } @@ -240,13 +227,9 @@ describe("Reliable Channel: Encryption", () => { await delay(50); } - // Bobs sends a message now, it should include first one in bloom filter + // Bob sends a message now, it should include first one in bloom filter reliableChannelBob.send(utf8ToBytes("message back")); - while (!firstMessagePossiblyAcknowledged) { - await delay(50); - } - - expect(firstMessagePossiblyAcknowledged).to.be.true; + await firstMessagePossiblyAckPromise; }); it("Outgoing message is acknowledged", async () => { @@ -273,32 +256,26 @@ describe("Reliable Channel: Encryption", () => { // Alice sets up message tracking const messageId = ReliableChannel.getMessageId(message); - let messageAcknowledged = false; - reliableChannelAlice.addEventListener("message-acknowledged", (event) => { - if (event.detail === messageId) { - messageAcknowledged = true; - } - }); - let bobReceivedMessage = false; - reliableChannelBob.addEventListener("message-received", () => { - bobReceivedMessage = true; - }); + const bobReceivedPromise = waitForEvent( + reliableChannelBob, + "message-received" + ); + + const messageAcknowledgedPromise = waitForEvent( + reliableChannelAlice, + "message-acknowledged", + (id) => id === messageId + ); reliableChannelAlice.send(message); // Wait for Bob to receive the message - while (!bobReceivedMessage) { - await delay(50); - } + await bobReceivedPromise; - // Bobs sends a message now, it should include first one in causal history + // Bob sends a message now, it should include first one in causal history reliableChannelBob.send(utf8ToBytes("second message in channel")); - while (!messageAcknowledged) { - await delay(50); - } - - expect(messageAcknowledged).to.be.true; + await messageAcknowledgedPromise; }); it("Incoming message is emitted as received", async () => { @@ -310,18 +287,16 @@ describe("Reliable Channel: Encryption", () => { decoder ); - let receivedMessage: IDecodedMessage; - reliableChannel.addEventListener("message-received", (event) => { - receivedMessage = event.detail; - }); - const message = utf8ToBytes("message in channel"); + const receivedPromise = waitForEvent( + reliableChannel, + "message-received" + ); + reliableChannel.send(message); - while (!receivedMessage!) { - await delay(50); - } + const receivedMessage = await receivedPromise; - expect(bytesToUtf8(receivedMessage!.payload)).to.eq(bytesToUtf8(message)); + expect(bytesToUtf8(receivedMessage.payload)).to.eq(bytesToUtf8(message)); }); }); diff --git a/packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts index 226d5b8c6a..24a2ffd365 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts @@ -18,6 +18,8 @@ import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; import { beforeEach, describe } from "mocha"; +import { waitForEvent } from "./test_utils.js"; + import { ReliableChannel } from "./index.js"; const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto"; @@ -58,16 +60,12 @@ describe("Reliable Channel: Sync", () => { // Send a message to have a history const sentMsgId = reliableChannel.send(utf8ToBytes("some message")); - let messageSent = false; - reliableChannel.addEventListener("message-sent", (event) => { - if (event.detail === sentMsgId) { - messageSent = true; - } - }); - while (!messageSent) { - await delay(50); - } + await waitForEvent( + reliableChannel, + "message-sent", + (id) => id === sentMsgId + ); let syncMessageSent = false; reliableChannel.messageChannel.addEventListener( @@ -116,7 +114,7 @@ describe("Reliable Channel: Sync", () => { const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter); const mockWakuNodeBob = new MockWakuNode(commonEventEmitter); - const syncMinIntervalMs = 1000; + const syncMinIntervalMs = 200; const reliableChannelAlice = await ReliableChannel.create( mockWakuNodeAlice, @@ -146,16 +144,12 @@ describe("Reliable Channel: Sync", () => { // Send a message to have a history const sentMsgId = reliableChannelAlice.send(utf8ToBytes("some message")); - let messageSent = false; - reliableChannelAlice.addEventListener("message-sent", (event) => { - if (event.detail === sentMsgId) { - messageSent = true; - } - }); - while (!messageSent) { - await delay(50); - } + await waitForEvent( + reliableChannelAlice, + "message-sent", + (id) => id === sentMsgId + ); let syncMessageSent = false; reliableChannelBob.messageChannel.addEventListener( @@ -165,20 +159,21 @@ describe("Reliable Channel: Sync", () => { } ); - while (!syncMessageSent) { - // Bob will send a sync message as soon as it started, we are waiting for this one - await delay(100); - } + // Bob will send a sync message as soon as it started, we are waiting for this one + await waitForEvent( + reliableChannelBob.messageChannel, + MessageChannelEvent.OutSyncSent + ); // Let's reset the tracker syncMessageSent = false; - // We should be faster than Bob as Bob will "randomly" wait a full second + // We should be faster than Bob as Bob will "randomly" wait the full interval await reliableChannelAlice["sendSyncMessage"](); - // Bob should be waiting a full second before sending a message after Alice - await delay(900); + // Bob should be waiting the full interval before sending a message after Alice + await delay(180); // Now, let's wait Bob to send the sync message - await delay(200); + await delay(40); expect(syncMessageSent).to.be.true; }); @@ -189,7 +184,7 @@ describe("Reliable Channel: Sync", () => { const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter); const mockWakuNodeBob = new MockWakuNode(commonEventEmitter); - const syncMinIntervalMs = 1000; + const syncMinIntervalMs = 200; const reliableChannelAlice = await ReliableChannel.create( mockWakuNodeAlice, @@ -219,16 +214,12 @@ describe("Reliable Channel: Sync", () => { // Send a message to have a history const sentMsgId = reliableChannelAlice.send(utf8ToBytes("some message")); - let messageSent = false; - reliableChannelAlice.addEventListener("message-sent", (event) => { - if (event.detail === sentMsgId) { - messageSent = true; - } - }); - while (!messageSent) { - await delay(50); - } + await waitForEvent( + reliableChannelAlice, + "message-sent", + (id) => id === sentMsgId + ); let syncMessageSent = false; reliableChannelBob.messageChannel.addEventListener( @@ -238,26 +229,27 @@ describe("Reliable Channel: Sync", () => { } ); - while (!syncMessageSent) { - // Bob will send a sync message as soon as it started, we are waiting for this one - await delay(100); - } + // Bob will send a sync message as soon as it started, we are waiting for this one + await waitForEvent( + reliableChannelBob.messageChannel, + MessageChannelEvent.OutSyncSent + ); // Let's reset the tracker syncMessageSent = false; - // We should be faster than Bob as Bob will "randomly" wait a full second + // We should be faster than Bob as Bob will "randomly" wait the full interval reliableChannelAlice.send(utf8ToBytes("some message")); - // Bob should be waiting a full second before sending a message after Alice - await delay(900); + // Bob should be waiting the full interval before sending a message after Alice + await delay(180); // Now, let's wait Bob to send the sync message - await delay(200); + await delay(40); expect(syncMessageSent).to.be.true; }); it("Sync message is not sent if another sync message was just sent", async function () { this.timeout(5000); - const syncMinIntervalMs = 1000; + const syncMinIntervalMs = 200; const reliableChannel = await ReliableChannel.create( mockWakuNode, @@ -273,16 +265,12 @@ describe("Reliable Channel: Sync", () => { // Send a message to have a history const sentMsgId = reliableChannel.send(utf8ToBytes("some message")); - let messageSent = false; - reliableChannel.addEventListener("message-sent", (event) => { - if (event.detail === sentMsgId) { - messageSent = true; - } - }); - while (!messageSent) { - await delay(50); - } + await waitForEvent( + reliableChannel, + "message-sent", + (id) => id === sentMsgId + ); let syncMessageSent = false; reliableChannel.messageChannel.addEventListener( @@ -292,26 +280,27 @@ describe("Reliable Channel: Sync", () => { } ); - while (!syncMessageSent) { - // Will send a sync message as soon as it started, we are waiting for this one - await delay(100); - } + // Will send a sync message as soon as it started, we are waiting for this one + await waitForEvent( + reliableChannel.messageChannel, + MessageChannelEvent.OutSyncSent + ); // Let's reset the tracker syncMessageSent = false; - // We should be faster than automated sync as it will "randomly" wait a full second + // We should be faster than automated sync as it will "randomly" wait the full interval await reliableChannel["sendSyncMessage"](); - // should be waiting a full second before sending a message after Alice - await delay(900); + // should be waiting the full interval before sending a message + await delay(180); // Now, let's wait to send the automated sync message - await delay(200); + await delay(40); expect(syncMessageSent).to.be.true; }); it("Sync message is not sent if another non-ephemeral message was just sent", async function () { this.timeout(5000); - const syncMinIntervalMs = 1000; + const syncMinIntervalMs = 200; const reliableChannel = await ReliableChannel.create( mockWakuNode, @@ -327,16 +316,12 @@ describe("Reliable Channel: Sync", () => { // Send a message to have a history const sentMsgId = reliableChannel.send(utf8ToBytes("some message")); - let messageSent = false; - reliableChannel.addEventListener("message-sent", (event) => { - if (event.detail === sentMsgId) { - messageSent = true; - } - }); - while (!messageSent) { - await delay(50); - } + await waitForEvent( + reliableChannel, + "message-sent", + (id) => id === sentMsgId + ); let syncMessageSent = false; reliableChannel.messageChannel.addEventListener( @@ -346,20 +331,21 @@ describe("Reliable Channel: Sync", () => { } ); - while (!syncMessageSent) { - // Will send a sync message as soon as it started, we are waiting for this one - await delay(100); - } + // Will send a sync message as soon as it started, we are waiting for this one + await waitForEvent( + reliableChannel.messageChannel, + MessageChannelEvent.OutSyncSent + ); // Let's reset the tracker syncMessageSent = false; - // We should be faster than automated sync as it will "randomly" wait a full second + // We should be faster than automated sync as it will "randomly" wait the full interval reliableChannel.send(utf8ToBytes("non-ephemeral message")); - // should be waiting a full second before sending a message after Alice - await delay(900); + // should be waiting the full interval before sending a message + await delay(180); // Now, let's wait to send the automated sync message - await delay(200); + await delay(40); expect(syncMessageSent).to.be.true; }); diff --git a/packages/sdk/src/reliable_channel/test_utils.ts b/packages/sdk/src/reliable_channel/test_utils.ts new file mode 100644 index 0000000000..62f92bdbcd --- /dev/null +++ b/packages/sdk/src/reliable_channel/test_utils.ts @@ -0,0 +1,44 @@ +import { TypedEventEmitter } from "@libp2p/interface"; + +/** + * Helper function to wait for an event with an optional predicate. + * Useful for replacing delay-based polling in tests. + * + * @param emitter - The event emitter to listen to + * @param eventName - The name of the event to wait for + * @param predicate - Optional function to filter events by detail + * @param timeoutMs - Timeout in milliseconds (default: 5000) + * @returns Promise that resolves with the event detail + * + * @example + * ```typescript + * const messageId = await waitForEvent( + * channel, + * "message-acknowledged", + * (id) => id === expectedId + * ); + * ``` + */ +export function waitForEvent( + emitter: TypedEventEmitter, + eventName: string, + predicate?: (detail: T) => boolean, + timeoutMs: number = 5000 +): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + emitter.removeEventListener(eventName, handler); + reject(new Error(`Timeout waiting for event: ${eventName}`)); + }, timeoutMs); + + const handler = (event: CustomEvent): void => { + if (!predicate || predicate(event.detail)) { + clearTimeout(timeout); + emitter.removeEventListener(eventName, handler); + resolve(event.detail); + } + }; + + emitter.addEventListener(eventName, handler); + }); +} diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts index 306c211a11..afa9c64f23 100644 --- a/packages/tests/tests/sharding/auto_sharding.spec.ts +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -86,7 +86,7 @@ describe("Autosharding: Running Nodes", function () { expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(true); }); - const numTest = 10; + const numTest = 2; for (let i = 0; i < numTest; i++) { // Random ContentTopic const applicationName = `app${Math.floor(Math.random() * 100)}`; // Random application name app0 to app99 diff --git a/packages/tests/tests/sharding/static_sharding.spec.ts b/packages/tests/tests/sharding/static_sharding.spec.ts index 9ca84ed25d..73381d24cc 100644 --- a/packages/tests/tests/sharding/static_sharding.spec.ts +++ b/packages/tests/tests/sharding/static_sharding.spec.ts @@ -82,7 +82,7 @@ describe("Static Sharding: Running Nodes", function () { expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(true); }); - const numTest = 10; + const numTest = 2; for (let i = 0; i < numTest; i++) { // Random clusterId between 2 and 1000 const clusterId = Math.floor(Math.random() * 999) + 2;