Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 139 additions & 5 deletions worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,101 @@ async function handleWebSocketWithDispatcher(
});
}, 10_000); // Every 10 seconds


// Pipe: client → container (upstream, no interception needed)
serverWs.addEventListener('message', (event) => {
if (containerWs.readyState === WebSocket.READY_STATE_OPEN) {
containerWs.send(event.data);
}
});

// Queue for messages while dispatcher is reconnecting
const messageQueue: DispatcherTranscriptionMessage[] = [];
let reconnectAttempts = 0;
let isReconnecting = false;
let sessionClosed = false;

// Function to connect/reconnect to dispatcher DO
async function connectToDispatcher(allowDuringClose = false): Promise<WebSocket | null> {
if (!env.DISPATCHER_DO || (sessionClosed && !allowDuringClose)) return null;

try {
const doId = env.DISPATCHER_DO.idFromName('global');
const stub = env.DISPATCHER_DO.get(doId);

const upgradeRequest = new Request('http://dispatcher/websocket', {
headers: { 'Upgrade': 'websocket' },
});
const doResponse = await stub.fetch(upgradeRequest);

if (doResponse.webSocket) {
doResponse.webSocket.accept();
console.log(`Connected to Dispatcher DO via WebSocket for session: ${sessionId}`);
reconnectAttempts = 0;
return doResponse.webSocket;
}
} catch (error) {
const msg = error instanceof Error ? error.message : String(error);
console.error(`Failed to connect to Dispatcher DO: ${msg}, sessionId=${sessionId}`);
}
return null;
}

// Function to flush queued messages
function flushMessageQueue() {
if (messageQueue.length > 0 && dispatcherWs?.readyState === WebSocket.READY_STATE_OPEN) {
console.log(`Flushing ${messageQueue.length} queued messages to dispatcher, sessionId=${sessionId}`);
while (messageQueue.length > 0) {
const msg = messageQueue.shift()!;
dispatcherWs.send(JSON.stringify(msg));
}
}
}

// Function to schedule reconnection with exponential backoff
function scheduleReconnect() {
if (isReconnecting || sessionClosed) return;
isReconnecting = true;

const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 30000);
reconnectAttempts++;

console.log(`Scheduling dispatcher reconnect in ${delay}ms (attempt ${reconnectAttempts}), sessionId=${sessionId}, queued=${messageQueue.length}`);

setTimeout(async () => {
isReconnecting = false;
if (sessionClosed) return;

const newWs = await connectToDispatcher();
if (newWs) {
dispatcherWs = newWs;
setupDispatcherHandlers();
flushMessageQueue();
} else {
scheduleReconnect();
}
}, delay);
}

// Setup close/error handlers for dispatcher WebSocket
function setupDispatcherHandlers() {
if (!dispatcherWs) return;

dispatcherWs.addEventListener('close', (event) => {
console.error(`Dispatcher WebSocket closed: code=${event.code}, reason=${event.reason || 'none'}, sessionId=${sessionId}`);
if (!sessionClosed) {
scheduleReconnect();
}
});

dispatcherWs.addEventListener('error', () => {
console.error(`Dispatcher WebSocket error, sessionId=${sessionId}`);
});
}

// Setup handlers for initial connection
setupDispatcherHandlers();

// Pipe: container → client (downstream, intercept for dispatcher)
containerWs.addEventListener('message', (event) => {
// Forward to client immediately
Expand All @@ -272,7 +360,7 @@ async function handleWebSocketWithDispatcher(
}

// Dispatch transcriptions via DO WebSocket
if (dispatcherWs && dispatcherWs.readyState === WebSocket.READY_STATE_OPEN && typeof event.data === 'string') {
if (typeof event.data === 'string') {
try {
const data = JSON.parse(event.data) as TranscriptionMessage;
if (data.type === 'transcription-result' && !data.is_interim) {
Expand All @@ -283,18 +371,64 @@ async function handleWebSocketWithDispatcher(
timestamp: data.timestamp,
language: data.language,
};
dispatcherWs.send(JSON.stringify(dispatcherMessage));

if (dispatcherWs?.readyState === WebSocket.READY_STATE_OPEN) {
dispatcherWs.send(JSON.stringify(dispatcherMessage));
} else if (env.DISPATCHER_DO) {
// Queue message while disconnected
messageQueue.push(dispatcherMessage);
if (messageQueue.length === 1) {
console.log(`Dispatcher not connected, queueing messages, sessionId=${sessionId}`);
}
if (!isReconnecting && (!dispatcherWs || dispatcherWs.readyState !== WebSocket.READY_STATE_OPEN)) {
scheduleReconnect();
}
}
}
} catch {
// Not JSON or parse error - ignore, still forwarded to client
} catch (error) {
const msg = error instanceof Error ? error.message : String(error);
console.error(`Failed to parse/dispatch transcription: ${msg}, sessionId=${sessionId}`);
}
}
});

// Flush remaining messages before closing session
async function flushBeforeClose(): Promise<void> {
if (messageQueue.length === 0) return;

console.log(`Flushing ${messageQueue.length} queued messages before session close, sessionId=${sessionId}`);

// Try to reconnect if not connected (allow during close)
if (!dispatcherWs || dispatcherWs.readyState !== WebSocket.READY_STATE_OPEN) {
const newWs = await connectToDispatcher(true);
if (newWs) {
dispatcherWs = newWs;
}
}

// Flush if connected
if (dispatcherWs?.readyState === WebSocket.READY_STATE_OPEN) {
while (messageQueue.length > 0) {
const msg = messageQueue.shift()!;
dispatcherWs.send(JSON.stringify(msg));
}
console.log(`Successfully flushed all queued messages, sessionId=${sessionId}`);
} else {
console.error(`Failed to flush ${messageQueue.length} queued messages - dispatcher not connected, sessionId=${sessionId}`);
}
}

// Handle close events
serverWs.addEventListener('close', (event) => {
sessionClosed = true;
clearInterval(keepAliveInterval);
console.log(`Client WebSocket closed: code=${event.code}, reason=${event.reason || 'none'}`);
console.log(`Client WebSocket closed: code=${event.code}, reason=${event.reason || 'none'}, sessionId=${sessionId}`);

// Flush queued messages in background before fully closing
if (messageQueue.length > 0) {
ctx.waitUntil(flushBeforeClose());
}

if (containerWs.readyState === WebSocket.READY_STATE_OPEN || containerWs.readyState === WebSocket.READY_STATE_CONNECTING) {
containerWs.close(event.code, event.reason);
}
Expand Down