Skip to content

Commit 977f9c9

Browse files
committed
implement max job time
1 parent 9d89c2d commit 977f9c9

8 files changed

Lines changed: 66 additions & 24 deletions

File tree

eudoxia/__main__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def run_command(params_file, workload=None):
5353
print(f" Assignments: {stats.assignments}")
5454
print(f" Suspensions: {stats.suspensions}")
5555
print(f" Failures: {stats.failures}")
56-
print(f" Failure/error counts: {stats.failure_error_counts}")
56+
print(f" Container failure counts: {stats.failure_error_counts}")
5757
print(f" Mean memory allocated: {stats.mean_memory_allocated_percent:.1f}%")
5858
print(f" Mean memory consumed: {stats.mean_memory_consumed_percent:.1f}%")
5959
print()

eudoxia/executor/executor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def get_consumed_ram_gb(self) -> float:
6464
"""Return total consumed RAM across all pools in GB."""
6565
return sum(p.get_consumed_ram_gb() for p in self.pools)
6666

67-
def run_one_tick(self, suspensions: List[Suspend],
67+
def run_one_tick(self, current_tick: int, suspensions: List[Suspend],
6868
assignments: List[Assignment]) -> List[ExecutionResult]:
6969
'''
7070
Largely passing through relevant assignments to the pool they belong to.
@@ -73,7 +73,7 @@ def run_one_tick(self, suspensions: List[Suspend],
7373
for id_ in range(self.num_pools):
7474
pool_suspensions = [s for s in suspensions if s.pool_id == id_]
7575
pool_assignments = [a for a in assignments if a.pool_id == id_]
76-
pool_results = self.pools[id_].run_one_tick(pool_suspensions, pool_assignments)
76+
pool_results = self.pools[id_].run_one_tick(current_tick, pool_suspensions, pool_assignments)
7777
results.extend(pool_results)
7878

7979
return results

eudoxia/executor/resource_pool.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,17 @@ def _run_out_of_memory_killer(self):
141141
break
142142
victim.kill("OOM")
143143

144-
def run_one_tick(self, suspensions: List[Suspend],
144+
def _run_out_of_time_killer(self, current_tick: int):
    """Kill every unfinished container that holds an op of a timed-out pipeline.

    Containers that already report completion are left untouched. For the
    remaining ones, each operator's pipeline runtime status is asked whether
    it has timed out as of ``current_tick``; one timed-out operator is enough
    to kill the whole container with the reason ``"timeout"``.

    Args:
        current_tick: The simulation tick used for the timeout check.
    """
    for container in self.active_containers:
        if container.is_completed():
            continue
        # any() stops at the first timed-out operator, so a container is
        # killed at most once per call.
        timed_out = any(
            op.pipeline.runtime_status().has_timed_out(current_tick)
            for op in container.operators
        )
        if timed_out:
            container.kill("timeout")
153+
154+
def run_one_tick(self, current_tick: int, suspensions: List[Suspend],
145155
assignments: List[Assignment]) -> List[ExecutionResult]:
146156
"""
147157
Run a single tick for the executor, decrement remaining ticks for all
@@ -194,6 +204,9 @@ def run_one_tick(self, suspensions: List[Suspend],
194204
# Kill as necessary to keep within total and individual limits
195205
self._run_out_of_memory_killer()
196206

207+
# Kill containers with ops belonging to timed-out pipelines
208+
self._run_out_of_time_killer(current_tick)
209+
197210
# Process completed containers (including those killed by pool-level OOM or timeout)
198211
to_remove = []
199212
for c in self.active_containers:

eudoxia/simulator.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@ def get_param_defaults() -> Dict:
230230
# random seed for workload generation
231231
"random_seed": 42,
232232

233+
### Pipeline Params ###
234+
# maximum job time in seconds (0 = no limit)
235+
"max_job_seconds": 0,
236+
233237
### Estimator Params ###
234238
# estimator algorithm: "" (default, no estimator) or "noisy"
235239
"estimator_algo": "",
@@ -315,8 +319,9 @@ def run_simulator(param_input: Union[str, Dict], workload: Workload = None) -> S
315319
handler.setFormatter(sim_formatter)
316320

317321
tick_number = 0
318-
max_ticks = int(params["duration"] * params["ticks_per_second"])
319-
logger.info(f"Running for {params['duration']}s or {max_ticks} ticks")
322+
max_simulation_ticks = int(params["duration"] * params["ticks_per_second"])
323+
max_job_ticks = int(params["max_job_seconds"] * ticks_per_second)
324+
logger.info(f"Running for {params['duration']}s or {max_simulation_ticks} ticks")
320325
logger.info(f"Running with random seed {params['random_seed']}")
321326

322327
# a pipeline may have many operators. These can get grouped
@@ -329,6 +334,13 @@ def run_simulator(param_input: Union[str, Dict], workload: Workload = None) -> S
329334
num_failures = 0
330335
failure_error_counts = defaultdict(int)
331336
executor_results = []
337+
# outstanding_pipelines tracks pipelines we still expect to complete.
338+
# Pipelines are removed when they succeed or time out. Timed-out
339+
# pipelines simply show up as not completed — we don't record their
340+
# latency. Note: the scheduler manages its own queues independently,
341+
# so it may still assign ops for a pipeline that has already been
342+
# removed from here. That's fine — the resource pool's _run_out_of_time_killer
343+
# will immediately kill any such containers.
332344
outstanding_pipelines: Dict[str, Pipeline] = {}
333345
pipeline_arrivals_by_priority: Dict[Priority, int] = {
334346
Priority.QUERY: 0,
@@ -344,22 +356,22 @@ def run_simulator(param_input: Union[str, Dict], workload: Workload = None) -> S
344356
memory_consumed_percent_samples: List[float] = []
345357

346358
# IMPORTANT! This is the main simulation loop.
347-
for tick_number in range(max_ticks):
359+
for tick_number in range(max_simulation_ticks):
348360
sim_formatter.set_simulated_elapsed_seconds(tick_number / ticks_per_second)
349361

350362
# track new work
351363
new_pipelines: List[Pipeline] = workload.run_one_tick()
352364
for p in new_pipelines:
353365
logger.info(f"Pipeline arrived with Priority {p.priority} and {len(p.values)} op(s)")
354-
p.runtime_status().record_arrival(tick_number)
366+
p.runtime_status().record_arrival(tick_number, max_job_ticks)
355367
outstanding_pipelines[p.pipeline_id] = p
356368
pipeline_arrivals_by_priority[p.priority] += 1
357369
for op in p.values:
358370
estimator.estimate(op)
359371

360372
# simulate scheduler/executor
361373
suspensions, assignments = scheduler.run_one_tick(executor_results, new_pipelines)
362-
executor_results = executor.run_one_tick(suspensions, assignments)
374+
executor_results = executor.run_one_tick(tick_number, suspensions, assignments)
363375

364376
# track stats
365377
num_pipelines_created += len(new_pipelines)
@@ -380,6 +392,14 @@ def run_simulator(param_input: Union[str, Dict], workload: Workload = None) -> S
380392
pipeline_latencies_by_priority[pipeline.priority].append(latency_ticks)
381393
del outstanding_pipelines[pipeline_id]
382394

395+
# Optimization: periodically remove timed-out pipelines so we
396+
# don't keep scanning them for completion every tick.
397+
if max_job_ticks > 0 and tick_number % ticks_per_second == 0:
398+
for pipeline_id in list(outstanding_pipelines.keys()):
399+
pipeline = outstanding_pipelines[pipeline_id]
400+
if pipeline.runtime_status().has_timed_out(tick_number):
401+
del outstanding_pipelines[pipeline_id]
402+
383403
# log memory stats every 1 second of simulated time
384404
if tick_number % ticks_per_second == 0:
385405
total_ram = executor.get_total_ram_gb()

eudoxia/workload/runtime_status.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,24 @@ def __init__(self, pipeline: 'Pipeline'):
5656
self.state_counts: Dict[OperatorState, int] = {state: 0 for state in OperatorState}
5757
self.arrival_tick: Optional[int] = None
5858
self.finish_tick: Optional[int] = None
59+
self.max_job_ticks: int = 0
5960

6061
for operator in pipeline.values:
6162
self.operator_states[operator] = OperatorState.PENDING
6263
self.state_counts[OperatorState.PENDING] += 1
6364

64-
def record_arrival(self, tick: int):
65+
def record_arrival(self, tick: int, max_job_ticks: int = 0):
6566
"""Record the tick at which this pipeline arrived."""
6667
assert self.arrival_tick is None, "arrival_tick already recorded"
6768
self.arrival_tick = tick
69+
self.max_job_ticks = max_job_ticks
70+
71+
def has_timed_out(self, current_tick: int) -> bool:
    """Report whether the pipeline has used up its maximum job time.

    A non-positive ``max_job_ticks`` means no limit applies, so the result
    is always False and the arrival tick is not consulted.

    Args:
        current_tick: The simulation tick to compare against the arrival tick.

    Returns:
        True once at least ``max_job_ticks`` ticks have elapsed since
        ``arrival_tick``; False otherwise.

    Raises:
        AssertionError: If a limit is set but no arrival tick was recorded.
    """
    limit = self.max_job_ticks
    if limit > 0:
        assert self.arrival_tick is not None, "arrival_tick not recorded"
        elapsed = current_tick - self.arrival_tick
        # Reaching the limit exactly already counts as timed out.
        return elapsed >= limit
    return False
6877

6978
def check_transition(self, operator: 'Operator', new_state: OperatorState) -> tuple[bool, Optional[str]]:
7079
"""

tests/test_container.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@ def test_container_oom():
3939
)
4040

4141
# Start container via pool
42-
pool.run_one_tick([], [assignment])
42+
pool.run_one_tick(0, [], [assignment])
4343
container = pool.active_containers[0]
4444

4545
# Run until completion
4646
ticks_executed = 1 # First tick already done
4747
while not container.is_completed():
48-
pool.run_one_tick([], [])
48+
pool.run_one_tick(0, [], [])
4949
ticks_executed += 1
5050
assert ticks_executed <= 1000, "Container should complete within 1000 ticks"
5151

@@ -77,10 +77,10 @@ def test_container_oom_transitions_remaining_ops_to_failed():
7777
)
7878

7979
# Start container and run until OOM
80-
pool.run_one_tick([], [assignment])
80+
pool.run_one_tick(0, [], [assignment])
8181
container = pool.active_containers[0]
8282
while not container.is_completed():
83-
pool.run_one_tick([], [])
83+
pool.run_one_tick(0, [], [])
8484

8585
assert container.error == "OOM"
8686

@@ -112,7 +112,7 @@ def test_container_immediate_oom():
112112
)
113113

114114
# First tick creates container and runs OOM killer
115-
results = pool.run_one_tick([], [assignment])
115+
results = pool.run_one_tick(0, [], [assignment])
116116

117117
# Container should be killed immediately
118118
assert len(results) == 1, "Should have one result"

tests/test_executor.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def test_resource_pool_basic():
6969
results_by_pipeline = {}
7070

7171
for tick in range(max_ticks):
72-
results = pool.run_one_tick(suspensions, assignments)
72+
results = pool.run_one_tick(0, suspensions, assignments)
7373

7474
# Organize results by pipeline_id
7575
for result in results:
@@ -116,7 +116,7 @@ def test_resource_pool_dependencies():
116116
)
117117

118118
with pytest.raises(AssertionError, match="Dependencies not satisfied"):
119-
pool.run_one_tick([], [assignment_b])
119+
pool.run_one_tick(0, [], [assignment_b])
120120

121121

122122
def test_runtime_status_dependencies():
@@ -214,7 +214,7 @@ def test_memory_allocated_vs_consumed():
214214
completed = False
215215
for tick in range(1000):
216216
assignments = [assignment] if tick == 0 else []
217-
results = executor.run_one_tick([], assignments)
217+
results = executor.run_one_tick(0, [], assignments)
218218

219219
if results:
220220
# Container just completed: both should be 0
@@ -287,7 +287,7 @@ def test_memory_overcommit_kills_highest_scorer():
287287
)
288288

289289
# Start both containers
290-
pool.run_one_tick([], [assignment_a, assignment_b])
290+
pool.run_one_tick(0, [], [assignment_a, assignment_b])
291291

292292
# Tick until consumption exceeds capacity
293293
# Memory grows at DISK_SCAN_GB_SEC = 20 GB/sec
@@ -297,7 +297,7 @@ def test_memory_overcommit_kills_highest_scorer():
297297

298298
killed_pipeline = None
299299
for tick in range(100):
300-
results = pool.run_one_tick([], [])
300+
results = pool.run_one_tick(0, [], [])
301301
for r in results:
302302
if r.failed():
303303
# Get pipeline ID from the failed container's ops

tests/test_scheduler.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ def test_op_not_double_queued_across_ticks():
157157
assert remaining_in_queue > 0, "some children should remain in queue"
158158

159159
# Actually consume resources by passing assignments to executor
160-
executor.run_one_tick([], assignments)
160+
executor.run_one_tick(0, [], assignments)
161161

162162
# op2 completes
163163
op2.transition(OperatorState.COMPLETED)
@@ -196,7 +196,7 @@ def test_failed_op_gets_retry_stats():
196196
first_ram = assignments[0].ram
197197

198198
# Run executor - op will OOM because it needs 10GB but only got 5GB
199-
results = executor.run_one_tick([], assignments)
199+
results = executor.run_one_tick(0, [], assignments)
200200
assert len(results) == 1
201201
assert results[0].error == "OOM"
202202

@@ -236,9 +236,9 @@ def test_partial_failure_unblocks_dependent_op():
236236
assert set(assignments[0].ops) == {op1, op2, op3}
237237

238238
# Run executor until we get a result (op1 should complete, then op2 OOMs)
239-
results = executor.run_one_tick([], assignments)
239+
results = executor.run_one_tick(0, [], assignments)
240240
while not results:
241-
results = executor.run_one_tick([], [])
241+
results = executor.run_one_tick(0, [], [])
242242
assert len(results) == 1
243243
assert results[0].error == "OOM"
244244
assert op1.state() == OperatorState.COMPLETED, "op1 should have completed before OOM"

0 commit comments

Comments
 (0)