-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlaunch.js
More file actions
339 lines (299 loc) · 11.1 KB
/
Copy pathlaunch.js
File metadata and controls
339 lines (299 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
'use strict';
const fs = require('fs');
const { CronJob } = require('cron');
const { format } = require('date-fns');
// Command-line flags, parsed with minimist. Every pipeline stage defaults to
// enabled; garbage collection defaults to off.
// `--gc` is declared as an alias of `--collectGarbage`: the rest of this file
// only reads `args.collectGarbage`, so without the alias `--gc` was accepted
// but never had any effect.
const args = require('minimist')(process.argv.slice(2), {
  boolean: [
    'download',
    'load',
    'serve',
    'gc',
    'collectGarbage',
    'process'
  ],
  alias: {
    gc: 'collectGarbage'
  },
  default: {
    download: true,
    load: true,
    serve: true,
    process: true,
    collectGarbage: false
  }
});
const appConfig = require('./config');
const logger = require('./logger');
const updateLock = require('./updateLock');
// Settings shared by every stage of the update pipeline.
const { redisPrefix, downloadLocation: csvFile } = appConfig.app;
// The CSV is first written under this name and renamed to csvFile afterwards.
const tempCsvFile = `${csvFile}.tmp`;
// Garbage collection is on when either the app config or the CLI asks for it.
const collectGarbage = appConfig.app.collectGarbage || args.collectGarbage;
// Directory whose files are merged into the CSV.
const includePath = `${__dirname}/staging`;
// Redis keys used for the update lock and for the reported update status.
const lockKey = `${redisPrefix}update_lock`;
const statusKey = `${redisPrefix}update_status`;
const Redis = require('ioredis');
/**
 * Open a new Redis connection used for publishing status updates.
 * Only the host, port, family, password and db settings from the
 * application's redis configuration are forwarded to the client.
 * @returns {Redis} A freshly created Redis client
 */
function getStatusRedis() {
  const { host, port, family, password, db } = appConfig.redis;
  const connectionOptions = { host, port, family, password, db };
  return new Redis(connectionOptions);
}
/**
 * Update status in Redis.
 *
 * Stores a JSON document under the status key containing the status, the
 * current ISO timestamp, this process' pid and any extra metadata (metadata
 * keys override the built-in ones because they are spread last).
 *
 * This is best-effort: failures are logged as warnings and never thrown, so
 * a Redis problem cannot abort the update itself.
 *
 * @param {string} status - Status value
 * @param {Object} metadata - Additional metadata
 * @returns {Promise<void>}
 */
async function updateStatus(status, metadata = {}) {
  let redis;
  try {
    redis = getStatusRedis();
    const statusData = {
      status,
      timestamp: new Date().toISOString(),
      pid: process.pid,
      ...metadata
    };
    await redis.set(statusKey, JSON.stringify(statusData));
    await redis.quit();
  } catch (error) {
    logger.warn({ error: error.message, status }, 'Failed to update status');
    // A failed set/quit would otherwise leave this one-shot client open (and
    // reconnecting) for the rest of the process lifetime, so drop it now.
    if (redis) {
      redis.disconnect();
    }
  }
}
/**
 * Validate CSV file before loading.
 *
 * The file passes when it exists, is at least 20 bytes long, its first line
 * names the start_int, end_int and list columns, and it has at least one more
 * line. Only lines 2-10 are inspected in detail: each must consist of exactly
 * three `|`-separated fields whose first two fields start with an integer.
 * Every failure is logged and reported as `false`; this function never throws.
 *
 * @param {string} file - CSV file path
 * @returns {Promise<boolean>} True if valid
 */
async function validateCsvFile(file) {
  try {
    if (!fs.existsSync(file)) {
      logger.error({ file }, 'CSV file does not exist');
      return false;
    }
    const stats = fs.statSync(file);
    if (stats.size === 0) {
      logger.error({ file }, 'CSV file is empty');
      return false;
    }
    // Check minimum expected size (header line)
    if (stats.size < 20) {
      logger.error({ file, size: stats.size }, 'CSV file too small');
      return false;
    }
    // Read first few lines to validate format
    const readline = require('readline');
    const stream = fs.createReadStream(file, { encoding: 'utf8' });
    const rl = readline.createInterface({ input: stream });
    let lineCount = 0;
    let headerFound = false;
    try {
      for await (const line of rl) {
        lineCount++;
        if (lineCount === 1) {
          // Check header
          headerFound = line.includes('start_int') && line.includes('end_int') && line.includes('list');
        } else if (lineCount <= 10) {
          // Validate a few data lines
          const parts = line.split('|');
          if (parts.length !== 3) {
            logger.error({ file, line, lineCount }, 'Invalid CSV format');
            return false;
          }
          // Check that first two parts are numbers
          if (Number.isNaN(Number.parseInt(parts[0], 10)) || Number.isNaN(Number.parseInt(parts[1], 10))) {
            logger.error({ file, line, lineCount }, 'Invalid numeric values in CSV');
            return false;
          }
        } else {
          break; // Only check first 10 lines
        }
      }
    } finally {
      // Closing the readline interface does not close its input, and reading
      // normally stops long before the end of the file, so the stream has to
      // be destroyed explicitly to release the file descriptor.
      rl.close();
      stream.destroy();
    }
    if (!headerFound) {
      logger.error({ file }, 'CSV header not found');
      return false;
    }
    if (lineCount < 2) {
      logger.error({ file }, 'CSV file has no data rows');
      return false;
    }
    // `lines` is the number of lines inspected, not the size of the file.
    logger.info({ file, size: stats.size, lines: lineCount }, 'CSV file validation passed');
    return true;
  } catch (error) {
    logger.error({ error: error.message, file }, 'CSV validation error');
    return false;
  }
}
/**
 * Append the contents of a file to a writable stream.
 *
 * The source is piped with the default `end: true`, so the destination is
 * ended once the source has been read completely and cannot be reused.
 * The returned promise resolves only after the destination has emitted
 * 'finish', i.e. after everything has been handed to the underlying resource;
 * resolving when the source closes (as before) could let the caller continue
 * while data was still buffered.
 *
 * @param {string} sourceFile - Path of the file to read
 * @param {stream.Writable} destination - Stream to append to; it is ended on
 *   success and destroyed when reading the source fails
 * @returns {Promise<void>} Rejects with the read or write error
 */
const concat = async (sourceFile, destination) => {
  logger.info({ sourceFile }, 'Concatenating file');
  return new Promise((resolve, reject) => {
    const source = fs.createReadStream(sourceFile);
    source.on('error', (err) => {
      logger.error({ error: err.message, sourceFile }, 'Error reading file');
      // pipe() does not end the destination when the source fails; close it
      // here so its file descriptor is not leaked.
      destination.destroy();
      reject(err);
    });
    destination.on('error', (err) => {
      // Without a listener a write failure would be an unhandled 'error'
      // event and take the whole process down.
      logger.error({ error: err.message, sourceFile }, 'Error writing file');
      source.destroy();
      reject(err);
    });
    destination.once('finish', () => {
      logger.info({ sourceFile }, 'Finished writing file');
      resolve();
    });
    source.pipe(destination);
  });
};
async function main() {
let lockValue = null;
try {
// Acquire distributed lock
const acquired = await updateLock.acquireLock(lockKey, 3600); // 1 hour max
if (!acquired) {
const isLocked = await updateLock.isLocked(lockKey);
if (isLocked) {
logger.warn('Update already in progress, skipping');
await updateStatus('skipped', { reason: 'Lock already held' });
return 'skipped';
}
throw new Error('Failed to acquire update lock');
}
// Get lock value for later release
lockValue = await updateLock.getLockValue(lockKey);
await updateStatus('in_progress', { stage: 'starting' });
// run plugins which stage IP lists
if (args.download) {
await updateStatus('in_progress', { stage: 'downloading' });
const plugins = require('./plugins');
// Add overall timeout for downloads (10 minutes)
const downloadTimeout = setTimeout(() => {
logger.error('Download timeout exceeded');
throw new Error('Download timeout exceeded (10 minutes)');
}, 600000);
try {
const results = await Promise.allSettled(plugins.map(p => p.load()));
clearTimeout(downloadTimeout);
let k = 0;
for (const result of results) {
if (result.status === 'rejected' && plugins[k].abortOnFail === true) {
logger.error({
plugin: plugins[k].name,
error: result.reason
}, 'Plugin failed and abortOnFail is true');
throw new Error(`Abort: plugin [${plugins[k].name}] has been set to abort process on fail.`);
}
if (result.status === 'rejected') {
logger.warn({
plugin: plugins[k].name,
error: result.reason
}, 'Plugin failed but continuing');
}
k++;
}
logger.info({ results: results.map(r => ({
status: r.status,
...(r.status === 'rejected' && { reason: r.reason })
})) }, 'Plugins done');
} catch (error) {
clearTimeout(downloadTimeout);
throw error;
}
}
if (args.process) {
await updateStatus('in_progress', { stage: 'processing' });
// Write to temp file first, then atomically rename
fs.writeFileSync(tempCsvFile, "start_int|end_int|list\n");
for (const file of fs.readdirSync(includePath)) {
const theFile = `${includePath}/${file}`;
if (file.match(/^\./)) {
logger.debug({ file: theFile }, 'Skipping hidden file');
continue;
}
const destination = fs.createWriteStream(tempCsvFile, { flags: 'a' });
await concat(theFile, destination);
}
// Validate CSV before proceeding
const isValid = await validateCsvFile(tempCsvFile);
if (!isValid) {
throw new Error('CSV validation failed');
}
// Atomically rename temp file to final file
if (fs.existsSync(csvFile)) {
fs.renameSync(csvFile, csvFile + '.backup');
}
fs.renameSync(tempCsvFile, csvFile);
logger.info('CSV file updated atomically');
}
if (args.load) {
await updateStatus('in_progress', { stage: 'loading' });
const load = require('./loadToRedis').load;
await load(csvFile, redisPrefix, collectGarbage);
logger.info('Loading done');
}
await updateStatus('completed', {
timestamp: new Date().toISOString()
});
// Release lock
if (lockValue) {
await updateLock.releaseLock(lockKey, lockValue);
}
return 'success';
} catch (error) {
logger.error({ error: error.message, stack: error.stack }, 'Main function error');
await updateStatus('failed', {
error: error.message,
timestamp: new Date().toISOString()
});
// Release lock on error
if (lockValue) {
await updateLock.releaseLock(lockKey, lockValue);
}
// Clean up temp file on error
if (fs.existsSync(tempCsvFile)) {
try {
fs.unlinkSync(tempCsvFile);
logger.info('Cleaned up temp CSV file');
} catch (cleanupError) {
logger.warn({ error: cleanupError.message }, 'Failed to cleanup temp CSV file');
}
}
throw error;
}
}
// Bring up the HTTP server at load time when serving is enabled. If the
// server module cannot be loaded or started, log the reason and exit with
// status 1.
if (args.serve) {
  const { httpPort, prefix } = appConfig.app;
  try {
    const { serve } = require('./serve');
    serve(httpPort, redisPrefix, prefix);
  } catch (error) {
    logger.error({ error: error.message, port: httpPort }, 'Failed to start server');
    process.exit(1);
  }
}
/**
 * Tick handler for the scheduled update: runs main() and logs the outcome.
 * Failures are logged and swallowed so that one bad run never rejects out of
 * the scheduler.
 * @returns {Promise<void>}
 */
async function runScheduledUpdate() {
  logger.info('Cron job triggered');
  try {
    await main();
    logger.info('Cron job completed successfully');
  } catch (error) {
    logger.error({ error: error.message }, 'Cron job failed');
  }
}

// CronJob positional arguments: cronTime, onTick, onComplete, start, timezone
// (context and runOnInit are left at their defaults).
const job = new CronJob(
  appConfig.app.cron,
  runScheduledUpdate,
  null, // onComplete: nothing to do when the job is stopped
  true, // start: begin scheduling immediately
  appConfig.app.cronTimezone // timezone (configurable via IP_CRON_TIMEZONE env var, default: UTC)
);
// Run one update immediately at startup, in addition to the scheduled runs.
// A failure of this first run is fatal: it is logged and the process exits
// with status 1.
(async () => {
  try {
    await main();
    if (args.serve) {
      logger.info({ port: appConfig.app.httpPort }, 'Ready to serve!');
    } else {
      logger.info('Update process completed. Server not started (--serve flag not set).');
    }
  } catch (e) {
    logger.error({ error: e.message, stack: e.stack }, 'Startup failed');
    process.exit(1);
  }
})();