Skip to content

Commit 48dbb07

Browse files
committed
perf(analytics): parallelize export with sliced scroll
Add helpers to scan one index in multiple slices and merge the results while preserving the existing CSV output format. This also clears scroll contexts at the end of each scan. For analytics, use 8 sliced scrolls on top of the larger batch size and source filtering from the previous commit. Local snapshot benchmarks reached about 132k docs/s with 8 slices, and GitHub Actions reduced Generate CSVs from 2h24m59s to 7m27s.
1 parent 1a4bf4d commit 48dbb07

1 file changed

Lines changed: 90 additions & 25 deletions

File tree

dumpOpenData.js

Lines changed: 90 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,17 @@ const client = new Client({
1616
node: ELASTICSEARCH_URL,
1717
});
1818

19+
const ANALYTICS_SOURCE_INCLUDES = [
20+
'type',
21+
'docId',
22+
'date',
23+
'stats.lineUser',
24+
'stats.lineVisit',
25+
'stats.webUser',
26+
'stats.webVisit',
27+
'stats.liff',
28+
];
29+
1930
/**
2031
* @param {string} input
2132
* @returns {string} - input's sha256 hash hex string. Empty string if input is falsy.
@@ -26,9 +37,46 @@ function sha256(input) {
2637
: '';
2738
}
2839

40+
async function* mergeAsyncIterables(iterables) {
41+
const entries = iterables.map((iterable) => {
42+
const iterator = iterable[Symbol.asyncIterator]();
43+
return {
44+
iterator,
45+
next: iterator.next().then((result) => ({ iterator, result })),
46+
};
47+
});
48+
49+
while (entries.length > 0) {
50+
const { iterator, result } = await Promise.race(
51+
entries.map((entry) => entry.next)
52+
);
53+
const entryIndex = entries.findIndex(
54+
(entry) => entry.iterator === iterator
55+
);
56+
57+
if (entryIndex === -1) continue;
58+
59+
if (result.done) {
60+
entries.splice(entryIndex, 1);
61+
continue;
62+
}
63+
64+
entries[entryIndex].next = iterator
65+
.next()
66+
.then((nextResult) => ({ iterator, result: nextResult }));
67+
yield result.value;
68+
}
69+
}
70+
2971
async function* scanIndex(
3072
index,
31-
{ size = 200, sourceIncludes = undefined, sort = undefined } = {}
73+
{
74+
size = 200,
75+
sourceIncludes = undefined,
76+
sort = undefined,
77+
slice = undefined,
78+
progressLabel = index,
79+
} = {}
3280
) {
3381
let processedCount = 0;
3482

@@ -38,28 +86,53 @@ async function* scanIndex(
3886
scroll: '5m',
3987
_source_includes: sourceIncludes,
4088
sort,
89+
slice,
4190
});
4291

4392
let scrollId = result._scroll_id;
4493

45-
while (result.hits.hits.length > 0) {
46-
for (const hit of result.hits.hits) {
47-
processedCount += 1;
48-
yield hit;
94+
try {
95+
while (result.hits.hits.length > 0) {
96+
for (const hit of result.hits.hits) {
97+
processedCount += 1;
98+
yield hit;
99+
}
100+
101+
if (processedCount % 10000 === 0) {
102+
console.info(`${progressLabel}:\t${processedCount} processed`);
103+
}
104+
105+
result = await client.scroll({
106+
scroll_id: scrollId,
107+
scroll: '5m',
108+
});
109+
scrollId = result._scroll_id;
49110
}
50-
51-
if (processedCount % 10000 === 0) {
52-
console.info(`${index}:\t${processedCount} processed`);
111+
} finally {
112+
if (scrollId) {
113+
try {
114+
await client.clearScroll({
115+
scroll_id: scrollId,
116+
});
117+
} catch (error) {
118+
console.warn(`Failed to clear scroll for ${progressLabel}: ${error}`);
119+
}
53120
}
54-
55-
result = await client.scroll({
56-
scroll_id: scrollId,
57-
scroll: '5m',
58-
});
59-
scrollId = result._scroll_id;
60121
}
61122
}
62123

124+
function scanIndexInSlices(index, { slices, ...options }) {
125+
return mergeAsyncIterables(
126+
Array.from({ length: slices }, (_, sliceId) =>
127+
scanIndex(index, {
128+
...options,
129+
slice: { id: sliceId, max: slices },
130+
progressLabel: `${index}[${sliceId + 1}/${slices}]`,
131+
})
132+
)
133+
);
134+
}
135+
63136
/**
64137
* @param {AsyncIterable} articles
65138
* @returns {Promise<string>} Generated CSV string
@@ -461,18 +534,10 @@ pipeline(
461534
);
462535

463536
pipeline(
464-
scanIndex('analytics', {
537+
scanIndexInSlices('analytics', {
538+
slices: 8,
465539
size: 5000,
466-
sourceIncludes: [
467-
'type',
468-
'docId',
469-
'date',
470-
'stats.lineUser',
471-
'stats.lineVisit',
472-
'stats.webUser',
473-
'stats.webVisit',
474-
'stats.liff',
475-
],
540+
sourceIncludes: ANALYTICS_SOURCE_INCLUDES,
476541
sort: ['_doc'],
477542
}),
478543
dumpAnalytics,

0 commit comments

Comments
 (0)