Skip to content

Commit 6bae7c3

Browse files
define custom order of messages
1 parent 089414a commit 6bae7c3

File tree

2 files changed

+18
-10
lines changed

2 files changed

+18
-10
lines changed

src/consume.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -683,17 +683,13 @@ function messageViewerStartPollingCommand(
683683
case "PreviewJSON": {
684684
track({ action: "preview-snapshot" });
685685
const {
686-
timestamp,
686+
order,
687687
messages: { values },
688688
serialized,
689689
} = stream();
690690
const includes = bitset()?.predicate() ?? (() => true);
691691
const records: string[] = [];
692-
for (
693-
let i = 0, p = timestamp.head, payload;
694-
i < timestamp.size;
695-
i++, p = timestamp.next[p]
696-
) {
692+
for (let i = 0, p = order.head, payload; i < order.size; i++, p = order.next[p]) {
697693
if (includes(p)) {
698694
payload = prepare(values[p], serialized.key.includes(p), serialized.value.includes(p));
699695
records.push("\t" + JSON.stringify(payload));

src/stream/stream.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ export class Stream {
66
serialized: { key: BitSet; value: BitSet };
77
timestamp: SkipList<number | undefined>;
88
partition: SkipList<number | undefined>;
9-
order: SkipList<PartitionConsumeRecord> | null;
9+
order: SkipList<PartitionConsumeRecord>;
1010

1111
constructor(capacity = 2 ** 24) {
1212
this.capacity = capacity;
@@ -35,7 +35,19 @@ export class Stream {
3535
let partitionOf = (point: number) => values[point].partition_id;
3636
this.partition = new SkipList(capacity, 1 / 2, partitionOf, ascending);
3737

38-
this.order = null;
38+
/* Messages are ordered by timestamp, partition, offset in descending order. */
39+
this.order = new SkipList(
40+
capacity,
41+
1 / 4,
42+
(point: number) => values[point],
43+
(a, b) => {
44+
return (
45+
descending(a.timestamp, b.timestamp) ||
46+
descending(a.partition_id, b.partition_id) ||
47+
descending(a.offset, b.offset)
48+
);
49+
},
50+
);
3951
}
4052

4153
insert(message: PartitionConsumeRecord) {
@@ -46,9 +58,11 @@ export class Stream {
4658
if (isCircular) {
4759
this.timestamp.remove(index);
4860
this.partition.remove(index);
61+
this.order.remove(index);
4962
}
5063
this.timestamp.insert(index);
5164
this.partition.insert(index);
65+
this.order.insert(index);
5266

5367
/* TEMP the API can provide key/value as objects which we don't really
5468
utilize as such right now. For faster search and table's rendering time
@@ -62,8 +76,6 @@ export class Stream {
6276
this.serialized.value.set(index);
6377
} else this.serialized.value.unset(index);
6478

65-
// TEMP (July 12th) disabling this since we don't have sorting feature yet
66-
// this.order?.insert(index);
6779
return index;
6880
}
6981

0 commit comments

Comments
 (0)