Skip to content

Commit c801763

Browse files
187 do not persist datastore models in memory (#193)
* 187: read datastore files before each job
* 187: no need to refresh after job
* 187: remove file reading from models
* 187: linting
1 parent d14368f commit c801763

File tree

11 files changed

+85
-99
lines changed

11 files changed

+85
-99
lines changed

job_executor/app.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from job_executor.config import environment
77
from job_executor.config.log import initialize_logging_thread, setup_logging
88
from job_executor.domain import rollback
9-
from job_executor.domain.datastore import Datastore
109
from job_executor.exception import StartupException
1110
from job_executor.manager import Manager
1211
from job_executor.model.job import JobStatus
@@ -86,7 +85,6 @@ def main() -> None:
8685
int(environment.get("MAX_GB_ALL_WORKERS"))
8786
* 1024**3 # Convert from GB to bytes
8887
),
89-
this_datastore=Datastore(),
9088
)
9189

9290
while True:
Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,23 @@ class Datastore:
2929
latest_version_number: str | None
3030

3131
def __init__(self) -> None:
32-
self.refresh()
33-
34-
def refresh(self) -> None:
35-
self.draft_version = DraftVersion() # type: ignore
36-
self.datastore_versions = DatastoreVersions() # type: ignore
32+
self.draft_version = DraftVersion.model_validate(
33+
local_storage.get_draft_version()
34+
)
35+
self.datastore_versions = DatastoreVersions.model_validate(
36+
local_storage.get_datastore_versions()
37+
)
3738
self.latest_version_number = (
3839
self.datastore_versions.get_latest_version_number()
3940
)
40-
self.metadata_all_draft = MetadataAllDraft() # type: ignore
41+
self.metadata_all_draft = MetadataAllDraft.model_validate(
42+
local_storage.get_metadata_all("DRAFT")
43+
)
4144
if self.latest_version_number is None:
4245
self.metadata_all_latest = None
4346
else:
44-
self.metadata_all_latest = MetadataAll(
45-
**local_storage.get_metadata_all(self.latest_version_number)
47+
self.metadata_all_latest = MetadataAll.model_validate(
48+
local_storage.get_metadata_all(self.latest_version_number)
4649
)
4750

4851

@@ -197,15 +200,13 @@ def patch_metadata(
197200
job_id, "PATCH_METADATA", dataset_name
198201
)
199202
job_service.update_job_status(job_id, JobStatus.FAILED, str(e))
200-
datastore.refresh()
201203
except Exception as e:
202204
logger.error(f"{job_id}: An unexpected error occurred")
203205
logger.exception(f"{job_id}: {str(e)}", exc_info=e)
204206
rollback_manager_phase_import_job(
205207
job_id, "PATCH_METADATA", dataset_name
206208
)
207209
job_service.update_job_status(job_id, JobStatus.FAILED)
208-
datastore.refresh()
209210

210211

211212
def add(
@@ -250,7 +251,6 @@ def add(
250251
logger.exception(f"{job_id}: {str(e)}", exc_info=e)
251252
rollback_manager_phase_import_job(job_id, "ADD", dataset_name)
252253
job_service.update_job_status(job_id, JobStatus.FAILED)
253-
datastore.refresh()
254254

255255

256256
def change(
@@ -297,7 +297,6 @@ def change(
297297
logger.exception(f"{job_id}: {str(e)}", exc_info=e)
298298
rollback_manager_phase_import_job(job_id, "CHANGE", dataset_name)
299299
job_service.update_job_status(job_id, JobStatus.FAILED)
300-
datastore.refresh()
301300

302301

303302
def remove(
@@ -491,7 +490,6 @@ def bump_version(
491490
bump_manifesto.model_dump(by_alias=True, exclude_none=True),
492491
)
493492
job_service.update_job_status(job_id, JobStatus.FAILED)
494-
datastore.refresh()
495493

496494

497495
def delete_archived_input(job_id: str, dataset_name: str) -> None:

job_executor/manager/__init__.py

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
from multiprocessing import Process, Queue
33

44
from job_executor.adapter import job_service
5-
from job_executor.domain import datastore, rollback
6-
from job_executor.domain.datastore import Datastore
5+
from job_executor.domain import datastores, rollback
6+
from job_executor.domain.datastores import Datastore
77
from job_executor.model.job import Job, JobStatus, Operation
88
from job_executor.model.worker import Worker
99
from job_executor.worker import build_dataset_worker, build_metadata_worker
@@ -26,7 +26,6 @@ def __init__(
2626
self,
2727
max_workers: int,
2828
max_bytes_all_workers: int,
29-
this_datastore: Datastore,
3029
) -> None:
3130
"""
3231
:param default_max_workers: The maximum number of workers
@@ -37,7 +36,6 @@ def __init__(
3736
"""
3837
self.max_workers = max_workers
3938
self.max_bytes_all_workers = max_bytes_all_workers
40-
self.datastore = this_datastore
4139
self.workers: list[Worker] = []
4240

4341
@property
@@ -141,69 +139,70 @@ def handle_worker_job(
141139
def handle_manager_job(self, job: Job) -> None:
142140
job_id = job.job_id
143141
operation = job.parameters.operation
142+
datastore = Datastore()
144143
self.unregister_worker(
145144
job_id
146145
) # Filter out job from worker jobs if built
147146
# Ignoring a lot of types here as we already have done the validation
148147
# in the pydantic model.
149148
if operation == Operation.BUMP:
150-
datastore.bump_version(
151-
self.datastore,
149+
datastores.bump_version(
150+
datastore,
152151
job_id,
153152
job.parameters.bump_manifesto, # type: ignore
154153
job.parameters.description, # type: ignore
155154
)
156155
elif operation == Operation.PATCH_METADATA:
157-
datastore.patch_metadata(
158-
self.datastore,
156+
datastores.patch_metadata(
157+
datastore,
159158
job_id,
160159
job.parameters.target,
161160
job.parameters.description, # type: ignore
162161
)
163162
elif operation == Operation.SET_STATUS:
164-
datastore.set_draft_release_status(
165-
self.datastore,
163+
datastores.set_draft_release_status(
164+
datastore,
166165
job_id,
167166
job.parameters.target,
168167
job.parameters.release_status, # type: ignore
169168
)
170169
elif operation == Operation.ADD:
171-
datastore.add(
172-
self.datastore,
170+
datastores.add(
171+
datastore,
173172
job_id,
174173
job.parameters.target,
175174
job.parameters.description, # type: ignore
176175
)
177176
elif operation == Operation.CHANGE:
178-
datastore.change(
179-
self.datastore,
177+
datastores.change(
178+
datastore,
180179
job_id,
181180
job.parameters.target,
182181
job.parameters.description, # type: ignore
183182
)
184183
elif operation == Operation.REMOVE:
185-
datastore.remove(
186-
self.datastore,
184+
datastores.remove(
185+
datastore,
187186
job_id,
188187
job.parameters.target,
189188
job.parameters.description, # type: ignore
190189
)
191190
elif operation == Operation.ROLLBACK_REMOVE:
192-
datastore.delete_draft(
193-
self.datastore,
191+
datastores.delete_draft(
192+
datastore,
194193
job_id,
195194
job.parameters.target,
196195
rollback_remove=True,
197196
)
198197
elif operation == Operation.DELETE_DRAFT:
199-
datastore.delete_draft(
200-
self.datastore,
198+
datastores.delete_draft(
199+
datastore,
201200
job_id,
202201
job.parameters.target,
203202
rollback_remove=False,
204203
)
205204
elif operation == Operation.DELETE_ARCHIVE:
206-
datastore.delete_archived_input(job_id, job.parameters.target)
205+
datastores.delete_archived_input(job_id, job.parameters.target)
207206
else:
208207
job_service.update_job_status(
209208
job.job_id, JobStatus.FAILED, log="Unknown operation for job"

job_executor/model/datastore_version.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
from datetime import UTC, datetime
22
from typing import Iterator
33

4-
from pydantic import model_validator
5-
64
from job_executor.adapter import local_storage
75
from job_executor.exception import (
86
BumpException,
@@ -77,11 +75,6 @@ def contains(self, dataset_name: str) -> bool:
7775

7876

7977
class DraftVersion(DatastoreVersion):
80-
@model_validator(mode="before")
81-
@classmethod
82-
def read_file(cls, _): # noqa
83-
return local_storage.get_draft_version()
84-
8578
def add(self, data_structure_update: DataStructureUpdate) -> None:
8679
current_update_names = [update.name for update in self]
8780
if data_structure_update.name in current_update_names:

job_executor/model/datastore_versions.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
from datetime import UTC, datetime
22

3-
from pydantic import model_validator
4-
53
from job_executor.adapter import local_storage
64
from job_executor.exception import VersioningException
75
from job_executor.model.camelcase_model import CamelModel
@@ -15,11 +13,6 @@ class DatastoreVersions(CamelModel, extra="forbid"):
1513
description: str
1614
versions: list[DatastoreVersion]
1715

18-
@model_validator(mode="before")
19-
@classmethod
20-
def read_file(cls, _): # noqa
21-
return local_storage.get_datastore_versions()
22-
2316
def _write_to_file(self) -> None:
2417
local_storage.write_datastore_versions(self.model_dump(by_alias=True))
2518

job_executor/model/metadata_all.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
from collections.abc import Iterator
22

3-
from pydantic import model_validator
4-
53
from job_executor.adapter import local_storage
64
from job_executor.exception import BumpException
75
from job_executor.model import Metadata
@@ -48,11 +46,6 @@ def get(self, dataset_name: str) -> Metadata | None:
4846

4947

5048
class MetadataAllDraft(MetadataAll):
51-
@model_validator(mode="before")
52-
@classmethod
53-
def read_file(cls, _) -> dict: # noqa
54-
return local_storage.get_metadata_all("DRAFT")
55-
5649
def _write_to_file(self) -> None:
5750
local_storage.write_metadata_all(
5851
self.model_dump(by_alias=True, exclude_none=True), "DRAFT"

0 commit comments

Comments (0)