Skip to content

Commit 78c5ed5

Browse files
authored
SY-3411: Fix CSV exporting with large amounts of data (#1758)
Performance improvement to the client-side CSV exporting: Move main CSV exporting logic to the client, with an API that we can hopefully keep stable / mostly stable even when we move the heavy data processing work to the backend Make CSV export return a ReadableStream to allow for processing off of the main thread for desktop + certain browsers performance improvements, much faster export speeds now
1 parent fb69bc2 commit 78c5ed5

19 files changed

Lines changed: 1184 additions & 456 deletions

File tree

client/py/examples/multi-rate-calculated/simulated_daq.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
demonstrate how to calculate derived values from these channels with different rates.
1515
"""
1616

17+
import random
1718
import time
1819

1920
import numpy as np
@@ -71,21 +72,25 @@
7172
i = 0
7273
while rough_rate.wait():
7374
time = sy.TimeStamp.now()
74-
time_2 = time + sy.TimeSpan.MILLISECOND * 3
75+
time_2 = time + sy.TimeSpan.MICROSECOND * 3
7576
# Generate data to write to the first channel.
7677
data_to_write = {
7778
time_ch_1.key: [time, time_2],
7879
data_ch_1.key: [np.sin(i / 10), np.sin((i + 1) / 10)],
7980
}
8081

81-
# Only write to the second channel every third iteration, so its rate is 10Hz
82-
# instead of 30Hz.
82+
# Only write to the second channel every third iteration, so its rate is 10 Hz
83+
# instead of 30 Hz.
8384
if i % 3 == 0:
84-
# Generate timestamps at a different time to introduce intentional
85-
# misalignment.
86-
time = sy.TimeStamp.now()
87-
time_2 = time + sy.TimeSpan.MILLISECOND
88-
data_to_write[time_ch_2.key] = [time, time_2]
85+
# Generate timestamps at a random time that is off by between -5 and +5
86+
# nanoseconds using random.randint
87+
second_time = time + sy.TimeSpan.NANOSECOND * random.randint(-5, 5)
88+
second_time_2 = (
89+
second_time
90+
+ sy.TimeSpan.MICROSECOND * 3
91+
+ sy.TimeSpan.NANOSECOND * random.randint(-5, 5)
92+
)
93+
data_to_write[time_ch_2.key] = [second_time, second_time_2]
8994
data_to_write[data_ch_2.key] = [np.sin(i / 100), np.sin((i + 1) / 100)]
9095

9196
writer.write(data_to_write)

client/ts/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@synnaxlabs/client",
3-
"version": "0.49.0",
3+
"version": "0.49.2",
44
"description": "The Synnax Client Library",
55
"keywords": [
66
"synnax",

client/ts/src/framer/client.ts

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { channel } from "@/channel";
2121
import { Deleter } from "@/framer/deleter";
2222
import { Frame } from "@/framer/frame";
2323
import { AUTO_SPAN, Iterator, type IteratorConfig } from "@/framer/iterator";
24+
import { Reader, type ReadRequest } from "@/framer/reader";
2425
import { openStreamer, type Streamer, type StreamerConfig } from "@/framer/streamer";
2526
import { Writer, type WriterConfig, WriterMode } from "@/framer/writer";
2627
import { ontology } from "@/ontology";
@@ -32,6 +33,7 @@ export class Client {
3233
private readonly streamClient: WebSocketClient;
3334
private readonly retriever: channel.Retriever;
3435
private readonly deleter: Deleter;
36+
private readonly reader: Reader;
3537

3638
constructor(
3739
stream: WebSocketClient,
@@ -41,6 +43,7 @@ export class Client {
4143
this.streamClient = stream;
4244
this.retriever = retriever;
4345
this.deleter = new Deleter(unary);
46+
this.reader = new Reader(retriever, stream);
4447
}
4548

4649
/**
@@ -140,15 +143,15 @@ export class Client {
140143
}
141144

142145
async read(tr: CrudeTimeRange, channel: channel.KeyOrName): Promise<MultiSeries>;
143-
144146
async read(tr: CrudeTimeRange, channels: channel.Params): Promise<Frame>;
145-
147+
async read(request: ReadRequest): Promise<ReadableStream<Uint8Array>>;
146148
async read(
147-
tr: CrudeTimeRange,
148-
channels: channel.Params,
149-
): Promise<MultiSeries | Frame> {
150-
const { single } = channel.analyzeParams(channels);
151-
const fr = await this.readFrame(tr, channels);
149+
tr: CrudeTimeRange | ReadRequest,
150+
channels?: channel.Params,
151+
): Promise<MultiSeries | Frame | ReadableStream<Uint8Array>> {
152+
if (!("start" in tr)) return this.reader.read(tr);
153+
const { single } = channel.analyzeParams(channels!);
154+
const fr = await this.readFrame(tr, channels!);
152155
if (single) return fr.get(channels as channel.KeyOrName);
153156
return fr;
154157
}
@@ -199,13 +202,7 @@ export class Client {
199202
const { normalized, variant } = channel.analyzeParams(channels);
200203
const bounds = new TimeRange(timeRange);
201204
if (variant === "keys")
202-
return await this.deleter.delete({
203-
keys: normalized as channel.Key[],
204-
bounds,
205-
});
206-
return await this.deleter.delete({
207-
names: normalized as string[],
208-
bounds,
209-
});
205+
return await this.deleter.delete({ keys: normalized as channel.Key[], bounds });
206+
return await this.deleter.delete({ names: normalized as string[], bounds });
210207
}
211208
}

0 commit comments

Comments
 (0)