44from pathlib import Path
55
66from job_executor .adapter import datastore_api
7- from job_executor .adapter .datastore_api .models import Job , JobStatus
7+ from job_executor .adapter .datastore_api .models import (
8+ DatastoreVersion ,
9+ Job ,
10+ JobStatus ,
11+ )
812from job_executor .adapter .fs import LocalStorageAdapter
913from job_executor .adapter .fs .models .datastore_versions import (
1014 bump_dotted_version_number ,
1620 RollbackException ,
1721 StartupException ,
1822)
19- from job_executor .config import environment
2023
21- WORKING_DIR_PATH = Path (environment .datastore_dir + "_working" )
2224logger = logging .getLogger ()
2325
2426
25- def rollback_bump (job_id : str , bump_manifesto : dict ) -> None :
26- local_storage = LocalStorageAdapter (Path (environment .datastore_dir ))
27+ def rollback_bump (job : Job , bump_manifesto : DatastoreVersion ) -> None :
28+ job_id = job .job_id
29+ local_storage = LocalStorageAdapter (
30+ datastore_api .get_datastore_directory (job .datastore_rdn )
31+ )
2732 try :
2833 logger .info (f"{ job_id } : Restoring files from temporary backup" )
2934 restored_version_number = (
3035 local_storage .datastore_dir .restore_from_temporary_backup ()
3136 )
32- update_type = bump_manifesto [ "updateType" ]
33- bumped_version_number = (
34- "1.0.0.0"
35- if restored_version_number is None
36- else bump_dotted_version_number (
37+ bumped_version_number = "1.0.0.0"
38+ update_type = bump_manifesto . update_type
39+ if restored_version_number is not None :
40+ assert update_type is not None
41+ bumped_version_number = bump_dotted_version_number (
3742 underscored_to_dotted_version (restored_version_number ),
3843 update_type ,
3944 )
40- )
45+ else :
46+ update_type = "MAJOR"
4147 logger .warning (
4248 f"{ job_id } : Rolling back to { restored_version_number } "
4349 f"from bump to { bumped_version_number } "
@@ -47,9 +53,9 @@ def rollback_bump(job_id: str, bump_manifesto: dict) -> None:
4753 )
4854 bumped_version_data = "_" .join (bumped_version_metadata .split ("_" )[:- 1 ])
4955 manifesto_datasets = [
50- dataset [ " name" ]
51- for dataset in bump_manifesto [ "dataStructureUpdates" ]
52- if dataset [ "releaseStatus" ] != "DRAFT"
56+ dataset . name
57+ for dataset in bump_manifesto . data_structure_updates
58+ if dataset . release_status != "DRAFT"
5359 ]
5460 logger .info (
5561 f"{ job_id } : Found { len (manifesto_datasets )} "
@@ -127,9 +133,12 @@ def rollback_bump(job_id: str, bump_manifesto: dict) -> None:
127133
128134
129135def rollback_worker_phase_import_job (
130- job_id : str , operation : str , dataset_name : str
136+ job : Job , operation : str , dataset_name : str
131137) -> None :
132- local_storage = LocalStorageAdapter (Path (environment .datastore_dir ))
138+ job_id = job .job_id
139+ local_storage = LocalStorageAdapter (
140+ datastore_api .get_datastore_directory (job .datastore_rdn )
141+ )
133142 logger .warning (
134143 f"{ job_id } : Rolling back worker job "
135144 f'with target: "{ dataset_name } " and operation "{ operation } "'
@@ -154,17 +163,19 @@ def rollback_worker_phase_import_job(
154163
155164 if operation in ["ADD" , "CHANGE" ]:
156165 for file in generated_data_files :
157- filepath = WORKING_DIR_PATH / file
166+ filepath = local_storage . working_dir . path / file
158167 if filepath .exists ():
159168 logger .info (f'{ job_id } : Deleting data file "{ filepath } "' )
160169 os .remove (filepath )
161- parquet_directory = WORKING_DIR_PATH / generated_data_directory
170+ parquet_directory = (
171+ local_storage .working_dir .path / generated_data_directory
172+ )
162173 if parquet_directory .exists () and os .path .isdir (parquet_directory ):
163174 logger .info (
164175 f'{ job_id } : Deleting data directory "{ parquet_directory } "'
165176 )
166177 shutil .rmtree (parquet_directory )
167- dataset_directory = WORKING_DIR_PATH / dataset_name
178+ dataset_directory = local_storage . working_dir . path / dataset_name
168179 if dataset_directory .exists () and os .path .isdir (dataset_directory ):
169180 logger .info (
170181 f'{ job_id } : Deleting dataset directory "{ dataset_directory } "'
@@ -173,14 +184,17 @@ def rollback_worker_phase_import_job(
173184
174185
175186def rollback_manager_phase_import_job (
176- job_id : str , operation : str , dataset_name : str
187+ job : Job , operation : str , dataset_name : str
177188) -> None :
178189 """
179190 Rolls back manager phase import job.
180191 Exceptions are not handled here on purpose. It is a catastrophic thing
181192 if a rollback fails.
182193 """
183- local_storage = LocalStorageAdapter (Path (environment .datastore_dir ))
194+ job_id = job .job_id
195+ local_storage = LocalStorageAdapter (
196+ datastore_api .get_datastore_directory (job .datastore_rdn )
197+ )
184198 logger .warning (
185199 f"{ job_id } : Rolling back import job "
186200 f'with target: "{ dataset_name } " and operation "{ operation } "'
@@ -218,7 +232,7 @@ def fix_interrupted_job(job: Job) -> None:
218232 if job_operation in ["ADD" , "CHANGE" , "PATCH_METADATA" ]:
219233 if job .status == "importing" :
220234 rollback_manager_phase_import_job (
221- job . job_id , job_operation , job .parameters .target
235+ job , job_operation , job .parameters .target
222236 )
223237 logger .info (
224238 f"{ job .job_id } : Rolled back importing of job with "
@@ -232,7 +246,7 @@ def fix_interrupted_job(job: Job) -> None:
232246 )
233247 else :
234248 rollback_worker_phase_import_job (
235- job . job_id , job_operation , job .parameters .target
249+ job , job_operation , job .parameters .target
236250 )
237251 logger .info (
238252 f'{ job .job_id } : Setting status to "failed" for interrupted job'
@@ -262,7 +276,7 @@ def fix_interrupted_job(job: Job) -> None:
262276 bump_manifesto = job .parameters .bump_manifesto
263277 if bump_manifesto is None :
264278 raise RollbackException ("No bump manifesto available" )
265- rollback_bump (job . job_id , bump_manifesto . model_dump () )
279+ rollback_bump (job , bump_manifesto )
266280 except Exception as exc :
267281 error_message = f"Failed rollback for { job .job_id } "
268282 logger .exception (error_message , exc_info = exc )
0 commit comments