Skip to content

Commit 5babae5

Browse files
committed
minor refactoring
1 parent 19ca018 commit 5babae5

File tree

2 files changed

+52
-14
lines changed

2 files changed

+52
-14
lines changed

job_executor/worker/manager_state.py

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
import logging
2+
3+
logger = logging.getLogger()
4+
5+
16
class ManagerState:
27
def __init__(self, default_max_workers=4, max_gb_all_workers=50):
38
"""
@@ -27,18 +32,32 @@ def can_spawn_new_worker(self, new_job_size):
2732
When a job is finished and unregister the number of workers will reset
2833
to default_max_workers
2934
"""
30-
31-
# Could tweak this to be more gradual if we want
32-
if self.current_total_size >= self.max_bytes_all_workers:
33-
self.current_max_workers = 2
34-
else:
35-
self.current_max_workers = self.default_max_workers
35+
can_spawn = True
36+
self.update_worker_limit(new_job_size)
3637

3738
active_workers = len(self.datasets)
3839
if active_workers >= self.current_max_workers:
39-
return False
40+
can_spawn = False
4041

41-
return True
42+
logger.info(
43+
f"Checking can_spawn_new_worker({new_job_size}): "
44+
f"active={active_workers}, dynamic_limit={self.current_max_workers}, "
45+
f"current_total_size={self.current_total_size}, can_spawn={can_spawn}"
46+
)
47+
48+
return can_spawn
49+
50+
def update_worker_limit(self, new_job_size):
    """
    Adjust the dynamic worker limit based on the data volume in the pipeline.

    If the total size currently being processed plus the prospective job's
    size reaches the configured byte threshold, the worker limit is
    throttled down to 2; otherwise it is restored to the configured default.

    :param new_job_size: size in bytes of the job about to be scheduled
    """
    if (
        self.current_total_size + new_job_size
    ) >= self.max_bytes_all_workers:
        # Pipeline at/over capacity — throttle to a minimal worker count.
        self.current_max_workers = 2
    else:
        self.current_max_workers = self.default_max_workers
4261

4362
def register_job(self, job_id, job_size):
4463
"""

tests/unit/worker/test_manager_state.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def test_can_spawn_worker():
1515
assert can_spawn is True
1616

1717

18-
def test_can_not_spawn_worker_to_many_workers():
18+
def test_cannot_spawn_worker_too_many_workers():
1919
manager_state = ManagerState(default_max_workers=4)
2020

2121
# Register 4 jobs
@@ -26,20 +26,39 @@ def test_can_not_spawn_worker_to_many_workers():
2626
assert can_spawn is False
2727

2828

29-
def test_can_not_spawn_worker_size_limit_reached():
29+
def test_cannot_spawn_worker_size_limit_reached():
    # Reaching the total-size threshold must throttle the worker limit to 2.
    twenty_gb = 20 * 1024**3
    state = ManagerState(max_gb_all_workers=20)

    # First large job brings the pipeline up to the threshold.
    state.register_job("job_1", twenty_gb)

    # The limit drops to 2 workers, so a second job is still admitted.
    assert state.can_spawn_new_worker(new_job_size=twenty_gb) is True
    state.register_job("job_2", twenty_gb)

    # With two active workers under the throttled limit, a third is refused.
    assert state.can_spawn_new_worker(new_job_size=twenty_gb) is False
45+
46+
47+
def test_oversized_jobs():
    # Jobs larger than the size threshold are still admitted, but the
    # throttled limit caps the pipeline at two of them.
    fifty_gb = 50 * 1024**3
    state = ManagerState(max_gb_all_workers=10)

    for job_id in ("job_1", "job_2"):
        allowed = state.can_spawn_new_worker(new_job_size=fifty_gb)
        state.register_job(job_id, fifty_gb)
        assert allowed is True

    # A third oversized job must be rejected.
    assert state.can_spawn_new_worker(new_job_size=fifty_gb) is False
4463

4564

0 commit comments

Comments
 (0)