-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathfs_ipc.js
More file actions
185 lines (168 loc) · 5.67 KB
/
fs_ipc.js
File metadata and controls
185 lines (168 loc) · 5.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
'use strict';
const fs = require('fs');
const os = require('os');
const path = require('path');
const { spawn, spawnSync } = require('child_process');
const { PROJECT_ROOT } = require('../proxy/shared');
// Root directory under which every invocation gets its own scratch sub-directory.
const IPC_ROOT = path.join(
  PROJECT_ROOT,
  'tools',
  'HME',
  'runtime',
  'event-ipc',
);
// Safety net: an invocation's cleanup() only runs when the child closes
// gracefully, so a parent that is SIGKILLed or restarted leaves the detached
// child's directory behind. Directories older than this TTL are swept.
// Override with HME_EVENT_IPC_TTL_MS (positive integer, milliseconds).
const IPC_TTL_MS = (() => {
  const raw = process.env.HME_EVENT_IPC_TTL_MS || '';
  const parsed = Number.parseInt(raw, 10);
  if (!Number.isFinite(parsed) || parsed <= 0) {
    return 3_600_000;
  }
  return parsed;
})();
// Set once the stale-directory sweep has been attempted in this process.
let _sweptThisProcess = false;
/**
 * Create `dir` together with any missing parent directories.
 * Succeeds silently when the directory already exists.
 *
 * @param {string} dir - Directory path to create.
 * @returns {void}
 */
function ensureDir(dir) {
  const mkdirOptions = { recursive: true };
  fs.mkdirSync(dir, mkdirOptions);
}
/**
 * Delete invocation directories under IPC_ROOT whose mtime is at least
 * IPC_TTL_MS older than `now`. Non-directory entries are ignored, and any
 * entry that cannot be stat'ed or removed is skipped.
 *
 * @param {number} [now=Date.now()] - Reference time in epoch milliseconds.
 * @returns {number} Number of directories removed.
 */
function sweepStaleInvocations(now = Date.now()) {
  let entries;
  // silent-ok: absent IPC_ROOT means nothing to sweep.
  try {
    entries = fs.readdirSync(IPC_ROOT, { withFileTypes: true });
  } catch (_e) {
    return 0;
  }

  // Stat one directory and remove it when it is past the TTL.
  const removeIfStale = (dir) => {
    let mtimeMs;
    try {
      ({ mtimeMs } = fs.statSync(dir));
    } catch (_e) {
      return false;
    }
    const isFresh = now - mtimeMs < IPC_TTL_MS;
    if (isFresh) return false;
    try {
      fs.rmSync(dir, { recursive: true, force: true });
      return true;
    } catch (_e) {
      /* best effort */
      return false;
    }
  };

  return entries.reduce((removed, ent) => {
    if (!ent.isDirectory()) return removed;
    return removeIfStale(path.join(IPC_ROOT, ent.name)) ? removed + 1 : removed;
  }, 0);
}
/**
 * Run the stale-directory sweep at most once per process.
 * The attempt is recorded even when HME_KEEP_EVENT_IPC=1 suppresses the
 * sweep, and a failing sweep is swallowed.
 *
 * @returns {void}
 */
function _sweepStaleOnce() {
  const alreadyAttempted = _sweptThisProcess;
  _sweptThisProcess = true;
  if (alreadyAttempted || process.env.HME_KEEP_EVENT_IPC === '1') {
    return;
  }
  try {
    sweepStaleInvocations();
  } catch (_e) {
    /* best effort */
  }
}
/**
 * Write `text` to `file` by first writing a sibling temp file and then
 * renaming it over the target, so the target path is only ever replaced by a
 * fully written file. The parent directory is created when missing.
 *
 * If the write or the rename fails, the temp file is removed (best effort)
 * before the original error is rethrown, so failed writes do not leave
 * `*.tmp` files behind next to the target.
 *
 * @param {string} file - Destination path.
 * @param {string|Buffer} text - Content to write.
 * @returns {void}
 * @throws Whatever error the underlying fs write/rename raised.
 */
function atomicWrite(file, text) {
  ensureDir(path.dirname(file));
  // pid + timestamp keeps temp names from different processes apart.
  const tmp = `${file}.${process.pid}.${Date.now()}.tmp`;
  try {
    fs.writeFileSync(tmp, text);
    fs.renameSync(tmp, file);
  } catch (err) {
    // Do not orphan a partial or un-renamed temp file.
    try { fs.rmSync(tmp, { force: true }); } catch (_e) { /* best effort */ }
    throw err;
  }
}
/**
 * Append `row` to `file` as one JSON-encoded line (JSON Lines format),
 * creating the parent directory first when needed.
 *
 * @param {string} file - Path of the .jsonl file.
 * @param {*} row - Value serialised with JSON.stringify.
 * @returns {void}
 */
