Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8428957
feat: persistent history for SDS
danisharora099 Nov 14, 2025
81e9070
refactor: simplify localHistory initialization by removing intermedia…
danisharora099 Nov 20, 2025
16587fa
feat(history): introduce `ILocalHistory` interface and refactor `Pers…
danisharora099 Nov 20, 2025
b24eb86
refactor: rename `storageKey` to `storageKeyPrefix` and update storag…
danisharora099 Nov 20, 2025
dd8032d
refactor: rename persistence methods from `persist`/`restore` to `sav…
danisharora099 Nov 20, 2025
1851154
refactor: Remove silent error suppression for persistent history stor…
danisharora099 Nov 20, 2025
a0bc652
feat: remove `storageKeyPrefix` and `getDefaultHistoryStorage`, simpl…
danisharora099 Nov 20, 2025
d320c75
feat: Conditionally set default history storage to localStorage for i…
danisharora099 Nov 21, 2025
0aff074
tests: use memLocalHistory for message_channel.spec.ts
danisharora099 Nov 27, 2025
b55967c
test: add unit tests for localStorage persistence and error handling …
danisharora099 Nov 27, 2025
2a0a08d
test: local storage specific test
danisharora099 Nov 27, 2025
c8bb505
feat: Introduce `PersistentStorage` for message history management an…
danisharora099 Nov 27, 2025
959cad9
chore: remove unused exports
danisharora099 Nov 27, 2025
b437e19
chore: interface abstractions
danisharora099 Nov 27, 2025
af0bbf8
chore: update interface to type as we're not implementing it
danisharora099 Nov 27, 2025
9075729
fix: MemLocalHistory uses optional chaining
danisharora099 Nov 27, 2025
3e3c511
chore: add logging to LocalHistory for storage init
danisharora099 Nov 27, 2025
055d7fe
wip
danisharora099 Nov 27, 2025
42bfd98
refactor: abstract `messageIndex` behind `ILocalHistory` as `findMiss…
danisharora099 Nov 28, 2025
bc59717
chore: remove unused function
danisharora099 Nov 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions packages/sds/src/message_channel/mem_local_history.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,44 @@ describe("MemLocalHistory", () => {
it("Cap max size when messages are pushed one at a time", () => {
const maxSize = 2;

const hist = new MemLocalHistory(maxSize);
const hist = new MemLocalHistory({ maxSize: maxSize });

hist.push(
hist.addMessages(
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))
);
expect(hist.length).to.eq(1);
hist.push(
expect(hist.size).to.eq(1);
hist.addMessages(
new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2]))
);
expect(hist.length).to.eq(2);
expect(hist.size).to.eq(2);

hist.push(
hist.addMessages(
new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3]))
);
expect(hist.length).to.eq(2);
expect(hist.size).to.eq(2);

expect(hist.findIndex((m) => m.messageId === "1")).to.eq(-1);
expect(hist.findIndex((m) => m.messageId === "2")).to.not.eq(-1);
expect(hist.findIndex((m) => m.messageId === "3")).to.not.eq(-1);
expect(hist.hasMessage("1")).to.eq(false);
expect(hist.hasMessage("2")).to.eq(true);
expect(hist.hasMessage("3")).to.eq(true);
});

it("Cap max size when a pushed array is exceeding the cap", () => {
const maxSize = 2;

const hist = new MemLocalHistory(maxSize);
const hist = new MemLocalHistory({ maxSize: maxSize });

hist.push(
hist.addMessages(
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))
);
expect(hist.length).to.eq(1);
hist.push(
expect(hist.size).to.eq(1);
hist.addMessages(
new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2])),
new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3]))
);
expect(hist.length).to.eq(2);
expect(hist.size).to.eq(2);

expect(hist.findIndex((m) => m.messageId === "1")).to.eq(-1);
expect(hist.findIndex((m) => m.messageId === "2")).to.not.eq(-1);
expect(hist.findIndex((m) => m.messageId === "3")).to.not.eq(-1);
expect(hist.hasMessage("1")).to.eq(false);
expect(hist.hasMessage("2")).to.eq(true);
expect(hist.hasMessage("3")).to.eq(true);
});
});
137 changes: 94 additions & 43 deletions packages/sds/src/message_channel/mem_local_history.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import { Logger } from "@waku/utils";
import _ from "lodash";

import { ContentMessage, isContentMessage } from "./message.js";
import {
type ChannelId,
ContentMessage,
type HistoryEntry,
isContentMessage
} from "./message.js";
import { PersistentStorage } from "./persistent_storage.js";

export const DEFAULT_MAX_LENGTH = 10_000;

Expand All @@ -17,79 +24,111 @@ export const DEFAULT_MAX_LENGTH = 10_000;
* If an array of items longer than `maxLength` is pushed, dropping will happen
* at next push.
*/
export class MemLocalHistory {
export interface ILocalHistory {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageChannel is the one to define the interface, hence this should be in message_channel.ts.

readonly size: number;
addMessages(...messages: ContentMessage[]): void;
hasMessage(messageId: string): boolean;
getMessage(messageId: string): ContentMessage | undefined;
getRecentMessages(count: number): ContentMessage[];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the expectation is for this to be ordered? if so, I'd add it to the name.

getAllMessages(): ContentMessage[];
findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[];
}

export type MemLocalHistoryOptions = {
storage?: ChannelId | PersistentStorage;
maxSize?: number;
};

const log = new Logger("sds:local-history");

export class MemLocalHistory implements ILocalHistory {
private items: ContentMessage[] = [];
private messageIndex: Map<string, ContentMessage> = new Map();
Comment on lines 45 to +46
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private items: ContentMessage[] = [];
private messageIndex: Map<string, ContentMessage> = new Map();
private orderedMessageArray: ContentMessage[] = [];
private messageMap: Map<string, ContentMessage> = new Map();

I don't think a Map qualify as an index because it cotnains both keys (index) and values (not a ref)

private readonly storage?: PersistentStorage;
private readonly maxSize: number;

/**
* Construct a new in-memory local history
* Construct a new in-memory local history.
*
* @param maxLength The maximum number of message to store.
* @param opts Configuration object.
* - storage: Optional persistent storage backend for message persistence or channelId to use with PersistentStorage.
* - maxSize: The maximum number of messages to store. Optional, defaults to DEFAULT_MAX_LENGTH.
*/
public constructor(private maxLength: number = DEFAULT_MAX_LENGTH) {}
public constructor(opts: MemLocalHistoryOptions = {}) {
const { storage, maxSize } = opts;
this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH;
if (storage instanceof PersistentStorage) {
this.storage = storage;
log.info("Using explicit persistent storage");
} else if (typeof storage === "string") {
this.storage = PersistentStorage.create(storage);
log.info("Creating persistent storage for channel", storage);
} else {
this.storage = undefined;
log.info("Using in-memory storage");
}

public get length(): number {
this.load();
}

public get size(): number {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

return this.items.length;
}

public push(...items: ContentMessage[]): number {
for (const item of items) {
this.validateMessage(item);
public addMessages(...messages: ContentMessage[]): void {
for (const message of messages) {
this.validateMessage(message);
}

// Add new items and sort by timestamp, ensuring uniqueness by messageId
// The valueOf() method on ContentMessage enables native < operator sorting
const combinedItems = [...this.items, ...items];
const combinedItems = [...this.items, ...messages];

// Sort by timestamp (using valueOf() which creates timestamp_messageId string)
combinedItems.sort((a, b) => a.valueOf().localeCompare(b.valueOf()));

// Remove duplicates by messageId while maintaining order
this.items = _.uniqBy(combinedItems, "messageId");

this.rebuildIndex();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are rebuilding the index, and then you are removing items from the array, and then you are removing the removed items from the index.

I think there is a better way to do this.


// Let's drop older messages if max length is reached
if (this.length > this.maxLength) {
const numItemsToRemove = this.length - this.maxLength;
this.items.splice(0, numItemsToRemove);
if (this.size > this.maxSize) {
const numItemsToRemove = this.size - this.maxSize;
const removedItems = this.items.splice(0, numItemsToRemove);
for (const item of removedItems) {
this.messageIndex.delete(item.messageId);
}
}

return this.items.length;
this.save();
}

public some(
predicate: (
value: ContentMessage,
index: number,
array: ContentMessage[]
) => unknown,
thisArg?: any
): boolean {
return this.items.some(predicate, thisArg);
public hasMessage(messageId: string): boolean {
return this.messageIndex.has(messageId);
}

public slice(start?: number, end?: number): ContentMessage[] {
return this.items.slice(start, end);
public getRecentMessages(count: number): ContentMessage[] {
return this.items.slice(-count);
}

public find(
predicate: (
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any
): ContentMessage | undefined {
return this.items.find(predicate, thisArg);
public getAllMessages(): ContentMessage[] {
return [...this.items];
}

public findIndex(
predicate: (
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any
): number {
return this.items.findIndex(predicate, thisArg);
public getMessage(messageId: string): ContentMessage | undefined {
return this.messageIndex.get(messageId);
}

public findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[] {
return entries.filter((entry) => !this.messageIndex.has(entry.messageId));
}

private rebuildIndex(): void {
this.messageIndex.clear();
for (const message of this.items) {
this.messageIndex.set(message.messageId, message);
}
}

private validateMessage(message: ContentMessage): void {
Expand All @@ -99,4 +138,16 @@ export class MemLocalHistory {
);
}
}

private save(): void {
this.storage?.save(this.items);
}

private load(): void {
const messages = this.storage?.load() ?? [];
if (messages.length > 0) {
this.items = messages;
this.rebuildIndex();
}
}
}
Loading