
Commit 069a509

226 bug scicat dataset is not reset in case of failure (#227)
2 parents 8afd8ff + cf22eb0 commit 069a509

23 files changed (+256, -210 lines)

backend/api/.dockerignore

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
 .venv
-__pycache__
+**/__pycache__
 .pytest_cache
 .vscode

backend/archiver/.vscode/settings.json

Lines changed: 12 additions & 1 deletion
@@ -2,6 +2,17 @@
     "python.testing.pytestArgs": [
         "./"
     ],
+    "python.testing.pytestPath": ".",
     "python.testing.unittestEnabled": false,
-    "python.testing.pytestEnabled": true
+    "python.testing.pytestEnabled": true,
+    "python.languageServer": "Pylance",
+    "cSpell.words": [
+        "datablock",
+        "datablocks",
+        "datasetlist",
+        "origdatablocks",
+        "scicat"
+    ],
+    "cSpell.enabled": false,
+    "cSpell.diagnosticLevel": "Hint",
 }

backend/archiver/config/variables.py

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 from prefect.variables import Variable
 import os
 
-from archiver.utils.log import getLogger
+from utils.log import getLogger
 
 from pydantic_settings import (
     BaseSettings,

backend/archiver/flows/__main__.py

Lines changed: 3 additions & 3 deletions
@@ -1,8 +1,8 @@
 from prefect import serve
 
-from .archive_datasets_flow import archive_datasets_flow
-from .retrieve_datasets_flow import retrieve_datasets_flow
-from .mock_flows import create_test_dataset_flow, end_to_end_test_flow
+from archive_datasets_flow import archive_datasets_flow
+from retrieve_datasets_flow import retrieve_datasets_flow
+from mock_flows import create_test_dataset_flow, end_to_end_test_flow
 
 
 if __name__ == "__main__":

backend/archiver/flows/archive_datasets_flow.py

Lines changed: 26 additions & 39 deletions
@@ -16,35 +16,37 @@
     update_progress_artifact,
 )
 
-from archiver.config.variables import Variables
-from archiver.utils.datablocks import ArchiveInfo
+from config.variables import Variables
+from utils.datablocks import ArchiveInfo
 
-from .utils import StoragePaths, report_archival_error
+from .flow_utils import StoragePaths, report_archival_error
 from .task_utils import (
     generate_task_name_dataset,
     generate_flow_name_job_id,
     generate_subflow_run_name_job_id_dataset_id,
     generate_sleep_for_task_name
 )
-from archiver.scicat.scicat_interface import SciCatClient
-from archiver.scicat.scicat_tasks import (
+from scicat.scicat_interface import SciCatClient
+from scicat.scicat_tasks import (
     update_scicat_archival_job_status,
     update_scicat_archival_dataset_lifecycle,
     get_origdatablocks,
     register_datablocks,
     get_scicat_access_token,
     get_job_datasetlist,
+    reset_dataset
 )
-from archiver.scicat.scicat_tasks import (
+from scicat.scicat_tasks import (
     report_job_failure_system_error,
-    report_dataset_user_error,
+    report_dataset_user_error
 )
-from archiver.utils.datablocks import wait_for_free_space
-from archiver.utils.model import OrigDataBlock, DataBlock
-import archiver.utils.datablocks as datablocks_operations
-from archiver.config.concurrency_limits import ConcurrencyLimits
-from archiver.utils.s3_storage_interface import Bucket, get_s3_client
-from archiver.utils.log import getLogger
+
+from utils.datablocks import wait_for_free_space
+from utils.model import OrigDataBlock, DataBlock
+import utils.datablocks as datablocks_operations
+from config.concurrency_limits import ConcurrencyLimits
+from utils.s3_storage_interface import Bucket, get_s3_client
+from utils.log import getLogger
 
 
 def on_get_origdatablocks_error(dataset_id: str, task: Task, task_run: TaskRun, state: State):
@@ -108,28 +110,6 @@ def update_progress(p):
     return file_paths
 
 
-# @task(task_run_name=generate_task_name_dataset)
-# def create_datablocks(dataset_id: str, origDataBlocks: List[OrigDataBlock], file_paths: List[Path]) -> List[DataBlock]:
-#     """Prefect task to create datablocks.
-
-#     Args:
-#         dataset_id (str): dataset id
-#         origDataBlocks (List[OrigDataBlock]): List of OrigDataBlocks (Pydantic Model)
-
-#     Returns:
-#         List[DataBlock]: List of DataBlocks (Pydantic Model)
-#     """
-
-#     s3_client = get_s3_client()
-
-#     progress_artifact_id = create_progress_artifact(
-#         progress=0.0,
-#         description="Create datablocks from datafiles",
-#     )
-
-#     return datablocks_operations.create_datablocks(s3_client, dataset_id, origDataBlocks, file_paths, update_progress)
-
-
 @task(task_run_name=generate_task_name_dataset)
 def create_tarfiles(dataset_id: str) -> List[ArchiveInfo]:
     datablocks_scratch_folder = StoragePaths.scratch_archival_datablocks_folder(dataset_id)
@@ -388,22 +368,29 @@ def on_dataset_flow_failure(flow: Flow, flow_run: FlowRun, state: State):
         task_run=None,
         token=scicat_token,
     )
+    try:
+        reset_dataset(
+            dataset_id=flow_run.parameters["dataset_id"],
+            token=scicat_token
+        )
+    except Exception as e:
+        getLogger().error(f"failed to reset datablocks {e}")
     datablocks_operations.cleanup_lts_folder(flow_run.parameters["dataset_id"])
     datablocks_operations.cleanup_scratch(flow_run.parameters["dataset_id"])
     try:
        s3_client = get_s3_client()
        datablocks_operations.cleanup_s3_staging(s3_client, flow_run.parameters["dataset_id"])
-    except:
-        pass
+    except Exception as e:
+        getLogger().error(f"failed to cleanup staging {e}")
 
 
 def cleanup_dataset(flow: Flow, flow_run: FlowRun, state: State):
     try:
         s3_client = get_s3_client()
         datablocks_operations.cleanup_s3_landingzone(s3_client, flow_run.parameters["dataset_id"])
         datablocks_operations.cleanup_s3_staging(s3_client, flow_run.parameters["dataset_id"])
-    except:
-        pass
+    except Exception as e:
+        getLogger().error(f"failed to cleanup staging or landingzone {e}")
     datablocks_operations.cleanup_scratch(flow_run.parameters["dataset_id"])
 
 
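For context, the new reset_dataset task imported at the top of this file removes the datablocks that a failed archival already registered in SciCat, so that a retry starts from a clean state. The sketch below only illustrates that idea, assuming the per-datablock DELETE endpoint that the test mock (scicat_unittest_mock.py, further down) registers; the real task lives in scicat/scicat_tasks.py and is not part of this diff, and reset_dataset_sketch, api_url and the 'id' JSON key are assumptions.

import urllib.parse
import requests
from pydantic import SecretStr


def reset_dataset_sketch(dataset_id: str, token: SecretStr, api_url: str) -> None:
    """Illustrative only: delete every datablock registered for a dataset in SciCat."""
    headers = {"Authorization": f"Bearer {token.get_secret_value()}"}
    safe_id = urllib.parse.quote(dataset_id, safe="")
    # List the datablocks currently attached to the dataset.
    resp = requests.get(f"{api_url}/datasets/{safe_id}/datablocks", headers=headers)
    resp.raise_for_status()
    for datablock in resp.json():
        # Delete each one ('id' as the key is an assumption); any exception propagates
        # to the failure hook above, which logs it instead of swallowing it.
        requests.delete(
            f"{api_url}/datasets/{safe_id}/datablocks/{datablock['id']}",
            headers=headers,
        ).raise_for_status()
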
Lines changed: 4 additions & 4 deletions
@@ -4,11 +4,11 @@
 from prefect import State
 from prefect.client.schemas.objects import TaskRun
 
-from archiver.config.variables import Variables
-from archiver.scicat.scicat_tasks import (
+from config.variables import Variables
+from scicat.scicat_tasks import (
     report_dataset_system_error,
     report_dataset_user_error,
-    report_dataset_retrieval_error,
+    report_dataset_retrieval_error
 )
 
 
@@ -45,7 +45,7 @@ def report_archival_error(dataset_id: str, state: State, task_run: TaskRun, toke
 
 
 def report_retrieval_error(dataset_id: str, state: State, task_run: TaskRun, token: SecretStr):
-    """Report a retrieval error of a job of a dataset. Differntiates betwen "DatasetError" (User error, e.g. missing files)
+    """Report a retrieval error of a job of a dataset. Differentiates between "DatasetError" (User error, e.g. missing files)
     and SystemError (transient error).
 
     Args:

backend/archiver/flows/mock_flows.py

Lines changed: 9 additions & 9 deletions
@@ -15,15 +15,15 @@
 import urllib
 import asyncio
 
-from archiver.config.variables import Variables
-from archiver.utils.datablocks import upload_objects_to_s3
-from archiver.utils.s3_storage_interface import Bucket, get_s3_client
-from archiver.utils.model import OrigDataBlock, DataFile, Dataset, DatasetLifecycle
-
-from archiver.utils.log import getLogger, log
-from archiver.flows.utils import StoragePaths
-from archiver.scicat.scicat_tasks import get_scicat_access_token
-from archiver.utils.model import DatasetListEntry, Job
+from config.variables import Variables
+from utils.datablocks import upload_objects_to_s3
+from utils.s3_storage_interface import Bucket, get_s3_client
+from utils.model import OrigDataBlock, DataFile, Dataset, DatasetLifecycle
+
+from utils.log import getLogger, log
+from .flow_utils import StoragePaths
+from scicat.scicat_tasks import get_scicat_access_token
+from utils.model import DatasetListEntry, Job
 from .task_utils import generate_task_name_dataset
 
 from prefect.flow_runs import wait_for_flow_run

backend/archiver/flows/retrieve_datasets_flow.py

Lines changed: 11 additions & 11 deletions
@@ -10,28 +10,29 @@
 from prefect.client.schemas.filters import FlowRunFilter
 from prefect.flow_runs import wait_for_flow_run
 from prefect.context import get_run_context
-from archiver.utils.s3_storage_interface import get_s3_client
-from archiver.utils.s3_storage_interface import Bucket
+from utils.s3_storage_interface import get_s3_client
+from utils.s3_storage_interface import Bucket
 
 
 from .task_utils import generate_flow_name_dataset, generate_flow_name_job_id, generate_task_name_dataset, generate_task_name_datablock
-from .utils import report_retrieval_error
-from archiver.scicat.scicat_interface import SciCatClient
-from archiver.utils.model import DataBlock, JobResultObject
-from archiver.scicat.scicat_tasks import (
+from .flow_utils import report_retrieval_error
+from scicat.scicat_interface import SciCatClient
+from utils.model import DataBlock, JobResultObject
+from scicat.scicat_tasks import (
     update_scicat_retrieval_job_status,
     update_scicat_retrieval_dataset_lifecycle,
     get_scicat_access_token,
     get_job_datasetlist,
     create_job_result_object_task,
 )
-from archiver.scicat.scicat_tasks import (
+from scicat.scicat_tasks import (
     report_job_failure_system_error,
     report_dataset_user_error,
     get_datablocks,
 )
-from archiver.config.concurrency_limits import ConcurrencyLimits
-import archiver.utils.datablocks as datablocks_operations
+from config.concurrency_limits import ConcurrencyLimits
+from config.variables import Variables
+import utils.datablocks as datablocks_operations
 
 
 def on_get_datablocks_error(dataset_id: str, task: Task, task_run: TaskRun, state: State):
@@ -42,8 +43,7 @@ def on_get_datablocks_error(dataset_id: str, task: Task, task_run: TaskRun, stat
 @task(
     task_run_name=generate_task_name_datablock,
     tags=[ConcurrencyLimits().LTS_READ_TAG],
-    retries=5,
-    retry_delay_seconds=[60, 120, 240, 480, 960]
+    retry_delay_seconds=[60, 120, 240, 480, 960]
 )
 def copy_datablock_from_LTS_to_scratch(dataset_id: str, datablock: DataBlock):
     datablocks_operations.copy_from_LTS_to_scratch_retrieval(dataset_id, datablock)

backend/archiver/flows/tests/helpers.py

Lines changed: 4 additions & 4 deletions
@@ -1,18 +1,18 @@
 from typing import List, Dict, Any
 
 from pydantic import SecretStr
-from archiver.utils.model import DataFile, OrigDataBlock, DataBlock
-from archiver.utils.model import (
+from utils.model import DataFile, OrigDataBlock, DataBlock
+from utils.model import (
     Job,
     Dataset,
     DatasetLifecycle,
     JobResultObject,
     JobResultEntry,
 )
-from archiver.scicat.scicat_interface import SciCatClient
+from scicat.scicat_interface import SciCatClient
 from pathlib import Path
 
-from archiver.utils.s3_storage_interface import S3Storage
+from utils.s3_storage_interface import S3Storage
 
 
 def mock_s3client() -> S3Storage:

backend/archiver/flows/tests/scicat_unittest_mock.py

Lines changed: 18 additions & 7 deletions
@@ -3,9 +3,9 @@
 from uuid import UUID
 import urllib.parse
 
-from archiver.scicat.scicat_interface import SciCatClient
-from archiver.utils.model import OrigDataBlock, DataBlock
-from archiver.utils.model import DatasetListEntry, Job
+from scicat.scicat_interface import SciCatClient
+from utils.model import OrigDataBlock, DataBlock
+from utils.model import DatasetListEntry, Job
 
 
 def mock_scicat_get_token() -> str:
@@ -64,23 +64,30 @@ def __init__(
         )
 
         json_list = []
-        for o in origDataBlocks:
-            json_list.append(o.model_dump_json())
+        for d in origDataBlocks:
+            json_list.append(d.model_dump_json())
 
         self.matchers["origdatablocks"] = self.get(
             f"{self.ENDPOINT}{self.API_PREFIX}/datasets/{safe_dataset_url}/origdatablocks",
             json=json_list,
         )
 
         json_list = []
-        for o in datablocks:
-            json_list.append(o.model_dump_json())
+        for d in datablocks:
+            json_list.append(d.model_dump_json())
 
         self.matchers["get_datablocks"] = self.get(
             f"{self.ENDPOINT}{self.API_PREFIX}/datasets/{safe_dataset_url}/datablocks",
             json=json_list,
         )
 
+        self.matchers["delete_datablocks"] = []
+
+        for d in datablocks:
+            self.matchers["delete_datablocks"].append(self.delete(
+                f"{self.ENDPOINT}{self.API_PREFIX}/datasets/{safe_dataset_url}/datablocks/{d.id}")
+            )
+
     @property
     def jobs_matcher(self):
         return self.matchers["jobs"]
@@ -97,6 +104,10 @@ def datablocks_post_matcher(self):
     def datablocks_get_matcher(self):
         return self.matchers["get_datablocks"]
 
+    @property
+    def datablocks_delete_matcher(self):
+        return self.matchers["delete_datablocks"]
+
     @property
     def origdatablocks_matcher(self):
         return self.matchers["origdatablocks"]
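
With the DELETE matchers and the new datablocks_delete_matcher property in place, a flow test can check that a failed archival actually issued the reset calls. A hedged example, assuming the mock is backed by requests_mock (whose matchers expose .called) and using hypothetical fixture names:

def test_failed_archival_resets_datablocks(scicat_mock, run_failing_archive_flow):
    # scicat_mock and run_failing_archive_flow are hypothetical fixtures for illustration.
    run_failing_archive_flow()
    for matcher in scicat_mock.datablocks_delete_matcher:
        assert matcher.called  # each registered datablock received a DELETE request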
