Skip to content

Commit 3f82d91

Browse files
committed
Simplify logic, and updated tests
1 parent e5d2266 commit 3f82d91

File tree

2 files changed

+20
-59
lines changed

2 files changed

+20
-59
lines changed

job_executor/worker/manager_state.py

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
from typing import List
44

55

6-
REDUCED_WORKER_NUMBER = 2
7-
8-
96
class ManagerState:
107
def __init__(self, default_max_workers=4, max_gb_all_workers=50):
118
"""
@@ -14,7 +11,6 @@ def __init__(self, default_max_workers=4, max_gb_all_workers=50):
1411
of workers are reduced
1512
"""
1613
self.default_max_workers = default_max_workers
17-
self.current_max_workers = default_max_workers
1814
self.max_bytes_all_workers = (
1915
max_gb_all_workers * 1024**3 # Threshold in bytes
2016
)
@@ -45,30 +41,20 @@ def can_spawn_new_worker(self, new_job_size: int) -> bool:
4541
"""
4642
Called to check if a new worker can be spawned.
4743
"""
48-
if len(self.alive_workers) >= self.current_max_workers:
44+
if len(self.alive_workers) >= self.default_max_workers:
4945
return False
50-
if self.current_total_size + new_job_size >= self.max_bytes_all_workers:
46+
if (
47+
self.current_total_size + new_job_size
48+
>= self.max_bytes_all_workers
49+
):
5150
return False
5251
return True
5352

54-
def update_worker_limit(self, new_job_size: int):
55-
"""
56-
Check the current size beeing procces in the pipeline.
57-
And changes the number of workers as needed.
58-
"""
59-
new_total = self.current_total_size + new_job_size
60-
if new_total >= self.max_bytes_all_workers:
61-
self.current_max_workers = REDUCED_WORKER_NUMBER
62-
else:
63-
self.current_max_workers = self.default_max_workers
64-
6553
def register_job(self, worker: Worker):
6654
"""
6755
Called when a worker picks up a job.
68-
When a job is register the current_max_workers are updated.
6956
"""
7057
self.workers.append(worker)
71-
self.update_worker_limit(worker.job_size)
7258

7359
def unregister_job(self, job_id):
7460
"""

tests/unit/worker/test_manager_state.py

Lines changed: 15 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ def test_cannot_spawn_worker_size_limit_reached():
4444
TWENTY_GB = 20 * 1024**3
4545
manager_state = ManagerState(max_gb_all_workers=20)
4646

47-
# register large job
4847
large_job = Worker(
4948
process=Process(target=dummy),
5049
job_id="job_large",
@@ -53,55 +52,31 @@ def test_cannot_spawn_worker_size_limit_reached():
5352
manager_state.register_job(large_job)
5453
large_job.start()
5554

56-
# Max worker should now be 2, We can still spawn a second job
57-
can_spawn = manager_state.can_spawn_new_worker(new_job_size=TWENTY_GB)
58-
assert can_spawn is True
59-
60-
large_job = Worker(
61-
process=Process(target=dummy),
62-
job_id="job_large_2",
63-
job_size=TWENTY_GB,
64-
)
65-
manager_state.register_job(large_job)
66-
large_job.start()
67-
68-
# We should not be able to spawn a third job
69-
can_spawn = manager_state.can_spawn_new_worker(new_job_size=TWENTY_GB)
55+
# Only one job active but size limit is reached cannot spawn new job
56+
can_spawn = manager_state.can_spawn_new_worker(new_job_size=1024)
7057
assert can_spawn is False
7158

7259

7360
def test_oversized_jobs():
74-
# we can run jobs which exceeds the limit for the threshold,
75-
# but only two of them
7661
FIFTY_GB = 50 * 1024**3
7762
TEN_GB = 10 * 1024**3
78-
manager_state = ManagerState(max_gb_all_workers=10)
63+
manager_state = ManagerState(max_gb_all_workers=20)
7964

65+
# This job will never be processed
8066
can_spawn = manager_state.can_spawn_new_worker(new_job_size=FIFTY_GB)
81-
large_job = Worker(
82-
process=Process(target=dummy),
83-
job_id="job_1",
84-
job_size=FIFTY_GB,
85-
)
86-
manager_state.register_job(large_job)
87-
large_job.start()
88-
assert can_spawn is True
67+
assert can_spawn is False
8968

90-
can_spawn = manager_state.can_spawn_new_worker(new_job_size=FIFTY_GB)
91-
large_job = Worker(
92-
process=Process(target=dummy),
93-
job_id="job_2",
94-
job_size=FIFTY_GB,
95-
)
96-
manager_state.register_job(large_job)
97-
large_job.start()
69+
# This job will be accepted
70+
can_spawn = manager_state.can_spawn_new_worker(new_job_size=TEN_GB)
9871
assert can_spawn is True
99-
100-
assert manager_state.max_bytes_all_workers == TEN_GB
101-
assert manager_state.current_total_size == (FIFTY_GB + FIFTY_GB)
102-
103-
can_spawn = manager_state.can_spawn_new_worker(new_job_size=FIFTY_GB)
104-
assert can_spawn is False
72+
if can_spawn:
73+
worker = Worker(
74+
process=Process(target=dummy),
75+
job_id="job_2",
76+
job_size=TEN_GB,
77+
)
78+
manager_state.register_job(worker)
79+
worker.start()
10580

10681

10782
def test_unregister_job():

0 commit comments

Comments
 (0)