forked from livekit/agents-js
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjob_proc_lazy_main.ts
More file actions
310 lines (273 loc) · 9.7 KB
/
job_proc_lazy_main.ts
File metadata and controls
310 lines (273 loc) · 9.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { Room, RoomEvent, dispose } from '@livekit/rtc-node';
import { ThrowsPromise } from '@livekit/throws-transformer/throws';
import { EventEmitter, once } from 'node:events';
import { pathToFileURL } from 'node:url';
import type { Logger } from 'pino';
import { type Agent, isAgent } from '../generator.js';
import { JobContext, JobProcess, type RunningJobInfo, runWithJobContextAsync } from '../job.js';
import { setJobContext } from '../log.js';
import { initializeLogger, log } from '../log.js';
import { Future, shortuuid } from '../utils.js';
import { defaultInitializeProcessFunc } from '../worker.js';
import type { InferenceExecutor } from './inference_executor.js';
import type { IPCMessage } from './message.js';
/** Milliseconds without a `pingRequest` from the parent before this process treats itself as orphaned. */
const ORPHANED_TIMEOUT = 15_000;
/**
 * Sends an IPC message to the parent process without ever throwing.
 *
 * @param msg - message to forward over the IPC channel.
 * @returns `true` when the message was handed to `process.send`; `false` when there is no
 *   connected IPC channel or the send threw.
 */
const safeSend = (msg: IPCMessage): boolean => {
  if (!process.connected || !process.send) {
    return false;
  }
  try {
    process.send(msg);
    return true;
  } catch (error) {
    const channelClosed = error instanceof Error && error.message.includes('Channel closed');
    if (channelClosed) {
      // A closed channel is expected while shutting down, so keep it at debug level
      // to avoid noisy production logs.
      log().debug({ msgCase: msg.case }, 'IPC channel closed, message not sent');
    } else {
      log().error({ error, msgCase: msg.case }, 'IPC send failed unexpectedly');
    }
    return false;
  }
};
/** A started job: its context plus the promise tracking the job's whole lifecycle. */
interface JobTask {
  ctx: JobContext;
  task: Promise<void>;
}
/** Payload of an `inferenceResponse` IPC message, as delivered to a pending request. */
type InferenceResponse = { requestId: string; data: unknown; error?: Error };

/**
 * One in-flight inference request: a promise plus the function that settles it.
 *
 * The promise is typed to never reject. A failed inference is reported through the `error`
 * field of the resolved value instead.
 */
class PendingInference {
  /** Resolves with the parent's response to this request. */
  readonly promise: ThrowsPromise<InferenceResponse, never>;
  /** Settles `promise` with the response received over IPC. */
  readonly resolve: (arg: InferenceResponse) => void;

  constructor() {
    // Assumes ThrowsPromise runs its executor synchronously (as a native Promise does),
    // so `settle` is assigned before it is read on the next statement.
    let settle!: (arg: InferenceResponse) => void;
    this.promise = new ThrowsPromise<InferenceResponse, never>((resolve) => {
      settle = resolve;
    });
    this.resolve = settle;
  }
}
/**
 * InferenceExecutor for the job process.
 *
 * Forwards each inference call to the parent process as an `inferenceRequest` IPC message.
 * It then resolves the caller once the `inferenceResponse` with the same request id arrives.
 */
class InfClient implements InferenceExecutor {
  /** In-flight requests keyed by request id. */
  #requests = new Map<string, PendingInference>();
  #logger = log();

  constructor() {
    process.on('message', (msg: IPCMessage) => {
      if (msg.case !== 'inferenceResponse') {
        return;
      }
      const { requestId } = msg.value;
      const pending = this.#requests.get(requestId);
      if (!pending) {
        this.#logger.child({ resp: msg.value }).warn('received unexpected inference response');
        return;
      }
      this.#requests.delete(requestId);
      pending.resolve(msg.value);
    });
  }

  /**
   * Runs an inference method in the parent process.
   *
   * @param method - name of the inference method to invoke.
   * @param data - method input, forwarded as-is over IPC.
   * @returns the `data` field of the parent's response.
   * @throws Error if the IPC channel is unavailable, or if the response carries an error.
   */
  async doInference(method: string, data: unknown): Promise<unknown> {
    const requestId = shortuuid('inference_job_');
    // Register before sending so the bookkeeping is in place before any response can arrive.
    const pending = new PendingInference();
    this.#requests.set(requestId, pending);

    if (!safeSend({ case: 'inferenceRequest', value: { requestId, method, data } })) {
      // Nothing was sent, so no response will ever come for this id.
      this.#requests.delete(requestId);
      this.#logger.debug(
        { method, requestId },
        'IPC channel closed during inference, aborting gracefully',
      );
      throw new Error(`Inference ${method} aborted: IPC channel closed (expected during shutdown)`);
    }

