-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathqueuePopulator.js
More file actions
105 lines (97 loc) · 3.27 KB
/
queuePopulator.js
File metadata and controls
105 lines (97 loc) · 3.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
require('../lib/otel');
const async = require('async');
const schedule = require('node-schedule');
const werelogs = require('werelogs');
const config = require('../lib/Config');
const zkConfig = config.zookeeper;
const kafkaConfig = config.kafka;
const extConfigs = config.extensions;
const qpConfig = config.queuePopulator;
const httpsConfig = config.internalHttps;
const mConfig = config.metrics;
const rConfig = config.redis;
const vConfig = config.vaultAdmin;
const QueuePopulator = require('../lib/queuePopulator/QueuePopulator');
const { startProbeServer } = require('../lib/util/probe');
const { DEFAULT_LIVE_ROUTE, DEFAULT_METRICS_ROUTE, DEFAULT_READY_ROUTE } =
require('arsenal').network.probe.ProbeServer;
const log = new werelogs.Logger('Backbeat:QueuePopulator');
werelogs.configure({ level: config.log.logLevel,
dump: config.log.dumpLevel });
/* eslint-disable no-param-reassign */
function queueBatch(queuePopulator, taskState) {
if (taskState.batchInProgress) {
log.debug('skipping batch: previous one still in progress');
return undefined;
}
log.debug('start queueing batch');
taskState.batchInProgress = true;
const maxRead = qpConfig.batchMaxRead;
const timeoutMs = qpConfig.batchTimeoutMs;
queuePopulator.processLogEntries({ maxRead, timeoutMs }, err => {
taskState.batchInProgress = false;
if (err) {
log.error('an error occurred during batch processing', {
method: 'QueuePopulator::task.queueBatch',
error: err,
});
// exit process and let Kubernetes respawn the pod
process.exit(1);
}
});
return undefined;
}
/* eslint-enable no-param-reassign */
const queuePopulator = new QueuePopulator(zkConfig, kafkaConfig,
qpConfig, httpsConfig, mConfig, rConfig, vConfig, extConfigs);
async.waterfall([
done => queuePopulator.open(done),
done => startProbeServer(qpConfig.probeServer, (err, probeServer) => {
if (err) {
log.error('error starting probe server', {
error: err,
method: 'QueuePopulator::startProbeServer',
});
done(err);
return;
}
if (probeServer !== undefined) {
probeServer.addHandler([DEFAULT_LIVE_ROUTE, DEFAULT_READY_ROUTE],
(res, log) => queuePopulator.handleLiveness(res, log)
);
probeServer.addHandler(DEFAULT_METRICS_ROUTE,
(res, log) => queuePopulator.handleMetrics(res, log)
);
}
done();
}),
done => {
const taskState = {
batchInProgress: false,
};
schedule.scheduleJob(qpConfig.cronRule, () => {
queueBatch(queuePopulator, taskState);
});
done();
},
], err => {
if (err) {
log.error('error during queue populator initialization', {
method: 'QueuePopulator::task',
error: err,
});
process.exit(1);
}
});
process.on('SIGTERM', () => {
log.info('received SIGTERM, exiting');
queuePopulator.close(error => {
if (error) {
log.error('failed to exit properly', {
error,
});
process.exit(1);
}
process.exit(0);
});
});