Skip to content

Commit 3cd407d

Browse files
committed
Support adaptive data feed slice replay
1 parent 8f2bcea commit 3cd407d

5 files changed

Lines changed: 157 additions & 40 deletions

File tree

ARCHITECTURE.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,18 @@ Main Thread Worker Thread
1313
│ │
1414
│── Start replay ──→ │
1515
│ Fetch data slice from API
16-
│ Cache to disk (.gz file)
17-
│ ←── message (sliceKey, path) ──
16+
│ Cache to disk (.gz/.zst file)
17+
│ ←── message (sliceKey, path, size) ──
1818
│ Fetch next slice...
1919
│ │
2020
Read cached file from disk │
21-
Decompress (gunzip)
21+
Decompress (gzip/zstd)
2222
Split by newlines │
2323
Parse JSON messages │
2424
Yield {localTimestamp, message} │
2525
```
2626

27-
Worker thread pre-fetches and caches slices while the main thread processes the current one. This keeps I/O and CPU pipelined.
27+
The worker thread pre-fetches and caches slices while the main thread processes the current one. This keeps I/O and CPU pipelined. Normal replay fetches the first and last minute as one-minute requests, then uses the larger of the suggested slice sizes returned with those two responses for the middle of the range (replays without filters keep one-minute slices). Multi-minute responses are cached in files named after their start minute, with a `.size-{minutes}` suffix. One-minute cache paths keep the legacy filename.
2828

2929
## Real-time Streaming
3030

src/handy.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,7 @@ async function _downloadFile(requestOptions: RequestOptions, url: string, downlo
628628

629629
try {
630630
// based on https://github.com/nodejs/node/issues/28172 - the only reliable way to consume the response stream while avoiding all the 'gotchas'
631+
let responseHeaders: Record<string, string> = {}
631632
await new Promise<void>((resolve, reject) => {
632633
const req = https
633634
.get(url, requestOptions, (res) => {
@@ -642,6 +643,7 @@ async function _downloadFile(requestOptions: RequestOptions, url: string, downlo
642643
reject(new HttpError(statusCode!, body, url))
643644
})
644645
} else {
646+
responseHeaders = parseNodeResponseHeaders(res.headers)
645647
if (appendContentEncodingExtension) {
646648
const contentEncoding = asSingleHeaderValue(res.headers['content-encoding'])
647649
if (contentEncoding === 'zstd') {
@@ -682,7 +684,8 @@ async function _downloadFile(requestOptions: RequestOptions, url: string, downlo
682684
await rename(tmpFilePath, finalDownloadPath)
683685

684686
return {
685-
downloadPath: finalDownloadPath
687+
downloadPath: finalDownloadPath,
688+
headers: responseHeaders
686689
}
687690
} finally {
688691
tmpFileCleanups.delete(tmpFilePath)

src/replay.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ export async function* replay<T extends Exchange, U extends boolean = false, Z e
4242

4343
const fromDate = parseAsUTCDate(from)
4444
const toDate = parseAsUTCDate(to)
45-
const cachedSlicePaths = new Map<string, string>()
45+
const cachedSlicePaths = new Map<string, { slicePath: string; sliceSize: number }>()
4646
let replayError
4747
debug('replay for exchange: %s started - from: %s, to: %s, filters: %o', exchange, fromDate.toISOString(), toDate.toISOString(), filters)
4848

@@ -65,7 +65,10 @@ export async function* replay<T extends Exchange, U extends boolean = false, Z e
6565
const worker = new ReliableWorker(payload)
6666

6767
worker.on('message', (message: WorkerMessage) => {
68-
cachedSlicePaths.set(message.sliceKey, message.slicePath)
68+
cachedSlicePaths.set(message.sliceKey, {
69+
slicePath: message.slicePath,
70+
sliceSize: message.sliceSize
71+
})
6972
})
7073

7174
worker.on('error', (err) => {
@@ -100,23 +103,24 @@ export async function* replay<T extends Exchange, U extends boolean = false, Z e
100103

101104
debug('getting slice: %s, exchange: %s', sliceKey, exchange)
102105

103-
let cachedSlicePath
104-
while (cachedSlicePath === undefined) {
105-
cachedSlicePath = cachedSlicePaths.get(sliceKey)
106+
let cachedSlice
107+
while (cachedSlice === undefined) {
108+
cachedSlice = cachedSlicePaths.get(sliceKey)
106109

107110
// if something went wrong (network issue, auth issue, gunzip issue, etc.)
108111
if (replayError !== undefined) {
109112
throw replayError
110113
}
111114

112-
if (cachedSlicePath === undefined) {
115+
if (cachedSlice === undefined) {
113116
// if response for requested date is not ready yet wait 100ms and try again
114117
debug('waiting for slice: %s, exchange: %s', sliceKey, exchange)
115118
await wait(100)
116119
}
117120
}
118121

119122
// response is a path to a file on disk, let's read it as a stream
123+
const { slicePath: cachedSlicePath, sliceSize } = cachedSlice
120124
const isZstdSlice = cachedSlicePath.endsWith('.zst')
121125
const linesStream = createReadStream(cachedSlicePath, { highWaterMark: CHUNK_SIZE })
122126
// decompress it while preserving the on-disk cache in the negotiated wire format
@@ -185,8 +189,8 @@ export async function* replay<T extends Exchange, U extends boolean = false, Z e
185189
if (autoCleanup) {
186190
await cleanupSlice(cachedSlicePath)
187191
}
188-
// move one minute forward
189-
currentSliceDate.setUTCMinutes(currentSliceDate.getUTCMinutes() + 1)
192+
// move by the number of minutes covered by this cached response
193+
currentSliceDate.setUTCMinutes(currentSliceDate.getUTCMinutes() + sliceSize)
190194
}
191195

192196
debug(

src/worker.ts

Lines changed: 68 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import type { DataFeedCompression } from './options.ts'
77
import { Exchange, Filter } from './types.ts'
88
const debug = dbg('tardis-dev')
99

10+
const DEFAULT_DATA_FEED_SLICE_SIZE = 1
11+
1012
if (isMainThread) {
1113
debug('current worker is not meant to run in main thread')
1214
} else {
@@ -77,15 +79,32 @@ async function getDataFeedSlices(payload: WorkerJobPayload) {
7779
}
7880
} else {
7981
// fetch last slice - it will tell us if user has access to the end of requested date range and data is available
80-
await getDataFeedSlice(payload, minutesCountToFetch - 1, filters, cacheDir)
82+
// also fetch it from API to get current suggested slice size headers
83+
const lastSlice = await getDataFeedSlice(payload, minutesCountToFetch - 1, filters, cacheDir, DEFAULT_DATA_FEED_SLICE_SIZE, false)
8184

8285
// fetch first slice - it will tell us if user has access to the beginning of requested date range
83-
await getDataFeedSlice(payload, 0, filters, cacheDir)
86+
const firstSlice =
87+
minutesCountToFetch === 1 ? lastSlice : await getDataFeedSlice(payload, 0, filters, cacheDir, DEFAULT_DATA_FEED_SLICE_SIZE, false)
88+
89+
const replaySliceSize =
90+
filters.length === 0 ? DEFAULT_DATA_FEED_SLICE_SIZE : Math.max(firstSlice.suggestedSliceSize, lastSlice.suggestedSliceSize)
91+
const sliceOffsets: number[] = []
92+
for (let offset = 1; offset < minutesCountToFetch - 1; offset += replaySliceSize) {
93+
sliceOffsets.push(offset)
94+
}
8495

8596
// if both the beginning and end date of the range are accessible, fetch all remaining slices concurrently with CONCURRENCY_LIMIT
8697
await pMap(
87-
sequence(minutesCountToFetch, 1), // this will produce Iterable sequence from 1 to minutesCountToFetch
88-
(offset) => getDataFeedSlice(payload, offset, filters, cacheDir),
98+
sliceOffsets,
99+
async (offset) => {
100+
let currentOffset = offset
101+
let remainingSliceSize = Math.min(replaySliceSize, minutesCountToFetch - 1 - offset)
102+
while (remainingSliceSize > 0) {
103+
const result = await getDataFeedSlice(payload, currentOffset, filters, cacheDir, remainingSliceSize)
104+
currentOffset += result.sliceSize
105+
remainingSliceSize -= result.sliceSize
106+
}
107+
},
89108
{ concurrency: CONCURRENCY_LIMIT }
90109
)
91110
}
@@ -95,51 +114,73 @@ async function getDataFeedSlice(
95114
{ exchange, fromDate, endpoint, apiKey, dataFeedCompression, userAgent }: WorkerJobPayload,
96115
offset: number,
97116
filters: object[],
98-
cacheDir: string
117+
cacheDir: string,
118+
requestedSliceSize = DEFAULT_DATA_FEED_SLICE_SIZE,
119+
useCache = true
99120
) {
100121
const sliceTimestamp = addMinutes(fromDate, offset)
101122
const sliceKey = sliceTimestamp.toISOString()
102-
const sliceBasePath = `${cacheDir}/${formatDateToPath(sliceTimestamp)}.json`
123+
const sliceSizeSuffix = requestedSliceSize === DEFAULT_DATA_FEED_SLICE_SIZE ? '' : `.size-${requestedSliceSize}`
124+
const sliceBasePath = `${cacheDir}/${formatDateToPath(sliceTimestamp)}${sliceSizeSuffix}.json`
103125
const zstdSlicePath = `${sliceBasePath}.zst`
104126
const gzipSlicePath = `${sliceBasePath}.gz`
105-
const cachedSlicePath = existsSync(zstdSlicePath) ? zstdSlicePath : existsSync(gzipSlicePath) ? gzipSlicePath : undefined
127+
let cachedSlicePath
128+
if (useCache) {
129+
cachedSlicePath = existsSync(zstdSlicePath) ? zstdSlicePath : existsSync(gzipSlicePath) ? gzipSlicePath : undefined
130+
}
131+
132+
if (cachedSlicePath !== undefined) {
133+
debug('getDataFeedSlice already cached: %s, sliceSize: %d', sliceKey, requestedSliceSize)
134+
const message: WorkerMessage = {
135+
sliceKey,
136+
slicePath: cachedSlicePath,
137+
sliceSize: requestedSliceSize
138+
}
139+
parentPort!.postMessage(message)
140+
return {
141+
sliceSize: requestedSliceSize,
142+
suggestedSliceSize: DEFAULT_DATA_FEED_SLICE_SIZE
143+
}
144+
}
106145

107146
let url = `${endpoint}/data-feeds/${exchange}?from=${fromDate.toISOString()}&offset=${offset}&compression=${dataFeedCompression}`
147+
if (requestedSliceSize > DEFAULT_DATA_FEED_SLICE_SIZE) {
148+
url += `&sliceSize=${requestedSliceSize}`
149+
}
108150

109151
if (filters.length > 0) {
110152
url += `&filters=${encodeURIComponent(JSON.stringify(filters))}`
111153
}
112154

113-
const slicePath =
114-
cachedSlicePath ||
115-
(
116-
await download({
117-
apiKey,
118-
downloadPath: sliceBasePath,
119-
url,
120-
userAgent,
121-
appendContentEncodingExtension: true,
122-
acceptEncoding: dataFeedCompression === 'gzip' ? 'gzip' : 'zstd, gzip'
123-
})
124-
).downloadPath
125-
126-
if (cachedSlicePath === undefined) {
127-
debug('getDataFeedSlice fetched from API and cached, %s', sliceKey)
128-
} else {
129-
debug('getDataFeedSlice already cached: %s', sliceKey)
130-
}
155+
const downloadResult = await download({
156+
apiKey,
157+
downloadPath: sliceBasePath,
158+
url,
159+
userAgent,
160+
appendContentEncodingExtension: true,
161+
acceptEncoding: dataFeedCompression === 'gzip' ? 'gzip' : 'zstd, gzip'
162+
})
163+
const responseSliceSize = Number(downloadResult.headers['x-slice-size'])
164+
const suggestedSliceSize = Number(downloadResult.headers['x-suggested-slice-size'] ?? DEFAULT_DATA_FEED_SLICE_SIZE)
131165

132-
// everything went well (already cached or successfull cached) let's communicate it to parent thread
166+
debug('getDataFeedSlice fetched from API and cached, %s, sliceSize: %d', sliceKey, responseSliceSize)
133167
const message: WorkerMessage = {
134168
sliceKey,
135-
slicePath
169+
slicePath: downloadResult.downloadPath,
170+
sliceSize: responseSliceSize
136171
}
137172
parentPort!.postMessage(message)
173+
174+
return {
175+
sliceSize: responseSliceSize,
176+
suggestedSliceSize
177+
}
138178
}
139179

140180
export type WorkerMessage = {
141181
sliceKey: string
142182
slicePath: string
183+
sliceSize: number
143184
}
144185

145186
export type WorkerJobPayload = {

test/replay.test.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1+
import { existsSync, mkdtempSync, readdirSync, rmSync } from 'node:fs'
2+
import os from 'node:os'
3+
import path from 'node:path'
14
import {
25
clearCache,
36
Exchange,
47
EXCHANGES,
8+
init,
59
normalizeBookChanges,
610
normalizeDerivativeTickers,
711
normalizeTrades,
@@ -120,6 +124,60 @@ describe('replay', () => {
120124
1000 * 60 * 10
121125
)
122126

127+
test(
128+
'replays real data using a multi-minute data feed slice',
129+
async () => {
130+
const cacheDir = mkdtempSync(path.join(os.tmpdir(), 'tardis-node-replay-slices-'))
131+
132+
try {
133+
init({ cacheDir })
134+
135+
const messages = []
136+
for await (const { message } of replay({
137+
exchange: 'bitmex',
138+
from: '2019-05-01T00:00:00.000Z',
139+
to: '2019-05-01T00:12:00.000Z',
140+
filters: [
141+
{
142+
channel: 'trade',
143+
symbols: ['ETHUSD']
144+
}
145+
],
146+
skipDecoding: true
147+
})) {
148+
messages.push(message)
149+
}
150+
151+
const cacheFiles = listFiles(cacheDir)
152+
const multiMinuteSliceFiles = cacheFiles.filter((filePath) => /\.size-(?:[2-9]|10)\.json\.(?:gz|zst)$/.test(filePath))
153+
154+
expect(messages.length).toBeGreaterThan(0)
155+
expect(multiMinuteSliceFiles.length).toBeGreaterThan(0)
156+
157+
const cacheFilesAfterFirstReplay = cacheFiles.sort()
158+
for await (const _ of replay({
159+
exchange: 'bitmex',
160+
from: '2019-05-01T00:00:00.000Z',
161+
to: '2019-05-01T00:12:00.000Z',
162+
filters: [
163+
{
164+
channel: 'trade',
165+
symbols: ['ETHUSD']
166+
}
167+
],
168+
skipDecoding: true
169+
})) {
170+
}
171+
172+
expect(listFiles(cacheDir).sort()).toEqual(cacheFilesAfterFirstReplay)
173+
} finally {
174+
init()
175+
rmSync(cacheDir, { force: true, recursive: true })
176+
}
177+
},
178+
1000 * 60 * 10
179+
)
180+
123181
test(
124182
'replays raw Coinbase data feed for 1st of Jun 2019 (ZEC-USDC trades)',
125183
async () => {
@@ -409,3 +467,14 @@ describe('replay', () => {
409467
1000 * 60 * 10
410468
)
411469
})
470+
471+
function listFiles(directory: string): string[] {
472+
if (existsSync(directory) === false) {
473+
return []
474+
}
475+
476+
return readdirSync(directory, { withFileTypes: true }).flatMap((entry) => {
477+
const entryPath = path.join(directory, entry.name)
478+
return entry.isDirectory() ? listFiles(entryPath) : [entryPath]
479+
})
480+
}

0 commit comments

Comments
 (0)