@@ -1,9 +1,11 @@
 import logging
 import time
 from multiprocessing import Queue
+from pathlib import Path
 
-from job_executor.adapter import datastore_api, local_storage
+from job_executor.adapter import datastore_api
 from job_executor.adapter.datastore_api.models import JobStatus
+from job_executor.adapter.fs import LocalStorageAdapter
 from job_executor.common.exceptions import StartupException
 from job_executor.config import environment
 from job_executor.config.log import initialize_logging_thread, setup_logging
@@ -16,8 +18,9 @@
 
 def initialize_app() -> None:
     try:
+        local_storage = LocalStorageAdapter(Path(environment.datastore_dir))
         rollback.fix_interrupted_jobs()
-        if local_storage.temporary_backup_exists():
+        if local_storage.datastore_dir.temporary_backup_exists():
             raise StartupException("tmp directory exists")
     except Exception as e:
         raise StartupException("Exception when initializing") from e
@@ -26,6 +29,7 @@ def initialize_app() -> None:
 def handle_jobs(manager: Manager, logging_queue: Queue) -> None:
     job_query_result = datastore_api.query_for_jobs()
     manager.clean_up_after_dead_workers()
+    local_storage = LocalStorageAdapter(Path(environment.datastore_dir))
     if job_query_result.available_jobs_count:
         logger.info(
             f"Found {len(job_query_result.queued_worker_jobs)} "
@@ -35,7 +39,7 @@ def handle_jobs(manager: Manager, logging_queue: Queue) -> None:
         )
 
     for job in job_query_result.queued_worker_jobs:
-        job_size = local_storage.get_input_tar_size_in_bytes(
+        job_size = local_storage.input_dir.get_importable_tar_size_in_bytes(
             job.parameters.target
         )
         if job_size == 0:
0 commit comments