diff --git a/.changeset/fix-r2-cache-upload.md b/.changeset/fix-r2-cache-upload.md new file mode 100644 index 000000000..aa2051083 --- /dev/null +++ b/.changeset/fix-r2-cache-upload.md @@ -0,0 +1,7 @@ +--- +"@opennextjs/cloudflare": minor +--- + +Use remote R2 binding for cache population + +Using remote binding is not subject the Cloudflare API rate limit of 1,200 requests per 5 minutes that caused failures for large applications with thousands of prerendered pages. diff --git a/packages/cloudflare/scripts/debug-r2-memory.mjs b/packages/cloudflare/scripts/debug-r2-memory.mjs new file mode 100644 index 000000000..354ee3563 --- /dev/null +++ b/packages/cloudflare/scripts/debug-r2-memory.mjs @@ -0,0 +1,253 @@ +/** + * Standalone script to reproduce the OOM issue when populating R2 via unstable_startWorker. + * + * This mirrors the exact pattern used by populateR2IncrementalCache in populate-cache.ts: + * - Starts a local worker with a remote R2 binding via unstable_startWorker + * - Sends concurrent POST requests with FormData (key + value) to the worker + * - The worker writes each entry to R2 + * + * Memory usage is logged every 50 completed entries to detect leaks. + * + * Usage: + * pnpm build + * node --expose-gc packages/cloudflare/scripts/debug-r2-memory.mjs [bucket-name] + * + * bucket-name defaults to "cache". + */ + +import os from "node:os"; +import path from "node:path"; +import { fileURLToPath } from "node:url"; + +import { unstable_startWorker } from "wrangler"; + +// --- Configuration --- + +const TOTAL_ENTRIES = 10_000; +const CONCURRENCY = 1; +const VALUE = "0".repeat(50_000); +const MEMORY_LOG_INTERVAL = 50; + +const bucketName = process.argv[2] ?? "cache"; + +// --- Memory tracking --- + +/** @type {Array<{ entries: number; rss: number; heapUsed: number; heapTotal: number; external: number; arrayBuffers: number; sysFree: number }>} */ +const memorySnapshots = []; + +/** + * Forces a GC cycle (if --expose-gc was used) and records a memory snapshot. + * + * @param {number} entriesCompleted - Number of entries completed so far. + */ +function logMemory(entriesCompleted) { + if (globalThis.gc) { + globalThis.gc(); + } + + const mem = process.memoryUsage(); + const sysFree = os.freemem(); + const snapshot = { + entries: entriesCompleted, + rss: Math.round(mem.rss / 1024 / 1024), + heapUsed: Math.round(mem.heapUsed / 1024 / 1024), + heapTotal: Math.round(mem.heapTotal / 1024 / 1024), + external: Math.round(mem.external / 1024 / 1024), + arrayBuffers: Math.round(mem.arrayBuffers / 1024 / 1024), + sysFree: Math.round(sysFree / 1024 / 1024), + }; + memorySnapshots.push(snapshot); + + console.log( + `[${String(entriesCompleted).padStart(5)}/${TOTAL_ENTRIES}] ` + + `RSS=${String(snapshot.rss).padStart(5)}MB ` + + `heap=${String(snapshot.heapUsed).padStart(5)}MB ` + + `external=${String(snapshot.external).padStart(5)}MB ` + + `arrayBuffers=${String(snapshot.arrayBuffers).padStart(5)}MB ` + + `sysFree=${String(snapshot.sysFree).padStart(6)}MB` + ); +} + +/** + * Prints a summary table of all memory snapshots. + */ +function printSummary() { + console.log("\n=== Memory Summary ===\n"); + const header = + "Entries".padStart(7) + + " " + + "RSS(MB)".padStart(8) + + " " + + "Heap(MB)".padStart(9) + + " " + + "External(MB)".padStart(13) + + " " + + "ArrBuf(MB)".padStart(11) + + " " + + "SysFree(MB)".padStart(12); + console.log(header); + console.log("-".repeat(header.length)); + + for (const s of memorySnapshots) { + console.log( + String(s.entries).padStart(7) + + " " + + String(s.rss).padStart(8) + + " " + + String(s.heapUsed).padStart(9) + + " " + + String(s.external).padStart(13) + + " " + + String(s.arrayBuffers).padStart(11) + + " " + + String(s.sysFree).padStart(12) + ); + } + + if (memorySnapshots.length >= 2) { + const first = memorySnapshots[0]; + const last = memorySnapshots[memorySnapshots.length - 1]; + console.log("-".repeat(header.length)); + console.log( + "Delta".padStart(7) + + " " + + String(last.rss - first.rss).padStart(7) + + " " + + String(last.heapUsed - first.heapUsed).padStart(8) + + " " + + String(last.external - first.external).padStart(12) + + " " + + String(last.arrayBuffers - first.arrayBuffers).padStart(10) + + " " + + String(last.sysFree - first.sysFree).padStart(11) + ); + } +} + +// --- Send a single entry to the worker (mirrors sendEntryToR2Worker) --- + +/** + * Sends a single cache entry to the R2 worker via POST /populate with FormData. + * + * @param {string} workerUrl - The URL of the worker's /populate endpoint. + * @param {string} key - The R2 object key. + * @param {string} value - The value to store. + * @throws {Error} If the worker reports a failure. + */ +async function sendEntry(workerUrl, key, value) { + const formData = new FormData(); + formData.set("key", key); + formData.set("value", value); + + const start = performance.now(); + const response = await fetch(workerUrl, { + method: "POST", + body: formData, + }); + + const result = await response.json(); + const elapsed = Math.round(performance.now() - start); + console.log(`[request] ${key} ${elapsed}ms`); + + if (!result.success) { + throw new Error(`Failed to write "${key}": ${result.error}`); + } +} + +// --- Main --- + +async function main() { + console.log(`Bucket: ${bucketName}`); + console.log(`Entries: ${TOTAL_ENTRIES}`); + console.log(`Concurrency: ${CONCURRENCY}`); + console.log(`Value size: ${VALUE.length} chars`); + console.log( + `System RAM: ${Math.round(os.totalmem() / 1024 / 1024)}MB (${Math.round(os.freemem() / 1024 / 1024)}MB free)` + ); + console.log(`GC exposed: ${!!globalThis.gc}`); + console.log(); + + if (!globalThis.gc) { + console.warn("WARNING: --expose-gc not set. Memory snapshots will be less accurate.\n"); + } + + // Resolve the worker entrypoint (the compiled r2-cache.js) + const currentDir = path.dirname(fileURLToPath(import.meta.url)); + const handlerPath = path.join(currentDir, "../dist/cli/workers/r2-cache.js"); + + console.log(`Starting worker with remote R2 binding (bucket: ${bucketName})...`); + + const worker = await unstable_startWorker({ + name: "debug-r2-memory", + entrypoint: handlerPath, + compatibilityDate: "2026-01-01", + bindings: { + R2: { + type: "r2_bucket", + bucket_name: bucketName, + remote: true, + }, + }, + dev: { + server: { port: 0 }, + inspector: false, + watch: false, + liveReload: false, + logLevel: "debug", + }, + }); + + try { + await worker.ready; + const baseUrl = await worker.url; + const workerUrl = new URL("/populate", baseUrl).href; + + console.log(`Worker ready at ${baseUrl}\n`); + + // Log initial memory before sending anything. + logMemory(0); + + // Concurrency-limited send loop (mirrors sendEntriesToR2Worker). + let completed = 0; + const pending = new Set(); + + for (let i = 0; i < TOTAL_ENTRIES; i++) { + // If we've reached the concurrency limit, wait for one to finish. + if (pending.size >= CONCURRENCY) { + await Promise.race(pending); + } + + const key = `key-${i}`; + const task = sendEntry(workerUrl, key, VALUE) + .then(() => { + completed++; + + // Log memory every MEMORY_LOG_INTERVAL completed entries. + if (completed % MEMORY_LOG_INTERVAL === 0) { + logMemory(completed); + } + }) + .finally(() => pending.delete(task)); + pending.add(task); + } + + // Wait for all remaining in-flight requests. + await Promise.all(pending); + + // Final memory snapshot if not already logged at TOTAL_ENTRIES. + if (completed % MEMORY_LOG_INTERVAL !== 0) { + logMemory(completed); + } + + console.log(`\nAll ${TOTAL_ENTRIES} entries written successfully.`); + printSummary(); + } finally { + await worker.dispose(); + console.log("\nWorker disposed."); + } +} + +main().catch((err) => { + console.error("Fatal error:", err); + process.exit(1); +}); diff --git a/packages/cloudflare/src/cli/commands/populate-cache.spec.ts b/packages/cloudflare/src/cli/commands/populate-cache.spec.ts index 132c08de2..c32648b9b 100644 --- a/packages/cloudflare/src/cli/commands/populate-cache.spec.ts +++ b/packages/cloudflare/src/cli/commands/populate-cache.spec.ts @@ -1,3 +1,4 @@ +import EventEmitter from "node:events"; import { mkdirSync, writeFileSync } from "node:fs"; import path from "node:path"; @@ -78,6 +79,21 @@ vi.mock("./utils/helpers.js", () => ({ quoteShellMeta: vi.fn((s) => s), })); +const mockWorkerFetch = vi.fn(); +const mockWorkerDispose = vi.fn(); + +vi.mock("wrangler", () => ({ + unstable_startWorker: vi.fn(() => + Promise.resolve({ + ready: Promise.resolve(), + url: Promise.resolve(new URL("http://localhost:12345")), + fetch: mockWorkerFetch, + dispose: mockWorkerDispose, + raw: new EventEmitter(), + }) + ), +})); + describe("populateCache", () => { const setupMockFileSystem = () => { mockFs({ @@ -100,13 +116,19 @@ describe("populateCache", () => { ({ target }) => { afterEach(() => { mockFs.restore(); + vi.clearAllMocks(); }); - test(target, async () => { - const { runWrangler } = await import("./utils/run-wrangler.js"); - + test(`${target} - starts worker and sends individual cache entries via FormData`, async () => { setupMockFileSystem(); - vi.mocked(runWrangler).mockClear(); + + // Mock fetch to return a successful response for each individual entry. + global.fetch = vi.fn().mockResolvedValue( + new Response(JSON.stringify({ success: true }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }) + ); await populateCache( { @@ -131,48 +153,37 @@ describe("populateCache", () => { {} as any // eslint-disable-line @typescript-eslint/no-explicit-any ); - expect(runWrangler).toHaveBeenCalledWith( - expect.anything(), - expect.arrayContaining(["r2 bulk put", "test-bucket"]), - expect.objectContaining({ target }) - ); - }); - - test(`${target} using jurisdiction`, async () => { - const { runWrangler } = await import("./utils/run-wrangler.js"); - - setupMockFileSystem(); - vi.mocked(runWrangler).mockClear(); - - await populateCache( - { - outputDir: "/test/output", - } as BuildOptions, - { - default: { - override: { - incrementalCache: "cf-r2-incremental-cache", - }, - }, - } as any, // eslint-disable-line @typescript-eslint/no-explicit-any - { - r2_buckets: [ - { - binding: "NEXT_INC_CACHE_R2_BUCKET", + // Verify the worker was started with the correct R2 binding configuration. + const { unstable_startWorker: startWorker } = await import("wrangler"); + expect(startWorker).toHaveBeenCalledWith( + expect.objectContaining({ + name: "open-next-cache-populate", + compatibilityDate: "2026-01-01", + bindings: expect.objectContaining({ + R2: expect.objectContaining({ + type: "r2_bucket", bucket_name: "test-bucket", - jurisdiction: "eu", - }, - ], - } as any, // eslint-disable-line @typescript-eslint/no-explicit-any - { target, shouldUsePreviewId: false }, - {} as any // eslint-disable-line @typescript-eslint/no-explicit-any + remote: target === "remote", + }), + }), + }) ); - expect(runWrangler).toHaveBeenCalledWith( - expect.anything(), - expect.arrayContaining(["r2 bulk put", "test-bucket", "--jurisdiction eu"]), - expect.objectContaining({ target }) + // Each cache entry should be sent as an individual POST request with FormData. + expect(global.fetch).toHaveBeenCalledWith( + "http://localhost:12345/populate", + expect.objectContaining({ method: "POST" }) ); + + // Verify the body is FormData containing key and value fields. + const fetchCall = (global.fetch as ReturnType).mock.calls[0]; + const body = fetchCall[1].body; + expect(body).toBeInstanceOf(FormData); + expect(body.get("key")).toBeTypeOf("string"); + expect(body.get("value")).toBeTypeOf("string"); + + // Verify worker was disposed after sending entries. + expect(mockWorkerDispose).toHaveBeenCalled(); }); } ); diff --git a/packages/cloudflare/src/cli/commands/populate-cache.ts b/packages/cloudflare/src/cli/commands/populate-cache.ts index e80f88e13..a83d97787 100644 --- a/packages/cloudflare/src/cli/commands/populate-cache.ts +++ b/packages/cloudflare/src/cli/commands/populate-cache.ts @@ -2,6 +2,7 @@ import fs from "node:fs"; import fsp from "node:fs/promises"; import os from "node:os"; import path from "node:path"; +import { fileURLToPath } from "node:url"; import type { BuildOptions } from "@opennextjs/aws/build/helper.js"; import logger from "@opennextjs/aws/logger.js"; @@ -15,6 +16,7 @@ import type { IncrementalCache, TagCache } from "@opennextjs/aws/types/overrides import { globSync } from "glob"; import { tqdm } from "ts-tqdm"; import type { Unstable_Config as WranglerConfig } from "wrangler"; +import { unstable_startWorker } from "wrangler"; import type yargs from "yargs"; import { @@ -37,6 +39,7 @@ import { NAME as D1_TAG_NAME, } from "../../api/overrides/tag-cache/d1-next-tag-cache.js"; import { normalizePath } from "../utils/normalize-path.js"; +import type { R2Response } from "../workers/r2-cache-types.js"; import { getEnvFromPlatformProxy, quoteShellMeta, type WorkerEnvVar } from "./utils/helpers.js"; import type { WranglerTarget } from "./utils/run-wrangler.js"; import { runWrangler } from "./utils/run-wrangler.js"; @@ -196,9 +199,11 @@ type PopulateCacheOptions = { */ wranglerConfigPath?: string; /** - * Chunk sizes to use when populating KV cache. Ignored for R2. + * Number of concurrent requests when populating the cache. + * For KV this is the chunk size passed to `wrangler kv bulk put`. + * For R2 this is the number of concurrent requests to the local worker. * - * @default 25 for KV, 50 for R2 + * @default 25 */ cacheChunkSize?: number; /** @@ -207,73 +212,195 @@ type PopulateCacheOptions = { shouldUsePreviewId: boolean; }; +/** + * Populates the R2 incremental cache by starting a local worker with an R2 binding. + * + * Flow: + * 1. Reads the R2 binding configuration from the wrangler config. + * 2. Collects cache assets from the build output. + * 3. Starts a local worker (via `unstable_startWorker`) with the R2 binding. + * 4. Sends individual POST requests to the worker. + * + * Using a binding bypasses the Cloudflare REST API rate limit that affects `wrangler r2 bulk put`. + */ async function populateR2IncrementalCache( buildOpts: BuildOptions, config: WranglerConfig, populateCacheOptions: PopulateCacheOptions, envVars: WorkerEnvVar ) { - logger.info("\nPopulating R2 incremental cache..."); + logger.info(`\nPopulating ${populateCacheOptions.target} R2 incremental cache...`); - const binding = config.r2_buckets.find( - ({ binding }: { binding: string }) => binding === R2_CACHE_BINDING_NAME - ); + const binding = config.r2_buckets.find(({ binding }) => binding === R2_CACHE_BINDING_NAME); if (!binding) { - throw new Error(`No R2 binding ${JSON.stringify(R2_CACHE_BINDING_NAME)} found!`); + throw new Error(`No R2 binding "${R2_CACHE_BINDING_NAME}" found!`); } - const bucket = binding.bucket_name; - if (!bucket) { - throw new Error(`R2 binding ${JSON.stringify(R2_CACHE_BINDING_NAME)} should have a 'bucket_name'`); + const prefix = envVars[R2_CACHE_PREFIX_ENV_NAME]; + const assets = getCacheAssets(buildOpts); + + if (assets.length === 0) { + logger.info("No cache assets to populate"); + return; } - const prefix = envVars[R2_CACHE_PREFIX_ENV_NAME]; + const currentDir = path.dirname(fileURLToPath(import.meta.url)); + const handlerPath = path.join(currentDir, "../workers/r2-cache.js"); + + const isRemote = populateCacheOptions.target === "remote"; + + // Start a local worker with the R2 binding configured for the target environment. + const worker = await unstable_startWorker({ + name: "open-next-cache-populate", + entrypoint: handlerPath, + // TODO: do we need a date for local ? + compatibilityDate: "2026-01-01", + bindings: { + R2: { + type: "r2_bucket", + bucket_name: binding.bucket_name, + ...(binding.jurisdiction && { jurisdiction: binding.jurisdiction }), + remote: isRemote, + }, + }, + dev: { + server: { port: 0 }, + inspector: false, + watch: false, + liveReload: false, + logLevel: "none", + }, + }); - const assets = getCacheAssets(buildOpts); + // When targeting remote, wrangler's DevEnv emits an "error" event if the R2 bucket + // doesn't exist (Cloudflare API code 10085). In-flight fetch calls hang forever + // because the remote proxy session fails (see https://github.com/cloudflare/workers-sdk/issues/11253). + // We listen for error events and race against sendEntriesToR2Worker to surface the error. + const errorPromise = new Promise((_, reject) => { + worker.raw.once("error", (event: { type: string; reason: string; cause: Error }) => { + const message = event.cause?.message ?? event.reason ?? "Unknown error"; + reject(new Error(message)); + }); + }); - const objectList = assets.map(({ fullPath, key, buildId, isFetch }) => ({ + try { + await worker.ready; + const baseUrl = await worker.url; + await Promise.race([ + sendEntriesToR2Worker({ + workerUrl: new URL("/populate", baseUrl).href, + assets, + prefix, + concurrency: Math.max(1, populateCacheOptions.cacheChunkSize ?? 25), + }), + errorPromise, + ]); + } catch (e) { + await worker.dispose(); + if (isRemote) { + logger.error(`Failed to populate the remote R2 cache. Does the bucket "${binding.bucket_name}" exist?`); + } else { + logger.error(`Failed to populate the local R2 cache: ${e instanceof Error ? e.message : String(e)}`); + } + process.exit(1); + } finally { + await worker.dispose(); + } + + logger.info(`Successfully populated cache with ${assets.length} entries`); +} + +/** + * Sends cache entries to the R2 worker, one entry per request. + * + * Up to `concurrency` requests are in-flight at any given time. + * Retry logic for transient R2 write failures is handled by the worker. + * + * @param options + * @param options.workerUrl - The URL of the local R2 worker's `/populate` endpoint. + * @param options.assets - The cache assets to write, as collected by {@link getCacheAssets}. + * @param options.prefix - Optional prefix prepended to each R2 key. + * @param options.concurrency - Maximum number of concurrent in-flight requests. + * @returns Resolves when all entries have been written successfully. + * @throws {Error} If any entry fails after all retries or encounters a non-retryable error. + */ +async function sendEntriesToR2Worker(options: { + workerUrl: string; + assets: CacheAsset[]; + prefix: string | undefined; + concurrency: number; +}): Promise { + const { workerUrl, assets, prefix, concurrency } = options; + + // Build the list of entries to send (key + filename). + // File contents are read lazily in sendEntryToR2Worker to avoid + // loading all cache values into memory at once. + const entries = assets.map(({ fullPath, key, buildId, isFetch }) => ({ key: computeCacheKey(key, { prefix, buildId, cacheType: isFetch ? "fetch" : "cache", }), - file: fullPath, + filename: fullPath, })); - const tempDir = await fsp.mkdtemp(path.join(os.tmpdir(), "open-next-")); - const listFile = path.join(tempDir, `r2-bulk-list.json`); - fs.writeFileSync(listFile, JSON.stringify(objectList)); - - const concurrency = Math.max(1, populateCacheOptions.cacheChunkSize ?? 50); - const jurisdiction = binding.jurisdiction ? `--jurisdiction ${binding.jurisdiction}` : ""; + // Use a concurrency-limited loop with a progress bar. + // `pending` tracks in-flight promises so we can cap concurrency. + const pending = new Set>(); - const result = runWrangler( - buildOpts, - [ - "r2 bulk put", - bucket, - `--filename ${quoteShellMeta(listFile)}`, - `--concurrency ${concurrency}`, - jurisdiction, - ], - { - target: populateCacheOptions.target, - configPath: populateCacheOptions.wranglerConfigPath, - // R2 does not support the environment flag and results in the following error: - // Incorrect type for the 'cacheExpiry' field on 'HttpMetadata': the provided value is not of type 'date'. - environment: undefined, - logging: "error", + for (const entry of tqdm(entries)) { + // If we've reached the concurrency limit, wait for one to finish. + if (pending.size >= concurrency) { + await Promise.race(pending); } - ); - fs.rmSync(listFile, { force: true }); + const task = sendEntryToR2Worker({ + workerUrl, + key: entry.key, + filename: entry.filename, + }).finally(() => pending.delete(task)); + pending.add(task); + } - if (!result.success) { - logger.error(`Wrangler r2 bulk put command failed${result.stderr ? `:\n${result.stderr}` : ""}`); - process.exit(1); + await Promise.all(pending); +} + +/** + * Sends a single cache entry to the R2 worker. + * + * The file is read from disk and sent as FormData. The worker handles + * retry logic internally. + * + * @param options + * @param options.workerUrl - The URL of the local R2 worker's `/populate` endpoint. + * @param options.key - The R2 object key. + * @param options.filename - Path to the cache file on disk. Read at send time to avoid holding all values in memory. + * @throws {Error} If the worker reports a failure. + */ +async function sendEntryToR2Worker(options: { + workerUrl: string; + key: string; + filename: string; +}): Promise { + const { workerUrl, key, filename } = options; + + const formData = new FormData(); + formData.set("key", key); + formData.set("value", fs.readFileSync(filename, "utf8")); + + const response = await fetch(workerUrl, { + method: "POST", + body: formData, + }); + + const result = (await response.json()) as R2Response; + + if (result.success) { + return; } - logger.info(`Successfully populated cache with ${assets.length} assets`); + logger.error(`Failed to write "${key}" to R2: ${result.error}`); + throw new Error(result.error); } async function populateKVIncrementalCache( @@ -282,23 +409,29 @@ async function populateKVIncrementalCache( populateCacheOptions: PopulateCacheOptions, envVars: WorkerEnvVar ) { - logger.info("\nPopulating KV incremental cache..."); + logger.info(`\nPopulating ${populateCacheOptions.target} KV incremental cache...`); const binding = config.kv_namespaces.find( ({ binding }: { binding: string }) => binding === KV_CACHE_BINDING_NAME ); if (!binding) { - throw new Error(`No KV binding ${JSON.stringify(KV_CACHE_BINDING_NAME)} found!`); + throw new Error(`No KV binding "${KV_CACHE_BINDING_NAME}" found!`); } const prefix = envVars[KV_CACHE_PREFIX_ENV_NAME]; - const assets = getCacheAssets(buildOpts); + if (assets.length === 0) { + logger.info("No cache assets to populate"); + return; + } + const chunkSize = Math.max(1, populateCacheOptions.cacheChunkSize ?? 25); const totalChunks = Math.ceil(assets.length / chunkSize); - logger.info(`Inserting ${assets.length} assets to KV in chunks of ${chunkSize}`); + logger.info( + `Inserting ${assets.length} assets to ${populateCacheOptions.target} KV in chunks of ${chunkSize}` + ); const tempDir = await fsp.mkdtemp(path.join(os.tmpdir(), "open-next-")); @@ -342,7 +475,7 @@ async function populateKVIncrementalCache( } } - logger.info(`Successfully populated cache with ${assets.length} assets`); + logger.info(`Successfully populated cache with ${assets.length} entries`); } function populateD1TagCache( @@ -356,7 +489,7 @@ function populateD1TagCache( ({ binding }: { binding: string }) => binding === D1_TAG_BINDING_NAME ); if (!binding) { - throw new Error(`No D1 binding ${JSON.stringify(D1_TAG_BINDING_NAME)} found!`); + throw new Error(`No D1 binding "${D1_TAG_BINDING_NAME}" found!`); } const result = runWrangler( diff --git a/packages/cloudflare/src/cli/workers/r2-cache-types.ts b/packages/cloudflare/src/cli/workers/r2-cache-types.ts new file mode 100644 index 000000000..82f8464e7 --- /dev/null +++ b/packages/cloudflare/src/cli/workers/r2-cache-types.ts @@ -0,0 +1,31 @@ +/** + * Shared types and error codes for the R2 cache worker and its caller. + */ + +/** The R2 bucket binding is not configured in the worker environment. */ +export const ERR_BINDING_NOT_FOUND = "ERR_BINDING_NOT_FOUND"; +/** The request body is not valid FormData or is missing required fields. */ +export const ERR_INVALID_REQUEST = "ERR_INVALID_REQUEST"; +/** The R2 put operation failed. */ +export const ERR_WRITE_FAILED = "ERR_WRITE_FAILED"; + +export type ErrorCode = typeof ERR_BINDING_NOT_FOUND | typeof ERR_INVALID_REQUEST | typeof ERR_WRITE_FAILED; + +/** Successful response from the worker. */ +export interface R2SuccessResponse { + success: true; +} + +/** Error response from the worker, includes an error message and a typed code. */ +export interface R2ErrorResponse { + success: false; + error: string; + code: ErrorCode; +} + +/** Union of all possible responses from the worker. */ +export type R2Response = R2SuccessResponse | R2ErrorResponse; + +export interface CachePopulateEnv { + R2?: R2Bucket; +} diff --git a/packages/cloudflare/src/cli/workers/r2-cache.spec.ts b/packages/cloudflare/src/cli/workers/r2-cache.spec.ts new file mode 100644 index 000000000..2ed77cf28 --- /dev/null +++ b/packages/cloudflare/src/cli/workers/r2-cache.spec.ts @@ -0,0 +1,259 @@ +import { beforeEach, describe, expect, test, vi } from "vitest"; + +import handler from "./r2-cache.ts"; +import { ERR_BINDING_NOT_FOUND, ERR_INVALID_REQUEST, ERR_WRITE_FAILED } from "./r2-cache-types.ts"; + +const mockPut = vi.fn(); +const mockR2Bucket = { put: mockPut } as unknown as R2Bucket; + +describe("r2-cache worker", () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.useFakeTimers(); + }); + + describe("routing", () => { + test("returns 404 for non-POST requests", async () => { + const request = new Request("https://example.com/populate", { method: "GET" }); + + const response = await handler.fetch(request, { R2: mockR2Bucket }); + expect(response.status).toBe(404); + }); + + test("returns 404 for wrong pathname", async () => { + const request = new Request("https://example.com/other", { + method: "POST", + body: new FormData(), + }); + + const response = await handler.fetch(request, { R2: mockR2Bucket }); + expect(response.status).toBe(404); + }); + }); + + describe("binding validation", () => { + test("returns ERR_BINDING_NOT_FOUND when R2 binding is missing", async () => { + const formData = new FormData(); + formData.set("key", "k"); + formData.set("value", "v"); + const request = new Request("https://example.com/populate", { + method: "POST", + body: formData, + }); + + const response = await handler.fetch(request, { R2: undefined }); + expect(response.status).toBe(500); + + const body = await response.json(); + expect(body).toEqual({ + success: false, + error: expect.stringContaining("not configured"), + code: ERR_BINDING_NOT_FOUND, + }); + }); + }); + + describe("FormData validation", () => { + test("returns ERR_INVALID_REQUEST for non-FormData body", async () => { + const request = new Request("https://example.com/populate", { + method: "POST", + body: "not form data", + }); + + const response = await handler.fetch(request, { R2: mockR2Bucket }); + expect(response.status).toBe(400); + + const body = await response.json(); + expect(body).toEqual({ + success: false, + error: "Invalid FormData body", + code: ERR_INVALID_REQUEST, + }); + }); + + test("returns ERR_INVALID_REQUEST when key is missing", async () => { + const formData = new FormData(); + formData.set("value", "v"); + const request = new Request("https://example.com/populate", { + method: "POST", + body: formData, + }); + + const response = await handler.fetch(request, { R2: mockR2Bucket }); + expect(response.status).toBe(400); + + const body = await response.json(); + expect(body.success).toBe(false); + expect(body.code).toBe(ERR_INVALID_REQUEST); + }); + + test("returns ERR_INVALID_REQUEST when value is missing", async () => { + const formData = new FormData(); + formData.set("key", "k"); + const request = new Request("https://example.com/populate", { + method: "POST", + body: formData, + }); + + const response = await handler.fetch(request, { R2: mockR2Bucket }); + expect(response.status).toBe(400); + + const body = await response.json(); + expect(body.success).toBe(false); + expect(body.code).toBe(ERR_INVALID_REQUEST); + }); + + test("returns ERR_INVALID_REQUEST when both key and value are missing", async () => { + const request = new Request("https://example.com/populate", { + method: "POST", + body: new FormData(), + }); + + const response = await handler.fetch(request, { R2: mockR2Bucket }); + expect(response.status).toBe(400); + + const body = await response.json(); + expect(body.success).toBe(false); + expect(body.code).toBe(ERR_INVALID_REQUEST); + }); + }); + + describe("R2 write", () => { + test("returns success for a valid key/value write", async () => { + mockPut.mockResolvedValue(undefined); + + const formData = new FormData(); + formData.set("key", "cache/key1"); + formData.set("value", '{"data":"value1"}'); + const request = new Request("https://example.com/populate", { + method: "POST", + body: formData, + }); + + const response = await handler.fetch(request, { R2: mockR2Bucket }); + expect(response.status).toBe(200); + + const body = await response.json(); + expect(body).toEqual({ success: true }); + + expect(mockPut).toHaveBeenCalledWith("cache/key1", '{"data":"value1"}'); + }); + + test("returns ERR_WRITE_FAILED when R2 put fails after all retries", async () => { + mockPut.mockRejectedValue(new Error("R2 storage error")); + + const formData = new FormData(); + formData.set("key", "cache/key1"); + formData.set("value", "v"); + const request = new Request("https://example.com/populate", { + method: "POST", + body: formData, + }); + + // Advance through all retry delays: 200, 400, 800, 1600 ms + const fetchPromise = handler.fetch(request, { R2: mockR2Bucket }); + await vi.advanceTimersByTimeAsync(200 + 400 + 800 + 1600); + + const response = await fetchPromise; + expect(response.status).toBe(500); + + const body = await response.json(); + expect(body).toEqual({ + success: false, + error: expect.stringContaining("cache/key1"), + code: ERR_WRITE_FAILED, + }); + expect(body.error).toContain("R2 storage error"); + expect(body.error).toContain("5 attempts"); + expect(mockPut).toHaveBeenCalledTimes(5); + }); + }); + + describe("retry logic", () => { + test("retries on transient R2 write failure and succeeds", async () => { + mockPut.mockRejectedValueOnce(new Error("transient error")).mockResolvedValueOnce(undefined); + + const formData = new FormData(); + formData.set("key", "cache/key1"); + formData.set("value", "v"); + const fetchPromise = handler.fetch( + new Request("https://example.com/populate", { method: "POST", body: formData }), + { R2: mockR2Bucket } + ); + + // First attempt fails immediately, then sleep(200) before attempt 2. + await vi.advanceTimersByTimeAsync(200); + + const response = await fetchPromise; + expect(response.status).toBe(200); + + const body = await response.json(); + expect(body).toEqual({ success: true }); + expect(mockPut).toHaveBeenCalledTimes(2); + }); + + test("exhausts all retries with exponential backoff", async () => { + mockPut.mockRejectedValue(new Error("persistent error")); + + const formData = new FormData(); + formData.set("key", "cache/key1"); + formData.set("value", "v"); + const fetchPromise = handler.fetch( + new Request("https://example.com/populate", { method: "POST", body: formData }), + { R2: mockR2Bucket } + ); + + // attempt 0: immediate, fails + // attempt 1: sleep(200), fails + await vi.advanceTimersByTimeAsync(200); + // attempt 2: sleep(400), fails + await vi.advanceTimersByTimeAsync(400); + // attempt 3: sleep(800), fails + await vi.advanceTimersByTimeAsync(800); + // attempt 4: sleep(1600), fails + await vi.advanceTimersByTimeAsync(1600); + + const response = await fetchPromise; + expect(response.status).toBe(500); + + const body = await response.json(); + expect(body.success).toBe(false); + expect(body.code).toBe(ERR_WRITE_FAILED); + expect(body.error).toContain("5 attempts"); + expect(mockPut).toHaveBeenCalledTimes(5); + }); + + test("succeeds on last retry attempt", async () => { + mockPut + .mockRejectedValueOnce(new Error("fail 1")) + .mockRejectedValueOnce(new Error("fail 2")) + .mockRejectedValueOnce(new Error("fail 3")) + .mockRejectedValueOnce(new Error("fail 4")) + .mockResolvedValueOnce(undefined); + + const formData = new FormData(); + formData.set("key", "cache/key1"); + formData.set("value", "v"); + const fetchPromise = handler.fetch( + new Request("https://example.com/populate", { method: "POST", body: formData }), + { R2: mockR2Bucket } + ); + + // attempt 1: sleep(200) + await vi.advanceTimersByTimeAsync(200); + // attempt 2: sleep(400) + await vi.advanceTimersByTimeAsync(400); + // attempt 3: sleep(800) + await vi.advanceTimersByTimeAsync(800); + // attempt 4: sleep(1600) + await vi.advanceTimersByTimeAsync(1600); + + const response = await fetchPromise; + expect(response.status).toBe(200); + + const body = await response.json(); + expect(body).toEqual({ success: true }); + expect(mockPut).toHaveBeenCalledTimes(5); + }); + }); +}); diff --git a/packages/cloudflare/src/cli/workers/r2-cache.ts b/packages/cloudflare/src/cli/workers/r2-cache.ts new file mode 100644 index 000000000..78414db4d --- /dev/null +++ b/packages/cloudflare/src/cli/workers/r2-cache.ts @@ -0,0 +1,122 @@ +/** + * This worker writes a cache entry to R2 with retry logic. + * + * It handles POST requests to /populate with a FormData body containing: + * - `key`: the R2 object key (string, required). + * - `value`: the cache value to store (string, required). + * + * The worker validates the R2 binding and request body, then writes the entry + * to R2, retrying transient write failures with exponential backoff. + * + * This is used by the `populate-cache` command to bypass REST API rate limits when populating large caches. + */ + +import { + type CachePopulateEnv, + ERR_BINDING_NOT_FOUND, + ERR_INVALID_REQUEST, + ERR_WRITE_FAILED, + type R2ErrorResponse, +} from "./r2-cache-types.js"; + +// Maximum number of write attempts before giving up. +const MAX_RETRIES = 5; +// Base backoff delay. +const RETRY_DELAY_MS = 100; + +/** + * Worker fetch handler. + * + * Routes `POST /populate` to the cache population logic. + * Validates the R2 binding and FormData body, then writes the entry to R2 + * with retry logic for transient write failures. + * + * Response format: + * - 200 with `{ success: true }` on success. + * - 4xx/5xx with `{ success: false, error, code }` on failure. + * - 404 for unmatched routes. + */ +export default { + async fetch(request: Request, env: CachePopulateEnv): Promise { + const url = new URL(request.url); + + if (request.method !== "POST" || url.pathname !== "/populate") { + return new Response("Not found", { status: 404 }); + } + + // Verify the R2 binding exists before processing the request. + const r2 = env.R2; + if (!r2) { + return Response.json( + { + success: false, + error: 'R2 binding "R2" is not configured', + code: ERR_BINDING_NOT_FOUND, + } satisfies R2ErrorResponse, + { status: 500 } + ); + } + + // Parse and validate the FormData body. + let formData: FormData; + try { + formData = await request.formData(); + } catch { + return Response.json( + { + success: false, + error: "Invalid FormData body", + code: ERR_INVALID_REQUEST, + } satisfies R2ErrorResponse, + { status: 400 } + ); + } + + const key = formData.get("key"); + const value = formData.get("value"); + + if (typeof key !== "string" || typeof value !== "string") { + return Response.json( + { + success: false, + error: "FormData must contain 'key' (string) and 'value' (string)", + code: ERR_INVALID_REQUEST, + } satisfies R2ErrorResponse, + { status: 400 } + ); + } + + // Write the entry to R2 with retry logic. + for (let remainingAttempts = MAX_RETRIES - 1; remainingAttempts >= 0; remainingAttempts--) { + try { + const putStart = Date.now(); + await r2.put(key, value); + console.log(`[r2.put] ${key} ${Date.now() - putStart}ms`); + return Response.json({ success: true }, { status: 200 }); + } catch (e) { + if (remainingAttempts > 0) { + console.error( + `Write to R2 failed for key "${key}", retrying... (${remainingAttempts} attempts left)`, + e + ); + await new Promise((resolve) => + setTimeout(resolve, RETRY_DELAY_MS * Math.pow(1.2, MAX_RETRIES - 1 - remainingAttempts)) + ); + continue; + } + console.error(`Failed to write key "${key}" to R2 after ${MAX_RETRIES} attempts:`, e); + const detail = e instanceof Error ? e.message : String(e); + return Response.json( + { + success: false, + error: `Failed to write key "${key}" after ${MAX_RETRIES} attempts: ${detail}`, + code: ERR_WRITE_FAILED, + } satisfies R2ErrorResponse, + { status: 500 } + ); + } + } + + throw new Error("Unreachable"); + }, +};