Skip to content

Commit d3dc54e

Browse files
authored
refactor container and fix suspend time (#38)
1 parent 06d5d21 commit d3dc54e

3 files changed

Lines changed: 241 additions & 65 deletions

File tree

eudoxia/executor/container.py

Lines changed: 93 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
import logging
2-
from typing import List
2+
from typing import List, Generator, Optional
33
from eudoxia.utils import DISK_SCAN_GB_SEC, Priority
44
from eudoxia.workload import Operator
55

66
logger = logging.getLogger(__name__)
77

88

9-
class Container:
9+
class Container:
1010
"""
11-
An encapsulation of CPU, RAM, and a list of operators. A container is
12-
created and then calculates how many ticks it will need to run with
13-
resources provided.
11+
An encapsulation of CPU, RAM, and a list of operators. A container executes
12+
operators tick-by-tick, tracking memory usage and allowing suspension only
13+
between operator boundaries.
1414
"""
1515
next_container_num = 1
1616

@@ -21,18 +21,22 @@ def __init__(self, ram, cpu, ops, prty: Priority, pool_id: int, ticks_per_second
2121
self.ram = ram
2222
self.cpu = cpu
2323
self.operators: List[Operator] = ops
24-
self.segment_tick_boundaries = []
2524
self.suspend_ticks = None
2625
self._suspend_ticks_left = None
2726
self.priority = prty
28-
self.error: str = None
27+
self.error: Optional[str] = None
2928
self.ticks_per_second = ticks_per_second
3029
self.tick_length_secs = 1.0 / ticks_per_second
3130

32-
self.num_ticks = self._compute_ticks()
33-
self._num_ticks_left = self.num_ticks
34-
self.num_secs = self.num_ticks * self.tick_length_secs
35-
31+
# Tick state (updated by generator)
32+
self._current_memory: float = 0.0
33+
self._can_suspend: bool = False
34+
self._completed: bool = False
35+
self._ticks_elapsed: int = 0
36+
37+
# Start the generator
38+
self._tick_iter = self._tick_generator()
39+
3640
def get_pipeline_id(self):
3741
"""Get the pipeline ID from operators, handling mixed pipelines"""
3842
pipeline_ids = set()
@@ -49,62 +53,98 @@ def get_pipeline_id(self):
4953

5054
def __repr__(self):
5155
num_ops = len(self.operators)
52-
return f"container={self.container_id} pipeline={self.get_pipeline_id()} ops={num_ops} cpus={self.cpu} ram_gb={self.ram} runtime_secs={self.num_secs:.1f}"
56+
return f"container={self.container_id} pipeline={self.get_pipeline_id()} ops={num_ops} cpus={self.cpu} ram_gb={self.ram}"
5357

54-
def _compute_ticks(self) -> int:
58+
def _tick_generator(self) -> Generator[None, None, None]:
5559
"""
56-
This function utilizes functions provided by Segment to calculate the
57-
amount of CPU and RAM ticks that are needed across all segments. It also
58-
computes the boundaries between segments. The reason is that we can only
59-
suspend containers between segments (not in the middle of while a segment
60-
is running).
61-
Returns:
62-
int: number of ticks this container will need to run for
60+
Generator that drives tick-by-tick execution, updating container state directly.
61+
62+
Updates self._current_memory and self._can_suspend on each yield.
63+
64+
Memory usage is determined by the current segment:
65+
- If segment.memory_gb is set: fixed memory usage
66+
- If segment.memory_gb is None: memory grows linearly with I/O progress
67+
68+
Suspension is only allowed between operators (not between segments).
6369
"""
64-
total_ticks = 0
65-
for op in self.operators:
66-
for seg in op.get_segments():
67-
# will it OOM, and if so, when?
68-
oom_seconds = seg.get_seconds_until_oom(self.ram)
69-
70-
if oom_seconds is not None:
71-
# compute how long it it will be until the OOM occurs
72-
self.error = "OOM"
73-
seg_ticks_before_OOM = int(oom_seconds / self.tick_length_secs)
74-
total_ticks += seg_ticks_before_OOM
75-
return total_ticks # after OOM, we cannot run other segments
76-
else:
77-
# there is no OOM. We will spend all the time
78-
# expected on I/O (first), then CPU (second)
79-
io_time_secs = seg.get_io_seconds()
80-
cpu_time_secs = seg.get_cpu_time(self.cpu)
81-
total_ticks += int((io_time_secs + cpu_time_secs) / self.tick_length_secs)
82-
self.segment_tick_boundaries.append(total_ticks)
83-
return total_ticks
70+
71+
# loop over every tick of every segment of every op
72+
#
73+
# at each iteration, determine memory usage, whether there is
74+
# an OOM, and whether suspension is possible
75+
for op_idx, op in enumerate(self.operators):
76+
segments = op.get_segments()
77+
for seg_idx, seg in enumerate(segments):
78+
# Calculate ticks for I/O phase and CPU phase
79+
io_secs = seg.get_io_seconds()
80+
cpu_secs = seg.get_cpu_time(self.cpu)
81+
io_ticks = int(io_secs / self.tick_length_secs)
82+
cpu_ticks = int(cpu_secs / self.tick_length_secs)
83+
total_seg_ticks = io_ticks+cpu_ticks
84+
85+
for i in range(total_seg_ticks):
86+
# determine current memory consumption
87+
if i < io_ticks:
88+
if seg.memory_gb is not None:
89+
self._current_memory = seg.memory_gb
90+
else:
91+
# Memory grows linearly with I/O progress
92+
io_progress_secs = (i + 1) * self.tick_length_secs
93+
self._current_memory = io_progress_secs * DISK_SCAN_GB_SEC
94+
else:
95+
self._current_memory = seg.get_peak_memory_gb()
96+
97+
# have we OOM'd?
98+
if self._current_memory > self.ram:
99+
self.error = "OOM"
100+
self._completed = True
101+
yield
102+
return
103+
104+
# are we at the end of the op (last tick of last
105+
# seg)? if so, we're either completed, or we can
106+
# suspend, depending on whether this is the last
107+
# op.
108+
self._can_suspend = False
109+
if seg_idx == len(segments)-1 and i == total_seg_ticks - 1:
110+
if op_idx == len(self.operators) - 1:
111+
self._current_memory = 0.0
112+
self._completed = True
113+
else:
114+
self._can_suspend = True
115+
yield
84116

85117
def tick(self):
86-
self._num_ticks_left -= 1
118+
"""
119+
Execute one tick. Advances to next state (OOM checked in _advance_tick).
120+
"""
121+
if self._completed:
122+
return
123+
next(self._tick_iter)
124+
self._ticks_elapsed += 1
87125

88126
def is_completed(self):
89-
# Use <= to handle edge case: immediate OOM results in 0 ticks,
90-
# but tick() may still be called, causing _num_ticks_left to go negative
91-
return (self._num_ticks_left <= 0)
127+
return self._completed
128+
129+
def ticks_elapsed(self) -> int:
130+
"""Returns the number of ticks that have been executed."""
131+
return self._ticks_elapsed
92132

93133
def can_suspend_container(self) -> bool:
94-
"""Can only suspend a container if between execution of
95-
segments. Must wait for Segment to complete before the
96-
container is pausable"""
97-
elapsed = self.num_ticks - self._num_ticks_left
98-
if elapsed in self.segment_tick_boundaries:
99-
return True
100-
return False
134+
"""Can only suspend a container between operators."""
135+
return self._can_suspend
136+
137+
def get_current_memory_usage(self) -> float:
138+
"""Returns current memory usage in GB."""
139+
return self._current_memory
101140

102141
def suspend_container(self):
103142
"""
104143
Suspend container execution, free CPUs and RAM. Requires writing
105-
current data to disk.
144+
current data to disk.
106145
"""
107-
write_to_disk_ticks = self.ram / DISK_SCAN_GB_SEC
146+
write_to_disk_secs = self.ram / DISK_SCAN_GB_SEC
147+
write_to_disk_ticks = int(write_to_disk_secs / self.tick_length_secs)
108148
self.suspend_ticks = write_to_disk_ticks
109149
self._suspend_ticks_left = write_to_disk_ticks
110150

eudoxia/executor/resource_pool.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ def run_one_tick(self, suspensions: List[Suspend],
138138
self.avail_cpu_pool -= a.cpu
139139
self.avail_ram_pool -= a.ram
140140
self.active_containers.append(container)
141-
self.container_tick_times.append(container.num_ticks)
142141
logger.info(f"start container {container}")
143142

144143
to_remove = []
@@ -168,6 +167,9 @@ def run_one_tick(self, suspensions: List[Suspend],
168167
logger.info(result)
169168
results.append(result)
170169

170+
# Record actual ticks executed for stats
171+
self.container_tick_times.append(c.ticks_elapsed())
172+
171173
# Track successful completions
172174
if c.error is None:
173175
self.num_completed += 1

tests/test_container.py

Lines changed: 145 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,14 @@ def test_container_oom():
3535

3636
assert ticks_executed <= 1000, "Container should complete within 1000 ticks"
3737

38-
# the first 3 containers should run (30s/30 ticks), then we should
39-
assert ticks_executed == 30, \
40-
f"Container should stop at OOM at 30 ticks, but ran for {ticks_executed} ticks"
38+
# the first 3 operators should run (30 ticks), then the 4th operator (40GB)
39+
# triggers OOM on tick 31 when we try to execute it
40+
assert ticks_executed == 31, \
41+
f"Container should stop at OOM at 31 ticks, but ran for {ticks_executed} ticks"
4142
assert container.error == "OOM", "Container should have OOM error after completion"
4243

4344

44-
def test_container_tick0_oom():
45+
def test_container_immediate_oom():
4546
"""Test that immediate OOM (when segment needs more memory than container has) completes properly"""
4647

4748
# Create a segment that needs 200GB of memory
@@ -63,13 +64,146 @@ def test_container_tick0_oom():
6364
ticks_per_second=10
6465
)
6566

66-
# Container should have 0 ticks since it OOMs immediately
67-
assert container.num_ticks == 0, f"Expected 0 ticks for immediate OOM, got {container.num_ticks}"
68-
assert container.error == "OOM", "Container should have OOM error"
67+
# Container is not yet completed - OOM detected on first tick
68+
assert not container.is_completed(), "Container should not be completed before first tick"
6969

70-
# Container should already be completed (0 ticks to run)
71-
assert container.is_completed(), "Container with 0 ticks should be immediately completed"
70+
# First tick should detect OOM
71+
container.tick()
72+
assert container.ticks_elapsed() == 1, f"Expected 1 tick before OOM, got {container.ticks_elapsed()}"
73+
assert container.error == "OOM", "Container should have OOM error"
74+
assert container.is_completed(), "Container should be completed after OOM"
7275

73-
# Calling tick() shouldn't break the is_completed() check
76+
# Calling tick() again shouldn't break anything
7477
container.tick()
75-
assert container.is_completed(), "Container should still be completed after tick()"
78+
assert container.is_completed(), "Container should still be completed after extra tick()"
79+
80+
81+
def test_container_suspension_ticks():
82+
"""Test that suspension takes the correct number of ticks to write memory to disk"""
83+
84+
op = Operator()
85+
op.add_segment(Segment(
86+
baseline_cpu_seconds=1.0,
87+
cpu_scaling="const",
88+
memory_gb=10,
89+
))
90+
91+
# 100 ticks per second, so tick_length = 0.01 seconds
92+
container = Container(
93+
ram=100, # 100GB RAM
94+
cpu=10,
95+
ops=[op],
96+
prty=Priority.BATCH_PIPELINE,
97+
pool_id=0,
98+
ticks_per_second=100
99+
)
100+
101+
# Trigger suspension
102+
container.suspend_container()
103+
104+
# Suspension should take: ram / DISK_SCAN_GB_SEC seconds = 100 / 20 = 5 seconds
105+
# At 100 ticks/second, that's 500 ticks
106+
expected_suspend_ticks = int((100 / DISK_SCAN_GB_SEC) / container.tick_length_secs)
107+
assert container.suspend_ticks == expected_suspend_ticks, \
108+
f"Expected {expected_suspend_ticks} suspend ticks, got {container.suspend_ticks}"
109+
assert container.suspend_ticks == 500, \
110+
f"Expected 500 suspend ticks, got {container.suspend_ticks}"
111+
112+
# Tick through suspension
113+
suspend_ticks_counted = 0
114+
while not container.is_suspended():
115+
container.suspend_container_tick()
116+
suspend_ticks_counted += 1
117+
118+
assert suspend_ticks_counted == 500, \
119+
f"Expected 500 ticks to suspend, counted {suspend_ticks_counted}"
120+
121+
122+
def test_container_memory_over_time():
123+
"""Test that memory usage is tracked correctly over time.
124+
125+
Op1: single segment with fixed memory (10GB), 1 second CPU
126+
Op2: two segments with I/O-based memory growth
127+
- seg1: 2 seconds I/O (memory grows 0->40GB), 1 second CPU (stays at 40GB)
128+
- seg2: 1 second I/O (memory grows 0->20GB), 0.5 second CPU (stays at 20GB)
129+
130+
At 10 ticks/second:
131+
- Op1 seg1: 10 CPU ticks at 10GB fixed
132+
- Op2 seg1: 20 I/O ticks (growing 2->40GB), then 10 CPU ticks at 40GB
133+
- Op2 seg2: 10 I/O ticks (growing 2->20GB), then 5 CPU ticks at 20GB
134+
"""
135+
# Op1: fixed memory
136+
op1 = Operator()
137+
op1.add_segment(Segment(
138+
baseline_cpu_seconds=1.0,
139+
cpu_scaling="const",
140+
memory_gb=10,
141+
storage_read_gb=0,
142+
))
143+
144+
# Op2: I/O-based memory (memory_gb=None means memory grows with I/O)
145+
op2 = Operator()
146+
op2.add_segment(Segment(
147+
baseline_cpu_seconds=1.0,
148+
cpu_scaling="const",
149+
memory_gb=None,
150+
storage_read_gb=40, # 40GB read at 20GB/sec = 2 sec I/O, peak memory = 40GB
151+
))
152+
op2.add_segment(Segment(
153+
baseline_cpu_seconds=0.5,
154+
cpu_scaling="const",
155+
memory_gb=None,
156+
storage_read_gb=20, # 20GB read at 20GB/sec = 1 sec I/O, peak memory = 20GB
157+
))
158+
159+
container = Container(
160+
ram=100,
161+
cpu=1, # 1 CPU so baseline_cpu_seconds = actual seconds
162+
ops=[op1, op2],
163+
prty=Priority.BATCH_PIPELINE,
164+
pool_id=0,
165+
ticks_per_second=10,
166+
)
167+
168+
tick_length = 0.1 # 10 ticks/sec
169+
expected_memories = []
170+
171+
# Op1 seg1: 10 CPU ticks at 10GB (no I/O since storage_read_gb=0)
172+
for _ in range(10):
173+
expected_memories.append(10.0)
174+
175+
# Op2 seg1: 20 I/O ticks with memory growing linearly
176+
# Memory grows at DISK_SCAN_GB_SEC (20 GB/sec)
177+
# After tick i, memory = (i+1) * tick_length * 20 = (i+1) * 2
178+
for i in range(20):
179+
expected_memories.append((i + 1) * tick_length * DISK_SCAN_GB_SEC)
180+
181+
# Op2 seg1: 10 CPU ticks at peak memory (40GB)
182+
for _ in range(10):
183+
expected_memories.append(40.0)
184+
185+
# Op2 seg2: 10 I/O ticks with memory growing linearly
186+
for i in range(10):
187+
expected_memories.append((i + 1) * tick_length * DISK_SCAN_GB_SEC)
188+
189+
# Op2 seg2: 5 CPU ticks at peak memory (20GB)
190+
# Last tick will show 0.0 since container completes
191+
for i in range(5):
192+
if i < 4:
193+
expected_memories.append(20.0)
194+
else:
195+
expected_memories.append(0.0) # Container completes, memory cleared
196+
197+
# Run through all ticks and verify memory
198+
tick = 0
199+
while not container.is_completed():
200+
container.tick()
201+
actual_memory = container.get_current_memory_usage()
202+
expected_memory = expected_memories[tick]
203+
assert actual_memory == expected_memory, \
204+
f"Tick {tick}: expected memory {expected_memory}GB, got {actual_memory}GB"
205+
tick += 1
206+
207+
assert tick == len(expected_memories), \
208+
f"Expected {len(expected_memories)} ticks, got {tick}"
209+
assert tick == 55, f"Expected 55 total ticks, got {tick}"

0 commit comments

Comments
 (0)