Skip to content
7 changes: 7 additions & 0 deletions .changeset/fix-r2-cache-upload.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@opennextjs/cloudflare": minor
---

Use remote R2 binding for cache population

Using a remote binding is not subject to the Cloudflare API rate limit of 1,200 requests per 5 minutes that caused failures for large applications with thousands of prerendered pages.
253 changes: 253 additions & 0 deletions packages/cloudflare/scripts/debug-r2-memory.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
/**
* Standalone script to reproduce the OOM issue when populating R2 via unstable_startWorker.
*
* This mirrors the exact pattern used by populateR2IncrementalCache in populate-cache.ts:
* - Starts a local worker with a remote R2 binding via unstable_startWorker
* - Sends concurrent POST requests with FormData (key + value) to the worker
* - The worker writes each entry to R2
*
* Memory usage is logged every 50 completed entries to detect leaks.
*
* Usage:
* pnpm build
* node --expose-gc packages/cloudflare/scripts/debug-r2-memory.mjs [bucket-name]
*
* bucket-name defaults to "cache".
*/

import os from "node:os";
import path from "node:path";
import { fileURLToPath } from "node:url";

import { unstable_startWorker } from "wrangler";

// --- Configuration ---

// Total number of cache entries written to R2 during the run.
const TOTAL_ENTRIES = 10_000;
// Maximum number of in-flight requests to the worker at any time.
const CONCURRENCY = 1;
// Payload stored under every key: a 50,000-character string of "0"s (~50 KB).
const VALUE = "0".repeat(50_000);
// A memory snapshot is recorded every this-many completed entries.
const MEMORY_LOG_INTERVAL = 50;

// R2 bucket to populate; taken from the first CLI argument, defaulting to "cache".
const bucketName = process.argv[2] ?? "cache";

// --- Memory tracking ---

/** @type {Array<{ entries: number; rss: number; heapUsed: number; heapTotal: number; external: number; arrayBuffers: number; sysFree: number }>} */
const memorySnapshots = [];

/**
* Forces a GC cycle (if --expose-gc was used) and records a memory snapshot.
*
* @param {number} entriesCompleted - Number of entries completed so far.
*/
function logMemory(entriesCompleted) {
if (globalThis.gc) {
globalThis.gc();
}

const mem = process.memoryUsage();
const sysFree = os.freemem();
const snapshot = {
entries: entriesCompleted,
rss: Math.round(mem.rss / 1024 / 1024),
heapUsed: Math.round(mem.heapUsed / 1024 / 1024),
heapTotal: Math.round(mem.heapTotal / 1024 / 1024),
external: Math.round(mem.external / 1024 / 1024),
arrayBuffers: Math.round(mem.arrayBuffers / 1024 / 1024),
sysFree: Math.round(sysFree / 1024 / 1024),
};
memorySnapshots.push(snapshot);

console.log(
`[${String(entriesCompleted).padStart(5)}/${TOTAL_ENTRIES}] ` +
`RSS=${String(snapshot.rss).padStart(5)}MB ` +
`heap=${String(snapshot.heapUsed).padStart(5)}MB ` +
`external=${String(snapshot.external).padStart(5)}MB ` +
`arrayBuffers=${String(snapshot.arrayBuffers).padStart(5)}MB ` +
`sysFree=${String(snapshot.sysFree).padStart(6)}MB`
);
}

/**
* Prints a summary table of all memory snapshots.
*/
function printSummary() {
console.log("\n=== Memory Summary ===\n");
const header =
"Entries".padStart(7) +
" " +
"RSS(MB)".padStart(8) +
" " +
"Heap(MB)".padStart(9) +
" " +
"External(MB)".padStart(13) +
" " +
"ArrBuf(MB)".padStart(11) +
" " +
"SysFree(MB)".padStart(12);
console.log(header);
console.log("-".repeat(header.length));

for (const s of memorySnapshots) {
console.log(
String(s.entries).padStart(7) +
" " +
String(s.rss).padStart(8) +
" " +
String(s.heapUsed).padStart(9) +
" " +
String(s.external).padStart(13) +
" " +
String(s.arrayBuffers).padStart(11) +
" " +
String(s.sysFree).padStart(12)
);
}

if (memorySnapshots.length >= 2) {
const first = memorySnapshots[0];
const last = memorySnapshots[memorySnapshots.length - 1];
console.log("-".repeat(header.length));
console.log(
"Delta".padStart(7) +
" " +
String(last.rss - first.rss).padStart(7) +
" " +
String(last.heapUsed - first.heapUsed).padStart(8) +
" " +
String(last.external - first.external).padStart(12) +
" " +
String(last.arrayBuffers - first.arrayBuffers).padStart(10) +
" " +
String(last.sysFree - first.sysFree).padStart(11)
);
}
}