function appendJsonl(file, row) {
  const parentDir = path.dirname(file);
  ensureDir(parentDir);
  const line = JSON.stringify(row) + '\n';
  fs.appendFileSync(file, line);
}
/**
 * Create a fresh per-invocation directory under IPC_ROOT and write the
 * child's stdin payload into `stdin.json` inside it.
 *
 * @param {string} label - Human-readable prefix for the directory name;
 *   characters outside [a-zA-Z0-9_.-] are collapsed to '-' and the result is
 *   capped at 80 characters. Falls back to 'event' when empty.
 * @param {string} [stdinText=''] - Content for the stdin file.
 * @returns {{dir: string, stdinFile: string,
 *            env: {HME_IPC_DIR: string, HME_IPC_STDIN: string},
 *            cleanup: function(): void}}
 *   `cleanup()` removes the directory unless HME_KEEP_EVENT_IPC=1.
 */
function makeInvocation(label, stdinText = '') {
  const sanitized = String(label || 'event')
    .replace(/[^a-zA-Z0-9_.-]+/g, '-')
    .slice(0, 80);
  const prefix = `${sanitized || 'event'}-`;

  // Opportunistically clear out directories leaked by earlier processes.
  _sweepStaleOnce();
  ensureDir(IPC_ROOT);

  const dir = fs.mkdtempSync(path.join(IPC_ROOT, prefix));
  const stdinFile = path.join(dir, 'stdin.json');
  atomicWrite(stdinFile, stdinText || '');

  const cleanup = () => {
    if (process.env.HME_KEEP_EVENT_IPC === '1') return;
    try {
      fs.rmSync(dir, { recursive: true, force: true });
    } catch (_e) {
      /* best effort */
    }
  };

  return {
    dir,
    stdinFile,
    env: { HME_IPC_DIR: dir, HME_IPC_STDIN: stdinFile },
    cleanup,
  };
}
/**
 * Convert a `child_process.spawnSync` result into this module's result shape.
 *
 * `exit_code` is the child's integer exit status when it has one. When the
 * status is missing (the child never ran or did not exit normally) it is -1
 * if the result carries an `error` or a terminating `signal`, and 0 only
 * when neither is present. A signal-killed child therefore never looks like
 * a successful run.
 *
 * @param {{stdout?: string, stderr?: string, status?: (number|null),
 *          signal?: (string|null), error?: Error}} result
 * @returns {{stdout: string, stderr: string, exit_code: number,
 *            signal: (string|null), error: (Error|null)}}
 */
function _normalResult(result) {
  const failedWithoutStatus = Boolean(result.error || result.signal);
  return {
    stdout: result.stdout || '',
    stderr: result.stderr || '',
    exit_code: Number.isInteger(result.status) ? result.status : (failedWithoutStatus ? -1 : 0),
    signal: result.signal || null,
    error: result.error || null,
  };
}
/**
 * Synchronously run `command` through `bash -lc`, with `opts.input` delivered
 * on stdin by redirecting from the invocation's stdin file. The invocation
 * directory is cleaned up whether or not the spawn succeeds.
 *
 * @param {string} command - Executable to exec.
 * @param {string[]} [args=[]] - Arguments passed to `command`.
 * @param {object} [opts={}]
 * @param {string} [opts.label] - Directory label; defaults to the command's basename.
 * @param {string} [opts.input] - Text for the child's stdin.
 * @param {string} [opts.cwd] - Working directory; defaults to PROJECT_ROOT.
 * @param {object} [opts.env] - Extra environment entries (applied last).
 * @param {number} [opts.timeoutMs] - Timeout in ms; defaults to 30000.
 * @param {number} [opts.maxBuffer] - Output cap in bytes; defaults to 10 MiB.
 * @returns {{stdout: string, stderr: string, exit_code: number,
 *            signal: (string|null), error: (Error|null)}}
 */
