Skip to content

Commit 8230455

Browse files
authored
Merge pull request #144 from statisticsnorway/dynamic-workers
Dynamic workers
2 parents 3bf6d95 + 3f82d91 commit 8230455

File tree

7 files changed

+230
-18
lines changed

7 files changed

+230
-18
lines changed

.test.env

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ JOB_SERVICE_URL=http://mock.job.service
77
NUMBER_OF_WORKERS=4
88
SECRETS_FILE=tests/resources/secrets/secrets.json
99
DOCKER_HOST_NAME=localhost
10-
COMMIT_ID=abc123
10+
COMMIT_ID=abc123
11+
MAX_GB_ALL_WORKERS=50

job_executor/adapter/local_storage.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,3 +423,14 @@ def delete_archived_input(dataset_name: str):
423423
archived_file: Path = INPUT_DIR / f"archive/{dataset_name}.tar"
424424
if archived_file.is_file():
425425
os.remove(archived_file)
426+
427+
428+
def get_input_tar_size_in_bytes(dataset_name: str) -> int:
    """
    Return the size in bytes of the <dataset_name>.tar file in INPUT_DIR.

    Returns 0 when the file does not exist, so callers can treat
    "missing" and "empty" uniformly.
    """
    tar_path = INPUT_DIR / f"{dataset_name}.tar"
    try:
        # EAFP: avoids the race where the file is deleted between an
        # exists() check and the size lookup.
        return tar_path.stat().st_size
    except OSError:
        return 0

job_executor/app.py

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,28 @@
66
from pathlib import Path
77
from typing import Dict, List
88

9-
from job_executor.adapter import job_service
9+
from job_executor.adapter import job_service, local_storage
1010
from job_executor.config import environment
1111
from job_executor.config.log import setup_logging, initialize_logging_thread
1212
from job_executor.domain import rollback
13-
from job_executor.exception import RollbackException, StartupException
13+
from job_executor.exception import (
14+
RollbackException,
15+
StartupException,
16+
)
1417
from job_executor.model import Job, Datastore
1518
from job_executor.model.worker import Worker
16-
from job_executor.worker import build_dataset_worker, build_metadata_worker
19+
from job_executor.worker import (
20+
build_dataset_worker,
21+
build_metadata_worker,
22+
manager_state as ManagerState,
23+
)
24+
1725

1826
logger = logging.getLogger()
1927
setup_logging()
2028

2129
NUMBER_OF_WORKERS = int(environment.get("NUMBER_OF_WORKERS"))
30+
MAX_GB_ALL_WORKERS = int(environment.get("MAX_GB_ALL_WORKERS"))
2231
DATASTORE_DIR = environment.get("DATASTORE_DIR")
2332

2433
datastore = None
@@ -199,7 +208,11 @@ def initialize_app():
199208
def main():
200209
initialize_app()
201210
logging_queue, log_thread = initialize_logging_thread()
202-
workers: List[Worker] = []
211+
212+
manager_state = ManagerState(
213+
default_max_workers=NUMBER_OF_WORKERS,
214+
max_gb_all_workers=MAX_GB_ALL_WORKERS,
215+
)
203216

204217
try:
205218
while True:
@@ -210,12 +223,7 @@ def main():
210223
built_jobs = job_dict["built_jobs"]
211224
queued_manager_jobs = job_dict["queued_manager_jobs"]
212225

213-
dead_workers = [
214-
worker for worker in workers if not worker.is_alive()
215-
]
216-
clean_up_after_dead_workers(dead_workers)
217-
218-
workers = [worker for worker in workers if worker.is_alive()]
226+
clean_up_after_dead_workers(manager_state)
219227

220228
available_jobs = (
221229
len(queued_worker_jobs)
@@ -229,11 +237,28 @@ def main():
229237
f" (worker, built, queued manager jobs)"
230238
)
231239
for job in queued_worker_jobs:
232-
if len(workers) < NUMBER_OF_WORKERS:
233-
_handle_worker_job(job, workers, logging_queue)
240+
job_size = local_storage.get_input_tar_size_in_bytes(
241+
job.dataset_name
242+
)
243+
if job_size == 0:
244+
logger.info(
245+
f"{job.job_id} Failed to get the size of the dataset."
246+
)
247+
job_service.update_job_status(
248+
job.job_id,
249+
"failed",
250+
log="No such dataset available for import",
251+
)
252+
continue # skip further processing of this job
253+
254+
if manager_state.can_spawn_new_worker(job_size):
255+
_handle_worker_job(
256+
job, manager_state, job_size, logging_queue
257+
)
234258

