Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/persistent-watch-state.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@cloudflare/sandbox': patch
---

Add persistent file watch state for hibernating Durable Object workflows.
Use `ensureWatch()`, `getWatchState()`, `checkpointWatch()`, and `stopWatch()` to keep a watch alive without holding an SSE stream open.
282 changes: 257 additions & 25 deletions packages/sandbox-container/src/handlers/watch-handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import { posix as pathPosix } from 'node:path';
import type { Logger, WatchRequest } from '@repo/shared';
import type {
Logger,
WatchCheckpointRequest,
WatchCheckpointResult,
WatchEnsureResult,
WatchRequest,
WatchStateResult,
WatchStopResult
} from '@repo/shared';
import { ErrorCode } from '@repo/shared/errors';
import { CONFIG } from '../config';
import type { RequestContext } from '../core/types';
Expand All @@ -9,7 +17,7 @@ import { BaseHandler } from './base-handler';
const WORKSPACE_ROOT = CONFIG.DEFAULT_CWD;

/**
* Handler for file watch operations
* Handler for file watch operations.
*/
export class WatchHandler extends BaseHandler<Request, Response> {
constructor(
Expand All @@ -22,28 +30,228 @@ export class WatchHandler extends BaseHandler<Request, Response> {
async handle(request: Request, context: RequestContext): Promise<Response> {
const pathname = new URL(request.url).pathname;

if (pathname === '/api/watch') {
if (pathname === '/api/watch' && request.method === 'POST') {
return this.handleWatch(request, context);
}

if (pathname === '/api/watch/ensure' && request.method === 'POST') {
return this.handleEnsureWatch(request, context);
}

if (pathname.startsWith('/api/watch/')) {
const segments = pathname.split('/');
const watchId = segments[3];
const action = segments[4];

if (!watchId) {
return this.createErrorResponse(
{
message: 'Watch ID is required',
code: ErrorCode.VALIDATION_FAILED
},
context
);
}

if (!action && request.method === 'GET') {
return this.handleGetWatchState(context, watchId);
}

if (!action && request.method === 'DELETE') {
return this.handleStopWatch(request, context, watchId);
}

if (action === 'checkpoint' && request.method === 'POST') {
return this.handleCheckpointWatch(request, context, watchId);
}
}

return this.createErrorResponse(
{
message: 'Invalid watch endpoint',
code: ErrorCode.VALIDATION_FAILED,
details: { pathname }
details: { pathname, method: request.method }
},
context
);
}

/**
* Start watching a directory.
* Returns an SSE stream of file change events.
*/
private async handleWatch(
request: Request,
context: RequestContext
): Promise<Response> {
const normalizedRequest = await this.parseAndNormalizeWatchRequest(
request,
context
);
if (normalizedRequest instanceof Response) {
return normalizedRequest;
}

const result = await this.watchService.watchDirectory(
normalizedRequest.path,
normalizedRequest
);

if (!result.success) {
return this.createErrorResponse(result.error, context);
}

return new Response(result.data, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
...context.corsHeaders
}
});
}

private async handleEnsureWatch(
request: Request,
context: RequestContext
): Promise<Response> {
const normalizedRequest = await this.parseAndNormalizeWatchRequest(
request,
context
);
if (normalizedRequest instanceof Response) {
return normalizedRequest;
}

const result = await this.watchService.ensureWatch(
normalizedRequest.path,
normalizedRequest
);

if (!result.success) {
return this.createErrorResponse(result.error, context);
}

const response: WatchEnsureResult = {
success: true,
watch: result.data.watch,
leaseToken: result.data.leaseToken,
timestamp: new Date().toISOString()
};

return this.createTypedResponse(response, context);
}

private async handleGetWatchState(
context: RequestContext,
watchId: string
): Promise<Response> {
const result = await this.watchService.getWatchState(watchId);
if (!result.success) {
return this.createErrorResponse(result.error, context);
}

const response: WatchStateResult = {
success: true,
watch: result.data,
timestamp: new Date().toISOString()
};

return this.createTypedResponse(response, context);
}

private async handleCheckpointWatch(
request: Request,
context: RequestContext,
watchId: string
): Promise<Response> {
let body: WatchCheckpointRequest;
try {
body = await this.parseRequestBody<WatchCheckpointRequest>(request);
} catch (error) {
return this.createErrorResponse(
{
message:
error instanceof Error ? error.message : 'Invalid request body',
code: ErrorCode.VALIDATION_FAILED
},
context
);
}

if (!Number.isInteger(body.cursor) || body.cursor < 0) {
return this.createErrorResponse(
{
message: 'cursor must be a non-negative integer',
code: ErrorCode.VALIDATION_FAILED,
details: { cursor: body.cursor }
},
context
);
}

if (body.leaseToken === undefined) {
return this.createErrorResponse(
{
message: 'leaseToken is required',
code: ErrorCode.VALIDATION_FAILED
},
context
);
}

const leaseTokenError = this.validateToken('leaseToken', body.leaseToken);
if (leaseTokenError) {
return this.createErrorResponse(leaseTokenError, context);
}

const result = await this.watchService.checkpointWatch(
watchId,
body.cursor,
body.leaseToken
);
if (!result.success) {
return this.createErrorResponse(result.error, context);
}

const response: WatchCheckpointResult = {
success: true,
checkpointed: result.data.checkpointed,
watch: result.data.watch,
timestamp: new Date().toISOString()
};

return this.createTypedResponse(response, context);
}

private async handleStopWatch(
request: Request,
context: RequestContext,
watchId: string
): Promise<Response> {
const leaseToken =
this.extractQueryParam(request, 'leaseToken') ?? undefined;
if (leaseToken !== undefined) {
const leaseTokenError = this.validateToken('leaseToken', leaseToken);
if (leaseTokenError) {
return this.createErrorResponse(leaseTokenError, context);
}
}

const result = await this.watchService.stopWatch(watchId, leaseToken);
if (!result.success) {
return this.createErrorResponse(result.error, context);
}

const response: WatchStopResult = {
success: true,
watchId,
timestamp: new Date().toISOString()
};

return this.createTypedResponse(response, context);
}

private async parseAndNormalizeWatchRequest(
request: Request,
context: RequestContext
): Promise<WatchRequest | Response> {
let body: WatchRequest;
try {
body = await this.parseRequestBody<WatchRequest>(request);
Expand All @@ -68,23 +276,10 @@ export class WatchHandler extends BaseHandler<Request, Response> {
return this.createErrorResponse(pathResult.error, context);
}

const result = await this.watchService.watchDirectory(pathResult.path, {
return {
...body,
path: pathResult.path
});

if (result.success) {
return new Response(result.data, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
...context.corsHeaders
}
});
}

return this.createErrorResponse(result.error, context);
};
}

private validateWatchBody(body: WatchRequest): {
Expand Down Expand Up @@ -160,6 +355,45 @@ export class WatchHandler extends BaseHandler<Request, Response> {
};
}

const resumeTokenError = this.validateToken(
'resumeToken',
body.resumeToken
);
if (resumeTokenError) {
return resumeTokenError;
}

return null;
}

