Skip to content

Commit 3205aee

Browse files
200 new job model (#206)
* 200-new-job-model: update config * 200-new-job-model: update datastore-api adapter * 200-new-job-model: test adapter * 200-new-job-model: domain tests * 200-new-job-model: datastore specific localstorage adapters
1 parent b77f119 commit 3205aee

File tree

14 files changed

+434
-250
lines changed

14 files changed

+434
-250
lines changed

.test.env

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
DATASTORE_DIR=tests/resources/datastores/TEST_DATASTORE
2+
DATASTORE_RDN=no.ssb.test
23
PSEUDONYM_SERVICE_URL=http://mock.pseudonym.service
34
DATASTORE_API_URL=http://mock.job.service
45
NUMBER_OF_WORKERS=4

job_executor/adapter/datastore_api/__init__.py

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from pathlib import Path
23
from urllib.error import HTTPError
34

45
import requests
@@ -22,6 +23,37 @@
2223
logger = logging.getLogger()
2324

2425

26+
def execute_request(
27+
method: str,
28+
url: str,
29+
retry: bool = False,
30+
**kwargs, # noqa
31+
) -> Response:
32+
try:
33+
if retry:
34+
with requests.Session() as s:
35+
retries = Retry(
36+
total=6,
37+
backoff_factor=0.5,
38+
# [0.0s, 1.0s, 2.0s, 4.0s, 8.0s, 16.0s] between retries
39+
allowed_methods={"GET"},
40+
)
41+
s.mount("http://", HTTPAdapter(max_retries=retries))
42+
response = s.request(method=method, url=url, **kwargs)
43+
else:
44+
response = requests.request(
45+
method=method,
46+
url=url,
47+
timeout=DEFAULT_REQUESTS_TIMEOUT,
48+
**kwargs,
49+
)
50+
if response.status_code != 200:
51+
raise HttpResponseError(f"{response.status_code}: {response.text}")
52+
return response
53+
except (RequestException, HTTPError) as e:
54+
raise HttpRequestError(e) from e
55+
56+
2557
def get_jobs(
2658
job_status: JobStatus | None = None,
2759
operations: list[Operation] | None = None,
@@ -40,7 +72,10 @@ def get_jobs(
4072
request_url += f"?{'&'.join(query_fields)}"
4173

4274
response = execute_request("GET", request_url, True)
43-
return [Job(**job) for job in response.json()]
75+
return [
76+
Job.model_validate({"datastoreRdn": environment.datastore_rdn, **job})
77+
for job in response.json()
78+
]
4479

4580

4681
def update_job_status(
@@ -72,35 +107,8 @@ def is_system_paused() -> bool:
72107
return maintenance_status.paused
73108

74109

75-
def execute_request(
76-
method: str,
77-
url: str,
78-
retry: bool = False,
79-
**kwargs, # noqa
80-
) -> Response:
81-
try:
82-
if retry:
83-
with requests.Session() as s:
84-
retries = Retry(
85-
total=6,
86-
backoff_factor=0.5,
87-
# [0.0s, 1.0s, 2.0s, 4.0s, 8.0s, 16.0s] between retries
88-
allowed_methods={"GET"},
89-
)
90-
s.mount("http://", HTTPAdapter(max_retries=retries))
91-
response = s.request(method=method, url=url, **kwargs)
92-
else:
93-
response = requests.request(
94-
method=method,
95-
url=url,
96-
timeout=DEFAULT_REQUESTS_TIMEOUT,
97-
**kwargs,
98-
)
99-
if response.status_code != 200:
100-
raise HttpResponseError(f"{response.status_code}: {response.text}")
101-
return response
102-
except (RequestException, HTTPError) as e:
103-
raise HttpRequestError(e) from e
110+
def get_datastore_directory(rdn: str) -> Path:
111+
return Path(environment.datastore_dir)
104112

105113

106114
def query_for_jobs() -> JobQueryResult:

job_executor/adapter/datastore_api/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ class Log(CamelModel, extra="forbid"):
9090

9191
class Job(CamelModel, use_enum_values=True):
9292
job_id: str
93+
datastore_rdn: str
9394
status: JobStatus
9495
parameters: JobParameters
9596
log: list[Log] | None = []

job_executor/app.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ def initialize_app() -> None:
2929
def handle_jobs(manager: Manager, logging_queue: Queue) -> None:
3030
job_query_result = datastore_api.query_for_jobs()
3131
manager.clean_up_after_dead_workers()
32-
local_storage = LocalStorageAdapter(Path(environment.datastore_dir))
3332
if job_query_result.available_jobs_count:
3433
logger.info(
3534
f"Found {len(job_query_result.queued_worker_jobs)}"
@@ -39,6 +38,9 @@ def handle_jobs(manager: Manager, logging_queue: Queue) -> None:
3938
)
4039

4140
for job in job_query_result.queued_worker_jobs:
41+
local_storage = LocalStorageAdapter(
42+
datastore_api.get_datastore_directory(job.datastore_rdn)
43+
)
4244
job_size = local_storage.input_dir.get_importable_tar_size_in_bytes(
4345
job.parameters.target
4446
)

job_executor/config/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
@dataclass
77
class Environment:
88
datastore_dir: str
9+
datastore_rdn: str
910
secrets_file: str
1011
pseudonym_service_url: str
1112
datastore_api_url: str
@@ -18,6 +19,7 @@ class Environment:
1819
def _initialize_environment() -> Environment:
1920
return Environment(
2021
datastore_dir=os.environ["DATASTORE_DIR"],
22+
datastore_rdn=os.environ["DATASTORE_RDN"],
2123
secrets_file=os.environ["SECRETS_FILE"],
2224
pseudonym_service_url=os.environ["PSEUDONYM_SERVICE_URL"],
2325
datastore_api_url=os.environ["DATASTORE_API_URL"],

0 commit comments

Comments
 (0)