235259
for job in built_jobs + queued_manager_jobs:
236260
try:
261+
manager_state.unregister_job(job.job_id)
237262
_handle_manager_job(job)
238263
except Exception as exc:
239264
# All exceptions that occur during the handling of a job
@@ -253,7 +278,8 @@ def main():
253278
log_thread.join()
254279

255280

256-
def clean_up_after_dead_workers(dead_workers: List[Worker]) -> None:
281+
def clean_up_after_dead_workers(manager_state) -> None:
282+
dead_workers = manager_state.dead_workers
257283
if len(dead_workers) > 0:
258284
in_progress_jobs = job_service.get_jobs(ignore_completed=True)
259285
for dead_worker in dead_workers:
@@ -268,9 +294,12 @@ def clean_up_after_dead_workers(dead_workers: List[Worker]) -> None:
268294
if job and job.status not in ["queued", "built"]:
269295
logger.info(f"Worker died and did not finish job {job.job_id}")
270296
fix_interrupted_job(job)
297+
manager_state.unregister_job(dead_worker.job_id)
271298

272299

273-
def _handle_worker_job(job: Job, workers: List[Worker], logging_queue: Queue):
300+
def _handle_worker_job(
301+
job: Job, manager_state: ManagerState, job_size: int, logging_queue: Queue
302+
):
274303
dataset_name = job.parameters.target
275304
job_id = job.job_id
276305
operation = job.parameters.operation
@@ -285,8 +314,9 @@ def _handle_worker_job(job: Job, workers: List[Worker], logging_queue: Queue):
285314
),
286315
),
287316
job_id=job_id,
317+
job_size=job_size,
288318
)
289-
workers.append(worker)
319+
manager_state.register_job(worker, job_id, job_size)
290320
job_service.update_job_status(job_id, "initiated")
291321
worker.start()
292322
elif operation == "PATCH_METADATA":
@@ -301,7 +331,7 @@ def _handle_worker_job(job: Job, workers: List[Worker], logging_queue: Queue):
301331
),
302332
job_id=job_id,
303333
)
304-
workers.append(worker)
334+
manager_state.register_job(worker)
305335
job_service.update_job_status(job_id, "initiated")
306336
worker.start()
307337
else:

job_executor/config/environment.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ def _initialize_environment() -> dict:
1313
"SECRETS_FILE": os.environ["SECRETS_FILE"],
1414
"DOCKER_HOST_NAME": os.environ["DOCKER_HOST_NAME"],
1515
"COMMIT_ID": os.environ["COMMIT_ID"],
16+
"MAX_GB_ALL_WORKERS": os.environ["MAX_GB_ALL_WORKERS"],
1617
}
1718

1819

job_executor/model/worker.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@
22

33

44
class Worker:
5-
def __init__(self, process: Process, job_id: str, job_size: int = 0):
    """
    Wrap a worker process together with its job bookkeeping.

    :param process: the multiprocessing.Process executing the job
    :param job_id: identifier of the job this worker is processing
    :param job_size: size in bytes of the job's input dataset;
        defaults to 0 for jobs without an input tar (e.g. metadata
        jobs, which construct Worker without passing a size)
    """
    self.process = process
    self.job_id = job_id
    # Exposed as a plain attribute (worker.job_size). An accessor
    # method of the same name would be shadowed by this attribute on
    # every instance and therefore unreachable — do not add one.
    self.job_size = job_size
812

913
def is_alive(self):
1014
return self.process.is_alive()
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
    # Needed only for type annotations; keeping it out of the runtime
    # path lets this module be imported in isolation.
    from job_executor.model.worker import Worker


