-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathtask.js
More file actions
116 lines (108 loc) · 3.68 KB
/
task.js
File metadata and controls
116 lines (108 loc) · 3.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
'use strict';
require('../../../lib/otel');
const assert = require('assert');
const { errors } = require('arsenal');
const async = require('async');
const werelogs = require('werelogs');
const {
DEFAULT_LIVE_ROUTE,
DEFAULT_READY_ROUTE,
DEFAULT_METRICS_ROUTE,
} = require('arsenal').network.probe.ProbeServer;
const { sendSuccess, sendError } = require('arsenal').network.probe.Utils;
const QueueProcessor = require('./QueueProcessor');
const { startProbeServer } = require('../../../lib/util/probe');
const config = require('../../../lib/Config');
// Configuration sections consumed by the notification queue processor.
const { kafka: kafkaConfig } = config;
const { notification: notifConfig } = config.extensions;
const { mongo: mongoConfig } = config.queuePopulator;
const { zookeeper: zkConfig } = config;

const log = new werelogs.Logger('Backbeat:NotificationProcessor:task');

// Apply the log and dump levels from the configuration to werelogs.
const { logLevel, dumpLevel } = config.log;
werelogs.configure({ level: logLevel, dump: dumpLevel });

// The destination is the first command-line argument after the script path;
// the task refuses to start without it.
const [, , destination] = process.argv;
assert(destination, 'task must be started with a destination as argument');
// Destination auth settings, read from environment variables.
const authFromEnv = {
    type: process.env.TYPE,
    ssl: process.env.SSL === 'true',
    protocol: process.env.PROTOCOL,
    ca: process.env.CA,
    client: process.env.CLIENT,
    key: process.env.KEY,
    keyPassword: process.env.KEY_PASSWORD,
    keytab: process.env.KEYTAB,
    principal: process.env.PRINCIPAL,
    serviceName: process.env.SERVICE_NAME,
    username: process.env.BASIC_USERNAME,
    password: process.env.BASIC_PASSWORD,
    mechanism: process.env.SCRAM_MECHANISM,
};

// Only keep the settings that are actually defined, to prevent hitting
// unknown fields when validating the auth schema.
let destinationAuth = {};
for (const [field, value] of Object.entries(authFromEnv)) {
    if (value !== undefined) {
        destinationAuth[field] = value;
    }
}

// When no remaining setting carries a truthy value (`ssl` is always present
// and is `false` unless SSL is exactly 'true'), no auth config is used at all.
const isDestinationAuthEmpty =
    !Object.values(destinationAuth).some(value => Boolean(value));
if (isDestinationAuthEmpty) {
    destinationAuth = null;
}
// Single queue processor instance for this task, built from the loaded
// configuration sections, the destination argument and the (possibly null)
// destination auth settings.
const queueProcessor = new QueueProcessor(
    mongoConfig,
    zkConfig,
    kafkaConfig,
    notifConfig,
    destination,
    destinationAuth
);
/**
 * ProbeServer health check handler: replies with success while the queue
 * processor reports itself ready, otherwise logs the problem and replies
 * with a ServiceUnavailable error.
 *
 * @param {http.HTTPServerResponse} res - HTTP Response to respond with
 * @param {Logger} log - Logger
 * @returns {undefined}
 */
function handleLiveness(res, log) {
    if (!queueProcessor.isReady()) {
        log.error('Notification Queue Processor is not ready');
        sendError(res, log, errors.ServiceUnavailable, 'unhealthy');
        return;
    }
    sendSuccess(res, log);
}
/**
 * Start the probe server and, when a probe server instance is provided,
 * register the health and metrics handlers on it.
 *
 * @param {function} done - callback called as done(err) when the probe
 *   server failed to start, or done() once the handlers are in place
 * @returns {*} the value returned by startProbeServer
 */
function setUpProbeServer(done) {
    return startProbeServer(notifConfig.probeServer, (err, probeServer) => {
        if (err) {
            log.error('error starting probe server', { error: err });
            return done(err);
        }
        // handlers are only registered when a probe server instance was
        // passed to this callback
        if (probeServer !== undefined) {
            // following the same pattern as other extensions, where liveness
            // and readiness are handled by the same handler
            const healthRoutes = [DEFAULT_LIVE_ROUTE, DEFAULT_READY_ROUTE];
            probeServer.addHandler(healthRoutes, handleLiveness);
            probeServer.addHandler(
                DEFAULT_METRICS_ROUTE,
                (res, logger) => queueProcessor.handleMetrics(res, logger)
            );
        }
        return done();
    });
}

// Startup sequence: the queue processor is started first, then the probe
// server; the first error aborts the sequence.
async.series([
    done => queueProcessor.start(null, done),
    setUpProbeServer,
], startupError => {
    if (!startupError) {
        return;
    }
    log.error('error starting notification queue processor task', {
        method: 'notification.task.queueProcessor',
        error: startupError,
    });
    // Emitting SIGTERM reuses the shutdown path of the SIGTERM listener
    // registered on `process` in this file.
    // NOTE(review): that listener exits with status 0 when stopping the
    // queue processor succeeds, so a failed startup can end with a success
    // exit code - confirm this is intended.
    process.emit('SIGTERM');
});
// Graceful shutdown: on SIGTERM, stop the queue processor and then exit.
// The exit status is 1 when stopping reported an error, 0 otherwise.
process.on('SIGTERM', () => {
    log.info('received SIGTERM, exiting');
    queueProcessor.stop(error => {
        if (error) {
            log.error('failed to exit properly', {
                error,
            });
        }
        // Choose the status once and call process.exit() exactly once, so
        // the failure path never falls through to a success exit even if
        // process.exit() is stubbed or intercepted.
        const exitCode = error ? 1 : 0;
        process.exit(exitCode);
    });
});