Skip to content

Commit f1dc6b7

Browse files
2025 housekeeping ii (#195)
* 2025_housekeeping_II: environment object * 2025_housekeeping_II: secrets object * 2025_housekeeping_II: move models to where they belong * 2025_housekeeping_II: move manager/worker into domain exc into common
1 parent 3759856 commit f1dc6b7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+405
-389
lines changed

job_executor/adapter/datastore_api/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
MaintenanceStatus,
1414
Operation,
1515
)
16+
from job_executor.common.exceptions import HttpRequestError, HttpResponseError
1617
from job_executor.config import environment
17-
from job_executor.exception import HttpRequestError, HttpResponseError
1818

19-
DATASTORE_API_URL = environment.get("DATASTORE_API_URL")
19+
DATASTORE_API_URL = environment.datastore_api_url
2020
DEFAULT_REQUESTS_TIMEOUT = (10, 60) # (read timeout, connect timeout)
2121

2222
logger = logging.getLogger()

job_executor/adapter/datastore_api/models.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33

44
from pydantic import model_validator
55

6-
from job_executor.model import DatastoreVersion
7-
from job_executor.model.camelcase_model import CamelModel
6+
from job_executor.adapter.local_storage.models.datastore_versions import (
7+
DatastoreVersion,
8+
)
9+
from job_executor.common.models import CamelModel
810

911

1012
class MaintenanceStatus(CamelModel):

job_executor/adapter/local_storage.py renamed to job_executor/adapter/local_storage/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66

77
from pydantic import ValidationError
88

9+
from job_executor.common.exceptions import LocalStorageError
910
from job_executor.config import environment
10-
from job_executor.exception import LocalStorageError
1111

12-
WORKING_DIR = Path(environment.get("WORKING_DIR"))
13-
DATASTORE_DIR = Path(environment.get("DATASTORE_DIR"))
14-
INPUT_DIR = Path(environment.get("INPUT_DIR"))
12+
WORKING_DIR = Path(environment.working_dir)
13+
DATASTORE_DIR = Path(environment.datastore_dir)
14+
INPUT_DIR = Path(environment.input_dir)
1515

1616
DATASTORE_VERSIONS_PATH = DATASTORE_DIR / "datastore/datastore_versions.json"
1717
DRAFT_METADATA_ALL_PATH = DATASTORE_DIR / "datastore/metadata_all__DRAFT.json"

job_executor/model/datastore_version.py renamed to job_executor/adapter/local_storage/models/datastore_versions.py

Lines changed: 134 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,47 @@
22
from typing import Iterator
33

