Skip to content

Commit 16ac7d3

Browse files
committed
perf(datasource/graphene): group together segment id requests within the same bounding box (leaves_many)
1 parent 11b30e3 commit 16ac7d3

File tree

1 file changed

+100
-20
lines changed

1 file changed

+100
-20
lines changed

src/datasource/graphene/backend.ts

Lines changed: 100 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ import {
2222
ChunkSource,
2323
} from "#src/chunk_manager/backend.js";
2424
import { ChunkPriorityTier, ChunkState } from "#src/chunk_manager/base.js";
25-
import type { ChunkedGraphChunkSpecification } from "#src/datasource/graphene/base.js";
25+
import type {
26+
ChunkedGraphChunkSpecification,
27+
HttpSource,
28+
} from "#src/datasource/graphene/base.js";
2629
import {
2730
getGrapheneFragmentKey,
2831
GRAPHENE_MESH_NEW_SEGMENT_RPC_ID,
@@ -65,7 +68,8 @@ import { computeChunkBounds } from "#src/sliceview/volume/backend.js";
6568
import { Uint64Set } from "#src/uint64_set.js";
6669
import { vec3, vec3Key } from "#src/util/geom.js";
6770
import { HttpError } from "#src/util/http_request.js";
68-
import { parseUint64 } from "#src/util/json.js";
71+
import { parseUint64, verifyStringArray } from "#src/util/json.js";
72+
import { Signal } from "#src/util/signal.js";
6973
import {
7074
getBasePriority,
7175
getPriorityTier,
@@ -193,6 +197,73 @@ export class GrapheneMeshSource extends WithParameters(
193197
}
194198
}
195199

200+
class LeavesManyProxy {
201+
pendingRequests = new Map<
202+
string,
203+
[Signal<(response: any) => void>, Uint64Set, AbortController]
204+
>();
205+
206+
constructor(private httpSource: HttpSource) {}
207+
208+
getQueueSizeForBounds(bounds: string) {
209+
const requestsForBounds = this.pendingRequests.get(bounds);
210+
return requestsForBounds ? requestsForBounds[1].size : 0;
211+
}
212+
213+
async request(
214+
segment: bigint,
215+
bounds: string,
216+
signal: AbortSignal,
217+
): Promise<any> {
218+
const { pendingRequests } = this;
219+
let pendingRequest = pendingRequests.get(bounds);
220+
if (!pendingRequest) {
221+
const requestSignal = new Signal<(request: any) => void>();
222+
const abortController = new AbortController();
223+
const segments = new Uint64Set();
224+
pendingRequest = [requestSignal, segments, abortController];
225+
pendingRequests.set(bounds, pendingRequest);
226+
setTimeout(async () => {
227+
pendingRequests.delete(bounds);
228+
const { fetchOkImpl, baseUrl } = this.httpSource;
229+
try {
230+
const response = await fetchOkImpl(
231+
`${baseUrl}/leaves_many?int64_as_str=1&bounds=${bounds}`,
232+
{
233+
method: "POST",
234+
body: JSON.stringify({
235+
node_ids: segments.toJSON(),
236+
}),
237+
signal: abortController.signal,
238+
},
239+
).then((res) => res.json());
240+
requestSignal.dispatch(response);
241+
} catch (e) {
242+
requestSignal.dispatch(e);
243+
}
244+
}, 0);
245+
}
246+
const [requestSignal, segments, abortController] = pendingRequest;
247+
segments.add(segment);
248+
signal.addEventListener("abort", () => {
249+
segments.delete(segment);
250+
if (segments.size === 0) {
251+
abortController.abort();
252+
}
253+
});
254+
return new Promise((f, r) => {
255+
const unregister = requestSignal.add((response) => {
256+
unregister();
257+
if (response instanceof Error) {
258+
r(response);
259+
} else {
260+
f(response[segment.toString()]);
261+
}
262+
});
263+
});
264+
}
265+
}
266+
196267
export class ChunkedGraphChunk extends Chunk {
197268
chunkGridPosition: Float32Array;
198269
source: GrapheneChunkedGraphChunkSource | null = null;
@@ -205,6 +276,26 @@ export class ChunkedGraphChunk extends Chunk {
205276
this.chunkGridPosition = Float32Array.from(chunkGridPosition);
206277
}
207278

279+
get downloadSlots(): number {
280+
const { source, bounds } = this;
281+
if (!source || !bounds) return super.downloadSlots;
282+
const queueSize = source.leavesManyProxy.getQueueSizeForBounds(bounds);
283+
// requests that can be bundled with a prior request are considered free
284+
return queueSize > 0 ? 0 : super.downloadSlots;
285+
}
286+
287+
get bounds() {
288+
const { source } = this;
289+
if (!source) return undefined;
290+
const chunkPosition = computeChunkBounds(source, this);
291+
const chunkDataSize = this.chunkDataSize!;
292+
return (
293+
`${chunkPosition[0]}-${chunkPosition[0] + chunkDataSize[0]}_` +
294+
`${chunkPosition[1]}-${chunkPosition[1] + chunkDataSize[1]}_` +
295+
`${chunkPosition[2]}-${chunkPosition[2] + chunkDataSize[2]}`
296+
);
297+
}
298+
208299
initializeChunkedGraphChunk(
209300
key: string,
210301
chunkGridPosition: Float32Array,
@@ -245,6 +336,7 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters(
245336
declare chunks: Map<string, ChunkedGraphChunk>;
246337
tempChunkDataSize: Uint32Array;
247338
tempChunkPosition: Float32Array;
339+
leavesManyProxy: LeavesManyProxy;
248340

249341
httpSource = getHttpSource(
250342
this.sharedKvStoreContext.kvStoreContext,
@@ -257,28 +349,20 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters(
257349
const rank = this.spec.rank;
258350
this.tempChunkDataSize = new Uint32Array(rank);
259351
this.tempChunkPosition = new Float32Array(rank);
352+
this.leavesManyProxy = new LeavesManyProxy(this.httpSource);
260353
}
261354

262355
async download(chunk: ChunkedGraphChunk, signal: AbortSignal): Promise<void> {
263-
const chunkPosition = this.computeChunkBounds(chunk);
264-
const chunkDataSize = chunk.chunkDataSize!;
265-
const bounds =
266-
`${chunkPosition[0]}-${chunkPosition[0] + chunkDataSize[0]}_` +
267-
`${chunkPosition[1]}-${chunkPosition[1] + chunkDataSize[1]}_` +
268-
`${chunkPosition[2]}-${chunkPosition[2] + chunkDataSize[2]}`;
269-
270-
const { fetchOkImpl, baseUrl } = this.httpSource;
271-
const request = fetchOkImpl(
272-
`${baseUrl}/${chunk.segment}/leaves?int64_as_str=1&bounds=${bounds}`,
273-
{ signal },
274-
);
356+
const { segment, bounds } = chunk;
357+
if (!bounds) return;
358+
const request = this.leavesManyProxy.request(segment, bounds, signal);
275359
await this.withErrorMessage(
276360
request,
277361
`Fetching leaves of segment ${chunk.segment} in region ${bounds}: `,
278362
)
279-
.then((res) => res.json())
280363
.then((res) => {
281-
chunk.leaves = decodeChunkedGraphChunk(res.leaf_ids);
364+
verifyStringArray(res);
365+
chunk.leaves = decodeChunkedGraphChunk(res);
282366
})
283367
.catch((err) => {
284368
if (err instanceof Error && err.name === "AbortError") return;
@@ -298,10 +382,6 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters(
298382
return chunk;
299383
}
300384

301-
computeChunkBounds(chunk: ChunkedGraphChunk) {
302-
return computeChunkBounds(this, chunk);
303-
}
304-
305385
async withErrorMessage<T>(
306386
promise: Promise<T>,
307387
errorPrefix: string,

0 commit comments

Comments
 (0)