Skip to content

Commit 42d9893

Browse files
Functional interface datastore (#191)
* functional_interface_datastore: function wrappers * functional_interface_datastore: use function wrappers * functional_interface_datastore: update tests * functional_interface_datastore: ruff
1 parent e2d862a commit 42d9893

File tree

5 files changed

+178
-51
lines changed

5 files changed

+178
-51
lines changed

job_executor/domain/datastore.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,3 +509,84 @@ def delete_archived_input(self, job_id: str, dataset_name: str) -> None:
509509
self._log(job_id, "An unexpected error occured", "ERROR")
510510
self._log(job_id, str(e), "EXC", e)
511511
job_service.update_job_status(job_id, JobStatus.FAILED)
512+
513+
514+
def patch_metadata(
515+
datastore: Datastore, job_id: str, dataset_name: str, description: str
516+
) -> None:
517+
"""
518+
Patch metadata for a released dataset with updated metadata
519+
file.
520+
"""
521+
datastore.patch_metadata(job_id, dataset_name, description)
522+
523+
524+
def add(
525+
datastore: Datastore, job_id: str, dataset_name: str, description: str
526+
) -> None:
527+
"""
528+
Import metadata and data as draft for a new dataset that
529+
has not been released in a previous versions.
530+
"""
531+
datastore.add(job_id, dataset_name, description)
532+
533+
534+
def change(
535+
datastore: Datastore, job_id: str, dataset_name: str, description: str
536+
) -> None:
537+
"""
538+
Import metadata and data as draft for as an update
539+
for a dataset that has already been released in a
540+
previous version.
541+
"""
542+
datastore.change(job_id, dataset_name, description)
543+
544+
545+
def remove(
546+
datastore: Datastore, job_id: str, dataset_name: str, description: str
547+
) -> None:
548+
"""
549+
Remove a released dataset that has been released in
550+
a previous version from future versions of the datastore.
551+
"""
552+
datastore.remove(job_id, dataset_name, description)
553+
554+
555+
def delete_draft(
556+
datastore: Datastore, job_id: str, dataset_name: str, rollback_remove: bool
557+
) -> None:
558+
"""
559+
Delete a dataset from the draft version of the datastore.
560+
"""
561+
datastore.delete_draft(job_id, dataset_name, rollback_remove)
562+
563+
564+
def set_draft_release_status(
565+
datastore: Datastore, job_id: str, dataset_name: str, new_status: str
566+
) -> None:
567+
"""
568+
Set a new release status for a dataset in the draft version.
569+
"""
570+
datastore.set_draft_release_status(job_id, dataset_name, new_status)
571+
572+
573+
def bump_version(
574+
datastore: Datastore,
575+
job_id: str,
576+
bump_manifesto: DatastoreVersion,
577+
description: str,
578+
) -> None:
579+
"""
580+
Release a new version of the datastore with the pending
581+
operations in the draft version of the datastore.
582+
"""
583+
datastore.bump_version(job_id, bump_manifesto, description)
584+
585+
586+
def delete_archived_input(
587+
datastore: Datastore, job_id: str, dataset_name: str
588+
) -> None:
589+
"""
590+
Delete the archived dataset from archive directory.
591+
"""
592+
datastore.delete_archived_input(job_id, dataset_name)

job_executor/manager/__init__.py

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from multiprocessing import Process, Queue
33

44
from job_executor.adapter import job_service
5-
from job_executor.domain import rollback
5+
from job_executor.domain import datastore, rollback
66
from job_executor.domain.datastore import Datastore
77
from job_executor.model.job import Job, JobStatus, Operation
88
from job_executor.model.worker import Worker
@@ -23,7 +23,10 @@ class Manager:
2323
"""
2424

2525
def __init__(
26-
self, max_workers: int, max_bytes_all_workers: int, datastore: Datastore
26+
self,
27+
max_workers: int,
28+
max_bytes_all_workers: int,
29+
this_datastore: Datastore,
2730
) -> None:
2831
"""
2932
:param default_max_workers: The maximum number of workers
@@ -34,7 +37,7 @@ def __init__(
3437
"""
3538
self.max_workers = max_workers
3639
self.max_bytes_all_workers = max_bytes_all_workers
37-
self.datastore = datastore
40+
self.datastore = this_datastore
3841
self.workers: list[Worker] = []
3942

4043
@property
@@ -144,51 +147,65 @@ def handle_manager_job(self, job: Job) -> None:
144147
# Ignoring a lot of types here as we already have done the validation
145148
# in the pydantic model.
146149
if operation == Operation.BUMP:
147-
self.datastore.bump_version(
150+
datastore.bump_version(
151+
self.datastore,
148152
job_id,
149153
job.parameters.bump_manifesto, # type: ignore
150154
job.parameters.description, # type: ignore
151155
)
152156
elif operation == Operation.PATCH_METADATA:
153-
self.datastore.patch_metadata(
157+
datastore.patch_metadata(
158+
self.datastore,
154159
job_id,
155160
job.parameters.target,
156161
job.parameters.description, # type: ignore
157162
)
158163
elif operation == Operation.SET_STATUS:
159-
self.datastore.set_draft_release_status(
164+
datastore.set_draft_release_status(
165+
self.datastore,
160166
job_id,
161167
job.parameters.target,
162168
job.parameters.release_status, # type: ignore
163169
)
164170
elif operation == Operation.ADD:
165-
self.datastore.add(
171+
datastore.add(
172+
self.datastore,
166173
job_id,
167174
job.parameters.target,
168175
job.parameters.description, # type: ignore
169176
)
170177
elif operation == Operation.CHANGE:
171-
self.datastore.change(
178+
datastore.change(
179+
self.datastore,
172180
job_id,
173181
job.parameters.target,
174182
job.parameters.description, # type: ignore
175183
)
176184
elif operation == Operation.REMOVE:
177-
self.datastore.remove(
185+
datastore.remove(
186+
self.datastore,
178187
job_id,
179188
job.parameters.target,
180189
job.parameters.description, # type: ignore
181190
)
182191
elif operation == Operation.ROLLBACK_REMOVE:
183-
self.datastore.delete_draft(
184-
job_id, job.parameters.target, rollback_remove=True
192+
datastore.delete_draft(
193+
self.datastore,
194+
job_id,
195+
job.parameters.target,
196+
rollback_remove=True,
185197
)
186198
elif operation == Operation.DELETE_DRAFT:
187-
self.datastore.delete_draft(
188-
job_id, job.parameters.target, rollback_remove=False
199+
datastore.delete_draft(
200+
self.datastore,
201+
job_id,
202+
job.parameters.target,
203+
rollback_remove=False,
189204
)
190205
elif operation == Operation.DELETE_ARCHIVE:
191-
self.datastore.delete_archived_input(job_id, job.parameters.target)
206+
datastore.delete_archived_input(
207+
self.datastore, job_id, job.parameters.target
208+
)
192209
else:
193210
job_service.update_job_status(
194211
job.job_id, JobStatus.FAILED, log="Unknown operation for job"

0 commit comments

Comments
 (0)