// --- Send a single entry to the worker (mirrors sendEntryToR2Worker) ---

/**
* Sends a single cache entry to the R2 worker via POST /populate with FormData.
*
* @param {string} workerUrl - The URL of the worker's /populate endpoint.
* @param {string} key - The R2 object key.
* @param {string} value - The value to store.
* @throws {Error} If the worker reports a failure.
*/
async function sendEntry(workerUrl, key, value) {
const formData = new FormData();
formData.set("key", key);
formData.set("value", value);

const start = performance.now();
const response = await fetch(workerUrl, {
method: "POST",
body: formData,
});

const result = await response.json();
const elapsed = Math.round(performance.now() - start);
console.log(`[request] ${key} ${elapsed}ms`);

if (!result.success) {
throw new Error(`Failed to write "${key}": ${result.error}`);
}
}

// --- Main ---

/**
 * Entry point: starts a local worker whose R2 binding is remote, then pushes
 * TOTAL_ENTRIES FormData POSTs through it with bounded concurrency while
 * sampling process memory every MEMORY_LOG_INTERVAL completed entries.
 *
 * Mirrors the populateR2IncrementalCache flow in populate-cache.ts so a leak
 * seen here should also reproduce in the real cache-population path.
 */
async function main() {
  // Echo the run configuration so logs are self-describing.
  console.log(`Bucket: ${bucketName}`);
  console.log(`Entries: ${TOTAL_ENTRIES}`);
  console.log(`Concurrency: ${CONCURRENCY}`);
  console.log(`Value size: ${VALUE.length} chars`);
  console.log(
    `System RAM: ${Math.round(os.totalmem() / 1024 / 1024)}MB (${Math.round(os.freemem() / 1024 / 1024)}MB free)`
  );
  console.log(`GC exposed: ${!!globalThis.gc}`);
  console.log();

  // Without --expose-gc, logMemory cannot force a collection before sampling,
  // so snapshots include garbage that has simply not been collected yet.
  if (!globalThis.gc) {
    console.warn("WARNING: --expose-gc not set. Memory snapshots will be less accurate.\n");
  }

  // Resolve the worker entrypoint (the compiled r2-cache.js)
  const currentDir = path.dirname(fileURLToPath(import.meta.url));
  const handlerPath = path.join(currentDir, "../dist/cli/workers/r2-cache.js");

  console.log(`Starting worker with remote R2 binding (bucket: ${bucketName})...`);

  // The worker runs locally, but `remote: true` on the R2 binding means each
  // write goes to the real bucket rather than a local simulation.
  const worker = await unstable_startWorker({
    name: "debug-r2-memory",
    entrypoint: handlerPath,
    compatibilityDate: "2026-01-01",
    bindings: {
      R2: {
        type: "r2_bucket",
        bucket_name: bucketName,
        remote: true,
      },
    },
    dev: {
      // port 0 asks the OS for an ephemeral free port.
      server: { port: 0 },
      inspector: false,
      watch: false,
      liveReload: false,
      logLevel: "debug",
    },
  });

  try {
    await worker.ready;
    const baseUrl = await worker.url;
    const workerUrl = new URL("/populate", baseUrl).href;

    console.log(`Worker ready at ${baseUrl}\n`);

    // Log initial memory before sending anything.
    logMemory(0);

    // Concurrency-limited send loop (mirrors sendEntriesToR2Worker).
    let completed = 0;
    const pending = new Set();

    for (let i = 0; i < TOTAL_ENTRIES; i++) {
      // If we've reached the concurrency limit, wait for one to finish.
      // NOTE: Promise.race also rejects here if the first-settled task failed,
      // which aborts the run via the outer catch.
      if (pending.size >= CONCURRENCY) {
        await Promise.race(pending);
      }

      const key = `key-${i}`;
      // The task removes itself from `pending` on settle (success or failure);
      // `task` is already bound by the time the .finally callback can run.
      const task = sendEntry(workerUrl, key, VALUE)
        .then(() => {
          completed++;

          // Log memory every MEMORY_LOG_INTERVAL completed entries.
          if (completed % MEMORY_LOG_INTERVAL === 0) {
            logMemory(completed);
          }
        })
        .finally(() => pending.delete(task));
      pending.add(task);
    }

    // Wait for all remaining in-flight requests.
    await Promise.all(pending);

    // Final memory snapshot if not already logged at TOTAL_ENTRIES.
    if (completed % MEMORY_LOG_INTERVAL !== 0) {
      logMemory(completed);
    }

    console.log(`\nAll ${TOTAL_ENTRIES} entries written successfully.`);
    printSummary();
  } finally {
    // Always tear the worker down, even when a send failed, so the process
    // and the local dev server do not linger.
    await worker.dispose();
    console.log("\nWorker disposed.");
  }
}

