
Commit 21dbe4f

Add download script as artifacts (#250)

2 parents 40fd2a3 + 6edd511 · commit 21dbe4f

File tree: 12 files changed, +133 −29 lines

backend/archiver/flows/archive_datasets_flow.py

Lines changed: 4 additions & 4 deletions
```diff
@@ -181,7 +181,7 @@ def update_progress(progress):
             prefix=prefix,
             bucket=Bucket.staging_bucket(),
             source_folder=datablocks_scratch_folder,
-            ext=".gz",
+            ext=".tar",
             progress_callback=update_progress,
         )
@@ -225,7 +225,7 @@ def calculate_checksum(dataset_id: str, datablock: DataBlock):
 
 @task(task_run_name=generate_task_name_dataset, tags=[ConcurrencyLimits().LTS_WRITE_TAG])
 def move_data_to_LTS(dataset_id: str, datablock: DataBlock):
-    """Prefect task to move a datablock (.tar.gz file) to the LTS. Concurrency of this task is limited to 2 instances
+    """Prefect task to move a datablock (.tar file) to the LTS. Concurrency of this task is limited to 2 instances
     at the same time.
     """
     datablocks_operations.move_data_to_LTS(dataset_id, datablock)
@@ -238,7 +238,7 @@ def move_data_to_LTS(dataset_id: str, datablock: DataBlock):
     retry_delay_seconds=[60, 120, 240, 480, 960],
 )
 def copy_datablock_from_LTS(dataset_id: str, datablock: DataBlock):
-    """Prefect task to move a datablock (.tar.gz file) to the LTS. Concurrency of this task is limited to 2 instances
+    """Prefect task to move a datablock (.tar file) to the LTS. Concurrency of this task is limited to 2 instances
     at the same time.
     """
     datablocks_operations.copy_file_from_LTS(dataset_id, datablock)
@@ -312,7 +312,7 @@ def move_datablocks_to_lts_flow(dataset_id: str, datablocks: List[DataBlock]):
 
 @flow(name="create_datablocks", flow_run_name=generate_subflow_run_name_job_id_dataset_id)
 def create_datablocks_flow(dataset_id: str) -> List[DataBlock]:
-    """Prefect (sub-)flow to create datablocks (.tar.gz files) for files of a dataset and register them in Scicat.
+    """Prefect (sub-)flow to create datablocks (.tar files) for files of a dataset and register them in Scicat.
 
     Args:
         dataset_id (str): Dataset id
```

backend/archiver/flows/mock_flows.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -340,7 +340,7 @@ def verify_data_from_minio(dataset_pid, datablock_name, datablock_url):
     # Verify retrieved datablock in MINIO
     retrieved_datablock = s3_client.stat_object(
         bucket=Bucket("retrieval"),
-        filename=f"openem-network/datasets/{dataset_pid}/datablocks/{dataset_pid}_0.tar.gz",
+        filename=f"openem-network/datasets/{dataset_pid}/datablocks/{dataset_pid}_0.tar",
     )
     assert retrieved_datablock is not None
     assert retrieved_datablock.Size > 80 * 1024 * 1024
```

backend/archiver/flows/retrieve_datasets_flow.py

Lines changed: 3 additions & 5 deletions
```diff
@@ -224,16 +224,14 @@ async def retrieve_datasets_flow(job_id: UUID):
     else:
         await wait_for_retrieval_flow(existing_run_id)
 
-    job_results_future = create_job_result_object_task.submit(dataset_ids=dataset_ids)
-    job_results = job_results_future.result()
-    job_results_object = JobResultObject(result=job_results)
+    job_results_object = create_job_result_object_task.submit(dataset_ids=dataset_ids)
 
-    access_token = get_scicat_access_token.submit(wait_for=[job_results_future])
+    access_token = get_scicat_access_token.submit(wait_for=[job_results_object])
 
     update_scicat_retrieval_job_status.submit(
         job_id=job_id,
         status_code=SciCatClient.JOBSTATUSCODE.FINISHED_SUCCESSFULLY,
         status_message=SciCatClient.JOBSTATUSMESSAGE.JOB_FINISHED,
-        jobResultObject=job_results_object,
+        jobResultObject=job_results_object.result(),
         token=access_token,
     ).wait()
```
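The reworked flow leans on Prefect task futures: the future returned by `create_job_result_object_task.submit(...)` is passed to `wait_for` to order `get_scicat_access_token`, and only unwrapped with `.result()` when handed to the status update. A minimal, self-contained sketch of that submit/wait_for/result pattern (the task names below are illustrative stand-ins, not the repository's tasks):

```python
from prefect import flow, task


@task
def build_job_result() -> dict:
    # Stand-in for create_job_result_object_task
    return {"status": "ok"}


@task
def get_token() -> str:
    # Stand-in for get_scicat_access_token
    return "token"


@task
def publish(result: dict, token: str) -> None:
    # Stand-in for update_scicat_retrieval_job_status
    print(result, token)


@flow
def demo_flow() -> None:
    # submit() returns a PrefectFuture immediately; the task runs in the background
    result_future = build_job_result.submit()

    # wait_for enforces ordering without unwrapping the future's value
    token_future = get_token.submit(wait_for=[result_future])

    # .result() blocks until the task has finished and yields its return value,
    # mirroring job_results_object.result() in the flow above
    publish.submit(result=result_future.result(), token=token_future).wait()


if __name__ == "__main__":
    demo_flow()
```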

backend/archiver/flows/tests/conftest.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -14,6 +14,7 @@ def aws_and_s3_credentials():
     os.environ["AWS_DEFAULT_REGION"] = "eu-west-1"
     os.environ["MINIO_REGION"] = "eu-west-1"
     os.environ["MINIO_ENDPOINT"] = "endpoint:9000"
+    os.environ["MINIO_EXTERNAL_ENDPOINT"] = "endpoint:9000"
 
 
 @pytest.fixture(scope="function")
```
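The fixture now also exports `MINIO_EXTERNAL_ENDPOINT`, which the storage layer reads through its `Variables` helper. That helper is not part of this commit; a minimal sketch, assuming it is a thin environment-backed settings object (everything except the variable names is illustrative):

```python
import os


class Variables:
    """Illustrative stand-in: exposes configuration read from the environment."""

    @property
    def MINIO_ENDPOINT(self) -> str:
        # Internal endpoint used for bucket operations inside the cluster
        return os.environ.get("MINIO_ENDPOINT", "")

    @property
    def MINIO_EXTERNAL_ENDPOINT(self) -> str:
        # Externally reachable endpoint used when presigning download URLs
        return os.environ.get("MINIO_EXTERNAL_ENDPOINT", "")


if __name__ == "__main__":
    os.environ["MINIO_EXTERNAL_ENDPOINT"] = "endpoint:9000"
    print(Variables().MINIO_EXTERNAL_ENDPOINT)
```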

backend/archiver/flows/tests/helpers.py

Lines changed: 13 additions & 3 deletions
```diff
@@ -1,3 +1,4 @@
+import base64
 from typing import List, Dict, Any
 
 from pydantic import SecretStr
@@ -13,6 +14,7 @@
 from pathlib import Path
 
 from utils.s3_storage_interface import S3Storage
+from utils.script_generation import generate_download_script
 
 
 def mock_s3client() -> S3Storage:
@@ -113,7 +115,7 @@ def expected_datablocks(dataset_id: str, idx: int):
 
     return DataBlock(
         id=f"Block_{idx}",
-        archiveId=f"/path/to/archived/Block_{idx}.tar.gz",
+        archiveId=f"/path/to/archived/Block_{idx}.tar",
         size=size_per_file * 10,
         packedSize=size_per_file * 10,
         version=str(1),
@@ -133,7 +135,15 @@ def expected_jobresultsobject(dataset_id: str, datablocks: List[DataBlock]):
             )
         )
 
-    return JobResultObject(result=results).model_dump(exclude_none=True)
+    dataset_to_datablocks = {}
+
+    for result in results:
+        dataset_to_datablocks.setdefault(dataset_id, []).append({"name" : Path(result.archiveId).name, "url" : result.url})
+
+    script = generate_download_script(dataset_to_datablocks)
+    script_b64 = base64.b64encode(bytes(script, 'utf-8'))
+
+    return JobResultObject(result=results, downloadScript=script_b64).model_dump(exclude_none=True)
 
 
 def mock_create_datablock_entries(
@@ -143,7 +153,7 @@ def mock_create_datablock_entries(
     for o in origDataBlocks:
         d = DataBlock(
             id=o.id,
-            archiveId=f"/path/to/archived/{o.id}.tar.gz",
+            archiveId=f"/path/to/archived/{o.id}.tar",
             size=o.size,
             packedSize=o.size,
             version=str(1),
```
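Both this test helper and the task in scicat_tasks.py store the download script base64-encoded in `downloadScript`. A tiny sketch of the encode/decode round trip involved (the payload string is made up):

```python
import base64

# Hypothetical script payload; the real one comes from generate_download_script()
script = "echo downloading datablocks..."

encoded = base64.b64encode(bytes(script, "utf-8"))   # what the helper stores in downloadScript
decoded = base64.b64decode(encoded).decode("utf-8")  # what a consumer would do with it

assert decoded == script
```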

backend/archiver/scicat/scicat_tasks.py

Lines changed: 35 additions & 9 deletions
````diff
@@ -1,3 +1,4 @@
+import base64
 from typing import List
 from prefect import task
 from uuid import UUID
@@ -18,7 +19,8 @@
 from utils.s3_storage_interface import Bucket, S3Storage, get_s3_client
 
 
-from prefect.artifacts import create_link_artifact
+from prefect.artifacts import create_link_artifact, create_markdown_artifact
+from utils.script_generation import generate_download_script
 
 scicat_instance: SciCatClient | None = None
 
@@ -197,7 +199,7 @@ def reset_dataset(dataset_id: str, token: SecretStr):
 
 
 @task
-def create_job_result_object_task(dataset_ids: List[str]) -> List[JobResultEntry]:
+def create_job_result_object_task(dataset_ids: List[str]) -> JobResultObject:
     access_token = get_scicat_access_token.submit()
     access_token.wait()
 
@@ -209,28 +211,52 @@ def create_job_result_object_task(dataset_ids: List[str]) -> List[JobResultEntry]:
         datablocks_future.wait()
         datablocks = datablocks_future.result()
 
-        dataset_job_results = create_job_result_object(dataset_id, datablocks)
+        dataset_job_results = create_job_result_entries(dataset_id, datablocks)
         job_results = job_results + dataset_job_results
 
-    return job_results
+    job_results_object = JobResultObject(result=job_results)
 
+    script = create_download_script(job_results);
+    job_results_object.downloadScript = base64.b64encode(bytes(script, 'utf-8'))
+
+    markdown = f"""Download script for all datablocks in this job\n```bash\n{script}\n```\n"""
+
+    create_markdown_artifact(
+        key=f"script", markdown=markdown)
+
+    return job_results_object
+
+def create_download_script(job_result_entries: List[JobResultEntry]) -> str:
+
+    dataset_to_datablocks = {}
+
+    for result in job_result_entries:
+        dataset_to_datablocks.setdefault(result.datasetId, []).append({"name" : Path(result.archiveId).name, "url" : result.url})
+
+    return generate_download_script(dataset_to_datablocks)
+
+
 
 def create_presigned_url(client: S3Storage, datablock: DataBlock):
     url = client.get_presigned_url(Bucket.retrieval_bucket(), datablock.archiveId)
     return url
 
+def sanitize_name(name: str) -> str:
+    invalid_chars = ["/", ".", "_"]
+    sanitized_name = ""
+    for c in invalid_chars:
+        sanitized_name = name.replace(c, "-")
+    return sanitized_name
 
 @log
-def create_job_result_object(dataset_id: str, datablocks: List[DataBlock]) -> List[JobResultEntry]:
+def create_job_result_entries(dataset_id: str, datablocks: List[DataBlock]) -> List[JobResultEntry]:
     s3_client = get_s3_client()
     job_result_entries: List[JobResultEntry] = []
     for datablock in datablocks:
         url = create_presigned_url(s3_client, datablock)
 
-        invalid_chars = ["/", ".", "_"]
-        sanitized_name = str(Path(datablock.archiveId).name)
-        for c in invalid_chars:
-            sanitized_name = sanitized_name.replace(c, "-")
+        sanitized_name = sanitize_name(str(Path(datablock.archiveId).stem))
+
         create_link_artifact(
             key=sanitized_name,
             link=url,
````
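`generate_download_script` from `utils.script_generation` is imported here but not included in this commit. A minimal sketch of what such a generator could look like, assuming it receives the `{dataset_id: [{"name": ..., "url": ...}]}` mapping built in `create_download_script` and emits a curl-based bash script (the exact output format is an assumption):

```python
from typing import Dict, List


def generate_download_script(dataset_to_datablocks: Dict[str, List[dict]]) -> str:
    """Illustrative sketch: emit a bash script that fetches each datablock
    of each dataset via its presigned URL."""
    lines = ["#!/usr/bin/env bash", "set -euo pipefail", ""]
    for dataset_id, datablocks in dataset_to_datablocks.items():
        lines.append(f"mkdir -p '{dataset_id}'")
        for block in datablocks:
            # Each entry carries the datablock file name and its presigned URL
            lines.append(f"curl -L -o '{dataset_id}/{block['name']}' '{block['url']}'")
        lines.append("")
    return "\n".join(lines)


if __name__ == "__main__":
    print(generate_download_script({
        "dataset-1": [
            {"name": "dataset-1_0.tar", "url": "https://minio.example/presigned/placeholder"},
        ]
    }))
```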

backend/archiver/tests/test_e2e.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -341,7 +341,7 @@ async def test_end_to_end(scicat_token_setup, set_env, s3_client):
     # Verify retrieved datablock in MINIO
     retrieved_datablock = s3_client.stat_object(
         bucket=Bucket("retrieval"),
-        filename=f"openem-network/datasets/{dataset_pid}/datablocks/{dataset_pid}_0.tar.gz",
+        filename=f"openem-network/datasets/{dataset_pid}/datablocks/{dataset_pid}_0.tar",
     )
     assert retrieved_datablock is not None
     assert retrieved_datablock.Size > 80 * 1024 * 1024
```

backend/archiver/utils/datablocks.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -93,7 +93,7 @@ def create_tarfiles(
     target_size: int,
     progress_callback: Callable[[float], None] = None,
 ) -> List[ArchiveInfo]:
-    """Create datablocks, i.e. .tar.gz files, from all files in a folder. Folder structures are kept and symlnks not resolved.
+    """Create datablocks, i.e. .tar files, from all files in a folder. Folder structures are kept and symlnks not resolved.
     The created tar files will be named according to the dataset they belong to.
 
     Args:
@@ -120,7 +120,7 @@ def create_tar(idx: int, files: List) -> ArchiveInfo:
         current_tar_info = ArchiveInfo(
             unpackedSize=0,
             packedSize=0,
-            path=Path(dst_folder / Path(f"{tar_name}_{idx}.tar.gz")),
+            path=Path(dst_folder / Path(f"{tar_name}_{idx}.tar")),
             fileCount=len(files),
         )
         current_tarfile: tarfile.TarFile = tarfile.open(current_tar_info.path, "w")
```
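The datablocks are written as plain `.tar` archives (the file is opened with mode `"w"`, not a gzip-compressed mode), so the `.tar.gz` naming was misleading. A small stdlib-only sketch contrasting the two `tarfile` modes (paths and payload are illustrative):

```python
import tarfile
from pathlib import Path

src = Path("data.bin")
src.write_bytes(b"\x00" * 1024)  # illustrative payload

# Uncompressed tar, matching the "w" mode used in create_tar
with tarfile.open("datablock_0.tar", "w") as tar:
    tar.add(src, arcname=src.name)

# Gzip-compressed variant that the previous ".tar.gz" naming implied
with tarfile.open("datablock_0.tar.gz", "w:gz") as tar:
    tar.add(src, arcname=src.name)
```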

backend/archiver/utils/model.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -20,6 +20,7 @@ class JobResultEntry(BaseModel):
 
 class JobResultObject(BaseModel):
     result: Optional[List[JobResultEntry]]
+    downloadScript: Optional[str] = None
 
 
 class Job(BaseModel):
```
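The new `downloadScript` field is optional with a `None` default, so existing job results without a script keep validating. A short, hypothetical usage sketch of the extended models (only the entry fields referenced elsewhere in this commit are sketched; all values are placeholders):

```python
import base64
from typing import List, Optional

from pydantic import BaseModel


class JobResultEntry(BaseModel):
    # Partial sketch: only the fields this commit actually touches
    datasetId: str
    archiveId: str
    url: str


class JobResultObject(BaseModel):
    result: Optional[List[JobResultEntry]]
    downloadScript: Optional[str] = None


entry = JobResultEntry(
    datasetId="dataset-1",
    archiveId="/path/to/archived/dataset-1_0.tar",
    url="https://minio.example/presigned/placeholder",
)
obj = JobResultObject(
    result=[entry],
    downloadScript=base64.b64encode(b"echo fetch datablocks").decode("ascii"),
)
# exclude_none keeps payloads that carry no script unchanged
print(obj.model_dump(exclude_none=True))
```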

backend/archiver/utils/s3_storage_interface.py

Lines changed: 12 additions & 1 deletion
```diff
@@ -76,6 +76,15 @@ def __init__(self, url: str, user: str, password: SecretStr, region: str):
             region_name=self._REGION,
             config=Config(signature_version="s3v4", max_pool_connections=32),
         )
+
+        self._external_minio = boto3.client(
+            "s3",
+            endpoint_url=f"https://{Variables().MINIO_EXTERNAL_ENDPOINT}",
+            aws_access_key_id=self._USER.strip(),
+            aws_secret_access_key=self._PASSWORD.get_secret_value().strip(),
+            region_name=self._REGION,
+            config=Config(signature_version="s3v4", max_pool_connections=32),
+        )
         self._resource = boto3.resource(
             "s3",
             endpoint_url=f"https://{self._URL}" if self._URL is not None and self._URL != "" else None,
@@ -97,7 +106,9 @@ def url(self):
     @log_debug
     def get_presigned_url(self, bucket: Bucket, filename: str) -> str:
         days_to_seconds = 60 * 60 * 24
-        presigned_url = self._minio.generate_presigned_url(
+
+
+        presigned_url = self._external_minio.generate_presigned_url(
             "get_object",
             Params={"Bucket": bucket.name, "Key": filename},
             ExpiresIn=Variables().MINIO_URL_EXPIRATION_DAYS
```
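Presigned URLs embed the endpoint they were signed against, so signing with the internal `MINIO_ENDPOINT` yields links that clients outside the cluster cannot use. The new `_external_minio` client signs against the externally reachable endpoint instead. A stripped-down boto3 sketch of the same idea (endpoint, credentials, bucket and key are placeholders):

```python
import boto3
from botocore.client import Config

# Client bound to the externally reachable endpoint; the URL signature
# covers this host, so the link resolves for users outside the cluster.
external_s3 = boto3.client(
    "s3",
    endpoint_url="https://minio.example.org",  # placeholder external endpoint
    aws_access_key_id="ACCESS_KEY",            # placeholder credentials
    aws_secret_access_key="SECRET_KEY",
    region_name="eu-west-1",
    config=Config(signature_version="s3v4"),
)

url = external_s3.generate_presigned_url(
    "get_object",
    Params={"Bucket": "retrieval", "Key": "datasets/1234/datablocks/1234_0.tar"},
    ExpiresIn=7 * 24 * 60 * 60,  # expiry in seconds
)
print(url)
```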
