Skip to content

Commit 109a056

Browse files
clean
1 parent 5bcc794 commit 109a056

File tree

3 files changed

+43
-14
lines changed

3 files changed

+43
-14
lines changed

abdiff/cli.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@
1111
from abdiff.config import Config, configure_logger
1212
from abdiff.core import (
1313
build_ab_images,
14-
download_input_files,
1514
calc_ab_diffs,
1615
calc_ab_metrics,
1716
collate_ab_transforms,
17+
download_input_files,
1818
init_run,
1919
run_ab_transforms,
2020
)
@@ -153,7 +153,7 @@ def init_job(
153153
"--download-files", is_flag=True, help="Pass to download extract files to the local MinIO server"
154154
)
155155
def run_diff(
156-
job_directory: str, input_files: str, message: str, download_files: bool
156+
job_directory: str, input_files: str, message: str, *, download_files: bool
157157
) -> None:
158158

159159
job_data = read_job_json(job_directory)
@@ -174,7 +174,7 @@ def run_diff(
174174
image_tag_a=job_data["image_tag_a"],
175175
image_tag_b=job_data["image_tag_b"],
176176
input_files=input_files_list,
177-
download_files=download_files,
177+
use_local_s3=download_files,
178178
)
179179
collated_dataset_path = collate_ab_transforms(
180180
run_directory=run_directory,

abdiff/core/download_input_files.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,32 @@
77

88
from abdiff.config import Config
99

10-
1110
logger = logging.getLogger(__name__)
1211

1312
CONFIG = Config()
1413

1514

16-
def download_input_files(input_files: str):
15+
def download_input_files(input_files: list[str]) -> None:
16+
"""Download extract files from S3 to a local MinIO server.
17+
18+
For each file download, two AWS CLI commands are run by subprocess.
19+
The output from the first command is piped to the second command.
20+
These commands are further explained below:
21+
22+
1. Copy the contents from the input file and direct to stdout.
23+
```
24+
aws s3 cp <input_file> -
25+
```
26+
27+
2. Given the stdout from the previous command as input, copy the contents
28+
to a similarly named file on the local MinIO server.
29+
```
30+
aws s3 cp --endpoint-url <minio_s3_url> --profile minio - <input_file>
31+
```
32+
33+
Note: An S3 client connected to the local MinIO server will check whether the file exists
34+
prior to any download.
35+
"""
1736
s3_client = boto3.client(
1837
"s3",
1938
endpoint_url=CONFIG.minio_s3_url,
@@ -57,9 +76,10 @@ def check_object_exists(bucket: str, input_file: str, s3_client: S3Client) -> bo
5776
key = input_file.replace(f"s3://{bucket}/", "")
5877
try:
5978
s3_client.head_object(Bucket=bucket, Key=key)
60-
return True
6179
except ClientError as exception:
6280
if exception.response["Error"]["Code"] == "404":
6381
return False
6482
logger.exception(f"Cannot determine if object exists for key {key}.")
6583
return False
84+
else:
85+
return True

abdiff/core/run_ab_transforms.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ def run_ab_transforms(
3434
image_tag_a: str,
3535
image_tag_b: str,
3636
input_files: list[str],
37-
download_files: bool,
3837
docker_client: docker.client.DockerClient | None = None,
38+
*,
39+
use_local_s3: bool = False,
3940
) -> tuple[list[str], ...]:
4041
"""Run Docker containers with versioned images of Transmogrifier.
4142
@@ -60,6 +61,10 @@ def run_ab_transforms(
6061
URIs for input files on S3 are accepted.
6162
docker_client (docker.client.DockerClient | None, optional): Docker client.
6263
Defaults to None.
64+
use_local_s3 (bool): Boolean indicating whether the container should
65+
access input files from a local MinIO server (i.e., "local S3 bucket")
66+
or from AWS S3. This flag determines the appropriate environment variables
67+
to set for the Docker containers. Default is False.
6368
6469
Returns:
6570
tuple[list[str], ...]: A tuple containing two lists, where each list contains
@@ -97,7 +102,7 @@ def run_ab_transforms(
97102

98103
# run containers and collect results
99104
futures = run_all_docker_containers(
100-
docker_client, input_files, run_configs, download_files
105+
docker_client, input_files, run_configs, use_local_s3=use_local_s3
101106
)
102107
containers, exceptions = collect_container_results(futures)
103108
logger.info(
@@ -132,7 +137,8 @@ def run_all_docker_containers(
132137
docker_client: docker.client.DockerClient,
133138
input_files: list[str],
134139
run_configs: list[tuple],
135-
download_files: bool,
140+
*,
141+
use_local_s3: bool = False,
136142
) -> list[Future]:
137143
"""Invoke Docker containers to run in parallel via threads.
138144
@@ -155,9 +161,12 @@ def run_all_docker_containers(
155161
input_file,
156162
get_transformed_filename(filename_details),
157163
docker_client,
158-
download_files,
159164
)
160-
tasks.append(executor.submit(run_docker_container, *args))
165+
tasks.append(
166+
executor.submit(
167+
run_docker_container, *args, use_local_s3=use_local_s3
168+
)
169+
)
161170

162171
logger.info(f"All {len(tasks)} containers have exited.")
163172
return tasks
@@ -170,16 +179,16 @@ def run_docker_container(
170179
input_file: str,
171180
output_file: str,
172181
docker_client: docker.client.DockerClient,
173-
download_files: bool,
174182
timeout: int = CONFIG.transmogrifier_timeout,
183+
*,
184+
use_local_s3: bool = False,
175185
) -> tuple[Container, Exception | None]:
176186
"""Run Transmogrifier via Docker container to transform input file.
177187
178188
The container is run in a detached state to capture a container handle for later use
179189
but this function waits for the container to exit before returning.
180190
"""
181-
182-
if download_files:
191+
if use_local_s3:
183192
environment_variables = {
184193
"AWS_ENDPOINT_URL": CONFIG.minio_s3_container_url,
185194
"AWS_ACCESS_KEY_ID": CONFIG.minio_root_user,

0 commit comments

Comments
 (0)