    const resp = await pending.promise;
    if (resp.error) {
      throw new Error(`inference of ${method} failed: ${resp.error.message}`);
    }
    return resp.data;
  }
}
/**
 * Starts running one job in this process.
 *
 * Creates the job's Room and JobContext. It then asynchronously runs `func` inside the job's
 * async context and a `job_entrypoint` trace span. If the entry function returns before any
 * shutdown, the task waits for a `close` event on `closeEvent`.
 *
 * The job is then torn down in order: close the primary agent session, run the session-end
 * hook, disconnect from the room, and run the registered shutdown callbacks. Finally it sends
 * `done` to the parent and resolves `joinFuture`. Each teardown step is isolated, so one
 * failure cannot keep the later steps, the `done` message or `joinFuture` from happening.
 *
 * `close` events carry `(flag, reason?)`. The second argument is forwarded to the parent as
 * the `exiting` reason.
 *
 * @param proc - the job process that owns this job.
 * @param func - the agent's entry function.
 * @param info - description of the job to run.
 * @param closeEvent - emitter whose `close` event ends the job.
 * @param logger - logger scoped to this job.
 * @param joinFuture - resolved once teardown has finished.
 * @returns the job's context and the promise tracking the job's lifecycle.
 */
const startJob = (
  proc: JobProcess,
  func: (ctx: JobContext) => Promise<void>,
  info: RunningJobInfo,
  closeEvent: EventEmitter,
  logger: Logger,
  joinFuture: Future,
): JobTask => {
  let connect = false;
  let shutdown = false;

  const room = new Room();
  room.on(RoomEvent.Disconnected, () => {
    // A disconnect that we did not initiate ourselves ends the job.
    if (!shutdown) {
      closeEvent.emit('close', false);
    }
  });

  const onConnect = () => {
    connect = true;
  };
  const onShutdown = (reason: string) => {
    shutdown = true;
    closeEvent.emit('close', true, reason);
  };

  const ctx = new JobContext(proc, info, room, onConnect, onShutdown, new InfClient());

  const task = (async () => {
    const unconnectedTimeout = setTimeout(() => {
      if (!(connect || shutdown)) {
        // Single message string: pino treats extra string arguments as interpolation
        // values and drops them when the message has no placeholders.
        logger.warn(
          'room not connected 10 seconds after job_entry was called, did you forget to call ctx.connect()?',
        );
      }
    }, 10000);

    try {
      const closePromise = once(closeEvent, 'close').then((close) => {
        logger.debug('shutting down');
        shutdown = true;
        safeSend({ case: 'exiting', value: { reason: close[1] } });
      });

      // Run the job function within the AsyncLocalStorage context
      await runWithJobContextAsync(ctx, async () => {
        const { tracer, traceTypes } = await import('../telemetry/index.js');
        return tracer.startActiveSpan(
          async (span) => {
            span.setAttribute(traceTypes.ATTR_JOB_ID, info.job.id);
            span.setAttribute(traceTypes.ATTR_AGENT_NAME, info.job.agentName);
            span.setAttribute(traceTypes.ATTR_ROOM_NAME, info.job.room?.name ?? '');
            return func(ctx);
          },
          { name: 'job_entrypoint' },
        );
      });

      // The entry function returned without a shutdown: stay alive until something closes the job.
      if (!shutdown) {
        await closePromise;
      }
    } catch (error) {
      logger.error({ error }, 'error in entry function');
      shutdown = true;
      safeSend({
        case: 'exiting',
        value: { reason: error instanceof Error ? error.message : String(error) },
      });
    } finally {
      clearTimeout(unconnectedTimeout);
    }

    try {
      // Close the primary agent session if it exists
      if (ctx._primaryAgentSession) {
        await ctx._primaryAgentSession.close();
      }
    } catch (error) {
      logger.error({ error }, 'error while closing the primary agent session');
    }

    try {
      // Generate and save/upload session report
      await ctx._onSessionEnd();
    } catch (error) {
      logger.error({ error }, 'error while ending the session');
    }

    try {
      await room.disconnect();
      logger.debug('disconnected from room');
    } catch (error) {
      logger.error({ error }, 'error while disconnecting from the room');
    }

    try {
      const shutdownTasks = [];
      for (const callback of ctx.shutdownCallbacks) {
        shutdownTasks.push(callback());
      }
      await ThrowsPromise.all(shutdownTasks);
    } catch (error) {
      logger.error({ error }, 'error while shutting down the job');
    }

    setJobContext({});
    safeSend({ case: 'done', value: undefined });
    joinFuture.resolve();
  })();

  return { ctx, task };
};
/**
 * Entry point of the job process.
 *
 * Only runs when this process has an IPC channel to its parent (`process.send` is defined).
 * It loads the agent module named in `process.argv[2]` and waits for the parent's
 * `initializeRequest`. It then runs the agent's prewarm hook and serves `pingRequest`,
 * `startJobRequest` and `shutdownRequest` messages.
 *
 * Serving stops when one of these happens:
 * - the job finishes;
 * - a shutdown is requested while no job is running;
 * - no ping arrives for ORPHANED_TIMEOUT ms.
 *
 * Native resources are disposed before the process exits.
 */
