Skip to content

Commit c868934

Browse files
DanielElisenberg, pawbu, and linnbjornerud
authored
Workflow multitenancy tests (#211)
* workflow_multitenancy_tests: move code from app * workflow_multitenancy_tests: move code to manager * workflow_multitenancy_tests: update to use job context * workflow_multitenancy_tests: restart from scratch tests * workflow_multitenancy_tests: gitkeeps * Remove dotenv (#212) * remove_dotenv: move to conftest * remove_dotenv: remove dependency * More integration tests (#213) * more_integration_tests: new datastore resouce dirs * more_integration_tests: add to common functions * more_integration_tests: add rollback tests * more_integration_tests: fix * more_integration_tests: fix * Reintroduce unit tests (#214) * reintroduce_unit_tests: adapter tests * reintroduce_unit_tests: config tests * reintroduce_unit_tests: manager tests * reintroduce_unit_tests: worker tests * reintroduce_unit_tests: fix resources * reintroduce_unit_tests: fixes manager test * reintroduce_unit_tests: fixes * reintroduce_unit_tests: missing gitkeep * reintroduce_unit_tests: fix imports * reintroduce_unit_tests: test fixes * update_urllib3: update (#215) * reintroduce_fs_models_test: reintroduce (#217) * Check tmp dir of all datastores (#219) * pin_actions: better security (#210) * Use variables from env in workflow scripts * remove double env in workflow-step * update INTEGRATION_TEST_WORKFLOW and DEPLOY_WORKFLOW in env * Update urllib3 * Check tmp dir of all datastores --------- Co-authored-by: linnbjornerud <linnchb@gmail.com> Co-authored-by: Linn Bjørnerud <125447659+linnbjornerud@users.noreply.github.com> * Extra assertions integration tests (#221) * extra_assertions_integration_tests: added metadata update assertions * extra_assertions_integration_tests: more assertions, update resources * extra_assertions_integration_tests: update dependencies * extra_assertions_integration_tests: ruff --------- Co-authored-by: pawbu <pawbu@users.noreply.github.com> Co-authored-by: linnbjornerud <linnchb@gmail.com> Co-authored-by: Linn Bjørnerud <125447659+linnbjornerud@users.noreply.github.com>
1 parent 90c683f commit c868934

File tree

192 files changed

+2259
-5447
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

192 files changed

+2259
-5447
lines changed

.test.env

Lines changed: 0 additions & 9 deletions
This file was deleted.

job_executor/app.py

Lines changed: 27 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,112 +1,59 @@
11
import logging
22
import time
3-
from multiprocessing import Queue
4-
from pathlib import Path
53

64
from job_executor.adapter import datastore_api
7-
from job_executor.adapter.datastore_api.models import JobStatus
85
from job_executor.adapter.fs import LocalStorageAdapter
96
from job_executor.common.exceptions import StartupException
107
from job_executor.config import environment
11-
from job_executor.config.log import initialize_logging_thread, setup_logging
8+
from job_executor.config.log import setup_logging
129
from job_executor.domain import rollback
1310
from job_executor.domain.manager import Manager
1411

1512
logger = logging.getLogger()
1613
setup_logging()
1714

1815

19-
def initialize_app() -> None:
16+
def initialize_app() -> Manager:
17+
"""
18+
Initializing the datastore by rolling back any unfinished jobs in any
19+
datastore, and checking if any datastores have unresolved temporary backups.
20+
21+
Returns a manager if all datastores appear healthy.
22+
"""
2023
try:
2124
rollback.fix_interrupted_jobs()
22-
rdns = datastore_api.get_datastores()
23-
for rdn in rdns:
24-
local_storage = LocalStorageAdapter(Path(environment.datastore_dir))
25+
for rdn in datastore_api.get_datastores():
26+
local_storage = LocalStorageAdapter(
27+
datastore_api.get_datastore_directory(rdn)
28+
)
2529
if local_storage.datastore_dir.temporary_backup_exists():
2630
raise StartupException(f"tmp directory exists for {rdn}")
27-
except Exception as e:
28-
raise StartupException("Exception when initializing") from e
29-
30-
31-
def handle_jobs(manager: Manager, logging_queue: Queue) -> None:
32-
job_query_result = datastore_api.query_for_jobs()
33-
manager.clean_up_after_dead_workers()
34-
if job_query_result.available_jobs_count:
35-
logger.info(
36-
f"Found {len(job_query_result.queued_worker_jobs)}"
37-
f"/{len(job_query_result.built_jobs)}"
38-
f"/{len(job_query_result.queued_manager_jobs)}"
39-
f" (worker, built, queued manager jobs)"
40-
)
41-
42-
for job in job_query_result.queued_worker_jobs:
43-
local_storage = LocalStorageAdapter(
44-
datastore_api.get_datastore_directory(job.datastore_rdn)
45-
)
46-
job_size = local_storage.input_dir.get_importable_tar_size_in_bytes(
47-
job.parameters.target
48-
)
49-
if job_size == 0:
50-
logger.error(f"{job.job_id} Failed to get the size of the dataset.")
51-
datastore_api.update_job_status(
52-
job.job_id,
53-
JobStatus.FAILED,
54-
log="No such dataset available for import",
55-
)
56-
continue # skip futher processing of this job
57-
if job_size > manager.max_bytes_all_workers:
58-
logger.warning(
59-
f"{job.job_id} Exceeded the maximum size for all workers."
60-
)
61-
datastore_api.update_job_status(
62-
job.job_id,
63-
JobStatus.FAILED,
64-
log="Dataset too large for import",
65-
)
66-
continue # skip futher processing of this job
67-
if manager.can_spawn_new_worker(job_size):
68-
manager.handle_worker_job(job, job_size, logging_queue)
69-
70-
for job in job_query_result.queued_manager_and_built_jobs():
71-
try:
72-
manager.handle_manager_job(job)
73-
except Exception as exc:
74-
# All exceptions that occur during the handling of a job
75-
# are resolved by rolling back. The exceptions that
76-
# reach here are exceptions raised by the rollback.
77-
logger.exception(
78-
f"{job.job_id} failed and could not roll back",
79-
exc_info=exc,
80-
)
81-
raise exc
82-
83-
84-
def main() -> None:
85-
logging_queue = None
86-
log_thread = None
87-
try:
88-
initialize_app()
89-
logging_queue, log_thread = initialize_logging_thread()
90-
manager = Manager(
31+
return Manager(
9132
max_workers=environment.number_of_workers,
9233
max_bytes_all_workers=(
9334
environment.max_gb_all_workers * 1024**3
9435
), # Covert from GB to bytes
9536
)
37+
except Exception as e:
38+
raise StartupException("Exception when initializing") from e
9639

40+
41+
def main() -> None:
42+
manager = initialize_app()
43+
try:
9744
while True:
9845
time.sleep(5)
99-
handle_jobs(manager, logging_queue)
46+
job_query_result = datastore_api.query_for_jobs()
47+
manager.handle_jobs(job_query_result)
10048
except Exception as e:
101-
logger.exception("Service stopped by exception", exc_info=e)
49+
raise e
10250
finally:
103-
# Tell the logging thread to finish up
104-
if logging_queue is not None:
105-
logging_queue.put(None)
106-
if log_thread is not None:
107-
log_thread.join()
51+
manager.close_logging_thread()
10852

10953

11054
if __name__ == "__main__":
11155
logger.info("Polling for jobs...")
112-
main()
56+
try:
57+
main()
58+
except Exception as e:
59+
logger.exception("Service stopped by exception", exc_info=e)

0 commit comments

Comments
 (0)