Skip to content

Commit c62122c

Browse files
authored
implement max job time (#73)
* implement max job time
* timeout check in naive and TODO for adj latency stats
* add timeout checks to example schedulers
* test for max job time
* add job timeout stats
* new scoring option for adjusted latency
1 parent 0b99640 commit c62122c

20 files changed

Lines changed: 328 additions & 51 deletions

File tree

eudoxia/__main__.py

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from io import StringIO
1010
from pathlib import Path
1111
from eudoxia.simulator import run_simulator, parse_args_with_defaults, get_param_defaults
12+
from eudoxia.utils import Priority
1213
from eudoxia.scheduler.decorators import SCHEDULING_ALGOS
1314
from eudoxia.workload.csv_io import CSVWorkloadReader, CSVWorkloadWriter, WorkloadTraceGenerator
1415
from eudoxia.workload import WorkloadGenerator
@@ -53,25 +54,58 @@ def run_command(params_file, workload=None):
5354
print(f" Assignments: {stats.assignments}")
5455
print(f" Suspensions: {stats.suspensions}")
5556
print(f" Failures: {stats.failures}")
56-
print(f" Failure/error counts: {stats.failure_error_counts}")
57+
print(f" Container failure counts: {stats.failure_error_counts}")
5758
print(f" Mean memory allocated: {stats.mean_memory_allocated_percent:.1f}%")
5859
print(f" Mean memory consumed: {stats.mean_memory_consumed_percent:.1f}%")
5960
print()
6061
print(" Pipeline Stats:")
61-
print(" " + "-" * 68)
62-
print(f" {'Priority':<15} {'Arrived':>10} {'Completed':>10} {'Mean (s)':>12} {'P99 (s)':>12}")
63-
print(" " + "-" * 68)
62+
print(" " + "-" * 80)
63+
print(f" {'Priority':<15} {'Arrived':>10} {'Completed':>10} {'Timed Out':>10} {'Mean (s)':>12} {'P99 (s)':>12}")
64+
print(" " + "-" * 80)
6465
pipeline_stats = [
6566
("All", stats.pipelines_all),
6667
("Query", stats.pipelines_query),
6768
("Interactive", stats.pipelines_interactive),
6869
("Batch", stats.pipelines_batch),
6970
]
7071
for name, pstats in pipeline_stats:
71-
print(f" {name:<15} {pstats.arrival_count:>10} {pstats.completion_count:>10} {pstats.mean_latency_seconds:>12.2f} {pstats.p99_latency_seconds:>12.2f}")
72-
print(" " + "-" * 68)
72+
print(f" {name:<15} {pstats.arrival_count:>10} {pstats.completion_count:>10} {pstats.timeout_count:>10} {pstats.mean_latency_seconds:>12.2f} {pstats.p99_latency_seconds:>12.2f}")
73+
print(" " + "-" * 80)
7374
print()
74-
print(f" Adjusted latency: {stats.adjusted_latency():.2f}s")
75+
76+
# print adjusted latency, which puts more weight on high-priority
77+
# jobs (like query).
78+
#
79+
# it also penalizes the metric for unfinished work, with the
80+
# approach depending on whether there is a max job time. When
81+
# max_job_seconds is set, each unfinished pipeline is assigned a
82+
# penalty latency of 2x the max job time. Otherwise, the weighted
83+
# mean latency is divided by the completion rate (so finishing half
84+
# the work doubles the metric).
85+
weights = {
86+
Priority.QUERY: 10,
87+
Priority.INTERACTIVE: 5,
88+
Priority.BATCH_PIPELINE: 1,
89+
}
90+
max_job_seconds = params["max_job_seconds"]
91+
if max_job_seconds > 0:
92+
penalty = 2 * max_job_seconds
93+
adjusted = stats.adjusted_latency(
94+
weights=weights,
95+
divide_by_completion_rate=False,
96+
unfinished_penalty_seconds=penalty,
97+
)
98+
print(f" Adjusted latency: {adjusted:.2f}s")
99+
print(f" (weights: query={weights[Priority.QUERY]}, interactive={weights[Priority.INTERACTIVE]}, batch={weights[Priority.BATCH_PIPELINE]}; "
100+
f"unfinished penalty: {penalty}s = 2 * max_job_seconds)")
101+
else:
102+
adjusted = stats.adjusted_latency(
103+
weights=weights,
104+
divide_by_completion_rate=True,
105+
)
106+
print(f" Adjusted latency: {adjusted:.2f}s")
107+
print(f" (weights: query={weights[Priority.QUERY]}, interactive={weights[Priority.INTERACTIVE]}, batch={weights[Priority.BATCH_PIPELINE]}; "
108+
f"divided by completion rate)")
75109

76110

77111
def gentrace_command(params_file, output_file, force=False):

eudoxia/executor/executor.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@ class Executor:
1111
Manager of a pool of resources and active containers, the Executor takes
1212
assignments and ensures that all costs and resources are accounted for and
1313
additional resources are allocated if instructed.
14-
14+
1515
Acts like a cluster manager that keeps track of utilization of machines
1616
(that is, resource pools).
1717
"""
18-
def __init__(self, num_pools, cpus_per_pool, ram_gb_per_pool, ticks_per_second,
18+
def __init__(self, clock, num_pools, cpus_per_pool, ram_gb_per_pool, ticks_per_second,
1919
allow_memory_overcommit=False, **kwargs):
20+
self.clock = clock
2021
self.num_pools = num_pools
2122
self.cpus_per_pool = cpus_per_pool
2223
self.ram_gb_per_pool = ram_gb_per_pool
@@ -26,7 +27,7 @@ def __init__(self, num_pools, cpus_per_pool, ram_gb_per_pool, ticks_per_second,
2627
# Initialize pools with identical resources
2728
self.pools: List[ResourcePool] = []
2829
for i in range(self.num_pools):
29-
new_pool = ResourcePool(pool_id=i, cpu_pool=cpus_per_pool, ram_pool=ram_gb_per_pool,
30+
new_pool = ResourcePool(clock, pool_id=i, cpu_pool=cpus_per_pool, ram_pool=ram_gb_per_pool,
3031
ticks_per_second=self.ticks_per_second,
3132
allow_memory_overcommit=allow_memory_overcommit, **kwargs)
3233
self.pools.append(new_pool)

eudoxia/executor/resource_pool.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@ class ResourcePool:
1414
1515
A resource pool is analogous to a machine on which we can run containers.
1616
"""
17-
def __init__(self, pool_id, cpu_pool, ram_pool, ticks_per_second,
17+
def __init__(self, clock, pool_id, cpu_pool, ram_pool, ticks_per_second,
1818
multi_operator_containers=True, allow_memory_overcommit=False, **kwargs):
1919
# CONFIGURATION
20+
self.clock = clock
2021
self.pool_id = pool_id
2122
self.ticks_per_second = ticks_per_second
2223
self.tick_length_secs = 1.0 / ticks_per_second
@@ -141,6 +142,16 @@ def _run_out_of_memory_killer(self):
141142
break
142143
victim.kill("OOM")
143144

145+
def _run_out_of_time_killer(self):
146+
"""Kill containers that have ops belonging to a timed-out pipeline."""
147+
for c in self.active_containers:
148+
if c.is_completed():
149+
continue
150+
for op in c.operators:
151+
if op.pipeline.runtime_status().has_timed_out():
152+
c.kill("timeout")
153+
break
154+
144155
def run_one_tick(self, suspensions: List[Suspend],
145156
assignments: List[Assignment]) -> List[ExecutionResult]:
146157
"""
@@ -194,6 +205,9 @@ def run_one_tick(self, suspensions: List[Suspend],
194205
# Kill as necessary to keep within total and individual limits
195206
self._run_out_of_memory_killer()
196207

208+
# Kill containers with ops belonging to timed-out pipelines
209+
self._run_out_of_time_killer()
210+
197211
# Process completed containers (including those killed by pool-level OOM or by the timeout killer)
198212
to_remove = []
199213
for c in self.active_containers:

eudoxia/scheduler/naive.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def naive_pipeline(s, results: List[ExecutionResult],
3232
# this case
3333
if len(pipelines) == 0 and len(results) == 0:
3434
return [], []
35-
35+
3636
for p in pipelines:
3737
s.waiting_queue.append(p)
3838

@@ -54,8 +54,9 @@ def naive_pipeline(s, results: List[ExecutionResult],
5454
# find a pipeline with ops we can assign
5555
while s.waiting_queue:
5656
pipeline = s.waiting_queue.pop(0)
57-
has_failures = pipeline.runtime_status().state_counts[OperatorState.FAILED] > 0
58-
if pipeline.runtime_status().is_pipeline_successful() or has_failures:
57+
status = pipeline.runtime_status()
58+
has_failures = status.state_counts[OperatorState.FAILED] > 0
59+
if status.is_pipeline_successful() or has_failures or status.has_timed_out():
5960
# we don't retry, so anything complete, failed, or timed out
6061
# will be permanently removed from the queue
6162
continue

eudoxia/scheduler/overbook.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ def make_assignments(s) -> List[Assignment]:
100100
for op_idx, op in enumerate(s.op_queue):
101101
if s.pipeline_failures[op.pipeline.pipeline_id] >= MAX_FAILURES:
102102
continue
103+
if op.pipeline.runtime_status().has_timed_out():
104+
continue
103105
assert op.state() in ASSIGNABLE_STATES, f"op {op.id} of pipeline {op.pipeline.pipeline_id} was in queue, but has non-assignable state, {op.state()}"
104106

105107
assignment = try_make_assignment(s, op)

eudoxia/scheduler/priority.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,12 @@ def priority_scheduler(s, results: List[ExecutionResult],
162162
to_remove = []
163163
to_start = []
164164
for job in queue:
165+
if job.pipeline.runtime_status().has_timed_out():
166+
to_remove.append(job)
167+
continue
168+
165169
# checking which pool has the most available ram. MUST USE
166-
# COPIED STATS as scheduler is placing assignments into pools
170+
# COPIED STATS as scheduler is placing assignments into pools
167171
# TODO: PARAMETERIZE THIS
168172
pool_id = get_pool_with_max_avail_ram(s, pool_stats)
169173
# all pools depleted

eudoxia/scheduler/priority_pool.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,12 @@ def priority_pool_scheduler(s, results: List[ExecutionResult],
135135
to_remove = []
136136
to_start = []
137137
for job in queue:
138-
avail_ram = pool_stats[pool_id]["avail_ram"]
139-
avail_cpu = pool_stats[pool_id]["avail_cpu"]
138+
if job.pipeline.runtime_status().has_timed_out():
139+
to_remove.append(job)
140+
continue
141+
142+
avail_ram = pool_stats[pool_id]["avail_ram"]
143+
avail_cpu = pool_stats[pool_id]["avail_cpu"]
140144

141145
# the pool is depleted so we shouldn't make any allocations
142146
if avail_ram == 0 or avail_cpu == 0:

eudoxia/scheduler/rest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ def rest_scheduler(s, results: List[ExecutionResult],
130130
s.other_pipelines[p.pipeline_id] = p
131131
for pipeline_id in list(s.other_pipelines.keys()):
132132
pipeline = s.other_pipelines[pipeline_id]
133-
if pipeline.runtime_status().is_pipeline_successful():
133+
if pipeline.runtime_status().is_pipeline_successful() or pipeline.runtime_status().has_timed_out():
134134
for op in pipeline.values:
135135
del s.operator_lookup[str(op.id)]
136136
del s.other_pipelines[pipeline_id]

0 commit comments

Comments
 (0)