-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathPyodideWorkerManager.ts
More file actions
305 lines (264 loc) · 10.9 KB
/
PyodideWorkerManager.ts
File metadata and controls
305 lines (264 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
import path from 'path';
import { Worker } from 'worker_threads';
import { PublishDiagnosticsParams } from 'vscode-languageserver';
import { CloudFormationFileType } from '../../document/Document';
import { CfnLintInitializationSettings, CfnLintSettings } from '../../settings/Settings';
import { LoggerFactory } from '../../telemetry/LoggerFactory';
import { ScopedTelemetry } from '../../telemetry/ScopedTelemetry';
import { Telemetry } from '../../telemetry/TelemetryDecorator';
import { extractErrorMessage } from '../../utils/Errors';
import { retryWithExponentialBackoff } from '../../utils/Retry';
import { WorkerNotInitializedError } from './CfnLintErrors';
// A request sent to the worker thread that is still waiting for its response.
// Entries are stored in PyodideWorkerManager.tasks, keyed by `id`.
interface WorkerTask {
// Identifier echoed back by the worker so the response can be matched to this task.
id: string;
// Name of the operation requested from the worker (e.g. 'initialize', 'lint').
action: string;
// Arguments posted to the worker together with the action.
payload: Record<string, unknown>;
// Called with the worker's `result` when the response reports success.
resolve: (value: unknown) => void;
// Called when the response reports failure, or when the worker exits or is shut down.
reject: (reason: Error) => void;
}
// Shape of the messages received from the worker thread. Every field is optional
// because output notifications and task responses share this one type.
interface WorkerMessage {
// Id of the task being answered; messages without an id are ignored by the manager.
id?: string;
// 'stdout' / 'stderr' mark output forwarded from the worker, which is only logged.
type?: string;
// Value handed to the task's resolve callback when `success` is truthy.
result?: unknown;
// Failure description used to build the Error passed to the task's reject callback.
error?: string;
// Whether the task succeeded; selects between resolving and rejecting the task.
success?: boolean;
// NOTE(review): not read anywhere in this file; presumably the text of a
// stdout/stderr message - confirm against the worker implementation.
data?: string;
}
/**
 * Owns the `pyodide-worker.js` worker thread and exchanges id-tagged
 * request/response messages with it.
 *
 * Every request is stored in `tasks` until the worker answers with the same id,
 * the worker exits unexpectedly, or the manager is shut down. The worker is
 * created lazily by the first task (or by an explicit `initialize()` call) and is
 * re-created on demand after a crash of an initialized worker or after `shutdown()`.
 */
export class PyodideWorkerManager {
    private worker: Worker | undefined = undefined;
    private nextTaskId = 1;
    private readonly tasks = new Map<string, WorkerTask>();
    private initialized = false;
    private initializationPromise: Promise<void> | undefined = undefined;

    @Telemetry() private readonly telemetry!: ScopedTelemetry;

    /**
     * @param retryConfig Backoff settings used while initializing the worker.
     * @param cfnLintSettings Settings forwarded to the worker with every lint request.
     * @param log Logger used for worker lifecycle and output messages.
     */
    constructor(
        private readonly retryConfig: CfnLintInitializationSettings,
        private cfnLintSettings: CfnLintSettings,
        private readonly log = LoggerFactory.getLogger(PyodideWorkerManager),
    ) {}

    /**
     * Starts the worker if it is not running yet.
     *
     * Concurrent callers share one in-flight initialization. A failed
     * initialization stays cached, so later calls reject with the same error
     * instead of starting another retry sequence.
     */
    public async initialize(): Promise<void> {
        if (this.initialized) {
            return;
        }

        if (!this.initializationPromise) {
            this.initializationPromise = this.initializeWithRetry();
        }

        return await this.initializationPromise;
    }

    /**
     * Runs `initializeWorker` under the configured exponential backoff and counts
     * every attempt after the first one as a worker restart.
     */
    private async initializeWithRetry(): Promise<void> {
        let attemptCount = 0;

        return await retryWithExponentialBackoff(
            async () => {
                if (attemptCount > 0) {
                    this.telemetry.count('worker.restart', 1, { attributes: { attempt: attemptCount.toString() } });
                }
                attemptCount++;
                return await this.initializeWorker();
            },
            {
                maxRetries: this.retryConfig.maxRetries,
                initialDelayMs: this.retryConfig.initialDelayMs,
                maxDelayMs: this.retryConfig.maxDelayMs,
                backoffMultiplier: this.retryConfig.backoffMultiplier,
                jitterFactor: 0.1, // Add 10% jitter to prevent synchronized retry storms
                operationName: 'Pyodide initialization',
                totalTimeoutMs: this.retryConfig.totalTimeoutMs,
            },
            this.log,
        );
    }

    /**
     * Creates one worker thread and sends it the 'initialize' task.
     *
     * Resolves once the worker answers that task successfully. Rejects when the
     * worker reports a failure, emits an 'error' event, exits, or cannot be created.
     * A worker whose initialization failed is terminated so that retries do not
     * accumulate idle threads.
     */
    private async initializeWorker(): Promise<void> {
        return await new Promise<void>((resolve, reject) => {
            let createdWorker: Worker | undefined;
            let initTaskId: string | undefined;

            try {
                // Use a path relative to the current file
                const workerPath = path.join(__dirname, 'pyodide-worker.js');
                this.log.info(`Loading worker from: ${workerPath}`);

                const worker = new Worker(workerPath);
                createdWorker = worker;
                this.worker = worker;

                // Detect crashes; the handler ignores workers this manager no longer owns
                worker.on('exit', (code) => {
                    this.handleWorkerExit(worker, code);
                });

                worker.on('message', this.handleWorkerMessage.bind(this));

                // Node emits 'exit' right after 'error', so state cleanup is left to the
                // exit handler. This also covers errors raised after initialization,
                // where this reject is a no-op.
                worker.on('error', (error: Error) => {
                    this.log.error(error, 'Worker error');
                    reject(new Error(`Worker error: ${error.message}`));
                });

                // Initialize Pyodide in the worker
                const taskId = this.nextTaskId.toString();
                this.nextTaskId++;
                initTaskId = taskId;

                const pyodideStartTime = performance.now();

                const task: WorkerTask = {
                    id: taskId,
                    action: 'initialize',
                    payload: {},
                    resolve: (result: unknown) => {
                        this.initialized = true;
                        this.telemetry.histogram('pyodide.init.duration', performance.now() - pyodideStartTime, {
                            unit: 'ms',
                        });
                        this.telemetry.count('pyodide.init.success', 1);

                        // Track installation source; tolerate a response without a result object
                        const installSource = (result as { installSource?: string } | null | undefined)
                            ?.installSource;
                        if (installSource === 'pypi') {
                            this.telemetry.count('init.pypi.success', 1);
                        } else if (installSource === 'wheels') {
                            this.telemetry.count('init.wheels.success', 1);
                        }

                        resolve();
                    },
                    reject: (reason: Error) => {
                        // Stop the thread instead of only dropping the reference to it
                        this.discardWorker(worker);
                        this.telemetry.error('pyodide.init.fault', reason, undefined, { captureErrorType: true });

                        // Try to determine if it was a PyPI or wheels failure
                        const errorMessage = reason.message || '';
                        if (errorMessage.includes('PyPI')) {
                            this.telemetry.error('init.pypi.fault', reason, undefined, {
                                captureErrorType: true,
                            });
                        } else if (errorMessage.includes('wheels') || errorMessage.includes('wheel')) {
                            this.telemetry.error('init.wheels.fault', reason, undefined, {
                                captureErrorType: true,
                            });
                        }

                        reject(reason);
                    },
                };

                this.tasks.set(taskId, task);
                worker.postMessage({
                    id: taskId,
                    action: 'initialize',
                    payload: {},
                });
            } catch (error) {
                // Do not leave a task behind that can never be answered
                if (initTaskId !== undefined) {
                    this.tasks.delete(initTaskId);
                }
                if (createdWorker) {
                    this.discardWorker(createdWorker);
                } else {
                    this.worker = undefined;
                }
                reject(error instanceof Error ? error : new Error(extractErrorMessage(error)));
            }
        });
    }

    /**
     * Reacts to the 'exit' event of `worker`.
     *
     * Exits of workers that were already replaced or deliberately released
     * (`shutdown()`, failed initialization) are ignored: `terminate()` makes the
     * worker exit with code 1, which would otherwise be reported as a crash and
     * would wipe the state of the worker that is currently in use.
     */
    private handleWorkerExit(worker: Worker, code: number): void {
        if (this.worker !== worker || code === 0) {
            return;
        }

        this.log.error(`Worker exited unexpectedly with code ${code}`);
        this.telemetry.count('worker.crash', 1, { attributes: { exitCode: code.toString() } });

        const wasInitialized = this.initialized;
        this.initialized = false;
        this.worker = undefined;
        if (wasInitialized) {
            // Forget the completed initialization so the next task starts a new worker.
            // While an initialization is still in flight the promise is kept, because
            // the retry loop behind it creates the replacement worker itself.
            this.initializationPromise = undefined;
        }

        this.rejectPendingTasks(`Worker exited unexpectedly with code ${code}`);
    }

    /** Rejects every pending task with a new Error carrying `message` and empties the queue. */
    private rejectPendingTasks(message: string): void {
        const pending = [...this.tasks.values()];
        this.tasks.clear();
        for (const task of pending) {
            task.reject(new Error(message));
        }
    }

    /** Releases `worker` if it is the current one and terminates its thread in the background. */
    private discardWorker(worker: Worker): void {
        if (this.worker === worker) {
            this.worker = undefined;
        }
        void worker.terminate().catch((error: unknown) => {
            this.log.error(error, 'Error terminating worker');
        });
    }

    /**
     * Routes a message from the worker: stdout/stderr messages are logged, task
     * responses settle the pending task with the same id.
     */
    private handleWorkerMessage(message: WorkerMessage): void {
        // Handle stdout/stderr messages
        if (message.type === 'stdout') {
            this.log.info({ message }, 'Pyodide stdout');
            return;
        }

        if (message.type === 'stderr') {
            this.log.error({ message }, 'Pyodide stderr');
            return;
        }

        // Handle task responses
        const id = message.id;
        if (!id) {
            return; // Ignore messages without an ID
        }

        const task = this.tasks.get(id);
        if (!task) {
            this.log.error(`Received response for unknown task: ${id}`);
            return;
        }

        this.tasks.delete(id);

        if (message.success) {
            task.resolve(message.result);
        } else {
            // A failure response without a description must still produce a readable error
            task.reject(new Error(message.error ?? 'Unknown worker error'));
        }
    }

    /**
     * Sends template text to the worker's 'lint' action together with the current settings.
     *
     * @param content Template text.
     * @param uri Document URI forwarded to the worker.
     * @param fileType File type forwarded to the worker.
     * @returns The diagnostics produced by the worker.
     */
    public async lintTemplate(
        content: string,
        uri: string,
        fileType: CloudFormationFileType,
    ): Promise<PublishDiagnosticsParams[]> {
        return await this.executeTask<PublishDiagnosticsParams[]>('lint', {
            content,
            uri,
            fileType,
            settings: this.cfnLintSettings,
        });
    }

    /** Asks the worker's 'getVersion' action for the cfn-lint version string. */
    public async getCfnLintVersion(): Promise<string> {
        return await this.executeTask<string>('getVersion', {});
    }

    /**
     * Sends a file path to the worker's 'lintFile' action together with the current settings.
     *
     * @param path Path forwarded to the worker.
     * @param uri Document URI forwarded to the worker.
     * @param fileType File type forwarded to the worker.
     * @returns The diagnostics produced by the worker.
     */
    public async lintFile(
        path: string,
        uri: string,
        fileType: CloudFormationFileType,
    ): Promise<PublishDiagnosticsParams[]> {
        return await this.executeTask<PublishDiagnosticsParams[]>('lintFile', {
            path,
            uri,
            fileType,
            settings: this.cfnLintSettings,
        });
    }

    /** Replaces the settings attached to subsequent lint requests. */
    public updateSettings(settings: CfnLintSettings): void {
        this.cfnLintSettings = settings;
    }

    /** Sends the worker's 'mountFolder' action with the given directories. */
    public async mountFolder(fsDir: string, mountDir: string): Promise<void> {
        await this.executeTask<{ mounted: boolean; mountDir: string }>('mountFolder', { fsDir, mountDir });
    }

    /**
     * Posts one task to the worker and resolves with the worker's result.
     *
     * Initializes the worker first when necessary.
     *
     * @throws WorkerNotInitializedError when no worker is available after initialization.
     */
    private async executeTask<T>(action: string, payload: Record<string, unknown>): Promise<T> {
        if (!this.initialized) {
            await this.initialize();
        }

        const worker = this.worker;
        if (!worker) {
            throw new WorkerNotInitializedError();
        }

        // Track queue depth
        this.telemetry.countUpDown('worker.queue.depth', this.tasks.size, { unit: '1' });

        const startTime = performance.now();

        return await new Promise<T>((resolve, reject) => {
            const taskId = this.nextTaskId.toString();
            this.nextTaskId++;

            const task: WorkerTask = {
                id: taskId,
                action,
                payload,
                resolve: (result: unknown) => {
                    this.telemetry.histogram('worker.response.time', performance.now() - startTime, { unit: 'ms' });
                    resolve(result as T);
                },
                reject: (error: Error) => {
                    this.telemetry.histogram('worker.response.time', performance.now() - startTime, { unit: 'ms' });
                    reject(error);
                },
            };

            this.tasks.set(taskId, task);

            try {
                worker.postMessage({ id: taskId, action, payload });
            } catch (error) {
                // postMessage throws synchronously when the payload cannot be cloned;
                // remove the task so it does not stay queued forever
                this.tasks.delete(taskId);
                reject(error instanceof Error ? error : new Error(extractErrorMessage(error)));
            }
        });
    }

    /**
     * Rejects all pending tasks, terminates the worker and resets the manager so
     * that a later task can start a new worker. Does nothing when no worker exists.
     */
    public async shutdown(): Promise<void> {
        // Capture the worker first: rejecting a pending initialization task clears
        // `this.worker`, which would otherwise leave nothing to terminate.
        const worker = this.worker;
        if (!worker) {
            return;
        }

        // Detach before terminating so the resulting exit (code 1) is not reported as a crash
        this.worker = undefined;
        this.initialized = false;
        this.initializationPromise = undefined;

        this.rejectPendingTasks('Worker shutdown');

        // Terminate worker and wait for completion
        try {
            await worker.terminate();
        } catch (error) {
            this.log.error(error, 'Error terminating worker');
        }
    }
}