168 changes: 75 additions & 93 deletions work_queue/src/work_queue.c
@@ -21,6 +21,7 @@ See the file COPYING for details.
#include "hash_table.h"
#include "interfaces_address.h"
#include "itable.h"
#include "skip_list.h"
#include "list.h"
#include "macros.h"
#include "username.h"
@@ -159,8 +160,8 @@ struct work_queue {

struct itable *tasks; // taskid -> task
struct itable *task_state_map; // taskid -> state
struct list *ready_list; // ready to be sent to a worker

struct skip_list *ready_list;
struct skip_list_cursor *ready_list_cursor;
struct hash_table *worker_table;
struct hash_table *worker_blocklist;
struct itable *worker_task_map;
@@ -356,8 +357,6 @@ const char *task_state_str(work_queue_task_state_t state);
static int task_state_is( struct work_queue *q, uint64_t taskid, work_queue_task_state_t state);
/* pointer to first task found with state. NULL if no such task */
static struct work_queue_task *task_state_any(struct work_queue *q, work_queue_task_state_t state);
/* number of tasks with state */
static int task_state_count( struct work_queue *q, const char *category, work_queue_task_state_t state);
/* number of tasks with the resource allocation request */
static int task_request_count( struct work_queue *q, const char *category, category_allocation_t request);

@@ -1993,30 +1992,20 @@ static void fetch_output_from_worker(struct work_queue *q, struct work_queue_wor
return;
}

static int expire_waiting_tasks(struct work_queue *q)
{
struct work_queue_task *t;
static int expire_waiting_task(struct work_queue *q, struct work_queue_task *t, double current_time)
{
int expired = 0;

int tasks_considered = 0;
double current_time = timestamp_get() / ONE_SECOND;
while( (t = list_rotate(q->ready_list)) ) {
if(tasks_considered > q->attempt_schedule_depth) {
return expired;
}
if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time) {
update_task_result(t, WORK_QUEUE_RESULT_TASK_TIMEOUT);
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
expired++;
list_pop_tail(q->ready_list);
} else if(t->max_retries > 0 && t->try_count > t->max_retries) {
update_task_result(t, WORK_QUEUE_RESULT_MAX_RETRIES);
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
expired++;
list_pop_tail(q->ready_list);
}
tasks_considered++;
if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time) {
update_task_result(t, WORK_QUEUE_RESULT_TASK_TIMEOUT);
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
expired = 1;
} else if(t->max_retries > 0 && t->try_count > t->max_retries) {
update_task_result(t, WORK_QUEUE_RESULT_MAX_RETRIES);
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
expired = 1;
}

return expired;
}
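/* Illustrative sketch, not part of this patch: expire_waiting_task() is now a
   per-task predicate, intended to be driven from a cursor scan over the ready
   list (as done in send_one_task_with_cr() below) instead of rotating the whole
   list. The hypothetical helper below uses only the skip_list calls that appear
   elsewhere in this diff: */
static void expire_ready_tasks_sketch(struct work_queue *q, struct skip_list_cursor *cursor)
{
	struct work_queue_task *t;
	double now_secs = ((double) timestamp_get()) / ONE_SECOND;

	skip_list_seek(cursor, 0);
	SKIP_LIST_ITERATE(cursor, t) {
		if(expire_waiting_task(q, t, now_secs)) {
			/* the task was already moved to WORK_QUEUE_TASK_RETRIEVED;
			   drop it from the ready list at the cursor position. */
			skip_list_remove_here(cursor);
		}
	}
}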

@@ -2416,11 +2405,14 @@ static struct rmsummary *total_resources_needed(struct work_queue *q) {
struct rmsummary *total = rmsummary_create(0);

/* for waiting tasks, we use what they would request if dispatched right now. */
list_first_item(q->ready_list);
while((t = list_next_item(q->ready_list))) {

struct skip_list_cursor *cursor = skip_list_cursor_create(q->ready_list);
skip_list_seek(cursor, 0);
SKIP_LIST_ITERATE(cursor, t) {
const struct rmsummary *s = task_min_resources(q, t);
rmsummary_add(total, s);
}
skip_list_cursor_delete(cursor);
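/* Note: scans like this one create a short-lived cursor and delete it when the
   walk is finished; only the dispatch path keeps the persistent
   q->ready_list_cursor introduced in this change. */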

/* for running tasks, we use what they have been allocated already. */
char *key;
@@ -3757,7 +3749,7 @@ int in_ramp_down(struct work_queue *q) {
return 0;
}

if(hash_table_size(q->worker_table) > list_size(q->ready_list)) {
if(hash_table_size(q->worker_table) > skip_list_size(q->ready_list)) {
return 1;
}

@@ -4581,21 +4573,27 @@ static void reap_task_from_worker(struct work_queue *q, struct work_queue_worker
count_worker_resources(q, w);
}

static int send_one_task( struct work_queue *q )
static int send_one_task_with_cr( struct work_queue *q , struct skip_list_cursor *cr , int iter_depth, double now_secs)
{
struct work_queue_task *t;
struct work_queue_worker *w;

int tasks_considered = 0;
timestamp_t now = timestamp_get();
int iter_count = 0;

while( (t = list_rotate(q->ready_list)) ) {
if(tasks_considered++ > q->attempt_schedule_depth) {
return 0;
SKIP_LIST_ITERATE(cr, t) {
if(iter_count >= iter_depth) {
break;
}
iter_count++;

// Expire the task if its end time has passed or it has exhausted its retries.
if(expire_waiting_task(q, t, now_secs)) {
skip_list_remove_here(cr);
continue;
}

// Skip task if min requested start time not met.
if(t->resources_requested->start > now) {
if(t->resources_requested->start > (timestamp_t)(now_secs * ONE_SECOND)) {
continue;
}

@@ -4612,7 +4610,8 @@ static int send_one_task( struct work_queue *q )
}

// Otherwise, remove it from the ready list and start it:
list_pop_tail(q->ready_list);
skip_list_remove_here(cr);

commit_task_to_worker(q,w,t);
return 1;
}
@@ -4621,6 +4620,24 @@
return 0;
}

static int send_one_task(struct work_queue *q)
{
double now_secs = ((double)timestamp_get()) / ONE_SECOND;
int iter_depth = MIN(skip_list_size(q->ready_list), q->attempt_schedule_depth);

// If q->ready_list_cursor is not pointing at a task
// (e.g. it is at the end of the list, or this is the first pass through),
// reset it to the beginning of the ready list.
if (!skip_list_get(q->ready_list_cursor, NULL)) {
skip_list_seek(q->ready_list_cursor, 0);
}

int sent = 0;
sent = send_one_task_with_cr(q, q->ready_list_cursor, iter_depth, now_secs);

return sent;
}
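/* Design note and rough sketch, not part of this patch: q->ready_list_cursor
   persists between calls, so successive calls to send_one_task() continue the
   scan where the previous call stopped; the skip_list_seek(q->ready_list_cursor, 0)
   calls in work_queue_wait_internal() below rewind it when a task is retrieved
   or a new worker connects. A minimal driving loop, with event handling elided,
   could look like the hypothetical helper below: */
static void dispatch_ready_tasks_sketch(struct work_queue *q)
{
	while(skip_list_size(q->ready_list) > 0) {
		if(!send_one_task(q)) {
			/* nothing dispatchable in this scan window; wait for the next
			   event to rewind the cursor before trying again. */
			break;
		}
	}
}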

static void print_large_tasks_warning(struct work_queue *q)
{
timestamp_t current_time = timestamp_get();
@@ -4638,8 +4655,9 @@ static void print_large_tasks_warning(struct work_queue *q)

struct rmsummary *largest_unfit_task = rmsummary_create(-1);

list_first_item(q->ready_list);
while( (t = list_next_item(q->ready_list))){
struct skip_list_cursor *cur = skip_list_cursor_create(q->ready_list);
skip_list_seek(cur, 0);
SKIP_LIST_ITERATE(cur, t){
// check each task against the queue of connected workers
int bit_set = is_task_larger_than_connected_workers(q, t);
if(bit_set) {
Expand All @@ -4659,6 +4677,7 @@ static void print_large_tasks_warning(struct work_queue *q)
unfit_gpu++;
}
}
skip_list_cursor_delete(cur);

if(unfit_core || unfit_mem || unfit_disk || unfit_gpu){
notice(D_WQ,"There are tasks that cannot fit any currently connected worker:\n");
@@ -5877,7 +5896,8 @@ struct work_queue *work_queue_ssl_create(int port, const char *key, const char *

q->next_taskid = 1;

q->ready_list = list_create();
q->ready_list = skip_list_create(2, 0.5);
q->ready_list_cursor = skip_list_cursor_create(q->ready_list);
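/* Assumption, not stated in this diff: the two arguments to skip_list_create()
   are taken here to be the initial level count and the level-promotion
   probability; see skip_list.h for the authoritative meaning. */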

q->tasks = itable_create(0);
q->task_state_map = itable_create(0);
@@ -6209,7 +6229,13 @@ void work_queue_delete(struct work_queue *q)
hash_table_clear(q->categories, (void *)category_free);
hash_table_delete(q->categories);

list_delete(q->ready_list);
skip_list_cursor_delete(q->ready_list_cursor);

while (skip_list_pop_head(q->ready_list)) {
/* drain the list so skip_list_delete can free the nodes.
The struct work_queue_task is deleted in delete_task_at_exit below. */
}
skip_list_delete(q->ready_list);

itable_delete(q->tasks);

@@ -6389,31 +6415,20 @@ char *work_queue_monitor_wrap(struct work_queue *q, struct work_queue_worker *w,
return wrap_cmd;
}

static double work_queue_task_priority(void *item) {
assert(item);
struct work_queue_task *t = item;
return t->priority;
}

/* Put a given task on the ready list, taking into account the task priority and the queue schedule. */

void push_task_to_ready_list( struct work_queue *q, struct work_queue_task *t )
{
int by_priority = 1;

if(t->result == WORK_QUEUE_RESULT_RESOURCE_EXHAUSTION) {
/* when a task is resubmitted given resource exhaustion, we
* push it at the head of the list, so it gets to run as soon
* as possible. This avoids the issue in which all 'big' tasks
* fail because the first allocation is too small. */
by_priority = 0;
const double *head_priority = skip_list_peek_head_priority(q->ready_list);
t->priority = *head_priority + 1.0;
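/* Note: this assumes the ready list is non-empty here; if
   skip_list_peek_head_priority() can return NULL for an empty list, the
   dereference above would need a guard. */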
}

if(by_priority) {
list_push_priority(q->ready_list, work_queue_task_priority, t);
} else {
list_push_head(q->ready_list,t);
}
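/* Note: the taskid is negated below so that, among tasks of equal priority, the
   earlier (smaller) taskid compares higher and submission order is preserved
   (assuming skip_list_insert() orders the secondary key in the same descending
   sense as the primary priority). */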
skip_list_insert(q->ready_list, t, t->priority, -t->taskid);

/* If the task has been used before, clear out accumulated state. */
clean_task_state(t, 0);
@@ -6653,24 +6668,6 @@ static struct work_queue_task *task_state_any_with_tag(struct work_queue *q, wor
return NULL;
}

static int task_state_count(struct work_queue *q, const char *category, work_queue_task_state_t state) {
struct work_queue_task *t;
uint64_t taskid;

int count = 0;

itable_firstkey(q->tasks);
while( itable_nextkey(q->tasks, &taskid, (void **) &t) ) {
if( task_state_is(q, taskid, state) ) {
if(!category || strcmp(category, t->category) == 0) {
count++;
}
}
}

return count;
}

static int task_request_count( struct work_queue *q, const char *category, category_allocation_t request) {
struct work_queue_task *t;
uint64_t taskid;
@@ -7041,17 +7038,7 @@ struct work_queue_task *work_queue_wait_internal(struct work_queue *q, int timeo
// retrieved at least one task
events++;
compute_manager_load(q, 1);
continue;
}

// expired tasks
BEGIN_ACCUM_TIME(q, time_internal);
result = expire_waiting_tasks(q);
END_ACCUM_TIME(q, time_internal);
if(result) {
// expired at least one task
events++;
compute_manager_load(q, 1);
skip_list_seek(q->ready_list_cursor, 0); // reset ready list cursor on receive event
continue;
}

@@ -7100,6 +7087,7 @@ struct work_queue_task *work_queue_wait_internal(struct work_queue *q, int timeo
if(result) {
// accepted at least one worker
events++;
skip_list_seek(q->ready_list_cursor, 0); // reset ready list cursor on new worker event
continue;
}

@@ -7189,21 +7177,15 @@ int work_queue_hungry(struct work_queue *q)
int64_t ready_task_gpus = 0;

struct work_queue_task *t;

int count = task_state_count(q, NULL, WORK_QUEUE_TASK_READY);

while(count > 0)
{
count--;
t = list_pop_head(q->ready_list);

struct skip_list_cursor *cursor = skip_list_cursor_create(q->ready_list);
skip_list_seek(cursor, 0);
SKIP_LIST_ITERATE(cursor, t) {
ready_task_cores += MAX(1,t->resources_requested->cores);
ready_task_memory += t->resources_requested->memory;
ready_task_disk += t->resources_requested->disk;
ready_task_gpus += t->resources_requested->gpus;

list_push_tail(q->ready_list, t);
}
skip_list_cursor_delete(cursor);

//check possible limiting factors
//return false if required resources exceed available resources