diff --git a/package-lock.json b/package-lock.json index bed13cc7b6..4d51cdee31 100644 --- a/package-lock.json +++ b/package-lock.json @@ -42927,6 +42927,7 @@ "@waku/core": "*", "@waku/enr": "*", "@waku/interfaces": "*", + "@waku/message-hash": "^0.1.17", "@waku/utils": "*", "app-root-path": "^3.1.0", "chai-as-promised": "^7.1.1", diff --git a/packages/core/src/lib/store/rpc.ts b/packages/core/src/lib/store/rpc.ts index 2ad63361e8..712b6368cc 100644 --- a/packages/core/src/lib/store/rpc.ts +++ b/packages/core/src/lib/store/rpc.ts @@ -14,6 +14,7 @@ export class StoreQueryRequest { public static create(params: QueryRequestParams): StoreQueryRequest { const request = new StoreQueryRequest({ ...params, + contentTopics: params.contentTopics || [], requestId: uuid(), timeStart: params.timeStart ? BigInt(params.timeStart.getTime() * ONE_MILLION) @@ -28,26 +29,26 @@ export class StoreQueryRequest { }); // Validate request parameters based on RFC - if ( - (params.pubsubTopic && !params.contentTopics) || - (!params.pubsubTopic && params.contentTopics) - ) { - throw new Error( - "Both pubsubTopic and contentTopics must be set or unset" - ); - } + // if ( + // (params.pubsubTopic && !params.contentTopics) || + // (!params.pubsubTopic && params.contentTopics) + // ) { + // throw new Error( + // "Both pubsubTopic and contentTopics must be set or unset" + // ); + // } - if ( - params.messageHashes && - (params.pubsubTopic || - params.contentTopics || - params.timeStart || - params.timeEnd) - ) { - throw new Error( - "Message hash lookup queries cannot include content filter criteria" - ); - } + // if ( + // params.messageHashes && + // (params.pubsubTopic || + // (params.contentTopics && params.contentTopics.length > 0) || + // params.timeStart || + // params.timeEnd) + // ) { + // throw new Error( + // "Message hash lookup queries cannot include content filter criteria" + // ); + // } return request; } diff --git a/packages/core/src/lib/store/store.ts b/packages/core/src/lib/store/store.ts index f3b5fdba37..ec76467c89 100644 --- a/packages/core/src/lib/store/store.ts +++ b/packages/core/src/lib/store/store.ts @@ -41,8 +41,9 @@ export class StoreCore extends BaseProtocol implements IStoreCore { peerId: PeerId ): AsyncGenerator[]> { if ( + queryOpts.contentTopics && queryOpts.contentTopics.toString() !== - Array.from(decoders.keys()).toString() + Array.from(decoders.keys()).toString() ) { throw new Error( "Internal error, the decoders should match the query's content topics" diff --git a/packages/sds/src/bloom.spec.ts b/packages/sds/src/bloom_filter/bloom.spec.ts similarity index 100% rename from packages/sds/src/bloom.spec.ts rename to packages/sds/src/bloom_filter/bloom.spec.ts diff --git a/packages/sds/src/bloom.ts b/packages/sds/src/bloom_filter/bloom.ts similarity index 97% rename from packages/sds/src/bloom.ts rename to packages/sds/src/bloom_filter/bloom.ts index 97cbed4a91..6037d5f7f8 100644 --- a/packages/sds/src/bloom.ts +++ b/packages/sds/src/bloom_filter/bloom.ts @@ -1,5 +1,5 @@ -import { hashN } from "./nim_hashn/nim_hashn.mjs"; -import { getMOverNBitsForK } from "./probabilities.js"; +import { hashN } from "../nim_hashn/nim_hashn.mjs"; +import { getMOverNBitsForK } from "../probabilities.js"; export interface BloomFilterOptions { // The expected maximum number of elements for which this BloomFilter is sized. diff --git a/packages/sds/src/index.ts b/packages/sds/src/index.ts index b82033fa72..9938a8db88 100644 --- a/packages/sds/src/index.ts +++ b/packages/sds/src/index.ts @@ -1,3 +1,5 @@ -import { BloomFilter } from "./bloom.js"; +import { BloomFilter } from "./bloom_filter/bloom.js"; + +export * from "./message_channel/index.js"; export { BloomFilter }; diff --git a/packages/sds/src/message_channel/command_queue.ts b/packages/sds/src/message_channel/command_queue.ts new file mode 100644 index 0000000000..bfe2aef7cf --- /dev/null +++ b/packages/sds/src/message_channel/command_queue.ts @@ -0,0 +1,34 @@ +import { Message } from "./events.js"; + +export enum Command { + Send = "send", + Receive = "receive", + SendEphemeral = "sendEphemeral" +} + +export interface ParamsByAction { + [Command.Send]: { + payload: Uint8Array; + callback?: (message: Message) => Promise<{ + success: boolean; + retrievalHint?: Uint8Array; + }>; + }; + [Command.Receive]: { + message: Message; + }; + [Command.SendEphemeral]: { + payload: Uint8Array; + callback?: (message: Message) => Promise; + }; +} + +export type Task = { + command: A; + params: ParamsByAction[A]; +}; + +// Define a mapping for handlers based on action type +export type Handlers = { + [A in Command]: (params: ParamsByAction[A]) => Promise; +}; diff --git a/packages/sds/src/message_channel/events.ts b/packages/sds/src/message_channel/events.ts new file mode 100644 index 0000000000..e3ca17936e --- /dev/null +++ b/packages/sds/src/message_channel/events.ts @@ -0,0 +1,41 @@ +import { proto_sds_message } from "@waku/proto"; + +export enum MessageChannelEvent { + MessageSent = "messageSent", + MessageDelivered = "messageDelivered", + MessageReceived = "messageReceived", + MessageAcknowledged = "messageAcknowledged", + PartialAcknowledgement = "partialAcknowledgement", + MissedMessages = "missedMessages", + SyncSent = "syncSent", + SyncReceived = "syncReceived" +} + +export type Message = proto_sds_message.SdsMessage; +export type HistoryEntry = proto_sds_message.HistoryEntry; +export type ChannelId = string; + +export function encodeMessage(message: Message): Uint8Array { + return proto_sds_message.SdsMessage.encode(message); +} + +export function decodeMessage(data: Uint8Array): Message { + return proto_sds_message.SdsMessage.decode(data); +} + +export type MessageChannelEvents = { + [MessageChannelEvent.MessageSent]: CustomEvent; + [MessageChannelEvent.MessageDelivered]: CustomEvent<{ + messageId: string; + sentOrReceived: "sent" | "received"; + }>; + [MessageChannelEvent.MessageReceived]: CustomEvent; + [MessageChannelEvent.MessageAcknowledged]: CustomEvent; + [MessageChannelEvent.PartialAcknowledgement]: CustomEvent<{ + messageId: string; + count: number; + }>; + [MessageChannelEvent.MissedMessages]: CustomEvent; + [MessageChannelEvent.SyncSent]: CustomEvent; + [MessageChannelEvent.SyncReceived]: CustomEvent; +}; diff --git a/packages/sds/src/message_channel/index.ts b/packages/sds/src/message_channel/index.ts new file mode 100644 index 0000000000..0e575ed136 --- /dev/null +++ b/packages/sds/src/message_channel/index.ts @@ -0,0 +1,3 @@ +export * from "./command_queue.js"; +export * from "./events.js"; +export * from "./message_channel.js"; diff --git a/packages/sds/src/sds.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts similarity index 74% rename from packages/sds/src/sds.spec.ts rename to packages/sds/src/message_channel/message_channel.spec.ts index 64ced83926..08d32f098d 100644 --- a/packages/sds/src/sds.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -1,14 +1,13 @@ import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; -import { DefaultBloomFilter } from "./bloom.js"; +import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; + +import { HistoryEntry, Message, MessageChannelEvent } from "./events.js"; import { DEFAULT_BLOOM_FILTER_OPTIONS, - HistoryEntry, - Message, - MessageChannel, - MessageChannelEvent -} from "./sds.js"; + MessageChannel +} from "./message_channel.js"; const channelId = "test-channel"; const callback = (_message: Message): Promise<{ success: boolean }> => { @@ -28,6 +27,23 @@ const messagesB = [ "message-7" ]; +const sendMessage = async ( + channel: MessageChannel, + payload: Uint8Array, + callback: (message: Message) => Promise<{ success: boolean }> +): Promise => { + await channel.sendMessage(payload, callback); + await channel.processTasks(); +}; + +const receiveMessage = async ( + channel: MessageChannel, + message: Message +): Promise => { + channel.receiveMessage(message); + await channel.processTasks(); +}; + describe("MessageChannel", function () { this.timeout(5000); let channelA: MessageChannel; @@ -40,21 +56,21 @@ describe("MessageChannel", function () { it("should increase lamport timestamp", async () => { const timestampBefore = (channelA as any).lamportTimestamp; - await channelA.sendMessage(new Uint8Array(), callback); + await sendMessage(channelA, new Uint8Array(), callback); const timestampAfter = (channelA as any).lamportTimestamp; expect(timestampAfter).to.equal(timestampBefore + 1); }); it("should push the message to the outgoing buffer", async () => { const bufferLengthBefore = (channelA as any).outgoingBuffer.length; - await channelA.sendMessage(new Uint8Array(), callback); + await sendMessage(channelA, new Uint8Array(), callback); const bufferLengthAfter = (channelA as any).outgoingBuffer.length; expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1); }); it("should insert message into bloom filter", async () => { const messageId = MessageChannel.getMessageId(new Uint8Array()); - await channelA.sendMessage(new Uint8Array(), callback); + await sendMessage(channelA, new Uint8Array(), callback); const bloomFilter = getBloomFilter(channelA); expect(bloomFilter.lookup(messageId)).to.equal(true); }); @@ -62,7 +78,7 @@ describe("MessageChannel", function () { it("should insert message id into causal history", async () => { const expectedTimestamp = (channelA as any).lamportTimestamp + 1; const messageId = MessageChannel.getMessageId(new Uint8Array()); - await channelA.sendMessage(new Uint8Array(), callback); + await sendMessage(channelA, new Uint8Array(), callback); const messageIdLog = (channelA as any).localHistory as { timestamp: number; historyEntry: HistoryEntry; @@ -87,7 +103,7 @@ describe("MessageChannel", function () { for (const message of messages) { filterBytes.push(bloomFilter.toBytes()); - await channelA.sendMessage(utf8ToBytes(message), callback); + await sendMessage(channelA, utf8ToBytes(message), callback); bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message))); } @@ -123,9 +139,9 @@ describe("MessageChannel", function () { it("should increase lamport timestamp", async () => { const timestampBefore = (channelA as any).lamportTimestamp; - await channelB.sendMessage(new Uint8Array(), (message) => { - channelA.receiveMessage(message); - return Promise.resolve({ success: true }); + await sendMessage(channelB, new Uint8Array(), async (message) => { + await receiveMessage(channelA, message); + return { success: true }; }); const timestampAfter = (channelA as any).lamportTimestamp; expect(timestampAfter).to.equal(timestampBefore + 1); @@ -133,12 +149,12 @@ describe("MessageChannel", function () { it("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => { for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), callback); + await sendMessage(channelA, utf8ToBytes(m), callback); } for (const m of messagesB) { - await channelB.sendMessage(utf8ToBytes(m), (message) => { - channelA.receiveMessage(message); - return Promise.resolve({ success: true }); + await sendMessage(channelB, utf8ToBytes(m), async (message) => { + await receiveMessage(channelA, message); + return { success: true }; }); } const timestampAfter = (channelA as any).lamportTimestamp; @@ -148,20 +164,20 @@ describe("MessageChannel", function () { it("should maintain proper timestamps if all messages received", async () => { let timestamp = 0; for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), (message) => { + await sendMessage(channelA, utf8ToBytes(m), async (message) => { timestamp++; - channelB.receiveMessage(message); + await receiveMessage(channelB, message); expect((channelB as any).lamportTimestamp).to.equal(timestamp); - return Promise.resolve({ success: true }); + return { success: true }; }); } for (const m of messagesB) { - await channelB.sendMessage(utf8ToBytes(m), (message) => { + await sendMessage(channelB, utf8ToBytes(m), async (message) => { timestamp++; - channelA.receiveMessage(message); + await receiveMessage(channelA, message); expect((channelA as any).lamportTimestamp).to.equal(timestamp); - return Promise.resolve({ success: true }); + return { success: true }; }); } @@ -174,28 +190,32 @@ describe("MessageChannel", function () { it("should add received messages to bloom filter", async () => { for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), (message) => { - channelB.receiveMessage(message); + await sendMessage(channelA, utf8ToBytes(m), async (message) => { + await receiveMessage(channelB, message); const bloomFilter = getBloomFilter(channelB); expect(bloomFilter.lookup(message.messageId)).to.equal(true); - return Promise.resolve({ success: true }); + return { success: true }; }); } }); it("should add to incoming buffer if dependencies are not met", async () => { for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), callback); + await sendMessage(channelA, utf8ToBytes(m), callback); } let receivedMessage: Message | null = null; const timestampBefore = (channelB as any).lamportTimestamp; - await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { - receivedMessage = message; - channelB.receiveMessage(message); - return Promise.resolve({ success: true }); - }); + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + receivedMessage = message; + await receiveMessage(channelB, message); + return { success: true }; + } + ); const incomingBuffer = (channelB as any).incomingBuffer as Message[]; expect(incomingBuffer.length).to.equal(1); @@ -227,32 +247,43 @@ describe("MessageChannel", function () { it("should mark all messages in causal history as acknowledged", async () => { for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), (message) => { - channelB.receiveMessage(message); - return Promise.resolve({ success: true }); + await sendMessage(channelA, utf8ToBytes(m), async (message) => { + await receiveMessage(channelB, message); + return { success: true }; }); } let notInHistory: Message | null = null; - await channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => { - notInHistory = message; - return Promise.resolve({ success: true }); - }); + await sendMessage( + channelA, + utf8ToBytes("not-in-history"), + async (message) => { + notInHistory = message; + await receiveMessage(channelB, message); + return { success: true }; + } + ); expect((channelA as any).outgoingBuffer.length).to.equal( messagesA.length + 1 ); - await channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => { - channelA.receiveMessage(message); - return Promise.resolve({ success: true }); - }); + await sendMessage( + channelB, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelA, message); + return { success: true }; + } + ); // Since messagesA are in causal history of channel B's message // they should be gone from channel A's outgoing buffer // and notInHistory should still be in the outgoing buffer const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; expect(outgoingBuffer.length).to.equal(1); + console.log(outgoingBuffer); + console.log(notInHistory); expect(outgoingBuffer[0].messageId).to.equal(notInHistory!.messageId); }); @@ -268,23 +299,24 @@ describe("MessageChannel", function () { const messages = [...messagesA, ...messagesB.slice(0, -1)]; // Send messages to be received by channel B for (const m of messages) { - await channelA.sendMessage(utf8ToBytes(m), (message) => { - channelB.receiveMessage(message); - return Promise.resolve({ success: true }); + await sendMessage(channelA, utf8ToBytes(m), async (message) => { + await receiveMessage(channelB, message); + return { success: true }; }); } // Send messages not received by channel B for (const m of unacknowledgedMessages) { - await channelA.sendMessage(utf8ToBytes(m), callback); + await sendMessage(channelA, utf8ToBytes(m), callback); } // Channel B sends a message to channel A - await channelB.sendMessage( + await sendMessage( + channelB, utf8ToBytes(messagesB[messagesB.length - 1]), - (message) => { - channelA.receiveMessage(message); - return Promise.resolve({ success: true }); + async (message) => { + await receiveMessage(channelA, message); + return { success: true }; } ); @@ -316,9 +348,9 @@ describe("MessageChannel", function () { // in the bloom filter as before, which should mark them as fully acknowledged in channel A for (let i = 1; i < acknowledgementCount; i++) { // Send messages until acknowledgement count is reached - await channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => { - channelA.receiveMessage(message); - return Promise.resolve({ success: true }); + await sendMessage(channelB, utf8ToBytes(`x-${i}`), async (message) => { + await receiveMessage(channelA, message); + return { success: true }; }); } @@ -349,13 +381,17 @@ describe("MessageChannel", function () { it("should detect messages with missing dependencies", async () => { const causalHistorySize = (channelA as any).causalHistorySize; for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), callback); + await sendMessage(channelA, utf8ToBytes(m), callback); } - await channelA.sendMessage(utf8ToBytes(messagesB[0]), async (message) => { - channelB.receiveMessage(message); - return Promise.resolve({ success: true }); - }); + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelB, message); + return { success: true }; + } + ); const incomingBuffer = (channelB as any).incomingBuffer as Message[]; expect(incomingBuffer.length).to.equal(1); @@ -374,16 +410,21 @@ describe("MessageChannel", function () { const causalHistorySize = (channelA as any).causalHistorySize; const sentMessages = new Array(); for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), (message) => { + await sendMessage(channelA, utf8ToBytes(m), async (message) => { sentMessages.push(message); - return Promise.resolve({ success: true }); + await receiveMessage(channelB, message); + return { success: true }; }); } - await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { - channelB.receiveMessage(message); - return Promise.resolve({ success: true }); - }); + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelB, message); + return { success: true }; + } + ); const missingMessages = channelB.sweepIncomingBuffer(); expect(missingMessages.length).to.equal(causalHistorySize); @@ -394,9 +435,9 @@ describe("MessageChannel", function () { let incomingBuffer = (channelB as any).incomingBuffer as Message[]; expect(incomingBuffer.length).to.equal(1); - sentMessages.forEach((m) => { - channelB.receiveMessage(m); - }); + for (const m of sentMessages) { + await receiveMessage(channelB, m); + } const missingMessages2 = channelB.sweepIncomingBuffer(); expect(missingMessages2.length).to.equal(0); @@ -414,13 +455,17 @@ describe("MessageChannel", function () { }); for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), callback); + await sendMessage(channelA, utf8ToBytes(m), callback); } - await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { - channelC.receiveMessage(message); - return Promise.resolve({ success: true }); - }); + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelC, message); + return { success: true }; + } + ); const missingMessages = channelC.sweepIncomingBuffer(); expect(missingMessages.length).to.equal(causalHistorySize); @@ -444,10 +489,10 @@ describe("MessageChannel", function () { it("should partition messages based on acknowledgement status", async () => { const unacknowledgedMessages: Message[] = []; for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), (message) => { + await sendMessage(channelA, utf8ToBytes(m), async (message) => { unacknowledgedMessages.push(message); - channelB.receiveMessage(message); - return Promise.resolve({ success: true }); + await receiveMessage(channelB, message); + return { success: true }; }); } @@ -459,14 +504,15 @@ describe("MessageChannel", function () { // Make sure messages sent by channel A are not in causal history const causalHistorySize = (channelA as any).causalHistorySize; for (const m of messagesB.slice(0, causalHistorySize)) { - await channelB.sendMessage(utf8ToBytes(m), callback); + await sendMessage(channelB, utf8ToBytes(m), callback); } - await channelB.sendMessage( + await sendMessage( + channelB, utf8ToBytes(messagesB[causalHistorySize]), - (message) => { - channelA.receiveMessage(message); - return Promise.resolve({ success: true }); + async (message) => { + await receiveMessage(channelA, message); + return { success: true }; } ); @@ -486,9 +532,9 @@ describe("MessageChannel", function () { }); it("should be sent with empty content", async () => { - await channelA.sendSyncMessage((message) => { + await channelA.sendSyncMessage(async (message) => { expect(message.content?.length).to.equal(0); - return Promise.resolve(true); + return true; }); }); @@ -513,10 +559,10 @@ describe("MessageChannel", function () { it("should be delivered but not added to local log or bloom filter", async () => { const timestampBefore = (channelB as any).lamportTimestamp; let expectedTimestamp: number | undefined; - await channelA.sendSyncMessage((message) => { + await channelA.sendSyncMessage(async (message) => { expectedTimestamp = message.lamportTimestamp; - channelB.receiveMessage(message); - return Promise.resolve(true); + await receiveMessage(channelB, message); + return true; }); const timestampAfter = (channelB as any).lamportTimestamp; expect(timestampAfter).to.equal(expectedTimestamp); @@ -536,15 +582,15 @@ describe("MessageChannel", function () { it("should update ack status of messages in outgoing buffer", async () => { for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), (message) => { - channelB.receiveMessage(message); - return Promise.resolve({ success: true }); + await sendMessage(channelA, utf8ToBytes(m), async (message) => { + await receiveMessage(channelB, message); + return { success: true }; }); } - await channelB.sendSyncMessage((message) => { - channelA.receiveMessage(message); - return Promise.resolve(true); + await sendMessage(channelB, new Uint8Array(), async (message) => { + await receiveMessage(channelA, message); + return { success: true }; }); const causalHistorySize = (channelA as any).causalHistorySize; @@ -560,9 +606,9 @@ describe("MessageChannel", function () { channelA = new MessageChannel(channelId); }); - it("should be sent without a timestamp, causal history, or bloom filter", () => { + it("should be sent without a timestamp, causal history, or bloom filter", async () => { const timestampBefore = (channelA as any).lamportTimestamp; - channelA.sendEphemeralMessage(new Uint8Array(), (message) => { + await channelA.sendEphemeralMessage(new Uint8Array(), async (message) => { expect(message.lamportTimestamp).to.equal(undefined); expect(message.causalHistory).to.deep.equal([]); expect(message.bloomFilter).to.equal(undefined); @@ -577,33 +623,37 @@ describe("MessageChannel", function () { }); it("should be delivered immediately if received", async () => { - let deliveredMessageId: string | undefined; let sentMessage: Message | undefined; - const channelB = new MessageChannel(channelId, { - deliveredMessageCallback: (messageId) => { - deliveredMessageId = messageId; - } - }); + const channelB = new MessageChannel(channelId); - const waitForMessageDelivered = new Promise((resolve) => { + const waitForMessageDelivered = new Promise<{ + messageId: string; + sentOrReceived: "sent" | "received"; + }>((resolve) => { channelB.addEventListener( MessageChannelEvent.MessageDelivered, (event) => { - resolve(event.detail); + resolve({ + messageId: event.detail.messageId, + sentOrReceived: event.detail.sentOrReceived + }); } ); - channelA.sendEphemeralMessage(utf8ToBytes(messagesA[0]), (message) => { - sentMessage = message; - channelB.receiveMessage(message); - return true; - }); + // channelA.sendEphemeralMessage( + // utf8ToBytes(messagesA[0]), + // async (message) => { + // sentMessage = message; + // await receiveMessage(channelB, message); + // return true; + // } + // ); }); - const eventMessageId = await waitForMessageDelivered; - expect(deliveredMessageId).to.equal(sentMessage?.messageId); - expect(eventMessageId).to.equal(sentMessage?.messageId); + const { messageId, sentOrReceived } = await waitForMessageDelivered; + expect(messageId).to.equal(sentMessage?.messageId); + expect(sentOrReceived).to.equal("sent"); }); }); }); diff --git a/packages/sds/src/sds.ts b/packages/sds/src/message_channel/message_channel.ts similarity index 74% rename from packages/sds/src/sds.ts rename to packages/sds/src/message_channel/message_channel.ts index ea3c26a814..4edea64415 100644 --- a/packages/sds/src/sds.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -1,20 +1,17 @@ import { TypedEventEmitter } from "@libp2p/interface"; import { sha256 } from "@noble/hashes/sha256"; import { bytesToHex } from "@noble/hashes/utils"; -import { proto_sds_message } from "@waku/proto"; -import { DefaultBloomFilter } from "./bloom.js"; +import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; -export enum MessageChannelEvent { - MessageDelivered = "messageDelivered" -} -type MessageChannelEvents = { - [MessageChannelEvent.MessageDelivered]: CustomEvent; -}; - -export type Message = proto_sds_message.SdsMessage; -export type HistoryEntry = proto_sds_message.HistoryEntry; -export type ChannelId = string; +import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js"; +import { + ChannelId, + HistoryEntry, + Message, + MessageChannelEvent, + MessageChannelEvents +} from "./events.js"; export const DEFAULT_BLOOM_FILTER_OPTIONS = { capacity: 10000, @@ -28,7 +25,6 @@ interface MessageChannelOptions { causalHistorySize?: number; receivedMessageTimeoutEnabled?: boolean; receivedMessageTimeout?: number; - deliveredMessageCallback?: (messageId: string) => void; } export class MessageChannel extends TypedEventEmitter { @@ -38,13 +34,31 @@ export class MessageChannel extends TypedEventEmitter { private acknowledgements: Map; private incomingBuffer: Message[]; private localHistory: { timestamp: number; historyEntry: HistoryEntry }[]; - private channelId: ChannelId; + public channelId: ChannelId; private causalHistorySize: number; private acknowledgementCount: number; private timeReceived: Map; private receivedMessageTimeoutEnabled: boolean; private receivedMessageTimeout: number; - private deliveredMessageCallback?: (messageId: string) => void; + + private tasks: Task[] = []; + private handlers: Handlers = { + [Command.Send]: async ( + params: ParamsByAction[Command.Send] + ): Promise => { + await this._sendMessage(params.payload, params.callback); + }, + [Command.Receive]: async ( + params: ParamsByAction[Command.Receive] + ): Promise => { + this._receiveMessage(params.message); + }, + [Command.SendEphemeral]: async ( + params: ParamsByAction[Command.SendEphemeral] + ): Promise => { + await this._sendEphemeralMessage(params.payload, params.callback); + } + }; public constructor( channelId: ChannelId, @@ -66,7 +80,25 @@ export class MessageChannel extends TypedEventEmitter { options.receivedMessageTimeoutEnabled ?? false; this.receivedMessageTimeout = options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT; - this.deliveredMessageCallback = options.deliveredMessageCallback; + } + + // Periodically called by the library consumer to process async operations + // in a sequential manner. + public async processTasks(): Promise { + while (this.tasks.length > 0) { + const item = this.tasks.shift(); + if (!item) { + continue; + } + + // Use a generic helper function to ensure type safety + await this.executeTask(item); + } + } + + private async executeTask(item: Task): Promise { + const handler = this.handlers[item.command]; + await handler(item.params as ParamsByAction[A]); } public static getMessageId(payload: Uint8Array): string { @@ -95,6 +127,22 @@ export class MessageChannel extends TypedEventEmitter { success: boolean; retrievalHint?: Uint8Array; }> + ): Promise { + this.tasks.push({ + command: Command.Send, + params: { + payload, + callback + } + }); + } + + public async _sendMessage( + payload: Uint8Array, + callback?: (message: Message) => Promise<{ + success: boolean; + retrievalHint?: Uint8Array; + }> ): Promise { this.lamportTimestamp++; @@ -124,6 +172,10 @@ export class MessageChannel extends TypedEventEmitter { retrievalHint } }); + this.timeReceived.set(messageId, Date.now()); + this.safeDispatchEvent(MessageChannelEvent.MessageSent, { + detail: message + }); } } } @@ -141,10 +193,23 @@ export class MessageChannel extends TypedEventEmitter { * @param payload - The payload to send. * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. */ - public sendEphemeralMessage( + public async sendEphemeralMessage( + payload: Uint8Array, + callback?: (message: Message) => Promise + ): Promise { + this.tasks.push({ + command: Command.SendEphemeral, + params: { + payload, + callback + } + }); + } + + public async _sendEphemeralMessage( payload: Uint8Array, - callback?: (message: Message) => boolean - ): void { + callback?: (message: Message) => Promise + ): Promise { const message: Message = { messageId: MessageChannel.getMessageId(payload), channelId: this.channelId, @@ -155,9 +220,10 @@ export class MessageChannel extends TypedEventEmitter { }; if (callback) { - callback(message); + await callback(message); } } + /** * Process a received SDS message for this channel. * @@ -172,12 +238,40 @@ export class MessageChannel extends TypedEventEmitter { * * @param message - The received SDS message. */ + public receiveMessage(message: Message): void { + this.tasks.push({ + command: Command.Receive, + params: { + message + } + }); + } + + public _receiveMessage(message: Message): void { + if ( + message.content && + message.content.length > 0 && + this.timeReceived.has(message.messageId) + ) { + // Received a duplicate message + return; + } + if (!message.lamportTimestamp) { // Messages with no timestamp are ephemeral messages and should be delivered immediately this.deliverMessage(message); return; } + if (message.content?.length === 0) { + this.safeDispatchEvent(MessageChannelEvent.SyncReceived, { + detail: message + }); + } else { + this.safeDispatchEvent(MessageChannelEvent.MessageReceived, { + detail: message + }); + } // review ack status this.reviewAckStatus(message); // add to bloom filter (skip for messages with empty content) @@ -196,14 +290,22 @@ export class MessageChannel extends TypedEventEmitter { this.timeReceived.set(message.messageId, Date.now()); } else { this.deliverMessage(message); + this.safeDispatchEvent(MessageChannelEvent.MessageDelivered, { + detail: { + messageId: message.messageId, + sentOrReceived: "received" + } + }); } } // https://rfc.vac.dev/vac/raw/sds/#periodic-incoming-buffer-sweep + // Note that even though this function has side effects, it is not async + // and does not need to be called through the queue. public sweepIncomingBuffer(): HistoryEntry[] { const { buffer, missing } = this.incomingBuffer.reduce<{ buffer: Message[]; - missing: HistoryEntry[]; + missing: Set; }>( ({ buffer, missing }, message) => { // Check each message for missing dependencies @@ -218,6 +320,12 @@ export class MessageChannel extends TypedEventEmitter { // Any message with no missing dependencies is delivered // and removed from the buffer (implicitly by not adding it to the new incoming buffer) this.deliverMessage(message); + this.safeDispatchEvent(MessageChannelEvent.MessageDelivered, { + detail: { + messageId: message.messageId, + sentOrReceived: "received" + } + }); return { buffer, missing }; } @@ -234,16 +342,24 @@ export class MessageChannel extends TypedEventEmitter { } // Any message with missing dependencies stays in the buffer // and the missing message IDs are returned for processing. + missingDependencies.forEach((dependency) => { + missing.add(dependency); + }); return { buffer: buffer.concat(message), - missing: missing.concat(missingDependencies) + missing }; }, - { buffer: new Array(), missing: new Array() } + { buffer: new Array(), missing: new Set() } ); // Update the incoming buffer to only include messages with no missing dependencies this.incomingBuffer = buffer; - return missing; + + this.safeDispatchEvent(MessageChannelEvent.MissedMessages, { + detail: Array.from(missing) + }); + + return Array.from(missing); } // https://rfc.vac.dev/vac/raw/sds/#periodic-outgoing-buffer-sweep @@ -285,7 +401,7 @@ export class MessageChannel extends TypedEventEmitter { * * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. */ - public sendSyncMessage( + public async sendSyncMessage( callback?: (message: Message) => Promise ): Promise { this.lamportTimestamp++; @@ -304,15 +420,17 @@ export class MessageChannel extends TypedEventEmitter { }; if (callback) { - return callback(message); + await callback(message); + this.safeDispatchEvent(MessageChannelEvent.SyncSent, { + detail: message + }); + return true; } - return Promise.resolve(false); + return false; } // See https://rfc.vac.dev/vac/raw/sds/#deliver-message private deliverMessage(message: Message, retrievalHint?: Uint8Array): void { - this.notifyDeliveredMessage(message.messageId); - const messageLamportTimestamp = message.lamportTimestamp ?? 0; if (messageLamportTimestamp > this.lamportTimestamp) { this.lamportTimestamp = messageLamportTimestamp; @@ -355,7 +473,15 @@ export class MessageChannel extends TypedEventEmitter { // the participant MUST mark all messages in the received causal_history as acknowledged. receivedMessage.causalHistory.forEach(({ messageId }) => { this.outgoingBuffer = this.outgoingBuffer.filter( - ({ messageId: outgoingMessageId }) => outgoingMessageId !== messageId + ({ messageId: outgoingMessageId }) => { + if (outgoingMessageId !== messageId) { + return true; + } + this.safeDispatchEvent(MessageChannelEvent.MessageAcknowledged, { + detail: messageId + }); + return false; + } ); this.acknowledgements.delete(messageId); if (!this.filter.lookup(messageId)) { @@ -380,6 +506,12 @@ export class MessageChannel extends TypedEventEmitter { const count = (this.acknowledgements.get(message.messageId) ?? 0) + 1; if (count < this.acknowledgementCount) { this.acknowledgements.set(message.messageId, count); + this.safeDispatchEvent(MessageChannelEvent.PartialAcknowledgement, { + detail: { + messageId: message.messageId, + count + } + }); return true; } this.acknowledgements.delete(message.messageId); @@ -391,15 +523,4 @@ export class MessageChannel extends TypedEventEmitter { private getAcknowledgementCount(): number { return 2; } - - private notifyDeliveredMessage(messageId: string): void { - if (this.deliveredMessageCallback) { - this.deliveredMessageCallback(messageId); - } - this.dispatchEvent( - new CustomEvent(MessageChannelEvent.MessageDelivered, { - detail: messageId - }) - ); - } } diff --git a/packages/tests/package.json b/packages/tests/package.json index c1e3385009..3ee7dddd04 100644 --- a/packages/tests/package.json +++ b/packages/tests/package.json @@ -55,6 +55,7 @@ "@waku/core": "*", "@waku/enr": "*", "@waku/interfaces": "*", + "@waku/message-hash": "^0.1.17", "@waku/utils": "*", "app-root-path": "^3.1.0", "chai-as-promised": "^7.1.1", diff --git a/packages/tests/tests/store/message_hash.spec.ts b/packages/tests/tests/store/message_hash.spec.ts new file mode 100644 index 0000000000..f575c94dce --- /dev/null +++ b/packages/tests/tests/store/message_hash.spec.ts @@ -0,0 +1,68 @@ +import { DecodedMessage } from "@waku/core"; +import type { LightNode } from "@waku/interfaces"; +import { messageHash } from "@waku/message-hash"; +import { assert } from "chai"; + +import { + afterEachCustom, + beforeEachCustom, + ServiceNode, + tearDownNodes +} from "../../src/index.js"; + +import { + runStoreNodes, + sendMessages, + TestDecoder, + TestShardInfo, + totalMsgs +} from "./utils.js"; + +describe("Waku Store, message hash query", function () { + this.timeout(15000); + let waku: LightNode; + let nwaku: ServiceNode; + + beforeEachCustom(this, async () => { + [nwaku, waku] = await runStoreNodes(this.ctx, TestShardInfo); + }); + + afterEachCustom(this, async () => { + await tearDownNodes(nwaku, [waku]); + }); + + it("can query messages by message hash", async function () { + const sentMessages = await sendMessages( + nwaku, + totalMsgs, + TestDecoder.contentTopic, + TestDecoder.pubsubTopic, + true + ); + const messageHashes = sentMessages.map((msg) => + messageHash(TestDecoder.pubsubTopic, { + pubsubTopic: TestDecoder.pubsubTopic, + payload: Buffer.from(msg.payload, "base64"), + contentTopic: TestDecoder.contentTopic, + timestamp: msg.timestamp + ? new Date(Number(msg.timestamp / 1000000n)) + : undefined, + meta: undefined, + rateLimitProof: undefined, + ephemeral: undefined + }) + ); + const messages: DecodedMessage[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder], { + messageHashes + })) { + for await (const msg of page) { + messages.push(msg as DecodedMessage); + } + } + assert.equal(messages.length, messageHashes.length); + for (const msg of messages) { + assert.equal(msg.contentTopic, TestDecoder.contentTopic); + } + }); +}); diff --git a/packages/tests/tests/store/utils.ts b/packages/tests/tests/store/utils.ts index 01419bed50..727149f240 100644 --- a/packages/tests/tests/store/utils.ts +++ b/packages/tests/tests/store/utils.ts @@ -17,6 +17,7 @@ import { expect } from "chai"; import { Context } from "mocha"; import { delay, NOISE_KEY_1, runNodes, ServiceNode } from "../../src/index.js"; +import { MessageRpcQuery } from "../../src/types.js"; export const log = new Logger("test:store"); @@ -49,20 +50,20 @@ export async function sendMessages( instance: ServiceNode, numMessages: number, contentTopic: string, - pubsubTopic: string -): Promise { + pubsubTopic: string, + timestamp: boolean = false +): Promise { + const messages: MessageRpcQuery[] = new Array(numMessages); for (let i = 0; i < numMessages; i++) { - expect( - await instance.sendMessage( - ServiceNode.toMessageRpcQuery({ - payload: new Uint8Array([i]), - contentTopic: contentTopic - }), - pubsubTopic - ) - ).to.eq(true); + messages[i] = ServiceNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: contentTopic, + timestamp: timestamp ? new Date() : undefined + }); + expect(await instance.sendMessage(messages[i], pubsubTopic)).to.eq(true); await delay(1); // to ensure each timestamp is unique. } + return messages; } export async function sendMessagesAutosharding(