diff --git a/src/consume.ts b/src/consume.ts index 6c432f2534..ec1aa97e10 100644 --- a/src/consume.ts +++ b/src/consume.ts @@ -687,17 +687,13 @@ function messageViewerStartPollingCommand( case "PreviewJSON": { track({ action: "preview-snapshot" }); const { - timestamp, + order, messages: { values }, serialized, } = stream(); const includes = bitset()?.predicate() ?? (() => true); const records: string[] = []; - for ( - let i = 0, p = timestamp.head, payload; - i < timestamp.size; - i++, p = timestamp.next[p] - ) { + for (let i = 0, p = order.head, payload; i < order.size; i++, p = order.next[p]) { if (includes(p)) { payload = prepare(values[p], serialized.key.includes(p), serialized.value.includes(p)); records.push("\t" + JSON.stringify(payload)); diff --git a/src/stream/stream.ts b/src/stream/stream.ts index fdc4f9b206..ec73b883e5 100644 --- a/src/stream/stream.ts +++ b/src/stream/stream.ts @@ -6,7 +6,7 @@ export class Stream { serialized: { key: BitSet; value: BitSet }; timestamp: SkipList; partition: SkipList; - order: SkipList | null; + order: SkipList; constructor(capacity = 2 ** 24) { this.capacity = capacity; @@ -35,7 +35,17 @@ export class Stream { let partitionOf = (point: number) => values[point].partition_id; this.partition = new SkipList(capacity, 1 / 2, partitionOf, ascending); - this.order = null; + /* Messages are ordered by timestamp, partition, offset in descending order. */ + this.order = new SkipList( + capacity, + 1 / 4, + (point: number) => values[point], + (a, b) => { + return a.partition_id === b.partition_id + ? descending(a.offset, b.offset) + : descending(a.timestamp, b.timestamp); + }, + ); } insert(message: PartitionConsumeRecord) { @@ -46,9 +56,11 @@ export class Stream { if (isCircular) { this.timestamp.remove(index); this.partition.remove(index); + this.order.remove(index); } this.timestamp.insert(index); this.partition.insert(index); + this.order.insert(index); /* TEMP the API can provide key/value as objects which we don't really utilize as such right now. For faster search and table's rendering time @@ -62,8 +74,6 @@ export class Stream { this.serialized.value.set(index); } else this.serialized.value.unset(index); - // TEMP (July 12th) disabling this since we don't have sorting feature yet - // this.order?.insert(index); return index; }