function spawnFileInputSync(command, args = [], opts = {}) {
  const label = opts.label || path.basename(command);
  const ipc = makeInvocation(label, opts.input || '');
  try {
    // "$@" expands to command + args; stdin comes from the IPC file.
    const shellArgs = ['-lc', 'exec "$@" < "$HME_IPC_STDIN"', 'hme-ipc', command, ...args];
    const childEnv = { ...process.env, PROJECT_ROOT, ...ipc.env, ...(opts.env || {}) };
    const raw = spawnSync('bash', shellArgs, {
      cwd: opts.cwd || PROJECT_ROOT,
      env: childEnv,
      encoding: 'utf8',
      timeout: opts.timeoutMs || 30_000,
      maxBuffer: opts.maxBuffer || 10 * 1024 * 1024,
    });
    return _normalResult(raw);
  } finally {
    ipc.cleanup();
  }
}
/**
 * Asynchronously run `command` through `bash -lc` in its own process group,
 * with `opts.input` delivered on stdin by redirecting from the invocation's
 * stdin file. Spawn failures, child 'error' events and timeouts resolve the
 * promise with `exit_code: -1` instead of rejecting.
 *
 * On timeout the whole process group is sent SIGTERM, followed by SIGKILL
 * 500 ms later if the child has still not closed.
 *
 * @param {string} command - Executable to exec.
 * @param {string[]} [args=[]] - Arguments passed to `command`.
 * @param {object} [opts={}]
 * @param {string} [opts.label] - Directory label; defaults to the command's basename.
 * @param {string} [opts.input] - Text for the child's stdin.
 * @param {string} [opts.cwd] - Working directory; defaults to PROJECT_ROOT.
 * @param {object} [opts.env] - Extra environment entries (applied last).
 * @param {number} [opts.timeoutMs] - Timeout in ms; defaults to 30000.
 * @returns {Promise<{stdout: string, stderr: string, exit_code: number,
 *                    signal: (string|null), error: (Error|null)}>}
 */
function spawnFileInput(command, args = [], opts = {}) {
  const ipc = makeInvocation(opts.label || path.basename(command), opts.input || '');
  return new Promise((resolve) => {
    let child;
    try {
      child = spawn('bash', ['-lc', 'exec "$@" < "$HME_IPC_STDIN"', 'hme-ipc', command, ...args], {
        cwd: opts.cwd || PROJECT_ROOT,
        detached: true,
        stdio: ['ignore', 'pipe', 'pipe'],
        env: { ...process.env, PROJECT_ROOT, ...ipc.env, ...(opts.env || {}) },
      });
    } catch (err) {
      // silent-ok: optional fallback path.
      ipc.cleanup();
      resolve({ stdout: '', stderr: `[fs_ipc] spawn failed for ${command}: ${err.message}`, exit_code: -1, signal: null, error: err });
      return;
    }
    let stdout = '';
    let stderr = '';
    let finished = false;
    let timedOut = false;
    const timeoutMs = opts.timeoutMs || 30_000;
    // The child is detached, so -pid addresses its whole process group;
    // fall back to killing just the child when that is not possible.
    const killGroup = (signal) => {
      try { process.kill(-child.pid, signal); }
      catch (_e) {
        try { child.kill(signal); } catch (_e2) { /* best effort */ }
      }
    };
    const timer = setTimeout(() => {
      if (finished) return;
      timedOut = true;
      killGroup('SIGTERM');
      setTimeout(() => {
        if (!finished) killGroup('SIGKILL');
      }, 500).unref();
    }, timeoutMs);
    // Let the streams decode UTF-8 themselves: decoding each Buffer chunk
    // separately corrupts multi-byte characters split across chunk boundaries.
    child.stdout.setEncoding('utf8');
    child.stderr.setEncoding('utf8');
    child.stdout.on('data', (chunk) => { stdout += chunk; });
    child.stderr.on('data', (chunk) => { stderr += chunk; });
    child.on('error', (err) => {
      if (finished) return;
      finished = true;
      clearTimeout(timer);
      ipc.cleanup();
      resolve({ stdout, stderr: `${stderr}\n[fs_ipc] error: ${err.message}`, exit_code: -1, signal: null, error: err });
    });
    child.on('close', (code, signal) => {
      if (finished) return;
      finished = true;
      clearTimeout(timer);
      ipc.cleanup();
      if (timedOut) {
        resolve({ stdout, stderr: `${stderr}\n[fs_ipc] timeout after ${timeoutMs}ms: ${command}`, exit_code: -1, signal: signal || 'SIGTERM', error: null });
        return;
      }
      // A null code means the child was terminated by a signal; never
      // report that as a successful (0) exit.
      resolve({ stdout, stderr, exit_code: code ?? (signal ? -1 : 0), signal: signal || null, error: null });
    });
  });
}
/**
 * Return the IPC root directory, creating it first when it does not exist.
 *
 * @returns {string} The IPC_ROOT path.
 */
function ipcRoot() {
  const root = IPC_ROOT;
  ensureDir(root);
  return root;
}
/**
 * Return the operating system's default temporary directory.
 *
 * @returns {string} The value of `os.tmpdir()`.
 */
function tmpdir() {
  const systemTmp = os.tmpdir();
  return systemTmp;
}
// Public API of this module. `_normalResult` and `_sweepStaleOnce` are used
// only inside this file and are not exported.
module.exports = {
IPC_ROOT,
atomicWrite,
appendJsonl,
ensureDir,
ipcRoot,
makeInvocation,
spawnFileInput,
spawnFileInputSync,
sweepStaleInvocations,
tmpdir,
};