Skip to content

Commit 2936e0d

Browse files
committed
move get size to local_storage
1 parent 2a731bd commit 2936e0d

File tree

2 files changed

+24
-12
lines changed

2 files changed

+24
-12
lines changed

job_executor/adapter/local_storage.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,3 +423,13 @@ def delete_archived_input(dataset_name: str):
423423
archived_file: Path = INPUT_DIR / f"archive/{dataset_name}.tar"
424424
if archived_file.is_file():
425425
os.remove(archived_file)
426+
427+
428+
def get_size_in_bytes(dataset_name: str) -> int:
429+
"""
430+
Returns the size of the .tar file
431+
"""
432+
tar_path = INPUT_DIR / f"{dataset_name}.tar"
433+
if not tar_path.exists():
434+
return 0
435+
return os.path.getsize(tar_path)

job_executor/app.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@
66
from pathlib import Path
77
from typing import Dict, List
88

9-
from job_executor.adapter import job_service
9+
from job_executor.adapter import job_service, local_storage
1010
from job_executor.config import environment
1111
from job_executor.config.log import setup_logging, initialize_logging_thread
1212
from job_executor.domain import rollback
13-
from job_executor.exception import RollbackException, StartupException
13+
from job_executor.exception import (
14+
RollbackException,
15+
StartupException,
16+
LocalStorageError,
17+
)
1418
from job_executor.model import Job, Datastore
1519
from job_executor.model.worker import Worker
1620
from job_executor.worker import (
@@ -26,19 +30,10 @@
2630
NUMBER_OF_WORKERS = int(environment.get("NUMBER_OF_WORKERS"))
2731
DYNAMIC_WORKER_THRESHOLD = int(environment.get("DYNAMIC_WORKER_THRESHOLD"))
2832
DATASTORE_DIR = environment.get("DATASTORE_DIR")
29-
INPUT_DIR = Path(environment.get("INPUT_DIR"))
3033

3134
datastore = None
3235

3336

34-
def get_size_in_bytes(dataset_name: str) -> int:
35-
tar_path = INPUT_DIR / f"{dataset_name}.tar"
36-
if not tar_path.exists():
37-
logger.warning(f"Tar file not found: {tar_path}")
38-
return 0 # or maybe rais exception
39-
return os.path.getsize(tar_path)
40-
41-
4237
def is_system_paused() -> bool:
4338
"""Return True if the system is paused, otherwise False."""
4439
maintenance_status = job_service.get_maintenance_status()
@@ -248,7 +243,14 @@ def main():
248243
f" (worker, built, queued manager jobs)"
249244
)
250245
for job in queued_worker_jobs:
251-
job_size = get_size_in_bytes(job.dataset_name)
246+
job_size = local_storage.get_size_in_bytes(job.dataset_name)
247+
if job_size == 0:
248+
logger.info(
249+
f"{job.job_id} Failed to get the size of the dataset."
250+
)
251+
raise LocalStorageError(
252+
"Failed to get size of the dataset"
253+
)
252254

253255
if manager_state.can_spawn_new_worker(job_size):
254256
_handle_worker_job(job, workers, logging_queue)

0 commit comments

Comments
 (0)