44
from job_executor.adapter import local_storage
5-
from job_executor.exception import (
5+
from job_executor.common.exceptions import (
66
BumpException,
77
ExistingDraftException,
88
NoSuchDraftException,
9+
ReleaseStatusException,
910
UnnecessaryUpdateException,
11+
VersioningException,
1012
)
11-
from job_executor.model.camelcase_model import CamelModel
12-
from job_executor.model.data_structure_update import DataStructureUpdate
13+
from job_executor.common.models import CamelModel
14+
15+
16+
class DataStructureUpdate(CamelModel, extra="forbid"):
17+
name: str
18+
description: str
19+
operation: str
20+
release_status: str
21+
22+
def set_release_status(self, new_status: str) -> None:
23+
if new_status == "PENDING_RELEASE":
24+
if self.operation not in ["ADD", "CHANGE", "PATCH_METADATA"]:
25+
raise ReleaseStatusException(
26+
f"Can't set release status: {new_status} "
27+
f"for dataset with operation: {self.operation}"
28+
)
29+
elif new_status == "PENDING_DELETE":
30+
if self.operation != "REMOVE":
31+
raise ReleaseStatusException(
32+
f"Can't set release status: {new_status} "
33+
f"for dataset with operation: {self.operation}"
34+
)
35+
elif new_status == "DRAFT":
36+
if self.operation == "REMOVE":
37+
raise ReleaseStatusException(
38+
f"Can't set release status: {new_status} "
39+
f"for dataset with operation: {self.operation}"
40+
)
41+
elif new_status != "DRAFT":
42+
raise ReleaseStatusException(
43+
f"Invalid release status: {new_status}"
44+
)
45+
self.release_status = new_status
1346

1447

1548
class DatastoreVersion(CamelModel):
@@ -173,3 +206,101 @@ def _set_release_time(self) -> None:
173206
version_list = list(self.version.split("."))
174207
version_list[-1] = str(self.release_time)
175208
self.version = ".".join(version_list)
209+
210+
211+
class DatastoreVersions(CamelModel, extra="forbid"):
212+
name: str
213+
label: str
214+
description: str
215+
versions: list[DatastoreVersion]
216+
217+
def _write_to_file(self) -> None:
218+
local_storage.write_datastore_versions(self.model_dump(by_alias=True))
219+
220+
def _get_current_epoch_seconds(self) -> int:
221+
return int(
222+
(
223+
datetime.now(UTC).replace(tzinfo=None)
224+
- datetime.fromtimestamp(0, UTC).replace(tzinfo=None)
225+
).total_seconds()
226+
)
227+
228+
def add_new_release_version(
229+
self,
230+
data_structure_updates: list[DataStructureUpdate],
231+
description: str,
232+
update_type: str,
233+
) -> str:
234+
released_data_structure_updates = [
235+
DataStructureUpdate(
236+
name=data_structure.name,
237+
description=data_structure.description,
238+
operation=data_structure.operation,
239+
release_status=(
240+
"DELETED"
241+
if data_structure.operation == "REMOVE"
242+
else "RELEASED"
243+
),
244+
)
245+
for data_structure in data_structure_updates
246+
]
247+
new_version_number = (
248+
"1.0.0.0"
249+
if self.versions == []
250+
else bump_dotted_version_number(
251+
self.versions[0].version, update_type
252+
)
253+
)
254+
new_release_version = DatastoreVersion(
255+
version=new_version_number,
256+
description=description,
257+
release_time=self._get_current_epoch_seconds(),
258+
language_code="no",
259+
update_type=update_type,
260+
data_structure_updates=released_data_structure_updates,
261+
)
262+
self.versions = [new_release_version] + self.versions
263+
self._write_to_file()
264+
return dotted_to_underscored_version(new_version_number)
265+
266+
def get_dataset_release_status(self, dataset_name: str) -> str | None:
267+
for version in self.versions:
268+
release_status = version.get_dataset_release_status(dataset_name)
269+
if release_status is not None:
270+
return release_status
271+
return None
272+
273+
def get_latest_version_number(self) -> str | None:
274+
if len(self.versions):
275+
return dotted_to_underscored_version(self.versions[0].version)
276+
else:
277+
return None
278+
279+
280+
def dotted_to_underscored_version(version: str) -> str:
281+
return "_".join(version.split(".")[:-1])
282+
283+
284+
def underscored_to_dotted_version(version: str) -> str:
285+
return f"{version.replace('_', '.')}.0"
286+
287+
288+
def bump_dotted_version_number(version: str, update_type: str) -> str:
289+
version_list = [int(number) for number in version.split(".")]
290+
if update_type == "MAJOR":
291+
return ".".join([str(version_list[0] + 1), "0", "0", "0"])
292+
elif update_type == "MINOR":
293+
return ".".join(
294+
[str(version_list[0]), str(version_list[1] + 1), "0", "0"]
295+
)
296+
elif update_type == "PATCH":
297+
return ".".join(
298+
[
299+
str(version_list[0]),
300+
str(version_list[1]),
301+
str(version_list[2] + 1),
302+
"0",
303+
]
304+
)
305+
else:
306+
raise VersioningException(f"Invalid update_type {update_type}")

job_executor/model/metadata.py renamed to job_executor/adapter/local_storage/models/metadata.py

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
1-
from job_executor.exception import MetadataException, PatchingError
2-
from job_executor.model.camelcase_model import CamelModel
1+
from collections.abc import Iterator
2+
3+
from job_executor.adapter import local_storage
4+
from job_executor.adapter.local_storage.models.datastore_versions import (
5+
DatastoreVersion,
6+
)
7+
from job_executor.common.exceptions import (
8+
BumpException,
9+
MetadataException,
10+
PatchingError,
11+
)
12+
from job_executor.common.models import CamelModel
313

414
DATA_TYPES_MAPPING = {
515
"STRING": "String",
@@ -357,3 +367,98 @@ def patch(self, other: "Metadata | None") -> "Metadata":
357367
if self.temporal_status_dates is None:
358368
del metadata_dict["temporalStatusDates"]
359369
return Metadata(**metadata_dict)
370+
371+
372+
class DataStoreInfo(CamelModel):
373+
name: str
374+
label: str
375+
description: str
376+
language_code: str
377+
378+
379+
class LanguageInfo(CamelModel):
380+
code: str
381+
label: str
382+
383+
384+
class MetadataAll(CamelModel):
385+
data_store: DataStoreInfo
386+
data_structures: list[Metadata]
387+
languages: list[LanguageInfo]
388+
389+
def __iter__(self) -> Iterator[Metadata]: # type: ignore
390+
return iter(
391+
[
392+
Metadata(
393+
**data_structure.model_dump(
394+
by_alias=True, exclude_none=True
395+
)
396+
)
397+
for data_structure in self.data_structures
398+
]
399+
)
400+
401+
def get(self, dataset_name: str) -> Metadata | None:
402+
for metadata in self.data_structures:
403+
if metadata.name == dataset_name:
404+
return Metadata(
405+
**metadata.model_dump(by_alias=True, exclude_none=True)
406+
)
407+
return None
408+
409+
410+
class MetadataAllDraft(MetadataAll):
411+
def _write_to_file(self) -> None:
412+
local_storage.write_metadata_all(
413+
self.model_dump(by_alias=True, exclude_none=True), "DRAFT"
414+
)
415+
416+
def remove(self, dataset_name: str) -> None:
417+
self.data_structures = [
418+
metadata
419+
for metadata in self.data_structures
420+
if metadata.name != dataset_name
421+
]
422+
self._write_to_file()
423+
424+
def update_one(self, dataset_name: str, metadata: Metadata) -> None:
425+
self.data_structures = [
426+
metadata
427+
for metadata in self.data_structures
428+
if metadata.name != dataset_name
429+
]
430+
self.data_structures.append(metadata)
431+
self._write_to_file()
432+
433+
def remove_all(self) -> None:
434+
self.data_structures = []
435+
self._write_to_file()
436+
437+
def add(self, metadata: Metadata) -> None:
438+
self.data_structures.append(metadata)
439+
self._write_to_file()
440+
441+
def rebuild(
442+
self,
443+
released_metadata: list[Metadata],
444+
draft_version: DatastoreVersion,
445+
) -> None:
446+
previous_data_structures = {ds.name: ds for ds in self.data_structures}
447+
new_data_structures = {
448+
ds.name: Metadata(**ds.model_dump(by_alias=True, exclude_none=True))
449+
for ds in released_metadata
450+
}
451+
for draft in draft_version:
452+
if draft.operation == "REMOVE":
453+
del new_data_structures[draft.name]
454+
else:
455+
draft_metadata = previous_data_structures.get(draft.name, None)
456+
if draft_metadata is None:
457+
raise BumpException(
458+
"Could not rebuild metadata_all__DRAFT. "
459+
f"No metadata for {draft.name} in previous "
460+
"metadata_all__DRAFT"
461+
)
462+
new_data_structures[draft.name] = draft_metadata
463+
self.data_structures = list(new_data_structures.values())
464+
self._write_to_file()

job_executor/adapter/pseudonym_service.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import requests
22
from microdata_tools.validation.model.metadata import UnitIdType
33

4+
from job_executor.common.exceptions import HttpResponseError
45
from job_executor.config import environment, secrets
5-
from job_executor.exception import HttpResponseError
66

7-
PSEUDONYM_SERVICE_URL = environment.get("PSEUDONYM_SERVICE_URL")
8-
PSEUDONYM_SERVICE_API_KEY = secrets.get("PSEUDONYM_SERVICE_API_KEY")
7+
PSEUDONYM_SERVICE_URL = environment.pseudonym_service_url
8+
PSEUDONYM_SERVICE_API_KEY = secrets.pseudonym_service_api_key
99

1010

1111
def pseudonymize(

job_executor/app.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44

55
from job_executor.adapter import datastore_api, local_storage
66
from job_executor.adapter.datastore_api.models import JobStatus
7+
from job_executor.common.exceptions import StartupException
78
from job_executor.config import environment
89
from job_executor.config.log import initialize_logging_thread, setup_logging
910
from job_executor.domain import rollback
10-
from job_executor.exception import StartupException
11-
from job_executor.manager import Manager
11+
from job_executor.domain.manager import Manager
1212

1313
logger = logging.getLogger()
1414
setup_logging()
@@ -80,11 +80,10 @@ def main() -> None:
8080
initialize_app()
8181
logging_queue, log_thread = initialize_logging_thread()
8282
manager = Manager(
83-
max_workers=int(environment.get("NUMBER_OF_WORKERS")),
83+
max_workers=environment.number_of_workers,
8484
max_bytes_all_workers=(
85-
int(environment.get("MAX_GB_ALL_WORKERS"))
86-
* 1024**3 # Covert from GB to bytes
87-
),
85+
environment.max_gb_all_workers * 1024**3
86+
), # Covert from GB to bytes
8887
)
8988

9089
while True:

0 commit comments

Comments
 (0)