Skip to content

Commit 7267212

Browse files
authored
Retry connection to RabbitMQ on Scheduler Startup (#32)
* Refactor rabbit connection retry * Refactor services that scheduler starts to only come up after RabittMQ connection is established * Subscribe only after initialize * Rabbot promise hack to handle initial failure
1 parent 91a21ad commit 7267212

8 files changed

Lines changed: 358 additions & 286 deletions

File tree

bin/scheduler.js

Lines changed: 65 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,13 @@ const rabbit = require('../lib/rabbitfactory');
99
const config = require('../lib/config');
1010
const constants = require('../lib/constants');
1111
const Job = require('../models/job');
12-
// Don't remove. Loading this causes logger to start
13-
const logger = require('../lib/logwriter'); // eslint-disable-line no-unused-vars
14-
// Don't remove. Loading this causes status to start
15-
const status = require('../lib/mqstatus'); // eslint-disable-line no-unused-vars
16-
// Don't remove. Loading this causes jobwatcher to start
17-
const jobWatcher = require('../lib/k8s/jobwatcher'); // eslint-disable-line no-unused-vars
18-
// Don't remove. Loading this causes eventwatcher to start
19-
const eventWatcher = require('../lib/k8s/eventwatcher'); // eslint-disable-line no-unused-vars
20-
// Don't remove. Loading this causes the zombieRuns to start
21-
const zombieRuns = require('../lib/k8s/zombieruns'); // eslint-disable-line no-unused-vars
22-
// Don't remove. Loading this causes the cleanupJobs to start
23-
const jobCleanup = require('../lib/k8s/jobcleanup'); // eslint-disable-line no-unused-vars
12+
13+
const logwriter = require('../lib/logwriter');
14+
const status = require('../lib/mqstatus');
15+
const jobWatcher = require('../lib/k8s/jobwatcher');
16+
const eventWatcher = require('../lib/k8s/eventwatcher');
17+
const zombieRuns = require('../lib/k8s/zombieruns');
18+
const jobCleanup = require('../lib/k8s/jobcleanup');
2419

2520
const iostatus = require('../lib/iostatus');
2621

@@ -30,6 +25,14 @@ const _loadedJobs = {};
3025

3126
(async () => {
3227
try {
28+
await rabbit.initialize();
29+
debug('starting services');
30+
status.start();
31+
logwriter.start();
32+
eventWatcher.start();
33+
jobWatcher.start();
34+
zombieRuns.start();
35+
jobCleanup.start();
3336
const jobs = await Job.loadAllJobs();
3437
debug('loading jobs');
3538
jobs.forEach((job) => {
@@ -43,6 +46,56 @@ const _loadedJobs = {};
4346

4447
console.info('Scheduler started');
4548
debug('Scheduler started connected to %s', config.db);
49+
50+
// Used so scheduler is notified of changes and can add/remove/change jobs
51+
rabbit.subscribe(constants.QUEUES.SCHEDULER, async (message) => {
52+
async function updateJob(jobId) {
53+
deleteJob(jobId, true);
54+
55+
const dbJob = await Job.findById(new ObjectId(jobId));
56+
_loadedJobs[jobId] = dbJob;
57+
dbJob.startCron();
58+
debug('updated jobId: %s', jobId);
59+
iostatus.sendJobChange(jobId);
60+
}
61+
62+
function deleteJob(jobId, partOfUpdate) {
63+
if (_loadedJobs[jobId]) {
64+
// Get it
65+
const loadedJob = _loadedJobs[jobId];
66+
// Remove it from list
67+
delete _loadedJobs[jobId];
68+
// Stop it
69+
loadedJob.stopCron();
70+
debug('removed jobId: %s', jobId);
71+
}
72+
73+
if (!partOfUpdate) {
74+
iostatus.sendJobChange(jobId);
75+
}
76+
}
77+
78+
switch (message.action) {
79+
case constants.SCHEDULERACTION.NEW:
80+
// New job has been added. Make sure to handle case where
81+
// message is old and job has already been loaded in scheduler
82+
// if this is the case we ignore the message
83+
await updateJob(message.jobId);
84+
break;
85+
86+
case constants.SCHEDULERACTION.DELETED:
87+
// Job has been deleted
88+
deleteJob(message.jobId, false);
89+
break;
90+
91+
case constants.SCHEDULERACTION.UPDATED:
92+
// Job has changed
93+
await updateJob(message.jobId);
94+
break;
95+
}
96+
97+
return true;
98+
});
4699
}
47100
catch (err) {
48101
console.error('Error loading jobs', err);
@@ -51,52 +104,3 @@ const _loadedJobs = {};
51104
})();
52105

53106

54-
// Used so scheduler is notified of changes and can add/remove/change jobs
55-
rabbit.subscribe(constants.QUEUES.SCHEDULER, async (message) => {
56-
async function updateJob(jobId) {
57-
deleteJob(jobId, true);
58-
59-
const dbJob = await Job.findById(new ObjectId(jobId));
60-
_loadedJobs[jobId] = dbJob;
61-
dbJob.startCron();
62-
debug('updated jobId: %s', jobId);
63-
iostatus.sendJobChange(jobId);
64-
}
65-
66-
function deleteJob(jobId, partOfUpdate) {
67-
if (_loadedJobs[jobId]) {
68-
// Get it
69-
const loadedJob = _loadedJobs[jobId];
70-
// Remove it from list
71-
delete _loadedJobs[jobId];
72-
// Stop it
73-
loadedJob.stopCron();
74-
debug('removed jobId: %s', jobId);
75-
}
76-
77-
if (!partOfUpdate) {
78-
iostatus.sendJobChange(jobId);
79-
}
80-
}
81-
82-
switch (message.action) {
83-
case constants.SCHEDULERACTION.NEW:
84-
// New job has been added. Make sure to handle case where
85-
// message is old and job has already been loaded in scheduler
86-
// if this is the case we ignore the message
87-
await updateJob(message.jobId);
88-
break;
89-
90-
case constants.SCHEDULERACTION.DELETED:
91-
// Job has been deleted
92-
deleteJob(message.jobId, false);
93-
break;
94-
95-
case constants.SCHEDULERACTION.UPDATED:
96-
// Job has changed
97-
await updateJob(message.jobId);
98-
break;
99-
}
100-
101-
return true;
102-
});

lib/k8s/eventwatcher.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ async function addToLog(runId, logTime, logMessage) {
3636
}
3737
}
3838

39-
const watcher = new Watcher(`/api/v1/namespaces/${constants.NAMESPACE}/events`, async (type, apiObj, watchObj) => {
39+
module.exports = new Watcher(`/api/v1/namespaces/${constants.NAMESPACE}/events`, async (type, apiObj, watchObj) => {
4040
// Only interested in pod events
4141
if (apiObj.involvedObject && apiObj.involvedObject.kind === 'Pod') {
4242
const podName = apiObj.involvedObject.name;
@@ -74,4 +74,3 @@ const watcher = new Watcher(`/api/v1/namespaces/${constants.NAMESPACE}/events`,
7474
}
7575
});
7676

77-
watcher.start();

lib/k8s/jobcleanup.js

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,31 +12,33 @@ const config = require('../config');
1212
const k8sClient = require('./clientFactory');
1313
const Job = require('./job');
1414

15-
debug('job cleanup interval: %d minutes', config.scheduler.jobCleanupFrequency);
16-
setInterval(async function () {
17-
debug('looking for jobs to cleanup');
18-
// , fieldSelector: 'status.phase!=Running,status.phase!=Pending' } }
19-
const jobs = await k8sClient.api.batch.listNamespacedJob(constants.NAMESPACE);
20-
debug(`${jobs.body.items.length} jobs found`);
21-
// Filter out inactive jobs
22-
const filteredJobs = _.filter(jobs.body.items, function (job) {
23-
return job.status.active !== 1;
24-
});
25-
if (filteredJobs.length > 0) {
26-
debug(`${filteredJobs.length} active jobs found`);
27-
async.eachLimit(filteredJobs, 5, async function (job) {
28-
if (job.metadata && job.metadata.labels && job.metadata.labels.runId) {
29-
debug(`Removing job for runId: ${job.metadata.labels.runId}`);
30-
await Job.remove(job.metadata.labels.runId);
31-
}
32-
}, function (err) {
33-
debug('Finished job cleanup');
34-
if (err) {
35-
console.error(err);
36-
}
15+
module.exports.start = function start() {
16+
debug('job cleanup interval: %d minutes', config.scheduler.jobCleanupFrequency);
17+
setInterval(async function () {
18+
debug('looking for jobs to cleanup');
19+
// , fieldSelector: 'status.phase!=Running,status.phase!=Pending' } }
20+
const jobs = await k8sClient.api.batch.listNamespacedJob(constants.NAMESPACE);
21+
debug(`${jobs.body.items.length} jobs found`);
22+
// Filter out inactive jobs
23+
const filteredJobs = _.filter(jobs.body.items, function (job) {
24+
return job.status.active !== 1;
3725
});
38-
}
39-
else {
40-
debug('No jobs found');
41-
}
42-
}, config.scheduler.jobCleanupFrequency * 60000);
26+
if (filteredJobs.length > 0) {
27+
debug(`${filteredJobs.length} active jobs found`);
28+
async.eachLimit(filteredJobs, 5, async function (job) {
29+
if (job.metadata && job.metadata.labels && job.metadata.labels.runId) {
30+
debug(`Removing job for runId: ${job.metadata.labels.runId}`);
31+
await Job.remove(job.metadata.labels.runId);
32+
}
33+
}, function (err) {
34+
debug('Finished job cleanup');
35+
if (err) {
36+
console.error(err);
37+
}
38+
});
39+
}
40+
else {
41+
debug('No jobs found');
42+
}
43+
}, config.scheduler.jobCleanupFrequency * 60000);
44+
};

lib/k8s/jobwatcher.js

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ async function updateStatus(message) {
2525
}
2626
}
2727

28-
const watcher = new Watcher(`/apis/batch/v1/namespaces/${constants.NAMESPACE}/jobs`, async (type, apiObj, watchObj) => {
28+
module.exports = new Watcher(`/apis/batch/v1/namespaces/${constants.NAMESPACE}/jobs`, async (type, apiObj, watchObj) => {
2929
if ((type === 'MODIFIED' || type === 'DELETED') && apiObj.status) {
3030
try {
3131
if (apiObj.status.completionTime) {
@@ -68,5 +68,3 @@ const watcher = new Watcher(`/apis/batch/v1/namespaces/${constants.NAMESPACE}/jo
6868
}
6969
}
7070
});
71-
72-
watcher.start();

lib/k8s/zombieruns.js

Lines changed: 57 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -32,66 +32,68 @@ async function updateStatus(message) {
3232
// Looks at any run in a idle or busy state with an
3333
// updatedAt that is more than 5 minutes old and
3434
// marks it as failed
35-
debug(`zombie run garbage collection interval: ${config.scheduler.zombieFrequency} minutes`);
36-
setInterval(async function () {
37-
try {
38-
debug('garbage collecting zombie runs');
39-
const zombieRuns = await Run.find({ $or: [{ status: constants.JOBSTATUS.BUSY }, { status: constants.JOBSTATUS.IDLE }, { status: constants.JOBSTATUS.SCHEDULED }] });
35+
module.exports.start = function start() {
36+
debug(`zombie run garbage collection interval: ${config.scheduler.zombieFrequency} minutes`);
37+
setInterval(async function () {
38+
try {
39+
debug('garbage collecting zombie runs');
40+
const zombieRuns = await Run.find({ $or: [{ status: constants.JOBSTATUS.BUSY }, { status: constants.JOBSTATUS.IDLE }, { status: constants.JOBSTATUS.SCHEDULED }] });
4041

41-
await async.eachLimit(zombieRuns, 5, async function (zombieRun) {
42-
const jobs = await k8sClient.api.batch.listNamespacedJob(constants.NAMESPACE, undefined, undefined, undefined, undefined, `runId=${zombieRun._id}`, 1);
43-
if (jobs.body.items.length > 0) {
44-
const job = jobs.body.items[0];
45-
debug('job', job);
46-
if (job.status.failed) {
47-
await updateStatus({ status: constants.JOBSTATUS.FAIL, runId: job.metadata.labels.runId, jobId: job.metadata.labels.jobId });
48-
zombieRun.status = constants.JOBSTATUS.FAIL;
49-
}
50-
else if (job.status.succeeded) {
51-
await updateStatus({ status: constants.JOBSTATUS.SUCCESS, runId: job.metadata.labels.runId, jobId: job.metadata.labels.jobId });
52-
zombieRun.status = constants.JOBSTATUS.SUCCESS;
42+
await async.eachLimit(zombieRuns, 5, async function (zombieRun) {
43+
const jobs = await k8sClient.api.batch.listNamespacedJob(constants.NAMESPACE, undefined, undefined, undefined, undefined, `runId=${zombieRun._id}`, 1);
44+
if (jobs.body.items.length > 0) {
45+
const job = jobs.body.items[0];
46+
debug('job', job);
47+
if (job.status.failed) {
48+
await updateStatus({ status: constants.JOBSTATUS.FAIL, runId: job.metadata.labels.runId, jobId: job.metadata.labels.jobId });
49+
zombieRun.status = constants.JOBSTATUS.FAIL;
50+
}
51+
else if (job.status.succeeded) {
52+
await updateStatus({ status: constants.JOBSTATUS.SUCCESS, runId: job.metadata.labels.runId, jobId: job.metadata.labels.jobId });
53+
zombieRun.status = constants.JOBSTATUS.SUCCESS;
54+
}
55+
else if (!job.status.active && moment().subtract(5, 'minutes').isAfter(zombieRun.createdAt)) {
56+
debug(`Job not found updating status to fail runId: ${job.metadata.labels.runId}`);
57+
await updateStatus({ status: constants.JOBSTATUS.FAIL, runId: job.metadata.labels.runId, jobId: job.metadata.labels.jobId });
58+
zombieRun.status = constants.JOBSTATUS.FAIL;
59+
}
5360
}
54-
else if (!job.status.active && moment().subtract(5, 'minutes').isAfter(zombieRun.createdAt)) {
55-
debug(`Job not found updating status to fail runId: ${job.metadata.labels.runId}`);
56-
await updateStatus({ status: constants.JOBSTATUS.FAIL, runId: job.metadata.labels.runId, jobId: job.metadata.labels.jobId });
61+
else {
62+
debug(`No k8s job found updating status to fail runId: ${zombieRun._id}`);
63+
await updateStatus({ status: constants.JOBSTATUS.FAIL, runId: zombieRun._id, jobId: zombieRun.jobId });
5764
zombieRun.status = constants.JOBSTATUS.FAIL;
5865
}
59-
}
60-
else {
61-
debug(`No k8s job found updating status to fail runId: ${zombieRun._id}`);
62-
await updateStatus({ status: constants.JOBSTATUS.FAIL, runId: zombieRun._id, jobId: zombieRun.jobId });
63-
zombieRun.status = constants.JOBSTATUS.FAIL;
64-
}
6566

66-
await async.parallel([
67-
async function () {
68-
try {
69-
return await zombieRun.save();
70-
}
71-
catch (err) {
72-
console.error('Error saving zombieRun', err);
73-
}
74-
},
75-
async function () {
76-
try {
77-
const job = await Job.findByJobId(zombieRun.jobId);
78-
if (!job) {
79-
console.error('Unable to find jobId: ' + zombieRun.jobId);
67+
await async.parallel([
68+
async function () {
69+
try {
70+
return await zombieRun.save();
8071
}
81-
else {
82-
job.lastStatus = zombieRun.status;
83-
await job.save();
72+
catch (err) {
73+
console.error('Error saving zombieRun', err);
74+
}
75+
},
76+
async function () {
77+
try {
78+
const job = await Job.findByJobId(zombieRun.jobId);
79+
if (!job) {
80+
console.error('Unable to find jobId: ' + zombieRun.jobId);
81+
}
82+
else {
83+
job.lastStatus = zombieRun.status;
84+
await job.save();
85+
}
86+
}
87+
catch (err) {
88+
console.error('Unable to find jobId: ' + zombieRun.jobId, err);
8489
}
85-
}
86-
catch (err) {
87-
console.error('Unable to find jobId: ' + zombieRun.jobId, err);
88-
}
8990

90-
}
91-
]);
92-
});
93-
}
94-
catch (err) {
95-
console.error('ZombieRuns error', err);
96-
}
97-
}, config.scheduler.zombieFrequency * 60000);
91+
}
92+
]);
93+
});
94+
}
95+
catch (err) {
96+
console.error('ZombieRuns error', err);
97+
}
98+
}, config.scheduler.zombieFrequency * 60000);
99+
};

0 commit comments

Comments
 (0)