-
Notifications
You must be signed in to change notification settings - Fork 48
chore(sds): optimise lookups #2745
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feat/persistent_history
Are you sure you want to change the base?
Changes from all commits
8428957
81e9070
16587fa
b24eb86
dd8032d
1851154
a0bc652
d320c75
0aff074
b55967c
2a0a08d
c8bb505
959cad9
b437e19
af0bbf8
9075729
3e3c511
055d7fe
42bfd98
bc59717
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,6 +1,13 @@ | ||||||||||
| import { Logger } from "@waku/utils"; | ||||||||||
| import _ from "lodash"; | ||||||||||
|
|
||||||||||
| import { ContentMessage, isContentMessage } from "./message.js"; | ||||||||||
| import { | ||||||||||
| type ChannelId, | ||||||||||
| ContentMessage, | ||||||||||
| type HistoryEntry, | ||||||||||
| isContentMessage | ||||||||||
| } from "./message.js"; | ||||||||||
| import { PersistentStorage } from "./persistent_storage.js"; | ||||||||||
|
|
||||||||||
| export const DEFAULT_MAX_LENGTH = 10_000; | ||||||||||
|
|
||||||||||
|
|
@@ -17,79 +24,111 @@ export const DEFAULT_MAX_LENGTH = 10_000; | |||||||||
| * If an array of items longer than `maxLength` is pushed, dropping will happen | ||||||||||
| * at next push. | ||||||||||
| */ | ||||||||||
| export class MemLocalHistory { | ||||||||||
| export interface ILocalHistory { | ||||||||||
| readonly size: number; | ||||||||||
| addMessages(...messages: ContentMessage[]): void; | ||||||||||
| hasMessage(messageId: string): boolean; | ||||||||||
| getMessage(messageId: string): ContentMessage | undefined; | ||||||||||
| getRecentMessages(count: number): ContentMessage[]; | ||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume the expectation is for this to be ordered? if so, I'd add it to the name. |
||||||||||
| getAllMessages(): ContentMessage[]; | ||||||||||
| findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[]; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| export type MemLocalHistoryOptions = { | ||||||||||
| storage?: ChannelId | PersistentStorage; | ||||||||||
| maxSize?: number; | ||||||||||
| }; | ||||||||||
|
|
||||||||||
| const log = new Logger("sds:local-history"); | ||||||||||
|
|
||||||||||
| export class MemLocalHistory implements ILocalHistory { | ||||||||||
| private items: ContentMessage[] = []; | ||||||||||
| private messageIndex: Map<string, ContentMessage> = new Map(); | ||||||||||
|
Comment on lines
45
to
+46
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I don't think a |
||||||||||
| private readonly storage?: PersistentStorage; | ||||||||||
| private readonly maxSize: number; | ||||||||||
|
|
||||||||||
| /** | ||||||||||
| * Construct a new in-memory local history | ||||||||||
| * Construct a new in-memory local history. | ||||||||||
| * | ||||||||||
| * @param maxLength The maximum number of message to store. | ||||||||||
| * @param opts Configuration object. | ||||||||||
| * - storage: Optional persistent storage backend for message persistence or channelId to use with PersistentStorage. | ||||||||||
| * - maxSize: The maximum number of messages to store. Optional, defaults to DEFAULT_MAX_LENGTH. | ||||||||||
| */ | ||||||||||
| public constructor(private maxLength: number = DEFAULT_MAX_LENGTH) {} | ||||||||||
| public constructor(opts: MemLocalHistoryOptions = {}) { | ||||||||||
| const { storage, maxSize } = opts; | ||||||||||
| this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH; | ||||||||||
| if (storage instanceof PersistentStorage) { | ||||||||||
| this.storage = storage; | ||||||||||
| log.info("Using explicit persistent storage"); | ||||||||||
| } else if (typeof storage === "string") { | ||||||||||
| this.storage = PersistentStorage.create(storage); | ||||||||||
| log.info("Creating persistent storage for channel", storage); | ||||||||||
| } else { | ||||||||||
| this.storage = undefined; | ||||||||||
| log.info("Using in-memory storage"); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| public get length(): number { | ||||||||||
| this.load(); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| public get size(): number { | ||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why? |
||||||||||
| return this.items.length; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| public push(...items: ContentMessage[]): number { | ||||||||||
| for (const item of items) { | ||||||||||
| this.validateMessage(item); | ||||||||||
| public addMessages(...messages: ContentMessage[]): void { | ||||||||||
| for (const message of messages) { | ||||||||||
| this.validateMessage(message); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Add new items and sort by timestamp, ensuring uniqueness by messageId | ||||||||||
| // The valueOf() method on ContentMessage enables native < operator sorting | ||||||||||
| const combinedItems = [...this.items, ...items]; | ||||||||||
| const combinedItems = [...this.items, ...messages]; | ||||||||||
|
|
||||||||||
| // Sort by timestamp (using valueOf() which creates timestamp_messageId string) | ||||||||||
| combinedItems.sort((a, b) => a.valueOf().localeCompare(b.valueOf())); | ||||||||||
|
|
||||||||||
| // Remove duplicates by messageId while maintaining order | ||||||||||
| this.items = _.uniqBy(combinedItems, "messageId"); | ||||||||||
|
|
||||||||||
| this.rebuildIndex(); | ||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are rebuilding the index, and then you are removing items from the array, and then you are removing the removed items from the index. I think there is a better way to do this. |
||||||||||
|
|
||||||||||
| // Let's drop older messages if max length is reached | ||||||||||
| if (this.length > this.maxLength) { | ||||||||||
| const numItemsToRemove = this.length - this.maxLength; | ||||||||||
| this.items.splice(0, numItemsToRemove); | ||||||||||
| if (this.size > this.maxSize) { | ||||||||||
| const numItemsToRemove = this.size - this.maxSize; | ||||||||||
| const removedItems = this.items.splice(0, numItemsToRemove); | ||||||||||
| for (const item of removedItems) { | ||||||||||
| this.messageIndex.delete(item.messageId); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| return this.items.length; | ||||||||||
| this.save(); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| public some( | ||||||||||
| predicate: ( | ||||||||||
| value: ContentMessage, | ||||||||||
| index: number, | ||||||||||
| array: ContentMessage[] | ||||||||||
| ) => unknown, | ||||||||||
| thisArg?: any | ||||||||||
| ): boolean { | ||||||||||
| return this.items.some(predicate, thisArg); | ||||||||||
| public hasMessage(messageId: string): boolean { | ||||||||||
| return this.messageIndex.has(messageId); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| public slice(start?: number, end?: number): ContentMessage[] { | ||||||||||
| return this.items.slice(start, end); | ||||||||||
| public getRecentMessages(count: number): ContentMessage[] { | ||||||||||
| return this.items.slice(-count); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| public find( | ||||||||||
| predicate: ( | ||||||||||
| value: ContentMessage, | ||||||||||
| index: number, | ||||||||||
| obj: ContentMessage[] | ||||||||||
| ) => unknown, | ||||||||||
| thisArg?: any | ||||||||||
| ): ContentMessage | undefined { | ||||||||||
| return this.items.find(predicate, thisArg); | ||||||||||
| public getAllMessages(): ContentMessage[] { | ||||||||||
| return [...this.items]; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| public findIndex( | ||||||||||
| predicate: ( | ||||||||||
| value: ContentMessage, | ||||||||||
| index: number, | ||||||||||
| obj: ContentMessage[] | ||||||||||
| ) => unknown, | ||||||||||
| thisArg?: any | ||||||||||
| ): number { | ||||||||||
| return this.items.findIndex(predicate, thisArg); | ||||||||||
| public getMessage(messageId: string): ContentMessage | undefined { | ||||||||||
| return this.messageIndex.get(messageId); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| public findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[] { | ||||||||||
| return entries.filter((entry) => !this.messageIndex.has(entry.messageId)); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| private rebuildIndex(): void { | ||||||||||
| this.messageIndex.clear(); | ||||||||||
| for (const message of this.items) { | ||||||||||
| this.messageIndex.set(message.messageId, message); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| private validateMessage(message: ContentMessage): void { | ||||||||||
|
|
@@ -99,4 +138,16 @@ export class MemLocalHistory { | |||||||||
| ); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| private save(): void { | ||||||||||
| this.storage?.save(this.items); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| private load(): void { | ||||||||||
| const messages = this.storage?.load() ?? []; | ||||||||||
| if (messages.length > 0) { | ||||||||||
| this.items = messages; | ||||||||||
| this.rebuildIndex(); | ||||||||||
| } | ||||||||||
| } | ||||||||||
| } | ||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MessageChannelis the one to define the interface, hence this should be inmessage_channel.ts.