diff --git a/.changeset/middleware-store-wrappers.md b/.changeset/middleware-store-wrappers.md new file mode 100644 index 00000000..8be0e1b2 --- /dev/null +++ b/.changeset/middleware-store-wrappers.md @@ -0,0 +1,58 @@ +--- +"zarrita": minor +--- + +Introduce composable store middleware via `defineStoreMiddleware` + +**New APIs:** + +- `zarr.defineStoreMiddleware(factory)` — define a store middleware with automatic Proxy delegation. The factory receives an `AsyncReadable` store and options, returning overrides and extensions. Supports sync and async factories. +- `zarr.defineStoreMiddleware.generic()(factory)` — for middleware whose options depend on the store's request options type (e.g., `mergeOptions`). Uses `GenericOptions` interface for higher-kinded type encoding. +- `zarr.extendStore(store, ...middleware)` — compose middleware in a pipeline. Each middleware is `(store) => newStore`. Returns `Promise` to support async middleware like `withConsolidation`. + +**Renamed:** + +- `withConsolidated` → `withConsolidation` +- `tryWithConsolidated` → `withMaybeConsolidation` +- `WithConsolidatedOptions` → `ConsolidationOptions` +- `BatchedRangeStoreOptions` → `RangeBatchingOptions` +- `Stats` → `RangeBatchingStats` + +**Migration:** + +The previous exports are still available but deprecated. Update your imports: + +```ts +import * as zarr from "zarrita"; + +// Pipeline composition (use arrow functions for full type inference) +let store = await zarr.extendStore( + new zarr.FetchStore("https://..."), + zarr.withConsolidation, + (s) => zarr.withRangeBatching(s, { mergeOptions: (batch) => batch[0] }), +); +``` + +**Defining custom middleware:** + +```ts +import * as zarr from "zarrita"; + +const withCaching = zarr.defineStoreMiddleware( + (store, opts: { maxSize?: number }) => { + let cache = new Map(); + return { + async get(key, options) { + let hit = cache.get(key); + if (hit) return hit; + let result = await store.get(key, options); + if (result) cache.set(key, result); + return result; + }, + clear() { cache.clear(); }, + }; + }, +); +``` + +`BatchedRangeStore` class has been removed in favor of `withRangeBatching` built on `zarr.defineStoreMiddleware`. diff --git a/docs/.vitepress/config.mjs b/docs/.vitepress/config.mjs index 38b883c5..301b8a05 100644 --- a/docs/.vitepress/config.mjs +++ b/docs/.vitepress/config.mjs @@ -34,6 +34,7 @@ export default defineConfig({ { text: "Slicing and Indexing", link: "/slicing" }, { text: "Supported Types", link: "/supported-types" }, { text: "Cookbook", link: "/cookbook" }, + { text: "Store Middleware", link: "/store-middleware" }, ], }, { diff --git a/docs/store-middleware.md b/docs/store-middleware.md new file mode 100644 index 00000000..8fc454bb --- /dev/null +++ b/docs/store-middleware.md @@ -0,0 +1,177 @@ +# Store Middleware + +Many of the stores in [`@zarrita/storage`](/packages/storage) (like +[`FetchStore`](/packages/storage#fetchstore) and +[`FileSystemStore`](/packages/storage#filesystemstore)) handle the raw +byte-level connection with a data source. + +A common pattern is to _wrap_ a store in another store that adds behavior +(e.g., caching responses, batching range requests, or serving pre-loaded +metadata) while delegating everything else to the inner store. + +`zarr.defineStoreMiddleware` lets you define this kind of "middleware" that you can +_compose_ with base stores using `zarr.extendStore`. + +## Built-in middleware + +**zarrita** ships with middleware for consolidated metadata and range batching: + +```ts +import * as zarr from "zarrita"; + +let store = await zarr.extendStore( + new zarr.FetchStore("https://example.com/data.zarr"), + zarr.withConsolidation({ format: "v3" }), + zarr.withRangeBatching(), +); + +store.contents(); // from zarr.withConsolidation +store.stats; // from zarr.withRangeBatching +``` + +Each middleware in the pipeline wraps the previous result. `zarr.extendStore` +handles async middleware (like `zarr.withConsolidation`, which loads metadata) +automatically. It always returns a `Promise`. + +You can also use middleware directly without `zarr.extendStore`: + +```ts +let consolidated = await zarr.withConsolidation( + new zarr.FetchStore("https://example.com/data.zarr"), + { format: "v3" }, +); +``` + +## Defining your own middleware + +Use `zarr.defineStoreMiddleware` to define custom middleware. The factory function receives +the inner store and custom options, and returns an object with method overrides +and/or new methods. Anything not returned is automatically delegated to the +inner store via `Proxy`. + +```ts +import * as zarr from "zarrita"; + +const withCaching = zarr.defineStoreMiddleware( + (store: zarr.AsyncReadable, opts: { maxSize?: number } = {}) => { + let cache = new Map(); + return { + async get(key: zarr.AbsolutePath, options?: unknown) { + let hit = cache.get(key); + if (hit) return hit; + let result = await store.get(key, options); + if (result) cache.set(key, result); + return result; + }, + clear() { + cache.clear(); + }, + }; + }, +); + +let store = withCaching(new zarr.FetchStore("https://..."), { maxSize: 256 }); +store.clear(); // new method from the middleware +``` + +The returned middleware supports a **dual API**: call it directly with `(store, +opts)` or curry it with `(opts)` for use in `zarr.extendStore` pipelines: + +```ts +// Direct +let store = withCaching(new zarr.FetchStore("https://..."), { maxSize: 256 }); + +// Curried (for zarr.extendStore) +let store = await zarr.extendStore( + new zarr.FetchStore("https://..."), + withCaching({ maxSize: 256 }), +); +``` + +Middleware can be **sync or async**. If the factory returns a `Promise`, the +wrapper returns a `Promise` too: + +```ts +const withMetadata = zarr.defineStoreMiddleware( + async (store: zarr.AsyncReadable, opts: { key: string }) => { + let meta = JSON.parse(new TextDecoder().decode(await store.get(opts.key))); + return { + metadata() { return meta; }, + }; + }, +); + +let store = await withMetadata(rawStore, { key: "/meta.json" }); +store.metadata(); // loaded during initialization +``` + +## Store options and generics + +Stores are generic over their request options type. For example, +[`zarr.FetchStore`](/packages/storage#fetchstore) uses `RequestInit` so you can +pass headers or an `AbortSignal` to individual requests. Most middleware +doesn't need to know about this type, and `zarr.defineStoreMiddleware` preserves it +automatically through the chain. + +Sometimes, though, middleware options _depend_ on the store's request options. +For example, `zarr.withRangeBatching` has a `mergeOptions` callback that +combines request options from concurrent callers, and its parameter type +should match the store's options. + +This is an advanced typing feature purely for caller convenience. It ensures +users get proper type inference and autocomplete on options that reference +the store's request type. Use `zarr.defineStoreMiddleware.generic` with a +`zarr.GenericOptions` interface that maps the store's options type into your +middleware options: + +```ts +import * as zarr from "zarrita"; + +// 1. Define your options as a normal generic interface +interface LoggingOptions { + label?: string; + formatOptions?: (opts: O) => string; +} + +// 2. Create the type lambda (one line that wires up the generic) +interface LoggingOptsFor extends zarr.GenericOptions { + readonly options: LoggingOptions; +} + +// 3. Define the middleware +const withLogging = zarr.defineStoreMiddleware.generic()( + (store, opts: LoggingOptions = {}) => { + let label = opts.label ?? "store"; + let format = opts.formatOptions ?? String; + return { + async get(key: zarr.AbsolutePath, options?: unknown) { + console.log(`[${label}] get ${key} ${format(options)}`); + return store.get(key, options); + }, + }; + }, +); +``` + +At the call site, `formatOptions` receives the store's options type: + +```ts +let store = withLogging(new zarr.FetchStore("https://..."), { + label: "my-store", + formatOptions: (opts) => { + // ^^^^ typed as RequestInit + return opts.method ?? "GET"; + }, +}); +``` + +This is an advanced pattern. Most middleware won't need it. It exists so +that _users_ of your middleware get proper type inference and autocomplete +when their options reference the store's request type. If your options don't +depend on the store type, use the simpler `zarr.defineStoreMiddleware` and skip the +`zarr.GenericOptions` boilerplate entirely. + +Under the hood, `zarr.GenericOptions` uses TypeScript's `this` types to encode +a [higher-kinded type](https://www.typescriptlang.org/docs/handbook/2/classes.html#this-types): +`this["_O"]` refers to the store's request options, which gets substituted with +the concrete type (e.g., `RequestInit`) at the call site. diff --git a/packages/zarrita/__tests__/batched-fetch.test.ts b/packages/zarrita/__tests__/batched-fetch.test.ts index 7bb74d24..3a399d9c 100644 --- a/packages/zarrita/__tests__/batched-fetch.test.ts +++ b/packages/zarrita/__tests__/batched-fetch.test.ts @@ -1,6 +1,6 @@ import type { AbsolutePath, RangeQuery } from "@zarrita/storage"; import { describe, expect, it, vi } from "vitest"; -import { withRangeBatching } from "../src/batched-fetch.js"; +import { withRangeBatching } from "../src/middleware/range-batching.js"; /** * Create a fake store with controllable getRange. diff --git a/packages/zarrita/__tests__/consolidated.test.ts b/packages/zarrita/__tests__/consolidated.test.ts index 467a5b87..48e7a006 100644 --- a/packages/zarrita/__tests__/consolidated.test.ts +++ b/packages/zarrita/__tests__/consolidated.test.ts @@ -2,17 +2,20 @@ import * as path from "node:path"; import * as url from "node:url"; import { FileSystemStore } from "@zarrita/storage"; import { assert, describe, expect, it } from "vitest"; -import { tryWithConsolidated, withConsolidated } from "../src/consolidated.js"; import { NodeNotFoundError } from "../src/errors.js"; import { Array as ZarrArray } from "../src/hierarchy.js"; +import { + withConsolidation, + withMaybeConsolidation, +} from "../src/middleware/consolidation.js"; import { open } from "../src/open.js"; let __dirname = path.dirname(url.fileURLToPath(import.meta.url)); -describe("withConsolidated", () => { +describe("withConsolidation", () => { it("loads consolidated metadata", async () => { let root = path.join(__dirname, "../../../fixtures/v2/data.zarr"); - let store = await withConsolidated(new FileSystemStore(root)); + let store = await withConsolidation(new FileSystemStore(root)); let map = new Map(store.contents().map((x) => [x.path, x.kind])); expect(map).toMatchInlineSnapshot(` Map { @@ -53,7 +56,7 @@ describe("withConsolidated", () => { it("loads chunk data from underlying store", async () => { let root = path.join(__dirname, "../../../fixtures/v2/data.zarr"); - let store = await withConsolidated(new FileSystemStore(root)); + let store = await withConsolidation(new FileSystemStore(root)); // biome-ignore lint/style/noNonNullAssertion: Fine for a test let entry = store .contents() @@ -91,7 +94,7 @@ describe("withConsolidated", () => { it("loads and navigates from root", async () => { let path_root = path.join(__dirname, "../../../fixtures/v2/data.zarr"); - let store = await withConsolidated(new FileSystemStore(path_root)); + let store = await withConsolidation(new FileSystemStore(path_root)); let grp = await open(store, { kind: "group" }); expect(grp.kind).toBe("group"); let arr = await open(grp.resolve("1d.chunked.i2"), { kind: "array" }); @@ -104,7 +107,7 @@ describe("withConsolidated", () => { "../../../fixtures/v2/data.zarr/3d.contiguous.i2", ); let try_open = () => - withConsolidated(new FileSystemStore(root), { format: "v2" }); + withConsolidation(new FileSystemStore(root), { format: "v2" }); await expect(try_open).rejects.toThrowError(NodeNotFoundError); await expect(try_open).rejects.toThrowErrorMatchingInlineSnapshot( "[NodeNotFoundError: Node not found: v2 consolidated metadata]", @@ -112,14 +115,14 @@ describe("withConsolidated", () => { }); }); -describe("withConsolidated (v3)", () => { +describe("withConsolidation (v3)", () => { let v3root = path.join( __dirname, "../../../fixtures/v3/data.zarr/consolidated", ); it("loads v3 consolidated metadata", async () => { - let store = await withConsolidated(new FileSystemStore(v3root), { + let store = await withConsolidation(new FileSystemStore(v3root), { format: "v3", }); let map = new Map(store.contents().map((x) => [x.path, x.kind])); @@ -135,7 +138,7 @@ describe("withConsolidated (v3)", () => { }); it("loads chunk data from underlying store", async () => { - let store = await withConsolidated(new FileSystemStore(v3root), { + let store = await withConsolidation(new FileSystemStore(v3root), { format: "v3", }); let grp = await open(store, { kind: "group" }); @@ -158,7 +161,7 @@ describe("withConsolidated (v3)", () => { }); it("loads and navigates from root", async () => { - let store = await withConsolidated(new FileSystemStore(v3root), { + let store = await withConsolidation(new FileSystemStore(v3root), { format: "v3", }); let grp = await open(store, { kind: "group" }); @@ -171,18 +174,18 @@ describe("withConsolidated (v3)", () => { it("throws if v3 consolidated metadata is missing", async () => { let root = path.join(__dirname, "../../../fixtures/v2/data.zarr"); let try_open = () => - withConsolidated(new FileSystemStore(root), { format: "v3" }); + withConsolidation(new FileSystemStore(root), { format: "v3" }); await expect(try_open).rejects.toThrowError(NodeNotFoundError); }); }); -describe("withConsolidated (format array)", () => { +describe("withConsolidation (format array)", () => { it("tries formats in order", async () => { let root = path.join( __dirname, "../../../fixtures/v3/data.zarr/consolidated", ); - let store = await withConsolidated(new FileSystemStore(root), { + let store = await withConsolidation(new FileSystemStore(root), { format: ["v3", "v2"], }); let contents = store.contents(); @@ -193,7 +196,7 @@ describe("withConsolidated (format array)", () => { it("falls back to second format", async () => { let root = path.join(__dirname, "../../../fixtures/v2/data.zarr"); - let store = await withConsolidated(new FileSystemStore(root), { + let store = await withConsolidation(new FileSystemStore(root), { format: ["v3", "v2"], }); let contents = store.contents(); @@ -201,10 +204,10 @@ describe("withConsolidated (format array)", () => { }); }); -describe("tryWithConsolidated", () => { +describe("withMaybeConsolidation", () => { it("creates Listable from consolidated store", async () => { let root = path.join(__dirname, "../../../fixtures/v2/data.zarr"); - let store = await tryWithConsolidated(new FileSystemStore(root)); + let store = await withMaybeConsolidation(new FileSystemStore(root)); expect(store).toHaveProperty("contents"); }); @@ -213,13 +216,13 @@ describe("tryWithConsolidated", () => { __dirname, "../../../fixtures/v2/data.zarr/3d.contiguous.i2", ); - let store = await tryWithConsolidated(new FileSystemStore(root)); + let store = await withMaybeConsolidation(new FileSystemStore(root)); expect(store).toBeInstanceOf(FileSystemStore); }); it("supports a zmetadataKey option", async () => { let root = path.join(__dirname, "../../../fixtures/v2/data.zarr"); - let store = await tryWithConsolidated(new FileSystemStore(root), { + let store = await withMaybeConsolidation(new FileSystemStore(root), { metadataKey: ".zmetadata", }); expect(store).toHaveProperty("contents"); @@ -227,7 +230,7 @@ describe("tryWithConsolidated", () => { it("falls back to original store if metadataKey is incorrect", async () => { let root = path.join(__dirname, "../../../fixtures/v2/data.zarr"); - let store = await tryWithConsolidated(new FileSystemStore(root), { + let store = await withMaybeConsolidation(new FileSystemStore(root), { metadataKey: ".nonexistent", }); expect(store).toBeInstanceOf(FileSystemStore); @@ -236,12 +239,12 @@ describe("tryWithConsolidated", () => { describe("Listable.getRange", () => { it("does not expose getRange if the underlying store does not support it", async () => { - let store = await tryWithConsolidated(new Map()); + let store = await withMaybeConsolidation(new Map()); expect("getRange" in store).toBeFalsy(); }); it("retrieves a byte range from an underlying store", async () => { let root = path.join(__dirname, "../../../fixtures/v2/data.zarr"); - let store = await tryWithConsolidated(new FileSystemStore(root)); + let store = await withMaybeConsolidation(new FileSystemStore(root)); assert(typeof store.getRange === "function"); }); }); diff --git a/packages/zarrita/__tests__/middleware-types.test.ts b/packages/zarrita/__tests__/middleware-types.test.ts new file mode 100644 index 00000000..464f06bf --- /dev/null +++ b/packages/zarrita/__tests__/middleware-types.test.ts @@ -0,0 +1,144 @@ +import type { AbsolutePath, AsyncReadable } from "@zarrita/storage"; +import { expectType } from "tintype"; +import { describe, test } from "vitest"; +import * as zarr from "../src/index.js"; +import { defineStoreMiddleware } from "../src/middleware/define.js"; + +describe("extendStore", () => { + test("no middleware returns store as-is", () => { + let store = zarr.extendStore(new zarr.FetchStore("")); + expectType(store).toMatchInlineSnapshot(`Promise`); + }); + + test("direct form in pipeline infers store options", () => { + let store = zarr.extendStore(new zarr.FetchStore(""), (s) => + zarr.withRangeBatching(s, { + mergeOptions: (batch) => { + expectType(batch).toMatchInlineSnapshot( + `ReadonlyArray`, + ); + return batch[0]; + }, + }), + ); + expectType(store).toMatchInlineSnapshot(` + Promise< + Required> & { + stats: Readonly; + url: string | URL; + } + > + `); + }); + + test("no-config middleware can be passed uncalled", () => { + function check() { + return zarr.extendStore( + new zarr.FetchStore(""), + zarr.withConsolidation, + (s) => zarr.withRangeBatching(s, { mergeOptions: (batch) => batch[0] }), + ); + } + expectType(check).toMatchInlineSnapshot(` + () => Promise< + Required> & { + stats: Readonly; + url: string | URL; + contents: () => { path: AbsolutePath; kind: "array" | "group" }[]; + } + > + `); + }); +}); + +describe("defineStoreMiddleware", () => { + test("simple: extensions appear, store methods preserved", () => { + let withCustom = defineStoreMiddleware( + (store: AsyncReadable, _opts: { flag: boolean }) => { + return { + async get(key: AbsolutePath) { + return store.get(key); + }, + hello(): string { + return "world"; + }, + }; + }, + ); + let store = withCustom(new zarr.FetchStore(""), { flag: true }); + expectType(store).toMatchInlineSnapshot( + ` + Required> & { + url: string | URL; + hello: () => string; + } + `, + ); + }); + + test("generic: store Options flows into opts parameter", () => { + interface ThingOptions { + storeOptions?: O; + retries?: number; + } + + let withThing = defineStoreMiddleware( + (store: AsyncReadable, opts: ThingOptions) => { + return { + async get(key: AbsolutePath, options?: O) { + return store.get(key, options ?? opts.storeOptions); + }, + retries: opts.retries ?? 3, + }; + }, + ); + + let store = withThing(new zarr.FetchStore(""), { + storeOptions: { signal: AbortSignal.timeout(1000) }, + retries: 5, + }); + expectType(store).toMatchInlineSnapshot( + ` + Required> & { + url: string | URL; + retries: number; + } + `, + ); + }); + + test("chaining preserves Options through wrappers", () => { + let withA = defineStoreMiddleware( + (store: AsyncReadable, _opts: { a: number }) => { + return { + async get(key: AbsolutePath) { + return store.get(key); + }, + methodA(): number { + return 1; + }, + }; + }, + ); + let withB = defineStoreMiddleware( + (store: AsyncReadable, _opts: { b: string }) => { + return { + async get(key: AbsolutePath) { + return store.get(key); + }, + methodB(): string { + return "hello"; + }, + }; + }, + ); + let store = withB(withA(new zarr.FetchStore(""), { a: 1 }), { b: "x" }); + expectType(store).toMatchInlineSnapshot(` + Required> & { + url: string | URL; + methodB: () => string; + methodA: () => number; + } + `); + }); +}); diff --git a/packages/zarrita/src/batched-fetch.ts b/packages/zarrita/src/batched-fetch.ts deleted file mode 100644 index 35448816..00000000 --- a/packages/zarrita/src/batched-fetch.ts +++ /dev/null @@ -1,328 +0,0 @@ -import type { AbsolutePath, AsyncReadable, RangeQuery } from "@zarrita/storage"; - -/** - * Simple LRU cache using Map insertion order. - * Oldest entry (first key) is evicted when capacity is exceeded. - */ -class LRUCache { - #map = new Map(); - #max: number; - - constructor(max: number) { - this.#max = max; - } - - has(key: string): boolean { - return this.#map.has(key); - } - - get(key: string): V | undefined { - if (!this.#map.has(key)) { - return undefined; - } - let value = this.#map.get(key) as V; - // Move to end (most recently used) - this.#map.delete(key); - this.#map.set(key, value); - return value; - } - - set(key: string, value: V): void { - this.#map.delete(key); - this.#map.set(key, value); - if (this.#map.size > this.#max) { - // Evict oldest (first inserted) - let first = this.#map.keys().next().value; - if (first !== undefined) { - this.#map.delete(first); - } - } - } - - clear(): void { - this.#map.clear(); - } -} - -interface PendingRequest { - offset: number; - length: number; - resolve: (value: Uint8Array | undefined) => void; - reject: (reason: unknown) => void; -} - -interface RangeGroup { - offset: number; - length: number; - requests: PendingRequest[]; -} - -export interface Stats { - hits: number; - misses: number; - mergedRequests: number; - batchedRequests: number; -} - -export interface BatchedRangeStoreOptions { - /** Maximum number of entries in the LRU cache (default: 256). */ - cacheSize?: number; - /** Byte gap threshold for merging adjacent ranges (default: 32768). */ - coalesceSize?: number; - /** - * Merge options from all callers in a batch into a single value passed to - * the inner store. By default, the first caller's options win. Use this to - * combine `AbortSignal`s, merge headers, etc. - */ - mergeOptions?: ( - batch: ReadonlyArray, - ) => Options | undefined; -} - -/** - * Default coalesce size (in bytes): two requests separated by less than this - * are merged into a single fetch. Fetching across a small gap is cheaper than - * an extra round trip. 32 KB matches geotiff.js's BlockedSource heuristic and - * Rust object_store's `OBJECT_STORE_COALESCE_DEFAULT`. - */ -const DEFAULT_COALESCE_SIZE = 32768; - -/** - * Groups sorted requests into contiguous ranges, coalescing across small gaps. - * Modelled after geotiff.js BlockedSource.groupBlocks(). - */ -function groupRequests( - sorted: PendingRequest[], - coalesceSize: number, -): RangeGroup[] { - if (sorted.length === 0) { - return []; - } - let groups: RangeGroup[] = []; - let current = [sorted[0]]; - let groupStart = sorted[0].offset; - let groupEnd = sorted[0].offset + sorted[0].length; - - for (let i = 1; i < sorted.length; i++) { - let req = sorted[i]; - let reqEnd = req.offset + req.length; - if (req.offset <= groupEnd + coalesceSize) { - current.push(req); - groupEnd = Math.max(groupEnd, reqEnd); - } else { - groups.push({ - offset: groupStart, - length: groupEnd - groupStart, - requests: current, - }); - current = [req]; - groupStart = req.offset; - groupEnd = reqEnd; - } - } - groups.push({ - offset: groupStart, - length: groupEnd - groupStart, - requests: current, - }); - return groups; -} - -/** - * A store wrapper that batches concurrent `getRange()` calls within a single - * microtask tick, merges adjacent byte ranges, and caches results in an LRU - * cache. The cache assumes immutable data — there is no invalidation or TTL. - * - * Inspired by geotiff.js - * {@link https://github.com/geotiffjs/geotiff.js/blob/master/src/source/blockedsource.js BlockedSource}. - * - * @remarks By default, `options` (e.g. `AbortSignal`, custom headers) are - * taken from the first `getRange()` call that schedules a flush tick. Pass a - * `mergeOptions` reducer to combine options from all callers in a batch. - * - * @see {@link withRangeBatching} - */ -export class BatchedRangeStore - implements AsyncReadable -{ - #inner: AsyncReadable; - #innerGetRange: NonNullable["getRange"]>; - #pending: Map = new Map(); - #scheduled = false; - #batchOptions: (Options | undefined)[] = []; - #mergeOptions: - | ((batch: ReadonlyArray) => Options | undefined) - | undefined; - #coalesceSize: number; - #cache: LRUCache; - #inflight: Map> = new Map(); - - #stats: Stats = { hits: 0, misses: 0, mergedRequests: 0, batchedRequests: 0 }; - - get stats(): Readonly { - return { ...this.#stats }; - } - - constructor( - inner: AsyncReadable, - options?: BatchedRangeStoreOptions, - ) { - if (!inner.getRange) { - throw new Error("BatchedRangeStore requires a store with getRange"); - } - this.#inner = inner; - this.#innerGetRange = inner.getRange.bind(inner); - this.#coalesceSize = options?.coalesceSize ?? DEFAULT_COALESCE_SIZE; - this.#cache = new LRUCache(options?.cacheSize ?? 256); - this.#mergeOptions = options?.mergeOptions; - } - - get(key: AbsolutePath, options?: Options): Promise { - return this.#inner.get(key, options); - } - - getRange( - key: AbsolutePath, - range: RangeQuery, - options?: Options, - ): Promise { - // Suffix requests (shard index reads) bypass batching - file size - // is unknown until the response arrives. - if ("suffixLength" in range) { - let cacheKey = `${key}\0suffix\0${range.suffixLength}`; - if (this.#cache.has(cacheKey)) { - this.#stats.hits++; - return Promise.resolve(this.#cache.get(cacheKey)); - } - // Deduplicate concurrent suffix requests (same pattern as #346) - let inflight = this.#inflight.get(cacheKey); - if (inflight) { - this.#stats.hits++; - return inflight; - } - this.#stats.misses++; - let promise = this.#innerGetRange(key, range, options) - .then((data) => { - this.#cache.set(cacheKey, data); - this.#inflight.delete(cacheKey); - return data; - }) - .catch((err) => { - this.#inflight.delete(cacheKey); - throw err; - }); - this.#inflight.set(cacheKey, promise); - return promise; - } - - let { offset, length } = range; - let cacheKey = `${key}\0${offset}\0${length}`; - - if (this.#cache.has(cacheKey)) { - this.#stats.hits++; - return Promise.resolve(this.#cache.get(cacheKey)); - } - - this.#stats.misses++; - this.#stats.batchedRequests++; - - return new Promise((resolve, reject) => { - let pending = this.#pending.get(key); - if (!pending) { - pending = []; - this.#pending.set(key, pending); - } - pending.push({ offset, length, resolve, reject }); - - this.#batchOptions.push(options); - if (!this.#scheduled) { - this.#scheduled = true; - queueMicrotask(() => this.#flush()); - } - }); - } - - async #flush(): Promise { - let batch = this.#batchOptions; - this.#batchOptions = []; - let work = new Map(this.#pending); - this.#pending.clear(); - this.#scheduled = false; - - let options: Options | undefined; - try { - options = this.#mergeOptions ? this.#mergeOptions(batch) : batch[0]; - } catch (err) { - for (let requests of work.values()) { - for (let req of requests) req.reject(err); - } - return; - } - - let pathPromises: Promise[] = []; - for (let [path, requests] of work) { - requests.sort((a, b) => a.offset - b.offset); - let groups = groupRequests(requests, this.#coalesceSize); - this.#stats.mergedRequests += groups.length; - pathPromises.push(this.#fetchGroups(path, groups, options)); - } - await Promise.all(pathPromises); - } - - async #fetchGroups( - path: AbsolutePath, - groups: RangeGroup[], - options?: Options, - ): Promise { - await Promise.all( - groups.map(async (group) => { - try { - let data = await this.#innerGetRange( - path, - { offset: group.offset, length: group.length }, - options, - ); - if (data && data.length < group.length) { - throw new Error( - `Short read: expected ${group.length} bytes but received ${data.length}`, - ); - } - for (let req of group.requests) { - let cacheKey = `${path}\0${req.offset}\0${req.length}`; - if (!data) { - this.#cache.set(cacheKey, undefined); - req.resolve(undefined); - continue; - } - let start = req.offset - group.offset; - let slice = data.slice(start, start + req.length); - this.#cache.set(cacheKey, slice); - req.resolve(slice); - } - } catch (err) { - for (let req of group.requests) { - req.reject(err); - } - } - }), - ); - } -} - -/** - * Wraps a store with range-batching: concurrent `getRange()` calls within a - * single microtask tick are merged into fewer HTTP requests and cached in an - * LRU cache. - * - * ```typescript - * import * as zarr from "zarrita"; - * - * let store = zarr.withRangeBatching(new zarr.FetchStore("https://example.com/data.zarr")); - * ``` - */ -export function withRangeBatching( - store: AsyncReadable, - options?: BatchedRangeStoreOptions, -): BatchedRangeStore { - return new BatchedRangeStore(store, options); -} diff --git a/packages/zarrita/src/index.ts b/packages/zarrita/src/index.ts index 743d4977..775b9b23 100644 --- a/packages/zarrita/src/index.ts +++ b/packages/zarrita/src/index.ts @@ -1,19 +1,9 @@ // re-export all the storage interface types export type * from "@zarrita/storage"; // re-export fetch store from storage -export { default as FetchStore } from "@zarrita/storage/fetch"; -export type { - BatchedRangeStoreOptions, - Stats as RangeBatchingStats, -} from "./batched-fetch.js"; -export { BatchedRangeStore, withRangeBatching } from "./batched-fetch.js"; +export { FetchStore, FileSystemStore } from "@zarrita/storage"; +// core export { registry } from "./codecs.js"; -export type { - ConsolidatedFormat, - Listable, - WithConsolidatedOptions, -} from "./consolidated.js"; -export { tryWithConsolidated, withConsolidated } from "./consolidated.js"; export { create } from "./create.js"; export { KeyError, NodeNotFoundError } from "./errors.js"; export { Array, Group, Location, root } from "./hierarchy.js"; @@ -35,6 +25,31 @@ export { sliceIndices as _zarrita_internal_sliceIndices, } from "./indexing/util.js"; export type * from "./metadata.js"; +/** @deprecated Use {@linkcode ConsolidationOptions} instead. */ +export type { + ConsolidatedFormat, + ConsolidationOptions, + ConsolidationOptions as WithConsolidatedOptions, + Listable, +} from "./middleware/consolidation.js"; +// deprecated re-exports +/** @deprecated Use {@linkcode withConsolidation} instead. */ +/** @deprecated Use {@linkcode withMaybeConsolidation} instead. */ +export { + withConsolidation, + withConsolidation as withConsolidated, + withMaybeConsolidation, + withMaybeConsolidation as tryWithConsolidated, +} from "./middleware/consolidation.js"; +export type { GenericOptions } from "./middleware/define.js"; +export { defineStoreMiddleware } from "./middleware/define.js"; +export { extendStore } from "./middleware/extend-store.js"; +export type { + RangeBatchingOptions, + RangeBatchingStats, +} from "./middleware/range-batching.js"; +// middleware +export { withRangeBatching } from "./middleware/range-batching.js"; export { open } from "./open.js"; export { BoolArray, diff --git a/packages/zarrita/src/consolidated.ts b/packages/zarrita/src/middleware/consolidation.ts similarity index 54% rename from packages/zarrita/src/consolidated.ts rename to packages/zarrita/src/middleware/consolidation.ts index 9e7f6861..bd717aca 100644 --- a/packages/zarrita/src/consolidated.ts +++ b/packages/zarrita/src/middleware/consolidation.ts @@ -1,5 +1,5 @@ -import type { AbsolutePath, Readable } from "@zarrita/storage"; -import { KeyError, NodeNotFoundError } from "./errors.js"; +import type { AbsolutePath, AsyncReadable, Readable } from "@zarrita/storage"; +import { JsonDecodeError, KeyError, NodeNotFoundError } from "../errors.js"; import type { ArrayMetadata, ArrayMetadataV2, @@ -7,14 +7,15 @@ import type { DataType, GroupMetadata, GroupMetadataV2, -} from "./metadata.js"; -import { VERSION_COUNTER } from "./open.js"; +} from "../metadata.js"; +import { VERSION_COUNTER } from "../open.js"; import { ensureCorrectScalar, jsonDecodeObject, jsonEncodeObject, rethrowUnless, -} from "./util.js"; +} from "../util.js"; +import { defineStoreMiddleware } from "./define.js"; type ConsolidatedMetadataV2 = { metadata: Record; @@ -58,23 +59,11 @@ function isConsolidatedV3(meta: unknown): meta is GroupMetadata & { ); } -/** - * Represents a read-only store that can list its contents. - */ -export interface Listable { - /** Get the bytes at a given path. */ - get: (...args: Parameters) => Promise; - /** Get a byte range at a given path. */ - getRange: Store["getRange"]; - /** List the contents of the store. */ - contents(): { path: AbsolutePath; kind: "array" | "group" }[]; -} - /** The format of consolidated metadata to use. */ export type ConsolidatedFormat = "v2" | "v3"; -/** Options for {@linkcode withConsolidated} and {@linkcode tryWithConsolidated}. */ -export interface WithConsolidatedOptions { +/** Options for {@linkcode withConsolidation} and {@linkcode withMaybeConsolidation}. */ +export interface ConsolidationOptions { /** * The format(s) of consolidated metadata to try. * @@ -157,7 +146,6 @@ async function loadConsolidatedV3( }); } let knownMeta: Record = {}; - // Add root group metadata knownMeta["/zarr.json"] = { zarr_format: 3, node_type: "group", @@ -166,7 +154,6 @@ async function loadConsolidatedV3( for (let [path, meta] of Object.entries( rootMeta.consolidated_metadata.metadata, )) { - // Normalize path: ensure it starts with / let normalized = path.startsWith("/") ? path : `/${path}`; let key = `${normalized}/zarr.json` as AbsolutePath; if (meta.node_type === "array") { @@ -186,117 +173,104 @@ function resolveFormats( if (format !== undefined) { return globalThis.Array.isArray(format) ? format : [format]; } - // Auto-detect: use version counter to decide priority let versionMax = VERSION_COUNTER.versionMax(store); return versionMax === "v3" ? ["v3", "v2"] : ["v2", "v3"]; } -function createListable( - store: Store, - knownMeta: Record, -): Listable { - return { - async get( - ...args: Parameters - ): Promise { - let [key, opts] = args; - if (knownMeta[key]) { - return jsonEncodeObject(knownMeta[key]); - } - let maybeBytes = await store.get(key, opts); - if (isMetaKey(key) && maybeBytes) { - let meta = jsonDecodeObject(maybeBytes); - knownMeta[key] = meta; - } - return maybeBytes; - }, - getRange: store.getRange?.bind(store), - contents(): { path: AbsolutePath; kind: "array" | "group" }[] { - let contents: { path: AbsolutePath; kind: "array" | "group" }[] = []; - for (let [key, value] of Object.entries(knownMeta)) { - let parts = key.split("/"); - let filename = parts.pop(); - let path = (parts.join("/") || "/") as AbsolutePath; - if (filename === ".zarray") contents.push({ path, kind: "array" }); - if (filename === ".zgroup") contents.push({ path, kind: "group" }); - if (isV3(value)) { - contents.push({ path, kind: value.node_type }); - } - } - return contents; - }, - }; -} +/** A store augmented with a `contents()` method from consolidated metadata. */ +export type Listable = Store & { + contents(): { path: AbsolutePath; kind: "array" | "group" }[]; +}; /** - * Open a consolidated store. - * - * Supports both Zarr v2 consolidated metadata (`.zmetadata`) and - * v3 consolidated metadata (`zarr.json` with `consolidated_metadata`). + * Wraps a store with consolidated metadata, enabling efficient listing and + * metadata access without extra network requests. * - * @param store The store to open. - * @param opts Options object. - * @returns A listable store. + * Supports Zarr v2 (`.zmetadata`) and v3 (`zarr.json` with + * `consolidated_metadata`). Throws if no consolidated metadata is found. * * @example - * ```js - * // Auto-detect format (default) - * let store = await withConsolidated( - * new zarr.FetchStore("https://my-bucket.s3.amazonaws.com") - * ); - * - * // Explicit v2 - * let store = await withConsolidated(rawStore, { format: "v2" }); + * ```ts + * // Direct + * let store = await zarr.withConsolidation(new zarr.FetchStore("https://...")); * - * // Explicit v3 - * let store = await withConsolidated(rawStore, { format: "v3" }); + * // With options + * let store = await zarr.withConsolidation(rawStore, { format: "v3" }); * - * // Try v3 first, then v2 - * let store = await withConsolidated(rawStore, { format: ["v3", "v2"] }); + * // In a pipeline + * let store = await zarr.extendStore( + * new zarr.FetchStore("https://..."), + * (s) => zarr.withConsolidation(s, { format: "v3" }), + * ); * - * store.contents(); // [{ path: "/", kind: "group" }, { path: "/foo", kind: "array" }, ...] + * store.contents(); // [{ path: "/", kind: "group" }, ...] * ``` */ -export async function withConsolidated( - store: Store, - opts: WithConsolidatedOptions = {}, -): Promise> { - let formats = resolveFormats(store, opts.format); - let lastError: unknown; - for (let format of formats) { - try { - let knownMeta = - format === "v2" - ? await loadConsolidatedV2(store, opts.metadataKey) - : await loadConsolidatedV3(store); - return createListable(store, knownMeta); - } catch (err) { - rethrowUnless(err, NodeNotFoundError); - lastError = err; +export const withConsolidation = defineStoreMiddleware( + async (store, opts: ConsolidationOptions = {}) => { + let formats = resolveFormats(store, opts.format); + let lastError: unknown; + for (let format of formats) { + try { + let knownMeta = + format === "v2" + ? await loadConsolidatedV2(store, opts.metadataKey) + : await loadConsolidatedV3(store); + return { + async get( + key: AbsolutePath, + options?: unknown, + ): Promise { + if (knownMeta[key]) { + return jsonEncodeObject(knownMeta[key]); + } + let maybeBytes = await store.get(key, options); + if (isMetaKey(key) && maybeBytes) { + knownMeta[key] = jsonDecodeObject(maybeBytes); + } + return maybeBytes; + }, + contents(): { path: AbsolutePath; kind: "array" | "group" }[] { + let contents: { + path: AbsolutePath; + kind: "array" | "group"; + }[] = []; + for (let [key, value] of Object.entries(knownMeta)) { + let parts = key.split("/"); + let filename = parts.pop(); + let path = (parts.join("/") || "/") as AbsolutePath; + if (filename === ".zarray") + contents.push({ path, kind: "array" }); + if (filename === ".zgroup") + contents.push({ path, kind: "group" }); + if (isV3(value)) { + contents.push({ path, kind: value.node_type }); + } + } + return contents; + }, + }; + } catch (err) { + rethrowUnless(err, NodeNotFoundError, JsonDecodeError); + lastError = err; + } } - } - throw lastError; -} + throw lastError; + }, +); /** - * Try to open a consolidated store, but fall back to the original store if the - * consolidated metadata is missing. - * - * Provides a convenient way to open a store that may or may not have consolidated, - * returning a consistent interface for both cases. Ideal for usage senarios with - * known access paths, since store with consolidated metadata do not incur - * additional network requests when accessing underlying groups and arrays. - * - * @param store The store to open. - * @param opts Options to pass to withConsolidated. - * @returns A listable store. + * Like {@linkcode withConsolidation}, but falls back to the original store if + * no consolidated metadata is found (instead of throwing). */ -export async function tryWithConsolidated( +export async function withMaybeConsolidation( store: Store, - opts: WithConsolidatedOptions = {}, + opts: ConsolidationOptions = {}, ): Promise | Store> { - return withConsolidated(store, opts).catch((error: unknown) => { - rethrowUnless(error, NodeNotFoundError); - return store; - }); + return (withConsolidation(store, opts) as Promise>).catch( + (error: unknown) => { + rethrowUnless(error, NodeNotFoundError, JsonDecodeError); + return store; + }, + ); } diff --git a/packages/zarrita/src/middleware/define.ts b/packages/zarrita/src/middleware/define.ts new file mode 100644 index 00000000..a97c4aa4 --- /dev/null +++ b/packages/zarrita/src/middleware/define.ts @@ -0,0 +1,190 @@ +import type { AsyncReadable } from "@zarrita/storage"; + +/** Store keys stripped from Extensions to avoid poisoning Options. */ +type StoreKeys = "get" | "getRange"; + +/** Strip store keys from the extension type so Options aren't poisoned. */ +type Extensions = { + [K in Extract, string>]: T[K]; +} & {}; + +/** + * Base interface for defining middleware options that depend on the store's + * request options type. Extend this and override `options` using `this["_O"]`: + * + * ```ts + * interface MyOpts extends GenericOptions { + * readonly options: { storeOptions?: this["_O"]; retries?: number }; + * } + * ``` + */ +export interface GenericOptions { + readonly _O: unknown; + readonly options: unknown; +} + +/** Apply a type lambda — substitutes `_O` with a concrete type. */ +type Apply = (F & { readonly _O: O })["options"]; + +/** Extract the Options type from a store. */ +// biome-ignore lint/suspicious/noExplicitAny: required for conditional type matching +type InferStoreOpts = S extends { get(key: any, opts?: infer O): any } + ? O + : unknown; + +/** If factory returns a Promise, wrap the result in Promise; otherwise keep it sync. */ +type Prettify = { [K in keyof T]: T[K] } & {}; + +/** Collapse get/getRange into AsyncReadable, show extras separately. */ +type CollapseStore = + T extends AsyncReadable + ? // biome-ignore lint/suspicious/noExplicitAny: needed for getRange optionality check + (T extends { getRange: (...args: any[]) => any } + ? Required> + : AsyncReadable) & + Prettify> + : T; + +type WrapperResult = + Ext extends Promise + ? Promise>> + : CollapseStore>; + +function createProxy( + store: object, + overrides: Record, +) { + return new Proxy(store, { + get(target, prop, receiver) { + if (prop in overrides) { + return overrides[prop]; + } + return Reflect.get(target, prop, receiver); + }, + has(target, prop) { + return prop in overrides || Reflect.has(target, prop); + }, + ownKeys(target) { + let keys = new Set([ + ...Reflect.ownKeys(target), + ...Object.keys(overrides), + ]); + return [...keys]; + }, + getOwnPropertyDescriptor(target, prop) { + if (prop in overrides) { + return { + configurable: true, + enumerable: true, + value: overrides[prop], + }; + } + return Reflect.getOwnPropertyDescriptor(target, prop); + }, + }); +} + +function _apply( + factory: (store: AsyncReadable, opts: unknown) => unknown, + store: AsyncReadable, + opts: unknown, +) { + let result = factory(store, opts); + if (result instanceof Promise) { + return result.then((overrides) => + createProxy(store, overrides as Record), + ); + } + return createProxy(store, result as Record); +} + +/** + * Define a composable store middleware. + * + * The factory function receives the inner store and options, and returns an + * object of overrides and extensions. Methods not returned are automatically + * delegated to the inner store via Proxy. + * + * Overrides of `get`/`getRange` work at runtime via the Proxy, but their types + * are stripped from the return type so that the original store's Options generic + * is preserved through chains of middleware. + * + * Supports both sync and async factories — if the factory returns a Promise, + * the middleware returns a Promise too. + * + * ## Simple case + * + * When your options don't depend on the store's request options type: + * + * ```ts + * import * as zarr from "zarrita"; + * + * const withCaching = zarr.defineStoreMiddleware( + * (store, opts: { maxSize: number }) => { + * return { + * async get(key, options) { ... }, + * clear() { ... }, + * }; + * }, + * ); + * ``` + * + * ## Generic case + * + * When your options need the store's request options type (e.g. for + * `storeOptions` or `mergeOptions`), use {@linkcode GenericOptions}: + * + * ```ts + * import * as zarr from "zarrita"; + * + * interface MyOptsFor extends zarr.GenericOptions { + * readonly options: MyOptions; + * } + * + * const withMyThing = zarr.defineStoreMiddleware.generic()( + * (store, opts) => { ... }, + * ); + * + * // At the call site, storeOptions is inferred from the store: + * withMyThing(fetchStore, { storeOptions: { signal } }); + * // ^? RequestInit + * ``` + */ +export function defineStoreMiddleware< + // to correctly infer F's parameter types from the factory function. + F extends ( + store: AsyncReadable, + // biome-ignore lint/suspicious/noExplicitAny: `any` for opts is required for TypeScript + opts?: any, + // biome-ignore lint/suspicious/noExplicitAny: `any` for opts is required for TypeScript + ) => Partial & Record, +>( + factory: F, +): ( + store: S, + opts?: Parameters[1], +) => WrapperResult, S> { + // @ts-expect-error - TypeScript can't infer this + return (store: AsyncReadable, opts: unknown) => _apply(factory, store, opts); +} + +/** + * Define a store middleware whose options depend on the store's request options type. + * + * @see {@linkcode defineStoreMiddleware} for full documentation and examples. + */ +defineStoreMiddleware.generic = function generic< + OptsLambda extends GenericOptions, +>() { + // biome-ignore lint/suspicious/noExplicitAny: `any` allows extra extension properties + return & Record>( + factory: (store: AsyncReadable, opts: Apply) => Ext, + ): (( + store: S, + opts?: NoInfer>>, + ) => WrapperResult) => { + return ((store: AsyncReadable, opts: unknown) => + // biome-ignore lint/suspicious/noExplicitAny: TS can't figure it out + _apply(factory, store, opts)) as any; + }; +}; diff --git a/packages/zarrita/src/middleware/extend-store.ts b/packages/zarrita/src/middleware/extend-store.ts new file mode 100644 index 00000000..c4364fc5 --- /dev/null +++ b/packages/zarrita/src/middleware/extend-store.ts @@ -0,0 +1,43 @@ +import type { AsyncReadable } from "@zarrita/storage"; + +export function extendStore(store: S): Promise; +export function extendStore( + store: S, + m1: (store: S) => A, +): Promise>; +export function extendStore( + store: S, + m1: (store: S) => A, + m2: (store: Awaited) => B, +): Promise>; +export function extendStore( + store: S, + m1: (store: S) => A, + m2: (store: Awaited) => B, + m3: (store: Awaited) => C, +): Promise>; +export function extendStore( + store: S, + m1: (store: S) => A, + m2: (store: Awaited) => B, + m3: (store: Awaited) => C, + m4: (store: Awaited) => D, +): Promise>; +export function extendStore( + store: S, + m1: (store: S) => A, + m2: (store: Awaited) => B, + m3: (store: Awaited) => C, + m4: (store: Awaited) => D, + m5: (store: Awaited) => E, +): Promise>; +export async function extendStore( + store: AsyncReadable, + ...middlewares: ((store: unknown) => unknown)[] +): Promise { + let result: unknown = store; + for (let m of middlewares) { + result = await m(result); + } + return result; +} diff --git a/packages/zarrita/src/middleware/range-batching.ts b/packages/zarrita/src/middleware/range-batching.ts new file mode 100644 index 00000000..7a423439 --- /dev/null +++ b/packages/zarrita/src/middleware/range-batching.ts @@ -0,0 +1,325 @@ +import type { + AbsolutePath, + AsyncReadable, + RangeQuery, + Readable, +} from "@zarrita/storage"; +import { defineStoreMiddleware, type GenericOptions } from "./define.js"; + +/** Narrow a Readable to AsyncReadable, throwing if getRange is missing. */ +function asAsync(store: Readable): AsyncReadable & { + getRange: NonNullable; +} { + if (!store.getRange) { + throw new Error("`zarr.withRangeBatching` requires a store with getRange"); + } + return store as AsyncReadable & { + getRange: NonNullable; + }; +} + +/** + * Simple LRU cache using Map insertion order. + * Oldest entry (first key) is evicted when capacity is exceeded. + */ +class LRUCache { + #map = new Map(); + #max: number; + + constructor(max: number) { + this.#max = max; + } + + has(key: string): boolean { + return this.#map.has(key); + } + + get(key: string): V | undefined { + if (!this.#map.has(key)) { + return undefined; + } + let value = this.#map.get(key) as V; + // Move to end (most recently used) + this.#map.delete(key); + this.#map.set(key, value); + return value; + } + + set(key: string, value: V): void { + this.#map.delete(key); + this.#map.set(key, value); + if (this.#map.size > this.#max) { + // Evict oldest (first inserted) + let first = this.#map.keys().next().value; + if (first !== undefined) { + this.#map.delete(first); + } + } + } + + clear(): void { + this.#map.clear(); + } +} + +interface PendingRequest { + offset: number; + length: number; + resolve: (value: Uint8Array | undefined) => void; + reject: (reason: unknown) => void; +} + +interface RangeGroup { + offset: number; + length: number; + requests: PendingRequest[]; +} + +export interface RangeBatchingStats { + hits: number; + misses: number; + mergedRequests: number; + batchedRequests: number; +} + +export interface RangeBatchingOptions { + /** Maximum number of entries in the LRU cache (default: 256). */ + cacheSize?: number; + /** Byte gap threshold for merging adjacent ranges (default: 32768). */ + coalesceSize?: number; + /** + * Merge options from all callers in a batch into a single value passed to + * the inner store. By default, the first caller's options win. Use this to + * combine `AbortSignal`s, merge headers, etc. + */ + mergeOptions?: (batch: ReadonlyArray) => O | undefined; +} + +/** + * Default coalesce size (in bytes): two requests separated by less than this + * are merged into a single fetch. Fetching across a small gap is cheaper than + * an extra round trip. 32 KB matches geotiff.js's BlockedSource heuristic and + * Rust object_store's `OBJECT_STORE_COALESCE_DEFAULT`. + */ +const DEFAULT_COALESCE_SIZE = 32768; + +/** + * Groups sorted requests into contiguous ranges, coalescing across small gaps. + * Modelled after geotiff.js BlockedSource.groupBlocks(). + */ +function groupRequests( + sorted: PendingRequest[], + coalesceSize: number, +): RangeGroup[] { + if (sorted.length === 0) { + return []; + } + let groups: RangeGroup[] = []; + let current = [sorted[0]]; + let groupStart = sorted[0].offset; + let groupEnd = sorted[0].offset + sorted[0].length; + + for (let i = 1; i < sorted.length; i++) { + let req = sorted[i]; + let reqEnd = req.offset + req.length; + if (req.offset <= groupEnd + coalesceSize) { + current.push(req); + groupEnd = Math.max(groupEnd, reqEnd); + } else { + groups.push({ + offset: groupStart, + length: groupEnd - groupStart, + requests: current, + }); + current = [req]; + groupStart = req.offset; + groupEnd = reqEnd; + } + } + groups.push({ + offset: groupStart, + length: groupEnd - groupStart, + requests: current, + }); + return groups; +} + +/** + * Wraps a store with range-batching: concurrent `getRange()` calls within a + * single microtask tick are merged into fewer HTTP requests and cached in an + * LRU cache. + * + * ```typescript + * import * as zarr from "zarrita"; + * + * let store = zarr.withRangeBatching(new zarr.FetchStore("https://example.com/data.zarr")); + * ``` + */ +/** @internal Type lambda for {@linkcode RangeBatchingOptions}. */ +export interface RangeBatchingOptsFor extends GenericOptions { + readonly options: RangeBatchingOptions; +} + +export const withRangeBatching = + defineStoreMiddleware.generic()( + (_store, opts: RangeBatchingOptions = {}) => { + let store = asAsync(_store); + let boundGetRange = store.getRange.bind(store); + + let coalesceSize = opts.coalesceSize ?? DEFAULT_COALESCE_SIZE; + let cache = new LRUCache(opts.cacheSize ?? 256); + let inflight = new Map>(); + let mergeOptionsFn = opts.mergeOptions; + + let pending = new Map(); + let scheduled = false; + let batchOptions: unknown[] = []; + + let _stats: RangeBatchingStats = { + hits: 0, + misses: 0, + mergedRequests: 0, + batchedRequests: 0, + }; + + async function flush(): Promise { + let batch = batchOptions; + batchOptions = []; + let work = new Map(pending); + pending.clear(); + scheduled = false; + + let options: unknown; + try { + options = mergeOptionsFn ? mergeOptionsFn(batch) : batch[0]; + } catch (err) { + for (let requests of work.values()) { + for (let req of requests) req.reject(err); + } + return; + } + + let pathPromises: Promise[] = []; + for (let [path, requests] of work) { + requests.sort((a, b) => a.offset - b.offset); + let groups = groupRequests(requests, coalesceSize); + _stats.mergedRequests += groups.length; + pathPromises.push(fetchGroups(path, groups, options)); + } + await Promise.all(pathPromises); + } + + async function fetchGroups( + path: AbsolutePath, + groups: RangeGroup[], + options?: unknown, + ): Promise { + await Promise.all( + groups.map(async (group) => { + try { + let data = await boundGetRange( + path, + { offset: group.offset, length: group.length }, + options, + ); + if (data && data.length < group.length) { + throw new Error( + `Short read: expected ${group.length} bytes but received ${data.length}`, + ); + } + for (let req of group.requests) { + let cacheKey = `${path}\0${req.offset}\0${req.length}`; + if (!data) { + cache.set(cacheKey, undefined); + req.resolve(undefined); + continue; + } + let start = req.offset - group.offset; + let slice = data.slice(start, start + req.length); + cache.set(cacheKey, slice); + req.resolve(slice); + } + } catch (err) { + for (let req of group.requests) { + req.reject(err); + } + } + }), + ); + } + + return { + get( + key: AbsolutePath, + options?: unknown, + ): Promise { + return store.get(key, options); + }, + + getRange( + key: AbsolutePath, + range: RangeQuery, + options?: unknown, + ): Promise { + // Suffix requests (shard index reads) bypass batching - file size + // is unknown until the response arrives. + if ("suffixLength" in range) { + let cacheKey = `${key}\0suffix\0${range.suffixLength}`; + if (cache.has(cacheKey)) { + _stats.hits++; + return Promise.resolve(cache.get(cacheKey)); + } + // Deduplicate concurrent suffix requests + let existing = inflight.get(cacheKey); + if (existing) { + _stats.hits++; + return existing; + } + _stats.misses++; + let promise = boundGetRange(key, range, options) + .then((data) => { + cache.set(cacheKey, data); + inflight.delete(cacheKey); + return data; + }) + .catch((err) => { + inflight.delete(cacheKey); + throw err; + }); + inflight.set(cacheKey, promise); + return promise; + } + + let { offset, length } = range; + let cacheKey = `${key}\0${offset}\0${length}`; + + if (cache.has(cacheKey)) { + _stats.hits++; + return Promise.resolve(cache.get(cacheKey)); + } + + _stats.misses++; + _stats.batchedRequests++; + + return new Promise((resolve, reject) => { + let reqs = pending.get(key); + if (!reqs) { + reqs = []; + pending.set(key, reqs); + } + reqs.push({ offset, length, resolve, reject }); + + batchOptions.push(options); + if (!scheduled) { + scheduled = true; + queueMicrotask(() => flush()); + } + }); + }, + + get stats(): Readonly { + return { ..._stats }; + }, + }; + }, + );