diff --git a/packages/sds/src/message_channel/mem_local_history.spec.ts b/packages/sds/src/message_channel/mem_local_history.spec.ts index 7fd1f567a6..68a8d2f4f3 100644 --- a/packages/sds/src/message_channel/mem_local_history.spec.ts +++ b/packages/sds/src/message_channel/mem_local_history.spec.ts @@ -7,44 +7,44 @@ describe("MemLocalHistory", () => { it("Cap max size when messages are pushed one at a time", () => { const maxSize = 2; - const hist = new MemLocalHistory(maxSize); + const hist = new MemLocalHistory({ maxSize: maxSize }); - hist.push( + hist.addMessages( new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) ); - expect(hist.length).to.eq(1); - hist.push( + expect(hist.size).to.eq(1); + hist.addMessages( new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2])) ); - expect(hist.length).to.eq(2); + expect(hist.size).to.eq(2); - hist.push( + hist.addMessages( new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3])) ); - expect(hist.length).to.eq(2); + expect(hist.size).to.eq(2); - expect(hist.findIndex((m) => m.messageId === "1")).to.eq(-1); - expect(hist.findIndex((m) => m.messageId === "2")).to.not.eq(-1); - expect(hist.findIndex((m) => m.messageId === "3")).to.not.eq(-1); + expect(hist.hasMessage("1")).to.eq(false); + expect(hist.hasMessage("2")).to.eq(true); + expect(hist.hasMessage("3")).to.eq(true); }); it("Cap max size when a pushed array is exceeding the cap", () => { const maxSize = 2; - const hist = new MemLocalHistory(maxSize); + const hist = new MemLocalHistory({ maxSize: maxSize }); - hist.push( + hist.addMessages( new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) ); - expect(hist.length).to.eq(1); - hist.push( + expect(hist.size).to.eq(1); + hist.addMessages( new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2])), new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3])) ); - expect(hist.length).to.eq(2); + expect(hist.size).to.eq(2); - expect(hist.findIndex((m) => m.messageId === "1")).to.eq(-1); - expect(hist.findIndex((m) => m.messageId === "2")).to.not.eq(-1); - expect(hist.findIndex((m) => m.messageId === "3")).to.not.eq(-1); + expect(hist.hasMessage("1")).to.eq(false); + expect(hist.hasMessage("2")).to.eq(true); + expect(hist.hasMessage("3")).to.eq(true); }); }); diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/mem_local_history.ts index fa62bfb9ae..b0c06b3f21 100644 --- a/packages/sds/src/message_channel/mem_local_history.ts +++ b/packages/sds/src/message_channel/mem_local_history.ts @@ -1,6 +1,13 @@ +import { Logger } from "@waku/utils"; import _ from "lodash"; -import { ContentMessage, isContentMessage } from "./message.js"; +import { + type ChannelId, + ContentMessage, + type HistoryEntry, + isContentMessage +} from "./message.js"; +import { PersistentStorage } from "./persistent_storage.js"; export const DEFAULT_MAX_LENGTH = 10_000; @@ -17,28 +24,65 @@ export const DEFAULT_MAX_LENGTH = 10_000; * If an array of items longer than `maxLength` is pushed, dropping will happen * at next push. */ -export class MemLocalHistory { +export interface ILocalHistory { + readonly size: number; + addMessages(...messages: ContentMessage[]): void; + hasMessage(messageId: string): boolean; + getMessage(messageId: string): ContentMessage | undefined; + getRecentMessages(count: number): ContentMessage[]; + getAllMessages(): ContentMessage[]; + findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[]; +} + +export type MemLocalHistoryOptions = { + storage?: ChannelId | PersistentStorage; + maxSize?: number; +}; + +const log = new Logger("sds:local-history"); + +export class MemLocalHistory implements ILocalHistory { private items: ContentMessage[] = []; + private messageIndex: Map = new Map(); + private readonly storage?: PersistentStorage; + private readonly maxSize: number; /** - * Construct a new in-memory local history + * Construct a new in-memory local history. * - * @param maxLength The maximum number of message to store. + * @param opts Configuration object. + * - storage: Optional persistent storage backend for message persistence or channelId to use with PersistentStorage. + * - maxSize: The maximum number of messages to store. Optional, defaults to DEFAULT_MAX_LENGTH. */ - public constructor(private maxLength: number = DEFAULT_MAX_LENGTH) {} + public constructor(opts: MemLocalHistoryOptions = {}) { + const { storage, maxSize } = opts; + this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH; + if (storage instanceof PersistentStorage) { + this.storage = storage; + log.info("Using explicit persistent storage"); + } else if (typeof storage === "string") { + this.storage = PersistentStorage.create(storage); + log.info("Creating persistent storage for channel", storage); + } else { + this.storage = undefined; + log.info("Using in-memory storage"); + } - public get length(): number { + this.load(); + } + + public get size(): number { return this.items.length; } - public push(...items: ContentMessage[]): number { - for (const item of items) { - this.validateMessage(item); + public addMessages(...messages: ContentMessage[]): void { + for (const message of messages) { + this.validateMessage(message); } // Add new items and sort by timestamp, ensuring uniqueness by messageId // The valueOf() method on ContentMessage enables native < operator sorting - const combinedItems = [...this.items, ...items]; + const combinedItems = [...this.items, ...messages]; // Sort by timestamp (using valueOf() which creates timestamp_messageId string) combinedItems.sort((a, b) => a.valueOf().localeCompare(b.valueOf())); @@ -46,50 +90,45 @@ export class MemLocalHistory { // Remove duplicates by messageId while maintaining order this.items = _.uniqBy(combinedItems, "messageId"); + this.rebuildIndex(); + // Let's drop older messages if max length is reached - if (this.length > this.maxLength) { - const numItemsToRemove = this.length - this.maxLength; - this.items.splice(0, numItemsToRemove); + if (this.size > this.maxSize) { + const numItemsToRemove = this.size - this.maxSize; + const removedItems = this.items.splice(0, numItemsToRemove); + for (const item of removedItems) { + this.messageIndex.delete(item.messageId); + } } - return this.items.length; + this.save(); } - public some( - predicate: ( - value: ContentMessage, - index: number, - array: ContentMessage[] - ) => unknown, - thisArg?: any - ): boolean { - return this.items.some(predicate, thisArg); + public hasMessage(messageId: string): boolean { + return this.messageIndex.has(messageId); } - public slice(start?: number, end?: number): ContentMessage[] { - return this.items.slice(start, end); + public getRecentMessages(count: number): ContentMessage[] { + return this.items.slice(-count); } - public find( - predicate: ( - value: ContentMessage, - index: number, - obj: ContentMessage[] - ) => unknown, - thisArg?: any - ): ContentMessage | undefined { - return this.items.find(predicate, thisArg); + public getAllMessages(): ContentMessage[] { + return [...this.items]; } - public findIndex( - predicate: ( - value: ContentMessage, - index: number, - obj: ContentMessage[] - ) => unknown, - thisArg?: any - ): number { - return this.items.findIndex(predicate, thisArg); + public getMessage(messageId: string): ContentMessage | undefined { + return this.messageIndex.get(messageId); + } + + public findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[] { + return entries.filter((entry) => !this.messageIndex.has(entry.messageId)); + } + + private rebuildIndex(): void { + this.messageIndex.clear(); + for (const message of this.items) { + this.messageIndex.set(message.messageId, message); + } } private validateMessage(message: ContentMessage): void { @@ -99,4 +138,16 @@ export class MemLocalHistory { ); } } + + private save(): void { + this.storage?.save(this.items); + } + + private load(): void { + const messages = this.storage?.load() ?? []; + if (messages.length > 0) { + this.items = messages; + this.rebuildIndex(); + } + } } diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index ea1629250c..a81446e762 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -4,6 +4,8 @@ import { expect } from "chai"; import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { MessageChannelEvent } from "./events.js"; +import { MemLocalHistory } from "./mem_local_history.js"; +import { ILocalHistory } from "./mem_local_history.js"; import { ContentMessage, HistoryEntry, @@ -13,8 +15,8 @@ import { } from "./message.js"; import { DEFAULT_BLOOM_FILTER_OPTIONS, - ILocalHistory, - MessageChannel + MessageChannel, + MessageChannelOptions } from "./message_channel.js"; const channelId = "test-channel"; @@ -22,6 +24,23 @@ const callback = (_message: Message): Promise<{ success: boolean }> => { return Promise.resolve({ success: true }); }; +/** + * Test helper to create a MessageChannel with MemLocalHistory. + * This avoids localStorage pollution in tests and tests core functionality. + */ +const createTestChannel = ( + channelId: string, + senderId: string, + options: MessageChannelOptions = {} +): MessageChannel => { + return new MessageChannel( + channelId, + senderId, + options, + new MemLocalHistory() + ); +}; + const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => { return channel["filter"] as DefaultBloomFilter; }; @@ -68,7 +87,7 @@ describe("MessageChannel", function () { describe("sending a message ", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice"); + channelA = createTestChannel(channelId, "alice"); }); it("should increase lamport timestamp", async () => { @@ -99,14 +118,10 @@ describe("MessageChannel", function () { const messageId = MessageChannel.getMessageId(payload); await sendMessage(channelA, payload, callback); const messageIdLog = channelA["localHistory"] as ILocalHistory; - expect(messageIdLog.length).to.equal(1); - expect( - messageIdLog.some( - (log) => - log.lamportTimestamp === expectedTimestamp && - log.messageId === messageId - ) - ).to.equal(true); + expect(messageIdLog.size).to.equal(1); + const msg = messageIdLog.getMessage(messageId); + expect(msg).to.exist; + expect(msg!.lamportTimestamp).to.equal(expectedTimestamp); }); it("should add sent message to localHistory with retrievalHint", async () => { @@ -120,12 +135,10 @@ describe("MessageChannel", function () { }); const localHistory = channelA["localHistory"] as ILocalHistory; - expect(localHistory.length).to.equal(1); + expect(localHistory.size).to.equal(1); // Find the message in local history - const historyEntry = localHistory.find( - (entry) => entry.messageId === messageId - ); + const historyEntry = localHistory.getMessage(messageId); expect(historyEntry).to.exist; expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint); }); @@ -171,8 +184,8 @@ describe("MessageChannel", function () { describe("receiving a message", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice"); - channelB = new MessageChannel(channelId, "bob"); + channelA = createTestChannel(channelId, "alice"); + channelB = createTestChannel(channelId, "bob"); }); it("should increase lamport timestamp", async () => { @@ -187,8 +200,8 @@ describe("MessageChannel", function () { // TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648 it.skip("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => { - const testChannelA = new MessageChannel(channelId, "alice"); - const testChannelB = new MessageChannel(channelId, "bob"); + const testChannelA = createTestChannel(channelId, "alice"); + const testChannelB = createTestChannel(channelId, "bob"); const timestampBefore = testChannelA["lamportTimestamp"]; @@ -277,11 +290,9 @@ describe("MessageChannel", function () { // Message should not be in local history const localHistory = channelB["localHistory"]; - expect( - localHistory.some( - ({ messageId }) => messageId === receivedMessage!.messageId - ) - ).to.equal(false); + expect(localHistory.hasMessage(receivedMessage!.messageId)).to.equal( + false + ); }); it("should add received message to localHistory with retrievalHint", async () => { @@ -306,12 +317,10 @@ describe("MessageChannel", function () { ); const localHistory = channelA["localHistory"] as ILocalHistory; - expect(localHistory.length).to.equal(1); + expect(localHistory.size).to.equal(1); // Find the message in local history - const historyEntry = localHistory.find( - (entry) => entry.messageId === messageId - ); + const historyEntry = localHistory.getMessage(messageId); expect(historyEntry).to.exist; expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint); }); @@ -360,35 +369,35 @@ describe("MessageChannel", function () { ); const localHistory = channelA["localHistory"]; - expect(localHistory.length).to.equal(3); + expect(localHistory.size).to.equal(3); // Verify chronological order: message1 (ts=1), message2 (ts=2), message3 (ts=3) - const first = localHistory.findIndex( - ({ messageId, lamportTimestamp }) => { + const first = localHistory + .getAllMessages() + .findIndex(({ messageId, lamportTimestamp }) => { return ( messageId === message1Id && lamportTimestamp === startTimestamp + 1n ); - } - ); + }); expect(first).to.eq(0); - const second = localHistory.findIndex( - ({ messageId, lamportTimestamp }) => { + const second = localHistory + .getAllMessages() + .findIndex(({ messageId, lamportTimestamp }) => { return ( messageId === message2Id && lamportTimestamp === startTimestamp + 2n ); - } - ); + }); expect(second).to.eq(1); - const third = localHistory.findIndex( - ({ messageId, lamportTimestamp }) => { + const third = localHistory + .getAllMessages() + .findIndex(({ messageId, lamportTimestamp }) => { return ( messageId === message3Id && lamportTimestamp === startTimestamp + 3n ); - } - ); + }); expect(third).to.eq(2); }); @@ -428,34 +437,34 @@ describe("MessageChannel", function () { ); const localHistory = channelA["localHistory"] as ILocalHistory; - expect(localHistory.length).to.equal(2); + expect(localHistory.size).to.equal(2); // When timestamps are equal, should be ordered by messageId lexicographically // The valueOf() method creates "000000000000005_messageId" for comparison const expectedOrder = [message1Id, message2Id].sort(); - const first = localHistory.findIndex( - ({ messageId, lamportTimestamp }) => { + const first = localHistory + .getAllMessages() + .findIndex(({ messageId, lamportTimestamp }) => { return messageId === expectedOrder[0] && lamportTimestamp == 5n; - } - ); + }); expect(first).to.eq(0); - const second = localHistory.findIndex( - ({ messageId, lamportTimestamp }) => { + const second = localHistory + .getAllMessages() + .findIndex(({ messageId, lamportTimestamp }) => { return messageId === expectedOrder[1] && lamportTimestamp == 5n; - } - ); + }); expect(second).to.eq(1); }); }); describe("reviewing ack status", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice", { + channelA = createTestChannel(channelId, "alice", { causalHistorySize: 2 }); - channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); + channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 }); }); it("should mark all messages in causal history as acknowledged", async () => { @@ -661,10 +670,10 @@ describe("MessageChannel", function () { describe("Sweeping incoming buffer", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice", { + channelA = createTestChannel(channelId, "alice", { causalHistorySize: 2 }); - channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); + channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 }); }); it("should detect messages with missing dependencies", async () => { @@ -746,7 +755,7 @@ describe("MessageChannel", function () { it("should mark a message as irretrievably lost if timeout is exceeded", async () => { // Create a channel with very very short timeout - const channelC: MessageChannel = new MessageChannel(channelId, "carol", { + const channelC = createTestChannel(channelId, "carol", { timeoutForLostMessagesMs: 10 }); @@ -789,7 +798,7 @@ describe("MessageChannel", function () { let lostMessages: HistoryEntry[] = []; // Create a channel with very short timeout - const channelC: MessageChannel = new MessageChannel(channelId, "carol", { + const channelC = createTestChannel(channelId, "carol", { timeoutForLostMessagesMs: 10 }); @@ -853,7 +862,7 @@ describe("MessageChannel", function () { it("should remove messages without delivering if timeout is exceeded", async () => { const causalHistorySize = channelA["causalHistorySize"]; // Create a channel with very very short timeout - const channelC: MessageChannel = new MessageChannel(channelId, "carol", { + const channelC = createTestChannel(channelId, "carol", { timeoutForLostMessagesMs: 10 }); @@ -1043,10 +1052,10 @@ describe("MessageChannel", function () { describe("Sweeping outgoing buffer", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice", { + channelA = createTestChannel(channelId, "alice", { causalHistorySize: 2 }); - channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); + channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 }); }); it("should partition messages based on acknowledgement status", async () => { @@ -1088,12 +1097,12 @@ describe("MessageChannel", function () { describe("Sync messages", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice", { + channelA = createTestChannel(channelId, "alice", { causalHistorySize: 2 }); - channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); + channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 }); const message = utf8ToBytes("first message in channel"); - channelA["localHistory"].push( + channelA["localHistory"].addMessages( new ContentMessage( MessageChannel.getMessageId(message), "MyChannel", @@ -1115,7 +1124,7 @@ describe("MessageChannel", function () { }); it("should not be sent when there is no history", async () => { - const channelC = new MessageChannel(channelId, "carol", { + const channelC = createTestChannel(channelId, "carol", { causalHistorySize: 2 }); const res = await channelC.pushOutgoingSyncMessage(async (_msg) => { @@ -1137,7 +1146,7 @@ describe("MessageChannel", function () { ).to.equal(false); const localLog = channelA["localHistory"]; - expect(localLog.length).to.equal(1); // beforeEach adds one message + expect(localLog.size).to.equal(1); // beforeEach adds one message }); it("should not be delivered", async () => { @@ -1151,7 +1160,7 @@ describe("MessageChannel", function () { expect(timestampAfter).to.equal(timestampBefore); const localLog = channelB["localHistory"]; - expect(localLog.length).to.equal(0); + expect(localLog.size).to.equal(0); const bloomFilter = getBloomFilter(channelB); expect( @@ -1160,7 +1169,7 @@ describe("MessageChannel", function () { }); it("should update ack status of messages in outgoing buffer", async () => { - const channelC = new MessageChannel(channelId, "carol", { + const channelC = createTestChannel(channelId, "carol", { causalHistorySize: 2 }); for (const m of messagesA) { @@ -1185,7 +1194,7 @@ describe("MessageChannel", function () { describe("Ephemeral messages", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice"); + channelA = createTestChannel(channelId, "alice"); }); it("should be sent without a timestamp, causal history, or bloom filter", async () => { @@ -1208,10 +1217,10 @@ describe("MessageChannel", function () { }); it("should be delivered immediately if received", async () => { - const channelB = new MessageChannel(channelId, "bob"); + const channelB = createTestChannel(channelId, "bob"); // Track initial state - const localHistoryBefore = channelB["localHistory"].length; + const localHistoryBefore = channelB["localHistory"].size; const incomingBufferBefore = channelB["incomingBuffer"].length; const timestampBefore = channelB["lamportTimestamp"]; @@ -1229,11 +1238,74 @@ describe("MessageChannel", function () { // Verify ephemeral message behavior: // 1. Not added to local history - expect(channelB["localHistory"].length).to.equal(localHistoryBefore); + expect(channelB["localHistory"].size).to.equal(localHistoryBefore); // 2. Not added to incoming buffer expect(channelB["incomingBuffer"].length).to.equal(incomingBufferBefore); // 3. Doesn't update lamport timestamp expect(channelB["lamportTimestamp"]).to.equal(timestampBefore); }); }); + + describe("localStorage persistence", function () { + // LocalStorage specific tests (browser) + before(function () { + if (typeof localStorage === "undefined") { + this.skip(); + } + }); + + it("should restore messages from localStorage on channel recreation", async () => { + const persistentChannelId = "persistent-channel"; + + const channel1 = new MessageChannel(persistentChannelId, "alice"); + + await sendMessage(channel1, utf8ToBytes("msg-1"), callback); + await sendMessage(channel1, utf8ToBytes("msg-2"), callback); + + expect(channel1["localHistory"].size).to.equal(2); + + // Recreate channel with same storage - should load history + const channel2 = new MessageChannel(persistentChannelId, "alice"); + + expect(channel2["localHistory"].size).to.equal(2); + expect( + channel2["localHistory"].getAllMessages().map((m) => m.messageId) + ).to.deep.equal([ + MessageChannel.getMessageId(utf8ToBytes("msg-1")), + MessageChannel.getMessageId(utf8ToBytes("msg-2")) + ]); + }); + + it("should include persisted messages in causal history after restart", async () => { + const persistentChannelId = "persistent-causal"; + + const channel1 = new MessageChannel(persistentChannelId, "alice", { + causalHistorySize: 2 + }); + + await sendMessage(channel1, utf8ToBytes("msg-1"), callback); + await sendMessage(channel1, utf8ToBytes("msg-2"), callback); + await sendMessage(channel1, utf8ToBytes("msg-3"), callback); + + const channel2 = new MessageChannel(persistentChannelId, "alice", { + causalHistorySize: 2 + }); + + let capturedMessage: ContentMessage | null = null; + await sendMessage(channel2, utf8ToBytes("msg-4"), async (message) => { + capturedMessage = message; + return { success: true }; + }); + + expect(capturedMessage).to.not.be.null; + expect(capturedMessage!.causalHistory).to.have.lengthOf(2); + // Should reference the last 2 messages (msg-2 and msg-3) + expect(capturedMessage!.causalHistory[0].messageId).to.equal( + MessageChannel.getMessageId(utf8ToBytes("msg-2")) + ); + expect(capturedMessage!.causalHistory[1].messageId).to.equal( + MessageChannel.getMessageId(utf8ToBytes("msg-3")) + ); + }); + }); }); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index eccc1435b9..88ebe3c6d7 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -7,7 +7,7 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js"; import { MessageChannelEvent, MessageChannelEvents } from "./events.js"; -import { MemLocalHistory } from "./mem_local_history.js"; +import { ILocalHistory, MemLocalHistory } from "./mem_local_history.js"; import { ChannelId, ContentMessage, @@ -23,6 +23,8 @@ import { } from "./message.js"; import { RepairConfig, RepairManager } from "./repair/repair.js"; +export type { ILocalHistory }; + export const DEFAULT_BLOOM_FILTER_OPTIONS = { capacity: 10000, errorRate: 0.001 @@ -63,11 +65,6 @@ export interface MessageChannelOptions { repairConfig?: RepairConfig; } -export type ILocalHistory = Pick< - Array, - "some" | "push" | "slice" | "find" | "length" | "findIndex" ->; - export class MessageChannel extends TypedEventEmitter { public readonly channelId: ChannelId; public readonly senderId: ParticipantId; @@ -106,7 +103,7 @@ export class MessageChannel extends TypedEventEmitter { channelId: ChannelId, senderId: ParticipantId, options: MessageChannelOptions = {}, - localHistory: ILocalHistory = new MemLocalHistory() + localHistory?: ILocalHistory ) { super(); this.channelId = channelId; @@ -117,7 +114,8 @@ export class MessageChannel extends TypedEventEmitter { this.outgoingBuffer = []; this.possibleAcks = new Map(); this.incomingBuffer = []; - this.localHistory = localHistory; + this.localHistory = + localHistory ?? new MemLocalHistory({ storage: channelId }); this.causalHistorySize = options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; // TODO: this should be determined based on the bloom filter parameters and number of hashes @@ -299,9 +297,8 @@ export class MessageChannel extends TypedEventEmitter { message.messageId, message.causalHistory.map((ch) => ch.messageId) ); - const missingDependencies = message.causalHistory.filter( - (messageHistoryEntry) => - !this.isMessageAvailable(messageHistoryEntry.messageId) + const missingDependencies = this.findMissingDependencies( + message.causalHistory ); if (missingDependencies.length === 0) { if (isContentMessage(message) && this.deliverMessage(message)) { @@ -456,7 +453,7 @@ export class MessageChannel extends TypedEventEmitter { this.channelId, this.senderId, this.localHistory - .slice(-this.causalHistorySize) + .getRecentMessages(this.causalHistorySize) .map(({ messageId, retrievalHint, senderId }) => { return { messageId, retrievalHint, senderId }; }), @@ -585,9 +582,8 @@ export class MessageChannel extends TypedEventEmitter { this.filter.insert(message.messageId); } - const missingDependencies = message.causalHistory.filter( - (messageHistoryEntry) => - !this.isMessageAvailable(messageHistoryEntry.messageId) + const missingDependencies = this.findMissingDependencies( + message.causalHistory ); if (missingDependencies.length > 0) { @@ -678,7 +674,7 @@ export class MessageChannel extends TypedEventEmitter { this.channelId, this.senderId, this.localHistory - .slice(-this.causalHistorySize) + .getRecentMessages(this.causalHistorySize) .map(({ messageId, retrievalHint, senderId }) => { return { messageId, retrievalHint, senderId }; }), @@ -701,7 +697,7 @@ export class MessageChannel extends TypedEventEmitter { if (success && isContentMessage(message)) { message.retrievalHint = retrievalHint; this.filter.insert(messageId); - this.localHistory.push(message); + this.localHistory.addMessages(message); this.timeReceived.set(messageId, Date.now()); this.safeSendEvent(MessageChannelEvent.OutMessageSent, { detail: message @@ -741,24 +737,15 @@ export class MessageChannel extends TypedEventEmitter { } } - /** - * Check if a message is available (either in localHistory or incomingBuffer) - * This prevents treating messages as "missing" when they've already been received - * but are waiting in the incoming buffer for their dependencies. - * - * @param messageId - The ID of the message to check - * @private - */ - private isMessageAvailable(messageId: MessageId): boolean { - // Check if in local history - if (this.localHistory.some((m) => m.messageId === messageId)) { - return true; - } - // Check if in incoming buffer (already received, waiting for dependencies) - if (this.incomingBuffer.some((m) => m.messageId === messageId)) { - return true; - } - return false; + private findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[] { + const missingFromHistory = + this.localHistory.findMissingDependencies(entries); + + const incomingIds = new Set(this.incomingBuffer.map((m) => m.messageId)); + + return missingFromHistory.filter( + (entry) => !incomingIds.has(entry.messageId) + ); } /** @@ -788,8 +775,8 @@ export class MessageChannel extends TypedEventEmitter { } // Check if the entry is already present - const existingHistoryEntry = this.localHistory.find( - ({ messageId }) => messageId === message.messageId + const existingHistoryEntry = this.localHistory.getMessage( + message.messageId ); // The history entry is already present, no need to re-add @@ -801,7 +788,7 @@ export class MessageChannel extends TypedEventEmitter { log.warn("message delivered without a retrieval hint", message.messageId); } - this.localHistory.push(message); + this.localHistory.addMessages(message); return true; } diff --git a/packages/sds/src/message_channel/persistent_storage.spec.ts b/packages/sds/src/message_channel/persistent_storage.spec.ts new file mode 100644 index 0000000000..702af3aed1 --- /dev/null +++ b/packages/sds/src/message_channel/persistent_storage.spec.ts @@ -0,0 +1,196 @@ +import { expect } from "chai"; + +import { MemLocalHistory } from "./mem_local_history.js"; +import { ContentMessage } from "./message.js"; +import { HistoryStorage, PersistentStorage } from "./persistent_storage.js"; + +const channelId = "channel-1"; + +describe("PersistentStorage", () => { + describe("Explicit storage", () => { + it("persists and restores messages", () => { + const storage = new MemoryStorage(); + const persistentStorage = PersistentStorage.create(channelId, storage); + + expect(persistentStorage).to.not.be.undefined; + + const history1 = new MemLocalHistory({ storage: persistentStorage }); + history1.addMessages(createMessage("msg-1", 1)); + history1.addMessages(createMessage("msg-2", 2)); + + const history2 = new MemLocalHistory({ storage: persistentStorage }); + + expect(history2.size).to.equal(2); + expect( + history2.getAllMessages().map((msg) => msg.messageId) + ).to.deep.equal(["msg-1", "msg-2"]); + }); + + it("uses in-memory only when no storage is provided", () => { + const history = new MemLocalHistory({ maxSize: 100 }); + history.addMessages(createMessage("msg-3", 3)); + + expect(history.size).to.equal(1); + expect(history.getAllMessages()[0].messageId).to.equal("msg-3"); + + const history2 = new MemLocalHistory({ maxSize: 100 }); + expect(history2.size).to.equal(0); + }); + + it("handles corrupt data in storage gracefully", () => { + const storage = new MemoryStorage(); + // Corrupt data + storage.setItem("waku:sds:history:channel-1", "{ invalid json }"); + + const persistentStorage = PersistentStorage.create(channelId, storage); + const history = new MemLocalHistory({ storage: persistentStorage }); + + expect(history.size).to.equal(0); + + // Corrupt data is not saved + expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null); + }); + + it("isolates history by channel ID", () => { + const storage = new MemoryStorage(); + + const storage1 = PersistentStorage.create("channel-1", storage); + const storage2 = PersistentStorage.create("channel-2", storage); + + const history1 = new MemLocalHistory({ storage: storage1 }); + const history2 = new MemLocalHistory({ storage: storage2 }); + + history1.addMessages(createMessage("msg-1", 1)); + history2.addMessages(createMessage("msg-2", 2)); + + expect(history1.size).to.equal(1); + expect(history1.getAllMessages()[0].messageId).to.equal("msg-1"); + + expect(history2.size).to.equal(1); + expect(history2.getAllMessages()[0].messageId).to.equal("msg-2"); + + expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; + expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null; + }); + + it("saves messages after each push", () => { + const storage = new MemoryStorage(); + const persistentStorage = PersistentStorage.create(channelId, storage); + const history = new MemLocalHistory({ storage: persistentStorage }); + + expect(storage.getItem("waku:sds:history:channel-1")).to.be.null; + + history.addMessages(createMessage("msg-1", 1)); + + expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; + + const saved = JSON.parse(storage.getItem("waku:sds:history:channel-1")!); + expect(saved).to.have.lengthOf(1); + expect(saved[0].messageId).to.equal("msg-1"); + }); + + it("loads messages on initialization", () => { + const storage = new MemoryStorage(); + const persistentStorage1 = PersistentStorage.create(channelId, storage); + const history1 = new MemLocalHistory({ storage: persistentStorage1 }); + + history1.addMessages(createMessage("msg-1", 1)); + history1.addMessages(createMessage("msg-2", 2)); + history1.addMessages(createMessage("msg-3", 3)); + + const persistentStorage2 = PersistentStorage.create(channelId, storage); + const history2 = new MemLocalHistory({ storage: persistentStorage2 }); + + expect(history2.size).to.equal(3); + expect(history2.getAllMessages().map((m) => m.messageId)).to.deep.equal([ + "msg-1", + "msg-2", + "msg-3" + ]); + }); + }); + + describe("Node.js only (no localStorage)", () => { + before(function () { + if (typeof localStorage !== "undefined") { + this.skip(); + } + }); + + it("returns undefined when no storage is available", () => { + const persistentStorage = PersistentStorage.create(channelId, undefined); + + expect(persistentStorage).to.equal(undefined); + }); + }); + + describe("Browser only (localStorage)", () => { + before(function () { + if (typeof localStorage === "undefined") { + this.skip(); + } + }); + + it("persists and restores messages with channelId", () => { + const testChannelId = `test-${Date.now()}`; + const history1 = new MemLocalHistory({ storage: testChannelId }); + history1.addMessages(createMessage("msg-1", 1)); + history1.addMessages(createMessage("msg-2", 2)); + + const history2 = new MemLocalHistory({ storage: testChannelId }); + + expect(history2.size).to.equal(2); + expect( + history2.getAllMessages().map((msg) => msg.messageId) + ).to.deep.equal(["msg-1", "msg-2"]); + + localStorage.removeItem(`waku:sds:history:${testChannelId}`); + }); + + it("auto-uses localStorage when channelId is provided", () => { + const testChannelId = `auto-storage-${Date.now()}`; + + const history = new MemLocalHistory({ storage: testChannelId }); + history.addMessages(createMessage("msg-auto-1", 1)); + history.addMessages(createMessage("msg-auto-2", 2)); + + const history2 = new MemLocalHistory({ storage: testChannelId }); + expect(history2.size).to.equal(2); + expect(history2.getAllMessages().map((m) => m.messageId)).to.deep.equal([ + "msg-auto-1", + "msg-auto-2" + ]); + + localStorage.removeItem(`waku:sds:history:${testChannelId}`); + }); + }); +}); + +const createMessage = (id: string, timestamp: number): ContentMessage => { + return new ContentMessage( + id, + channelId, + "sender", + [], + BigInt(timestamp), + undefined, + new Uint8Array([timestamp]), + undefined + ); +}; + +class MemoryStorage implements HistoryStorage { + private readonly store = new Map(); + + public getItem(key: string): string | null { + return this.store.get(key) ?? null; + } + + public setItem(key: string, value: string): void { + this.store.set(key, value); + } + + public removeItem(key: string): void { + this.store.delete(key); + } +} diff --git a/packages/sds/src/message_channel/persistent_storage.ts b/packages/sds/src/message_channel/persistent_storage.ts new file mode 100644 index 0000000000..ffc0f14674 --- /dev/null +++ b/packages/sds/src/message_channel/persistent_storage.ts @@ -0,0 +1,173 @@ +import { bytesToHex, hexToBytes } from "@noble/hashes/utils"; +import { Logger } from "@waku/utils"; + +import { ChannelId, ContentMessage, HistoryEntry } from "./message.js"; + +const log = new Logger("sds:persistent-storage"); + +const HISTORY_STORAGE_PREFIX = "waku:sds:history:"; + +export interface HistoryStorage { + getItem(key: string): string | null; + setItem(key: string, value: string): void; + removeItem(key: string): void; +} + +type StoredHistoryEntry = { + messageId: string; + retrievalHint?: string; +}; + +type StoredContentMessage = { + messageId: string; + channelId: string; + senderId: string; + lamportTimestamp: string; + causalHistory: StoredHistoryEntry[]; + bloomFilter?: string; + content: string; + retrievalHint?: string; +}; + +/** + * Persistent storage for message history. + */ +export class PersistentStorage { + private readonly storageKey: string; + + /** + * Creates a PersistentStorage for a channel, or returns undefined if no storage is available. + * If no storage is provided, attempts to use global localStorage (if available). + * Returns undefined if no storage is available. + */ + public static create( + channelId: ChannelId, + storage?: HistoryStorage + ): PersistentStorage | undefined { + storage = + storage ?? + (typeof localStorage !== "undefined" ? localStorage : undefined); + if (!storage) { + log.info( + `No storage available. Messages will not persist across sessions. + If you're using NodeJS, you can provide a storage backend using the storage parameter.` + ); + return undefined; + } + return new PersistentStorage(channelId, storage); + } + + private constructor( + channelId: ChannelId, + private readonly storage: HistoryStorage + ) { + this.storageKey = `${HISTORY_STORAGE_PREFIX}${channelId}`; + } + + public save(messages: ContentMessage[]): void { + try { + const payload = JSON.stringify( + messages.map((msg) => MessageSerializer.serializeContentMessage(msg)) + ); + this.storage.setItem(this.storageKey, payload); + } catch (error) { + log.error("Failed to save messages to storage:", error); + } + } + + public load(): ContentMessage[] { + try { + const raw = this.storage.getItem(this.storageKey); + if (!raw) { + return []; + } + + const stored = JSON.parse(raw) as StoredContentMessage[]; + return stored + .map((record) => MessageSerializer.deserializeContentMessage(record)) + .filter((message): message is ContentMessage => Boolean(message)); + } catch (error) { + log.error("Failed to load messages from storage:", error); + this.storage.removeItem(this.storageKey); + return []; + } + } +} + +class MessageSerializer { + public static serializeContentMessage( + message: ContentMessage + ): StoredContentMessage { + return { + messageId: message.messageId, + channelId: message.channelId, + senderId: message.senderId, + lamportTimestamp: message.lamportTimestamp.toString(), + causalHistory: message.causalHistory.map((entry) => + MessageSerializer.serializeHistoryEntry(entry) + ), + bloomFilter: MessageSerializer.toHex(message.bloomFilter), + content: bytesToHex(new Uint8Array(message.content)), + retrievalHint: MessageSerializer.toHex(message.retrievalHint) + }; + } + + public static deserializeContentMessage( + record: StoredContentMessage + ): ContentMessage | undefined { + try { + const content = hexToBytes(record.content); + return new ContentMessage( + record.messageId, + record.channelId, + record.senderId, + record.causalHistory.map((entry) => + MessageSerializer.deserializeHistoryEntry(entry) + ), + BigInt(record.lamportTimestamp), + MessageSerializer.fromHex(record.bloomFilter), + content, + [], + MessageSerializer.fromHex(record.retrievalHint) + ); + } catch { + return undefined; + } + } + + public static serializeHistoryEntry(entry: HistoryEntry): StoredHistoryEntry { + return { + messageId: entry.messageId, + retrievalHint: entry.retrievalHint + ? bytesToHex(entry.retrievalHint) + : undefined + }; + } + + public static deserializeHistoryEntry( + entry: StoredHistoryEntry + ): HistoryEntry { + return { + messageId: entry.messageId, + retrievalHint: entry.retrievalHint + ? hexToBytes(entry.retrievalHint) + : undefined + }; + } + + private static toHex( + data?: Uint8Array | Uint8Array + ): string | undefined { + if (!data || data.length === 0) { + return undefined; + } + return bytesToHex(data instanceof Uint8Array ? data : new Uint8Array(data)); + } + + private static fromHex(value?: string): Uint8Array | undefined { + if (!value) { + return undefined; + } + return hexToBytes(value); + } +} diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 1a8a85f43e..8eb13f8c64 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -1,8 +1,8 @@ import { Logger } from "@waku/utils"; +import type { ILocalHistory } from "../mem_local_history.js"; import type { HistoryEntry, MessageId } from "../message.js"; import { Message } from "../message.js"; -import type { ILocalHistory } from "../message_channel.js"; import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js"; import { @@ -191,9 +191,7 @@ export class RepairManager { this.outgoingBuffer.remove(request.messageId); // Check if we have this message - const message = localHistory.find( - (m) => m.messageId === request.messageId - ); + const message = localHistory.getMessage(request.messageId); if (!message) { log.info( `Cannot fulfill repair for ${request.messageId} - not in local history` @@ -255,7 +253,7 @@ export class RepairManager { const messages: Message[] = []; for (const entry of ready) { - const message = localHistory.find((m) => m.messageId === entry.messageId); + const message = localHistory.getMessage(entry.messageId); if (message) { messages.push(message); log.info(`Sending repair for ${entry.messageId}`);