class ManagerState:
    """
    Tracks the pool of worker processes and enforces two limits before
    a new worker may be spawned: a maximum number of alive workers and
    a maximum combined input size (in bytes) across all alive workers.
    """

    def __init__(self, default_max_workers=4, max_gb_all_workers=50):
        """
        :param default_max_workers: The maximum number of workers
        :param max_gb_all_workers: Threshold in GB (50) for when the
            number of workers are reduced
        """
        self.default_max_workers = default_max_workers
        # Stored in bytes so comparisons against job sizes are direct.
        self.max_bytes_all_workers = max_gb_all_workers * 1024**3

        self.workers: List["Worker"] = []

    @property
    def dead_workers(self) -> List["Worker"]:
        """
        Return a list of dead workers
        """
        return [worker for worker in self.workers if not worker.is_alive()]

    @property
    def alive_workers(self) -> List["Worker"]:
        """
        Return a list of alive workers
        """
        return [worker for worker in self.workers if worker.is_alive()]

    @property
    def current_total_size(self) -> int:
        """Combined job size in bytes of all alive workers."""
        return sum(
            worker.job_size for worker in self.workers if worker.is_alive()
        )

    def can_spawn_new_worker(self, new_job_size: int) -> bool:
        """
        Called to check if a new worker can be spawned.

        Returns False when the worker-count limit is reached, or when
        accepting a job of ``new_job_size`` bytes would reach or exceed
        the total-size limit.
        """
        if len(self.alive_workers) >= self.default_max_workers:
            return False
        if (
            self.current_total_size + new_job_size
            >= self.max_bytes_all_workers
        ):
            return False
        return True

    def register_job(self, worker: "Worker", job_id=None, job_size=None):
        """
        Called when a worker picks up a job.

        ``job_id`` and ``job_size`` are accepted for call-site
        compatibility (some callers pass them alongside the worker),
        but the tracked values are the ones carried by ``worker``
        itself.
        """
        self.workers.append(worker)

    def unregister_job(self, job_id):
        """
        Called when a job finishes or fails.
        """
        self.workers = [
            worker for worker in self.workers if worker.job_id != job_id
        ]
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import time
from multiprocessing import Process

from job_executor.model.worker import Worker
from job_executor.worker.manager_state import ManagerState


def dummy():
    # Long enough that the process is still alive while the test runs.
    time.sleep(10)
    print("hello")


def _start_worker(manager_state, job_id, job_size):
    """Build, register and start one worker running dummy()."""
    worker = Worker(
        process=Process(target=dummy),
        job_id=job_id,
        job_size=job_size,
    )
    manager_state.register_job(worker)
    worker.start()
    return worker


def _terminate(workers):
    """Terminate and reap started worker processes so tests do not
    leak children sleeping in dummy()."""
    for worker in workers:
        worker.process.terminate()
        worker.process.join()


def test_initial_state():
    manager_state = ManagerState()

    assert manager_state.current_total_size == 0
    assert len(manager_state.workers) == 0


def test_can_spawn_worker():
    manager_state = ManagerState()

    can_spawn = manager_state.can_spawn_new_worker(new_job_size=1)
    assert can_spawn is True


def test_cannot_spawn_worker_too_many_workers():
    manager_state = ManagerState(default_max_workers=4)
    started = []
    try:
        # Register 4 jobs
        for i in range(4):
            started.append(_start_worker(manager_state, f"job_{i}", 1024))

        can_spawn = manager_state.can_spawn_new_worker(new_job_size=1024)
        assert can_spawn is False
    finally:
        _terminate(started)


def test_cannot_spawn_worker_size_limit_reached():
    TWENTY_GB = 20 * 1024**3
    manager_state = ManagerState(max_gb_all_workers=20)
    started = []
    try:
        started.append(
            _start_worker(manager_state, "job_large", TWENTY_GB)
        )

        # Only one job active but size limit is reached; cannot spawn
        # a new job.
        can_spawn = manager_state.can_spawn_new_worker(new_job_size=1024)
        assert can_spawn is False
    finally:
        _terminate(started)


def test_oversized_jobs():
    FIFTY_GB = 50 * 1024**3
    TEN_GB = 10 * 1024**3
    manager_state = ManagerState(max_gb_all_workers=20)
    started = []
    try:
        # This job will never be processed
        can_spawn = manager_state.can_spawn_new_worker(new_job_size=FIFTY_GB)
        assert can_spawn is False

        # This job will be accepted
        can_spawn = manager_state.can_spawn_new_worker(new_job_size=TEN_GB)
        assert can_spawn is True
        if can_spawn:
            started.append(_start_worker(manager_state, "job_2", TEN_GB))
    finally:
        _terminate(started)


def test_unregister_job():
    manager_state = ManagerState(default_max_workers=4)
    started = []
    try:
        # Register 4 jobs
        for i in range(4):
            started.append(_start_worker(manager_state, f"job_{i}", 1024))

        can_spawn = manager_state.can_spawn_new_worker(new_job_size=1024)
        assert can_spawn is False

        manager_state.unregister_job("job_1")
        can_spawn = manager_state.can_spawn_new_worker(new_job_size=1024)
        assert can_spawn is True
    finally:
        # unregister_job drops the bookkeeping entry but the process
        # keeps running, so clean up via the separately tracked list.
        _terminate(started)

0 commit comments

Comments
 (0)