Skip to content

Commit 1c0e195

Browse files
authored
fix: reconnect to dispatcher DO and flush messages (#64)
1 parent ca765ca commit 1c0e195

File tree

1 file changed

+139
-5
lines changed

1 file changed

+139
-5
lines changed

worker/index.ts

Lines changed: 139 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -257,13 +257,101 @@ async function handleWebSocketWithDispatcher(
257257
});
258258
}, 10_000); // Every 10 seconds
259259

260+
260261
// Pipe: client → container (upstream, no interception needed)
261262
serverWs.addEventListener('message', (event) => {
262263
if (containerWs.readyState === WebSocket.READY_STATE_OPEN) {
263264
containerWs.send(event.data);
264265
}
265266
});
266267

268+
// Queue for messages while dispatcher is reconnecting
269+
const messageQueue: DispatcherTranscriptionMessage[] = [];
270+
let reconnectAttempts = 0;
271+
let isReconnecting = false;
272+
let sessionClosed = false;
273+
274+
// Function to connect/reconnect to dispatcher DO
275+
async function connectToDispatcher(allowDuringClose = false): Promise<WebSocket | null> {
276+
if (!env.DISPATCHER_DO || (sessionClosed && !allowDuringClose)) return null;
277+
278+
try {
279+
const doId = env.DISPATCHER_DO.idFromName('global');
280+
const stub = env.DISPATCHER_DO.get(doId);
281+
282+
const upgradeRequest = new Request('http://dispatcher/websocket', {
283+
headers: { 'Upgrade': 'websocket' },
284+
});
285+
const doResponse = await stub.fetch(upgradeRequest);
286+
287+
if (doResponse.webSocket) {
288+
doResponse.webSocket.accept();
289+
console.log(`Connected to Dispatcher DO via WebSocket for session: ${sessionId}`);
290+
reconnectAttempts = 0;
291+
return doResponse.webSocket;
292+
}
293+
} catch (error) {
294+
const msg = error instanceof Error ? error.message : String(error);
295+
console.error(`Failed to connect to Dispatcher DO: ${msg}, sessionId=${sessionId}`);
296+
}
297+
return null;
298+
}
299+
300+
// Function to flush queued messages
301+
function flushMessageQueue() {
302+
if (messageQueue.length > 0 && dispatcherWs?.readyState === WebSocket.READY_STATE_OPEN) {
303+
console.log(`Flushing ${messageQueue.length} queued messages to dispatcher, sessionId=${sessionId}`);
304+
while (messageQueue.length > 0) {
305+
const msg = messageQueue.shift()!;
306+
dispatcherWs.send(JSON.stringify(msg));
307+
}
308+
}
309+
}
310+
311+
// Function to schedule reconnection with exponential backoff
312+
function scheduleReconnect() {
313+
if (isReconnecting || sessionClosed) return;
314+
isReconnecting = true;
315+
316+
const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 30000);
317+
reconnectAttempts++;
318+
319+
console.log(`Scheduling dispatcher reconnect in ${delay}ms (attempt ${reconnectAttempts}), sessionId=${sessionId}, queued=${messageQueue.length}`);
320+
321+
setTimeout(async () => {
322+
isReconnecting = false;
323+
if (sessionClosed) return;
324+
325+
const newWs = await connectToDispatcher();
326+
if (newWs) {
327+
dispatcherWs = newWs;
328+
setupDispatcherHandlers();
329+
flushMessageQueue();
330+
} else {
331+
scheduleReconnect();
332+
}
333+
}, delay);
334+
}
335+
336+
// Setup close/error handlers for dispatcher WebSocket
337+
function setupDispatcherHandlers() {
338+
if (!dispatcherWs) return;
339+
340+
dispatcherWs.addEventListener('close', (event) => {
341+
console.error(`Dispatcher WebSocket closed: code=${event.code}, reason=${event.reason || 'none'}, sessionId=${sessionId}`);
342+
if (!sessionClosed) {
343+
scheduleReconnect();
344+
}
345+
});
346+
347+
dispatcherWs.addEventListener('error', () => {
348+
console.error(`Dispatcher WebSocket error, sessionId=${sessionId}`);
349+
});
350+
}
351+
352+
// Setup handlers for initial connection
353+
setupDispatcherHandlers();
354+
267355
// Pipe: container → client (downstream, intercept for dispatcher)
268356
containerWs.addEventListener('message', (event) => {
269357
// Forward to client immediately
@@ -272,7 +360,7 @@ async function handleWebSocketWithDispatcher(
272360
}
273361

274362
// Dispatch transcriptions via DO WebSocket
275-
if (dispatcherWs && dispatcherWs.readyState === WebSocket.READY_STATE_OPEN && typeof event.data === 'string') {
363+
if (typeof event.data === 'string') {
276364
try {
277365
const data = JSON.parse(event.data) as TranscriptionMessage;
278366
if (data.type === 'transcription-result' && !data.is_interim) {
@@ -283,18 +371,64 @@ async function handleWebSocketWithDispatcher(
283371
timestamp: data.timestamp,
284372
language: data.language,
285373
};
286-
dispatcherWs.send(JSON.stringify(dispatcherMessage));
374+
375+
if (dispatcherWs?.readyState === WebSocket.READY_STATE_OPEN) {
376+
dispatcherWs.send(JSON.stringify(dispatcherMessage));
377+
} else if (env.DISPATCHER_DO) {
378+
// Queue message while disconnected
379+
messageQueue.push(dispatcherMessage);
380+
if (messageQueue.length === 1) {
381+
console.log(`Dispatcher not connected, queueing messages, sessionId=${sessionId}`);
382+
}
383+
if (!isReconnecting && (!dispatcherWs || dispatcherWs.readyState !== WebSocket.READY_STATE_OPEN)) {
384+
scheduleReconnect();
385+
}
386+
}
287387
}
288-
} catch {
289-
// Not JSON or parse error - ignore, still forwarded to client
388+
} catch (error) {
389+
const msg = error instanceof Error ? error.message : String(error);
390+
console.error(`Failed to parse/dispatch transcription: ${msg}, sessionId=${sessionId}`);
290391
}
291392
}
292393
});
293394

395+
// Flush remaining messages before closing session
396+
async function flushBeforeClose(): Promise<void> {
397+
if (messageQueue.length === 0) return;
398+
399+
console.log(`Flushing ${messageQueue.length} queued messages before session close, sessionId=${sessionId}`);
400+
401+
// Try to reconnect if not connected (allow during close)
402+
if (!dispatcherWs || dispatcherWs.readyState !== WebSocket.READY_STATE_OPEN) {
403+
const newWs = await connectToDispatcher(true);
404+
if (newWs) {
405+
dispatcherWs = newWs;
406+
}
407+
}
408+
409+
// Flush if connected
410+
if (dispatcherWs?.readyState === WebSocket.READY_STATE_OPEN) {
411+
while (messageQueue.length > 0) {
412+
const msg = messageQueue.shift()!;
413+
dispatcherWs.send(JSON.stringify(msg));
414+
}
415+
console.log(`Successfully flushed all queued messages, sessionId=${sessionId}`);
416+
} else {
417+
console.error(`Failed to flush ${messageQueue.length} queued messages - dispatcher not connected, sessionId=${sessionId}`);
418+
}
419+
}
420+
294421
// Handle close events
295422
serverWs.addEventListener('close', (event) => {
423+
sessionClosed = true;
296424
clearInterval(keepAliveInterval);
297-
console.log(`Client WebSocket closed: code=${event.code}, reason=${event.reason || 'none'}`);
425+
console.log(`Client WebSocket closed: code=${event.code}, reason=${event.reason || 'none'}, sessionId=${sessionId}`);
426+
427+
// Flush queued messages in background before fully closing
428+
if (messageQueue.length > 0) {
429+
ctx.waitUntil(flushBeforeClose());
430+
}
431+
298432
if (containerWs.readyState === WebSocket.READY_STATE_OPEN || containerWs.readyState === WebSocket.READY_STATE_CONNECTING) {
299433
containerWs.close(event.code, event.reason);
300434
}

0 commit comments

Comments
 (0)