Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ See README.md for complete configuration reference.
- Prometheus metrics exported to OTLP HTTP endpoint
- Tracks: active sessions, audio bytes, transcription latency, backend errors
- Only enabled when `OTLP_ENDPOINT` is set
- Each container instance is differentiated via `CLOUDFLARE_DURABLE_OBJECT_ID` (falls back to random UUID for local dev). Metrics use `service.instance.id` (standard OTEL, Mimir-friendly). Logs use `runId` (custom name to avoid Loki auto-indexing it as a high-cardinality label).
- Container location is tagged via `city` (from `CLOUDFLARE_LOCATION`) and `country` (from `CLOUDFLARE_COUNTRY_A2`)

**Logging** (`src/logger.ts`)
- Uses Winston with OTLP logs transport
Expand Down
54 changes: 45 additions & 9 deletions src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,57 @@ let loggerProvider: LoggerProvider | null = null;
let meter: Meter | null = null;

/**
* Create shared resource with common attributes for both metrics and logs.
* Build the base resource attributes shared by metrics and logs.
*/
function createResource(): Resource {
const resourceAttributes: Record<string, string> = {
function baseResourceAttributes(): Record<string, string> {
const attrs: Record<string, string> = {
[ATTR_SERVICE_NAME]: 'opus-transcriber-proxy',
};

if (config.otlp.env) {
resourceAttributes[ATTR_DEPLOYMENT_ENVIRONMENT] = config.otlp.env;
resourceAttributes['env'] = config.otlp.env;
attrs[ATTR_DEPLOYMENT_ENVIRONMENT] = config.otlp.env;
attrs['env'] = config.otlp.env;
}

Object.assign(resourceAttributes, config.otlp.resourceAttributes);
// Cloudflare container location context (injected automatically by CF runtime)
if (process.env.CLOUDFLARE_LOCATION) {
attrs['city'] = process.env.CLOUDFLARE_LOCATION;
}
if (process.env.CLOUDFLARE_COUNTRY_A2) {
attrs['country'] = process.env.CLOUDFLARE_COUNTRY_A2;
}

Object.assign(attrs, config.otlp.resourceAttributes);

return attrs;
}

// Instance ID generated once per process — shared between metrics and logs resources.
// CONTAINER_INSTANCE_NAME is the human-readable name passed to getContainer() by the worker
// (equals the session/meeting ID in session routing mode). Falls back to DO hex hash or random UUID.
const instanceId = process.env.CONTAINER_INSTANCE_NAME || process.env.CLOUDFLARE_DURABLE_OBJECT_ID || crypto.randomUUID();

return new Resource(resourceAttributes);
/**
* Resource for metrics (Mimir). Uses the standard 'service.instance.id' attribute
* so each container produces unique series for correct aggregation.
*/
/**
 * Resource for metrics (Mimir). Carries the standard 'service.instance.id'
 * attribute so every container emits its own series and aggregation stays correct.
 */
function createMetricsResource(): Resource {
  return new Resource({
    ...baseResourceAttributes(),
    'service.instance.id': instanceId,
  });
}

/**
* Resource for logs (Loki). Sets a static 'service.instance.id' to override the OTEL SDK's
* auto-detected value. This prevents Loki from creating a unique indexed stream per container
* (high-cardinality explosion). The per-container identity is stored in 'runId' instead — a
* name chosen to avoid Loki auto-indexing (unlike 'session_id' or 'container_id').
*/
function createLogsResource(): Resource {
const attrs = baseResourceAttributes();
attrs['service.instance.id'] = 'opus-transcriber-proxy';
attrs['runId'] = instanceId;
return new Resource(attrs);
}

/**
Expand All @@ -57,7 +93,7 @@ export function initTelemetry(): void {

logger.info(`Initializing OpenTelemetry metrics, endpoint=${config.otlp.endpoint}`);

const resource = createResource();
const resource = createMetricsResource();

// Create OTLP exporter
const exporterConfig: { url: string; headers?: Record<string, string> } = {
Expand Down Expand Up @@ -137,7 +173,7 @@ export function initTelemetryLogs(): void {
// Use console.log to avoid circular dependency with logger
console.log(`Initializing OpenTelemetry logs, endpoint=${config.otlp.endpoint}/v1/logs`);

const resource = createResource();
const resource = createLogsResource();

const logExporterConfig: { url: string; headers?: Record<string, string> } = {
url: `${config.otlp.endpoint}/v1/logs`,
Expand Down
69 changes: 40 additions & 29 deletions worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,41 @@ interface TranscriptionMessage {
language?: string;
}

/**
* Build the container env vars from the worker env bindings.
* Shared between the class-level default and startAndWaitForPorts() override
* (which adds CONTAINER_INSTANCE_NAME — not available at class init time).
*/
/**
 * Build the env vars handed to the transcriber container from the worker's
 * bindings. Shared by the class-level default and the startAndWaitForPorts()
 * override (which additionally injects CONTAINER_INSTANCE_NAME — a value that
 * is not available at class init time).
 *
 * Note: '||' (not '??') is intentional — an empty-string binding falls back
 * to the default just like an unset one.
 */
function buildContainerEnvVars(env: Env): Record<string, string> {
  // Fallback helper preserving '||' semantics (empty string takes the default).
  const or = (value: string | undefined, fallback: string): string => value || fallback;

  return {
    // OpenAI
    OPENAI_API_KEY: env.OPENAI_API_KEY,
    OPENAI_MODEL: or(env.OPENAI_MODEL, 'gpt-4o-transcribe'),
    // Alternate providers
    GEMINI_API_KEY: or(env.GEMINI_API_KEY, ''),
    DEEPGRAM_API_KEY: or(env.DEEPGRAM_API_KEY, ''),
    DEEPGRAM_MODEL: or(env.DEEPGRAM_MODEL, 'nova-3-general'),
    DEEPGRAM_DETECT_LANGUAGE: or(env.DEEPGRAM_DETECT_LANGUAGE, 'true'),
    DEEPGRAM_INCLUDE_LANGUAGE: or(env.DEEPGRAM_INCLUDE_LANGUAGE, 'false'),
    DEEPGRAM_PUNCTUATE: or(env.DEEPGRAM_PUNCTUATE, 'true'),
    DEEPGRAM_ENCODING: or(env.DEEPGRAM_ENCODING, 'opus'),
    DEEPGRAM_TAGS: or(env.DEEPGRAM_TAGS, ''),
    // Provider selection & behavior
    PROVIDERS_PRIORITY: or(env.PROVIDERS_PRIORITY, 'openai'),
    FORCE_COMMIT_TIMEOUT: or(env.FORCE_COMMIT_TIMEOUT, '2'),
    DEBUG: or(env.DEBUG, 'true'),
    // Routing / scaling
    ROUTING_MODE: or(env.ROUTING_MODE, 'session'),
    CONTAINER_POOL_SIZE: or(env.CONTAINER_POOL_SIZE, '5'),
    MAX_CONNECTIONS_PER_CONTAINER: or(env.MAX_CONNECTIONS_PER_CONTAINER, '10'),
    MIN_CONTAINERS: or(env.MIN_CONTAINERS, '2'),
    SCALE_DOWN_IDLE_TIME: or(env.SCALE_DOWN_IDLE_TIME, '600000'),
    TRANSLATION_MIXING_MODE: or(env.TRANSLATION_MIXING_MODE, 'true'),
    // Telemetry
    OTLP_ENDPOINT: or(env.OTLP_ENDPOINT, ''),
    OTLP_ENV: or(env.OTLP_ENV, ''),
    OTLP_RESOURCE_ATTRIBUTES: or(env.OTLP_RESOURCE_ATTRIBUTES, ''),
    OTLP_HEADERS: or(env.OTLP_HEADERS, ''),
    // Server binding inside the container
    PORT: '8080',
    HOST: '0.0.0.0',
  };
}

/**
* TranscriberContainer wraps the Node.js transcription server
* and forwards WebSocket requests to it.
Expand All @@ -52,35 +87,8 @@ export class TranscriberContainer extends Container<Env> {
// For session-based routing: Keep this shorter (containers are session-specific)
sleepAfter = this.env.SLEEP_AFTER || '1m';

// Pass environment variables to the container
envVars: Record<string, string> = {
// These will be available as process.env in the container
OPENAI_API_KEY: this.env.OPENAI_API_KEY,
OPENAI_MODEL: this.env.OPENAI_MODEL || 'gpt-4o-transcribe',
GEMINI_API_KEY: this.env.GEMINI_API_KEY || '',
DEEPGRAM_API_KEY: this.env.DEEPGRAM_API_KEY || '',
DEEPGRAM_MODEL: this.env.DEEPGRAM_MODEL || 'nova-3-general',
DEEPGRAM_DETECT_LANGUAGE: this.env.DEEPGRAM_DETECT_LANGUAGE || 'true',
DEEPGRAM_INCLUDE_LANGUAGE: this.env.DEEPGRAM_INCLUDE_LANGUAGE || 'false',
DEEPGRAM_PUNCTUATE: this.env.DEEPGRAM_PUNCTUATE || 'true',
DEEPGRAM_ENCODING: this.env.DEEPGRAM_ENCODING || 'opus',
DEEPGRAM_TAGS: this.env.DEEPGRAM_TAGS || '',
PROVIDERS_PRIORITY: this.env.PROVIDERS_PRIORITY || 'openai',
FORCE_COMMIT_TIMEOUT: this.env.FORCE_COMMIT_TIMEOUT || '2',
DEBUG: this.env.DEBUG || 'true',
ROUTING_MODE: this.env.ROUTING_MODE || 'session',
CONTAINER_POOL_SIZE: this.env.CONTAINER_POOL_SIZE || '5',
MAX_CONNECTIONS_PER_CONTAINER: this.env.MAX_CONNECTIONS_PER_CONTAINER || '10',
MIN_CONTAINERS: this.env.MIN_CONTAINERS || '2',
SCALE_DOWN_IDLE_TIME: this.env.SCALE_DOWN_IDLE_TIME || '600000',
TRANSLATION_MIXING_MODE: this.env.TRANSLATION_MIXING_MODE || 'true',
OTLP_ENDPOINT: this.env.OTLP_ENDPOINT || '',
OTLP_ENV: this.env.OTLP_ENV || '',
OTLP_RESOURCE_ATTRIBUTES: this.env.OTLP_RESOURCE_ATTRIBUTES || '',
OTLP_HEADERS: this.env.OTLP_HEADERS || '',
PORT: '8080',
HOST: '0.0.0.0',
};
// Pass environment variables to the container (default, without instance name)
envVars: Record<string, string> = buildContainerEnvVars(this.env);

override onStart() {
console.log('Transcriber container started');
Expand Down Expand Up @@ -513,6 +521,9 @@ export default {
// This is required for the fetch to work properly
try {
await container.startAndWaitForPorts({
startOptions: {
envVars: { ...buildContainerEnvVars(env), CONTAINER_INSTANCE_NAME: containerInstanceId },
},
cancellationOptions: {
waitInterval: 100, // Poll every 100ms (default: 1000ms)
},
Expand Down
Loading