diff --git a/app/e2e/pitch-contour-alignment.spec.ts b/app/e2e/pitch-contour-alignment.spec.ts index 4665520..99d8260 100644 --- a/app/e2e/pitch-contour-alignment.spec.ts +++ b/app/e2e/pitch-contour-alignment.spec.ts @@ -1,4 +1,5 @@ import { test, expect } from './global-setup'; +import { API_BASE } from './test-utils'; /** * PitchContour SVG Alignment Tests @@ -129,10 +130,8 @@ test.describe('PitchContour alignment', () => { version: 1, }; - // Post session to API (use same port as playwright webServer) - const apiBase = 'http://localhost:5175'; - - const createRes = await request.post(`${apiBase}/api/sessions`, { + // Post session to API (respects BASE_URL env for full-stack testing) + const createRes = await request.post(`${API_BASE}/api/sessions`, { data: sessionData, }); diff --git a/app/package.json b/app/package.json index 254bc53..303677f 100644 --- a/app/package.json +++ b/app/package.json @@ -20,6 +20,8 @@ "test:e2e:full": "npx playwright test --project=chromium --project=webkit", "test:e2e:mobile": "npx playwright test e2e/mobile-iphone.spec.ts --project=mobile-safari", "test:e2e:all-local": "npx playwright test --project=chromium && npm run test:e2e:mobile", + "test:e2e:full-stack": "tsx scripts/test-e2e-full-stack.ts", + "test:e2e:full-stack:smoke": "tsx scripts/test-e2e-full-stack.ts --smoke", "validate:sync": "tsx scripts/validate-sync-checklist.ts", "validate:samples": "bash scripts/validate-sample-volume.sh", "validate:manifests": "tsx scripts/validate-manifests.ts", diff --git a/app/playwright.config.ts b/app/playwright.config.ts index 98d96d9..e75f73b 100644 --- a/app/playwright.config.ts +++ b/app/playwright.config.ts @@ -37,7 +37,8 @@ export default defineConfig({ ], use: { - baseURL: 'http://localhost:5175', + // Support PLAYWRIGHT_BASE_URL for full-stack testing against wrangler dev + baseURL: process.env.PLAYWRIGHT_BASE_URL || 'http://localhost:5175', headless: true, // Tracing for debugging failures @@ -79,12 +80,16 @@ export default 
defineConfig({ }, ], - webServer: { - command: process.env.USE_MOCK_API - ? 'USE_MOCK_API=1 npm run dev -- --port 5175' - : 'npm run dev -- --port 5175', - port: 5175, - reuseExistingServer: !process.env.CI, - timeout: 120000, - }, + // Only start webServer when not using external server (PLAYWRIGHT_BASE_URL) + // Full-stack testing via test:e2e:full-stack manages wrangler dev externally + webServer: process.env.PLAYWRIGHT_BASE_URL + ? undefined + : { + command: process.env.USE_MOCK_API + ? 'USE_MOCK_API=1 npm run dev -- --port 5175' + : 'npm run dev -- --port 5175', + port: 5175, + reuseExistingServer: !process.env.CI, + timeout: 120000, + }, }); diff --git a/app/scripts/test-e2e-full-stack.ts b/app/scripts/test-e2e-full-stack.ts new file mode 100644 index 0000000..5da9b63 --- /dev/null +++ b/app/scripts/test-e2e-full-stack.ts @@ -0,0 +1,184 @@ +#!/usr/bin/env npx tsx +/** + * Full-Stack E2E Test Runner + * + * Runs E2E tests against the real Cloudflare Worker (wrangler dev) instead of + * just the Vite dev server. 
This tests the complete stack including: + * - Cloudflare Worker API endpoints + * - Durable Objects (WebSocket, state persistence) + * - KV storage + * - Observability 2.0 wide events + * + * Usage: + * npm run test:e2e:full-stack # Run all E2E tests against wrangler dev + * npm run test:e2e:full-stack -- --smoke # Run only smoke tests + * + * Prerequisites: + * - Project must be built first (script handles this) + * - Port 8787 must be available for wrangler dev + */ + +import { spawn, execSync, ChildProcess } from 'child_process'; + +const WRANGLER_PORT = 8787; +const WRANGLER_URL = `http://localhost:${WRANGLER_PORT}`; +const MAX_STARTUP_WAIT_MS = 120_000; // 2 minutes +const HEALTH_CHECK_INTERVAL_MS = 1000; + +let wranglerProcess: ChildProcess | null = null; + +/** + * Check if wrangler dev is ready by hitting the health endpoint + */ +async function isWranglerReady(): Promise { + try { + const response = await fetch(`${WRANGLER_URL}/api/health`, { + signal: AbortSignal.timeout(2000), + }); + return response.ok; + } catch { + return false; + } +} + +/** + * Wait for wrangler dev to be ready + */ +async function waitForWrangler(): Promise { + const startTime = Date.now(); + console.log(`⏳ Waiting for wrangler dev to be ready on port ${WRANGLER_PORT}...`); + + while (Date.now() - startTime < MAX_STARTUP_WAIT_MS) { + if (await isWranglerReady()) { + console.log(`βœ… Wrangler dev is ready (took ${Math.round((Date.now() - startTime) / 1000)}s)`); + return; + } + await new Promise(resolve => setTimeout(resolve, HEALTH_CHECK_INTERVAL_MS)); + } + + throw new Error(`Wrangler dev failed to start within ${MAX_STARTUP_WAIT_MS / 1000}s`); +} + +/** + * Start wrangler dev in the background + */ +function startWrangler(): ChildProcess { + console.log('πŸš€ Starting wrangler dev...'); + + const proc = spawn('npx', ['wrangler', 'dev', '--port', String(WRANGLER_PORT)], { + stdio: ['ignore', 'pipe', 'pipe'], + detached: false, + shell: true, + }); + + // Log wrangler output with 
prefix + proc.stdout?.on('data', (data) => { + const lines = data.toString().split('\n').filter((l: string) => l.trim()); + lines.forEach((line: string) => console.log(` [wrangler] ${line}`)); + }); + + proc.stderr?.on('data', (data) => { + const lines = data.toString().split('\n').filter((l: string) => l.trim()); + lines.forEach((line: string) => console.log(` [wrangler] ${line}`)); + }); + + proc.on('error', (err) => { + console.error('❌ Failed to start wrangler:', err.message); + }); + + return proc; +} + +/** + * Stop wrangler dev + */ +function stopWrangler(): void { + if (wranglerProcess) { + console.log('πŸ›‘ Stopping wrangler dev...'); + wranglerProcess.kill('SIGTERM'); + wranglerProcess = null; + } +} + +/** + * Run playwright E2E tests + */ +function runE2ETests(smokeOnly: boolean): number { + console.log(`\nπŸ§ͺ Running E2E tests against ${WRANGLER_URL}...\n`); + + const args = smokeOnly + ? ['playwright', 'test', '--project=chromium', 'e2e/track-reorder.spec.ts', 'e2e/plock-editor.spec.ts', 'e2e/pitch-contour-alignment.spec.ts'] + : ['playwright', 'test']; + + try { + execSync(`npx ${args.join(' ')}`, { + stdio: 'inherit', + env: { + ...process.env, + // Override the base URL to point to wrangler dev + // PLAYWRIGHT_BASE_URL: Used by playwright.config.ts for browser navigation + // BASE_URL: Used by test-utils.ts for direct API requests + PLAYWRIGHT_BASE_URL: WRANGLER_URL, + BASE_URL: WRANGLER_URL, + }, + }); + return 0; + } catch { + // execSync throws on non-zero exit code + return 1; + } +} + +/** + * Build the project + */ +function buildProject(): void { + console.log('πŸ“¦ Building project...'); + execSync('npm run build', { stdio: 'inherit' }); + console.log('βœ… Build complete\n'); +} + +/** + * Main entry point + */ +async function main(): Promise { + const args = process.argv.slice(2); + const smokeOnly = args.includes('--smoke'); + let exitCode = 0; + + // Cleanup handler + const cleanup = () => { + stopWrangler(); + process.exit(exitCode); + 
}; + + process.on('SIGINT', cleanup); + process.on('SIGTERM', cleanup); + + try { + // Step 1: Build + buildProject(); + + // Step 2: Start wrangler dev + wranglerProcess = startWrangler(); + + // Step 3: Wait for wrangler to be ready + await waitForWrangler(); + + // Step 4: Run E2E tests + exitCode = runE2ETests(smokeOnly); + + if (exitCode === 0) { + console.log('\nβœ… All E2E tests passed!'); + } else { + console.log('\n❌ Some E2E tests failed'); + } + } catch (error) { + console.error('\n❌ Error:', error instanceof Error ? error.message : error); + exitCode = 1; + } finally { + cleanup(); + } +} + +main(); diff --git a/app/src/worker/index.ts b/app/src/worker/index.ts index c9b3f55..6f22698 100644 --- a/app/src/worker/index.ts +++ b/app/src/worker/index.ts @@ -55,27 +55,26 @@ import { isBodySizeValid, validationErrorResponse, } from './validation'; +// Observability 2.0: Wide events +import { + emitHttpRequestEvent, + getDeployInfo, + getInfraInfo, + getServiceInfo, + getDeviceType, + WarningCollector, + classifyError, + classifyCustomError, + createRequestMetrics, + type HttpRequestEvent, + type RequestMetrics, +} from './observability'; +import { matchRoute, extractSessionId } from './route-patterns'; + +// State hashing utilities (still needed for debug endpoints) import { - RequestLog, - generateRequestId, - createRequestLog, - storeLog, - getSessionLogs, - getRecentLogs, - getMetrics, - trackSessionCreated, - trackSessionAccessed, - incrementMetric, - // Phase 7: Multiplayer Observability - getSessionWsLogs, - getWsMetrics, hashState, canonicalizeForHash, - type ConnectionsDebugInfo, - type ClockDebugInfo, - type StateSyncDebugInfo, - type DurableObjectDebugInfo, - // WebSocketLog imported but used as type in getSessionWsLogs return } from './logging'; // Social Media Preview @@ -241,48 +240,102 @@ async function handleApiRequest( ctx: ExecutionContext ): Promise { const method = request.method; - const requestId = generateRequestId(); + const requestId = 
crypto.randomUUID().slice(0, 8); const startTime = Date.now(); - // Extract session ID from path for logging - const sessionIdMatch = path.match(/\/([a-f0-9-]{36})/); - const sessionId = sessionIdMatch ? sessionIdMatch[1] : undefined; - - // Create log entry - const log = createRequestLog(requestId, method, path, sessionId); - - // Helper to complete and store log - const completeLog = async (status: number, sessionState?: RequestLog['sessionState'], error?: string) => { - log.status = status; - log.responseTime = Date.now() - startTime; - log.sessionState = sessionState; - log.error = error; - // Store log async (don't block response) - storeLog(env, log).catch(console.error); - }; + // Observability 2.0: Wide event setup + const warnings = new WarningCollector(); + const metrics: RequestMetrics = createRequestMetrics(); + const routeMatch = matchRoute(path, method); + const sessionId = extractSessionId(path); + + // Helper to emit http_request wide event + const emitEvent = ( + status: number, + options?: { + sessionId?: string; + playerId?: string; + isPublished?: boolean; + sourceSessionId?: string; + responseSize?: number; + error?: Error | string | null; + errorSlug?: string; + errorExpected?: boolean; + } + ) => { + const event: HttpRequestEvent = { + event: 'http_request', + requestId, + method, + path, + deviceType: getDeviceType(request.headers.get('User-Agent')), + timestamp: new Date().toISOString(), + duration_ms: Date.now() - startTime, + status, + responseSize: options?.responseSize, + routePattern: routeMatch.pattern, + action: routeMatch.action, + outcome: status >= 400 ? 'error' : 'ok', + sessionId: options?.sessionId ?? sessionId, + playerId: options?.playerId, + isPublished: options?.isPublished, + sourceSessionId: options?.sourceSessionId, + kvReads: metrics.kvReads > 0 ? metrics.kvReads : undefined, + kvWrites: metrics.kvWrites > 0 ? metrics.kvWrites : undefined, + doRequests: metrics.doRequests > 0 ? 
metrics.doRequests : undefined, + warnings: warnings.hasWarnings() ? warnings.get() : undefined, + deploy: getDeployInfo(env), + infra: getInfraInfo(request), + service: getServiceInfo(env), + }; - // GET /api/debug/logs - Get recent logs - if (path === '/api/debug/logs' && method === 'GET') { - const url = new URL(request.url); - const filterSessionId = url.searchParams.get('sessionId'); - const limit = parseInt(url.searchParams.get('last') ?? '50', 10); + // Add error info if status indicates error + if (status >= 400 && options?.error !== undefined) { + if (options.errorSlug) { + event.error = classifyCustomError( + 'Error', + typeof options.error === 'string' ? options.error : options.error?.message ?? 'Unknown error', + options.errorSlug, + options.errorExpected ?? (status < 500), + routeMatch.action + ); + } else { + event.error = classifyError(status, options.error, routeMatch.action); + } + } - const logs = filterSessionId - ? await getSessionLogs(env, filterSessionId, limit) - : await getRecentLogs(env, limit); + ctx.waitUntil(Promise.resolve().then(() => emitHttpRequestEvent(event))); + }; + + // GET /api/health - Health check endpoint for monitoring and testing + if (path === '/api/health' && method === 'GET') { + emitEvent(200); + return new Response(JSON.stringify({ status: 'ok' }), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }); + } - await completeLog(200); - return new Response(JSON.stringify({ logs }, null, 2), { + // GET /api/debug/logs - DEPRECATED: Legacy logs endpoint + // Observability 2.0 uses Workers Logs instead of KV-based logging + if (path === '/api/debug/logs' && method === 'GET') { + emitEvent(200); + return new Response(JSON.stringify({ + message: 'Legacy logs endpoint deprecated. 
Use wrangler tail or Workers Logs dashboard.', + logs: [], + }, null, 2), { status: 200, headers: { 'Content-Type': 'application/json' }, }); } - // GET /api/metrics - Get system metrics + // GET /api/metrics - DEPRECATED: Legacy metrics endpoint + // Observability 2.0 derives metrics from wide events in Workers Logs if (path === '/api/metrics' && method === 'GET') { - const metrics = await getMetrics(env); - await completeLog(200); - return new Response(JSON.stringify(metrics, null, 2), { + emitEvent(200); + return new Response(JSON.stringify({ + message: 'Legacy metrics endpoint deprecated. Metrics are now derived from Workers Logs wide events.', + }, null, 2), { status: 200, headers: { 'Content-Type': 'application/json' }, }); @@ -297,7 +350,7 @@ async function handleApiRequest( if (clientIP) { const rateLimit = checkRateLimit(clientIP); if (!rateLimit.allowed) { - await completeLog(429, undefined, `Rate limit exceeded for IP: ${clientIP}`); + emitEvent(429, { error: 'Rate limit exceeded', errorSlug: 'rate-limited', errorExpected: true }); return new Response(JSON.stringify({ error: 'Too many requests. 
Please wait before creating more sessions.', retryAfter: Math.ceil(rateLimit.resetIn / 1000), @@ -314,7 +367,7 @@ async function handleApiRequest( // Phase 13A: Validate body size before parsing if (!isBodySizeValid(request.headers.get('content-length'))) { - await completeLog(413, undefined, 'Request body too large'); + emitEvent(413, { error: 'Request body too large', errorSlug: 'payload-too-large', errorExpected: true }); return jsonError('Request body too large', 413); } @@ -331,7 +384,7 @@ async function handleApiRequest( if (body.name !== undefined) { const nameValidation = validateSessionName(body.name); if (!nameValidation.valid) { - await completeLog(400, undefined, `Validation failed: ${nameValidation.errors.join(', ')}`); + emitEvent(400, { error: nameValidation.errors.join(', '), errorSlug: 'validation-error', errorExpected: true }); return validationErrorResponse(nameValidation.errors); } sessionName = body.name as string; @@ -354,51 +407,43 @@ async function handleApiRequest( if (initialState) { const validation = validateSessionState(initialState); if (!validation.valid) { - await completeLog(400, undefined, `Validation failed: ${validation.errors.join(', ')}`); + emitEvent(400, { error: validation.errors.join(', '), errorSlug: 'validation-error', errorExpected: true }); return validationErrorResponse(validation.errors); } } - - // Log request body details - log.requestBody = { - trackCount: (initialState?.tracks as unknown[])?.length, - tempo: initialState?.tempo as number, - swing: initialState?.swing as number, - }; } + metrics.kvWrites++; const result = await createSession(env, { initialState, name: sessionName }); if (!result.success) { if (result.quotaExceeded) { - await completeLog(503, undefined, 'KV quota exceeded'); + emitEvent(503, { error: 'KV quota exceeded', errorSlug: 'kv-quota-exceeded', errorExpected: false }); return quotaExceededResponse(); } - await completeLog(500, undefined, result.error); + emitEvent(500, { error: result.error, 
errorSlug: 'session-create-failed', errorExpected: false }); return jsonError('Failed to create session', 500); } const session = result.data; - // Track metrics - await trackSessionCreated(env); - - await completeLog(201, { - trackCount: session.state.tracks.length, - hasData: session.state.tracks.length > 0, - }); - const response: CreateSessionResponse = { id: session.id, url: `/s/${session.id}`, }; + const responseBody = JSON.stringify(response); + + emitEvent(201, { + sessionId: session.id, + responseSize: new TextEncoder().encode(responseBody).length, + }); - return new Response(JSON.stringify(response), { + return new Response(responseBody, { status: 201, headers: { 'Content-Type': 'application/json' }, }); } catch (error) { - await completeLog(500, undefined, String(error)); + emitEvent(500, { error: error as Error, errorSlug: 'session-create-failed', errorExpected: false }); return jsonError('Failed to create session', 500); } } @@ -415,31 +460,33 @@ async function handleApiRequest( // GET /api/sessions/:id/ws - WebSocket upgrade to Durable Object const wsMatch = path.match(/^\/api\/sessions\/([a-f0-9-]{36})\/ws$/); if (wsMatch && request.headers.get('Upgrade') === 'websocket') { - const sessionId = wsMatch[1]; + const wsSessionId = wsMatch[1]; // Phase 13A: Validate session ID format BEFORE routing to DO // This saves DO billing for malformed requests - if (!isValidUUID(sessionId)) { - await completeLog(400, undefined, 'Invalid session ID format'); + if (!isValidUUID(wsSessionId)) { + emitEvent(400, { sessionId: wsSessionId, error: 'Invalid session ID format', errorSlug: 'invalid-session-id', errorExpected: true }); return jsonError('Invalid session ID format', 400); } // Verify session exists - const session = await getSession(env, sessionId, false); + metrics.kvReads++; + const session = await getSession(env, wsSessionId, false); if (!session) { - await completeLog(404, undefined, 'Session not found'); + emitEvent(404, { sessionId: wsSessionId, error: 
'Session not found', errorSlug: 'session-not-found', errorExpected: true }); return jsonError('Session not found', 404); } // Get the Durable Object instance for this session - const doId = env.LIVE_SESSIONS.idFromName(sessionId); + const doId = env.LIVE_SESSIONS.idFromName(wsSessionId); let stub = env.LIVE_SESSIONS.get(doId); - // Forward the WebSocket upgrade request to the DO - // Don't log for WebSocket as it interferes with the upgrade - console.log(`[GET] ${path} -> 101 (WebSocket upgrade) session=${sessionId}`); + // WebSocket upgrade - emit event before returning (101 response cannot be modified) + // The ws_session event will be emitted by the DO on disconnect + emitEvent(101, { sessionId: wsSessionId, isPublished: session.immutable }); try { + metrics.doRequests++; // Return the DO response directly - WebSocket upgrade responses cannot be modified return await stub.fetch(request); } catch (error) { @@ -451,23 +498,25 @@ async function handleApiRequest( const e = error as { retryable?: boolean; overloaded?: boolean }; if (e.overloaded) { // Never retry overloaded errors - it makes things worse - await completeLog(503, undefined, 'Service overloaded'); + emitEvent(503, { sessionId: wsSessionId, error: 'Service overloaded', errorSlug: 'service-overloaded', errorExpected: false }); return jsonError('Service temporarily unavailable', 503); } if (e.retryable) { // Create fresh stub and retry once stub = env.LIVE_SESSIONS.get(doId); + warnings.add({ type: 'DORequestRetry', message: 'DO stub recreated after error', recoveryAction: 'retry_succeeded', attemptNumber: 2, totalAttempts: 2 }); try { + metrics.doRequests++; return await stub.fetch(request); } catch (retryError) { console.error(`[WS] DO retry failed: ${retryError}`); - await completeLog(500, undefined, 'WebSocket connection failed'); + emitEvent(500, { sessionId: wsSessionId, error: retryError as Error, errorSlug: 'ws-connection-failed', errorExpected: false }); return jsonError('Failed to establish 
WebSocket connection', 500); } } - await completeLog(500, undefined, String(error)); + emitEvent(500, { sessionId: wsSessionId, error: error as Error, errorSlug: 'ws-connection-failed', errorExpected: false }); return jsonError('WebSocket connection failed', 500); } } @@ -475,24 +524,25 @@ async function handleApiRequest( // GET /api/sessions/:id/live-debug - Forward to Durable Object debug endpoint const liveDebugMatch = path.match(/^\/api\/sessions\/([a-f0-9-]{36})\/live-debug$/); if (liveDebugMatch && method === 'GET') { - const sessionId = liveDebugMatch[1]; + const debugSessionId = liveDebugMatch[1]; try { // Get the Durable Object instance for this session - const doId = env.LIVE_SESSIONS.idFromName(sessionId); + const doId = env.LIVE_SESSIONS.idFromName(debugSessionId); const stub = env.LIVE_SESSIONS.get(doId); // Create debug request URL const debugUrl = new URL(request.url); - debugUrl.pathname = `/api/sessions/${sessionId}/debug`; + debugUrl.pathname = `/api/sessions/${debugSessionId}/debug`; // Forward to DO + metrics.doRequests++; const response = await stub.fetch(new Request(debugUrl.toString(), { method: 'GET' })); - await completeLog(response.status); + emitEvent(response.status, { sessionId: debugSessionId }); return response; } catch (e) { - console.error(`[live-debug] Error for session ${sessionId}:`, e); - await completeLog(500, undefined, String(e)); + console.error(`[live-debug] Error for session ${debugSessionId}:`, e); + emitEvent(500, { sessionId: debugSessionId, error: e as Error, errorSlug: 'debug-request-failed', errorExpected: false }); return jsonError(`Debug request failed: ${e}`, 500); } } @@ -504,7 +554,7 @@ async function handleApiRequest( // Phase 13A: Validate session ID format if (!isValidUUID(sourceId)) { - await completeLog(400, undefined, 'Invalid session ID format'); + emitEvent(400, { sessionId: sourceId, error: 'Invalid session ID format', errorSlug: 'invalid-session-id', errorExpected: true }); return jsonError('Invalid 
session ID format', 400); } @@ -514,6 +564,7 @@ async function handleApiRequest( let sourceSession: Session | null = null; try { + metrics.doRequests++; const doResponse = await stub.fetch(new Request( new URL(`/api/sessions/${sourceId}`, request.url).toString(), { method: 'GET' } @@ -524,41 +575,46 @@ async function handleApiRequest( } catch (error) { console.error(`[remix] DO error for source ${sourceId}:`, error); // Fall back to KV if DO fails + warnings.add({ type: 'DORequestRetry', message: 'Fell back to KV after DO error', recoveryAction: 'fallback_used' }); + metrics.kvReads++; sourceSession = await getSession(env, sourceId, false); } if (!sourceSession) { - await completeLog(404, undefined, 'Session not found'); + emitEvent(404, { sessionId: sourceId, error: 'Session not found', errorSlug: 'session-not-found', errorExpected: true }); return jsonError('Session not found', 404); } // Create remix using the DO-provided state + metrics.kvWrites++; const result = await remixSessionFromState(env, sourceId, sourceSession); if (!result.success) { if (result.quotaExceeded) { - await completeLog(503, undefined, 'KV quota exceeded'); + emitEvent(503, { sourceSessionId: sourceId, error: 'KV quota exceeded', errorSlug: 'kv-quota-exceeded', errorExpected: false }); return quotaExceededResponse(); } - await completeLog(500, undefined, result.error); + emitEvent(500, { sourceSessionId: sourceId, error: result.error, errorSlug: 'remix-failed', errorExpected: false }); return jsonError('Failed to remix session', 500); } const remixed = result.data; - await incrementMetric(env, 'remixes'); - await completeLog(201, { - trackCount: remixed.state.tracks.length, - hasData: remixed.state.tracks.length > 0, - }); - const response: RemixSessionResponse = { id: remixed.id, remixedFrom: sourceId, url: `/s/${remixed.id}`, }; + const responseBody = JSON.stringify(response); + + emitEvent(201, { + sessionId: remixed.id, + sourceSessionId: sourceId, + isPublished: false, + responseSize: 
new TextEncoder().encode(responseBody).length, + }); - return new Response(JSON.stringify(response), { + return new Response(responseBody, { status: 201, headers: { 'Content-Type': 'application/json' }, }); @@ -571,52 +627,56 @@ async function handleApiRequest( // POST /api/sessions/:id/publish - Publish a session (make it immutable) // Phase 34: Route through DO to get latest source state (may have pending changes) if (publishMatch && method === 'POST') { - const id = publishMatch[1]; + const publishSourceId = publishMatch[1]; // Phase 13A: Validate session ID format - if (!isValidUUID(id)) { - await completeLog(400, undefined, 'Invalid session ID format'); + if (!isValidUUID(publishSourceId)) { + emitEvent(400, { sessionId: publishSourceId, error: 'Invalid session ID format', errorSlug: 'invalid-session-id', errorExpected: true }); return jsonError('Invalid session ID format', 400); } // Get source session from DO (includes pending changes not yet in KV) - const doId = env.LIVE_SESSIONS.idFromName(id); + const doId = env.LIVE_SESSIONS.idFromName(publishSourceId); const stub = env.LIVE_SESSIONS.get(doId); let sourceSession: Session | null = null; try { + metrics.doRequests++; const doResponse = await stub.fetch(new Request( - new URL(`/api/sessions/${id}`, request.url).toString(), + new URL(`/api/sessions/${publishSourceId}`, request.url).toString(), { method: 'GET' } )); if (doResponse.ok) { sourceSession = await doResponse.json() as Session; } } catch (error) { - console.error(`[publish] DO error for source ${id}:`, error); + console.error(`[publish] DO error for source ${publishSourceId}:`, error); // Fall back to KV if DO fails - sourceSession = await getSession(env, id, false); + warnings.add({ type: 'DORequestRetry', message: 'Fell back to KV after DO error', recoveryAction: 'fallback_used' }); + metrics.kvReads++; + sourceSession = await getSession(env, publishSourceId, false); } if (!sourceSession) { - await completeLog(404, undefined, 'Session not 
found'); + emitEvent(404, { sessionId: publishSourceId, error: 'Session not found', errorSlug: 'session-not-found', errorExpected: true }); return jsonError('Session not found', 404); } // Publish using the DO-provided state - const result = await publishSessionFromState(env, id, sourceSession); + metrics.kvWrites++; + const result = await publishSessionFromState(env, publishSourceId, sourceSession); if (!result.success) { if (result.quotaExceeded) { - await completeLog(503, undefined, 'KV quota exceeded'); + emitEvent(503, { sourceSessionId: publishSourceId, error: 'KV quota exceeded', errorSlug: 'kv-quota-exceeded', errorExpected: false }); return quotaExceededResponse(); } // Handle trying to publish from an already-published session if (result.error.includes('already-published')) { - await completeLog(400, undefined, result.error); + emitEvent(400, { sourceSessionId: publishSourceId, error: result.error, errorSlug: 'already-published', errorExpected: true }); return jsonError(result.error, 400); } - await completeLog(500, undefined, result.error); + emitEvent(500, { sourceSessionId: publishSourceId, error: result.error, errorSlug: 'publish-failed', errorExpected: false }); return jsonError('Failed to publish session', 500); } @@ -628,25 +688,28 @@ async function handleApiRequest( const baseUrl = new URL(request.url).origin; ctx.waitUntil( Promise.all([ - purgeOGCache(id, baseUrl), // Source session - purgeOGCache(published.id, baseUrl), // New published session + purgeOGCache(publishSourceId, baseUrl), // Source session + purgeOGCache(published.id, baseUrl), // New published session ]).catch(error => console.error('[OG] Cache purge failed:', error)) ); - await incrementMetric(env, 'publishes'); - await completeLog(201, { - trackCount: published.state.tracks.length, - hasData: published.state.tracks.length > 0, - }); - // Return 201 Created - we're creating a NEW immutable session // The source session remains editable at its original URL - return new 
Response(JSON.stringify({ + const responseBody = JSON.stringify({ id: published.id, immutable: published.immutable, url: `/s/${published.id}`, - sourceId: id, // Include source session ID for reference - }), { + sourceId: publishSourceId, // Include source session ID for reference + }); + + emitEvent(201, { + sessionId: published.id, + sourceSessionId: publishSourceId, + isPublished: true, + responseSize: new TextEncoder().encode(responseBody).length, + }); + + return new Response(responseBody, { status: 201, headers: { 'Content-Type': 'application/json' }, }); @@ -657,52 +720,54 @@ async function handleApiRequest( // This fixes the architectural violation where we read stale data from KV // while DO had pending changes not yet persisted. if (sessionMatch && method === 'GET') { - const id = sessionMatch[1]; + const getSessionId = sessionMatch[1]; // Phase 13A: Validate session ID format - if (!isValidUUID(id)) { - await completeLog(400, undefined, 'Invalid session ID format'); + if (!isValidUUID(getSessionId)) { + emitEvent(400, { sessionId: getSessionId, error: 'Invalid session ID format', errorSlug: 'invalid-session-id', errorExpected: true }); return jsonError('Invalid session ID format', 400); } // Route through DO - it will return latest state (including pending changes) // and merge with KV metadata (name, timestamps, etc.) 
- const doId = env.LIVE_SESSIONS.idFromName(id); + const doId = env.LIVE_SESSIONS.idFromName(getSessionId); const stub = env.LIVE_SESSIONS.get(doId); try { + metrics.doRequests++; const doResponse = await stub.fetch(new Request(request.url, { method: 'GET' })); if (doResponse.status === 404) { - await completeLog(404, undefined, 'Session not found'); + emitEvent(404, { sessionId: getSessionId, error: 'Session not found', errorSlug: 'session-not-found', errorExpected: true }); return jsonError('Session not found', 404); } if (!doResponse.ok) { const errorBody = await doResponse.text(); - await completeLog(doResponse.status, undefined, errorBody); + emitEvent(doResponse.status, { sessionId: getSessionId, error: errorBody, errorSlug: 'do-error', errorExpected: false }); return new Response(errorBody, { status: doResponse.status, headers: { 'Content-Type': 'application/json' }, }); } - const session = await doResponse.json() as { state: { tracks: unknown[] } }; + const session = await doResponse.json() as { state: { tracks: unknown[] }; immutable?: boolean }; + const responseBody = JSON.stringify(session); - await trackSessionAccessed(env); - await completeLog(200, { - trackCount: session.state.tracks.length, - hasData: session.state.tracks.length > 0, + emitEvent(200, { + sessionId: getSessionId, + isPublished: session.immutable, + responseSize: new TextEncoder().encode(responseBody).length, }); - return new Response(JSON.stringify(session), { + return new Response(responseBody, { status: 200, headers: { 'Content-Type': 'application/json' }, }); } catch (error) { // If DO fails, log and return error (don't silently fall back to KV) - console.error(`[GET] DO error for session ${id}:`, error); - await completeLog(500, undefined, `DO error: ${error}`); + console.error(`[GET] DO error for session ${getSessionId}:`, error); + emitEvent(500, { sessionId: getSessionId, error: error as Error, errorSlug: 'do-error', errorExpected: false }); return jsonError('Failed to retrieve 
session', 500); } } @@ -711,17 +776,17 @@ async function handleApiRequest( // Phase 31E: Route through Durable Object to maintain architectural correctness // Previously this wrote directly to KV, causing state desync with active DO if (sessionMatch && method === 'PUT') { - const id = sessionMatch[1]; + const putSessionId = sessionMatch[1]; // Phase 13A: Validate session ID format - if (!isValidUUID(id)) { - await completeLog(400, undefined, 'Invalid session ID format'); + if (!isValidUUID(putSessionId)) { + emitEvent(400, { sessionId: putSessionId, error: 'Invalid session ID format', errorSlug: 'invalid-session-id', errorExpected: true }); return jsonError('Invalid session ID format', 400); } // Phase 13A: Validate body size before parsing if (!isBodySizeValid(request.headers.get('content-length'))) { - await completeLog(413, undefined, 'Request body too large'); + emitEvent(413, { sessionId: putSessionId, error: 'Request body too large', errorSlug: 'payload-too-large', errorExpected: true }); return jsonError('Request body too large', 413); } @@ -732,23 +797,16 @@ async function handleApiRequest( // Phase 13A: Validate session state const validation = validateSessionState(body.state); if (!validation.valid) { - await completeLog(400, undefined, `Validation failed: ${validation.errors.join(', ')}`); + emitEvent(400, { sessionId: putSessionId, error: validation.errors.join(', '), errorSlug: 'validation-error', errorExpected: true }); return validationErrorResponse(validation.errors); } - // Log request body details - log.requestBody = { - trackCount: body.state.tracks?.length, - tempo: body.state.tempo, - swing: body.state.swing, - }; - // Route to Durable Object - this is the architectural fix // The DO will: // 1. Update its internal state // 2. Persist to KV // 3. 
Broadcast to all connected WebSocket clients - const doId = env.LIVE_SESSIONS.idFromName(id); + const doId = env.LIVE_SESSIONS.idFromName(putSessionId); const stub = env.LIVE_SESSIONS.get(doId); // Create a new request with the validated body @@ -758,14 +816,15 @@ async function handleApiRequest( body: JSON.stringify(body), }); + metrics.doRequests++; const doResponse = await stub.fetch(doRequest); // Handle DO response - clone to get mutable headers for CORS if (doResponse.status === 403) { // Session is immutable - await completeLog(403, undefined, 'Session is published and cannot be modified'); - const body = await doResponse.text(); - return new Response(body, { + emitEvent(403, { sessionId: putSessionId, error: 'Session is published and cannot be modified', errorSlug: 'session-immutable', errorExpected: true }); + const responseBody = await doResponse.text(); + return new Response(responseBody, { status: 403, headers: { 'Content-Type': 'application/json' }, }); @@ -773,7 +832,7 @@ async function handleApiRequest( if (!doResponse.ok) { const errorBody = await doResponse.text(); - await completeLog(doResponse.status, undefined, errorBody); + emitEvent(doResponse.status, { sessionId: putSessionId, error: errorBody, errorSlug: 'update-failed', errorExpected: false }); return new Response(errorBody, { status: doResponse.status, headers: { 'Content-Type': 'application/json' }, @@ -782,13 +841,7 @@ async function handleApiRequest( const result = await doResponse.json() as { id: string; updatedAt: number; trackCount: number }; - await incrementMetric(env, 'updates'); - await completeLog(200, { - trackCount: result.trackCount, - hasData: result.trackCount > 0, - }); - - console.log(`[PUT] Session ${id} updated via DO, ${result.trackCount} tracks`); + emitEvent(200, { sessionId: putSessionId }); return new Response(JSON.stringify(result), { status: 200, @@ -797,10 +850,10 @@ async function handleApiRequest( } catch (error) { // Provide specific error messages for better 
debugging const errorMessage = error instanceof Error ? error.message : String(error); - await completeLog(400, undefined, errorMessage); // Distinguish between JSON parse errors and other errors if (error instanceof SyntaxError) { + emitEvent(400, { sessionId: putSessionId, error: 'Invalid JSON', errorSlug: 'invalid-json', errorExpected: true }); return new Response( JSON.stringify({ error: 'Invalid JSON', @@ -811,6 +864,7 @@ async function handleApiRequest( } // For other errors, include the actual error message + emitEvent(400, { sessionId: putSessionId, error: errorMessage, errorSlug: 'invalid-request', errorExpected: true }); return new Response( JSON.stringify({ error: 'Invalid request body', @@ -830,11 +884,11 @@ async function handleApiRequest( // { state: {...} } - Update just the session state // { name: "New Name", state: {...} } - Update both if (sessionMatch && method === 'PATCH') { - const id = sessionMatch[1]; + const patchSessionId = sessionMatch[1]; // Phase 13A: Validate session ID format - if (!isValidUUID(id)) { - await completeLog(400, undefined, 'Invalid session ID format'); + if (!isValidUUID(patchSessionId)) { + emitEvent(400, { sessionId: patchSessionId, error: 'Invalid session ID format', errorSlug: 'invalid-session-id', errorExpected: true }); return jsonError('Invalid session ID format', 400); } @@ -847,7 +901,7 @@ async function handleApiRequest( // Require at least one of name or state if (!hasName && !hasState) { - await completeLog(400, undefined, 'Missing name or state field'); + emitEvent(400, { sessionId: patchSessionId, error: 'Missing name or state field', errorSlug: 'missing-field', errorExpected: true }); return jsonError('Missing name or state field', 400); } @@ -855,7 +909,7 @@ async function handleApiRequest( if (hasName) { const nameValidation = validateSessionName(body.name); if (!nameValidation.valid) { - await completeLog(400, undefined, `Name validation failed: ${nameValidation.errors.join(', ')}`); + emitEvent(400, { 
sessionId: patchSessionId, error: nameValidation.errors.join(', '), errorSlug: 'validation-error', errorExpected: true }); return validationErrorResponse(nameValidation.errors); } } @@ -864,7 +918,7 @@ async function handleApiRequest( if (hasState) { const stateValidation = validateSessionState(body.state); if (!stateValidation.valid) { - await completeLog(400, undefined, `State validation failed: ${stateValidation.errors.join(', ')}`); + emitEvent(400, { sessionId: patchSessionId, error: stateValidation.errors.join(', '), errorSlug: 'validation-error', errorExpected: true }); return validationErrorResponse(stateValidation.errors); } } @@ -873,7 +927,7 @@ async function handleApiRequest( // The DO will: // 1. Update KV // 2. Broadcast to all connected WebSocket clients - const doId = env.LIVE_SESSIONS.idFromName(id); + const doId = env.LIVE_SESSIONS.idFromName(patchSessionId); const stub = env.LIVE_SESSIONS.get(doId); // Create a new request with the validated body @@ -883,12 +937,13 @@ async function handleApiRequest( body: JSON.stringify(body), }); + metrics.doRequests++; const doResponse = await stub.fetch(doRequest); // Handle DO response - clone to get mutable headers for CORS if (doResponse.status === 403) { // Session is immutable - await completeLog(403, undefined, 'Session is published and cannot be modified'); + emitEvent(403, { sessionId: patchSessionId, error: 'Session is published and cannot be modified', errorSlug: 'session-immutable', errorExpected: true }); const responseBody = await doResponse.text(); return new Response(responseBody, { status: 403, @@ -898,7 +953,7 @@ async function handleApiRequest( if (!doResponse.ok) { const errorBody = await doResponse.text(); - await completeLog(doResponse.status, undefined, errorBody); + emitEvent(doResponse.status, { sessionId: patchSessionId, error: errorBody, errorSlug: 'update-failed', errorExpected: false }); return new Response(errorBody, { status: doResponse.status, headers: { 'Content-Type': 
'application/json' }, @@ -907,9 +962,7 @@ async function handleApiRequest( const result = await doResponse.json() as { id: string; name: string | null; updatedAt: number }; - await completeLog(200); - - console.log(`[PATCH] Session ${id} name updated via DO to: ${result.name}`); + emitEvent(200, { sessionId: patchSessionId }); return new Response(JSON.stringify(result), { status: 200, @@ -918,10 +971,10 @@ async function handleApiRequest( } catch (error) { // Provide specific error messages for better debugging const errorMessage = error instanceof Error ? error.message : String(error); - await completeLog(400, undefined, errorMessage); // Distinguish between JSON parse errors and other errors if (error instanceof SyntaxError) { + emitEvent(400, { sessionId: patchSessionId, error: 'Invalid JSON', errorSlug: 'invalid-json', errorExpected: true }); return new Response( JSON.stringify({ error: 'Invalid JSON', @@ -932,6 +985,7 @@ async function handleApiRequest( } // For other errors, include the actual error message + emitEvent(400, { sessionId: patchSessionId, error: errorMessage, errorSlug: 'invalid-request', errorExpected: true }); return new Response( JSON.stringify({ error: 'Invalid request body', @@ -945,13 +999,14 @@ async function handleApiRequest( // GET /api/debug/session/:id - Debug endpoint for session inspection const debugMatch = path.match(/^\/api\/debug\/session\/([a-f0-9-]{36})$/); if (debugMatch && method === 'GET') { - const id = debugMatch[1]; - const session = await getSession(env, id, false); // Don't update access time + const debugSessionId = debugMatch[1]; + metrics.kvReads++; + const session = await getSession(env, debugSessionId, false); // Don't update access time if (!session) { - await completeLog(404, undefined, 'Session not found'); + emitEvent(404, { sessionId: debugSessionId, error: 'Session not found', errorSlug: 'session-not-found', errorExpected: true }); return new Response(JSON.stringify({ - id, + id: debugSessionId, exists: 
false, error: 'Session not found' }), { @@ -987,10 +1042,7 @@ async function handleApiRequest( sizeBytes: JSON.stringify(session).length, }; - await completeLog(200, { - trackCount: session.state.tracks.length, - hasData: session.state.tracks.length > 0, - }); + emitEvent(200, { sessionId: debugSessionId }); return new Response(JSON.stringify(debugInfo, null, 2), { status: 200, @@ -1000,105 +1052,65 @@ async function handleApiRequest( // ========================================================================== // Phase 7: Multiplayer Debug Endpoints + // DEPRECATED: These endpoints used KV-based logging which is replaced by + // Observability 2.0 wide events. They now return static messages pointing + // users to Workers Logs. // ========================================================================== - // GET /api/debug/session/:id/connections - WebSocket connection info + // GET /api/debug/session/:id/connections - DEPRECATED const connectionsMatch = path.match(/^\/api\/debug\/session\/([a-f0-9-]{36})\/connections$/); if (connectionsMatch && method === 'GET') { - const id = connectionsMatch[1]; - const session = await getSession(env, id, false); - - if (!session) { - await completeLog(404, undefined, 'Session not found'); - return jsonError('Session not found', 404); - } - - const wsMetrics = await getWsMetrics(env, id); - const wsLogs = await getSessionWsLogs(env, id, 50); - - // Build player connection info from logs - const playerMap = new Map(); - - const _now = Date.now(); - - for (const log of wsLogs) { - if (log.type === 'ws_connect') { - playerMap.set(log.playerId, { - connectedAt: log.timestamp, - lastMessage: log.timestamp, - messageCount: 0, - }); - } else if (log.type === 'ws_message') { - const player = playerMap.get(log.playerId); - if (player) { - player.lastMessage = log.timestamp; - player.messageCount++; - } - } else if (log.type === 'ws_disconnect') { - playerMap.delete(log.playerId); - } - } - - // Calculate message rate (messages per 
second over last 5 minutes) - const totalMessages = Object.values(wsMetrics.messages.byType).reduce((a, b) => a + b, 0); - const messageRate = `${(totalMessages / 300).toFixed(1)}/sec`; - - const connectionsInfo: ConnectionsDebugInfo = { - activeConnections: wsMetrics.connections.active, - players: Array.from(playerMap.entries()).map(([id, info]) => ({ - id, - connectedAt: info.connectedAt, - lastMessage: info.lastMessage, - messageCount: info.messageCount, - })), - messageRate, - }; - - await completeLog(200); - return new Response(JSON.stringify(connectionsInfo, null, 2), { + emitEvent(200, { sessionId: connectionsMatch[1] }); + return new Response(JSON.stringify({ + message: 'Legacy connections endpoint deprecated. Use wrangler tail to see ws_session events.', + activeConnections: 0, + players: [], + }, null, 2), { status: 200, headers: { 'Content-Type': 'application/json' }, }); } - // GET /api/debug/session/:id/clock - Clock sync debug info + // GET /api/debug/session/:id/clock - Clock sync debug info (still functional) const clockMatch = path.match(/^\/api\/debug\/session\/([a-f0-9-]{36})\/clock$/); if (clockMatch && method === 'GET') { - const id = clockMatch[1]; - const session = await getSession(env, id, false); + const clockSessionId = clockMatch[1]; + metrics.kvReads++; + const session = await getSession(env, clockSessionId, false); if (!session) { - await completeLog(404, undefined, 'Session not found'); + emitEvent(404, { sessionId: clockSessionId, error: 'Session not found', errorSlug: 'session-not-found', errorExpected: true }); return jsonError('Session not found', 404); } - // Clock sync data is stored per-session in Phase 10 - // For now, return placeholder structure that will be populated later - const clockKey = `clock-sync:${id}`; + // Clock sync data is stored per-session + metrics.kvReads++; + const clockKey = `clock-sync:${clockSessionId}`; const clockData = await env.SESSIONS.get(clockKey, 'json') as { clients: Array<{ id: string; offset: 
number; lastPing: number }>; } | null; - const clockInfo: ClockDebugInfo = { + const clockInfo = { serverTime: Date.now(), connectedClients: clockData?.clients ?? [], }; - await completeLog(200); + emitEvent(200, { sessionId: clockSessionId }); return new Response(JSON.stringify(clockInfo, null, 2), { status: 200, headers: { 'Content-Type': 'application/json' }, }); } - // GET /api/debug/session/:id/state-sync - State sync verification + // GET /api/debug/session/:id/state-sync - State sync verification (still functional) const stateSyncMatch = path.match(/^\/api\/debug\/session\/([a-f0-9-]{36})\/state-sync$/); if (stateSyncMatch && method === 'GET') { - const id = stateSyncMatch[1]; - const session = await getSession(env, id, false); + const syncSessionId = stateSyncMatch[1]; + metrics.kvReads++; + const session = await getSession(env, syncSessionId, false); if (!session) { - await completeLog(404, undefined, 'Session not found'); + emitEvent(404, { sessionId: syncSessionId, error: 'Session not found', errorSlug: 'session-not-found', errorExpected: true }); return jsonError('Session not found', 404); } @@ -1110,15 +1122,15 @@ async function handleApiRequest( }); const serverStateHash = hashState(canonicalState); - // Client hashes are reported via WebSocket in Phase 9 - // For now, return placeholder structure - const clientHashesKey = `state-hashes:${id}`; + // Client hashes are reported via WebSocket + metrics.kvReads++; + const clientHashesKey = `state-hashes:${syncSessionId}`; const clientHashes = await env.SESSIONS.get(clientHashesKey, 'json') as Array<{ playerId: string; hash: string; }> | null; - const stateSyncInfo: StateSyncDebugInfo = { + const stateSyncInfo = { serverStateHash, clientHashes: (clientHashes ?? 
[]).map(c => ({ ...c, @@ -1126,52 +1138,51 @@ async function handleApiRequest( })), }; - await completeLog(200); + emitEvent(200, { sessionId: syncSessionId }); return new Response(JSON.stringify(stateSyncInfo, null, 2), { status: 200, headers: { 'Content-Type': 'application/json' }, }); } - // GET /api/debug/durable-object/:id - Durable Object debug info - const doMatch = path.match(/^\/api\/debug\/durable-object\/([a-f0-9-]{36})$/); - if (doMatch && method === 'GET') { - const id = doMatch[1]; - const session = await getSession(env, id, false); + // GET /api/debug/durable-object/:id - Durable Object debug info (still functional) + const doDebugMatch = path.match(/^\/api\/debug\/durable-object\/([a-f0-9-]{36})$/); + if (doDebugMatch && method === 'GET') { + const doDebugSessionId = doDebugMatch[1]; + metrics.kvReads++; + const session = await getSession(env, doDebugSessionId, false); if (!session) { - await completeLog(404, undefined, 'Session not found'); + emitEvent(404, { sessionId: doDebugSessionId, error: 'Session not found', errorSlug: 'session-not-found', errorExpected: true }); return jsonError('Session not found', 404); } - // Phase 8: Fetch debug info directly from the Durable Object + // Fetch debug info directly from the Durable Object try { - const doId = env.LIVE_SESSIONS.idFromName(id); + const doId = env.LIVE_SESSIONS.idFromName(doDebugSessionId); const stub = env.LIVE_SESSIONS.get(doId); const debugUrl = new URL(request.url); - debugUrl.pathname = `/api/sessions/${id}/debug`; + debugUrl.pathname = `/api/sessions/${doDebugSessionId}/debug`; + metrics.doRequests++; const doResponse = await stub.fetch(new Request(debugUrl.toString())); if (doResponse.ok) { const doDebug = await doResponse.json(); - await completeLog(200); + emitEvent(200, { sessionId: doDebugSessionId }); return new Response(JSON.stringify(doDebug, null, 2), { status: 200, headers: { 'Content-Type': 'application/json' }, }); } } catch { - // DO may not be active, fall back to 
KV-based info - console.log('[DEBUG] DO not active, using KV fallback'); + // DO may not be active + console.log('[DEBUG] DO not active'); } - // Fallback: return what we can infer from KV/metrics - const wsMetrics = await getWsMetrics(env, id); - - const doInfo: DurableObjectDebugInfo = { - id, - connectedPlayers: wsMetrics.connections.active, - // Phase 22: Per-player playback tracking + // Fallback when DO is not active + const doInfo = { + id: doDebugSessionId, + connectedPlayers: 0, playingPlayerIds: [], playingCount: 0, currentStep: 0, @@ -1179,31 +1190,28 @@ async function handleApiRequest( lastActivity: 'unknown (DO not active)', }; - await completeLog(200); + emitEvent(200, { sessionId: doDebugSessionId }); return new Response(JSON.stringify(doInfo, null, 2), { status: 200, headers: { 'Content-Type': 'application/json' }, }); } - // GET /api/debug/session/:id/ws-logs - WebSocket logs for session + // GET /api/debug/session/:id/ws-logs - DEPRECATED const wsLogsMatch = path.match(/^\/api\/debug\/session\/([a-f0-9-]{36})\/ws-logs$/); if (wsLogsMatch && method === 'GET') { - const id = wsLogsMatch[1]; - const url = new URL(request.url); - const limit = parseInt(url.searchParams.get('last') ?? '100', 10); - - const wsLogs = await getSessionWsLogs(env, id, limit); - - await completeLog(200); - return new Response(JSON.stringify({ logs: wsLogs }, null, 2), { + emitEvent(200, { sessionId: wsLogsMatch[1] }); + return new Response(JSON.stringify({ + message: 'Legacy ws-logs endpoint deprecated. 
Use wrangler tail to see ws_session events.', + logs: [], + }, null, 2), { status: 200, headers: { 'Content-Type': 'application/json' }, }); } // Unknown API route - await completeLog(404, undefined, 'Unknown route'); + emitEvent(404, { error: 'Unknown route', errorSlug: 'not-found', errorExpected: true }); return jsonError('Not found', 404); } diff --git a/app/src/worker/live-session.ts b/app/src/worker/live-session.ts index b547bd4..c2a1c72 100644 --- a/app/src/worker/live-session.ts +++ b/app/src/worker/live-session.ts @@ -29,6 +29,20 @@ import type { import { isStateMutatingMessage, isStateMutatingBroadcast, assertNever, VALID_STEP_COUNTS_SET } from './types'; import { getSession, updateSession, updateSessionName } from './sessions'; import { hashState, canonicalizeForHash } from './logging'; +// Observability 2.0: Wide events +import { + emitWsSessionEvent, + getDeployInfo, + getInfraInfo, + getServiceInfo, + mapCloseCode, + createCreatorIdentity, + identitiesMatch, + type WsSessionEvent, + type InfraInfo, + type Warning, + type CreatorIdentity, +} from './observability'; import { validateStateInvariants, logInvariantStatus, @@ -110,12 +124,32 @@ function generateIdentity(playerId: string) { // Schema version for migrations const SCHEMA_VERSION = 1; +// Observability 2.0: Player context for wide event tracking +// This is NOT serialized with the WebSocket attachment - it's kept in memory +// and used to emit the ws_session event at disconnect +interface PlayerObservability { + connectionId: string; + isCreator: boolean; // Detected via IP + User-Agent hash comparison + messagesByType: Record; + playCount: number; + totalPlayTime_ms: number; + lastPlayStartTime: number | null; + syncRequestCount: number; + syncErrorCount: number; + peakConcurrentPlayers: number; + playersSeenIds: Set; + warnings: Warning[]; + infra: InfraInfo; +} + // Phase 26 BUG-02: Threshold for detecting clients falling behind // If client's ack is more than this many sequences behind 
serverSeq, push a snapshot const ACK_GAP_THRESHOLD = 50; export class LiveSessionDurableObject extends DurableObject { private players: Map = new Map(); + // Observability 2.0: Track observability data for each player (not serialized) + private playerObservability: Map = new Map(); private state: SessionState | null = null; private sessionId: string | null = null; // Phase 22: Track playback state per-player (not session-wide) @@ -128,6 +162,10 @@ export class LiveSessionDurableObject extends DurableObject { // Phase 21: Published sessions are immutable - reject all edits private immutable: boolean = false; + // Observability 2.0: Creator identity for isCreator detection + // Stored when first WebSocket connects, compared on subsequent connections + private creatorIdentity: CreatorIdentity | null = null; + // Phase 13B: Server sequence number for message ordering // Now persisted to DO storage to survive hibernation/eviction private serverSeq: number = 0; @@ -153,14 +191,20 @@ export class LiveSessionDurableObject extends DurableObject { new WebSocketRequestResponsePair('ping', 'pong') ); - // Initialize serverSeq from storage using blockConcurrencyWhile - // This prevents race conditions where multiple requests arrive before serverSeq is loaded + // Initialize serverSeq and creatorIdentity from storage using blockConcurrencyWhile + // This prevents race conditions where multiple requests arrive before data is loaded this.ctx.blockConcurrencyWhile(async () => { const storedSeq = await this.ctx.storage.get('serverSeq'); if (storedSeq !== undefined) { this.serverSeq = storedSeq; } + // Load creator identity (may be null if session is new) + const storedCreatorIdentity = await this.ctx.storage.get('creatorIdentity'); + if (storedCreatorIdentity) { + this.creatorIdentity = storedCreatorIdentity; + } + // Schema migration support - future-proofing const storedVersion = await this.ctx.storage.get('schemaVersion'); if (storedVersion !== undefined && storedVersion < 
SCHEMA_VERSION) { @@ -606,12 +650,52 @@ export class LiveSessionDurableObject extends DurableObject { name: identity.name, }; + // Observability 2.0: Create observability context for this connection + // Capture all players seen at connect time + const playersSeenIds = new Set(); + for (const p of this.players.values()) { + playersSeenIds.add(p.id); + } + playersSeenIds.add(playerId); // Include self + + // Observability 2.0: Determine if this connection is the session creator + // Uses IP + User-Agent hash comparison (more stable than playerId across refreshes) + const connectingIdentity = await createCreatorIdentity(request); + let isCreator = false; + + if (!this.creatorIdentity) { + // First connection to this session - this is the creator + this.creatorIdentity = connectingIdentity; + await this.ctx.storage.put('creatorIdentity', connectingIdentity); + isCreator = true; + console.log(`[WS] Creator identity stored for session=${this.sessionId}`); + } else { + // Compare with stored creator identity + isCreator = identitiesMatch(this.creatorIdentity, connectingIdentity); + } + + const observability: PlayerObservability = { + connectionId: crypto.randomUUID(), + isCreator, + messagesByType: {}, + playCount: 0, + totalPlayTime_ms: 0, + lastPlayStartTime: null, + syncRequestCount: 0, + syncErrorCount: 0, + peakConcurrentPlayers: this.players.size + 1, // +1 for this player + playersSeenIds, + warnings: [], + infra: getInfraInfo(request), + }; + // Accept the WebSocket with hibernation support this.ctx.acceptWebSocket(server); // Store player info as attachment for hibernation server.serializeAttachment(playerInfo); this.players.set(server, playerInfo); + this.playerObservability.set(server, observability); console.log(`[WS] connect session=${this.sessionId} player=${playerId} total=${this.players.size}`); @@ -683,6 +767,18 @@ export class LiveSessionDurableObject extends DurableObject { return; } + // Observability 2.0: Track message types + const obs = 
this.playerObservability.get(ws); + if (obs) { + obs.messagesByType[msg.type] = (obs.messagesByType[msg.type] || 0) + 1; + // Update peak concurrent players + obs.peakConcurrentPlayers = Math.max(obs.peakConcurrentPlayers, this.players.size); + // Track players seen (new players who joined during this connection) + for (const p of this.players.values()) { + obs.playersSeenIds.add(p.id); + } + } + console.log(`[WS] message session=${this.sessionId} player=${player.id} type=${msg.type}`); // Phase 26 BUG-02: Detect clients falling behind via ack field @@ -691,6 +787,11 @@ export class LiveSessionDurableObject extends DurableObject { const ackGap = this.serverSeq - msg.ack; if (ackGap > ACK_GAP_THRESHOLD) { console.log(`[WS] Client falling behind: player=${player.id} ack=${msg.ack} serverSeq=${this.serverSeq} gap=${ackGap}`); + // Observability 2.0: Track proactive sync (server-initiated recovery) + const obs = this.playerObservability.get(ws); + if (obs) { + obs.syncRequestCount++; + } this.sendSnapshotToClient(ws, player); } } @@ -836,6 +937,49 @@ export class LiveSessionDurableObject extends DurableObject { const player = this.players.get(ws); if (!player) return; + // Observability 2.0: Emit ws_session wide event before cleanup + const obs = this.playerObservability.get(ws); + if (obs) { + // Finalize playback time if player was still playing + if (obs.lastPlayStartTime !== null) { + obs.totalPlayTime_ms += Date.now() - obs.lastPlayStartTime; + obs.lastPlayStartTime = null; + } + + const disconnectedAt = Date.now(); + const disconnectReason = mapCloseCode(code); + + const event: WsSessionEvent = { + event: 'ws_session', + connectionId: obs.connectionId, + sessionId: this.sessionId!, + playerId: player.id, + isCreator: obs.isCreator, + isPublished: this.immutable, + connectedAt: new Date(player.connectedAt).toISOString(), + disconnectedAt: new Date(disconnectedAt).toISOString(), + duration_ms: disconnectedAt - player.connectedAt, + messageCount: 
player.messageCount, + messagesByType: obs.messagesByType, + peakConcurrentPlayers: obs.peakConcurrentPlayers, + playersSeenCount: obs.playersSeenIds.size, + playCount: obs.playCount, + totalPlayTime_ms: obs.totalPlayTime_ms, + syncRequestCount: obs.syncRequestCount, + syncErrorCount: obs.syncErrorCount, + outcome: disconnectReason === 'error' ? 'error' : 'ok', + disconnectReason, + warnings: obs.warnings.length > 0 ? obs.warnings : undefined, + deploy: getDeployInfo(this.env), + infra: obs.infra, + service: getServiceInfo(this.env), + }; + emitWsSessionEvent(event); + + // Clean up observability data + this.playerObservability.delete(ws); + } + this.players.delete(ws); console.log(`[WS] disconnect session=${this.sessionId} player=${player.id} reason=${reason} code=${code}`); @@ -1930,6 +2074,13 @@ export class LiveSessionDurableObject extends DurableObject { // Phase 22: Track per-player playback state this.playingPlayers.add(player.id); + // Observability 2.0: Track play count and start time + const obs = this.playerObservability.get(ws); + if (obs) { + obs.playCount++; + obs.lastPlayStartTime = Date.now(); + } + this.broadcast({ type: 'playback_started', playerId: player.id, @@ -1942,6 +2093,13 @@ export class LiveSessionDurableObject extends DurableObject { // Phase 22: Track per-player playback state this.playingPlayers.delete(player.id); + // Observability 2.0: Track total play time + const obs = this.playerObservability.get(ws); + if (obs && obs.lastPlayStartTime !== null) { + obs.totalPlayTime_ms += Date.now() - obs.lastPlayStartTime; + obs.lastPlayStartTime = null; + } + this.broadcast({ type: 'playback_stopped', playerId: player.id, @@ -1988,6 +2146,15 @@ export class LiveSessionDurableObject extends DurableObject { * Handle request_snapshot - client requests full state (e.g., after mismatch) */ private handleRequestSnapshot(ws: WebSocket, player: PlayerInfo): void { + // Observability 2.0: Track sync request + const obs = 
this.playerObservability.get(ws); + if (obs) { + obs.syncRequestCount++; + // Track error if state is not loaded (shouldn't happen but possible) + if (!this.state) { + obs.syncErrorCount++; + } + } this.sendSnapshotToClient(ws, player, 'recovery'); } diff --git a/app/src/worker/logging.test.ts b/app/src/worker/logging.test.ts index 1d96ed1..c8fd352 100644 --- a/app/src/worker/logging.test.ts +++ b/app/src/worker/logging.test.ts @@ -1,100 +1,14 @@ /** - * Phase 7: Unit tests for WebSocket logging and state hashing + * Unit tests for state hashing utilities + * + * Note: WebSocket logging tests have been removed as that functionality + * has been replaced by Observability 2.0 wide events (see observability.ts). */ import { describe, it, expect } from 'vitest'; -import { - createWsConnectLog, - createWsMessageLog, - createWsDisconnectLog, - hashState, - generatePlayerId, - type WebSocketLog, -} from './logging'; +import { hashState } from './logging'; -describe('WebSocket Logging (Phase 7)', () => { - describe('generatePlayerId', () => { - it('should generate an 8-character ID', () => { - const id = generatePlayerId(); - expect(id).toHaveLength(8); - }); - - it('should generate unique IDs', () => { - const ids = new Set(); - for (let i = 0; i < 100; i++) { - ids.add(generatePlayerId()); - } - expect(ids.size).toBe(100); - }); - }); - - describe('createWsConnectLog', () => { - it('should create a connect log with correct type', () => { - const log = createWsConnectLog('session-123', 'player-456'); - - expect(log.type).toBe('ws_connect'); - expect(log.sessionId).toBe('session-123'); - expect(log.playerId).toBe('player-456'); - expect(log.timestamp).toBeDefined(); - }); - - it('should not include disconnect-specific fields', () => { - const log = createWsConnectLog('session-123', 'player-456'); - - expect(log.reason).toBeUndefined(); - expect(log.duration).toBeUndefined(); - }); - }); - - describe('createWsMessageLog', () => { - it('should create a message log with 
correct type', () => { - const log = createWsMessageLog('session-123', 'player-456', 'toggle_step'); - - expect(log.type).toBe('ws_message'); - expect(log.sessionId).toBe('session-123'); - expect(log.playerId).toBe('player-456'); - expect(log.messageType).toBe('toggle_step'); - expect(log.timestamp).toBeDefined(); - }); - - it('should include optional payload', () => { - const payload = { trackId: 0, step: 4 }; - const log = createWsMessageLog('session-123', 'player-456', 'toggle_step', payload); - - expect(log.payload).toEqual(payload); - }); - - it('should work without payload', () => { - const log = createWsMessageLog('session-123', 'player-456', 'ping'); - - expect(log.payload).toBeUndefined(); - }); - }); - - describe('createWsDisconnectLog', () => { - it('should create a disconnect log with correct type', () => { - const log = createWsDisconnectLog('session-123', 'player-456', 'closed', 342); - - expect(log.type).toBe('ws_disconnect'); - expect(log.sessionId).toBe('session-123'); - expect(log.playerId).toBe('player-456'); - expect(log.reason).toBe('closed'); - expect(log.duration).toBe(342); - expect(log.timestamp).toBeDefined(); - }); - - it('should handle different disconnect reasons', () => { - const reasons = ['closed', 'timeout', 'error', 'kicked']; - - for (const reason of reasons) { - const log = createWsDisconnectLog('session', 'player', reason, 100); - expect(log.reason).toBe(reason); - } - }); - }); -}); - -describe('State Hashing (Phase 7)', () => { +describe('State Hashing', () => { describe('hashState', () => { it('should return an 8-character hex string', () => { const hash = hashState({ foo: 'bar' }); @@ -172,35 +86,3 @@ describe('State Hashing (Phase 7)', () => { }); }); }); - -describe('WebSocketLog type (Phase 7)', () => { - it('should have correct type discriminator', () => { - const connectLog: WebSocketLog = { - type: 'ws_connect', - timestamp: new Date().toISOString(), - sessionId: 'session', - playerId: 'player', - }; - - const 
messageLog: WebSocketLog = { - type: 'ws_message', - timestamp: new Date().toISOString(), - sessionId: 'session', - playerId: 'player', - messageType: 'toggle_step', - }; - - const disconnectLog: WebSocketLog = { - type: 'ws_disconnect', - timestamp: new Date().toISOString(), - sessionId: 'session', - playerId: 'player', - reason: 'closed', - duration: 100, - }; - - expect(connectLog.type).toBe('ws_connect'); - expect(messageLog.type).toBe('ws_message'); - expect(disconnectLog.type).toBe('ws_disconnect'); - }); -}); diff --git a/app/src/worker/logging.ts b/app/src/worker/logging.ts index 05ff748..d36a20e 100644 --- a/app/src/worker/logging.ts +++ b/app/src/worker/logging.ts @@ -1,533 +1,18 @@ /** - * Structured request logging for observability + * State hashing utilities for consistency verification * - * Supports both HTTP request logging and WebSocket lifecycle logging - * for multiplayer session debugging. - */ - -import type { Env } from './types'; - -// ============================================================================= -// HTTP Request Logging -// ============================================================================= - -export interface RequestLog { - timestamp: string; - requestId: string; - method: string; - path: string; - sessionId?: string; - - // Request details - requestBody?: { - trackCount?: number; - tempo?: number; - swing?: number; - }; - - // Response details - status: number; - responseTime: number; - - // Session state (for debugging) - sessionState?: { - trackCount: number; - hasData: boolean; - }; - - error?: string; -} - -// ============================================================================= -// WebSocket Lifecycle Logging (Phase 7: Multiplayer Observability) -// ============================================================================= - -export type WebSocketLogType = 'ws_connect' | 'ws_message' | 'ws_disconnect'; - -export interface WebSocketLog { - type: WebSocketLogType; - timestamp: string; - sessionId: 
string; - playerId: string; - - // For messages - messageType?: string; - payload?: unknown; - - // For disconnect - reason?: string; - duration?: number; // Connection duration in seconds -} - -/** - * Player connection info for debug endpoints - */ -export interface PlayerConnectionInfo { - id: string; - connectedAt: string; - lastMessage: string; - messageCount: number; -} - -/** - * Connection summary for debug endpoints - */ -export interface ConnectionsDebugInfo { - activeConnections: number; - players: PlayerConnectionInfo[]; - messageRate: string; // e.g., "12/sec" -} - -/** - * Clock sync info for debug endpoints - */ -export interface ClockDebugInfo { - serverTime: number; - connectedClients: Array<{ - id: string; - reportedOffset: number; - lastPing: number; - }>; -} - -/** - * State sync info for debug endpoints - */ -export interface StateSyncDebugInfo { - serverStateHash: string; - clientHashes: Array<{ - playerId: string; - hash: string; - match: boolean; - }>; -} - -/** - * Durable Object debug info - */ -export interface DurableObjectDebugInfo { - id: string; - connectedPlayers: number; - // Phase 22: Per-player playback tracking - playingPlayerIds: string[]; - playingCount: number; - currentStep: number; - messageQueueSize: number; - lastActivity: string; -} - -// Store logs for up to 1 hour -const LOG_TTL_SECONDS = 3600; -const MAX_LOGS_PER_SESSION = 100; - -/** - * Generate a unique request ID - */ -export function generateRequestId(): string { - return crypto.randomUUID().slice(0, 8); -} - -/** - * Create a structured log entry - */ -export function createRequestLog( - requestId: string, - method: string, - path: string, - sessionId?: string -): RequestLog { - return { - timestamp: new Date().toISOString(), - requestId, - method, - path, - sessionId, - status: 0, - responseTime: 0, - }; -} - -/** - * Store a log entry in KV - */ -export async function storeLog(env: Env, log: RequestLog): Promise { - // Store by request ID for individual lookup 
- const logKey = `log:${log.requestId}`; - await env.SESSIONS.put(logKey, JSON.stringify(log), { - expirationTtl: LOG_TTL_SECONDS, - }); - - // Also index by session ID for session-specific queries - if (log.sessionId) { - const sessionLogKey = `session-logs:${log.sessionId}`; - const existingLogs = await env.SESSIONS.get(sessionLogKey, 'json') as string[] | null; - const logIds = existingLogs ?? []; - - // Keep only the most recent logs - logIds.push(log.requestId); - if (logIds.length > MAX_LOGS_PER_SESSION) { - logIds.shift(); - } - - await env.SESSIONS.put(sessionLogKey, JSON.stringify(logIds), { - expirationTtl: LOG_TTL_SECONDS, - }); - } - - // Also add to global recent logs list - const recentKey = 'logs:recent'; - const recentLogs = await env.SESSIONS.get(recentKey, 'json') as string[] | null; - const recentIds = recentLogs ?? []; - - recentIds.push(log.requestId); - if (recentIds.length > 200) { - recentIds.shift(); - } - - await env.SESSIONS.put(recentKey, JSON.stringify(recentIds), { - expirationTtl: LOG_TTL_SECONDS, - }); - - // Console log for Wrangler tail - console.log(`[${log.method}] ${log.path} -> ${log.status} (${log.responseTime}ms)`, - log.sessionId ? `session=${log.sessionId}` : '', - log.error ? 
`error=${log.error}` : '' - ); -} - -/** - * Get logs for a specific session - */ -export async function getSessionLogs( - env: Env, - sessionId: string, - limit: number = 50 -): Promise { - const sessionLogKey = `session-logs:${sessionId}`; - const logIds = await env.SESSIONS.get(sessionLogKey, 'json') as string[] | null; - - if (!logIds || logIds.length === 0) { - return []; - } - - // Get the most recent logs up to the limit - const idsToFetch = logIds.slice(-limit); - const logs: RequestLog[] = []; - - for (const id of idsToFetch) { - const log = await env.SESSIONS.get(`log:${id}`, 'json') as RequestLog | null; - if (log) { - logs.push(log); - } - } - - return logs; -} - -/** - * Get recent logs across all sessions - */ -export async function getRecentLogs( - env: Env, - limit: number = 50 -): Promise { - const recentKey = 'logs:recent'; - const logIds = await env.SESSIONS.get(recentKey, 'json') as string[] | null; - - if (!logIds || logIds.length === 0) { - return []; - } - - // Get the most recent logs up to the limit - const idsToFetch = logIds.slice(-limit); - const logs: RequestLog[] = []; - - for (const id of idsToFetch) { - const log = await env.SESSIONS.get(`log:${id}`, 'json') as RequestLog | null; - if (log) { - logs.push(log); - } - } - - return logs; -} - -/** - * Metrics tracking - */ -export interface Metrics { - sessions: { - total: number; - createdToday: number; - accessedToday: number; - }; - requests: { - last5Minutes: { - creates: number; - reads: number; - updates: number; - remixes: number; - }; - }; -} - -/** - * Increment a metric counter - */ -export async function incrementMetric( - env: Env, - metric: 'creates' | 'reads' | 'updates' | 'remixes' -): Promise { - const now = Date.now(); - const windowStart = now - 5 * 60 * 1000; // 5 minutes ago - - // Get current window metrics - const key = 'metrics:requests'; - const existing = await env.SESSIONS.get(key, 'json') as { - windowStart: number; - counts: Record - } | null; - - let counts: 
Record; - - if (existing && existing.windowStart > windowStart) { - // Same window, increment - counts = existing.counts; - } else { - // New window, reset - counts = { creates: 0, reads: 0, updates: 0, remixes: 0 }; - } - - counts[metric] = (counts[metric] ?? 0) + 1; - - await env.SESSIONS.put(key, JSON.stringify({ windowStart: now, counts }), { - expirationTtl: 600, // 10 minutes - }); -} - -/** - * Get current metrics - */ -export async function getMetrics(env: Env): Promise { - const now = Date.now(); - const windowStart = now - 5 * 60 * 1000; - - const _todayStart = new Date().setHours(0, 0, 0, 0); - - // Get request counts - const requestMetrics = await env.SESSIONS.get('metrics:requests', 'json') as { - windowStart: number; - counts: Record - } | null; - - let requestCounts = { creates: 0, reads: 0, updates: 0, remixes: 0 }; - if (requestMetrics && requestMetrics.windowStart > windowStart) { - requestCounts = requestMetrics.counts as typeof requestCounts; - } - - // Get session counts from daily metrics - const dailyKey = `metrics:daily:${new Date().toISOString().slice(0, 10)}`; - const dailyMetrics = await env.SESSIONS.get(dailyKey, 'json') as { - created: number; - accessed: number; - } | null; - - // Total sessions requires listing KV (expensive, so we estimate) - const totalEstimate = await env.SESSIONS.get('metrics:total-sessions', 'json') as number | null; - - return { - sessions: { - total: totalEstimate ?? 0, - createdToday: dailyMetrics?.created ?? 0, - accessedToday: dailyMetrics?.accessed ?? 0, - }, - requests: { - last5Minutes: requestCounts, - }, - }; -} - -/** - * Track a session creation for daily metrics - */ -export async function trackSessionCreated(env: Env): Promise { - const dailyKey = `metrics:daily:${new Date().toISOString().slice(0, 10)}`; - const existing = await env.SESSIONS.get(dailyKey, 'json') as { - created: number; - accessed: number; - } | null; - - const metrics = existing ?? 
{ created: 0, accessed: 0 }; - metrics.created++; - - await env.SESSIONS.put(dailyKey, JSON.stringify(metrics), { - expirationTtl: 86400 * 7, // Keep for 7 days - }); - - // Update total count estimate - const total = await env.SESSIONS.get('metrics:total-sessions', 'json') as number | null; - await env.SESSIONS.put('metrics:total-sessions', JSON.stringify((total ?? 0) + 1)); - - await incrementMetric(env, 'creates'); -} - -/** - * Track a session access for daily metrics - */ -export async function trackSessionAccessed(env: Env): Promise { - const dailyKey = `metrics:daily:${new Date().toISOString().slice(0, 10)}`; - const existing = await env.SESSIONS.get(dailyKey, 'json') as { - created: number; - accessed: number; - } | null; - - const metrics = existing ?? { created: 0, accessed: 0 }; - metrics.accessed++; - - await env.SESSIONS.put(dailyKey, JSON.stringify(metrics), { - expirationTtl: 86400 * 7, - }); - - await incrementMetric(env, 'reads'); -} - -// ============================================================================= -// WebSocket Lifecycle Logging (Phase 7: Multiplayer Observability) -// ============================================================================= - -const WS_LOG_TTL_SECONDS = 3600; // 1 hour -const MAX_WS_LOGS_PER_SESSION = 500; // More events expected for WebSockets - -/** - * Generate a unique player ID - */ -export function generatePlayerId(): string { - return crypto.randomUUID().slice(0, 8); -} - -/** - * Create a WebSocket connect log - */ -export function createWsConnectLog(sessionId: string, playerId: string): WebSocketLog { - return { - type: 'ws_connect', - timestamp: new Date().toISOString(), - sessionId, - playerId, - }; -} - -/** - * Create a WebSocket message log - */ -export function createWsMessageLog( - sessionId: string, - playerId: string, - messageType: string, - payload?: unknown -): WebSocketLog { - return { - type: 'ws_message', - timestamp: new Date().toISOString(), - sessionId, - playerId, - messageType, - 
payload, - }; -} - -/** - * Create a WebSocket disconnect log - */ -export function createWsDisconnectLog( - sessionId: string, - playerId: string, - reason: string, - durationSeconds: number -): WebSocketLog { - return { - type: 'ws_disconnect', - timestamp: new Date().toISOString(), - sessionId, - playerId, - reason, - duration: durationSeconds, - }; -} - -/** - * Store a WebSocket log entry - */ -export async function storeWsLog(env: Env, log: WebSocketLog): Promise { - const logId = `${log.sessionId}-${log.playerId}-${Date.now()}`; - const logKey = `ws-log:${logId}`; - - // Store individual log - await env.SESSIONS.put(logKey, JSON.stringify(log), { - expirationTtl: WS_LOG_TTL_SECONDS, - }); - - // Index by session - const sessionWsLogKey = `ws-session-logs:${log.sessionId}`; - const existingIds = await env.SESSIONS.get(sessionWsLogKey, 'json') as string[] | null; - const logIds = existingIds ?? []; - - logIds.push(logId); - if (logIds.length > MAX_WS_LOGS_PER_SESSION) { - logIds.shift(); - } - - await env.SESSIONS.put(sessionWsLogKey, JSON.stringify(logIds), { - expirationTtl: WS_LOG_TTL_SECONDS, - }); - - // Console log for Wrangler tail (Phase 7 format) - const typeLabel = log.type.replace('ws_', '').toUpperCase(); - const baseLog = `[WS] ${typeLabel.toLowerCase()} session=${log.sessionId} player=${log.playerId}`; - - if (log.type === 'ws_message') { - console.log(`${baseLog} type=${log.messageType}`); - } else if (log.type === 'ws_disconnect') { - console.log(`${baseLog} reason=${log.reason} duration=${log.duration}s`); - } else { - console.log(baseLog); - } -} - -/** - * Get WebSocket logs for a session + * Used by debug endpoints and the sync system to verify client/server state matches. + * + * Note: HTTP and WebSocket logging has been replaced by Observability 2.0 wide events. + * See observability.ts for the new event-based logging system. 
*/ -export async function getSessionWsLogs( - env: Env, - sessionId: string, - limit: number = 100 -): Promise { - const sessionWsLogKey = `ws-session-logs:${sessionId}`; - const logIds = await env.SESSIONS.get(sessionWsLogKey, 'json') as string[] | null; - - if (!logIds || logIds.length === 0) { - return []; - } - - const idsToFetch = logIds.slice(-limit); - const logs: WebSocketLog[] = []; - - for (const id of idsToFetch) { - const log = await env.SESSIONS.get(`ws-log:${id}`, 'json') as WebSocketLog | null; - if (log) { - logs.push(log); - } - } - return logs; -} +import { DEFAULT_STEP_COUNT } from '../shared/constants'; // ============================================================================= // State Hashing for Consistency Verification (Phase 7) // ============================================================================= -import { DEFAULT_STEP_COUNT } from '../shared/constants'; - // Minimal track type for hash input interface TrackForHash { id: string; @@ -666,102 +151,3 @@ export async function hashStateAsync(state: unknown): Promise { const hashArray = Array.from(new Uint8Array(hashBuffer)); return hashArray.slice(0, 8).map(b => b.toString(16).padStart(2, '0')).join(''); } - -// ============================================================================= -// WebSocket Metrics (Phase 7) -// ============================================================================= - -export interface WebSocketMetrics { - connections: { - active: number; - total: number; - }; - messages: { - last5Minutes: number; - byType: Record; - }; -} - -/** - * Increment WebSocket connection counter - */ -export async function trackWsConnect(env: Env, sessionId: string): Promise { - const key = `ws-metrics:${sessionId}`; - const existing = await env.SESSIONS.get(key, 'json') as { - active: number; - total: number; - messages: Record; - } | null; - - const metrics = existing ?? 
{ active: 0, total: 0, messages: {} }; - metrics.active++; - metrics.total++; - - await env.SESSIONS.put(key, JSON.stringify(metrics), { - expirationTtl: 86400, // 24 hours - }); -} - -/** - * Decrement WebSocket connection counter - */ -export async function trackWsDisconnect(env: Env, sessionId: string): Promise { - const key = `ws-metrics:${sessionId}`; - const existing = await env.SESSIONS.get(key, 'json') as { - active: number; - total: number; - messages: Record; - } | null; - - if (existing) { - existing.active = Math.max(0, existing.active - 1); - await env.SESSIONS.put(key, JSON.stringify(existing), { - expirationTtl: 86400, - }); - } -} - -/** - * Track WebSocket message by type - */ -export async function trackWsMessage(env: Env, sessionId: string, messageType: string): Promise { - const key = `ws-metrics:${sessionId}`; - const existing = await env.SESSIONS.get(key, 'json') as { - active: number; - total: number; - messages: Record; - } | null; - - const metrics = existing ?? { active: 0, total: 0, messages: {} }; - metrics.messages[messageType] = (metrics.messages[messageType] ?? 
0) + 1; - - await env.SESSIONS.put(key, JSON.stringify(metrics), { - expirationTtl: 86400, - }); -} - -/** - * Get WebSocket metrics for a session - */ -export async function getWsMetrics(env: Env, sessionId: string): Promise { - const key = `ws-metrics:${sessionId}`; - const existing = await env.SESSIONS.get(key, 'json') as { - active: number; - total: number; - messages: Record; - } | null; - - if (!existing) { - return { - connections: { active: 0, total: 0 }, - messages: { last5Minutes: 0, byType: {} }, - }; - } - - const totalMessages = Object.values(existing.messages).reduce((a, b) => a + b, 0); - - return { - connections: { active: existing.active, total: existing.total }, - messages: { last5Minutes: totalMessages, byType: existing.messages }, - }; -} diff --git a/app/src/worker/observability.ts b/app/src/worker/observability.ts new file mode 100644 index 0000000..9dd6fef --- /dev/null +++ b/app/src/worker/observability.ts @@ -0,0 +1,464 @@ +/** + * Observability 2.0: Wide Events Module + * + * Implements lifecycle-based wide events per the Observability 2.0 pattern. + * See specs/OBSERVABILITY-2-0-IMPLEMENTATION.md for full specification. + * + * Two event types: + * - http_request: One per HTTP request (errors embedded) + * - ws_session: One per WebSocket connection (errors embedded) + * + * Events are emitted via console.log(JSON.stringify(...)) which sends them + * to Cloudflare Workers Logs. 
+ */ + +import type { Env, VersionMetadata } from './types'; + +// ============================================================================= +// Common Types +// ============================================================================= + +/** + * Deployment information from CF_VERSION_METADATA binding + */ +export interface DeployInfo { + versionId: string; + versionTag?: string; + deployedAt: string; +} + +/** + * Infrastructure information from request.cf + */ +export interface InfraInfo { + colo: string; + country: string; +} + +/** + * Service identity + */ +export interface ServiceInfo { + name: string; + environment: string; +} + +/** + * Error context (embedded in wide events when outcome === "error") + */ +export interface ErrorInfo { + type: string; + message: string; + slug: string; + expected: boolean; + handler?: string; + stack?: string; +} + +/** + * Warning for recovered errors and near-misses + */ +export interface Warning { + type: string; + message: string; + occurredAt: string; + recoveryAction: 'retry_succeeded' | 'fallback_used' | 'auto_repaired' | 'degraded_response'; + attemptNumber?: number; + totalAttempts?: number; + latency_ms?: number; +} + +// ============================================================================= +// Creator Identity (for isCreator detection) +// ============================================================================= + +/** + * Creator identity for detecting if a WebSocket connection is the session creator. + * Uses IP + User-Agent hash to identify users across page refreshes. + * + * Per spec: More reliable than playerId because: + * 1. playerId is generated server-side on every WebSocket connection (ephemeral) + * 2. Page refresh = new playerId, but IP + User-Agent remains stable + * 3. 
Creator identity persists across page refreshes within same browser/network + */ +export interface CreatorIdentity { + ip: string; // CF-Connecting-IP header + userAgentHash: string; // SHA-256 hash of User-Agent (first 16 chars) +} + +/** + * Hash User-Agent to avoid storing raw strings. + * Uses SHA-256, returns first 16 hex chars (sufficient for identity). + */ +export async function hashUserAgent(userAgent: string): Promise { + const encoder = new TextEncoder(); + const data = encoder.encode(userAgent); + const hash = await crypto.subtle.digest('SHA-256', data); + return Array.from(new Uint8Array(hash)) + .map(b => b.toString(16).padStart(2, '0')) + .join('') + .slice(0, 16); +} + +/** + * Create a creator identity from a request. + */ +export async function createCreatorIdentity(request: Request): Promise { + const ip = request.headers.get('CF-Connecting-IP') || 'unknown'; + const userAgent = request.headers.get('User-Agent') || ''; + const userAgentHash = await hashUserAgent(userAgent); + return { ip, userAgentHash }; +} + +/** + * Compare two creator identities for equality. 
+ */ +export function identitiesMatch(a: CreatorIdentity, b: CreatorIdentity): boolean { + return a.ip === b.ip && a.userAgentHash === b.userAgentHash; +} + +// ============================================================================= +// Event Schemas +// ============================================================================= + +/** + * HTTP Request wide event - emitted once per HTTP request + */ +export interface HttpRequestEvent { + event: 'http_request'; + + // Request identity + requestId: string; + method: string; + path: string; + deviceType: 'mobile' | 'desktop'; + + // Timing + timestamp: string; + duration_ms: number; + + // Response + status: number; + responseSize?: number; + + // Context + sessionId?: string; + playerId?: string; + isPublished?: boolean; + sourceSessionId?: string; + + // Classification + routePattern: string; + action?: string; + + // Outcome (Boris Tane pattern) + outcome: 'ok' | 'error'; + + // Error context (only if outcome === "error") + error?: ErrorInfo; + + // Performance + kvReads?: number; + kvWrites?: number; + doRequests?: number; + + // Recovered errors + warnings?: Warning[]; + + // Deployment + deploy: DeployInfo; + + // Infrastructure + infra: InfraInfo; + + // Service identity + service: ServiceInfo; +} + +/** + * WebSocket Session wide event - emitted once per WebSocket connection at disconnect + */ +export interface WsSessionEvent { + event: 'ws_session'; + + // Connection identity + connectionId: string; + sessionId: string; + playerId: string; + isCreator: boolean; + isPublished: boolean; + + // Timing + connectedAt: string; + disconnectedAt: string; + duration_ms: number; + + // Message stats + messageCount: number; + messagesByType: Record; + + // Collaboration context + peakConcurrentPlayers: number; + playersSeenCount: number; + + // Playback + playCount: number; + totalPlayTime_ms: number; + + // Sync health + syncRequestCount: number; + syncErrorCount: number; + + // Outcome (Boris Tane pattern) + 
outcome: 'ok' | 'error'; + disconnectReason: 'normal_close' | 'timeout' | 'replaced' | 'error'; + + // Error context (only if outcome === "error") + error?: ErrorInfo; + + // Recovered errors + warnings?: Warning[]; + + // Deployment + deploy: DeployInfo; + + // Infrastructure + infra: InfraInfo; + + // Service identity + service: ServiceInfo; +} + +// ============================================================================= +// Helper Functions +// ============================================================================= + +/** + * Extract deployment info from environment + */ +export function getDeployInfo(env: Env): DeployInfo { + const metadata: VersionMetadata | undefined = env.CF_VERSION_METADATA; + return { + versionId: metadata?.id ?? 'unknown', + versionTag: metadata?.tag, + deployedAt: metadata?.timestamp ?? new Date().toISOString(), + }; +} + +/** + * Extract infrastructure info from request.cf + * + * Note: request.cf is available in Cloudflare Workers and contains + * geographic/network information about the request. + */ +export function getInfraInfo(request: Request): InfraInfo { + // Type assertion needed because request.cf is not in the standard Request type + const cf = (request as Request & { cf?: { colo?: string; country?: string } }).cf; + return { + colo: cf?.colo ?? 'unknown', + country: cf?.country ?? 'unknown', + }; +} + +/** + * Extract service info from environment + */ +export function getServiceInfo(env: Env): ServiceInfo { + return { + name: env.SERVICE_NAME ?? 'keyboardia', + environment: env.ENVIRONMENT ?? 'production', + }; +} + +/** + * Determine device type from User-Agent + */ +export function getDeviceType(userAgent: string | null): 'mobile' | 'desktop' { + if (!userAgent) return 'desktop'; + + const mobilePatterns = [ + /android/i, + /webos/i, + /iphone/i, + /ipad/i, + /ipod/i, + /blackberry/i, + /windows phone/i, + /mobile/i, + ]; + + return mobilePatterns.some(pattern => pattern.test(userAgent)) ? 
'mobile' : 'desktop'; +} + +// ============================================================================= +// Warning Collection +// ============================================================================= + +const MAX_WARNINGS = 10; + +/** + * Collects warnings during request/connection lifecycle + * Limited to MAX_WARNINGS to prevent unbounded growth + */ +export class WarningCollector { + private warnings: Warning[] = []; + + add(warning: Omit): void { + if (this.warnings.length < MAX_WARNINGS) { + this.warnings.push({ + ...warning, + occurredAt: new Date().toISOString(), + }); + } + } + + get(): Warning[] { + return this.warnings; + } + + hasWarnings(): boolean { + return this.warnings.length > 0; + } +} + +// ============================================================================= +// Error Classification +// ============================================================================= + +/** + * Error slug mappings for common error types + */ +const ERROR_SLUGS: Record = { + 400: { slug: 'bad-request', expected: true }, + 401: { slug: 'unauthorized', expected: true }, + 403: { slug: 'forbidden', expected: true }, + 404: { slug: 'not-found', expected: true }, + 405: { slug: 'method-not-allowed', expected: true }, + 409: { slug: 'conflict', expected: true }, + 413: { slug: 'payload-too-large', expected: true }, + 429: { slug: 'rate-limited', expected: true }, + 500: { slug: 'internal-error', expected: false }, + 502: { slug: 'bad-gateway', expected: false }, + 503: { slug: 'service-unavailable', expected: false }, + 504: { slug: 'gateway-timeout', expected: false }, +}; + +/** + * Classify an error for the ErrorInfo structure + */ +export function classifyError( + status: number, + error: Error | string | null, + handler?: string, +): ErrorInfo { + const errorInfo = ERROR_SLUGS[status] ?? { slug: 'unknown-error', expected: false }; + const errorObj = typeof error === 'string' ? 
new Error(error) : error; + + let stack: string | undefined; + if (errorObj?.stack && !errorInfo.expected) { + // Truncate stack to 500 chars for unexpected errors only + stack = errorObj.stack.slice(0, 500); + } + + return { + type: errorObj?.name ?? 'Error', + message: errorObj?.message ?? 'Unknown error', + slug: errorInfo.slug, + expected: errorInfo.expected, + handler, + stack, + }; +} + +/** + * Create a custom error classification with specific slug + */ +export function classifyCustomError( + type: string, + message: string, + slug: string, + expected: boolean, + handler?: string, +): ErrorInfo { + return { + type, + message, + slug, + expected, + handler, + }; +} + +// ============================================================================= +// Event Emission +// ============================================================================= + +/** + * Emit an HTTP request wide event to Workers Logs + */ +export function emitHttpRequestEvent(event: HttpRequestEvent): void { + // Remove undefined fields to keep events clean + const cleanEvent = JSON.parse(JSON.stringify(event)); + console.log(JSON.stringify(cleanEvent)); +} + +/** + * Emit a WebSocket session wide event to Workers Logs + */ +export function emitWsSessionEvent(event: WsSessionEvent): void { + // Remove undefined fields to keep events clean + const cleanEvent = JSON.parse(JSON.stringify(event)); + console.log(JSON.stringify(cleanEvent)); +} + +// ============================================================================= +// WebSocket Close Code Mapping +// ============================================================================= + +/** + * Map WebSocket close code to disconnect reason + */ +export function mapCloseCode(code: number): 'normal_close' | 'timeout' | 'replaced' | 'error' { + // 1000 = normal close + if (code === 1000) return 'normal_close'; + + // 1001 = going away (navigation, tab close) + if (code === 1001) return 'normal_close'; + + // 4000+ = custom application codes 
+ if (code >= 4000 && code < 4100) return 'replaced'; + + // 1006 = abnormal close (no close frame received) - likely timeout + if (code === 1006) return 'timeout'; + + // 1011 = unexpected condition + // 1012 = service restart + // 1013 = try again later + // 1014 = bad gateway + // 1015 = TLS handshake failure + return 'error'; +} + +// ============================================================================= +// Performance Metrics Tracking +// ============================================================================= + +/** + * Tracks KV and DO operations for a single request + */ +export interface RequestMetrics { + kvReads: number; + kvWrites: number; + doRequests: number; +} + +/** + * Create a new request metrics tracker + */ +export function createRequestMetrics(): RequestMetrics { + return { + kvReads: 0, + kvWrites: 0, + doRequests: 0, + }; +} diff --git a/app/src/worker/route-patterns.ts b/app/src/worker/route-patterns.ts new file mode 100644 index 0000000..a90f226 --- /dev/null +++ b/app/src/worker/route-patterns.ts @@ -0,0 +1,135 @@ +/** + * Route Pattern Matching for Observability + * + * Maps API paths to route patterns and actions for the http_request event. + * Patterns use :param notation for dynamic segments. + */ + +export interface RouteMatch { + pattern: string; + action?: string; +} + +/** + * Route definitions with their patterns and actions + */ +interface RouteDefinition { + pathRegex: RegExp; + pattern: string; + getAction: (method: string) => string | undefined; +} + +const ROUTE_DEFINITIONS: RouteDefinition[] = [ + // Session create + { + pathRegex: /^\/api\/sessions\/?$/, + pattern: '/api/sessions', + getAction: (method) => method === 'POST' ? 
'create' : undefined, + }, + + // Session CRUD + { + pathRegex: /^\/api\/sessions\/([^/]+)\/?$/, + pattern: '/api/sessions/:id', + getAction: (method) => { + switch (method) { + case 'GET': return 'access'; + case 'PUT': + case 'PATCH': return 'update'; + case 'DELETE': return 'delete'; + default: return undefined; + } + }, + }, + + // Session remix + { + pathRegex: /^\/api\/sessions\/([^/]+)\/remix\/?$/, + pattern: '/api/sessions/:id/remix', + getAction: () => 'remix', + }, + + // Session publish + { + pathRegex: /^\/api\/sessions\/([^/]+)\/publish\/?$/, + pattern: '/api/sessions/:id/publish', + getAction: () => 'publish', + }, + + // Session WebSocket + { + pathRegex: /^\/api\/sessions\/([^/]+)\/ws\/?$/, + pattern: '/api/sessions/:id/ws', + getAction: () => 'websocket', + }, + + // Debug endpoints + { + pathRegex: /^\/api\/debug\/logs\/?$/, + pattern: '/api/debug/logs', + getAction: () => 'debug_logs', + }, + { + pathRegex: /^\/api\/debug\/logs\/session\/([^/]+)\/?$/, + pattern: '/api/debug/logs/session/:id', + getAction: () => 'debug_session_logs', + }, + { + pathRegex: /^\/api\/debug\/metrics\/?$/, + pattern: '/api/debug/metrics', + getAction: () => 'debug_metrics', + }, + { + pathRegex: /^\/api\/debug\/session\/([^/]+)\/?$/, + pattern: '/api/debug/session/:id', + getAction: () => 'debug_session', + }, + { + pathRegex: /^\/api\/debug\/session\/([^/]+)\/live\/?$/, + pattern: '/api/debug/session/:id/live', + getAction: () => 'debug_live', + }, + + // Health check + { + pathRegex: /^\/api\/health\/?$/, + pattern: '/api/health', + getAction: () => 'health', + }, + + // Samples endpoint + { + pathRegex: /^\/api\/samples\/([^/]+)\/?$/, + pattern: '/api/samples/:id', + getAction: (method) => method === 'GET' ? 
'sample_access' : undefined, + }, +]; + +/** + * Match a path and method to a route pattern and action + */ +export function matchRoute(path: string, method: string): RouteMatch { + for (const route of ROUTE_DEFINITIONS) { + if (route.pathRegex.test(path)) { + return { + pattern: route.pattern, + action: route.getAction(method), + }; + } + } + + // Unknown route + return { + pattern: path, // Use actual path as pattern for unknown routes + action: undefined, + }; +} + +/** + * Extract session ID from a path if present + */ +export function extractSessionId(path: string): string | undefined { + // Match /api/sessions/:id and /api/sessions/:id/* + const match = path.match(/^\/api\/sessions\/([^/]+)/); + return match ? match[1] : undefined; +} diff --git a/app/src/worker/types.ts b/app/src/worker/types.ts index b7b599e..ab799ae 100644 --- a/app/src/worker/types.ts +++ b/app/src/worker/types.ts @@ -36,11 +36,32 @@ import type { Session } from '../shared/state'; // Re-export shared player types (canonical definitions) export type { PlayerInfo, CursorPosition } from '../shared/player'; +/** + * Cloudflare Version Metadata binding + * @see https://developers.cloudflare.com/workers/configuration/versions-and-deployments/ + */ +export interface VersionMetadata { + /** Unique deployment/version ID */ + id: string; + /** Optional tag set via `wrangler deploy --tag` */ + tag?: string; + /** ISO 8601 timestamp when this version was deployed */ + timestamp: string; +} + export interface Env { + // Bindings SESSIONS: KVNamespace; ASSETS: Fetcher; LIVE_SESSIONS: DurableObjectNamespace; SAMPLES: R2Bucket; + + // Observability 2.0: Version metadata for deployment tracking + CF_VERSION_METADATA: VersionMetadata; + + // Environment variables + ENVIRONMENT?: string; // "production" | "staging" + SERVICE_NAME?: string; // "keyboardia" | "keyboardia-staging" } // Import and re-export shared message constants (canonical definitions) diff --git a/app/wrangler.jsonc b/app/wrangler.jsonc 
index 4841b50..c59cd5e 100644 --- a/app/wrangler.jsonc +++ b/app/wrangler.jsonc @@ -34,6 +34,27 @@ } ], + // Observability 2.0: Wide events to Workers Logs + "observability": { + "enabled": true, + "logs": { + "enabled": true, + "invocation_logs": true, + "head_sampling_rate": 1 + } + }, + + // Version metadata for deployment tracking + "version_metadata": { + "binding": "CF_VERSION_METADATA" + }, + + // Environment variables + "vars": { + "ENVIRONMENT": "production", + "SERVICE_NAME": "keyboardia" + }, + // Custom domain routing for production "routes": [ { "pattern": "keyboardia.dev", "custom_domain": true }, @@ -87,8 +108,20 @@ "routes": [ { "pattern": "staging.keyboardia.dev", "custom_domain": true } ], + "observability": { + "enabled": true, + "logs": { + "enabled": true, + "invocation_logs": true, + "head_sampling_rate": 1 + } + }, + "version_metadata": { + "binding": "CF_VERSION_METADATA" + }, "vars": { - "ENVIRONMENT": "staging" + "ENVIRONMENT": "staging", + "SERVICE_NAME": "keyboardia-staging" } } } diff --git a/specs/OBSERVABILITY-2-0-IMPLEMENTATION.md b/specs/OBSERVABILITY-2-0-IMPLEMENTATION.md new file mode 100644 index 0000000..212b9dd --- /dev/null +++ b/specs/OBSERVABILITY-2-0-IMPLEMENTATION.md @@ -0,0 +1,948 @@ +# Observability 2.0 Implementation Spec + +> **Type:** Implementation Spec +> **Status:** Draft +> **Research:** [OBSERVABILITY-2-0.md](./research/OBSERVABILITY-2-0.md) +> **Supersedes:** [OBSERVABILITY.md](./OBSERVABILITY.md) (upon completion) + +--- + +## Overview + +This spec defines the wide events for Keyboardia's Observability 2.0 implementation. The goal is to replace per-action logging with lifecycle-based wide events emitted to Cloudflare Workers Logs. 
+ +--- + +## Configuration + +Enable Workers Logs and version metadata in `wrangler.jsonc`: + +```jsonc +{ + "observability": { + "enabled": true, + "logs": { + "enabled": true, + "invocation_logs": true, + "head_sampling_rate": 1 // 1 = 100%, 0.1 = 10% + } + }, + // Version metadata binding for deploy tracking + "version_metadata": { + "binding": "CF_VERSION_METADATA" + } +} +``` + +**Accessing version metadata in code:** + +```typescript +interface Env { + CF_VERSION_METADATA: { + id: string; // versionId (e.g., "a5f9abc123") + tag?: string; // versionTag (e.g., "v0.2.1") - set via wrangler deploy --tag + timestamp: string; // deployedAt (ISO 8601) + }; +} + +// Usage in handler +const deploy = { + versionId: env.CF_VERSION_METADATA.id, + versionTag: env.CF_VERSION_METADATA.tag, + deployedAt: env.CF_VERSION_METADATA.timestamp, +}; +``` + +**Infrastructure from request.cf (free tier):** + +```typescript +const infra = { + colo: request.cf?.colo ?? 'unknown', // "SFO", "LHR" + country: request.cf?.country ?? 'unknown', // "US", "GB" +}; +``` + +--- + +## Wide Events + +### 1. `http_request` + +Emitted once per HTTP request, at response time. 
+ +**Trigger:** End of every HTTP request handler + +**Schema:** + +```typescript +interface HttpRequestEvent { + event: "http_request"; + + // Request identity + requestId: string; + method: string; + path: string; + deviceType: "mobile" | "desktop"; // Derived from User-Agent + + // Timing + timestamp: string; // ISO 8601 + duration_ms: number; + + // Response + status: number; + responseSize?: number; + + // Context + sessionId?: string; // If request relates to a session + playerId?: string; // From X-Player-ID header or cookie + isPublished?: boolean; // true if accessing a published (read-only) session + sourceSessionId?: string; // Only for remix: the session being remixed FROM + + // Classification + routePattern: string; // e.g., "/api/sessions/:id" + action?: string; // "create", "access", "publish", "remix" + + // Outcome (Boris Tane pattern) + outcome: "ok" | "error"; + + // Error context (only if outcome === "error") + error?: { + type: string; // e.g., "ValidationError", "KVError" + message: string; + slug: string; // Machine-readable (e.g., "kv-quota-exceeded") + expected: boolean; // true for 404, rate limit; false for unexpected + handler?: string; // e.g., "handleSessionAccess" + stack?: string; // Truncated to 500 chars + }; + + // Performance + kvReads?: number; + kvWrites?: number; + doRequests?: number; + + // Recovered errors (see Warning type below) + warnings?: Warning[]; + + // Deployment (from CF_VERSION_METADATA binding) + deploy: { + versionId: string; // Unique deployment ID + versionTag?: string; // Optional tag (e.g., "v0.2.1") + deployedAt: string; // ISO 8601 - when this version was deployed + }; + + // Infrastructure (from request.cf) + infra: { + colo: string; // 3-letter airport code (SFO, LHR) + country: string; // 2-letter country code (US, GB) + }; + + // Service identity + service: { + name: string; // "keyboardia" or "keyboardia-staging" + environment: string; // "production" | "staging" + }; +} +``` + +**Example (joiner 
accessing published session):** + +```json +{ + "event": "http_request", + "requestId": "req_abc123", + "method": "GET", + "path": "/api/sessions/sess_xyz789", + "deviceType": "mobile", + "timestamp": "2026-01-15T10:30:00.000Z", + "duration_ms": 45, + "status": 200, + "routePattern": "/api/sessions/:id", + "action": "access", + "outcome": "ok", + "sessionId": "sess_xyz789", + "playerId": "player_456", + "isPublished": true, + "kvReads": 1, + "kvWrites": 0, + "doRequests": 1, + "deploy": { + "versionId": "a5f9abc123", + "versionTag": "v0.2.1", + "deployedAt": "2026-01-14T18:00:00.000Z" + }, + "infra": { + "colo": "SFO", + "country": "US" + }, + "service": { + "name": "keyboardia", + "environment": "production" + } +} +``` + +**Example (remix action - virality tracking):** + +```json +{ + "event": "http_request", + "requestId": "req_def456", + "method": "POST", + "path": "/api/sessions/sess_xyz789/remix", + "deviceType": "desktop", + "timestamp": "2026-01-15T11:00:00.000Z", + "duration_ms": 120, + "status": 201, + "routePattern": "/api/sessions/:id/remix", + "action": "remix", + "sessionId": "sess_new123", + "sourceSessionId": "sess_xyz789", + "playerId": "player_789", + "isPublished": false, + "kvReads": 1, + "kvWrites": 1, + "doRequests": 2 +} +``` + +**Example (publish action):** + +```json +{ + "event": "http_request", + "requestId": "req_ghi789", + "method": "POST", + "path": "/api/sessions/sess_abc/publish", + "deviceType": "desktop", + "timestamp": "2026-01-15T12:00:00.000Z", + "duration_ms": 85, + "status": 201, + "routePattern": "/api/sessions/:id/publish", + "action": "publish", + "sessionId": "sess_published456", + "sourceSessionId": "sess_abc", + "playerId": "player_creator", + "isPublished": true, + "kvReads": 1, + "kvWrites": 1, + "doRequests": 1 +} +``` + +**Example (error - session not found):** + +```json +{ + "event": "http_request", + "requestId": "req_err456", + "method": "GET", + "path": "/api/sessions/sess_nonexistent", + "deviceType": "mobile", 
+ "timestamp": "2026-01-15T12:30:00.000Z", + "duration_ms": 12, + "status": 404, + "routePattern": "/api/sessions/:id", + "action": "access", + "outcome": "error", + "error": { + "type": "NotFoundError", + "message": "Session not found", + "slug": "session-not-found", + "expected": true, + "handler": "handleSessionAccess" + }, + "kvReads": 1, + "kvWrites": 0, + "doRequests": 0, + "deploy": { + "versionId": "a5f9abc123", + "deployedAt": "2026-01-14T18:00:00.000Z" + }, + "infra": { + "colo": "LHR", + "country": "GB" + }, + "service": { + "name": "keyboardia", + "environment": "production" + } +} +``` + +**Queryable questions:** +- "Which sessions generated the most remixes?" β†’ `COUNT(*) WHERE action = 'remix' GROUP BY sourceSessionId` +- "What's the remix rate for published vs editable?" β†’ `COUNT(*) WHERE action = 'remix' GROUP BY isPublished` +- "How many sessions were published today?" β†’ `COUNT(*) WHERE action = 'publish'` +- "Are mobile users more likely to consume or create?" β†’ `COUNT(*) GROUP BY deviceType, action` +- "Are people mostly consuming published content?" β†’ `COUNT(*) WHERE action = 'access' GROUP BY isPublished` +- "What are our error rates by endpoint?" β†’ `COUNT(*) WHERE outcome = 'error' GROUP BY routePattern` +- "Which unexpected errors are most common?" β†’ `COUNT(*) WHERE outcome = 'error' AND error.expected = false GROUP BY error.slug` + +--- + +### 2. `ws_session` + +Emitted once per WebSocket connection, at disconnect time. 
+ +**Trigger:** WebSocket `close` event + +**Schema:** + +```typescript +interface WsSessionEvent { + event: "ws_session"; + + // Connection identity + connectionId: string; + sessionId: string; + playerId: string; + isCreator: boolean; // true if this player created the session + isPublished: boolean; // true if viewing published (read-only) session + + // Timing + connectedAt: string; // ISO 8601 + disconnectedAt: string; // ISO 8601 + duration_ms: number; + + // Message stats + messageCount: number; + messagesByType: Record<string, number>; + + // Collaboration context + peakConcurrentPlayers: number; + playersSeenCount: number; + + // Playback + playCount: number; + totalPlayTime_ms: number; + + // Sync health + syncRequestCount: number; + syncErrorCount: number; + + // Outcome (Boris Tane pattern) + outcome: "ok" | "error"; + disconnectReason: "normal_close" | "timeout" | "replaced" | "error"; + + // Error context (only if outcome === "error") + error?: { + type: string; // e.g., "WebSocketError", "StateCorruption" + message: string; + slug: string; // Machine-readable (e.g., "ws-protocol-error") + expected: boolean; // true for timeout; false for unexpected crash + handler?: string; + stack?: string; + }; + + // Recovered errors (see Warning type below) + warnings?: Warning[]; + + // Deployment (from CF_VERSION_METADATA binding) + deploy: { + versionId: string; + versionTag?: string; + deployedAt: string; + }; + + // Infrastructure (from request.cf at connect time) + infra: { + colo: string; + country: string; + }; + + // Service identity + service: { + name: string; + environment: string; + }; +} +``` + +**Example (joiner viewing published session):** + +```json +{ + "event": "ws_session", + "connectionId": "conn_abc123", + "sessionId": "sess_xyz789", + "playerId": "player_456", + "isCreator": false, + "isPublished": true, + "connectedAt": "2026-01-15T10:00:00.000Z", + "disconnectedAt": "2026-01-15T10:15:00.000Z", + "duration_ms": 900000, + "messageCount": 23, 
"messagesByType": { + "play": 15, + "stop": 8 + }, + "peakConcurrentPlayers": 1, + "playersSeenCount": 1, + "playCount": 15, + "totalPlayTime_ms": 180000, + "syncRequestCount": 0, + "syncErrorCount": 0, + "outcome": "ok", + "disconnectReason": "normal_close", + "deploy": { + "versionId": "a5f9abc123", + "versionTag": "v0.2.1", + "deployedAt": "2026-01-14T18:00:00.000Z" + }, + "infra": { + "colo": "LHR", + "country": "GB" + }, + "service": { + "name": "keyboardia", + "environment": "production" + } +} +``` + +**Reliability:** Guaranteed for every connection. Cloudflare's Hibernation API uses automatic ping/pong (`setWebSocketAutoResponse`) to detect dead connections. + +| Close Type | Timing | `disconnectReason` | Cause | +|------------|--------|-------------------|-------| +| Clean | Immediate | `normal_close` | Browser sends close frame (navigation, tab close) | +| Dirty | 10-30s delay | `timeout` | No close frame (tab killed, mobile backgrounded, network drop) | + +**Note:** For dirty closes, `duration_ms` includes the ping timeout. Use `messagesByType` activity or `totalPlayTime_ms` for true engagement time. + +**Queryable questions:** +- "Do people spend more time on published vs editable?" β†’ `AVG(duration_ms) GROUP BY isPublished` +- "Which sessions get the most total attention?" β†’ `SUM(duration_ms) GROUP BY sessionId ORDER BY 1 DESC` +- "What's the creator-to-joiner ratio?" β†’ `COUNT(*) GROUP BY isCreator` +- "Which published sessions have the most unique viewers?" β†’ `COUNT(DISTINCT playerId) WHERE isPublished GROUP BY sessionId` + +**Expected behavioral patterns:** + +| Dimension | Published (read-only) | Editable | +|-----------|----------------------|----------| +| `messagesByType` | Mostly `play`/`stop` | Rich: `toggle_step`, `set_tempo`, etc. 
| +| `peakConcurrentPlayers` | Usually 1 (viewing alone) | Higher (collaboration) | +| `messageCount` | Low (passive consumption) | High (active editing) | +| View count | Higher (shareable) | Lower (private work) | +| Engagement depth | Shallow but broad | Deep but narrow | + +--- + +### Slug Convention + +For error slugs: lowercase-kebab-case, specific enough to identify the error class without being unique per instance. Good: `kv-quota-exceeded`. Bad: `error` or `kv-quota-exceeded-sess_abc123`. + +--- + +### Warning Type + +Warnings capture recovered errors and near-misses β€” operations that succeeded despite problems. + +```typescript +interface Warning { + type: string; // "KVReadRetry", "SlowDO", "StateRepair" + message: string; // Human-readable description + occurredAt: string; // ISO 8601 + + recoveryAction: + | "retry_succeeded" // Failed, retried, eventually worked + | "fallback_used" // Primary failed, fallback worked + | "auto_repaired" // State corruption fixed automatically + | "degraded_response"; // Partial success (e.g., served stale) + + attemptNumber?: number; // Which attempt succeeded + totalAttempts?: number; // Total attempts made + latency_ms?: number; // For slow operation warnings +} +``` + +**Warning types:** + +| Type | Action | Trigger | +|------|--------|---------| +| `KVReadRetry` | `retry_succeeded` | KV read failed then succeeded | +| `KVWriteRetry` | `retry_succeeded` | KV write failed then succeeded | +| `DORequestRetry` | `retry_succeeded` | DO request failed then succeeded | +| `StateRepair` | `auto_repaired` | Invariant violation fixed | +| `SlowKV` | `degraded_response` | KV latency > 500ms | +| `SlowDO` | `degraded_response` | DO latency > 200ms | + +**Limit:** Max 10 warnings per event to prevent unbounded growth. + +--- + +### Collecting Warnings + +Warnings are collected during execution and included in the final wide event. 
+ +**HTTP requests — explicit parameter:** + +```typescript +// Handler creates warnings array, passes to helpers +async function handleSessionAccess(request: Request, env: Env): Promise<Response> { + const warnings: Warning[] = []; + const startTime = Date.now(); + + const session = await kvGetWithRetry(env.SESSIONS, key, warnings); + + // At end, emit wide event + console.log(JSON.stringify({ + event: "http_request", + duration_ms: Date.now() - startTime, + warnings, + // ... + })); + + return new Response(JSON.stringify(session)); +} + +// Helper adds warnings when recovering +async function kvGetWithRetry( + kv: KVNamespace, + key: string, + warnings: Warning[] +): Promise<unknown> { + try { + return await kv.get(key, 'json'); + } catch (error) { + const result = await kv.get(key, 'json'); // Retry + warnings.push({ + type: "KVReadRetry", + message: `Retry succeeded for ${key}`, + occurredAt: new Date().toISOString(), + recoveryAction: "retry_succeeded", + attemptNumber: 2, + totalAttempts: 2 + }); + return result; + } +} +``` + +**WebSocket sessions — instance Map:** + +```typescript +class LiveSessionDurableObject { + private connectionWarnings = new Map<WebSocket, Warning[]>(); + + private addWarning(ws: WebSocket, warning: Omit<Warning, 'occurredAt'>) { + const warnings = this.connectionWarnings.get(ws) ?? []; + if (warnings.length < 10) { + warnings.push({ ...warning, occurredAt: new Date().toISOString() }); + this.connectionWarnings.set(ws, warnings); + } + } + + async webSocketClose(ws: WebSocket) { + console.log(JSON.stringify({ + event: "ws_session", + warnings: this.connectionWarnings.get(ws) ?? [], + // ... 
+ })); + this.connectionWarnings.delete(ws); + } +} +``` + +--- + +## Design Decisions + +### `http_request` β€” Included vs Excluded + +| Included | Why | Excluded | Why Not | +|----------|-----|----------|---------| +| `playerId` | Per-connection analytics, session linking | Request body | Too large, rarely needed | +| `sessionId` | Link requests to sessions | Response body | Too large | +| `sourceSessionId` | Remix virality tracking | IP address | Privacy (used server-side for isCreator, not logged) | +| `isPublished` | Published vs editable consumption | Full User-Agent | Noise, deviceType suffices | +| `deviceType` | Mobile vs desktop segmentation | Headers | Noise | +| `action` (create/access/publish/remix) | Business metrics, funnel analysis | Detailed geo | City/region overkill; colo+country sufficient | +| `duration_ms` | Performance debugging | | | +| `kvReads`, `kvWrites`, `doRequests` | Cost attribution | | | +| `deploy`, `infra`, `service` | Release correlation, regional patterns | | | + +**Note on `isCreator`:** Determined by comparing the connecting user's identity with the stored creator identity. Creator identity is captured at session creation time as: +- `CF-Connecting-IP` β€” Cloudflare-provided client IP address +- `User-Agent` hash β€” SHA-256 hash of browser User-Agent string + +This is more reliable than `playerId` because: +1. `playerId` is generated server-side on every WebSocket connection (ephemeral) +2. Page refresh = new `playerId`, but IP + User-Agent remains stable +3. Creator identity persists across page refreshes within the same browser/network + +**Storage:** Creator identity is stored in DO state when session is created, then compared on each WebSocket connection. 
+ +**Limitations:** +- Different network (VPN, mobile data switch) = different IP = not recognized as creator +- Different browser = different User-Agent hash = not recognized as creator +- Shared IP (NAT, office network) with same browser = false positive possible (rare) + +These limitations are acceptable because: +1. Creator/joiner segmentation is for analytics patterns, not access control +2. False negatives (creator appears as joiner) only slightly skew metrics +3. The majority case (same browser, same network) works correctly + +### `ws_session` β€” Included vs Excluded + +| Included | Why | Excluded | Why Not | +|----------|-----|----------|---------| +| `isCreator` | Segment creators vs joiners | Individual message payloads | Massive, in DO already | +| `isPublished` | Published vs editable engagement | Full player list | Privacy, rarely needed | +| `messagesByType` | Understand usage patterns | Per-message timestamps | Too granular | +| `peakConcurrentPlayers` | Multiplayer health | Pattern state snapshots | Huge, stored in DO | +| `playCount`, `totalPlayTime_ms` | Engagement metrics | Network latency samples | Complex to capture | +| `deploy`, `infra`, `service` | Release correlation, regional patterns | | | + +### Error Fields (embedded in `http_request` and `ws_session`) + +| Included | Why | Excluded | Why Not | +|----------|-----|----------|---------| +| `outcome` | Instant filtering (ok vs error) | | | +| `error.type`, `error.message` | Classification | Full stack trace | Truncate to 500 chars | +| `error.slug` | Machine-readable grouping, alerting | Error instance ID | Slug is for classes, not instances | +| `error.expected` | Filter noise in dashboards | Severity levels | Binary is simpler | +| `error.handler` | Locate code path | Environment variables | Security risk | + +--- + +## Typical Traces + +Most users **join** sessions rather than create them. 
The traces differ: + +### Creator Flow (minority of users) + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ http_request β”‚ POST /api/sessions +β”‚ action: "create" β”‚ +β”‚ sessionId: abc β”‚ +β”‚ (IP: 1.2.3.4) β”‚ ← Creator IP captured +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β–Ό + DO stores creatorIdentity = { ip: "1.2.3.4", userAgentHash: "a1b2c3" } + β”‚ + β–Ό + (WebSocket lifecycle - messages accumulated, not logged) + β”‚ + β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ ws_session β”‚ +β”‚ sessionId: abc β”‚ +β”‚ isCreator: true β”‚ ← IP matches stored creatorIdentity +β”‚ messageCount: 200 β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +### Joiner Flow (majority of users) + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ http_request β”‚ GET /api/sessions/abc +β”‚ action: "access" β”‚ ← Access, not create +β”‚ sessionId: abc β”‚ +β”‚ (IP: 5.6.7.8) β”‚ ← Different IP +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β–Ό + DO compares { ip: "5.6.7.8", userAgentHash: "x9y8z7" } != creatorIdentity + β”‚ + β–Ό + (WebSocket lifecycle - messages accumulated, not logged) + β”‚ + β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ ws_session β”‚ +β”‚ sessionId: abc β”‚ +β”‚ isCreator: false β”‚ ← IP doesn't match = joiner +β”‚ messageCount: 47 β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +### Combined: One Session, Multiple Users + +``` +Timeline +────────────────────────────────────────────────────────────────────────► + +Creator creates session (IP: 1.2.3.4) +β”‚ +β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ http_request β”‚ action: "create" +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + DO stores creatorIdentity = { ip: "1.2.3.4", userAgentHash: 
"..." } + β”‚ + Creator shares link + β”‚ + β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ β”‚ + β–Ό β–Ό + Joiner A clicks link (IP: 5.6.7.8) Joiner B clicks link (IP: 9.0.1.2) + β”‚ β”‚ + β–Ό β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ http_request β”‚ β”‚ http_request β”‚ +β”‚ action: "access" β”‚ β”‚ action: "access" β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ β”‚ + β–Ό β–Ό + All 3 collaborate via WebSocket (no events during) + β”‚ + β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ ws_session β”‚ β”‚ ws_session β”‚ β”‚ ws_session β”‚ +β”‚ isCreator: true β”‚ β”‚ isCreator: false β”‚ β”‚ isCreator: false β”‚ +β”‚ (IP matched) β”‚ β”‚ (IP: 5.6.7.8) β”‚ β”‚ (IP: 9.0.1.2) β”‚ +β”‚ messageCount: 200 β”‚ β”‚ messageCount: 47 β”‚ β”‚ messageCount: 23 β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + +Total events for this session: 3 http_request + 3 ws_session = 6 +Traditional logging would emit: ~300+ log lines +``` + +--- + +## Architecture Sequence Diagram + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β” +β”‚ Browser β”‚ β”‚ Cloudflare Workerβ”‚ β”‚ Durable Object β”‚ β”‚ KV β”‚ +β”‚ (React) β”‚ β”‚ (API) β”‚ β”‚ (SessionDO) β”‚ β”‚ β”‚ +β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ 
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”¬β”€β”€β”˜ + β”‚ β”‚ β”‚ β”‚ + β”‚ GET /api/sessions/abc (joiner) β”‚ β”‚ + │───────────────────>β”‚ β”‚ β”‚ + β”‚ β”‚ get session β”‚ β”‚ + β”‚ │───────────────────────>β”‚ β”‚ + β”‚ β”‚ β”‚ get(session) β”‚ + β”‚ β”‚ │─────────────────>β”‚ + β”‚ │◄───────────────────────│◄─────────────────│ + β”‚ β”‚ β”‚ β”‚ + β”‚ β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”‚ + β”‚ β”‚ β”‚ http_request β”‚ β”‚ β”‚ + β”‚ β”‚ β”‚ action: "access" β”‚ β”‚ β”‚ + β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β”‚ + │◄───────────────────│ β”‚ β”‚ β”‚ + β”‚ 200 OK β”‚ β–Ό β”‚ β”‚ + β”‚ β”‚ Workers Logs β”‚ β”‚ + β”‚ β”‚ β”‚ β”‚ + β”‚ WebSocket upgrade β”‚ β”‚ β”‚ + │───────────────────>│───────────────────────>β”‚ β”‚ + │◄────────────────────────────────────────────│ (WS established) β”‚ + β”‚ β”‚ β”‚ β”‚ + β”‚ β”‚ β”‚ // Compare IP with creatorIdentity + β”‚ β”‚ β”‚ context = { β”‚ + β”‚ β”‚ β”‚ isCreator: β”‚ + β”‚ β”‚ β”‚ false, β”‚ ← IP doesn't match + β”‚ β”‚ β”‚ msgCount: 0 β”‚ + β”‚ β”‚ β”‚ } β”‚ + β”‚ β”‚ β”‚ β”‚ + β”‚ toggle_step ─────────────────────────────>β”‚ context.msgCount++ + β”‚ set_tempo ───────────────────────────────>β”‚ context.msgCount++ + β”‚ play ────────────────────────────────────>β”‚ context.msgCount++ + β”‚ ...more messages... 
β”‚ β”‚ + β”‚ β”‚ β”‚ β”‚ + β”‚ close ───────────────────────────────────>β”‚ β”‚ + β”‚ β”‚ β”‚ β”‚ + β”‚ β”‚ β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ β”‚ β”‚ β”‚ ws_session β”‚ + β”‚ β”‚ β”‚ β”‚ isCreator: false β”‚ + β”‚ β”‚ β”‚ β”‚ messageCount: 47 β”‚ + β”‚ β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ β”‚ β”‚ β”‚ + β”‚ β”‚ β”‚ β–Ό + β”‚ β”‚ β”‚ Workers Logs +``` + +--- + +## Implementation + +### Creator Identity + +Creator identity is determined by IP address + User-Agent hash, stored when session is created: + +```typescript +interface CreatorIdentity { + ip: string; // CF-Connecting-IP header + userAgentHash: string; // SHA-256 of User-Agent +} + +// Hash User-Agent to avoid storing raw strings +async function hashUserAgent(userAgent: string): Promise { + const encoder = new TextEncoder(); + const data = encoder.encode(userAgent); + const hash = await crypto.subtle.digest('SHA-256', data); + return Array.from(new Uint8Array(hash)) + .map(b => b.toString(16).padStart(2, '0')) + .join('') + .slice(0, 16); // First 16 chars sufficient +} + +function identitiesMatch(a: CreatorIdentity, b: CreatorIdentity): boolean { + return a.ip === b.ip && a.userAgentHash === b.userAgentHash; +} +``` + +### Context Accumulator Pattern + +For wide events, accumulate context during the lifecycle: + +```typescript +// In Durable Object +class SessionDO { + private wsContext: Map = new Map(); + private creatorIdentity: CreatorIdentity | null = null; // Set when session is created + + async handleSessionCreate(request: Request) { + // Capture creator identity from the creation request + this.creatorIdentity = { + ip: request.headers.get('CF-Connecting-IP') || 'unknown', + userAgentHash: await hashUserAgent(request.headers.get('User-Agent') || ''), + }; + } + + async handleWebSocketConnect(ws: WebSocket, playerId: string, request: Request) { + // Derive connecting user's identity + const 
connectingIdentity: CreatorIdentity = { + ip: request.headers.get('CF-Connecting-IP') || 'unknown', + userAgentHash: await hashUserAgent(request.headers.get('User-Agent') || ''), + }; + + // Compare with stored creator identity + const isCreator = this.creatorIdentity + ? identitiesMatch(this.creatorIdentity, connectingIdentity) + : false; + + this.wsContext.set(ws, { + connectionId: crypto.randomUUID(), + playerId, + isCreator, // Based on IP + User-Agent, not playerId + connectedAt: new Date().toISOString(), + messageCount: 0, + messagesByType: {}, + peakConcurrentPlayers: this.connections.size, + // ... other fields initialized + }); + } + + handleWebSocketMessage(ws: WebSocket, message: Message) { + const ctx = this.wsContext.get(ws)!; + ctx.messageCount++; + ctx.messagesByType[message.type] = (ctx.messagesByType[message.type] || 0) + 1; + ctx.peakConcurrentPlayers = Math.max(ctx.peakConcurrentPlayers, this.connections.size); + // ... handle message + } + + handleWebSocketClose(ws: WebSocket, reason: string) { + const ctx = this.wsContext.get(ws)!; + + // Emit wide event + console.log(JSON.stringify({ + event: "ws_session", + ...ctx, + disconnectedAt: new Date().toISOString(), + duration_ms: Date.now() - new Date(ctx.connectedAt).getTime(), + disconnectReason: reason, + })); + + this.wsContext.delete(ws); + } +} +``` + +### HTTP Middleware + +Wrap handlers to emit `http_request`. + +> **Note:** This example shows the basic pattern. Full implementation would also extract `deviceType` (from User-Agent), `sessionId`/`playerId` (from request context), `isPublished` (from session lookup), `routePattern` (from router), and `action` (from route handler). 
+ +```typescript +function withObservability(handler: Handler): Handler { + return async (request, env, ctx) => { + const startTime = Date.now(); + const requestId = crypto.randomUUID(); + const metrics = { kvReads: 0, kvWrites: 0, doRequests: 0 }; + + try { + const response = await handler(request, env, { ...ctx, requestId, metrics }); + + console.log(JSON.stringify({ + event: "http_request", + requestId, + method: request.method, + path: new URL(request.url).pathname, + timestamp: new Date().toISOString(), + duration_ms: Date.now() - startTime, + status: response.status, + ...metrics, + })); + + return response; + } catch (error) { + console.log(JSON.stringify({ + event: "http_request", + requestId, + method: request.method, + path: new URL(request.url).pathname, + timestamp: new Date().toISOString(), + duration_ms: Date.now() - startTime, + status: 500, + errorType: error.name, + errorMessage: error.message, + ...metrics, + })); + throw error; + } + }; +} +``` + +--- + +## Migration Path + +### Phase 1: Enable Workers Logs +- Add `observability` config to wrangler.jsonc +- No code changes, get automatic invocation logs + +### Phase 2: Add Wide Events +- Implement `http_request` in API routes +- Implement `ws_session` in Durable Object +- Run in parallel with existing logging + +### Phase 3: Remove Legacy Logging +- Remove KV-based log writes +- Remove per-action console.log calls +- Update debug endpoints to query Workers Logs API + +--- + +## Estimated Effort + +| Phase | Work | Estimate | +|-------|------|----------| +| Phase 1 | Config only | 30 min | +| Phase 2 | Wide events | 8 hours | +| Phase 3 | Cleanup | 4 hours | +| **Total** | | **~13 hours** | + +--- + +## Event Volume (1,000 DAU baseline) + +| Event | Frequency | Daily Volume | +|-------|-----------|--------------| +| http_request | Every API call | ~10,000 | +| ws_session | Every WS disconnect | ~1,500 | +| **Total** | | **~11,500** | + +**Note:** All errors (server-side and client-reported) are 
embedded in `http_request` and `ws_session` events with `outcome: "error"`. No separate error event type. + +Workers Logs limit: 5 billion/day. Usage: 0.0002% + +--- + +*Spec created: January 2026* diff --git a/specs/OBSERVABILITY.md b/specs/OBSERVABILITY.md index e79eea7..36e7353 100644 --- a/specs/OBSERVABILITY.md +++ b/specs/OBSERVABILITY.md @@ -1,7 +1,8 @@ # Observability Architecture > **Status:** Phase 7 Complete -> **Related:** [ROADMAP.md](../specs/ROADMAP.md) Phase 7 +> **Related:** [ROADMAP.md](./ROADMAP.md) Phase 7 +> **See Also:** [Observability 2.0 Research](./research/OBSERVABILITY-2-0.md), [Observability 2.0 Implementation Spec](./OBSERVABILITY-2-0-IMPLEMENTATION.md) --- diff --git a/specs/research/OBSERVABILITY-2-0.md b/specs/research/OBSERVABILITY-2-0.md new file mode 100644 index 0000000..fe77f80 --- /dev/null +++ b/specs/research/OBSERVABILITY-2-0.md @@ -0,0 +1,228 @@ +# Observability 2.0 Research + +> **Type:** Research Document +> **Status:** Reference material for future implementation decisions +> **Related:** [OBSERVABILITY.md](../OBSERVABILITY.md) (current implementation) +> **See Also:** [Implementation Spec](../OBSERVABILITY-2-0-IMPLEMENTATION.md) + +--- + +## Summary + +This document captures research into modern observability practices, specifically the "Observability 2.0" philosophy championed by Charity Majors (Honeycomb) and articulated on loggingsucks.com. The goal is to inform future decisions, not prescribe immediate changes. 
+ +--- + +## Key Sources + +| Source | Key Contribution | +|--------|------------------| +| [loggingsucks.com](https://loggingsucks.com/) | Wide events pattern, "emit once at end" | +| [Charity Majors / Honeycomb](https://charity.wtf/) | Observability 2.0 framework, high-cardinality advocacy | +| [Observability Engineering (O'Reilly)](https://www.oreilly.com/library/view/observability-engineering/9781492076438/) | Industry best practices | +| [Cloudflare Workers Logs](https://developers.cloudflare.com/workers/observability/logs/workers-logs/) | Platform-native capabilities | + +--- + +## Core Insight: Observability 1.0 vs 2.0 + +| Observability 1.0 | Observability 2.0 | +|-------------------|-------------------| +| Three pillars (metrics, logs, traces) as separate concerns | **Single source of truth**: wide structured events | +| Collect metrics independently | **Derive metrics** from events at query time | +| Design for known questions | **Design for unknown unknowns** | +| Aggregate at write time | **Never aggregate**β€”keep raw events | + +> "Observability 2.0 has one source of truth, wide structured log events, from which you can derive all the other data types." +> β€” Charity Majors + +--- + +## Wide Events Pattern + +Instead of emitting many log lines throughout a request lifecycle, **build up context in memory and emit once at the end**. 
+ +**Traditional approach** (many emissions): +``` +[WS] connect session=abc player=xyz +[WS] message session=abc type=toggle_step +[WS] message session=abc type=set_tempo +[WS] disconnect session=abc duration=120s +``` + +**Wide event approach** (single emission with everything): +```json +{ + "sessionId": "abc", + "playerId": "xyz", + "duration": 120, + "messageCount": 47, + "messagesByType": { "toggle_step": 42, "set_tempo": 5 }, + "disconnectReason": "normal_close" +} +``` + +**Benefits:** +- Richer context for debugging (all fields together) +- Fewer writes (cost savings) +- Enables arbitrary queries ("show me sessions where X AND Y AND Z") + +--- + +## Designing Wide Events + +> "Replace your log lines and log levels with arbitrarily wide structured events that describe the request and its context, one event per request per service." +> β€” Charity Majors + +**Principles:** + +1. **One event per lifecycle** β€” Identify the natural lifecycle (HTTP request, WebSocket connection, background job) and emit exactly one event when it ends. + +2. **Build context, emit once** β€” Accumulate data in memory during the lifecycle. Don't log as you go; capture everything and emit at the end. + +3. **Include identifiers freely** β€” `sessionId`, `playerId`, `requestId` β€” high cardinality is the **feature**, not the bug. These enable filtering and grouping. + +4. **Aggregate inside the event** β€” Instead of emitting 47 `message_received` events, emit one event with `messageCount: 47` and `messagesByType: { toggle_step: 42, set_tempo: 5 }`. + +5. **Capture both outcome and journey** β€” Include the result (`status`, `disconnectReason`) AND what happened along the way (`duration_ms`, `errorCount`, `kvWrites`). + +6. **Never aggregate at write-time** β€” Compute metrics, percentiles, aggregations at query time. Pre-aggregation kills the ability to answer unknown unknowns. + +**The "Stuff the Blob" Pattern** (from Stripe's Canonical Log Lines): +``` +1. 
Initialize empty blob at request start
2. Stuff any interesting details throughout the request lifetime
3. Write it out just before the request errors or exits
4. Bypass local disk entirely and write directly to a remote service
```

**Anti-patterns to avoid:**
- Low-dimensionality structured logs (5 fields is not a wide event)
- Logging entire objects with PII
- Wrapping everything in spans without context
- Unstructured string logs designed for grep

**Litmus test:** If you need to JOIN or correlate multiple log lines to answer a question, your events aren't wide enough.

---

## Cloudflare Workers Logs

Cloudflare provides native observability that we're not currently using:

| Feature | Capability |
|---------|------------|
| **Workers Logs** | 7-day retention, automatic JSON indexing, Query Builder |
| **Head sampling** | `head_sampling_rate` in wrangler.jsonc |
| **Automatic instrumentation** | KV, DO, fetch() calls traced automatically |
| **Invocation logs** | Request metadata captured without code |

**Key configuration:**
```jsonc
{
  "observability": {
    "enabled": true,
    "logs": {
      "enabled": true,
      "invocation_logs": true
    }
  }
}
```

This would replace our KV-based logging with zero additional infrastructure. 
+ +--- + +## Applicability to Keyboardia + +### Current State (Phase 7) +- Per-event logging to KV (connect, message, disconnect) +- Separate metrics tracking functions +- 1-hour TTL in KV +- String-formatted console logs + +### If We Adopted Obs 2.0 +- Single `ws_session_end` event per connection lifecycle +- Metrics derived from events, not collected separately +- 7-day retention via Workers Logs +- Structured JSON for queryability + +### Trade-offs + +| Aspect | Current | Obs 2.0 | +|--------|---------|---------| +| Implementation effort | Done | ~13 hours | +| KV writes for observability | Multiple per connection | Zero | +| Retention | 1 hour | 7 days | +| Query flexibility | Fixed debug endpoints | Arbitrary SQL-like queries | +| Complexity | Simple, understood | New patterns to learn | + +--- + +## Event Volume Estimates + +### Assumptions + +| Behavior | Rate | +|----------|------| +| Sessions created/user | 0.5 | +| Session accesses/user | 3 | +| Multiplayer adoption | 30% | +| Plays per session | 5 | + +**~11.5 events/user/day** (see Implementation Spec for detailed breakdown) + +### Volume by Scale + +| DAU | Events/Day | % of 5B Limit | Context | +|-----|------------|---------------|---------| +| 30 | ~350 | 0.00001% | Early launch | +| 1,000 | ~11,500 | 0.0002% | **Primary baseline** | +| 10,000 | ~115,000 | 0.002% | Growth target | +| 100,000 | ~1.15M | 0.02% | Scale scenario | + +--- + +## Cost Analysis + +### Workers Logs Pricing + +As of January 2026, Workers Logs are **included in all Workers plans** (Free and Paid): + +| Plan | Log Limit | Cost | +|------|-----------|------| +| Free | 5B logs/day | $0 | +| Paid ($5/mo) | 5B logs/day | Included | + +**At 1,000 DAU baseline: Zero additional cost.** + +### Total Observability Cost + +| Component | Current (Phase 7) | With Obs 2.0 | +|-----------|-------------------|--------------| +| KV writes for logs | ~$0.05/month | $0 (eliminated) | +| Workers Logs | N/A | $0 (included) | +| Paid Workers plan | 
$5/month | $5/month |
| **Total** | **~$5.05/month** | **$5/month** |

Obs 2.0 would slightly reduce costs by eliminating KV writes for logging, while providing 7-day retention (vs 1-hour) and query capabilities.

---

## Recommendation

**No immediate action required.**

The current Phase 7 observability is functional. Obs 2.0 principles are worth adopting if/when:

1. KV quota becomes a constraint (it hasn't been one since the QUOTA-OBSERVABILITY.md analysis)
2. We need to debug issues that require richer context
3. We want longer retention (7 days vs 1 hour)
4. We want arbitrary query capability

Keep this document as a reference for when those needs arise.

---

*Research compiled: January 2026*