// Top-level kickoff: report any unhandled failure and exit non-zero.
void main().catch((error) => {
  console.error("Fatal error:", error);
  process.exit(1);
});
95 changes: 53 additions & 42 deletions packages/cloudflare/src/cli/commands/populate-cache.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import EventEmitter from "node:events";
import { mkdirSync, writeFileSync } from "node:fs";
import path from "node:path";

Expand Down Expand Up @@ -78,6 +79,21 @@ vi.mock("./utils/helpers.js", () => ({
quoteShellMeta: vi.fn((s) => s),
}));

const mockWorkerFetch = vi.fn();
const mockWorkerDispose = vi.fn();

vi.mock("wrangler", () => ({
unstable_startWorker: vi.fn(() =>
Promise.resolve({
ready: Promise.resolve(),
url: Promise.resolve(new URL("http://localhost:12345")),
fetch: mockWorkerFetch,
dispose: mockWorkerDispose,
raw: new EventEmitter(),
})
),
}));

describe("populateCache", () => {
const setupMockFileSystem = () => {
mockFs({
Expand All @@ -100,13 +116,19 @@ describe("populateCache", () => {
({ target }) => {
afterEach(() => {
mockFs.restore();
vi.clearAllMocks();
});

test(target, async () => {
const { runWrangler } = await import("./utils/run-wrangler.js");

test(`${target} - starts worker and sends individual cache entries via FormData`, async () => {
setupMockFileSystem();
vi.mocked(runWrangler).mockClear();

// Mock fetch to return a successful response for each individual entry.
global.fetch = vi.fn().mockResolvedValue(
new Response(JSON.stringify({ success: true }), {
status: 200,
headers: { "Content-Type": "application/json" },
})
);

await populateCache(
{
Expand All @@ -131,48 +153,37 @@ describe("populateCache", () => {
{} as any // eslint-disable-line @typescript-eslint/no-explicit-any
);

expect(runWrangler).toHaveBeenCalledWith(
expect.anything(),
expect.arrayContaining(["r2 bulk put", "test-bucket"]),
expect.objectContaining({ target })
);
});

test(`${target} using jurisdiction`, async () => {
const { runWrangler } = await import("./utils/run-wrangler.js");

setupMockFileSystem();
vi.mocked(runWrangler).mockClear();

await populateCache(
{
outputDir: "/test/output",
} as BuildOptions,
{
default: {
override: {
incrementalCache: "cf-r2-incremental-cache",
},
},
} as any, // eslint-disable-line @typescript-eslint/no-explicit-any
{
r2_buckets: [
{
binding: "NEXT_INC_CACHE_R2_BUCKET",
// Verify the worker was started with the correct R2 binding configuration.
const { unstable_startWorker: startWorker } = await import("wrangler");
expect(startWorker).toHaveBeenCalledWith(
expect.objectContaining({
name: "open-next-cache-populate",
compatibilityDate: "2026-01-01",
bindings: expect.objectContaining({
R2: expect.objectContaining({
type: "r2_bucket",
bucket_name: "test-bucket",
jurisdiction: "eu",
},
],
} as any, // eslint-disable-line @typescript-eslint/no-explicit-any
{ target, shouldUsePreviewId: false },
{} as any // eslint-disable-line @typescript-eslint/no-explicit-any
remote: target === "remote",
}),
}),
})
);

expect(runWrangler).toHaveBeenCalledWith(
expect.anything(),
expect.arrayContaining(["r2 bulk put", "test-bucket", "--jurisdiction eu"]),
expect.objectContaining({ target })
// Each cache entry should be sent as an individual POST request with FormData.
expect(global.fetch).toHaveBeenCalledWith(
"http://localhost:12345/populate",
expect.objectContaining({ method: "POST" })
);

// Verify the body is FormData containing key and value fields.
const fetchCall = (global.fetch as ReturnType<typeof vi.fn>).mock.calls[0];
const body = fetchCall[1].body;
expect(body).toBeInstanceOf(FormData);
expect(body.get("key")).toBeTypeOf("string");
expect(body.get("value")).toBeTypeOf("string");

// Verify worker was disposed after sending entries.
expect(mockWorkerDispose).toHaveBeenCalled();
});
}
);
Expand Down
Loading
Loading