@@ -22,7 +22,10 @@ import {
2222 ChunkSource ,
2323} from "#src/chunk_manager/backend.js" ;
2424import { ChunkPriorityTier , ChunkState } from "#src/chunk_manager/base.js" ;
25- import type { ChunkedGraphChunkSpecification } from "#src/datasource/graphene/base.js" ;
25+ import type {
26+ ChunkedGraphChunkSpecification ,
27+ HttpSource ,
28+ } from "#src/datasource/graphene/base.js" ;
2629import {
2730 getGrapheneFragmentKey ,
2831 GRAPHENE_MESH_NEW_SEGMENT_RPC_ID ,
@@ -65,7 +68,8 @@ import { computeChunkBounds } from "#src/sliceview/volume/backend.js";
6568import { Uint64Set } from "#src/uint64_set.js" ;
6669import { vec3 , vec3Key } from "#src/util/geom.js" ;
6770import { HttpError } from "#src/util/http_request.js" ;
68- import { parseUint64 } from "#src/util/json.js" ;
71+ import { parseUint64 , verifyStringArray } from "#src/util/json.js" ;
72+ import { Signal } from "#src/util/signal.js" ;
6973import {
7074 getBasePriority ,
7175 getPriorityTier ,
@@ -193,6 +197,73 @@ export class GrapheneMeshSource extends WithParameters(
193197 }
194198}
195199
200+ class LeavesManyProxy {
201+ pendingRequests = new Map <
202+ string ,
203+ [ Signal < ( response : any ) => void > , Uint64Set , AbortController ]
204+ > ( ) ;
205+
206+ constructor ( private httpSource : HttpSource ) { }
207+
208+ getQueueSizeForBounds ( bounds : string ) {
209+ const requestsForBounds = this . pendingRequests . get ( bounds ) ;
210+ return requestsForBounds ? requestsForBounds [ 1 ] . size : 0 ;
211+ }
212+
213+ async request (
214+ segment : bigint ,
215+ bounds : string ,
216+ signal : AbortSignal ,
217+ ) : Promise < any > {
218+ const { pendingRequests } = this ;
219+ let pendingRequest = pendingRequests . get ( bounds ) ;
220+ if ( ! pendingRequest ) {
221+ const requestSignal = new Signal < ( request : any ) => void > ( ) ;
222+ const abortController = new AbortController ( ) ;
223+ const segments = new Uint64Set ( ) ;
224+ pendingRequest = [ requestSignal , segments , abortController ] ;
225+ pendingRequests . set ( bounds , pendingRequest ) ;
226+ setTimeout ( async ( ) => {
227+ pendingRequests . delete ( bounds ) ;
228+ const { fetchOkImpl, baseUrl } = this . httpSource ;
229+ try {
230+ const response = await fetchOkImpl (
231+ `${ baseUrl } /leaves_many?int64_as_str=1&bounds=${ bounds } ` ,
232+ {
233+ method : "POST" ,
234+ body : JSON . stringify ( {
235+ node_ids : segments . toJSON ( ) ,
236+ } ) ,
237+ signal : abortController . signal ,
238+ } ,
239+ ) . then ( ( res ) => res . json ( ) ) ;
240+ requestSignal . dispatch ( response ) ;
241+ } catch ( e ) {
242+ requestSignal . dispatch ( e ) ;
243+ }
244+ } , 0 ) ;
245+ }
246+ const [ requestSignal , segments , abortController ] = pendingRequest ;
247+ segments . add ( segment ) ;
248+ signal . addEventListener ( "abort" , ( ) => {
249+ segments . delete ( segment ) ;
250+ if ( segments . size === 0 ) {
251+ abortController . abort ( ) ;
252+ }
253+ } ) ;
254+ return new Promise ( ( f , r ) => {
255+ const unregister = requestSignal . add ( ( response ) => {
256+ unregister ( ) ;
257+ if ( response instanceof Error ) {
258+ r ( response ) ;
259+ } else {
260+ f ( response [ segment . toString ( ) ] ) ;
261+ }
262+ } ) ;
263+ } ) ;
264+ }
265+ }
266+
196267export class ChunkedGraphChunk extends Chunk {
197268 chunkGridPosition : Float32Array ;
198269 source : GrapheneChunkedGraphChunkSource | null = null ;
@@ -205,6 +276,26 @@ export class ChunkedGraphChunk extends Chunk {
205276 this . chunkGridPosition = Float32Array . from ( chunkGridPosition ) ;
206277 }
207278
279+ get downloadSlots ( ) : number {
280+ const { source, bounds } = this ;
281+ if ( ! source || ! bounds ) return super . downloadSlots ;
282+ const queueSize = source . leavesManyProxy . getQueueSizeForBounds ( bounds ) ;
283+ // requests that can be bundled with a prior request are considered free
284+ return queueSize > 0 ? 0 : super . downloadSlots ;
285+ }
286+
287+ get bounds ( ) {
288+ const { source } = this ;
289+ if ( ! source ) return undefined ;
290+ const chunkPosition = computeChunkBounds ( source , this ) ;
291+ const chunkDataSize = this . chunkDataSize ! ;
292+ return (
293+ `${ chunkPosition [ 0 ] } -${ chunkPosition [ 0 ] + chunkDataSize [ 0 ] } _` +
294+ `${ chunkPosition [ 1 ] } -${ chunkPosition [ 1 ] + chunkDataSize [ 1 ] } _` +
295+ `${ chunkPosition [ 2 ] } -${ chunkPosition [ 2 ] + chunkDataSize [ 2 ] } `
296+ ) ;
297+ }
298+
208299 initializeChunkedGraphChunk (
209300 key : string ,
210301 chunkGridPosition : Float32Array ,
@@ -245,6 +336,7 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters(
245336 declare chunks : Map < string , ChunkedGraphChunk > ;
246337 tempChunkDataSize : Uint32Array ;
247338 tempChunkPosition : Float32Array ;
339+ leavesManyProxy : LeavesManyProxy ;
248340
249341 httpSource = getHttpSource (
250342 this . sharedKvStoreContext . kvStoreContext ,
@@ -257,28 +349,20 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters(
257349 const rank = this . spec . rank ;
258350 this . tempChunkDataSize = new Uint32Array ( rank ) ;
259351 this . tempChunkPosition = new Float32Array ( rank ) ;
352+ this . leavesManyProxy = new LeavesManyProxy ( this . httpSource ) ;
260353 }
261354
262355 async download ( chunk : ChunkedGraphChunk , signal : AbortSignal ) : Promise < void > {
263- const chunkPosition = this . computeChunkBounds ( chunk ) ;
264- const chunkDataSize = chunk . chunkDataSize ! ;
265- const bounds =
266- `${ chunkPosition [ 0 ] } -${ chunkPosition [ 0 ] + chunkDataSize [ 0 ] } _` +
267- `${ chunkPosition [ 1 ] } -${ chunkPosition [ 1 ] + chunkDataSize [ 1 ] } _` +
268- `${ chunkPosition [ 2 ] } -${ chunkPosition [ 2 ] + chunkDataSize [ 2 ] } ` ;
269-
270- const { fetchOkImpl, baseUrl } = this . httpSource ;
271- const request = fetchOkImpl (
272- `${ baseUrl } /${ chunk . segment } /leaves?int64_as_str=1&bounds=${ bounds } ` ,
273- { signal } ,
274- ) ;
356+ const { segment, bounds } = chunk ;
357+ if ( ! bounds ) return ;
358+ const request = this . leavesManyProxy . request ( segment , bounds , signal ) ;
275359 await this . withErrorMessage (
276360 request ,
277361 `Fetching leaves of segment ${ chunk . segment } in region ${ bounds } : ` ,
278362 )
279- . then ( ( res ) => res . json ( ) )
280363 . then ( ( res ) => {
281- chunk . leaves = decodeChunkedGraphChunk ( res . leaf_ids ) ;
364+ verifyStringArray ( res ) ;
365+ chunk . leaves = decodeChunkedGraphChunk ( res ) ;
282366 } )
283367 . catch ( ( err ) => {
284368 if ( err instanceof Error && err . name === "AbortError" ) return ;
@@ -298,10 +382,6 @@ export class GrapheneChunkedGraphChunkSource extends WithParameters(
298382 return chunk ;
299383 }
300384
301- computeChunkBounds ( chunk : ChunkedGraphChunk ) {
302- return computeChunkBounds ( this , chunk ) ;
303- }
304-
305385 async withErrorMessage < T > (
306386 promise : Promise < T > ,
307387 errorPrefix : string ,
0 commit comments