diff --git a/.changeset/persistent-watch-state.md b/.changeset/persistent-watch-state.md new file mode 100644 index 000000000..391cda4aa --- /dev/null +++ b/.changeset/persistent-watch-state.md @@ -0,0 +1,7 @@ +--- +'@cloudflare/sandbox': patch +--- + +Add `checkChanges()` for apps that disconnect and reconnect later but still need to know whether files changed in the meantime. + +Use the returned `version` in a later call to learn whether a path is unchanged, changed, or needs a full resync. Retained change state lasts for the current container lifetime only. diff --git a/packages/sandbox-container/src/handlers/watch-handler.ts b/packages/sandbox-container/src/handlers/watch-handler.ts index db8355c19..40e2f7826 100644 --- a/packages/sandbox-container/src/handlers/watch-handler.ts +++ b/packages/sandbox-container/src/handlers/watch-handler.ts @@ -1,5 +1,5 @@ import { posix as pathPosix } from 'node:path'; -import type { Logger, WatchRequest } from '@repo/shared'; +import type { CheckChangesRequest, Logger, WatchRequest } from '@repo/shared'; import { ErrorCode } from '@repo/shared/errors'; import { CONFIG } from '../config'; import type { RequestContext } from '../core/types'; @@ -9,7 +9,7 @@ import { BaseHandler } from './base-handler'; const WORKSPACE_ROOT = CONFIG.DEFAULT_CWD; /** - * Handler for file watch operations + * Handler for file watch operations. */ export class WatchHandler extends BaseHandler { constructor( @@ -22,15 +22,19 @@ export class WatchHandler extends BaseHandler { async handle(request: Request, context: RequestContext): Promise { const pathname = new URL(request.url).pathname; - if (pathname === '/api/watch') { + if (pathname === '/api/watch' && request.method === 'POST') { return this.handleWatch(request, context); } + if (pathname === '/api/watch/check' && request.method === 'POST') { + return this.handleCheckChanges(request, context); + } + return this.createErrorResponse( { message: 'Invalid watch endpoint', code: ErrorCode.VALIDATION_FAILED, - details: { pathname } + details: { pathname, method: request.method } }, context ); @@ -44,6 +48,63 @@ export class WatchHandler extends BaseHandler { request: Request, context: RequestContext ): Promise { + const normalizedRequest = await this.parseAndNormalizeWatchRequest( + request, + context + ); + if (normalizedRequest instanceof Response) { + return normalizedRequest; + } + + const result = await this.watchService.watchDirectory( + normalizedRequest.path, + normalizedRequest + ); + + if (!result.success) { + return this.createErrorResponse(result.error, context); + } + + return new Response(result.data, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + ...context.corsHeaders + } + }); + } + + /** + * Check whether a path changed since a previously returned version. + */ + private async handleCheckChanges( + request: Request, + context: RequestContext + ): Promise { + const normalizedRequest = await this.parseAndNormalizeCheckRequest( + request, + context + ); + if (normalizedRequest instanceof Response) { + return normalizedRequest; + } + + const result = await this.watchService.checkChanges( + normalizedRequest.path, + normalizedRequest + ); + if (!result.success) { + return this.createErrorResponse(result.error, context); + } + + return this.createTypedResponse(result.data, context); + } + + private async parseAndNormalizeWatchRequest( + request: Request, + context: RequestContext + ): Promise { let body: WatchRequest; try { body = await this.parseRequestBody(request); @@ -68,23 +129,44 @@ export class WatchHandler extends BaseHandler { return this.createErrorResponse(pathResult.error, context); } - const result = await this.watchService.watchDirectory(pathResult.path, { + return { ...body, path: pathResult.path - }); + }; + } - if (result.success) { - return new Response(result.data, { - headers: { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - Connection: 'keep-alive', - ...context.corsHeaders - } - }); + private async parseAndNormalizeCheckRequest( + request: Request, + context: RequestContext + ): Promise { + let body: CheckChangesRequest; + try { + body = await this.parseRequestBody(request); + } catch (error) { + return this.createErrorResponse( + { + message: + error instanceof Error ? error.message : 'Invalid request body', + code: ErrorCode.VALIDATION_FAILED + }, + context + ); + } + + const validationError = this.validateCheckChangesBody(body); + if (validationError) { + return this.createErrorResponse(validationError, context); } - return this.createErrorResponse(result.error, context); + const pathResult = this.normalizeWatchPath(body.path); + if (!pathResult.success) { + return this.createErrorResponse(pathResult.error, context); + } + + return { + ...body, + path: pathResult.path + }; } private validateWatchBody(body: WatchRequest): { @@ -163,6 +245,39 @@ export class WatchHandler extends BaseHandler { return null; } + private validateCheckChangesBody(body: CheckChangesRequest): { + message: string; + code: string; + details?: Record; + } | null { + const watchValidationError = this.validateWatchBody(body); + if (watchValidationError) { + return watchValidationError; + } + + if (body.since === undefined) { + return null; + } + + if (typeof body.since !== 'string' || body.since.trim() === '') { + return { + message: 'since must be a non-empty string when provided', + code: ErrorCode.VALIDATION_FAILED, + details: { since: body.since } + }; + } + + if (body.since.includes('\0')) { + return { + message: 'since contains invalid null bytes', + code: ErrorCode.VALIDATION_FAILED, + details: { since: body.since } + }; + } + + return null; + } + private isStringArrayOrUndefined( value: unknown ): value is string[] | undefined { @@ -177,8 +292,6 @@ export class WatchHandler extends BaseHandler { return null; } - // Supported syntax is intentionally narrow: *, **, ? and path separators. - // Character classes and brace expansion are rejected so behavior is explicit. const unsupportedTokens = /[[\]{}]/; for (const pattern of patterns) { diff --git a/packages/sandbox-container/src/routes/setup.ts b/packages/sandbox-container/src/routes/setup.ts index 391db093c..c8bfc5c6d 100644 --- a/packages/sandbox-container/src/routes/setup.ts +++ b/packages/sandbox-container/src/routes/setup.ts @@ -444,6 +444,13 @@ export function setupRoutes(router: Router, container: Container): void { middleware: [container.get('loggingMiddleware')] }); + router.register({ + method: 'POST', + path: '/api/watch/check', + handler: async (req, ctx) => container.get('watchHandler').handle(req, ctx), + middleware: [container.get('loggingMiddleware')] + }); + // Miscellaneous routes router.register({ method: 'GET', diff --git a/packages/sandbox-container/src/services/watch-service.ts b/packages/sandbox-container/src/services/watch-service.ts index f6d2bc277..0a7e425db 100644 --- a/packages/sandbox-container/src/services/watch-service.ts +++ b/packages/sandbox-container/src/services/watch-service.ts @@ -1,63 +1,172 @@ import type { + CheckChangesRequest, + CheckChangesResult, FileWatchEventType, FileWatchSSEEvent, Logger, WatchRequest } from '@repo/shared'; +import { logCanonicalEvent } from '@repo/shared'; import { ErrorCode } from '@repo/shared/errors'; import type { Subprocess } from 'bun'; import type { ServiceResult } from '../core/types'; import { serviceError, serviceSuccess } from '../core/types'; +interface Deferred { + promise: Promise; + resolve(value: T | PromiseLike): void; + reject(reason?: unknown): void; +} + +type LiveWatchEvent = Extract; +type TerminalWatchEvent = Extract< + FileWatchSSEEvent, + { type: 'error' | 'stopped' } +>; + +interface WatchSubscriber { + id: string; + controller: ReadableStreamDefaultController; + encoder: TextEncoder; + pendingEvents: Map; + droppedEvents: number; + flushInterval: ReturnType; + watchingSent: boolean; + closed: boolean; +} + interface ActiveWatch { id: string; + key: string; path: string; + recursive: boolean; + include?: string[]; + exclude?: string[]; process: Subprocess; startedAt: Date; + retained: boolean; + cursor: number; + lastEventAt: string | null; + expiresAt: string | null; + subscribers: Map; + ready: Deferred; + readyState: 'pending' | 'resolved' | 'rejected'; + expiryTimer: ReturnType | null; stopPromise?: Promise; } const WATCH_SETUP_TIMEOUT_MS = 10000; const EVENT_COALESCE_WINDOW_MS = 75; const MAX_PENDING_EVENTS = 1000; +const CHANGE_STATE_IDLE_TTL_MS = 10 * 60 * 1000; const STOP_TIMEOUT_MS = 5000; +const DEFAULT_EXCLUDE_PATTERNS = ['.git', 'node_modules', '.DS_Store']; +const DEFAULT_WATCH_EVENTS: FileWatchEventType[] = [ + 'create', + 'modify', + 'delete', + 'move_from', + 'move_to' +]; /** - * Service for watching filesystem changes using inotifywait + * Service for watching filesystem changes using inotifywait. */ export class WatchService { private activeWatches: Map = new Map(); + private watchIdsByKey: Map = new Map(); private watchCounter = 0; + private subscriberCounter = 0; constructor(private logger: Logger) {} /** - * Start watching a directory for changes - * Returns a ReadableStream of SSE events - * - * @param path - Absolute path to watch - * @param options - Watch options + * Start watching a directory and subscribe to live events. */ async watchDirectory( path: string, options: WatchRequest = { path } ): Promise>> { - const watchId = `watch-${++this.watchCounter}-${Date.now()}`; - const watchLogger = this.logger.child({ watchId, path }); - - // Clean up any existing watches on this path before starting a new one. - // Snapshot matching IDs first to avoid mutating the map during iteration. - const staleWatchIds = Array.from(this.activeWatches.entries()) - .filter(([, w]) => w.path === path) - .map(([id]) => id); - if (staleWatchIds.length > 0) { - watchLogger.debug('Cleaning up existing watches on path', { - staleWatchIds + const watchResult = this.getOrCreateWatch(path, options); + if (!watchResult.success) { + return serviceError(watchResult.error); + } + + return serviceSuccess(this.createSubscriberStream(watchResult.data.watch)); + } + + /** + * Check whether a path changed since a previously returned version. + */ + async checkChanges( + path: string, + options: CheckChangesRequest = { path } + ): Promise> { + const watchResult = this.getOrCreateWatch(path, options); + if (!watchResult.success) { + return serviceError(watchResult.error); + } + + const { watch, created } = watchResult.data; + watch.retained = true; + + try { + await watch.ready.promise; + this.refreshRetainedWatchExpiry(watch); + return this.buildCheckChangesResult(watch, options.since, created); + } catch (error) { + return serviceError({ + message: + error instanceof Error + ? error.message + : 'Failed to establish retained change state', + code: ErrorCode.WATCH_START_ERROR, + details: { path } }); - await Promise.all(staleWatchIds.map((id) => this.stopWatch(id))); + } + } + + /** + * Stop all active watches. + */ + async stopAllWatches(): Promise { + const watchIds = Array.from(this.activeWatches.keys()); + await Promise.all(watchIds.map((id) => this.stopWatchInternal(id))); + return watchIds.length; + } + + /** + * Get list of active watches. + */ + getActiveWatches(): Array<{ id: string; path: string; startedAt: Date }> { + return Array.from(this.activeWatches.values()).map((watch) => ({ + id: watch.id, + path: watch.path, + startedAt: watch.startedAt + })); + } + + private getOrCreateWatch( + path: string, + options: WatchRequest + ): ServiceResult<{ watch: ActiveWatch; created: boolean }> { + const normalized = this.normalizeWatchOptions(options); + const key = this.createWatchKey(path, normalized); + const existingWatchId = this.watchIdsByKey.get(key); + + if (existingWatchId) { + const existing = this.activeWatches.get(existingWatchId); + if ( + existing && + !existing.stopPromise && + existing.readyState !== 'rejected' + ) { + return serviceSuccess({ watch: existing, created: false }); + } + this.activeWatches.delete(existingWatchId); + this.watchIdsByKey.delete(key); } - // Verify path exists const pathCheck = Bun.spawnSync(['test', '-e', path]); if (pathCheck.exitCode !== 0) { return serviceError({ @@ -67,9 +176,9 @@ export class WatchService { }); } - // Build inotifywait command + const watchId = `watch-${++this.watchCounter}-${Date.now()}`; const args = this.buildInotifyArgs(path, options); - watchLogger.debug('Starting inotifywait', { args }); + const startTime = Date.now(); try { const proc = Bun.spawn(['inotifywait', ...args], { @@ -77,134 +186,633 @@ export class WatchService { stderr: 'pipe' }); - // Store active watch - this.activeWatches.set(watchId, { + const watch: ActiveWatch = { id: watchId, + key, path, + recursive: normalized.recursive, + include: normalized.include, + exclude: normalized.exclude, process: proc, - startedAt: new Date() - }); + startedAt: new Date(), + retained: false, + cursor: 0, + lastEventAt: null, + expiresAt: null, + subscribers: new Map(), + ready: createDeferred(), + readyState: 'pending', + expiryTimer: null + }; - // Create SSE stream from inotifywait output - const stream = this.createWatchStream(watchId, path, proc, watchLogger); + this.activeWatches.set(watchId, watch); + this.watchIdsByKey.set(key, watchId); - return serviceSuccess(stream); + const watchLogger = this.logger.child({ watchId, path }); + this.runWatchLoop(watch, watchLogger, startTime); + + return serviceSuccess({ watch, created: true }); } catch (error) { const err = error instanceof Error ? error : new Error(String(error)); - watchLogger.error('Failed to start inotifywait', err); + logCanonicalEvent(this.logger, { + event: 'watch.start', + outcome: 'error', + durationMs: Date.now() - startTime, + path, + watchId, + errorMessage: err.message, + error: err + }); return serviceError({ - message: `Failed to start file watcher: ${error instanceof Error ? error.message : 'Unknown error'}`, + message: `Failed to start file watcher: ${err.message}`, code: ErrorCode.WATCH_START_ERROR, details: { path } }); } } - /** - * Stop all active watches - */ - async stopAllWatches(): Promise { - const watchIds = Array.from(this.activeWatches.keys()); - await Promise.all(watchIds.map((id) => this.stopWatch(id))); - return watchIds.length; + private normalizeWatchOptions(options: WatchRequest): { + recursive: boolean; + include?: string[]; + exclude?: string[]; + events: FileWatchEventType[]; + } { + const include = this.normalizePatterns(options.include); + const exclude = include + ? undefined + : (this.normalizePatterns(options.exclude) ?? DEFAULT_EXCLUDE_PATTERNS); + + return { + recursive: options.recursive !== false, + include, + exclude, + events: this.normalizeEvents(options.events) + }; } - /** - * Get list of active watches - */ - getActiveWatches(): Array<{ id: string; path: string; startedAt: Date }> { - return Array.from(this.activeWatches.values()).map((w) => ({ - id: w.id, - path: w.path, - startedAt: w.startedAt - })); + private createWatchKey( + path: string, + options: { + recursive: boolean; + include?: string[]; + exclude?: string[]; + events: FileWatchEventType[]; + } + ): string { + return JSON.stringify({ + path, + recursive: options.recursive, + include: options.include ?? null, + exclude: options.exclude ?? null, + events: options.events + }); } - /** - * Stop a specific watch by ID. Idempotent via stored stopPromise. - * Sends SIGTERM, waits for exit with timeout, escalates to SIGKILL if needed. - */ - private stopWatch(watchId: string): Promise { + private buildCheckChangesResult( + watch: ActiveWatch, + since: string | undefined, + created: boolean + ): ServiceResult { + const version = this.buildVersionToken(watch); + const timestamp = new Date().toISOString(); + + if (since === undefined) { + return serviceSuccess({ + success: true, + status: 'unchanged', + version, + timestamp + }); + } + + const parsedVersion = this.parseVersionToken(since); + if (!parsedVersion) { + return serviceError({ + message: 'since must be a version returned by checkChanges()', + code: ErrorCode.VALIDATION_FAILED, + details: { since } + }); + } + + if (parsedVersion.watchId !== watch.id) { + return serviceSuccess({ + success: true, + status: 'resync', + reason: created ? 'expired' : 'restarted', + version, + timestamp + }); + } + + if (parsedVersion.cursor > watch.cursor) { + return serviceError({ + message: 'since refers to a newer version than the current watch state', + code: ErrorCode.VALIDATION_FAILED, + details: { + since, + currentVersion: version + } + }); + } + + return serviceSuccess({ + success: true, + status: parsedVersion.cursor === watch.cursor ? 'unchanged' : 'changed', + version, + timestamp + }); + } + + private buildVersionToken(watch: ActiveWatch): string { + return `${watch.id}:${watch.cursor}`; + } + + private parseVersionToken( + version: string + ): { watchId: string; cursor: number } | null { + const separatorIndex = version.lastIndexOf(':'); + if (separatorIndex <= 0 || separatorIndex === version.length - 1) { + return null; + } + + const watchId = version.slice(0, separatorIndex); + const cursorText = version.slice(separatorIndex + 1); + if (!/^\d+$/.test(cursorText)) { + return null; + } + + const cursor = Number(cursorText); + if (!Number.isSafeInteger(cursor) || cursor < 0) { + return null; + } + + return { watchId, cursor }; + } + + private refreshRetainedWatchExpiry(watch: ActiveWatch): void { + if (!watch.retained) { + watch.expiresAt = null; + this.clearRetainedWatchExpiry(watch); + return; + } + + this.clearRetainedWatchExpiry(watch); + + if (watch.subscribers.size > 0) { + watch.expiresAt = null; + return; + } + + const expiresAt = new Date(Date.now() + CHANGE_STATE_IDLE_TTL_MS); + watch.expiresAt = expiresAt.toISOString(); + watch.expiryTimer = setTimeout(() => { + void this.stopWatchInternal(watch.id, { + type: 'stopped', + reason: 'Retained change state expired after idle period' + }); + }, CHANGE_STATE_IDLE_TTL_MS); + } + + private clearRetainedWatchExpiry(watch: ActiveWatch): void { + if (watch.expiryTimer) { + clearTimeout(watch.expiryTimer); + watch.expiryTimer = null; + } + } + + private createSubscriberStream( + watch: ActiveWatch + ): ReadableStream { + const self = this; + const encoder = new TextEncoder(); + let subscriberId: string | undefined; + + return new ReadableStream({ + async start(controller) { + subscriberId = self.addSubscriber(watch, controller, encoder); + + try { + await watch.ready.promise; + } catch (error) { + self.closeSubscriber( + watch, + subscriberId, + errorEvent( + error instanceof Error + ? error.message + : 'Watch failed to establish' + ) + ); + return; + } + + const subscriber = subscriberId + ? watch.subscribers.get(subscriberId) + : undefined; + if (!subscriber || subscriber.closed) { + return; + } + + subscriber.watchingSent = true; + try { + controller.enqueue( + encoder.encode( + `data: ${JSON.stringify({ + type: 'watching', + path: watch.path, + watchId: watch.id + } satisfies FileWatchSSEEvent)}\n\n` + ) + ); + } catch { + await self.removeSubscriber(watch, subscriber.id); + return; + } + + self.flushSubscriberEvents(watch, subscriber); + }, + + cancel() { + if (subscriberId) { + return self.removeSubscriber(watch, subscriberId); + } + return Promise.resolve(); + } + }); + } + + private addSubscriber( + watch: ActiveWatch, + controller: ReadableStreamDefaultController, + encoder: TextEncoder + ): string { + const subscriberId = `subscriber-${++this.subscriberCounter}`; + const subscriber: WatchSubscriber = { + id: subscriberId, + controller, + encoder, + pendingEvents: new Map(), + droppedEvents: 0, + flushInterval: setInterval(() => { + this.flushSubscriberEvents(watch, subscriber); + }, EVENT_COALESCE_WINDOW_MS), + watchingSent: false, + closed: false + }; + + watch.subscribers.set(subscriberId, subscriber); + this.refreshRetainedWatchExpiry(watch); + return subscriberId; + } + + private async removeSubscriber( + watch: ActiveWatch, + subscriberId: string + ): Promise { + this.closeSubscriber(watch, subscriberId); + await this.maybeStopWatchWhenUnused(watch); + } + + private async maybeStopWatchWhenUnused(watch: ActiveWatch): Promise { + if (!watch.retained && watch.subscribers.size === 0) { + await this.stopWatchInternal(watch.id, { + type: 'stopped', + reason: 'Watch stopped after last subscriber disconnected' + }); + return; + } + + this.refreshRetainedWatchExpiry(watch); + } + + private closeSubscriber( + watch: ActiveWatch, + subscriberId: string, + terminalEvent?: TerminalWatchEvent + ): void { + const subscriber = watch.subscribers.get(subscriberId); + if (!subscriber || subscriber.closed) { + return; + } + + subscriber.closed = true; + clearInterval(subscriber.flushInterval); + watch.subscribers.delete(subscriberId); + + try { + const shouldSendTerminalEvent = + terminalEvent !== undefined && + (subscriber.watchingSent || terminalEvent.type === 'error'); + if (shouldSendTerminalEvent) { + subscriber.controller.enqueue( + subscriber.encoder.encode( + `data: ${JSON.stringify(terminalEvent)}\n\n` + ) + ); + } + } catch { + // Stream already closed. + } + + try { + subscriber.controller.close(); + } catch { + // Stream already closed. + } + } + + private enqueueSubscriberEvent( + watch: ActiveWatch, + subscriber: WatchSubscriber, + event: LiveWatchEvent + ): void { + if (subscriber.closed) { + return; + } + + const key = `${event.eventType}|${event.path}|${event.isDirectory}`; + + if ( + !subscriber.pendingEvents.has(key) && + subscriber.pendingEvents.size >= MAX_PENDING_EVENTS + ) { + subscriber.droppedEvents++; + + if ( + subscriber.droppedEvents === 1 || + subscriber.droppedEvents % 100 === 0 + ) { + this.logger.warn('Dropping watch events due to backpressure', { + watchId: watch.id, + subscriberId: subscriber.id, + droppedEvents: subscriber.droppedEvents, + pendingCount: subscriber.pendingEvents.size + }); + } + return; + } + + subscriber.pendingEvents.set(key, event); + } + + private flushSubscriberEvents( + watch: ActiveWatch, + subscriber: WatchSubscriber + ): void { + if (subscriber.closed || !subscriber.watchingSent) { + return; + } + + try { + for (const event of subscriber.pendingEvents.values()) { + subscriber.controller.enqueue( + subscriber.encoder.encode(`data: ${JSON.stringify(event)}\n\n`) + ); + } + subscriber.pendingEvents.clear(); + } catch { + subscriber.closed = true; + clearInterval(subscriber.flushInterval); + watch.subscribers.delete(subscriber.id); + void this.maybeStopWatchWhenUnused(watch); + } + } + + private broadcastEvent(watch: ActiveWatch, event: LiveWatchEvent): void { + for (const subscriber of watch.subscribers.values()) { + this.enqueueSubscriberEvent(watch, subscriber, event); + } + } + + private broadcastTerminalEvent( + watch: ActiveWatch, + terminalEvent: TerminalWatchEvent + ): void { + for (const subscriberId of Array.from(watch.subscribers.keys())) { + this.closeSubscriber(watch, subscriberId, terminalEvent); + } + } + + private async stopWatchInternal( + watchId: string, + terminalEvent?: TerminalWatchEvent + ): Promise { const watch = this.activeWatches.get(watchId); - if (!watch) return Promise.resolve(); + if (!watch) { + return; + } - // Return existing stop promise if already stopping - if (watch.stopPromise) return watch.stopPromise; + if (watch.stopPromise) { + return watch.stopPromise; + } const cleanup = async () => { + const resolvedTerminalEvent: TerminalWatchEvent = terminalEvent ?? { + type: 'stopped', + reason: 'Watch process ended' + }; + const isError = resolvedTerminalEvent.type === 'error'; + const reason = isError + ? resolvedTerminalEvent.error + : resolvedTerminalEvent.reason; + + this.activeWatches.delete(watchId); + this.watchIdsByKey.delete(watch.key); + this.clearRetainedWatchExpiry(watch); + + if (watch.readyState === 'pending') { + this.rejectWatchReady(watch, new Error(reason)); + } + + this.broadcastTerminalEvent(watch, resolvedTerminalEvent); + try { watch.process.kill(); } catch { - // Process may have already exited + // Process may have already exited. } - // Wait for graceful exit with timeout, escalate to SIGKILL if needed - let timeoutHandle: ReturnType; + let timeoutHandle: ReturnType | undefined; const exitedCleanly = await Promise.race([ watch.process.exited.then(() => true as const), new Promise((resolve) => { timeoutHandle = setTimeout(() => resolve(false), STOP_TIMEOUT_MS); }) ]); - clearTimeout(timeoutHandle!); + + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } if (!exitedCleanly) { try { - watch.process.kill(9); // SIGKILL + watch.process.kill(9); } catch { - // Already dead + // Process may have already exited. } } - this.activeWatches.delete(watchId); + logCanonicalEvent(this.logger, { + event: 'watch.stop', + outcome: isError ? 'error' : 'success', + durationMs: Date.now() - watch.startedAt.getTime(), + path: watch.path, + watchId, + errorMessage: isError ? reason : undefined + }); }; watch.stopPromise = cleanup(); return watch.stopPromise; } + private runWatchLoop( + watch: ActiveWatch, + logger: Logger, + startTime: number + ): void { + const stdout = watch.process.stdout; + const stderr = watch.process.stderr; + + if (!stdout || typeof stdout === 'number') { + const error = new Error('Failed to capture process output'); + this.rejectWatchReady(watch, error); + void this.stopWatchInternal(watch.id, errorEvent(error.message)); + return; + } + + void (async () => { + try { + if (stderr && typeof stderr !== 'number') { + const monitor = await this.waitForWatchesEstablished(stderr); + this.continueStderrMonitoring( + monitor.reader, + monitor.decoder, + monitor.buffer, + watch + ); + } + + this.resolveWatchReady(watch); + + logCanonicalEvent(this.logger, { + event: 'watch.start', + outcome: 'success', + durationMs: Date.now() - startTime, + path: watch.path, + watchId: watch.id, + recursive: watch.recursive + }); + + const reader = stdout.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + + for (const line of lines) { + this.handleWatchLine(watch, line); + } + } + + if (buffer.trim()) { + this.handleWatchLine(watch, buffer); + } + + await this.stopWatchInternal(watch.id, { + type: 'stopped', + reason: 'Watch process ended' + }); + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + logger.error('Error reading watch output', err, { + watchId: watch.id, + path: watch.path + }); + this.rejectWatchReady(watch, err); + await this.stopWatchInternal(watch.id, errorEvent(err.message)); + } + })(); + } + + private handleWatchLine(watch: ActiveWatch, line: string): void { + if (!line.trim()) { + return; + } + + const parsed = this.parseInotifyEvent(line); + if (!parsed) { + return; + } + + const timestamp = new Date().toISOString(); + watch.cursor += 1; + watch.lastEventAt = timestamp; + + this.broadcastEvent(watch, { + type: 'event', + eventType: parsed.eventType, + path: parsed.path, + isDirectory: parsed.isDirectory, + timestamp + }); + } + + private resolveWatchReady(watch: ActiveWatch): void { + if (watch.readyState !== 'pending') { + return; + } + + watch.readyState = 'resolved'; + watch.ready.resolve(); + } + + private rejectWatchReady(watch: ActiveWatch, error: Error): void { + if (watch.readyState !== 'pending') { + return; + } + + watch.readyState = 'rejected'; + watch.ready.reject(error); + } + private buildInotifyArgs(path: string, options: WatchRequest): string[] { - const args: string[] = [ - '-m', // Monitor mode (continuous) - '--format', - '%e|%w%f' // event|path (ISDIR is part of event flags) - ]; + const args: string[] = ['-m', '--format', '%e|%w%f']; - // Recursive watching if (options.recursive !== false) { args.push('-r'); } - // Event types - const events: FileWatchEventType[] = options.events || [ - 'create', - 'modify', - 'delete', - 'move_from', - 'move_to' - ]; + const events = this.normalizeEvents(options.events); const inotifyEvents = events - .map((e) => this.mapEventType(e)) - .filter((e): e is string => e !== undefined); + .map((eventType) => this.mapEventType(eventType)) + .filter((eventType): eventType is string => eventType !== undefined); if (inotifyEvents.length > 0) { args.push('-e', inotifyEvents.join(',')); } - // inotifywait does not allow --include and --exclude together. - // Include filters take precedence; when include is set, exclusion is handled - // implicitly by only matching included paths. - const includeRegex = this.buildCombinedPathRegex(options.include); + const includeRegex = this.buildCombinedPathRegex( + this.normalizePatterns(options.include) + ); if (includeRegex) { args.push('--include', includeRegex); } else { - const excludes = options.exclude || ['.git', 'node_modules', '.DS_Store']; + const excludes = + this.normalizePatterns(options.exclude) ?? DEFAULT_EXCLUDE_PATTERNS; const excludeRegex = this.buildCombinedPathRegex(excludes); if (excludeRegex) { args.push('--exclude', excludeRegex); } } - // Add path last args.push(path); return args; @@ -222,6 +830,29 @@ export class WatchService { return mapping[type]; } + private normalizePatterns(patterns?: string[]): string[] | undefined { + if (!patterns || patterns.length === 0) { + return undefined; + } + + return Array.from(new Set(patterns)).sort(); + } + + private normalizeEvents(events?: FileWatchEventType[]): FileWatchEventType[] { + if (!events || events.length === 0) { + return DEFAULT_WATCH_EVENTS; + } + + const orderedEvents = DEFAULT_WATCH_EVENTS.filter((eventType) => + events.includes(eventType) + ); + const additionalEvents = events.filter( + (eventType) => !orderedEvents.includes(eventType) + ); + + return [...orderedEvents, ...additionalEvents]; + } + private buildCombinedPathRegex(patterns?: string[]): string | undefined { if (!patterns || patterns.length === 0) { return undefined; @@ -233,8 +864,6 @@ export class WatchService { } private globToPathRegex(pattern: string): string { - // Supported glob syntax is intentionally limited to *, ** and ?. - // Handler validation rejects unsupported tokens such as [] and {}. return pattern .replace(/[.+^${}()|[\]\\]/g, '\\$&') .replace(/\*\*/g, '::double_star::') @@ -248,25 +877,24 @@ export class WatchService { path: string; isDirectory: boolean; } | null { - // Format: EVENT|/path/to/file|EVENT_FLAGS - // The third part (%:e) contains colon-separated flags like CREATE:ISDIR const parts = line.trim().split('|'); - if (parts.length < 2) return null; + if (parts.length < 2) { + return null; + } const [rawEvent, filePath, flagsPart] = parts; - // Check if ISDIR appears in either the event or the flags const isDirectory = rawEvent.includes('ISDIR') || (flagsPart?.includes('ISDIR') ?? false); - // Map inotify event back to our type const eventType = this.parseEventType(rawEvent); - if (!eventType) return null; + if (!eventType) { + return null; + } return { eventType, path: filePath, isDirectory }; } private parseEventType(rawEvent: string): FileWatchEventType | null { - // inotify can emit multiple events like "CREATE,ISDIR" const events = rawEvent.split(','); const primary = events[0].toLowerCase(); @@ -277,235 +905,23 @@ export class WatchService { moved_from: 'move_from', moved_to: 'move_to', attrib: 'attrib', - // Handle close_write as modify (common for editors) close_write: 'modify' }; return mapping[primary] || null; } - private createWatchStream( - watchId: string, - path: string, - proc: Subprocess, - logger: Logger - ): ReadableStream { - const encoder = new TextEncoder(); - const self = this; - const stdout = proc.stdout; - const stderr = proc.stderr; - - if (!stdout || typeof stdout === 'number') { - // Return a stream that immediately errors - return new ReadableStream({ - start(controller) { - const errorEvent: FileWatchSSEEvent = { - type: 'error', - error: 'Failed to capture process output' - }; - controller.enqueue( - encoder.encode(`data: ${JSON.stringify(errorEvent)}\n\n`) - ); - controller.close(); - } - }); - } - - return new ReadableStream({ - async start(controller) { - // Wait for inotifywait to establish watches before sending watching event. - // If setup emits an error or times out, fail fast and stop this watch. - if (stderr && typeof stderr !== 'number') { - try { - await self.waitForWatchesEstablished( - stderr, - controller, - encoder, - logger - ); - } catch { - await self.stopWatch(watchId); - controller.close(); - return; - } - } - - // Send watching event only after watches are established - const watchingEvent: FileWatchSSEEvent = { - type: 'watching', - path, - watchId - }; - controller.enqueue( - encoder.encode(`data: ${JSON.stringify(watchingEvent)}\n\n`) - ); - - // Read stdout line by line - const reader = stdout.getReader(); - const decoder = new TextDecoder(); - let buffer = ''; - - const pendingEvents = new Map(); - let droppedEvents = 0; - const enqueueEvent = (event: FileWatchSSEEvent) => { - const key = - event.type === 'event' - ? `${event.eventType}|${event.path}|${event.isDirectory}` - : `${event.type}|${Date.now()}`; - - if ( - !pendingEvents.has(key) && - pendingEvents.size >= MAX_PENDING_EVENTS - ) { - droppedEvents++; - if (droppedEvents === 1 || droppedEvents % 100 === 0) { - logger.warn('Dropping watch events due to backpressure', { - watchId, - droppedEvents, - pendingCount: pendingEvents.size - }); - } - return; - } - - pendingEvents.set(key, event); - }; - - const flushPendingEvents = () => { - for (const event of pendingEvents.values()) { - controller.enqueue( - encoder.encode(`data: ${JSON.stringify(event)}\n\n`) - ); - } - pendingEvents.clear(); - }; - - const flushInterval = setInterval( - flushPendingEvents, - EVENT_COALESCE_WINDOW_MS - ); - - const processLine = (line: string) => { - const parsed = self.parseInotifyEvent(line); - if (!parsed) return; - - const event: FileWatchSSEEvent = { - type: 'event', - eventType: parsed.eventType, - path: parsed.path, - isDirectory: parsed.isDirectory, - timestamp: new Date().toISOString() - }; - enqueueEvent(event); - }; - - try { - logger.debug('Starting to read inotifywait stdout'); - while (true) { - const { done, value } = await reader.read(); - if (done) { - logger.debug('inotifywait stdout stream ended'); - break; - } - - const chunk = decoder.decode(value, { stream: true }); - logger.debug('Received chunk from inotifywait', { - chunkLength: chunk.length, - chunk: chunk.substring(0, 200) - }); - buffer += chunk; - const lines = buffer.split('\n'); - buffer = lines.pop() || ''; - - for (const line of lines) { - if (line.trim()) { - logger.debug('Processing inotifywait line', { line }); - processLine(line); - } - } - } - - // Process any remaining buffer - if (buffer.trim()) { - processLine(buffer); - } - - clearInterval(flushInterval); - flushPendingEvents(); - - // Send stopped event - const reason = - droppedEvents > 0 - ? `Watch process ended. Dropped ${droppedEvents} events due to backpressure.` - : 'Watch process ended'; - - const stoppedEvent: FileWatchSSEEvent = { - type: 'stopped', - reason - }; - controller.enqueue( - encoder.encode(`data: ${JSON.stringify(stoppedEvent)}\n\n`) - ); - controller.close(); - } catch (error) { - clearInterval(flushInterval); - const err = error instanceof Error ? error : new Error(String(error)); - logger.error('Error reading watch output', err); - const errorEvent: FileWatchSSEEvent = { - type: 'error', - error: error instanceof Error ? error.message : 'Unknown error' - }; - controller.enqueue( - encoder.encode(`data: ${JSON.stringify(errorEvent)}\n\n`) - ); - controller.close(); - } finally { - await self.stopWatch(watchId); - } - }, - - cancel() { - // Fire-and-forget: stopWatch handles kill + await exit + map cleanup. - // Returning the promise lets the stream machinery await it if needed. - return self.stopWatch(watchId); - } - }); - } - - /** - * Wait for inotifywait to output "Watches established" on stderr. - * This ensures the watch is ready to detect file changes before we signal readiness to clients. - * After watches are established, continues monitoring stderr for errors in background. - */ private async waitForWatchesEstablished( - stderr: ReadableStream, - controller: ReadableStreamDefaultController, - encoder: TextEncoder, - logger: Logger - ): Promise { + stderr: ReadableStream + ): Promise<{ + reader: { read(): Promise<{ done: boolean; value?: Uint8Array }> }; + decoder: TextDecoder; + buffer: string; + }> { const reader = stderr.getReader(); const decoder = new TextDecoder(); let buffer = ''; - const emitSetupError = (message: string) => { - const errorEvent: FileWatchSSEEvent = { - type: 'error', - error: message - }; - try { - controller.enqueue( - encoder.encode(`data: ${JSON.stringify(errorEvent)}\n\n`) - ); - } catch (enqueueError) { - logger.debug('Could not enqueue setup error event', { - error: - enqueueError instanceof Error - ? enqueueError.message - : String(enqueueError) - }); - } - }; - const readLoop = async (): Promise<'established'> => { while (true) { const { done, value } = await reader.read(); @@ -524,19 +940,12 @@ export class WatchService { } if (trimmed.includes('Watches established')) { - logger.debug('inotifywait watches established'); return 'established'; } if (trimmed.includes('Setting up watches')) { - logger.debug('inotifywait setting up watches', { - message: trimmed - }); continue; } - logger.warn('inotifywait stderr during setup', { - message: trimmed - }); throw new Error(trimmed); } } @@ -556,24 +965,11 @@ export class WatchService { ]); if (result === 'timeout') { - const timeoutMessage = - 'Timed out waiting for file watcher setup to complete'; - logger.warn(timeoutMessage, { timeoutMs: WATCH_SETUP_TIMEOUT_MS }); - throw new Error(timeoutMessage); + throw new Error('Timed out waiting for file watcher setup to complete'); } - this.continueStderrMonitoring( - reader, - decoder, - buffer, - controller, - encoder, - logger - ); + return { reader, decoder, buffer }; } catch (error) { - const message = - error instanceof Error ? error.message : 'Failed to establish watch'; - emitSetupError(message); await reader.cancel().catch(() => {}); throw error; } finally { @@ -583,24 +979,20 @@ export class WatchService { } } - /** - * Continue monitoring stderr for errors after watches are established. - * Runs in background without blocking. - */ private continueStderrMonitoring( reader: { read(): Promise<{ done: boolean; value?: Uint8Array }> }, decoder: TextDecoder, initialBuffer: string, - controller: ReadableStreamDefaultController, - encoder: TextEncoder, - logger: Logger + watch: ActiveWatch ): void { - (async () => { + void (async () => { let buffer = initialBuffer; try { while (true) { const { done, value } = await reader.read(); - if (done) break; + if (done) { + break; + } buffer += decoder.decode(value, { stream: true }); const lines = buffer.split('\n'); @@ -608,46 +1000,42 @@ export class WatchService { for (const line of lines) { const trimmed = line.trim(); - if (trimmed) { - // Skip info messages - if ( - trimmed.includes('Watches established') || - trimmed.includes('Setting up watches') - ) { - logger.debug('inotifywait info', { message: trimmed }); - continue; - } - - logger.warn('inotifywait stderr', { message: trimmed }); - const errorEvent: FileWatchSSEEvent = { - type: 'error', - error: trimmed - }; - try { - controller.enqueue( - encoder.encode(`data: ${JSON.stringify(errorEvent)}\n\n`) - ); - } catch (enqueueError) { - logger.debug( - 'Could not enqueue stderr error event, stream likely closed', - { - error: - enqueueError instanceof Error - ? enqueueError.message - : String(enqueueError) - } - ); - break; - } + if (!trimmed) { + continue; + } + + if ( + trimmed.includes('Watches established') || + trimmed.includes('Setting up watches') + ) { + continue; } + + await this.stopWatchInternal(watch.id, errorEvent(trimmed)); + return; } } - } catch (error) { - // Stream closed or other error - expected when process terminates - logger.debug('stderr monitoring ended', { - error: error instanceof Error ? error.message : 'Unknown' - }); + } catch { + // Stream closed or process terminated — expected during cleanup. } })(); } } + +function createDeferred(): Deferred { + let resolve: Deferred['resolve'] = () => {}; + let reject: Deferred['reject'] = () => {}; + const promise = new Promise((resolvePromise, rejectPromise) => { + resolve = resolvePromise; + reject = rejectPromise; + }); + + return { promise, resolve, reject }; +} + +function errorEvent(message: string): TerminalWatchEvent { + return { + type: 'error', + error: message + }; +} diff --git a/packages/sandbox-container/tests/handlers/watch-handler.test.ts b/packages/sandbox-container/tests/handlers/watch-handler.test.ts index ccf0b0623..9f0890312 100644 --- a/packages/sandbox-container/tests/handlers/watch-handler.test.ts +++ b/packages/sandbox-container/tests/handlers/watch-handler.test.ts @@ -5,7 +5,8 @@ import type { WatchService } from '../../src/services/watch-service'; function createMockWatchService(): WatchService { return { - watchDirectory: vi.fn() + watchDirectory: vi.fn(), + checkChanges: vi.fn() } as unknown as WatchService; } @@ -115,4 +116,66 @@ describe('WatchHandler', () => { expect(response.status).toBe(200); }); }); + + describe('checkChanges', () => { + it('should call watchService.checkChanges for valid requests', async () => { + const watchService = createMockWatchService(); + (watchService.checkChanges as ReturnType).mockResolvedValue( + { + success: true, + data: { + success: true, + status: 'changed', + version: 'watch-1:1', + timestamp: '2026-03-17T00:00:00.000Z' + } + } + ); + + const handler = new WatchHandler(watchService, createNoOpLogger()); + const response = await handler.handle( + new Request('http://localhost:3000/api/watch/check', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + path: 'relative/path', + since: 'watch-1:0' + }) + }), + defaultContext + ); + + expect(response.status).toBe(200); + expect(watchService.checkChanges).toHaveBeenCalledWith( + '/workspace/relative/path', + { + path: '/workspace/relative/path', + since: 'watch-1:0' + } + ); + }); + + it('should reject empty since values', async () => { + const handler = new WatchHandler( + createMockWatchService(), + createNoOpLogger() + ); + + const response = await handler.handle( + new Request('http://localhost:3000/api/watch/check', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + path: '/workspace/test', + since: '' + }) + }), + defaultContext + ); + + expect(response.status).toBe(400); + const body = (await response.json()) as { message: string }; + expect(body.message).toContain('since must be a non-empty string'); + }); + }); }); diff --git a/packages/sandbox-container/tests/services/watch-service.test.ts b/packages/sandbox-container/tests/services/watch-service.test.ts index e82e8a783..c4fe84ced 100644 --- a/packages/sandbox-container/tests/services/watch-service.test.ts +++ b/packages/sandbox-container/tests/services/watch-service.test.ts @@ -54,6 +54,19 @@ describe('WatchService', () => { }); }); + describe('checkChanges', () => { + it('should return error for non-existent path', async () => { + const result = await watchService.checkChanges( + '/non/existent/path/12345' + ); + + expect(result.success).toBe(false); + if (!result.success) { + expect(result.error.code).toBe(ErrorCode.FILE_NOT_FOUND); + } + }); + }); + describe('parseInotifyEvent', () => { // Access private method for testing via type assertion const testParseEvent = (service: WatchService, line: string) => { diff --git a/packages/sandbox/src/clients/index.ts b/packages/sandbox/src/clients/index.ts index 727778a1f..68766537f 100644 --- a/packages/sandbox/src/clients/index.ts +++ b/packages/sandbox/src/clients/index.ts @@ -41,7 +41,12 @@ export { // Client types and interfaces // ============================================================================= -export type { WatchRequest } from '@repo/shared'; +export type { + CheckChangesOptions, + CheckChangesRequest, + CheckChangesResult, + WatchRequest +} from '@repo/shared'; // Command client types export type { ExecuteRequest, ExecuteResponse } from './command-client'; // Desktop client types diff --git a/packages/sandbox/src/clients/watch-client.ts b/packages/sandbox/src/clients/watch-client.ts index f61df72ca..6f84bb475 100644 --- a/packages/sandbox/src/clients/watch-client.ts +++ b/packages/sandbox/src/clients/watch-client.ts @@ -1,4 +1,6 @@ import { + type CheckChangesRequest, + type CheckChangesResult, type FileWatchSSEEvent, parseSSEFrames, type SSEPartialEvent, @@ -11,9 +13,18 @@ import { BaseHttpClient } from './base-client'; * Uses inotify under the hood for native filesystem event notifications * * @internal This client is used internally by the SDK. - * Users should use `sandbox.watch()` instead. + * Users should use `sandbox.watch()` or `sandbox.checkChanges()` instead. */ export class WatchClient extends BaseHttpClient { + /** + * Check whether a path changed since a previously returned version. + */ + async checkChanges( + request: CheckChangesRequest + ): Promise { + return this.post('/api/watch/check', request); + } + /** * Start watching a directory for changes. * The returned promise resolves only after the watcher is established diff --git a/packages/sandbox/src/index.ts b/packages/sandbox/src/index.ts index 259e9c7e7..0e818df66 100644 --- a/packages/sandbox/src/index.ts +++ b/packages/sandbox/src/index.ts @@ -24,6 +24,8 @@ export type { BaseExecOptions, BucketCredentials, BucketProvider, + CheckChangesOptions, + CheckChangesResult, CodeContext, CreateContextOptions, DirectoryBackup, diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index 396f9e210..61d3e4c47 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -3,6 +3,8 @@ import type { BackupOptions, BucketCredentials, BucketProvider, + CheckChangesOptions, + CheckChangesResult, CodeContext, CreateContextOptions, DirectoryBackup, @@ -2958,6 +2960,31 @@ export class Sandbox extends Container implements ISandbox { }); } + /** + * Check whether a path changed while this caller was disconnected. + * + * Pass the `version` returned from a prior call in `options.since` to learn + * whether the path is unchanged, changed, or needs a full resync because the + * retained change state was reset. + * + * @param path - Path to check (absolute or relative to /workspace) + * @param options - Change-check options + */ + async checkChanges( + path: string, + options: CheckChangesOptions = {} + ): Promise { + const sessionId = options.sessionId ?? (await this.ensureDefaultSession()); + return this.client.watch.checkChanges({ + path, + recursive: options.recursive, + include: options.include, + exclude: options.exclude, + since: options.since, + sessionId + }); + } + /** * Expose a port and get a preview URL for accessing services running in the sandbox * @@ -3396,6 +3423,8 @@ export class Sandbox extends Container implements ISandbox { this.readFile(path, { ...options, sessionId }), readFileStream: (path) => this.readFileStream(path, { sessionId }), watch: (path, options) => this.watch(path, { ...options, sessionId }), + checkChanges: (path, options) => + this.checkChanges(path, { ...options, sessionId }), mkdir: (path, options) => this.mkdir(path, { ...options, sessionId }), deleteFile: (path) => this.deleteFile(path, sessionId), renameFile: (oldPath, newPath) => diff --git a/packages/sandbox/tests/sandbox.test.ts b/packages/sandbox/tests/sandbox.test.ts index a6c893f91..551b78895 100644 --- a/packages/sandbox/tests/sandbox.test.ts +++ b/packages/sandbox/tests/sandbox.test.ts @@ -143,6 +143,13 @@ describe('Sandbox - Automatic Session Management', () => { path: '/test.txt', timestamp: new Date().toISOString() } as any); + + vi.spyOn(sandbox.client.watch, 'checkChanges').mockResolvedValue({ + success: true, + status: 'unchanged', + version: 'watch-1:0', + timestamp: new Date().toISOString() + } as any); }); afterEach(() => { @@ -195,6 +202,22 @@ describe('Sandbox - Automatic Session Management', () => { ); }); + it('should forward checkChanges options to the watch client', async () => { + await sandbox.checkChanges('/workspace/test', { + since: 'watch-1:0', + recursive: false + }); + + expect(sandbox.client.watch.checkChanges).toHaveBeenCalledWith({ + path: '/workspace/test', + recursive: false, + include: undefined, + exclude: undefined, + since: 'watch-1:0', + sessionId: expect.stringMatching(/^sandbox-/) + }); + }); + it('should reuse default session across multiple operations', async () => { await sandbox.exec('echo test1'); await sandbox.writeFile('/test.txt', 'content'); diff --git a/packages/sandbox/tests/watch-client.test.ts b/packages/sandbox/tests/watch-client.test.ts new file mode 100644 index 000000000..1c52eddad --- /dev/null +++ b/packages/sandbox/tests/watch-client.test.ts @@ -0,0 +1,55 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { WatchClient } from '../src/clients/watch-client'; + +describe('WatchClient', () => { + let client: WatchClient; + let mockFetch: ReturnType; + + beforeEach(() => { + vi.clearAllMocks(); + + mockFetch = vi.fn(); + global.fetch = mockFetch as unknown as typeof fetch; + + client = new WatchClient({ + baseUrl: 'http://test.com', + port: 3000 + }); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('should post to the retained change check endpoint', async () => { + mockFetch.mockResolvedValue( + new Response( + JSON.stringify({ + success: true, + status: 'changed', + version: 'watch-1:2', + timestamp: '2026-03-17T00:00:00.000Z' + }), + { status: 200 } + ) + ); + + const result = await client.checkChanges({ + path: '/workspace/test', + since: 'watch-1:1' + }); + + expect(result).toEqual({ + success: true, + status: 'changed', + version: 'watch-1:2', + timestamp: '2026-03-17T00:00:00.000Z' + }); + expect(mockFetch).toHaveBeenCalledWith( + 'http://test.com/api/watch/check', + expect.objectContaining({ + method: 'POST' + }) + ); + }); +}); diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index dfda3a290..103f75bb8 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -113,6 +113,9 @@ export type { // Bucket mounting types BucketCredentials, BucketProvider, + CheckChangesOptions, + CheckChangesRequest, + CheckChangesResult, ContextCreateResult, ContextDeleteResult, ContextListResult, diff --git a/packages/shared/src/types.ts b/packages/shared/src/types.ts index a3bc97404..c18c8df5f 100644 --- a/packages/shared/src/types.ts +++ b/packages/shared/src/types.ts @@ -732,6 +732,21 @@ export interface WatchOptions { sessionId?: string; } +/** + * Options for checking whether a path changed while disconnected. + * + * Pass the `version` returned from a previous `checkChanges()` call to learn + * whether the path is unchanged, changed, or needs a full resync because the + * retained change state was reset. Change state lives only for the current + * container lifetime and may expire while idle. + */ +export interface CheckChangesOptions extends WatchOptions { + /** + * Version returned by a previous `checkChanges()` call. + */ + since?: string; +} + // Internal types for SSE protocol (not user-facing) /** @@ -757,6 +772,13 @@ export interface WatchRequest { sessionId?: string; } +/** + * @internal Request body for checking retained change state. + */ +export interface CheckChangesRequest extends WatchRequest { + since?: string; +} + /** * SSE events emitted by `sandbox.watch()`. */ @@ -782,6 +804,24 @@ export type FileWatchSSEEvent = reason: string; }; +/** + * Result returned by `checkChanges()`. + */ +export type CheckChangesResult = + | { + success: true; + status: 'unchanged' | 'changed'; + version: string; + timestamp: string; + } + | { + success: true; + status: 'resync'; + reason: 'expired' | 'restarted'; + version: string; + timestamp: string; + }; + // Process management result types export interface ProcessStartResult { success: boolean; @@ -977,6 +1017,10 @@ export interface ExecutionSession { path: string, options?: Omit ): Promise>; + checkChanges( + path: string, + options?: Omit + ): Promise; mkdir(path: string, options?: { recursive?: boolean }): Promise; deleteFile(path: string): Promise; renameFile(oldPath: string, newPath: string): Promise; @@ -1234,6 +1278,10 @@ export interface ISandbox { path: string, options?: WatchOptions ): Promise>; + checkChanges( + path: string, + options?: CheckChangesOptions + ): Promise; mkdir(path: string, options?: { recursive?: boolean }): Promise; deleteFile(path: string): Promise; renameFile(oldPath: string, newPath: string): Promise; diff --git a/tests/e2e/file-watch-workflow.test.ts b/tests/e2e/file-watch-workflow.test.ts index ba85c560e..696098bd5 100644 --- a/tests/e2e/file-watch-workflow.test.ts +++ b/tests/e2e/file-watch-workflow.test.ts @@ -9,7 +9,7 @@ * - Recursive vs non-recursive watching */ -import type { FileWatchSSEEvent } from '@repo/shared'; +import type { CheckChangesResult, FileWatchSSEEvent } from '@repo/shared'; import { afterAll, beforeAll, @@ -184,6 +184,31 @@ describe('File Watch Workflow', () => { await expectOk(response, `deleteFile(${path})`); } + async function checkChanges( + path: string, + options: { + recursive?: boolean; + include?: string[]; + exclude?: string[]; + since?: string; + } = {} + ): Promise { + const response = await fetch(`${workerUrl}/api/watch/check`, { + method: 'POST', + headers, + body: JSON.stringify({ + path, + recursive: options.recursive, + include: options.include, + exclude: options.exclude, + since: options.since + }) + }); + + await expectOk(response, `checkChanges(${path})`); + return (await response.json()) as CheckChangesResult; + } + test('should establish watch and receive watching event', async () => { testDir = sandbox!.uniquePath('watch-establish'); await createDir(testDir); @@ -202,6 +227,46 @@ describe('File Watch Workflow', () => { } }, 30000); + test('should report unchanged when starting retained change tracking', async () => { + testDir = sandbox!.uniquePath('watch-check-baseline'); + await createDir(testDir); + + const result = await checkChanges(testDir); + + expect(result.status).toBe('unchanged'); + expect(result.version).toMatch(/^watch-\d+-\d+:0$/); + }, 30000); + + test('should retain changed state across reconnect gaps', async () => { + testDir = sandbox!.uniquePath('watch-check-retain'); + await createDir(testDir); + + const first = await checkChanges(testDir); + expect(first.status).toBe('unchanged'); + + await createFile(`${testDir}/changed.txt`, 'hello'); + + const second = await checkChanges(testDir, { + since: first.version + }); + + expect(second.status).toBe('changed'); + expect(second.version).not.toBe(first.version); + }, 30000); + + test('should report unchanged when no files changed since the version', async () => { + testDir = sandbox!.uniquePath('watch-check-unchanged'); + await createDir(testDir); + + const first = await checkChanges(testDir); + const second = await checkChanges(testDir, { + since: first.version + }); + + expect(second.status).toBe('unchanged'); + expect(second.version).toBe(first.version); + }, 30000); + test('should detect file creation', async () => { testDir = sandbox!.uniquePath('watch-create'); await createDir(testDir); diff --git a/tests/e2e/test-worker/index.ts b/tests/e2e/test-worker/index.ts index f9c0262f5..2ba86d591 100644 --- a/tests/e2e/test-worker/index.ts +++ b/tests/e2e/test-worker/index.ts @@ -1035,6 +1035,21 @@ console.log('Terminal server on port ' + port); }); } + // File Watch - Check retained change state via public Sandbox API. + if (url.pathname === '/api/watch/check' && request.method === 'POST') { + const result = await sandbox.checkChanges(body.path, { + recursive: body.recursive, + include: body.include, + exclude: body.exclude, + since: body.since, + sessionId: sessionId ?? undefined + }); + + return new Response(JSON.stringify(result), { + headers: { 'Content-Type': 'application/json' } + }); + } + // Cleanup endpoint - destroys the sandbox container // This is used by E2E tests to explicitly clean up after each test if (url.pathname === '/cleanup' && request.method === 'POST') {