void (async () => {
  if (process.send) {
    const join = new Future();

    // process.argv:
    // [0] `node'
    // [1] import.meta.filename
    // [2] import.meta.filename of function containing entry file
    const moduleFile = process.argv[2];
    if (!moduleFile) {
      throw new Error('Unable to load agent: no entry file path was passed to the job process');
    }
    const agent: Agent = await import(pathToFileURL(moduleFile).pathname).then((module) => {
      // Handle both ESM (module.default is the agent) and CJS (module.default.default is the agent)
      const loaded =
        typeof module.default === 'function' || isAgent(module.default)
          ? module.default
          : module.default?.default;
      if (loaded === undefined || !isAgent(loaded)) {
        throw new Error(`Unable to load agent: Missing or invalid default export in ${moduleFile}`);
      }
      return loaded;
    });

    if (!agent.prewarm) {
      agent.prewarm = defaultInitializeProcessFunc;
    }

    // The signal handlers must be installed right away, but `logger` is only declared after the
    // parent's InitializeRequest has configured logging. Reading it before that declaration runs
    // throws a ReferenceError inside the handler, so the handlers check this flag first.
    let loggerReady = false;

    // don't do anything on C-c
    // this is handled in cli, triggering a termination of all child processes at once.
    process.on('SIGINT', () => {
      if (loggerReady) {
        logger.debug('SIGINT received in job proc');
      }
    });

    // don't do anything on SIGTERM
    // Render uses SIGTERM in autoscale, this ensures the processes are properly drained if needed
    process.on('SIGTERM', () => {
      if (loggerReady) {
        logger.debug('SIGTERM received in job proc');
      }
    });

    // The first IPC message must carry the logger configuration.
    const [initMsg]: IPCMessage[] = await once(process, 'message');
    if (initMsg?.case !== 'initializeRequest') {
      throw new Error('first message must be InitializeRequest');
    }
    initializeLogger(initMsg.value.loggerOptions);

    const proc = new JobProcess();
    let logger = log().child({ pid: proc.pid });
    loggerReady = true;

    process.on('unhandledRejection', (reason) => {
      logger.debug({ error: reason }, 'Unhandled promise rejection');
    });

    logger.debug('initializing job runner');
    try {
      await agent.prewarm(proc);
    } catch (error) {
      // Otherwise this failure would only surface via the debug-level unhandledRejection log.
      logger.error({ error }, 'error while prewarming job process');
      throw error;
    }
    logger.debug('job runner initialized');
    safeSend({ case: 'initializeResponse', value: undefined });

    let job: JobTask | undefined;
    const closeEvent = new EventEmitter();

    // Refreshed on every ping; fires only if the parent goes quiet.
    const orphanedTimeout = setTimeout(() => {
      logger.warn('job process orphaned, shutting down.');
      join.resolve();
    }, ORPHANED_TIMEOUT);

    const messageHandler = (msg: IPCMessage) => {
      switch (msg.case) {
        case 'pingRequest': {
          orphanedTimeout.refresh();
          safeSend({
            case: 'pongResponse',
            value: { lastTimestamp: msg.value.timestamp, timestamp: Date.now() },
          });
          break;
        }
        case 'startJobRequest': {
          if (job) {
            throw new Error('job task already running');
          }
          logger = logger.child({ jobID: msg.value.runningJob.job.id });
          job = startJob(proc, agent.entry, msg.value.runningJob, closeEvent, logger, join);
          logger.debug('job started');
          break;
        }
        case 'shutdownRequest': {
          // With no job running nothing else will resolve `join`; otherwise the job's
          // teardown resolves it.
          if (!job) {
            join.resolve();
          }
          // Use the same (flag, reason) layout as startJob's onShutdown: the job's close
          // listener reads the reason from the second argument.
          closeEvent.emit('close', true, 'shutdownRequest');
          clearTimeout(orphanedTimeout);
          process.off('message', messageHandler);
          break;
        }
      }
    };
    process.on('message', messageHandler);

    await join.await;

    // Dispose native FFI resources (Rust FfiServer, tokio runtimes, libwebrtc)
    // before process.exit() to prevent libc++abi mutex crash during teardown.
    // Without this, process.exit() can kill the process while native threads are
    // still running, causing: "mutex lock failed: Invalid argument"
    // See: https://github.com/livekit/node-sdks/issues/564
    try {
      await dispose();
      logger.debug('native resources disposed');
    } catch (error) {
      logger.warn({ error }, 'failed to dispose native resources');
    }

    logger.debug('Job process shutdown');
    process.exit(0);
  }
})();