diff --git a/src/cli.ts b/src/cli.ts index e617c31e..ff43e966 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -209,7 +209,23 @@ export const cliOptions = { // Marked as `false` until the feature is ready to be enabled by default. default: false, hidden: true, - describe: 'Set to false to opt-out of usage statistics collection.', + describe: + 'Set to false to opt-out of usage statistics collection. Google collects usage data to improve the tool, handled under the Google Privacy Policy (https://policies.google.com/privacy). This is independent from Chrome browser metrics.', + }, + clearcutEndpoint: { + type: 'string', + hidden: true, + describe: 'Endpoint for Clearcut telemetry.', + }, + clearcutForceFlushIntervalMs: { + type: 'number', + hidden: true, + describe: 'Force flush interval in milliseconds (for testing).', + }, + clearcutIncludePidHeader: { + type: 'boolean', + hidden: true, + describe: 'Include watchdog PID in Clearcut request headers (for testing).', }, } satisfies Record; diff --git a/src/main.ts b/src/main.ts index 00c5cac1..eb2c12b5 100644 --- a/src/main.ts +++ b/src/main.ts @@ -42,6 +42,9 @@ if (args.usageStatistics) { clearcutLogger = new ClearcutLogger({ logFile: args.logFile, appVersion: VERSION, + clearcutEndpoint: args.clearcutEndpoint, + clearcutForceFlushIntervalMs: args.clearcutForceFlushIntervalMs, + clearcutIncludePidHeader: args.clearcutIncludePidHeader, }); } diff --git a/src/telemetry/clearcut-logger.ts b/src/telemetry/clearcut-logger.ts index 80891ff4..6a1d6351 100644 --- a/src/telemetry/clearcut-logger.ts +++ b/src/telemetry/clearcut-logger.ts @@ -37,6 +37,9 @@ export class ClearcutLogger { logFile?: string; persistence?: Persistence; watchdogClient?: WatchdogClient; + clearcutEndpoint?: string; + clearcutForceFlushIntervalMs?: number; + clearcutIncludePidHeader?: boolean; }) { this.#persistence = options.persistence ?? new FilePersistence(); this.#watchdog = @@ -46,6 +49,9 @@ export class ClearcutLogger { appVersion: options.appVersion, osType: detectOsType(), logFile: options.logFile, + clearcutEndpoint: options.clearcutEndpoint, + clearcutForceFlushIntervalMs: options.clearcutForceFlushIntervalMs, + clearcutIncludePidHeader: options.clearcutIncludePidHeader, }); } diff --git a/src/telemetry/types.ts b/src/telemetry/types.ts index 2bb20192..24d18b94 100644 --- a/src/telemetry/types.ts +++ b/src/telemetry/types.ts @@ -47,6 +47,14 @@ export interface LogRequest { }>; } +export interface LogResponse { + /** + * If present, the client must wait this many milliseconds before + * issuing the next HTTP request. + */ + next_request_wait_millis?: number; +} + // Enums export enum OsType { OS_TYPE_UNSPECIFIED = 0, diff --git a/src/telemetry/watchdog-client.ts b/src/telemetry/watchdog-client.ts index 49f21b8f..36090541 100644 --- a/src/telemetry/watchdog-client.ts +++ b/src/telemetry/watchdog-client.ts @@ -20,6 +20,9 @@ export class WatchdogClient { appVersion: string; osType: OsType; logFile?: string; + clearcutEndpoint?: string; + clearcutForceFlushIntervalMs?: number; + clearcutIncludePidHeader?: boolean; }, options?: {spawn?: typeof spawn}, ) { @@ -37,6 +40,17 @@ export class WatchdogClient { if (config.logFile) { args.push(`--log-file=${config.logFile}`); } + if (config.clearcutEndpoint) { + args.push(`--clearcut-endpoint=${config.clearcutEndpoint}`); + } + if (config.clearcutForceFlushIntervalMs) { + args.push( + `--clearcut-force-flush-interval-ms=${config.clearcutForceFlushIntervalMs}`, + ); + } + if (config.clearcutIncludePidHeader) { + args.push('--clearcut-include-pid-header'); + } const spawner = options?.spawn ?? spawn; this.#childProcess = spawner(process.execPath, args, { diff --git a/src/telemetry/watchdog/clearcut-sender.ts b/src/telemetry/watchdog/clearcut-sender.ts index ebd3b338..cd9a38cc 100644 --- a/src/telemetry/watchdog/clearcut-sender.ts +++ b/src/telemetry/watchdog/clearcut-sender.ts @@ -7,53 +7,246 @@ import crypto from 'node:crypto'; import {logger} from '../../logger.js'; -import type {ChromeDevToolsMcpExtension, OsType} from '../types.js'; +import type { + ChromeDevToolsMcpExtension, + LogRequest, + LogResponse, + OsType, +} from '../types.js'; +export interface ClearcutSenderConfig { + appVersion: string; + osType: OsType; + clearcutEndpoint?: string; + forceFlushIntervalMs?: number; + includePidHeader?: boolean; +} + +const MAX_BUFFER_SIZE = 1000; +const DEFAULT_CLEARCUT_ENDPOINT = + 'https://play.googleapis.com/log?format=json_proto'; +const DEFAULT_FLUSH_INTERVAL_MS = 15 * 60 * 1000; + +const LOG_SOURCE = 2839; +const CLIENT_TYPE = 47; +const MIN_RATE_LIMIT_WAIT_MS = 30_000; +const REQUEST_TIMEOUT_MS = 30_000; +const SHUTDOWN_TIMEOUT_MS = 5_000; const SESSION_ROTATION_INTERVAL_MS = 24 * 60 * 60 * 1000; +interface BufferedEvent { + event: ChromeDevToolsMcpExtension; + timestamp: number; +} + export class ClearcutSender { #appVersion: string; #osType: OsType; + #clearcutEndpoint: string; + #flushIntervalMs: number; + #includePidHeader: boolean; #sessionId: string; #sessionCreated: number; + #buffer: BufferedEvent[] = []; + #flushTimer: ReturnType | null = null; + #isFlushing = false; + #timerStarted = false; - constructor(appVersion: string, osType: OsType) { - this.#appVersion = appVersion; - this.#osType = osType; + constructor(config: ClearcutSenderConfig) { + this.#appVersion = config.appVersion; + this.#osType = config.osType; + this.#clearcutEndpoint = + config.clearcutEndpoint ?? DEFAULT_CLEARCUT_ENDPOINT; + this.#flushIntervalMs = + config.forceFlushIntervalMs ?? DEFAULT_FLUSH_INTERVAL_MS; + this.#includePidHeader = config.includePidHeader ?? false; this.#sessionId = crypto.randomUUID(); this.#sessionCreated = Date.now(); } - async send(event: ChromeDevToolsMcpExtension): Promise { - this.#rotateSessionIfNeeded(); - const enrichedEvent = this.#enrichEvent(event); - this.transport(enrichedEvent); - } + enqueueEvent(event: ChromeDevToolsMcpExtension): void { + if (Date.now() - this.#sessionCreated > SESSION_ROTATION_INTERVAL_MS) { + this.#sessionId = crypto.randomUUID(); + this.#sessionCreated = Date.now(); + } - transport(event: ChromeDevToolsMcpExtension): void { - logger('Telemetry event', JSON.stringify(event, null, 2)); + this.#addToBuffer({ + ...event, + session_id: this.#sessionId, + app_version: this.#appVersion, + os_type: this.#osType, + }); + + if (!this.#timerStarted) { + this.#timerStarted = true; + this.#scheduleFlush(this.#flushIntervalMs); + } } async sendShutdownEvent(): Promise { + if (this.#flushTimer) { + clearTimeout(this.#flushTimer); + this.#flushTimer = null; + } + const shutdownEvent: ChromeDevToolsMcpExtension = { server_shutdown: {}, }; - await this.send(shutdownEvent); + this.enqueueEvent(shutdownEvent); + + try { + await Promise.race([ + this.#finalFlush(), + new Promise(resolve => setTimeout(resolve, SHUTDOWN_TIMEOUT_MS)), + ]); + } catch (error) { + logger('Final flush failed:', error); + } } - #rotateSessionIfNeeded(): void { - if (Date.now() - this.#sessionCreated > SESSION_ROTATION_INTERVAL_MS) { - this.#sessionId = crypto.randomUUID(); - this.#sessionCreated = Date.now(); + async #flush(): Promise { + if (this.#isFlushing) { + return; + } + + if (this.#buffer.length === 0) { + this.#scheduleFlush(this.#flushIntervalMs); + return; + } + + this.#isFlushing = true; + let nextDelayMs = this.#flushIntervalMs; + + // Optimistically remove events from buffer before sending. + // This prevents race conditions where a simultaneous #finalFlush would include these same events. + const eventsToSend = [...this.#buffer]; + this.#buffer = []; + + try { + const result = await this.#sendBatch(eventsToSend); + + if (result.success) { + if (result.nextRequestWaitMs !== undefined) { + nextDelayMs = Math.max( + result.nextRequestWaitMs, + MIN_RATE_LIMIT_WAIT_MS, + ); + } + } else if (result.isPermanentError) { + logger( + 'Permanent error, dropped batch of', + eventsToSend.length, + 'events', + ); + } else { + // Transient error: Requeue events at the front of the buffer + // to maintain order and retry them later. + this.#buffer = [...eventsToSend, ...this.#buffer]; + } + } catch (error) { + // Safety catch for unexpected errors, requeue events + this.#buffer = [...eventsToSend, ...this.#buffer]; + logger('Flush failed unexpectedly:', error); + } finally { + this.#isFlushing = false; + this.#scheduleFlush(nextDelayMs); } } - #enrichEvent(event: ChromeDevToolsMcpExtension): ChromeDevToolsMcpExtension { - return { - ...event, - session_id: this.#sessionId, - app_version: this.#appVersion, - os_type: this.#osType, + #addToBuffer(event: ChromeDevToolsMcpExtension): void { + if (this.#buffer.length >= MAX_BUFFER_SIZE) { + this.#buffer.shift(); + logger('Telemetry buffer overflow: dropped oldest event'); + } + this.#buffer.push({ + event, + timestamp: Date.now(), + }); + } + + #scheduleFlush(delayMs: number): void { + if (this.#flushTimer) { + clearTimeout(this.#flushTimer); + } + this.#flushTimer = setTimeout(() => { + this.#flush().catch(err => { + logger('Flush error:', err); + }); + }, delayMs); + } + + async #sendBatch(events: BufferedEvent[]): Promise<{ + success: boolean; + isPermanentError?: boolean; + nextRequestWaitMs?: number; + }> { + const requestBody: LogRequest = { + log_source: LOG_SOURCE, + request_time_ms: Date.now().toString(), + client_info: { + client_type: CLIENT_TYPE, + }, + log_event: events.map(({event, timestamp}) => ({ + event_time_ms: timestamp.toString(), + source_extension_json: JSON.stringify(event), + })), }; + + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), REQUEST_TIMEOUT_MS); + try { + const response = await fetch(this.#clearcutEndpoint, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + // Used in E2E tests to confirm that the watchdog process is killed + ...(this.#includePidHeader + ? {'X-Watchdog-Pid': process.pid.toString()} + : {}), + }, + body: JSON.stringify(requestBody), + signal: controller.signal, + }); + + clearTimeout(timeoutId); + if (response.ok) { + const data = (await response.json()) as LogResponse; + return { + success: true, + nextRequestWaitMs: data.next_request_wait_millis, + }; + } + + const status = response.status; + if (status >= 500 || status === 429) { + return {success: false}; + } + + logger('Telemetry permanent error:', status); + return {success: false, isPermanentError: true}; + } catch { + clearTimeout(timeoutId); + return {success: false}; + } + } + + async #finalFlush(): Promise { + if (this.#buffer.length === 0) { + return; + } + const eventsToSend = [...this.#buffer]; + await this.#sendBatch(eventsToSend); + } + + stopForTesting(): void { + if (this.#flushTimer) { + clearTimeout(this.#flushTimer); + this.#flushTimer = null; + } + this.#timerStarted = false; + } + + get bufferSizeForTesting(): number { + return this.#buffer.length; } } diff --git a/src/telemetry/watchdog/main.ts b/src/telemetry/watchdog/main.ts index 2750d031..60536b58 100644 --- a/src/telemetry/watchdog/main.ts +++ b/src/telemetry/watchdog/main.ts @@ -15,21 +15,76 @@ import {WatchdogMessageType} from '../types.js'; import {ClearcutSender} from './clearcut-sender.js'; -function main() { +interface WatchdogArgs { + // Required arguments + parentPid: number; + appVersion: string; + osType: OsType; + // Optional arguments + logFile?: string; + clearcutEndpoint?: string; + clearcutForceFlushIntervalMs?: number; + clearcutIncludePidHeader?: boolean; +} + +function parseWatchdogArgs(): WatchdogArgs { const {values} = parseArgs({ options: { 'parent-pid': {type: 'string'}, 'app-version': {type: 'string'}, 'os-type': {type: 'string'}, 'log-file': {type: 'string'}, + 'clearcut-endpoint': {type: 'string'}, + 'clearcut-force-flush-interval-ms': {type: 'string'}, + 'clearcut-include-pid-header': {type: 'boolean'}, }, strict: true, }); - + // Verify required arguments const parentPid = parseInt(values['parent-pid'] ?? '', 10); const appVersion = values['app-version']; const osType = parseInt(values['os-type'] ?? '', 10); + if (isNaN(parentPid) || !appVersion || isNaN(osType)) { + console.error( + 'Invalid arguments provided for watchdog process: ', + JSON.stringify({parentPid, appVersion, osType}), + ); + process.exit(1); + } + + // Parse Optional Arguments const logFile = values['log-file']; + const clearcutEndpoint = values['clearcut-endpoint']; + const clearcutIncludePidHeader = values['clearcut-include-pid-header']; + let clearcutForceFlushIntervalMs: number | undefined; + if (values['clearcut-force-flush-interval-ms']) { + const parsed = parseInt(values['clearcut-force-flush-interval-ms'], 10); + if (!isNaN(parsed)) { + clearcutForceFlushIntervalMs = parsed; + } + } + + return { + parentPid, + appVersion, + osType, + logFile, + clearcutEndpoint, + clearcutForceFlushIntervalMs, + clearcutIncludePidHeader, + }; +} + +function main() { + const { + parentPid, + appVersion, + osType, + logFile, + clearcutEndpoint, + clearcutForceFlushIntervalMs, + clearcutIncludePidHeader, + } = parseWatchdogArgs(); let logStream: WriteStream | undefined; if (logFile) { logStream = saveLogsToFile(logFile); @@ -45,15 +100,6 @@ function main() { }); }; - if (isNaN(parentPid) || !appVersion || isNaN(osType)) { - logger( - 'Invalid arguments provided for watchdog process: ', - JSON.stringify({parentPid, appVersion, osType}), - ); - exit(1); - return; - } - logger( 'Watchdog started', JSON.stringify( @@ -68,7 +114,13 @@ function main() { ), ); - const sender = new ClearcutSender(appVersion, osType as OsType); + const sender = new ClearcutSender({ + appVersion, + osType: osType, + clearcutEndpoint, + forceFlushIntervalMs: clearcutForceFlushIntervalMs, + includePidHeader: clearcutIncludePidHeader, + }); let isShuttingDown = false; function onParentDeath(reason: string) { @@ -107,9 +159,7 @@ function main() { const msg = JSON.parse(line); if (msg.type === WatchdogMessageType.LOG_EVENT && msg.payload) { - sender.send(msg.payload).catch(err => { - logger('Error sending event', err); - }); + sender.enqueueEvent(msg.payload); } } catch (err) { logger('Failed to parse IPC message', err); diff --git a/tests/e2e/telemetry.test.ts b/tests/e2e/telemetry.test.ts index 0156d8f7..b5768a6b 100644 --- a/tests/e2e/telemetry.test.ts +++ b/tests/e2e/telemetry.test.ts @@ -6,75 +6,136 @@ import assert from 'node:assert'; import {spawn, type ChildProcess, type SpawnOptions} from 'node:child_process'; -import fs from 'node:fs'; -import os from 'node:os'; +import http from 'node:http'; +import type {AddressInfo} from 'node:net'; import path from 'node:path'; import {describe, it} from 'node:test'; +import type {ChromeDevToolsMcpExtension} from '../../src/telemetry/types'; + const SERVER_PATH = path.resolve('build/src/main.js'); -const WATCHDOG_START_PATTERN = /Watchdog started[\s\S]*?"pid":\s*(\d+)/; -const SHUTDOWN_PATTERN = /server_shutdown/; -const PARENT_DEATH_PATTERN = /Parent death detected/; -interface TestContext { - logFile: string; - process?: ChildProcess; +interface MockServerContext { + server: http.Server; + port: number; + events: ChromeDevToolsMcpExtension[]; watchdogPid?: number; + waitForEvent: ( + predicate: (event: ChromeDevToolsMcpExtension) => boolean, + ) => Promise; } -async function waitForLogPattern( - logFile: string, - pattern: RegExp, - timeoutMs = 10000, -): Promise { - const startTime = Date.now(); - while (Date.now() - startTime < timeoutMs) { - if (fs.existsSync(logFile)) { - const content = fs.readFileSync(logFile, 'utf8'); - const match = content.match(pattern); - if (match) { - return match; +async function startMockServer(): Promise { + const events: ChromeDevToolsMcpExtension[] = []; + let waitingResolvers: Array<{ + predicate: (event: ChromeDevToolsMcpExtension) => boolean; + resolve: (event: ChromeDevToolsMcpExtension) => void; + }> = []; + let watchdogPid: number | undefined; + + const server = http.createServer((req, res) => { + if (req.method === 'POST') { + const pidHeader = req.headers['x-watchdog-pid']; + if (pidHeader && !Array.isArray(pidHeader)) { + watchdogPid = parseInt(pidHeader, 10); } + + let body = ''; + req.on('data', chunk => { + body += chunk.toString(); + }); + req.on('end', () => { + try { + const parsed = JSON.parse(body); + // Extract internal log events + if (parsed.log_event) { + for (const logEvent of parsed.log_event) { + if (logEvent.source_extension_json) { + const ext = JSON.parse( + logEvent.source_extension_json, + ) as ChromeDevToolsMcpExtension; + events.push(ext); + + // Check if any waiters are satisfied + waitingResolvers = waitingResolvers.filter( + ({predicate, resolve}) => { + if (predicate(ext)) { + resolve(ext); + return false; + } + return true; + }, + ); + } + } + } + } catch (err) { + console.error('Failed to parse mock server request', err); + } + res.writeHead(200, {'Content-Type': 'application/json'}); + res.end(JSON.stringify({next_request_wait_millis: 100})); + }); + } else { + res.writeHead(404); + res.end(); } - await new Promise(resolve => setTimeout(resolve, 50)); + }); + + await new Promise(resolve => { + server.listen(0, '127.0.0.1', () => resolve()); + }); + + const address = server.address() as AddressInfo; + return { + server, + port: address.port, + events, + get watchdogPid() { + return watchdogPid; + }, + waitForEvent: predicate => { + const existing = events.find(predicate); + if (existing) { + return Promise.resolve(existing); + } + + return new Promise(resolve => { + waitingResolvers.push({predicate, resolve}); + }); + }, + }; +} + +interface TestContext { + process?: ChildProcess; + mockServer?: MockServerContext; +} + +function isProcessAlive(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch { + return false; } - throw new Error(`Timeout waiting for pattern: ${pattern}`); } async function waitForProcessExit( pid: number, - timeoutMs = 10000, + timeoutMs = 5000, ): Promise { const startTime = Date.now(); - return new Promise((resolve, reject) => { - const checkInterval = setInterval(() => { - try { - process.kill(pid, 0); - if (Date.now() - startTime > timeoutMs) { - clearInterval(checkInterval); - try { - process.kill(pid, 'SIGKILL'); - } catch { - // ignore - } - reject(new Error(`Timeout waiting for process ${pid} to exit`)); - } - } catch { - clearInterval(checkInterval); - resolve(); - } - }, 50); - }); -} - -function createLogFilePath(testName: string): string { - return path.join( - os.tmpdir(), - `test-mcp-telemetry-${testName}-${Date.now()}-${Math.random().toString(36).slice(2)}.log`, - ); + while (Date.now() - startTime < timeoutMs) { + if (!isProcessAlive(pid)) { + return; + } + await new Promise(resolve => setTimeout(resolve, 50)); + } + throw new Error(`Timeout waiting for process ${pid} to exit`); } function cleanupTest(ctx: TestContext): void { + // Kill Main Process if (ctx.process && ctx.process.exitCode === null) { try { ctx.process.kill('SIGKILL'); @@ -82,30 +143,28 @@ function cleanupTest(ctx: TestContext): void { // ignore } } - if (ctx.watchdogPid) { + // Kill Watchdog Process + if (ctx.mockServer?.watchdogPid) { try { - process.kill(ctx.watchdogPid, 'SIGKILL'); + process.kill(ctx.mockServer.watchdogPid, 'SIGKILL'); } catch { // ignore } } - if (ctx.logFile && fs.existsSync(ctx.logFile)) { - try { - fs.unlinkSync(ctx.logFile); - } catch { - // ignore - } + // Stop Mock Server + if (ctx.mockServer) { + ctx.mockServer.server.close(); } } describe('Telemetry E2E', () => { async function runTelemetryTest( killFn: (ctx: TestContext) => void, - testName: string, spawnOptions?: SpawnOptions, ): Promise { + const mockContext = await startMockServer(); const ctx: TestContext = { - logFile: createLogFilePath(testName), + mockServer: mockContext, }; try { @@ -113,40 +172,60 @@ describe('Telemetry E2E', () => { process.execPath, [ SERVER_PATH, - `--log-file=${ctx.logFile}`, '--usage-statistics', '--headless', + `--clearcutEndpoint=http://127.0.0.1:${mockContext.port}`, + '--clearcutForceFlushIntervalMs=10', + '--clearcutIncludePidHeader', ], { stdio: ['pipe', 'pipe', 'pipe'], + env: { + ...process.env, + }, ...spawnOptions, }, ); - const match = await waitForLogPattern( - ctx.logFile, - WATCHDOG_START_PATTERN, + const startEvent = await Promise.race([ + mockContext.waitForEvent(e => e.server_start !== undefined), + new Promise((_, reject) => + setTimeout( + () => reject(new Error('Timeout waiting for server_start')), + 10000, + ), + ), + ]); + assert.ok(startEvent, 'server_start event not received'); + + // Now that we received an event, we should have the Watchdog PID + const watchdogPid = mockContext.watchdogPid; + assert.ok(watchdogPid, 'Watchdog PID not captured from headers'); + + // Assert Watchdog is actually running + assert.strictEqual( + isProcessAlive(watchdogPid), + true, + 'Watchdog process should be running', ); - assert.ok(match, 'Watchdog start log not found'); - ctx.watchdogPid = parseInt(match[1], 10); - assert.ok(ctx.watchdogPid > 0, 'Invalid watchdog PID'); + // Trigger shutdown killFn(ctx); - await waitForProcessExit(ctx.watchdogPid); - - const shutdownMatch = await waitForLogPattern( - ctx.logFile, - SHUTDOWN_PATTERN, - 2000, - ); - assert.ok(shutdownMatch, 'server_shutdown not logged'); - const deathMatch = await waitForLogPattern( - ctx.logFile, - PARENT_DEATH_PATTERN, - 2000, - ); - assert.ok(deathMatch, 'Parent death not detected'); + // Verify shutdown event + const shutdownEvent = await Promise.race([ + mockContext.waitForEvent(e => e.server_shutdown !== undefined), + new Promise((_, reject) => + setTimeout( + () => reject(new Error('Timeout waiting for server_shutdown')), + 10000, + ), + ), + ]); + assert.ok(shutdownEvent, 'server_shutdown event not received'); + + // Wait for Watchdog to exit naturally + await waitForProcessExit(watchdogPid); } finally { cleanupTest(ctx); } @@ -155,12 +234,12 @@ describe('Telemetry E2E', () => { it('handles SIGKILL', () => runTelemetryTest(ctx => { ctx.process!.kill('SIGKILL'); - }, 'SIGKILL')); + })); it('handles SIGTERM', () => runTelemetryTest(ctx => { ctx.process!.kill('SIGTERM'); - }, 'SIGTERM')); + })); it( 'handles POSIX process group SIGTERM', @@ -170,7 +249,6 @@ describe('Telemetry E2E', () => { ctx => { process.kill(-ctx.process!.pid!, 'SIGTERM'); }, - 'sigterm-group', {detached: true}, ), ); diff --git a/tests/telemetry/watchdog/clearcut-sender.test.ts b/tests/telemetry/watchdog/clearcut-sender.test.ts index 870ea947..8eca6a7f 100644 --- a/tests/telemetry/watchdog/clearcut-sender.test.ts +++ b/tests/telemetry/watchdog/clearcut-sender.test.ts @@ -11,64 +11,441 @@ import {describe, it, afterEach, beforeEach} from 'node:test'; import sinon from 'sinon'; import {OsType} from '../../../src/telemetry/types.js'; +import type {LogRequest} from '../../../src/telemetry/types.js'; import {ClearcutSender} from '../../../src/telemetry/watchdog/clearcut-sender.js'; +const FLUSH_INTERVAL_MS = 15 * 1000; + describe('ClearcutSender', () => { - let clock: sinon.SinonFakeTimers; let randomUUIDStub: sinon.SinonStub; + let fetchStub: sinon.SinonStub; + let clock: sinon.SinonFakeTimers; beforeEach(() => { - clock = sinon.useFakeTimers(); + clock = sinon.useFakeTimers({ + now: Date.now(), + toFake: ['setTimeout', 'clearTimeout', 'Date'], + }); + let uuidCounter = 0; randomUUIDStub = sinon.stub(crypto, 'randomUUID').callsFake(() => { return `uuid-${++uuidCounter}` as ReturnType; }); + fetchStub = sinon.stub(global, 'fetch'); + fetchStub.resolves(new Response(JSON.stringify({}), {status: 200})); }); afterEach(() => { - clock.restore(); randomUUIDStub.restore(); + fetchStub.restore(); + clock.restore(); sinon.restore(); }); it('enriches events with app version, os type, and session id', async () => { - const sender = new ClearcutSender('1.0.0', OsType.OS_TYPE_MACOS); - const transportStub = sinon.stub(sender, 'transport'); + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); - await sender.send({mcp_client: undefined}); + sender.enqueueEvent({mcp_client: undefined}); + assert.strictEqual(sender.bufferSizeForTesting, 1); - assert.strictEqual(transportStub.callCount, 1); - const event = transportStub.firstCall.args[0]; + await clock.tickAsync(FLUSH_INTERVAL_MS); + sender.stopForTesting(); + + assert.strictEqual(fetchStub.callCount, 1); + const requestBody = JSON.parse( + fetchStub.firstCall.args[1].body, + ) as LogRequest; + const event = JSON.parse(requestBody.log_event[0].source_extension_json); assert.strictEqual(event.session_id, 'uuid-1'); assert.strictEqual(event.app_version, '1.0.0'); assert.strictEqual(event.os_type, OsType.OS_TYPE_MACOS); }); - it('rotates session ID after 24 hours', async () => { - const sender = new ClearcutSender('1.0.0', OsType.OS_TYPE_MACOS); - const transportStub = sinon.stub(sender, 'transport'); + it('accumulates events in buffer without immediate send', async () => { + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); + + sender.enqueueEvent({ + tool_invocation: {tool_name: 'test1', success: true, latency_ms: 100}, + }); + sender.enqueueEvent({ + tool_invocation: {tool_name: 'test2', success: true, latency_ms: 200}, + }); + + assert.strictEqual(sender.bufferSizeForTesting, 2); + assert.strictEqual(fetchStub.callCount, 0); + + sender.stopForTesting(); + }); + + it('sends correct LogRequest format', async () => { + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); + + sender.enqueueEvent({ + tool_invocation: {tool_name: 'test', success: true, latency_ms: 100}, + }); + + await clock.tickAsync(FLUSH_INTERVAL_MS); + sender.stopForTesting(); + + const [url, options] = fetchStub.firstCall.args; + assert.strictEqual( + url, + 'https://play.googleapis.com/log?format=json_proto', + ); + assert.strictEqual(options.method, 'POST'); + assert.strictEqual(options.headers['Content-Type'], 'application/json'); + + const body = JSON.parse(options.body) as LogRequest; + assert.strictEqual(body.log_source, 2839); + assert.strictEqual(body.client_info.client_type, 47); + assert.ok(body.request_time_ms); + }); + + it('clears buffer on successful send', async () => { + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); + + sender.enqueueEvent({}); + sender.enqueueEvent({}); + assert.strictEqual(sender.bufferSizeForTesting, 2); + + await clock.tickAsync(FLUSH_INTERVAL_MS); + sender.stopForTesting(); + assert.strictEqual(sender.bufferSizeForTesting, 0); + }); + + it('keeps events in buffer on transient 5xx error', async () => { + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); + fetchStub.resolves(new Response('Server Error', {status: 500})); + + sender.enqueueEvent({}); + await clock.tickAsync(FLUSH_INTERVAL_MS); + sender.stopForTesting(); + + assert.strictEqual(sender.bufferSizeForTesting, 1); + }); - await sender.send({}); - assert.strictEqual(transportStub.lastCall.args[0].session_id, 'uuid-1'); + it('keeps events in buffer on transient 429 error', async () => { + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); + fetchStub.resolves(new Response('Too Many Requests', {status: 429})); - clock.tick(23 * 60 * 60 * 1000); - await sender.send({}); - assert.strictEqual(transportStub.lastCall.args[0].session_id, 'uuid-1'); + sender.enqueueEvent({}); + await clock.tickAsync(FLUSH_INTERVAL_MS); + sender.stopForTesting(); - clock.tick(2 * 60 * 60 * 1000); - await sender.send({}); - assert.strictEqual(transportStub.lastCall.args[0].session_id, 'uuid-2'); + assert.strictEqual(sender.bufferSizeForTesting, 1); }); - it('sendShutdownEvent sends a server_shutdown event', async () => { - const sender = new ClearcutSender('1.0.0', OsType.OS_TYPE_MACOS); - const transportStub = sinon.stub(sender, 'transport'); + it('drops batch on permanent 4xx error', async () => { + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); + fetchStub.resolves(new Response('Bad Request', {status: 400})); + + sender.enqueueEvent({}); + await clock.tickAsync(FLUSH_INTERVAL_MS); + sender.stopForTesting(); + + assert.strictEqual(sender.bufferSizeForTesting, 0); + }); + + it('keeps events in buffer on network error', async () => { + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); + fetchStub.rejects(new Error('Network error')); + + sender.enqueueEvent({}); + await clock.tickAsync(FLUSH_INTERVAL_MS); + sender.stopForTesting(); + + assert.strictEqual(sender.bufferSizeForTesting, 1); + }); + + it('sendShutdownEvent sends an immediate server_shutdown event', async () => { + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); await sender.sendShutdownEvent(); - const event = transportStub.firstCall.args[0]; + assert.strictEqual(fetchStub.callCount, 1); + const requestBody = JSON.parse( + fetchStub.firstCall.args[1].body, + ) as LogRequest; + const event = JSON.parse(requestBody.log_event[0].source_extension_json); + assert.ok(event.server_shutdown); - assert.strictEqual(event.server_start, undefined); + }); + + it('shutdown includes buffered events', async () => { + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); + + sender.enqueueEvent({ + tool_invocation: {tool_name: 'test', success: true, latency_ms: 100}, + }); + await sender.sendShutdownEvent(); + + const requestBody = JSON.parse( + fetchStub.firstCall.args[1].body, + ) as LogRequest; + assert.strictEqual(requestBody.log_event.length, 2); + }); + + it('correctly handles buffer overflow during queued flush', async () => { + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); + + sender.enqueueEvent({ + tool_invocation: {tool_name: 'initial', success: true, latency_ms: 100}, + }); + let resolveRequest: (value: Response) => void; + + fetchStub.onFirstCall().returns( + new Promise(resolve => { + resolveRequest = resolve; + }), + ); + + clock.tick(FLUSH_INTERVAL_MS); + + for (let i = 0; i < 1100; i++) { + sender.enqueueEvent({ + tool_invocation: { + tool_name: `overflow-${i}`, + success: true, + latency_ms: 100, + }, + }); + } + + assert.strictEqual(sender.bufferSizeForTesting, 1000); + + resolveRequest!(new Response(JSON.stringify({}), {status: 200})); + + assert.strictEqual(sender.bufferSizeForTesting, 1000); + + sender.stopForTesting(); + }); + + it('does not duplicate events when shutdown occurs during an active flush', async () => { + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); + sender.enqueueEvent({ + tool_invocation: { + tool_name: 'test-event', + success: true, + latency_ms: 100, + }, + }); + + let resolveFirstRequest: (value: Response) => void; + fetchStub.onFirstCall().returns( + new Promise(resolve => { + resolveFirstRequest = resolve; + }), + ); + + clock.tick(FLUSH_INTERVAL_MS); + + const shutdownPromise = sender.sendShutdownEvent(); + + resolveFirstRequest!(new Response(JSON.stringify({}), {status: 200})); + await shutdownPromise; + + assert.strictEqual(fetchStub.callCount, 2); + const firstBody = JSON.parse(fetchStub.args[0][1].body) as LogRequest; + const secondBody = JSON.parse(fetchStub.args[1][1].body) as LogRequest; + + const firstEvents = firstBody.log_event.map(e => + JSON.parse(e.source_extension_json), + ); + const secondEvents = secondBody.log_event.map(e => + JSON.parse(e.source_extension_json), + ); + + assert.strictEqual(firstEvents.length, 1); + assert.strictEqual(firstEvents[0].tool_invocation?.tool_name, 'test-event'); + + assert.strictEqual( + secondEvents.length, + 1, + 'Shutdown request should only contain shutdown event', + ); + assert.ok( + secondEvents[0].server_shutdown, + 'Shutdown request should contain server_shutdown', + ); + + sender.stopForTesting(); + }); + + it('rotates session id after 24 hours', async () => { + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); + + sender.enqueueEvent({ + tool_invocation: {tool_name: 'test1', success: true, latency_ms: 10}, + }); + await clock.tickAsync(FLUSH_INTERVAL_MS); + + const firstCallBody = JSON.parse( + fetchStub.firstCall.args[1].body, + ) as LogRequest; + const firstEvent = JSON.parse( + firstCallBody.log_event[0].source_extension_json, + ); + const firstSessionId = firstEvent.session_id; + + const SESSION_ROTATION_INTERVAL_MS = 24 * 60 * 60 * 1000; + await clock.tickAsync( + SESSION_ROTATION_INTERVAL_MS - FLUSH_INTERVAL_MS + 1000, + ); + + sender.enqueueEvent({ + tool_invocation: {tool_name: 'test2', success: true, latency_ms: 10}, + }); + await clock.tickAsync(FLUSH_INTERVAL_MS); + + const secondCallBody = JSON.parse( + fetchStub.secondCall.args[1].body, + ) as LogRequest; + const secondEvent = JSON.parse( + secondCallBody.log_event[0].source_extension_json, + ); + const secondSessionId = secondEvent.session_id; + + assert.notStrictEqual(firstSessionId, secondSessionId); + + sender.stopForTesting(); + }); + + it('respects next_request_wait_millis from server', async () => { + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); + + fetchStub.resolves( + new Response( + JSON.stringify({ + next_request_wait_millis: 45000, + }), + {status: 200}, + ), + ); + + sender.enqueueEvent({}); + await clock.tickAsync(FLUSH_INTERVAL_MS); + + fetchStub.resetHistory(); + + sender.enqueueEvent({}); + + await clock.tickAsync(44000); + assert.strictEqual( + fetchStub.callCount, + 0, + 'Should not flush before wait time', + ); + + await clock.tickAsync(1000); + assert.strictEqual(fetchStub.callCount, 1, 'Should flush after wait time'); + + sender.stopForTesting(); + }); + + it('aborts request after timeout', async () => { + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); + const REQUEST_TIMEOUT_MS = 30000; + + let fetchSignal: AbortSignal | undefined; + fetchStub.callsFake((_url, options) => { + fetchSignal = options.signal; + return new Promise(() => { + // Hangs forever + }); + }); + + sender.enqueueEvent({}); + + await clock.tickAsync(FLUSH_INTERVAL_MS); + await clock.tickAsync(REQUEST_TIMEOUT_MS); + + assert.ok(fetchSignal, 'Fetch should have been called with a signal'); + assert.strictEqual( + fetchSignal.aborted, + true, + 'Signal should be aborted after timeout', + ); + + sender.stopForTesting(); + }); + + it('resolves sendShutdownEvent after timeout if flush hangs', async () => { + const sender = new ClearcutSender({ + appVersion: '1.0.0', + osType: OsType.OS_TYPE_MACOS, + forceFlushIntervalMs: FLUSH_INTERVAL_MS, + }); + fetchStub.returns( + new Promise(() => { + // Hangs forever + }), + ); + + const shutdownPromise = sender.sendShutdownEvent(); + + await clock.tickAsync(5000); + + await shutdownPromise; }); });