Skip to content

Commit 7d41b3b

Browse files
committed
fix(runtime): support external storage for large session blobs
1 parent 20fcf5e commit 7d41b3b

9 files changed

Lines changed: 2109 additions & 2120 deletions

packages/cli/src/lib/build-plugin-cloudflare.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ import {
156156
CLOUDFLARE_AGENT_INTERNAL_DISPATCH_PATH,
157157
CLOUDFLARE_WORKFLOW_INTERNAL_METADATA_PATH,
158158
createCloudflareAgentRuntime,
159+
createR2SessionAttachmentStore,
159160
createSqlSessionStore,
160161
SqliteEventStreamStore,
161162
bashFactoryToSessionEnv,
@@ -206,6 +207,7 @@ if (!hasRegisteredProvider('cloudflare')) {
206207
const skills = {};
207208
const packagedSkills = ${packagedSkillsValue};
208209
const systemPrompt = '';
210+
const sessionAttachmentStore = createR2SessionAttachmentStore(env.FLUE_SESSION_ATTACHMENTS);
209211
210212
${builtModuleNormalizationSource}
211213
const agentModules = {
@@ -348,7 +350,7 @@ function createAgentContextForRequest(executionStore, id, payload, doInstance, r
348350
349351
function createWorkflowContextForRequest(id, runId, payload, doInstance, req, initialEventIndex, dispatchId) {
350352
const storage = doInstance?.ctx?.storage;
351-
const defaultStore = storage?.sql ? createSqlSessionStore(storage.sql, storage.transactionSync?.bind(storage)) : memoryWorkflowSessionStore;
353+
const defaultStore = storage?.sql ? createSqlSessionStore(storage.sql, storage.transactionSync?.bind(storage), { attachmentStore: sessionAttachmentStore }) : memoryWorkflowSessionStore;
352354
return createContextForRequest(id, runId, payload, doInstance, req, defaultStore, initialEventIndex, dispatchId);
353355
}
354356
@@ -403,6 +405,7 @@ function createEventStreamStoreForInstance(doInstance) {
403405
404406
const cloudflareAgents = createCloudflareAgentRuntime({
405407
createdAgents,
408+
sessionStoreOptions: { attachmentStore: sessionAttachmentStore },
406409
createContext: ({ executionStore, instance, payload, request, initialEventIndex, dispatchId }) =>
407410
createAgentContextForRequest(executionStore, instance.name, payload, instance, request, initialEventIndex, dispatchId),
408411
runWithInstanceContext: (instance, agentName, fn) => runWithInstanceContext(instance, agentRuntimeIdentity(agentName), fn),

packages/runtime/src/cloudflare/agent-coordinator.ts

Lines changed: 76 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
import type { AgentExecutionStore, AgentSubmission, AgentSubmissionStore } from '../agent-execution-store.ts';
1+
import type {
2+
AgentExecutionStore,
3+
AgentSubmission,
4+
AgentSubmissionStore,
5+
} from '../agent-execution-store.ts';
26
import type { FlueContextInternal } from '../client.ts';
37
import {
48
createAgentSubmissionObserverRegistry,
@@ -22,6 +26,7 @@ const FLUE_AGENT_SUBMISSION_ATTEMPT_STALE_MS = 15 * 60 * 1000;
2226
const FLUE_AGENT_SUBMISSION_ATTEMPT_FIBER = 'flue:submission-attempt';
2327

2428
import type { SqlStorage } from '../sql-storage.ts';
29+
import type { SqlSessionStoreOptions } from '../sql-agent-execution-store.ts';
2530

2631
interface CloudflareAgentStorage {
2732
sql?: SqlStorage;
@@ -73,7 +78,10 @@ interface CloudflareAgentRuntimeOptions {
7378
agentName: string,
7479
callback: () => T,
7580
) => T;
76-
readonly createEventStreamStore: (instance: CloudflareAgentInstance) => import('../runtime/event-stream-store.ts').EventStreamStore;
81+
readonly createEventStreamStore: (
82+
instance: CloudflareAgentInstance,
83+
) => import('../runtime/event-stream-store.ts').EventStreamStore;
84+
readonly sessionStoreOptions?: SqlSessionStoreOptions;
7785
}
7886

7987
export interface CloudflareAgentRuntime {
@@ -96,7 +104,9 @@ export interface CloudflareAgentRuntime {
96104
): Promise<unknown>;
97105
}
98106

99-
export function createCloudflareAgentRuntime(options: CloudflareAgentRuntimeOptions): CloudflareAgentRuntime {
107+
export function createCloudflareAgentRuntime(
108+
options: CloudflareAgentRuntimeOptions,
109+
): CloudflareAgentRuntime {
100110
const coordinators = new WeakMap<CloudflareAgentInstance, CloudflareAgentCoordinator>();
101111
const observers = createAgentSubmissionObserverRegistry();
102112
const activeAttempts = new Set<string>();
@@ -113,7 +123,11 @@ export function createCloudflareAgentRuntime(options: CloudflareAgentRuntimeOpti
113123
prepare({ storage, className, agentName }) {
114124
return {
115125
agentName,
116-
executionStore: createSqlAgentExecutionStore(storage, className),
126+
executionStore: createSqlAgentExecutionStore(
127+
storage,
128+
className,
129+
options.sessionStoreOptions,
130+
),
117131
};
118132
},
119133
attach(instance, prepared) {
@@ -184,7 +198,8 @@ class CloudflareAgentCoordinator {
184198
id: this.instance.name,
185199
agentName: this.agentName,
186200
eventStreamStore: this.eventStreamStore,
187-
admitAttachedSubmission: (payload, onEvent, waitForResult) => this.admitAttachedSubmission(payload, onEvent, waitForResult),
201+
admitAttachedSubmission: (payload, onEvent, waitForResult) =>
202+
this.admitAttachedSubmission(payload, onEvent, waitForResult),
188203
}),
189204
);
190205
}
@@ -242,7 +257,9 @@ class CloudflareAgentCoordinator {
242257
}
243258
}
244259

245-
private armSubmissionWake(options: { delaySeconds?: number; idempotent?: boolean } = {}): Promise<unknown> {
260+
private armSubmissionWake(
261+
options: { delaySeconds?: number; idempotent?: boolean } = {},
262+
): Promise<unknown> {
246263
this.assertAgentsDurabilityApi('schedule');
247264
return this.instance.schedule(
248265
options.delaySeconds ?? FLUE_AGENT_SUBMISSION_WAKE_SECONDS,
@@ -258,7 +275,9 @@ class CloudflareAgentCoordinator {
258275
return true;
259276
}
260277

261-
private async reconcileSubmissions(options: { driverAlreadyArmed?: boolean } = {}): Promise<boolean> {
278+
private async reconcileSubmissions(
279+
options: { driverAlreadyArmed?: boolean } = {},
280+
): Promise<boolean> {
262281
if (!(await this.submissions.hasUnsettledSubmissions())) return false;
263282
if (!options.driverAlreadyArmed) await this.restoreSubmissionWake();
264283
try {
@@ -277,16 +296,16 @@ class CloudflareAgentCoordinator {
277296
}
278297
}
279298
for (const submission of await this.submissions.listRunnableSubmissions()) {
280-
// Cloudflare DOs are single-threaded per instance — leases are
281-
// advisory-only. Set to 0 so reconciliation never misidentifies
282-
// an active submission as expired. The Node coordinator uses real
283-
// lease expiry with heartbeat renewal for multi-process safety.
284-
const claimed = await this.submissions.claimSubmission({
285-
submissionId: submission.submissionId,
286-
attemptId: crypto.randomUUID(),
287-
ownerId: this.instance.ctx.id.toString(),
288-
leaseExpiresAt: 0,
289-
});
299+
// Cloudflare DOs are single-threaded per instance — leases are
300+
// advisory-only. Set to 0 so reconciliation never misidentifies
301+
// an active submission as expired. The Node coordinator uses real
302+
// lease expiry with heartbeat renewal for multi-process safety.
303+
const claimed = await this.submissions.claimSubmission({
304+
submissionId: submission.submissionId,
305+
attemptId: crypto.randomUUID(),
306+
ownerId: this.instance.ctx.id.toString(),
307+
leaseExpiresAt: 0,
308+
});
290309
if (!claimed) continue;
291310
try {
292311
this.startSubmissionAttempt(claimed);
@@ -339,7 +358,12 @@ class CloudflareAgentCoordinator {
339358
submission,
340359
agent,
341360
(payload, dispatchId) =>
342-
this.createContext(payload, submissionSyntheticRequest(submission.input), undefined, dispatchId),
361+
this.createContext(
362+
payload,
363+
submissionSyntheticRequest(submission.input),
364+
undefined,
365+
dispatchId,
366+
),
343367
{ ownerId: this.instance.ctx.id.toString(), leaseExpiresAt: 0 },
344368
),
345369
);
@@ -400,24 +424,32 @@ class CloudflareAgentCoordinator {
400424
if (!rows) throw new Error('[flue] Cloudflare durable agent SQL storage is unavailable.');
401425
for (const row of rows) {
402426
if (typeof row.created_at !== 'number') {
403-
console.warn('[flue:submission-reconciliation] Skipping attempt marker with non-numeric created_at.');
427+
console.warn(
428+
'[flue:submission-reconciliation] Skipping attempt marker with non-numeric created_at.',
429+
);
404430
continue;
405431
}
406432
if (Date.now() - row.created_at > FLUE_AGENT_SUBMISSION_ATTEMPT_STALE_MS) continue;
407433
if (row.snapshot === null) continue;
408434
if (typeof row.snapshot !== 'string') {
409-
console.warn('[flue:submission-reconciliation] Skipping attempt marker with non-string snapshot.');
435+
console.warn(
436+
'[flue:submission-reconciliation] Skipping attempt marker with non-string snapshot.',
437+
);
410438
continue;
411439
}
412440
let snapshot: unknown;
413441
try {
414442
snapshot = JSON.parse(row.snapshot);
415443
} catch {
416-
console.warn('[flue:submission-reconciliation] Skipping attempt marker with unparseable snapshot.');
444+
console.warn(
445+
'[flue:submission-reconciliation] Skipping attempt marker with unparseable snapshot.',
446+
);
417447
continue;
418448
}
419449
if (!isAttemptMarkerSnapshot(snapshot)) {
420-
console.warn('[flue:submission-reconciliation] Skipping attempt marker with invalid snapshot shape.');
450+
console.warn(
451+
'[flue:submission-reconciliation] Skipping attempt marker with invalid snapshot shape.',
452+
);
421453
continue;
422454
}
423455
keys.add(`${snapshot.submissionId}:${snapshot.attemptId}`);
@@ -439,7 +471,12 @@ class CloudflareAgentCoordinator {
439471
return agent;
440472
},
441473
createContext: (payload, dispatchId) => {
442-
const ctx = this.createContext(payload, submissionSyntheticRequest(submission.input), undefined, dispatchId);
474+
const ctx = this.createContext(
475+
payload,
476+
submissionSyntheticRequest(submission.input),
477+
undefined,
478+
dispatchId,
479+
);
443480
const streamPath = agentStreamPath(this.agentName, this.instance.name);
444481
ctx.subscribeEvent((event) => {
445482
eventStreamStore.appendEvent(streamPath, event).catch((error) => {
@@ -472,7 +509,11 @@ class CloudflareAgentCoordinator {
472509
onEvent?: (event: AttachedAgentEvent) => Promise<void> | void,
473510
waitForResult = true,
474511
): Promise<unknown> {
475-
const input = createDirectAgentSubmissionInput({ agent: this.agentName, id: this.instance.name, payload });
512+
const input = createDirectAgentSubmissionInput({
513+
agent: this.agentName,
514+
id: this.instance.name,
515+
payload,
516+
});
476517
const attachment = this.observers.attach(input.submissionId, { onEvent });
477518
try {
478519
await this.armSubmissionWake();
@@ -512,12 +553,16 @@ class CloudflareAgentCoordinator {
512553
return new Response('Conflicting internal dispatch replay.', { status: 409 });
513554
}
514555
await this.reconcileSubmissions({ driverAlreadyArmed: true });
515-
return Response.json({ dispatchId: admission.submission.submissionId, acceptedAt: input.acceptedAt });
556+
return Response.json({
557+
dispatchId: admission.submission.submissionId,
558+
acceptedAt: input.acceptedAt,
559+
});
516560
}
517-
518561
}
519562

520-
function isAttemptMarkerSnapshot(value: unknown): value is { submissionId: string; attemptId: string } {
563+
function isAttemptMarkerSnapshot(
564+
value: unknown,
565+
): value is { submissionId: string; attemptId: string } {
521566
if (!value || typeof value !== 'object') return false;
522567
const snapshot = value as Record<string, unknown>;
523568
return typeof snapshot.submissionId === 'string' && typeof snapshot.attemptId === 'string';
@@ -528,7 +573,8 @@ function submissionAttemptMarkerKey(submission: AgentSubmission): string {
528573
}
529574

530575
function isInternalDispatchRequest(request: Request): boolean {
531-
return request.method === 'POST' && new URL(request.url).pathname === CLOUDFLARE_AGENT_INTERNAL_DISPATCH_PATH;
576+
return (
577+
request.method === 'POST' &&
578+
new URL(request.url).pathname === CLOUDFLARE_AGENT_INTERNAL_DISPATCH_PATH
579+
);
532580
}
533-
534-

packages/runtime/src/cloudflare/agent-execution-store.ts

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import {
44
createSqlAgentExecutionStoreFromSql,
55
ensureSqlAgentExecutionTables,
66
ensureSessionTable,
7+
type SessionAttachmentStore,
8+
type SqlSessionStoreOptions,
79
SqlSessionStore,
810
} from '../sql-agent-execution-store.ts';
911
import type { SessionStore } from '../types.ts';
@@ -13,17 +15,47 @@ interface DurableObjectStorage {
1315
transactionSync?<T>(closure: () => T): T;
1416
}
1517

18+
interface R2BucketLike {
19+
put(key: string, value: string, options?: unknown): Promise<unknown>;
20+
get(key: string): Promise<{ text(): Promise<string> } | null>;
21+
delete(key: string): Promise<unknown>;
22+
}
23+
1624
export function createSqlSessionStore(
1725
sql: SqlStorage,
1826
transactionSync?: <T>(closure: () => T) => T,
27+
options: SqlSessionStoreOptions = {},
1928
): SessionStore {
2029
ensureSessionTable(sql);
21-
return new SqlSessionStore(sql, transactionSync);
30+
return new SqlSessionStore(sql, transactionSync, options);
31+
}
32+
33+
export function createR2SessionAttachmentStore(
34+
bucket: R2BucketLike | null | undefined,
35+
): SessionAttachmentStore | undefined {
36+
if (!bucket) return undefined;
37+
return {
38+
async put(key: string, data: string): Promise<void> {
39+
await bucket.put(key, data, {
40+
httpMetadata: { contentType: 'text/plain; charset=utf-8' },
41+
customMetadata: { flue: 'session-attachment' },
42+
});
43+
},
44+
async get(key: string): Promise<string> {
45+
const object = await bucket.get(key);
46+
if (!object) throw new Error('[flue] Persisted session attachment object is missing.');
47+
return object.text();
48+
},
49+
async delete(key: string): Promise<void> {
50+
await bucket.delete(key);
51+
},
52+
};
2253
}
2354

2455
export function createSqlAgentExecutionStore(
2556
storage: DurableObjectStorage | undefined,
2657
className: string,
58+
options: SqlSessionStoreOptions = {},
2759
): AgentExecutionStore {
2860
const sql = storage?.sql;
2961
const transactionSync = storage?.transactionSync;
@@ -38,7 +70,7 @@ export function createSqlAgentExecutionStore(
3870
try {
3971
ensureSqlAgentExecutionTables(sql);
4072
const runTransaction = <T>(closure: () => T): T => transactionSync.call(storage, closure) as T;
41-
return createSqlAgentExecutionStoreFromSql(sql, runTransaction);
73+
return createSqlAgentExecutionStoreFromSql(sql, runTransaction, options);
4274
} catch (cause) {
4375
const detail = cause instanceof Error ? cause.message : String(cause);
4476
throw new Error(

packages/runtime/src/internal.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ export {
2626
createCloudflareAgentRuntime,
2727
} from './cloudflare/agent-coordinator.ts';
2828
export { CLOUDFLARE_WORKFLOW_INTERNAL_METADATA_PATH } from './runtime/flue-app.ts';
29-
export { createSqlSessionStore } from './cloudflare/agent-execution-store.ts';
29+
export {
30+
createR2SessionAttachmentStore,
31+
createSqlSessionStore,
32+
} from './cloudflare/agent-execution-store.ts';
3033
export { createDurableRunStore } from './cloudflare/run-store.ts';
3134
export { createNodeAgentCoordinator, createNodeDispatchQueue } from './node/agent-coordinator.ts';
3235
export { InMemoryRunStore } from './node/run-store.ts';
@@ -95,7 +98,10 @@ export { SqliteEventStreamStore } from './runtime/event-stream-store.ts';
9598
export type { RunRecord, RunStatus, RunStore } from './runtime/run-store.ts';
9699

97100
export { bashFactoryToSessionEnv } from './sandbox.ts';
98-
export type { DirectAgentSubmissionInput, DispatchAgentSubmissionInput } from './runtime/agent-submissions.ts';
101+
export type {
102+
DirectAgentSubmissionInput,
103+
DispatchAgentSubmissionInput,
104+
} from './runtime/agent-submissions.ts';
99105
export { InMemorySessionStore } from './session.ts';
100106
export { parseSkillMarkdown } from './skill-frontmatter.ts';
101107

0 commit comments

Comments
 (0)