Skip to content

Commit ca9bfc5

Browse files
author
Soichi Hayashi
committed
changed timing of resource_id update - set it before start_task so we don't over-submit tasks
1 parent 5a3581d commit ca9bfc5

File tree

2 files changed

+87
-99
lines changed

2 files changed

+87
-99
lines changed

api/resource.js

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -34,43 +34,27 @@ exports.select = function(user, task, cb) {
3434
return;
3535
}
3636

37-
const find = {
37+
//now let's start out with all the resources that the user has access to and the app is enabled on
38+
db.Resource.find({
3839
status: {$ne: "removed"},
3940
active: true,
40-
/*
41-
"$or": [
42-
43-
//we don't want to use resource simply because user owns it.
44-
//for locked down project that user want to be specific about which resource to use
45-
//we don't want a owner of resource submit jobs and end up running the job somewhere
46-
//{user_id: user.sub},
47-
48-
{
49-
gids: {"$in": task.gids||[1]}
50-
},
51-
],
52-
*/
5341
gids: {"$in": task.gids},
5442
'config.services.name': task.service,
55-
}
56-
57-
db.Resource.find(find).lean().sort('create_date').exec((err, resources)=>{
43+
}).lean().sort('create_date').exec((err, resources)=>{
5844
if(err) return cb(err);
59-
if(task.preferred_resource_id) console.info("user preferred_resource_id:"+task.preferred_resource_id);
45+
6046
//select the best resource based on the task
6147
var best = null;
6248
var best_score = null;
6349
var considered = [];
6450
async.eachSeries(resources, (resource, next_resource)=>{
65-
//console.log("scoring "+resource.name);
6651
score_resource(user, resource, task, (err, score, detail)=>{
6752
if(score === null) {
6853
//not configured to run on this resource.. ignore
6954
//console.log("no score produced for "+resource.name);
7055
return next_resource();
7156
}
7257

73-
//let resource_detail = config.resources[resource.resource_id];
7458
let consider = {
7559
_id: resource._id,
7660
id: resource._id, //deprecated.. use _id
@@ -159,19 +143,23 @@ function score_resource(user, resource, task, cb) {
159143
});
160144
}
161145
if(score === null) return cb(null, null); //this resource doesn't know about this service..
162-
//console.log("scoring", resource.name);
163146

164147
let maxtask = resource.config.maxtask;
165148
if(maxtask === undefined) maxtask = 1; //backward compatibility
166149
if(maxtask == 0) return cb(null, null); //can't run here
167150

168151
db.Task.countDocuments({
152+
_id: {$ne: task._id}, //don't count myself waiting
169153
resource_id: resource._id,
170154
$or: [
171-
{status: "running"},
172-
{status: "requested", start_date: {$exists: true}}, //starting..
155+
{
156+
status: "running",
157+
},
158+
{
159+
status: "requested",
160+
start_date: {$exists: true},
161+
}, //starting..
173162
],
174-
_id: {$ne: task._id}, //don't count myself waiting
175163
}, (err, running)=>{
176164
if(err) console.error(err);
177165

bin/task.js

Lines changed: 75 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ db.init(function(err) {
7070
});
7171

7272
//https://github.com/soichih/workflow/issues/15
73-
function set_nextdate(task) {
73+
function setNextDate(task) {
7474
switch(task.status) {
7575
case "failed":
7676
case "finished":
@@ -138,7 +138,7 @@ function check(cb) {
138138
task.deps_config = task.deps.map(dep=>{task: dep});
139139
}
140140

141-
set_nextdate(task);
141+
setNextDate(task);
142142
_counts.tasks++;
143143
console.log("------- ", task._id.toString(), "user:", task.user_id, task.status, task.service, task.name);
144144
//console.log("request_date", task.request_date);
@@ -166,14 +166,13 @@ function check(cb) {
166166
}
167167

168168
let previous_status = task.status;
169-
task.handle_date = new Date(); //record the handle date
169+
task.handle_date = new Date();
170170
handler(task, async (err, skipSave)=>{
171171
if(err) console.error(err); //continue
172172

173173
//handle_requested forks off start_task, which does its own task.save().
174174
//to prevent parallel saves, the final save for handle_requested is left to start_task,
175175
//so we should not save here when skipSave is set.
176-
//if(skipSave) console.log("skipping task.save()---", skipSave);
177176
if(!skipSave) await task.save();
178177

179178
//if task status changed, update instance status also
@@ -341,7 +340,7 @@ function handle_housekeeping(task, cb) {
341340
});
342341
}
343342

344-
function handle_requested(task, next) {
343+
async function handle_requested(task, next) {
345344

346345
const now = new Date();
347346
let initialState = task.status;
@@ -352,13 +351,13 @@ function handle_requested(task, next) {
352351
//WARNING - don't run anything asynchronous after checking for task.start_date before I save the task with new start_date
353352
if(task.start_date) {
354353
let starting_for = now - task.start_date;
355-
console.log("start_date is set", starting_for);
354+
//console.log("start_date is set", starting_for);
356355
if(starting_for < 1000*60*30) {
357356
console.log("job seems to be still starting.. for "+starting_for);
358357
task.status_msg = "Job poked at "+now.toLocaleString()+" but job is still starting.. for "+starting_for/1000+"secs";
359358
return next();
360359
}
361-
console.error("start_date is set on requested job, but it's been a while... guess it failed to start but didn't have start_date cleared.. proceeding?");
360+
//console.error("start_date is set on requested job, but it's been a while... guess it failed to start but didn't have start_date cleared.. proceeding?");
362361
}
363362

364363
//check if remove_date has not been reached (maybe set by request_task_removal got overridden)
@@ -393,7 +392,7 @@ function handle_requested(task, next) {
393392
task.fail_date = new Date();
394393
return next();
395394
}
396-
395+
397396
//fail the task if any dependency is removed
398397
if(removed_deps.length > 0) {
399398
console.debug("dependency removed.. failing this task");
@@ -402,7 +401,7 @@ function handle_requested(task, next) {
402401
task.fail_date = new Date();
403402
return next();
404403
}
405-
404+
406405
//fail if requested for too long
407406
var reqtime = now - (task.request_date||task.create_date); //request_date may not be set for old task
408407
if(reqtime > 1000 * 3600*24*20) {
@@ -418,84 +417,85 @@ function handle_requested(task, next) {
418417
//when dependency finished, it should auto-poke this task. so it's okay for this to be long
419418
task.next_date = new Date(Date.now()+1000*3600*24);
420419
return next();
421-
}
420+
}
422421

423422
//set start date before checking for resource_select to prevent this task from getting double processed
423+
//also make sure we get correct count for running/starting task in score_resource
424424
task.status_msg = "Looking for resource";
425425
task.start_date = new Date();
426-
task.save(err=>{
427-
if(err) console.error(err);
426+
await task.save()
428427

429-
let user = {
430-
sub: task.user_id,
431-
gids: task.gids,
432-
}
433-
_resource_select(user, task, function(err, resource, score, considered) {
434-
if(err) return next(err);
435-
if(!resource || resource.status == "removed") {
436-
437-
//check again in N minutes where N is determined by the number of tasks the project is running (and requested)
438-
//this should make sure that no project will consume all available slots simply because the project
439-
//submits tons of tasks..
440-
//TODO - another way to do this might be to find the max next_date and add +10 seconds to that?
441-
db.Task.countDocuments({status: "running", _group_id: task._group_id}, (err, running_count)=>{
428+
_resource_select({
429+
//mock user object
430+
sub: task.user_id,
431+
gids: task.gids,
432+
}, task, async (err, resource, score, considered)=>{
433+
if(err) return next(err);
434+
if(!resource || resource.status == "removed") {
435+
436+
//check again in N minutes where N is determined by the number of tasks the project is running (and requested)
437+
//this should make sure that no project will consume all available slots simply because the project
438+
//submits tons of tasks..
439+
//TODO - another way to do this might be to find the max next_date and add +10 seconds to that?
440+
db.Task.countDocuments({status: "running", _group_id: task._group_id}, (err, running_count)=>{
441+
if(err) return next(err);
442+
db.Task.countDocuments({status: "requested", _group_id: task._group_id}, (err, requested_count)=>{
442443
if(err) return next(err);
443-
db.Task.countDocuments({status: "requested", _group_id: task._group_id}, (err, requested_count)=>{
444-
if(err) return next(err);
445444

446-
//penalize projects that are running a lot of jobs already (15 seconds per job)
447-
//also add up to an hour for projects that has a lot of jobs requested (1 second each)
448-
let secs = (15*running_count)+Math.min(requested_count, 3600);
449-
secs = Math.max(secs, 15); //min 15 seconds
445+
//penalize projects that are running a lot of jobs already (15 seconds per job)
446+
//also add up to an hour for projects that has a lot of jobs requested (1 second each)
447+
let secs = (15*running_count)+Math.min(requested_count, 3600);
448+
secs = Math.max(secs, 15); //min 15 seconds
450449

451-
console.log("can't find resource.. retry in %d secs -- running:%d group_id:%d(requested:%d)", secs, running_count, task._group_id, requested_count);
450+
console.log("can't find resource.. retry in %d secs -- running:%d group_id:%d(requested:%d)", secs, running_count, task._group_id, requested_count);
452451

453-
task.status_msg = "No resource currently available to run this task.. waiting.. ";
454-
task.next_date = new Date(Date.now()+1000*secs);
455-
task.start_date = undefined; //reset start_date so it will be handled again later
456-
return next();
457-
});
452+
task.status_msg = "No resource currently available to run this task.. waiting.. ";
453+
task.next_date = new Date(Date.now()+1000*secs);
454+
task.start_date = undefined; //reset start_date so it will be handled again later
455+
return next();
458456
});
459-
return;
460-
}
457+
});
458+
return;
459+
}
461460

462-
//ready to start it!
463-
start_task(task, resource, considered, err=>{
464-
if(err) {
465-
//permanently failed to start (or running_sync failed).. mark the task as failed
466-
console.error("start_task failed. taskid:", task._id.toString(), err);
467-
task.status = "failed";
468-
task.status_msg = err;
469-
task.fail_date = new Date();
470-
}
471-
472-
//shouldn't we do this in start_task?
473-
task.resource_id = resource._id;
474-
task.resource_ids.addToSet(resource._id);
475-
476-
//if we couldn't start (in case of retry), reset start_date so we can handle it later again
477-
if(task.status == "requested") task.start_date = undefined;
478-
479-
//check() handles save/update_instance_status, but we are diverging here..
480-
console.log(task.status_msg);
481-
482-
task.save(err=>{
483-
if(err) console.error(err);
461+
//we need to mark starting resource id so we don't over count while starting jobs
462+
task.resource_id = resource._id;
463+
await task.save();
484464

485-
//if status changes, then let's update instance status also
486-
if(task.status != initialState) {
487-
common.update_instance_status(task.instance_id, err=>{
488-
if(err) console.error(err);
489-
});
490-
}
491-
});
492-
});
465+
//ready to start it! (THIS FORKS the handler)
466+
start_task(task, resource, considered, err=>{
467+
if(err) {
468+
//permanently failed to start (or running_sync failed).. mark the task as failed
469+
console.error("start_task failed. taskid:", task._id.toString(), err);
470+
task.status = "failed";
471+
task.status_msg = err;
472+
task.fail_date = new Date();
473+
}
474+
475+
task.resource_ids.addToSet(resource._id);
476+
477+
//if we couldn't start (in case of retry), reset start_date so we can handle it later again
478+
if(task.status == "requested") task.start_date = undefined;
493479

494-
//Don't wait for start_task to finish.. could take a while to start.. (especially rsyncing could take a while)..
495-
//start_task is designed to be able to run concurrently..
496-
console.log("started task.. skiping save");
497-
next(null, true); //skip saving to prevent parallel save with start_task
480+
//check() handles save/update_instance_status, but we are diverging here..
481+
console.log(task.status_msg);
482+
483+
task.save(err=>{
484+
if(err) console.error(err);
485+
486+
//if status changes, then let's update instance status also
487+
if(task.status != initialState) {
488+
common.update_instance_status(task.instance_id, err=>{
489+
if(err) console.error(err);
490+
});
491+
}
492+
});
498493
});
494+
495+
//Don't wait for start_task to finish.. could take a while to start.. (especially rsyncing could take a while)..
496+
//start_task is designed to be able to run concurrently..
497+
console.log("started task.. skiping save");
498+
next(null, true); //skip saving to prevent parallel save with start_task
499499
});
500500
}
501501

@@ -1044,7 +1044,7 @@ function start_task(task, resource, considered, cb) {
10441044
//it might be also handy to run app installed executable, but maybe it will do more harm than good?
10451045
//if we get rid of this, I need to have all apps register hooks like "start": "./start.sh". instead of just "start.sh"
10461046
stream.write("export PATH=$PATH:$PWD\n");
1047-
1047+
10481048
//report why the resource was picked
10491049
stream.write("\n# why was this resource chosen?\n");
10501050
considered.forEach(con=>{
@@ -1058,7 +1058,7 @@ function start_task(task, resource, considered, cb) {
10581058
});
10591059
});
10601060
},
1061-
1061+
10621062
//finally, run the service!
10631063
next=>{
10641064
if(service_detail.run) return next(); //some app uses run instead of start .. run takes precedence

0 commit comments

Comments
 (0)