private validateToken(
tokenName: 'leaseToken' | 'resumeToken',
token: unknown
): {
message: string;
code: string;
details?: Record<string, unknown>;
} | null {
if (token === undefined) {
return null;
}

if (typeof token !== 'string' || token.trim() === '') {
return {
message: `${tokenName} must be a non-empty string when provided`,
code: ErrorCode.VALIDATION_FAILED,
details: { [tokenName]: token }
};
}

if (token.includes('\0')) {
return {
message: `${tokenName} contains invalid null bytes`,
code: ErrorCode.VALIDATION_FAILED,
details: { [tokenName]: token }
};
}

return null;
}

Expand All @@ -177,8 +411,6 @@ export class WatchHandler extends BaseHandler<Request, Response> {
return null;
}

// Supported syntax is intentionally narrow: *, **, ? and path separators.
// Character classes and brace expansion are rejected so behavior is explicit.
const unsupportedTokens = /[[\]{}]/;

for (const pattern of patterns) {
Expand Down
28 changes: 28 additions & 0 deletions packages/sandbox-container/src/routes/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,34 @@ export function setupRoutes(router: Router, container: Container): void {
middleware: [container.get('loggingMiddleware')]
});

router.register({
method: 'POST',
path: '/api/watch/ensure',
handler: async (req, ctx) => container.get('watchHandler').handle(req, ctx),
middleware: [container.get('loggingMiddleware')]
});

router.register({
method: 'GET',
path: '/api/watch/{id}',
handler: async (req, ctx) => container.get('watchHandler').handle(req, ctx),
middleware: [container.get('loggingMiddleware')]
});

router.register({
method: 'POST',
path: '/api/watch/{id}/checkpoint',
handler: async (req, ctx) => container.get('watchHandler').handle(req, ctx),
middleware: [container.get('loggingMiddleware')]
});

router.register({
method: 'DELETE',
path: '/api/watch/{id}',
handler: async (req, ctx) => container.get('watchHandler').handle(req, ctx),
middleware: [container.get('loggingMiddleware')]
});

// Miscellaneous routes
router.register({
method: 'GET',
Expand Down
Loading
Loading