Skip to content

Commit 4588550

Browse files
authored
feat: add dispatcher integration for transcription fan-out (#45)
* feat: add dispatcher integration for transcription fan-out * docs: update dispatcher configuration options
1 parent 77fa5b8 commit 4588550

File tree

4 files changed

+185
-23
lines changed

4 files changed

+185
-23
lines changed

.beads/metadata.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"database": "beads.db",
3+
"jsonl_export": "issues.jsonl"
4+
}

DISPATCHER_INTEGRATION.md

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -94,30 +94,47 @@ containerClientWs.addEventListener('message', (event) => {
9494

9595
## Configuration
9696

97-
### 1. Enable Dispatcher in Worker Config
97+
### 1. Add Service Binding
9898

99-
Edit `wrangler-container.jsonc`:
99+
Add the `TRANSCRIPTION_DISPATCHER` service binding to your `wrangler.jsonc`:
100100

101101
```jsonc
102102
{
103103
"services": [
104104
{
105105
"binding": "TRANSCRIPTION_DISPATCHER",
106-
"service": "your-dispatcher-worker-name",
107-
"environment": "production"
106+
"service": "transcription-dispatcher"
108107
}
109108
]
110109
}
111110
```
112111

113-
### 2. Connect with Dispatcher Enabled
112+
### 2. Enable Dispatcher
113+
114+
The dispatcher can be enabled in two ways:
115+
116+
**Option A: Environment Variable (recommended for global enable)**
117+
118+
Add `USE_DISPATCHER` to the `vars` section of your `wrangler.jsonc`:
119+
120+
```jsonc
121+
{
122+
"vars": {
123+
"USE_DISPATCHER": "true"
124+
}
125+
}
126+
```
127+
128+
**Option B: Query Parameter (per-request control)**
114129

115130
Add `useDispatcher=true` query parameter:
116131

117132
```
118-
wss://your-worker.workers.dev/transcribe?sessionId=test&transcribe=true&sendBack=true&useDispatcher=true
133+
wss://your-worker.workers.dev/transcribe?sessionId=test&useDispatcher=true
119134
```
120135

136+
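**Precedence:** When the `useDispatcher` query parameter is present, it overrides the `USE_DISPATCHER` environment variable in both directions (for example, `useDispatcher=false` disables dispatching for that connection even when the variable is `"true"`). If neither is set, the dispatcher is disabled. In all cases, dispatching only happens for WebSocket upgrade requests and only when the `TRANSCRIPTION_DISPATCHER` service binding is configured.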
137+
121138
### 3. Implement Dispatcher Worker
122139

123140
Your dispatcher worker must implement the `dispatch()` RPC method:
@@ -242,15 +259,24 @@ interface DispatcherTranscriptionMessage {
242259
### Without Dispatcher
243260

244261
```bash
245-
# Media server receives transcripts directly
246-
wscat -c "wss://your-worker.workers.dev/transcribe?sessionId=test&transcribe=true&sendBack=true"
262+
# Media server receives transcripts directly (dispatcher disabled)
263+
wscat -c "wss://your-worker.workers.dev/transcribe?sessionId=test&sendBack=true"
247264
```
248265

249-
### With Dispatcher
266+
### With Dispatcher (via query param)
250267

251268
```bash
252269
# Media server receives transcripts + dispatcher gets notified
253-
wscat -c "wss://your-worker.workers.dev/transcribe?sessionId=test&transcribe=true&sendBack=true&useDispatcher=true"
270+
wscat -c "wss://your-worker.workers.dev/transcribe?sessionId=test&sendBack=true&useDispatcher=true"
271+
```
272+
273+
### With Dispatcher (via env var)
274+
275+
If `USE_DISPATCHER` is set to `"true"` in `wrangler.jsonc`, every WebSocket connection dispatches unless it explicitly passes `useDispatcher=false`:
276+
277+
```bash
278+
# Dispatcher enabled globally via env var
279+
wscat -c "wss://your-worker.workers.dev/transcribe?sessionId=test&sendBack=true"
254280
```
255281

256282
Check dispatcher logs:

worker/env.d.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,35 @@
11
// Environment types for the Cloudflare Worker
22

3+
import type { TranscriptionDispatcher } from './index';
4+
35
// The top-level `import type` in this file turns it into a module, so a bare
// `interface Env` would be module-local and invisible to the worker code that
// refers to `Env` without importing it. `declare global` keeps it ambient.
declare global {
  /**
   * Bindings and variables available to the Cloudflare Worker.
   *
   * Variables configured through Wrangler `vars` arrive as strings, which is
   * why flags such as `USE_DISPATCHER` are typed `string` and compared
   * against `'true'` by the worker.
   */
  interface Env {
    // Durable Object binding for the container
    TRANSCRIBER: DurableObjectNamespace;

    // Service bindings
    // Optional: the worker only intercepts WebSockets when this binding exists.
    TRANSCRIPTION_DISPATCHER?: Service<TranscriptionDispatcher>;

    // Durable Object for auto-scaling
    CONTAINER_COORDINATOR: DurableObjectNamespace;

    // Environment variables
    OPENAI_API_KEY: string;
    OPENAI_MODEL?: string;
    GEMINI_API_KEY?: string;
    DEEPGRAM_API_KEY?: string;
    DEEPGRAM_MODEL?: string;
    DEEPGRAM_DETECT_LANGUAGE?: string;
    DEEPGRAM_INCLUDE_LANGUAGE?: string;
    DEEPGRAM_PUNCTUATE?: string;
    DEEPGRAM_ENCODING?: string;
    PROVIDERS_PRIORITY?: string;
    FORCE_COMMIT_TIMEOUT?: string;
    DEBUG?: string;
    // Routing strategy; the worker falls back to 'session' when unset.
    ROUTING_MODE?: string;
    CONTAINER_POOL_SIZE?: string;
    MAX_CONNECTIONS_PER_CONTAINER?: string;
    MIN_CONTAINERS?: string;
    SCALE_DOWN_IDLE_TIME?: string;
    TRANSLATION_MIXING_MODE?: string;
    // Global dispatcher switch ('true' enables); the `useDispatcher` query
    // parameter takes precedence when present.
    USE_DISPATCHER?: string;
  }
}

worker/index.ts

Lines changed: 122 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,16 @@ export interface TranscriptionDispatcher extends WorkerEntrypoint<Env> {
3333
* Transcription message from container
3434
*/
3535
interface TranscriptionMessage {
  // Discriminator: only this message type is inspected for dispatching.
  type: 'transcription-result';
  // Interim results are forwarded to the client but never dispatched.
  is_interim: boolean;
  // Source participant; `id` may be absent.
  participant: { id?: string };
  // Transcript segments; their `text` fields are joined when dispatching.
  transcript: { text: string }[];
  timestamp: number;
  // Optional language tag passed through to the dispatcher as-is.
  language?: string;
}
4547

4648
/**
@@ -168,6 +170,112 @@ async function selectContainerInstance(request: Request, env: Env): Promise<stri
168170
}
169171
}
170172

173+
/**
 * Close codes that can show up in a close event but are reserved by the
 * WebSocket protocol and must never be sent in a close frame.
 */
const RESERVED_CLOSE_CODES: ReadonlySet<number> = new Set([1004, 1005, 1006, 1015]);

/**
 * Map a close code received in a close event to one that is safe to pass to
 * `WebSocket.close()`.
 */
function toSendableCloseCode(code: number): number {
  // 1005 means "peer closed without a status": relay it as a normal closure.
  if (code === 1005) {
    return 1000;
  }
  const sendable = code >= 1000 && code < 5000 && !RESERVED_CLOSE_CODES.has(code);
  // Anything else that cannot be relayed (e.g. 1006 abnormal closure) becomes
  // a generic internal-error close.
  return sendable ? code : 1011;
}

/**
 * Close a socket without letting a failure escape the calling event listener.
 * The two proxy legs mirror each other's close events, so the second close
 * regularly targets a socket that is already closing or closed.
 */
function closeQuietly(ws: WebSocket, code: number, reason: string): void {
  try {
    ws.close(toSendableCloseCode(code), reason);
  } catch {
    // Socket already closed/closing or arguments rejected - nothing to do.
  }
}

/**
 * Build the dispatcher payload for a container frame, or return `undefined`
 * when the frame is not a final `transcription-result` message.
 */
function toDispatcherMessage(
  raw: string,
  sessionId: string,
): DispatcherTranscriptionMessage | undefined {
  try {
    const data = JSON.parse(raw) as TranscriptionMessage;
    if (data.type !== 'transcription-result' || data.is_interim) {
      return undefined;
    }
    return {
      sessionId,
      endpointId: data.participant?.id || 'unknown',
      text: data.transcript.map((t) => t.text).join(' '),
      timestamp: data.timestamp,
      language: data.language,
    };
  } catch {
    // Not JSON, or not shaped like a transcription message - nothing to dispatch.
    return undefined;
  }
}

/**
 * Handle a WebSocket connection with dispatcher interception.
 *
 * Creates a proxy between the client and the container. Client frames are
 * piped to the container untouched; container frames are forwarded to the
 * client first and, when they are final transcription results, additionally
 * sent to the dispatcher without waiting for its answer.
 *
 * @param request - Client upgrade request, forwarded unchanged to the container.
 * @param container - Container instance that serves the upstream WebSocket.
 * @param env - Worker environment; `TRANSCRIPTION_DISPATCHER` is expected to be bound.
 * @param ctx - Execution context (not used by this function).
 * @param sessionId - Session identifier attached to every dispatched message.
 * @returns A 101 response carrying the client socket, or the container's own
 *   response when the container did not upgrade (or no dispatcher is bound).
 */
async function handleWebSocketWithDispatcher(
  request: Request,
  container: ReturnType<typeof getContainer>,
  env: Env,
  ctx: ExecutionContext,
  sessionId: string,
): Promise<Response> {
  const dispatcher = env.TRANSCRIPTION_DISPATCHER;
  if (!dispatcher) {
    // Without a binding there is nothing to intercept for: plain pass-through.
    return container.fetch(request);
  }

  // Forward the upgrade request to the container before allocating the client
  // pair, so a failed upgrade leaves no accepted-but-orphaned socket behind.
  const containerResponse = await container.fetch(request);
  const containerWs = containerResponse.webSocket;
  if (containerResponse.status !== 101 || !containerWs) {
    // Container didn't upgrade - hand its response back to the client as-is.
    return containerResponse;
  }
  containerWs.accept();

  // Create the WebSocket pair for the client and accept our end of it.
  const clientPair = new WebSocketPair();
  const [clientWs, serverWs] = Object.values(clientPair);
  serverWs.accept();

  // Pipe: client -> container (upstream, no interception needed)
  serverWs.addEventListener('message', (event) => {
    if (containerWs.readyState === WebSocket.READY_STATE_OPEN) {
      containerWs.send(event.data);
    }
  });

  // Pipe: container -> client (downstream, intercept for dispatcher)
  containerWs.addEventListener('message', (event) => {
    // Forward to the client first so dispatching never delays delivery.
    if (serverWs.readyState === WebSocket.READY_STATE_OPEN) {
      serverWs.send(event.data);
    }

    if (typeof event.data !== 'string') {
      return;
    }
    const dispatcherMessage = toDispatcherMessage(event.data, sessionId);
    if (!dispatcherMessage) {
      return;
    }

    // Fire and forget - failures are logged, never surfaced to the client.
    void dispatcher
      .dispatch(dispatcherMessage)
      .then((response) => {
        if (!response.success || response.errors) {
          console.error('Dispatcher error:', {
            message: response.message,
            errors: response.errors,
          });
        }
      })
      .catch((error: unknown) => {
        const msg = error instanceof Error ? error.message : String(error);
        console.error('Dispatcher RPC failed:', msg);
      });
  });

  // Mirror close events across the two legs, sanitizing the relayed code.
  serverWs.addEventListener('close', (event) => {
    closeQuietly(containerWs, event.code, event.reason);
  });

  containerWs.addEventListener('close', (event) => {
    closeQuietly(serverWs, event.code, event.reason);
  });

  // An error on one leg tears down the other.
  serverWs.addEventListener('error', () => {
    closeQuietly(containerWs, 1011, 'Client WebSocket error');
  });

  containerWs.addEventListener('error', () => {
    closeQuietly(serverWs, 1011, 'Container WebSocket error');
  });

  // Return the client end of the pair to complete the upgrade.
  return new Response(null, {
    status: 101,
    webSocket: clientWs,
  });
}
278+
171279
export default {
172280
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
173281
const url = new URL(request.url);
@@ -185,7 +293,11 @@ export default {
185293
}
186294
}
187295

188-
const useDispatcher = url.searchParams.get('useDispatcher') === 'true';
296+
// Check query param first, fall back to env var
297+
const useDispatcherParam = url.searchParams.get('useDispatcher');
298+
const useDispatcher = useDispatcherParam !== null
299+
? useDispatcherParam === 'true'
300+
: env.USE_DISPATCHER === 'true';
189301
const sessionId = url.searchParams.get('sessionId') || 'unknown';
190302

191303
// Select which container instance to use based on routing strategy
@@ -194,13 +306,9 @@ export default {
194306
// Get the container instance
195307
const container = getContainer(env.TRANSCRIBER, containerInstanceId);
196308

197-
// Start the container and wait for ports to be ready
198-
// This is required for the fetch to work properly
199-
await container.startAndWaitForPorts();
200-
201-
// For now, just forward all requests directly to the container
202-
// TODO: Implement WebSocket interception for dispatcher fan-out
203-
// This requires more complex WebSocket handling with Cloudflare Containers
309+
// Start the container and wait for ports to be ready
310+
// This is required for the fetch to work properly
311+
await container.startAndWaitForPorts();
204312

205313
// Report connection tracking for autoscale mode
206314
const routingMode = env.ROUTING_MODE || 'session';
@@ -220,13 +328,14 @@ export default {
220328
console.error('Failed to report connection opened:', error);
221329
}),
222330
);
331+
}
223332

224-
// Note: We can't easily detect when WebSocket closes from here
225-
// The container would need to report back, or we'd need WebSocket interception
226-
// For now, rely on scale-down idle timeout
333+
// If dispatcher is enabled and this is a WebSocket upgrade, intercept the connection
334+
if (useDispatcher && upgradeHeader === 'websocket' && env.TRANSCRIPTION_DISPATCHER) {
335+
return handleWebSocketWithDispatcher(request, container, env, ctx, sessionId);
227336
}
228337

229-
// Forward request directly to container
338+
// Forward request directly to container (pass-through mode)
230339
return container.fetch(request);
231340

232341
},

0 commit comments

Comments
 (0)