From 1eb7e8678031b1128d06b7229f322259e71adc4b Mon Sep 17 00:00:00 2001 From: Oleksii Raspopov Date: Fri, 7 Mar 2025 13:22:03 -0500 Subject: [PATCH 1/2] define custom order of messages --- src/consume.ts | 8 ++------ src/stream/stream.ts | 20 ++++++++++++++++---- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/consume.ts b/src/consume.ts index 6c432f2534..ec1aa97e10 100644 --- a/src/consume.ts +++ b/src/consume.ts @@ -687,17 +687,13 @@ function messageViewerStartPollingCommand( case "PreviewJSON": { track({ action: "preview-snapshot" }); const { - timestamp, + order, messages: { values }, serialized, } = stream(); const includes = bitset()?.predicate() ?? (() => true); const records: string[] = []; - for ( - let i = 0, p = timestamp.head, payload; - i < timestamp.size; - i++, p = timestamp.next[p] - ) { + for (let i = 0, p = order.head, payload; i < order.size; i++, p = order.next[p]) { if (includes(p)) { payload = prepare(values[p], serialized.key.includes(p), serialized.value.includes(p)); records.push("\t" + JSON.stringify(payload)); diff --git a/src/stream/stream.ts b/src/stream/stream.ts index fdc4f9b206..a153de37bb 100644 --- a/src/stream/stream.ts +++ b/src/stream/stream.ts @@ -6,7 +6,7 @@ export class Stream { serialized: { key: BitSet; value: BitSet }; timestamp: SkipList; partition: SkipList; - order: SkipList | null; + order: SkipList; constructor(capacity = 2 ** 24) { this.capacity = capacity; @@ -35,7 +35,19 @@ export class Stream { let partitionOf = (point: number) => values[point].partition_id; this.partition = new SkipList(capacity, 1 / 2, partitionOf, ascending); - this.order = null; + /* Messages are ordered by timestamp, partition, offset in descending order. */ + this.order = new SkipList( + capacity, + 1 / 4, + (point: number) => values[point], + (a, b) => { + return ( + descending(a.timestamp, b.timestamp) || + descending(a.partition_id, b.partition_id) || + descending(a.offset, b.offset) + ); + }, + ); } insert(message: PartitionConsumeRecord) { @@ -46,9 +58,11 @@ export class Stream { if (isCircular) { this.timestamp.remove(index); this.partition.remove(index); + this.order.remove(index); } this.timestamp.insert(index); this.partition.insert(index); + this.order.insert(index); /* TEMP the API can provide key/value as objects which we don't really utilize as such right now. For faster search and table's rendering time @@ -62,8 +76,6 @@ export class Stream { this.serialized.value.set(index); } else this.serialized.value.unset(index); - // TEMP (July 12th) disabling this since we don't have sorting feature yet - // this.order?.insert(index); return index; } From e0cc541fb975717dbcaf2340dcb962ebc1e373bb Mon Sep 17 00:00:00 2001 From: Oleksii Raspopov Date: Thu, 13 Mar 2025 13:37:36 -0400 Subject: [PATCH 2/2] desc by offset within the same partition, otherwise rely on timestamp --- src/stream/stream.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/stream/stream.ts b/src/stream/stream.ts index a153de37bb..ec73b883e5 100644 --- a/src/stream/stream.ts +++ b/src/stream/stream.ts @@ -41,11 +41,9 @@ export class Stream { 1 / 4, (point: number) => values[point], (a, b) => { - return ( - descending(a.timestamp, b.timestamp) || - descending(a.partition_id, b.partition_id) || - descending(a.offset, b.offset) - ); + return a.partition_id === b.partition_id + ? descending(a.offset, b.offset) + : descending(a.timestamp, b.timestamp); }, ); }