Skip to content

perf(datasource/graphene): group together segment id requests within the same bounding box (leaves_many) #583

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 100 additions & 20 deletions src/datasource/graphene/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import {
ChunkSource,
} from "#src/chunk_manager/backend.js";
import { ChunkPriorityTier, ChunkState } from "#src/chunk_manager/base.js";
import type { ChunkedGraphChunkSpecification } from "#src/datasource/graphene/base.js";
import type {
ChunkedGraphChunkSpecification,
HttpSource,
} from "#src/datasource/graphene/base.js";
import {
getGrapheneFragmentKey,
GRAPHENE_MESH_NEW_SEGMENT_RPC_ID,
Expand Down Expand Up @@ -65,7 +68,8 @@ import { computeChunkBounds } from "#src/sliceview/volume/backend.js";
import { Uint64Set } from "#src/uint64_set.js";
import { vec3, vec3Key } from "#src/util/geom.js";
import { HttpError } from "#src/util/http_request.js";
import { parseUint64 } from "#src/util/json.js";
import { parseUint64, verifyStringArray } from "#src/util/json.js";
import { Signal } from "#src/util/signal.js";
import {
getBasePriority,
getPriorityTier,
Expand Down Expand Up @@ -193,6 +197,73 @@ export class GrapheneMeshSource extends WithParameters(
}
}

class LeavesManyProxy {
pendingRequests = new Map<
string,
[Signal<(response: any) => void>, Uint64Set, AbortController]
>();

constructor(private httpSource: HttpSource) {}

getQueueSizeForBounds(bounds: string) {
const requestsForBounds = this.pendingRequests.get(bounds);
return requestsForBounds ? requestsForBounds[1].size : 0;
}

async request(
segment: bigint,
bounds: string,
signal: AbortSignal,
): Promise<any> {
const { pendingRequests } = this;
let pendingRequest = pendingRequests.get(bounds);
if (!pendingRequest) {
const requestSignal = new Signal<(request: any) => void>();
const abortController = new AbortController();
const segments = new Uint64Set();
pendingRequest = [requestSignal, segments, abortController];
pendingRequests.set(bounds, pendingRequest);
setTimeout(async () => {
pendingRequests.delete(bounds);
const { fetchOkImpl, baseUrl } = this.httpSource;
try {
const response = await fetchOkImpl(
`${baseUrl}/leaves_many?int64_as_str=1&bounds=${bounds}`,
{
method: "POST",
body: JSON.stringify({
node_ids: segments.toJSON(),
}),
signal: abortController.signal,
},
).then((res) => res.json());
requestSignal.dispatch(response);
} catch (e) {
requestSignal.dispatch(e);
}
}, 0);
}
const [requestSignal, segments, abortController] = pendingRequest;
segments.add(segment);
signal.addEventListener("abort", () => {
segments.delete(segment);
if (segments.size === 0) {
abortController.abort();
}
});
return new Promise((f, r) => {
const unregister = requestSignal.add((response) => {
unregister();
if (response instanceof Error) {
r(response);
} else {
f(response[segment.toString()]);
}
});
});
}
}

export class ChunkedGraphChunk extends Chunk {
chunkGridPosition: Float32Array;
source: GrapheneChunkedGraphChunkSource | null = null;
Expand All @@ -205,6 +276,26 @@ export class ChunkedGraphChunk extends Chunk {
this.chunkGridPosition = Float32Array.from(chunkGridPosition);
}

get downloadSlots(): number {
const { source, bounds } = this;
if (!source || !bounds) return super.downloadSlots;
const queueSize = source.leavesManyProxy.getQueueSizeForBounds(bounds);
// requests that can be bundled with a prior request are considered free
return queueSize > 0 ? 0 : super.downloadSlots;
}

get bounds() {
const { source } = this;
if (!source) return undefined;
const chunkPosition = computeChunkBounds(source, this);
const chunkDataSize = this.chunkDataSize!;
return (
`${chunkPosition[0]}-${chunkPosition[0] + chunkDataSize[0]}_` +
`${chunkPosition[1]}-${chunkPosition[1] + chunkDataSize[1]}_` +
`${chunkPosition[2]}-${chunkPosition[2] + chunkDataSize[2]}`
);
}

initializeChunkedGraphChunk(
key: string,
chunkGridPosition: Float32Array,
Expand Down Expand Up @@ -245,6 +336,7 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters(
declare chunks: Map<string, ChunkedGraphChunk>;
tempChunkDataSize: Uint32Array;
tempChunkPosition: Float32Array;
leavesManyProxy: LeavesManyProxy;

httpSource = getHttpSource(
this.sharedKvStoreContext.kvStoreContext,
Expand All @@ -257,28 +349,20 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters(
const rank = this.spec.rank;
this.tempChunkDataSize = new Uint32Array(rank);
this.tempChunkPosition = new Float32Array(rank);
this.leavesManyProxy = new LeavesManyProxy(this.httpSource);
}

async download(chunk: ChunkedGraphChunk, signal: AbortSignal): Promise<void> {
const chunkPosition = this.computeChunkBounds(chunk);
const chunkDataSize = chunk.chunkDataSize!;
const bounds =
`${chunkPosition[0]}-${chunkPosition[0] + chunkDataSize[0]}_` +
`${chunkPosition[1]}-${chunkPosition[1] + chunkDataSize[1]}_` +
`${chunkPosition[2]}-${chunkPosition[2] + chunkDataSize[2]}`;

const { fetchOkImpl, baseUrl } = this.httpSource;
const request = fetchOkImpl(
`${baseUrl}/${chunk.segment}/leaves?int64_as_str=1&bounds=${bounds}`,
{ signal },
);
const { segment, bounds } = chunk;
if (!bounds) return;
const request = this.leavesManyProxy.request(segment, bounds, signal);
await this.withErrorMessage(
request,
`Fetching leaves of segment ${chunk.segment} in region ${bounds}: `,
)
.then((res) => res.json())
.then((res) => {
chunk.leaves = decodeChunkedGraphChunk(res.leaf_ids);
verifyStringArray(res);
chunk.leaves = decodeChunkedGraphChunk(res);
})
.catch((err) => {
if (err instanceof Error && err.name === "AbortError") return;
Expand All @@ -298,10 +382,6 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters(
return chunk;
}

computeChunkBounds(chunk: ChunkedGraphChunk) {
return computeChunkBounds(this, chunk);
}

async withErrorMessage<T>(
promise: Promise<T>,
errorPrefix: string,
Expand Down
Loading