diff --git a/.gitignore b/.gitignore index 188172b1d3..d655a890db 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,5 @@ CLAUDE.md .env postgres-data/ packages/rln/waku-rlnv2-contract/ +/packages/**/allure-results +/packages/**/allure-results diff --git a/packages/sds/karma.conf.cjs b/packages/sds/karma.conf.cjs index 1acbc3dd2a..cd18c96f02 100644 --- a/packages/sds/karma.conf.cjs +++ b/packages/sds/karma.conf.cjs @@ -1,3 +1,16 @@ -const config = require("../../karma.conf.cjs"); +import path from "path"; -module.exports = config; +import baseConfig from "../../karma.conf.cjs"; + +export default function (config) { + baseConfig(config); + + const storageDir = path.resolve(__dirname, "src/message_channel/storage"); + + // Swap node storage for browser storage in webpack builds + config.webpack.resolve.alias = { + ...config.webpack.resolve.alias, + [path.join(storageDir, "node.ts")]: path.join(storageDir, "browser.ts"), + [path.join(storageDir, "node.js")]: path.join(storageDir, "browser.ts") + }; +} diff --git a/packages/sds/package.json b/packages/sds/package.json index 4c1155930e..e871849722 100644 --- a/packages/sds/package.json +++ b/packages/sds/package.json @@ -4,6 +4,9 @@ "description": "Scalable Data Sync implementation for the browser. Based on https://github.com/vacp2p/rfc-index/blob/main/vac/raw/sds.md", "types": "./dist/index.d.ts", "module": "./dist/index.js", + "browser": { + "./dist/message_channel/storage/index.js": "./dist/message_channel/storage/browser.js" + }, "exports": { ".": { "types": "./dist/index.d.ts", diff --git a/packages/sds/src/message_channel/mem_local_history.spec.ts b/packages/sds/src/message_channel/local_history.spec.ts similarity index 88% rename from packages/sds/src/message_channel/mem_local_history.spec.ts rename to packages/sds/src/message_channel/local_history.spec.ts index 7fd1f567a6..b22634b7ce 100644 --- a/packages/sds/src/message_channel/mem_local_history.spec.ts +++ b/packages/sds/src/message_channel/local_history.spec.ts @@ -1,13 +1,13 @@ import { expect } from "chai"; -import { MemLocalHistory } from "./mem_local_history.js"; +import { LocalHistory } from "./local_history.js"; import { ContentMessage } from "./message.js"; -describe("MemLocalHistory", () => { +describe("LocalHistory", () => { it("Cap max size when messages are pushed one at a time", () => { const maxSize = 2; - const hist = new MemLocalHistory(maxSize); + const hist = new LocalHistory({ maxSize }); hist.push( new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) @@ -31,7 +31,7 @@ describe("MemLocalHistory", () => { it("Cap max size when a pushed array is exceeding the cap", () => { const maxSize = 2; - const hist = new MemLocalHistory(maxSize); + const hist = new LocalHistory({ maxSize }); hist.push( new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/local_history.ts similarity index 61% rename from packages/sds/src/message_channel/mem_local_history.ts rename to packages/sds/src/message_channel/local_history.ts index fa62bfb9ae..971b52fe74 100644 --- a/packages/sds/src/message_channel/mem_local_history.ts +++ b/packages/sds/src/message_channel/local_history.ts @@ -1,9 +1,28 @@ +import { Logger } from "@waku/utils"; import _ from "lodash"; import { ContentMessage, isContentMessage } from "./message.js"; +import { Storage } from "./storage/index.js"; export const DEFAULT_MAX_LENGTH = 10_000; +/** + * Options for the LocalHistory constructor. + * @param storage - The storage to use for the local history. + * - prefix - The prefix for the storage. + * - customInstance - The custom storage instance to use. + * @param maxSize - The maximum number of messages to store. + */ +export type LocalHistoryOptions = { + storage?: { + prefix?: string; + customInstance?: Storage; + }; + maxSize?: number; +}; + +const log = new Logger("sds:local-history"); + /** * In-Memory implementation of a local history of messages. * @@ -17,15 +36,28 @@ export const DEFAULT_MAX_LENGTH = 10_000; * If an array of items longer than `maxLength` is pushed, dropping will happen * at next push. */ -export class MemLocalHistory { +export class LocalHistory { private items: ContentMessage[] = []; + private readonly storage?: Storage; + private readonly maxSize: number; + + public constructor(opts: LocalHistoryOptions = {}) { + const { storage, maxSize } = opts; + const { prefix, customInstance } = storage ?? {}; + this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH; + if (customInstance) { + this.storage = customInstance; + log.info("Using custom storage instance", { customInstance }); + } else if (prefix) { + this.storage = new Storage(prefix); + log.info("Creating storage with prefix", { prefix }); + } else { + this.storage = undefined; + log.info("Using in-memory storage"); + } - /** - * Construct a new in-memory local history - * - * @param maxLength The maximum number of message to store. - */ - public constructor(private maxLength: number = DEFAULT_MAX_LENGTH) {} + this.load(); + } public get length(): number { return this.items.length; @@ -47,11 +79,13 @@ export class MemLocalHistory { this.items = _.uniqBy(combinedItems, "messageId"); // Let's drop older messages if max length is reached - if (this.length > this.maxLength) { - const numItemsToRemove = this.length - this.maxLength; + if (this.length > this.maxSize) { + const numItemsToRemove = this.length - this.maxSize; this.items.splice(0, numItemsToRemove); } + this.save(); + return this.items.length; } @@ -99,4 +133,15 @@ export class MemLocalHistory { ); } } + + private save(): void { + this.storage?.save(this.items); + } + + private load(): void { + const messages = this.storage?.load() ?? []; + if (messages.length > 0) { + this.items = messages; + } + } } diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index ea1629250c..e7e37582ad 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -4,6 +4,7 @@ import { expect } from "chai"; import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { MessageChannelEvent } from "./events.js"; +import { LocalHistory } from "./local_history.js"; import { ContentMessage, HistoryEntry, @@ -13,8 +14,8 @@ import { } from "./message.js"; import { DEFAULT_BLOOM_FILTER_OPTIONS, - ILocalHistory, - MessageChannel + MessageChannel, + MessageChannelOptions } from "./message_channel.js"; const channelId = "test-channel"; @@ -22,6 +23,18 @@ const callback = (_message: Message): Promise<{ success: boolean }> => { return Promise.resolve({ success: true }); }; +/** + * Test helper to create a MessageChannel with LocalHistory. + * This avoids localStorage pollution in tests and tests core functionality. + */ +const createTestChannel = ( + channelId: string, + senderId: string, + options: MessageChannelOptions = {} +): MessageChannel => { + return new MessageChannel(channelId, senderId, options, new LocalHistory()); +}; + const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => { return channel["filter"] as DefaultBloomFilter; }; @@ -68,7 +81,7 @@ describe("MessageChannel", function () { describe("sending a message ", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice"); + channelA = createTestChannel(channelId, "alice"); }); it("should increase lamport timestamp", async () => { @@ -98,11 +111,11 @@ describe("MessageChannel", function () { const expectedTimestamp = channelA["lamportTimestamp"] + 1n; const messageId = MessageChannel.getMessageId(payload); await sendMessage(channelA, payload, callback); - const messageIdLog = channelA["localHistory"] as ILocalHistory; + const messageIdLog = channelA["localHistory"] as LocalHistory; expect(messageIdLog.length).to.equal(1); expect( messageIdLog.some( - (log) => + (log: ContentMessage) => log.lamportTimestamp === expectedTimestamp && log.messageId === messageId ) @@ -119,7 +132,7 @@ describe("MessageChannel", function () { return { success: true, retrievalHint: testRetrievalHint }; }); - const localHistory = channelA["localHistory"] as ILocalHistory; + const localHistory = channelA["localHistory"] as LocalHistory; expect(localHistory.length).to.equal(1); // Find the message in local history @@ -171,8 +184,8 @@ describe("MessageChannel", function () { describe("receiving a message", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice"); - channelB = new MessageChannel(channelId, "bob"); + channelA = createTestChannel(channelId, "alice"); + channelB = createTestChannel(channelId, "bob"); }); it("should increase lamport timestamp", async () => { @@ -187,8 +200,8 @@ describe("MessageChannel", function () { // TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648 it.skip("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => { - const testChannelA = new MessageChannel(channelId, "alice"); - const testChannelB = new MessageChannel(channelId, "bob"); + const testChannelA = createTestChannel(channelId, "alice"); + const testChannelB = createTestChannel(channelId, "bob"); const timestampBefore = testChannelA["lamportTimestamp"]; @@ -305,7 +318,7 @@ describe("MessageChannel", function () { testRetrievalHint ); - const localHistory = channelA["localHistory"] as ILocalHistory; + const localHistory = channelA["localHistory"] as LocalHistory; expect(localHistory.length).to.equal(1); // Find the message in local history @@ -427,7 +440,7 @@ describe("MessageChannel", function () { ) ); - const localHistory = channelA["localHistory"] as ILocalHistory; + const localHistory = channelA["localHistory"] as LocalHistory; expect(localHistory.length).to.equal(2); // When timestamps are equal, should be ordered by messageId lexicographically @@ -452,10 +465,10 @@ describe("MessageChannel", function () { describe("reviewing ack status", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice", { + channelA = createTestChannel(channelId, "alice", { causalHistorySize: 2 }); - channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); + channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 }); }); it("should mark all messages in causal history as acknowledged", async () => { @@ -661,10 +674,10 @@ describe("MessageChannel", function () { describe("Sweeping incoming buffer", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice", { + channelA = createTestChannel(channelId, "alice", { causalHistorySize: 2 }); - channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); + channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 }); }); it("should detect messages with missing dependencies", async () => { @@ -746,7 +759,7 @@ describe("MessageChannel", function () { it("should mark a message as irretrievably lost if timeout is exceeded", async () => { // Create a channel with very very short timeout - const channelC: MessageChannel = new MessageChannel(channelId, "carol", { + const channelC = createTestChannel(channelId, "carol", { timeoutForLostMessagesMs: 10 }); @@ -789,7 +802,7 @@ describe("MessageChannel", function () { let lostMessages: HistoryEntry[] = []; // Create a channel with very short timeout - const channelC: MessageChannel = new MessageChannel(channelId, "carol", { + const channelC = createTestChannel(channelId, "carol", { timeoutForLostMessagesMs: 10 }); @@ -853,7 +866,7 @@ describe("MessageChannel", function () { it("should remove messages without delivering if timeout is exceeded", async () => { const causalHistorySize = channelA["causalHistorySize"]; // Create a channel with very very short timeout - const channelC: MessageChannel = new MessageChannel(channelId, "carol", { + const channelC = createTestChannel(channelId, "carol", { timeoutForLostMessagesMs: 10 }); @@ -1043,10 +1056,10 @@ describe("MessageChannel", function () { describe("Sweeping outgoing buffer", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice", { + channelA = createTestChannel(channelId, "alice", { causalHistorySize: 2 }); - channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); + channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 }); }); it("should partition messages based on acknowledgement status", async () => { @@ -1088,10 +1101,10 @@ describe("MessageChannel", function () { describe("Sync messages", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice", { + channelA = createTestChannel(channelId, "alice", { causalHistorySize: 2 }); - channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); + channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 }); const message = utf8ToBytes("first message in channel"); channelA["localHistory"].push( new ContentMessage( @@ -1115,7 +1128,7 @@ describe("MessageChannel", function () { }); it("should not be sent when there is no history", async () => { - const channelC = new MessageChannel(channelId, "carol", { + const channelC = createTestChannel(channelId, "carol", { causalHistorySize: 2 }); const res = await channelC.pushOutgoingSyncMessage(async (_msg) => { @@ -1160,7 +1173,7 @@ describe("MessageChannel", function () { }); it("should update ack status of messages in outgoing buffer", async () => { - const channelC = new MessageChannel(channelId, "carol", { + const channelC = createTestChannel(channelId, "carol", { causalHistorySize: 2 }); for (const m of messagesA) { @@ -1185,7 +1198,7 @@ describe("MessageChannel", function () { describe("Ephemeral messages", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice"); + channelA = createTestChannel(channelId, "alice"); }); it("should be sent without a timestamp, causal history, or bloom filter", async () => { @@ -1208,7 +1221,7 @@ describe("MessageChannel", function () { }); it("should be delivered immediately if received", async () => { - const channelB = new MessageChannel(channelId, "bob"); + const channelB = createTestChannel(channelId, "bob"); // Track initial state const localHistoryBefore = channelB["localHistory"].length; @@ -1236,4 +1249,67 @@ describe("MessageChannel", function () { expect(channelB["lamportTimestamp"]).to.equal(timestampBefore); }); }); + + describe("localStorage persistence", function () { + // LocalStorage specific tests (browser) + before(function () { + if (typeof localStorage === "undefined") { + this.skip(); + } + }); + + it("should restore messages from localStorage on channel recreation", async () => { + const persistentChannelId = "persistent-channel"; + + const channel1 = new MessageChannel(persistentChannelId, "alice"); + + await sendMessage(channel1, utf8ToBytes("msg-1"), callback); + await sendMessage(channel1, utf8ToBytes("msg-2"), callback); + + expect(channel1["localHistory"].length).to.equal(2); + + // Recreate channel with same storage - should load history + const channel2 = new MessageChannel(persistentChannelId, "alice"); + + expect(channel2["localHistory"].length).to.equal(2); + expect( + channel2["localHistory"].slice(0).map((m) => m.messageId) + ).to.deep.equal([ + MessageChannel.getMessageId(utf8ToBytes("msg-1")), + MessageChannel.getMessageId(utf8ToBytes("msg-2")) + ]); + }); + + it("should include persisted messages in causal history after restart", async () => { + const persistentChannelId = "persistent-causal"; + + const channel1 = new MessageChannel(persistentChannelId, "alice", { + causalHistorySize: 2 + }); + + await sendMessage(channel1, utf8ToBytes("msg-1"), callback); + await sendMessage(channel1, utf8ToBytes("msg-2"), callback); + await sendMessage(channel1, utf8ToBytes("msg-3"), callback); + + const channel2 = new MessageChannel(persistentChannelId, "alice", { + causalHistorySize: 2 + }); + + let capturedMessage: ContentMessage | null = null; + await sendMessage(channel2, utf8ToBytes("msg-4"), async (message) => { + capturedMessage = message; + return { success: true }; + }); + + expect(capturedMessage).to.not.be.null; + expect(capturedMessage!.causalHistory).to.have.lengthOf(2); + // Should reference the last 2 messages (msg-2 and msg-3) + expect(capturedMessage!.causalHistory[0].messageId).to.equal( + MessageChannel.getMessageId(utf8ToBytes("msg-2")) + ); + expect(capturedMessage!.causalHistory[1].messageId).to.equal( + MessageChannel.getMessageId(utf8ToBytes("msg-3")) + ); + }); + }); }); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index eccc1435b9..6091b010dc 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -7,7 +7,7 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js"; import { MessageChannelEvent, MessageChannelEvents } from "./events.js"; -import { MemLocalHistory } from "./mem_local_history.js"; +import { LocalHistory } from "./local_history.js"; import { ChannelId, ContentMessage, @@ -63,11 +63,6 @@ export interface MessageChannelOptions { repairConfig?: RepairConfig; } -export type ILocalHistory = Pick< - Array, - "some" | "push" | "slice" | "find" | "length" | "findIndex" ->; - export class MessageChannel extends TypedEventEmitter { public readonly channelId: ChannelId; public readonly senderId: ParticipantId; @@ -76,7 +71,7 @@ export class MessageChannel extends TypedEventEmitter { private outgoingBuffer: ContentMessage[]; private possibleAcks: Map; private incomingBuffer: Array; - private readonly localHistory: ILocalHistory; + private readonly localHistory: LocalHistory; private timeReceived: Map; private readonly causalHistorySize: number; private readonly possibleAcksThreshold: number; @@ -106,7 +101,7 @@ export class MessageChannel extends TypedEventEmitter { channelId: ChannelId, senderId: ParticipantId, options: MessageChannelOptions = {}, - localHistory: ILocalHistory = new MemLocalHistory() + localHistory?: LocalHistory ) { super(); this.channelId = channelId; @@ -117,7 +112,8 @@ export class MessageChannel extends TypedEventEmitter { this.outgoingBuffer = []; this.possibleAcks = new Map(); this.incomingBuffer = []; - this.localHistory = localHistory; + this.localHistory = + localHistory ?? new LocalHistory({ storage: { prefix: channelId } }); this.causalHistorySize = options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; // TODO: this should be determined based on the bloom filter parameters and number of hashes diff --git a/packages/sds/src/message_channel/persistent_storage.spec.ts b/packages/sds/src/message_channel/persistent_storage.spec.ts new file mode 100644 index 0000000000..6109e188f7 --- /dev/null +++ b/packages/sds/src/message_channel/persistent_storage.spec.ts @@ -0,0 +1,119 @@ +import { expect } from "chai"; + +import { LocalHistory } from "./local_history.js"; +import { ContentMessage } from "./message.js"; + +const channelId = "channel-1"; + +describe("Storage", () => { + describe("Browser localStorage", () => { + before(function () { + if (typeof localStorage === "undefined") { + this.skip(); + } + }); + + afterEach(() => { + localStorage.removeItem(`waku:sds:storage:${channelId}`); + }); + + it("persists and restores messages", () => { + const history1 = new LocalHistory({ storage: { prefix: channelId } }); + history1.push(createMessage("msg-1", 1)); + history1.push(createMessage("msg-2", 2)); + + const history2 = new LocalHistory({ storage: { prefix: channelId } }); + + expect(history2.length).to.equal(2); + expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ + "msg-1", + "msg-2" + ]); + }); + + it("handles corrupt data gracefully", () => { + localStorage.setItem(`waku:sds:storage:${channelId}`, "{ invalid json }"); + + const history = new LocalHistory({ storage: { prefix: channelId } }); + expect(history.length).to.equal(0); + // Corrupt data is removed + expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.be.null; + }); + + it("isolates history by channel ID", () => { + const history1 = new LocalHistory({ storage: { prefix: "channel-1" } }); + const history2 = new LocalHistory({ storage: { prefix: "channel-2" } }); + + history1.push(createMessage("msg-1", 1)); + history2.push(createMessage("msg-2", 2)); + + expect(history1.length).to.equal(1); + expect(history1.slice(0)[0].messageId).to.equal("msg-1"); + + expect(history2.length).to.equal(1); + expect(history2.slice(0)[0].messageId).to.equal("msg-2"); + + localStorage.removeItem("waku:sds:storage:channel-2"); + }); + + it("saves messages after each push", () => { + const history = new LocalHistory({ storage: { prefix: channelId } }); + + expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.be.null; + + history.push(createMessage("msg-1", 1)); + + expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.not.be + .null; + + const saved = JSON.parse( + localStorage.getItem(`waku:sds:storage:${channelId}`)! + ); + expect(saved).to.have.lengthOf(1); + expect(saved[0].messageId).to.equal("msg-1"); + }); + + it("loads messages on initialization", () => { + const history1 = new LocalHistory({ storage: { prefix: channelId } }); + + history1.push(createMessage("msg-1", 1)); + history1.push(createMessage("msg-2", 2)); + history1.push(createMessage("msg-3", 3)); + + const history2 = new LocalHistory({ storage: { prefix: channelId } }); + + expect(history2.length).to.equal(3); + expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ + "msg-1", + "msg-2", + "msg-3" + ]); + }); + }); + + describe("In-memory fallback", () => { + it("uses in-memory only when no storage is provided", () => { + const history = new LocalHistory({ maxSize: 100 }); + history.push(createMessage("msg-3", 3)); + + expect(history.length).to.equal(1); + expect(history.slice(0)[0].messageId).to.equal("msg-3"); + + const history2 = new LocalHistory({ maxSize: 100 }); + expect(history2.length).to.equal(0); + }); + }); +}); + +const createMessage = (id: string, timestamp: number): ContentMessage => { + return new ContentMessage( + id, + channelId, + "sender", + [], + BigInt(timestamp), + undefined, + new Uint8Array([timestamp]), + undefined + ); +}; diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 1a8a85f43e..108dabfbdd 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -1,8 +1,8 @@ import { Logger } from "@waku/utils"; +import { LocalHistory } from "../local_history.js"; import type { HistoryEntry, MessageId } from "../message.js"; import { Message } from "../message.js"; -import type { ILocalHistory } from "../message_channel.js"; import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js"; import { @@ -183,7 +183,7 @@ export class RepairManager { */ public processIncomingRepairRequests( requests: HistoryEntry[], - localHistory: ILocalHistory, + localHistory: LocalHistory, currentTime = Date.now() ): void { for (const request of requests) { @@ -248,7 +248,7 @@ export class RepairManager { * Returns messages that should be rebroadcast */ public sweepIncomingBuffer( - localHistory: ILocalHistory, + localHistory: LocalHistory, currentTime = Date.now() ): Message[] { const ready = this.incomingBuffer.getReady(currentTime); diff --git a/packages/sds/src/message_channel/storage/browser.ts b/packages/sds/src/message_channel/storage/browser.ts new file mode 100644 index 0000000000..ae0d2a5531 --- /dev/null +++ b/packages/sds/src/message_channel/storage/browser.ts @@ -0,0 +1,52 @@ +import { Logger } from "@waku/utils"; + +import { ContentMessage } from "../message.js"; + +import { + MessageSerializer, + StoredContentMessage +} from "./message_serializer.js"; + +const log = new Logger("sds:storage"); + +const STORAGE_NAMESPACE = "waku:sds:storage:"; + +/** + * Browser localStorage wrapper for message persistence. + */ +export class Storage { + private readonly storageKey: string; + + public constructor(storagePrefix: string) { + this.storageKey = `${STORAGE_NAMESPACE}${storagePrefix}`; + } + + public save(messages: ContentMessage[]): void { + try { + const payload = JSON.stringify( + messages.map((msg) => MessageSerializer.serializeContentMessage(msg)) + ); + localStorage.setItem(this.storageKey, payload); + } catch (error) { + log.error("Failed to save messages to storage:", error); + } + } + + public load(): ContentMessage[] { + try { + const raw = localStorage.getItem(this.storageKey); + if (!raw) { + return []; + } + + const stored = JSON.parse(raw) as StoredContentMessage[]; + return stored + .map((record) => MessageSerializer.deserializeContentMessage(record)) + .filter((message): message is ContentMessage => Boolean(message)); + } catch (error) { + log.error("Failed to load messages from storage:", error); + localStorage.removeItem(this.storageKey); + return []; + } + } +} diff --git a/packages/sds/src/message_channel/storage/index.ts b/packages/sds/src/message_channel/storage/index.ts new file mode 100644 index 0000000000..365a399450 --- /dev/null +++ b/packages/sds/src/message_channel/storage/index.ts @@ -0,0 +1,2 @@ +// Node.js implementation - swapped to browser.js via package.json browser field +export { Storage } from "./node.js"; diff --git a/packages/sds/src/message_channel/storage/message_serializer.ts b/packages/sds/src/message_channel/storage/message_serializer.ts new file mode 100644 index 0000000000..49bf4c8f90 --- /dev/null +++ b/packages/sds/src/message_channel/storage/message_serializer.ts @@ -0,0 +1,97 @@ +import { bytesToHex, hexToBytes } from "@noble/hashes/utils"; + +import { ContentMessage, HistoryEntry } from "../message.js"; + +export type StoredCausalEntry = { + messageId: string; + retrievalHint?: string; +}; + +export type StoredContentMessage = { + messageId: string; + channelId: string; + senderId: string; + lamportTimestamp: string; + causalHistory: StoredCausalEntry[]; + bloomFilter?: string; + content: string; + retrievalHint?: string; +}; + +export class MessageSerializer { + public static serializeContentMessage( + message: ContentMessage + ): StoredContentMessage { + return { + messageId: message.messageId, + channelId: message.channelId, + senderId: message.senderId, + lamportTimestamp: message.lamportTimestamp.toString(), + causalHistory: message.causalHistory.map((entry) => + MessageSerializer.serializeCausalEntry(entry) + ), + bloomFilter: MessageSerializer.toHex(message.bloomFilter), + content: bytesToHex(new Uint8Array(message.content)), + retrievalHint: MessageSerializer.toHex(message.retrievalHint) + }; + } + + public static deserializeContentMessage( + record: StoredContentMessage + ): ContentMessage | undefined { + try { + const content = hexToBytes(record.content); + return new ContentMessage( + record.messageId, + record.channelId, + record.senderId, + record.causalHistory.map((entry) => + MessageSerializer.deserializeCausalEntry(entry) + ), + BigInt(record.lamportTimestamp), + MessageSerializer.fromHex(record.bloomFilter), + content, + [], + MessageSerializer.fromHex(record.retrievalHint) + ); + } catch { + return undefined; + } + } + + private static serializeCausalEntry(entry: HistoryEntry): StoredCausalEntry { + return { + messageId: entry.messageId, + retrievalHint: entry.retrievalHint + ? bytesToHex(entry.retrievalHint) + : undefined + }; + } + + private static deserializeCausalEntry( + entry: StoredCausalEntry + ): HistoryEntry { + return { + messageId: entry.messageId, + retrievalHint: entry.retrievalHint + ? hexToBytes(entry.retrievalHint) + : undefined + }; + } + + private static toHex( + data?: Uint8Array | Uint8Array + ): string | undefined { + if (!data || data.length === 0) { + return undefined; + } + return bytesToHex(data instanceof Uint8Array ? data : new Uint8Array(data)); + } + + private static fromHex(value?: string): Uint8Array | undefined { + if (!value) { + return undefined; + } + return hexToBytes(value); + } +} diff --git a/packages/sds/src/message_channel/storage/node.ts b/packages/sds/src/message_channel/storage/node.ts new file mode 100644 index 0000000000..dfd2f0bc03 --- /dev/null +++ b/packages/sds/src/message_channel/storage/node.ts @@ -0,0 +1,62 @@ +import { mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import { dirname, join } from "node:path"; + +import { Logger } from "@waku/utils"; + +import { ContentMessage } from "../message.js"; + +import { + MessageSerializer, + StoredContentMessage +} from "./message_serializer.js"; + +const log = new Logger("sds:storage"); + +/** + * Node.js file-based storage for message persistence. + */ +export class Storage { + private readonly filePath: string; + + public constructor(storagePrefix: string, basePath: string = ".waku") { + this.filePath = join(basePath, `${storagePrefix}.json`); + } + + public save(messages: ContentMessage[]): void { + try { + const payload = JSON.stringify( + messages.map((msg) => MessageSerializer.serializeContentMessage(msg)), + null, + 2 + ); + mkdirSync(dirname(this.filePath), { recursive: true }); + writeFileSync(this.filePath, payload, "utf-8"); + } catch (error) { + log.error("Failed to save messages to storage:", error); + } + } + + public load(): ContentMessage[] { + try { + const raw = readFileSync(this.filePath, "utf-8"); + if (!raw) { + return []; + } + + const stored = JSON.parse(raw) as StoredContentMessage[]; + return stored + .map((record) => MessageSerializer.deserializeContentMessage(record)) + .filter((message): message is ContentMessage => Boolean(message)); + } catch (error: unknown) { + if ( + error && + typeof error === "object" && + "code" in error && + error.code !== "ENOENT" + ) { + log.error("Failed to load messages from storage:", error); + } + return []; + } + } +}