diff --git a/.github/workflows/metaflow.s3_tests.minio.yml b/.github/workflows/metaflow.s3_tests.minio.yml
index 2e180d70523..fdbe29aeb88 100644
--- a/.github/workflows/metaflow.s3_tests.minio.yml
+++ b/.github/workflows/metaflow.s3_tests.minio.yml
@@ -18,10 +18,23 @@ jobs:
     name: metaflow.s3.minio / Python ${{ matrix.ver }} on ${{ matrix.os }}
     runs-on: ${{ matrix.os }}
     strategy:
+      fail-fast: false
       matrix:
         os: [ubuntu-22.04]
-        ver: ['3.8', '3.9', '3.10', '3.11', '3.12']
-
+        ver: ['3.11']
+
+    timeout-minutes: 90
+
+    env:
+      AWS_ACCESS_KEY_ID: rootuser
+      AWS_SECRET_ACCESS_KEY: rootpass123
+      AWS_DEFAULT_REGION: us-east-1
+      METAFLOW_S3_TEST_ROOT: s3://metaflow-test/metaflow/
+      METAFLOW_DATASTORE_SYSROOT_S3: s3://metaflow-test/metaflow/
+      AWS_ENDPOINT_URL_S3: http://localhost:9000
+      MINIO_TEST: "1"
+      METAFLOW_S3_TRANSIENT_RETRY_COUNT: "7"
+
     steps:
       - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
         with:
@@ -31,32 +44,40 @@ jobs:
         uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
         with:
           python-version: ${{ matrix.ver }}
-      - name: Install Python ${{ matrix.ver }} dependencies
+      - name: Install dependencies
        run: |
          python3 -m pip install --upgrade pip
-          python3 -m pip install . kubernetes tox numpy pytest click boto3 requests pylint pytest-benchmark
-      - name: Start MinIO development environment
+          python3 -m pip install . pytest click boto3 requests numpy pytest-benchmark
+      - name: Start MinIO
        run: |
-          echo "Starting environment in the background..."
-          MINIKUBE_CPUS=2 metaflow-dev all-up &
-          # Give time to spin up. Adjust as needed:
-          sleep 150
+          docker run -d --name minio \
+            -p 9000:9000 \
+            -e MINIO_ROOT_USER=rootuser \
+            -e MINIO_ROOT_PASSWORD=rootpass123 \
+            minio/minio server /data
+          for i in $(seq 1 30); do
+            if curl -sf http://localhost:9000/minio/health/live; then
+              echo "MinIO is ready"
+              break
+            fi
+            echo "Waiting for MinIO... ($i/30)"
+            sleep 2
+          done
+      - name: Create test bucket
+        run: |
+          python3 -c "
+          import boto3
+          s3 = boto3.client('s3',
+              endpoint_url='http://localhost:9000',
+              aws_access_key_id='rootuser',
+              aws_secret_access_key='rootpass123')
+          s3.create_bucket(Bucket='metaflow-test')
+          print('Bucket metaflow-test created')
+          "
      - name: Execute tests
        run: |
-          cat <

diff --git a/metaflow/plugins/datatools/s3/s3op_boto.py b/metaflow/plugins/datatools/s3/s3op_boto.py
+RANGE_MATCH = re.compile(r"bytes (?P<start>[0-9]+)-(?P<end>[0-9]+)/(?P<total>[0-9]+)")
+
+
+class _S3OpTransientError(Exception):
+    pass
+
+
+class S3DirectClient(object):
+    """
+    Performs S3 list/info/get/put using boto3 directly via ThreadPoolExecutor.
+
+    Parameters
+    ----------
+    s3_client : object
+        Metaflow S3 client wrapper (has .client, .error, .reset_client()).
+    tmpdir : str
+        Temporary directory for downloaded files.
+    inject_failures : int
+        Failure injection rate for testing (0 = disabled).
+    """
+
+    def __init__(self, s3_client, tmpdir, inject_failures):
+        self._s3_client = s3_client
+        self._tmpdir = tmpdir
+        self._inject_failures = inject_failures
+
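+    # Downloaded and derived files are staged in tmpdir under names of the form
+    # "<sha1(url)>-<sanitized filename>-<range>[-<suffix>]".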
+    def _generate_local_path(self, url, range_str=None, suffix=None):
+        if range_str is None:
+            range_part = "whole"
+        elif range_str == "whole":
+            range_part = "whole"
+        else:
+            range_part = range_str[6:].replace("-", "_")
+        quoted = url_quote(url)
+        fname = quoted.split(b"/")[-1].replace(b".", b"_").replace(b"-", b"_")
+        sha_hash = sha1(quoted, usedforsecurity=False).hexdigest()
+        fname_decoded = fname.decode("utf-8")
+        if len(fname_decoded) > 150:
+            fname_decoded = fname_decoded[:150] + "..."
+        if suffix:
+            return "-".join((sha_hash, fname_decoded, range_part, suffix))
+        return "-".join((sha_hash, fname_decoded, range_part))
+
+    def _inject_failure_rate_for_mode(self, mode):
+        if self._inject_failures <= 0:
+            return 0
+        if mode == "list":
+            return 0
+        if self._inject_failures >= 100:
+            return 100
+        return min(self._inject_failures, 90)
+
+    def _maybe_inject_failure(self, inject_rate):
+        if inject_rate > 0 and random.randint(0, 99) < inject_rate:
+            raise _S3OpTransientError(
+                "Injected failure for testing (rate=%d%%)" % inject_rate
+            )
+
+    def _do_batch_op(self, items, worker_fn, mode):
+        if not items:
+            return []
+
+        pending = [(i, item) for i, item in enumerate(items)]
+        results = {}
+        base_inject_rate = self._inject_failure_rate_for_mode(mode)
+        retry_count = S3_TRANSIENT_RETRY_COUNT
+        if self._inject_failures > 0 and retry_count > 0:
+            retry_count = max(retry_count, 100)
+        last_ok_count = 0
+
+        for attempt in range(retry_count + 1):
+            if not pending:
+                break
+
+            # Size each retry batch from the previous attempt's success count;
+            # use the full pending set on the first attempt or when injecting failures.
+            if self._inject_failures > 0:
+                max_count = len(pending)
+            elif attempt == 0:
+                max_count = len(pending)
+            elif last_ok_count > 0:
+                max_count = min(int(last_ok_count * 1.2), len(pending))
+            else:
+                max_count = min(2 * S3_WORKER_COUNT, len(pending))
+            max_count = max(max_count, 1)
+
+            batch = pending[:max_count]
+            remaining = pending[max_count:]
+
+            # Alternate the injected failure rate between attempts so retries see
+            # both mostly-succeeding and mostly-failing batches.
+            if base_inject_rate >= 100:
+                inject_rate = 100
+            elif base_inject_rate > 0:
+                if attempt % 2 == 0:
+                    inject_rate = max(1, base_inject_rate // 3)
+                else:
+                    inject_rate = min(int(base_inject_rate * 1.5), 90)
+            else:
+                inject_rate = 0
+
+            succeeded = []
+            failed = []
+
+            client = self._s3_client.client
+            error_class = self._s3_client.error
+
+            with ThreadPoolExecutor(
+                max_workers=min(S3_WORKER_COUNT, len(batch))
+            ) as pool:
+                future_to_idx = {}
+                for idx, item in batch:
+                    f = pool.submit(worker_fn, client, error_class, item, inject_rate)
+                    future_to_idx[f] = (idx, item)
+
+                for future in as_completed(future_to_idx):
+                    idx, item = future_to_idx[future]
+                    try:
+                        result = future.result()
+                        succeeded.append(idx)
+                        results[idx] = result
+                    except _S3OpTransientError as e:
+                        if S3_LOG_TRANSIENT_RETRIES:
+                            sys.stderr.write(
+                                "[WARNING] S3 %s transient failure "
+                                "(attempt %d): %s\n" % (mode, attempt, e)
+                            )
+                        failed.append((idx, item))
+
+            last_ok_count = len(succeeded)
+            pending = failed + remaining
+
+            if pending and attempt < retry_count:
+                self._s3_client.reset_client()
+                if self._inject_failures > 0:
+                    time.sleep(0.01)
+                else:
+                    time.sleep(min(2**attempt, 30) + random.randint(0, 5))
+
+        if pending:
+            from .s3 import MetaflowS3Exception
+
+            raise MetaflowS3Exception(
+                "S3 %s operation failed after %d retries for %d items"
+                % (mode, retry_count, len(pending))
+            )
+
+        return [results[i] for i in sorted(results.keys())]
+
+    def read_many(self, op, prefixes_and_ranges, **options):
+        from .s3 import MetaflowS3Exception
+
+        if op == "list":
+            yield from self.list_objects(
+                prefixes_and_ranges,
+                recursive=options.get("recursive", False),
+            )
+        elif op == "info":
+            yield from self.info_objects(prefixes_and_ranges)
+        elif op == "get":
+            if options.get("recursive", False):
+                listed = list(self.list_objects(prefixes_and_ranges, recursive=True))
+                url_to_prefix = {url: prefix for prefix, url, _size in listed}
+                download_items = [(url, None) for _prefix, url, _size in listed]
+                for prefix_or_url, url, fname in self.get_objects(
+                    download_items,
+                    allow_missing=options.get("allow_missing", False),
+                    return_info=options.get("info", False),
+                ):
+                    yield (url_to_prefix.get(url, prefix_or_url), url, fname)
+                return
+            yield from self.get_objects(
+                prefixes_and_ranges,
+                allow_missing=options.get("allow_missing", False),
+                return_info=options.get("info", False),
+            )
+        else:
+            raise MetaflowS3Exception("Unknown operation: %s" % op)
+
+    def list_objects(self, prefixes_and_ranges, recursive=False):
+        from . import s3op
+        from .s3 import MetaflowS3AccessDenied
+
+        delimiter = "" if recursive else "/"
+
+        def worker(client, error_class, item, inject_rate):
+            prefix_str, _range = item
+            self._maybe_inject_failure(inject_rate)
+
+            parsed = urlparse(prefix_str)
+            bucket = parsed.netloc
+            path = parsed.path.lstrip("/")
+            if path and not path.endswith("/"):
+                path += "/"
+            url_base = "s3://%s/" % bucket
+
+            try:
+                paginator = client.get_paginator("list_objects_v2")
+                page_kwargs = {"Bucket": bucket, "Prefix": path}
+                if delimiter:
+                    page_kwargs["Delimiter"] = delimiter
+
+                found = []
+                for page in paginator.paginate(**page_kwargs):
+                    if "Contents" in page:
+                        for obj in page["Contents"]:
+                            key_path = obj["Key"].lstrip("/")
+                            url = url_base + key_path
+                            found.append((prefix_str, url, str(obj["Size"])))
+                    if "CommonPrefixes" in page:
+                        for cp in page["CommonPrefixes"]:
+                            url = url_base + cp["Prefix"]
+                            found.append((prefix_str, url, ""))
+                return found
+            except error_class as err:
+                error_code = s3op.normalize_client_error(err)
+                if error_code == 404:
+                    return []
+                elif error_code == 403:
+                    raise MetaflowS3AccessDenied(prefix_str)
+                elif error_code in (500, 502, 503, 504):
+                    raise _S3OpTransientError(str(err))
+                else:
+                    raise
+            except MetaflowException:
+                raise
+            except (_BotocoreError, OSError) as e:
+                raise _S3OpTransientError("transient error during list: %s" % e) from e
+
+        items = list(prefixes_and_ranges)
+        batch_results = self._do_batch_op(items, worker, "list")
+        for result_list in batch_results:
+            for item in result_list:
+                yield item
+
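+    # info results are staged on disk: each yielded tuple points at a JSON file
+    # in tmpdir holding the object's metadata (or its error code).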
+    def info_objects(self, prefixes_and_ranges):
+        from . import s3op
+        from .s3 import MetaflowS3AccessDenied, MetaflowS3InsufficientDiskSpace
+
+        def worker(client, error_class, item, inject_rate):
+            url_str, _range = item
+            self._maybe_inject_failure(inject_rate)
+
+            parsed = urlparse(url_str)
+            bucket = parsed.netloc
+            key = parsed.path.lstrip("/")
+            local_name = self._generate_local_path(url_str)
+            local_path = os.path.join(self._tmpdir, local_name)
+
+            try:
+                head = client.head_object(Bucket=bucket, Key=key)
+                result = {
+                    "error": None,
+                    "size": head["ContentLength"],
+                    "content_type": head.get("ContentType"),
+                    "encryption": head.get("ServerSideEncryption"),
+                    "metadata": head.get("Metadata", {}),
+                    "last_modified": get_timestamp(head["LastModified"]),
+                }
+            except error_class as err:
+                error_code = s3op.normalize_client_error(err)
+                if error_code == 404:
+                    result = {"error": s3op.ERROR_URL_NOT_FOUND}
+                elif error_code == 403:
+                    result = {"error": s3op.ERROR_URL_ACCESS_DENIED}
+                elif error_code in (500, 502, 503, 504):
+                    raise _S3OpTransientError(str(err))
+                else:
+                    result = {"error": error_code}
+            except MetaflowException:
+                raise
+            except (_BotocoreError, OSError) as e:
+                raise _S3OpTransientError("transient error during info: %s" % e) from e
+
+            try:
+                with open(local_path, "w") as f:
+                    json.dump(result, f)
+            except OSError as e:
+                if e.errno == errno.ENOSPC:
+                    raise MetaflowS3InsufficientDiskSpace(
+                        "Out of disk space writing metadata for %s" % url_str
+                    )
+                raise
+
+            return (url_str, url_str, local_name)
+
+        items = list(prefixes_and_ranges)
+        results = self._do_batch_op(items, worker, "info")
+        for result in results:
+            yield result
+
+    def get_objects(self, prefixes_and_ranges, allow_missing, return_info):
+        from . import s3op
+        from .s3 import (
+            MetaflowS3AccessDenied,
+            MetaflowS3InsufficientDiskSpace,
+            MetaflowS3InvalidRange,
+            MetaflowS3NotFound,
+        )
+        from boto3.s3.transfer import TransferConfig
+
+        download_file_threshold = 2 * TransferConfig().multipart_threshold
+        download_max_chunk = 2 * 1024 * 1024 * 1024 - 1
+
+        def worker(client, error_class, item, inject_rate):
+            url_str, range_str = item
+            self._maybe_inject_failure(inject_rate)
+
+            parsed = urlparse(url_str)
+            bucket = parsed.netloc
+            key = parsed.path.lstrip("/")
+            local_name = self._generate_local_path(url_str, range_str)
+            local_path = os.path.join(self._tmpdir, local_name)
+
+            try:
+                if range_str:
+                    resp = client.get_object(Bucket=bucket, Key=key, Range=range_str)
+                    range_result = resp["ContentRange"]
+                    range_match = RANGE_MATCH.match(range_result)
+                    if range_match is None:
+                        raise RuntimeError(
+                            "Wrong format for ContentRange: %s" % str(range_result)
+                        )
+                    range_result = {
+                        x: int(range_match.group(x)) for x in ["total", "start", "end"]
+                    }
+                else:
+                    resp = client.get_object(Bucket=bucket, Key=key)
+                    range_result = None
+
+                sz = resp["ContentLength"]
+                if range_result is None:
+                    range_result = {"total": sz, "start": 0, "end": sz - 1}
+
+                if not range_str and sz > download_file_threshold:
+                    resp["Body"].close()
+                    client.download_file(bucket, key, local_path)
+                else:
+                    with open(local_path, "wb") as f:
+                        read_in_chunks(f, resp["Body"], sz, download_max_chunk)
+
+                actual_sz = os.path.getsize(local_path)
+                if actual_sz != sz:
+                    raise _S3OpTransientError(
+                        "size mismatch for %s: expected %d, got %d"
+                        % (url_str, sz, actual_sz)
+                    )
+
+                if return_info:
+                    meta = {
+                        "size": (
+                            range_result["total"]
+                            if range_result
+                            else resp["ContentLength"]
+                        ),
+                        "range_result": range_result,
+                    }
+                    if resp.get("ContentType"):
+                        meta["content_type"] = resp["ContentType"]
if resp.get("Metadata") is not None: + meta["metadata"] = resp["Metadata"] + if resp.get("ServerSideEncryption") is not None: + meta["encryption"] = resp["ServerSideEncryption"] + if resp.get("LastModified"): + meta["last_modified"] = get_timestamp(resp["LastModified"]) + with open( + os.path.join(self._tmpdir, "%s_meta" % local_name), + "w", + ) as f: + json.dump(meta, f) + + return (url_str, url_str, local_name) + + except error_class as err: + error_code = s3op.normalize_client_error(err) + if error_code == 404: + if allow_missing: + return (url_str, "", "") + raise MetaflowS3NotFound(url_str) + elif error_code == 403: + raise MetaflowS3AccessDenied(url_str) + elif error_code == 416: + raise MetaflowS3InvalidRange(str(err)) + elif error_code in (500, 502, 503, 504): + raise _S3OpTransientError(str(err)) + else: + raise + except MetaflowException: + raise + except OSError as e: + if e.errno == errno.ENOSPC: + raise MetaflowS3InsufficientDiskSpace( + "Out of disk space downloading %s" % url_str + ) + raise _S3OpTransientError(str(e)) + except (_BotocoreError, OSError) as e: + raise _S3OpTransientError("transient error during get: %s" % e) from e + + items = list(prefixes_and_ranges) + results = self._do_batch_op(items, worker, "get") + for result in results: + yield result + + def put_objects(self, url_info, overwrite): + from . import s3op + from .s3 import MetaflowS3AccessDenied, MetaflowS3Exception + + def worker(client, error_class, item, inject_rate): + local_path, url_str, store_info = item + self._maybe_inject_failure(inject_rate) + + parsed = urlparse(url_str) + bucket = parsed.netloc + key = parsed.path.lstrip("/") + + if not overwrite: + try: + client.head_object(Bucket=bucket, Key=key) + return None + except error_class as err: + error_code = s3op.normalize_client_error(err) + if error_code == 404: + pass + elif error_code == 403: + raise MetaflowS3AccessDenied(url_str) + elif error_code in (500, 502, 503, 504): + raise _S3OpTransientError(str(err)) + else: + raise + + extra_args = None + ct = store_info.get("content_type") + md = store_info.get("metadata") + enc = store_info.get("encryption") + if ct or md or enc: + extra_args = {} + if ct: + extra_args["ContentType"] = ct + if md is not None: + extra_args["Metadata"] = md + if enc is not None: + extra_args["ServerSideEncryption"] = enc + + try: + client.upload_file( + os.path.realpath(local_path), + bucket, + key, + ExtraArgs=extra_args, + ) + return url_str + except error_class as err: + error_code = s3op.normalize_client_error(err) + if error_code == 403: + raise MetaflowS3AccessDenied(url_str) + elif error_code in (500, 502, 503, 504): + raise _S3OpTransientError(str(err)) + else: + raise + except MetaflowException: + raise + except _S3UploadFailedError as e: + raise _S3OpTransientError("transient error during put: %s" % e) from e + except (_BotocoreError, OSError) as e: + raise _S3OpTransientError("transient error during put: %s" % e) from e + + items = list(url_info) + results = self._do_batch_op(items, worker, "put") + + uploaded_urls = set() + for result in results: + if result is not None: + uploaded_urls.add(result) + + return [(info["key"], url) for _, url, info in items if url in uploaded_urls] diff --git a/test/data/s3/test_s3.py b/test/data/s3/test_s3.py index 8441cfd68a3..69df1f7bf63 100644 --- a/test/data/s3/test_s3.py +++ b/test/data/s3/test_s3.py @@ -22,8 +22,11 @@ S3GetObject, ) +from metaflow.metaflow_config import S3_DIRECT_BOTO3 from metaflow.util import to_bytes, unicode_type +INJECT_FAILURE_RATES = [0, 
+
 from . import s3_data
 from .. import FakeFlow, DO_TEST_RUN, S3ROOT
 
@@ -198,7 +201,7 @@ def _do():
     assert_results(res, expected_urls, info_only=True)
 
 
-@pytest.mark.parametrize("inject_failure_rate", [0, 10, 50, 90])
+@pytest.mark.parametrize("inject_failure_rate", INJECT_FAILURE_RATES)
 @pytest.mark.parametrize(
     argnames=["pathspecs", "expected"], **s3_data.pytest_benchmark_many_case()
 )
@@ -244,7 +247,7 @@ def _do():
     # assert_results(res, expected_urls, info_should_be_empty=True)
 
 
-@pytest.mark.parametrize("inject_failure_rate", [0, 10, 50, 90])
+@pytest.mark.parametrize("inject_failure_rate", INJECT_FAILURE_RATES)
 @pytest.mark.parametrize(
     argnames=["pathspecs", "expected"], **s3_data.pytest_benchmark_many_case()
 )
@@ -301,7 +304,7 @@ def _do():
     res = benchmark(_do)
 
 
-@pytest.mark.parametrize("inject_failure_rate", [0, 10, 50, 90])
+@pytest.mark.parametrize("inject_failure_rate", INJECT_FAILURE_RATES)
 @pytest.mark.parametrize(
     argnames=["blobs", "expected"], **s3_data.pytest_benchmark_put_many_case()
 )
@@ -336,7 +339,7 @@ def _do():
     res = benchmark(_do)
 
 
-@pytest.mark.parametrize("inject_failure_rate", [0, 10, 50, 90])
+@pytest.mark.parametrize("inject_failure_rate", INJECT_FAILURE_RATES)
 @pytest.mark.parametrize(
     argnames=["pathspecs", "expected"], **s3_data.pytest_fakerun_cases()
 )
@@ -450,7 +453,7 @@ def test_info_one(s3root, prefixes, expected):
     assert_results([s3obj], {url: expected_urls[url]}, info_only=True)
 
 
-@pytest.mark.parametrize("inject_failure_rate", [0, 10, 50, 90])
+@pytest.mark.parametrize("inject_failure_rate", INJECT_FAILURE_RATES)
 @pytest.mark.parametrize(
     argnames=["prefixes", "expected"], **s3_data.pytest_basic_case()
 )
@@ -489,7 +492,7 @@ def test_info_many(s3root, inject_failure_rate, prefixes, expected):
     assert_results(s3objs, expected_urls, info_only=True)
 
 
-@pytest.mark.parametrize("inject_failure_rate", [0, 10, 50, 90])
+@pytest.mark.parametrize("inject_failure_rate", INJECT_FAILURE_RATES)
 @pytest.mark.parametrize(
     argnames=["prefixes", "expected"], **s3_data.pytest_fakerun_cases()
 )
@@ -583,7 +586,7 @@ def test_get_one_wo_meta(s3root, prefixes, expected):
 )
 
 
-@pytest.mark.parametrize("inject_failure_rate", [0, 10, 50, 90])
+@pytest.mark.parametrize("inject_failure_rate", INJECT_FAILURE_RATES)
 @pytest.mark.parametrize(
     argnames=["prefixes", "expected"], **s3_data.pytest_large_case()
 )
@@ -602,7 +605,7 @@ def test_get_all(inject_failure_rate, s3root, prefixes, expected):
     assert_results(s3objs, expected_exists, info_should_be_empty=True)
 
 
-@pytest.mark.parametrize("inject_failure_rate", [0, 10, 50, 90])
+@pytest.mark.parametrize("inject_failure_rate", INJECT_FAILURE_RATES)
 @pytest.mark.parametrize(
     argnames=["prefixes", "expected"], **s3_data.pytest_basic_case()
 )
@@ -621,7 +624,7 @@ def test_get_all_with_meta(inject_failure_rate, s3root, prefixes, expected):
     assert_results(s3objs, expected_exists)
 
 
-@pytest.mark.parametrize("inject_failure_rate", [0, 10, 50, 90])
+@pytest.mark.parametrize("inject_failure_rate", INJECT_FAILURE_RATES)
 @pytest.mark.parametrize(
     argnames=["prefixes", "expected"], **s3_data.pytest_basic_case()
 )
@@ -795,7 +798,7 @@ def test_list_recursive(s3root, prefixes, expected):
     assert all(e.exists for e in s3objs)
 
 
-@pytest.mark.parametrize("inject_failure_rate", [0, 10, 50, 90])
+@pytest.mark.parametrize("inject_failure_rate", INJECT_FAILURE_RATES)
 @pytest.mark.parametrize(
     argnames=["prefixes", "expected"], **s3_data.pytest_many_prefixes_case()
 )
@@ -845,7 +848,7 @@ def test_get_recursive(s3root, inject_failure_rate, prefixes, expected):
     assert not os.path.exists(path)
 
 
-@pytest.mark.parametrize("inject_failure_rate", [0, 10, 50, 90])
+@pytest.mark.parametrize("inject_failure_rate", INJECT_FAILURE_RATES)
 def test_put_exceptions(inject_failure_rate):
     with S3(inject_failure_rate=inject_failure_rate) as s3:
         with pytest.raises(MetaflowS3InvalidObject):
@@ -860,10 +863,12 @@ def test_put_exceptions(inject_failure_rate):
 
 @pytest.fixture
 def s3_server_side_encryption():
+    if os.environ.get("MINIO_TEST", "0") != "0":
+        return None
     return "AES256"
 
 
-@pytest.mark.parametrize("inject_failure_rate", [0])
+@pytest.mark.parametrize("inject_failure_rate", INJECT_FAILURE_RATES)
 @pytest.mark.parametrize(
     argnames=["objs", "expected"], **s3_data.pytest_put_strings_case()
 )
@@ -932,7 +937,7 @@ def test_put_one(s3root, objs, expected, s3_server_side_encryption):
     assert s3obj.blob == to_bytes(obj)
 
 
-@pytest.mark.parametrize("inject_failure_rate", [0, 10, 50, 90])
+@pytest.mark.parametrize("inject_failure_rate", INJECT_FAILURE_RATES)
 @pytest.mark.parametrize(
     argnames=["blobs", "expected"], **s3_data.pytest_put_blobs_case()
 )
@@ -1001,7 +1006,7 @@ def _files(blobs):
     assert {s3obj.key for s3obj in s3objs} == {key for key, _ in shuffled_blobs}
 
 
-@pytest.mark.parametrize("inject_failure_rate", [0, 10, 50, 90])
+@pytest.mark.parametrize("inject_failure_rate", INJECT_FAILURE_RATES)
 def test_list_recursive_sibling_prefix_filtering(s3root, inject_failure_rate):
     test_prefix = f"test_log_filtering_{uuid4().hex[:8]}"
 
@@ -1093,9 +1098,11 @@ def test_put_many_exhausted_retries(s3root, monkeypatch):
     """Test that put_many raises exception when transient retries are exhausted."""
     import metaflow.plugins.datatools.s3.s3 as s3_module
 
-    # Set retry count to 0 to immediately exhaust retries
-    # monkeypatch automatically restores the original value after the test
     monkeypatch.setattr(s3_module, "S3_TRANSIENT_RETRY_COUNT", 0)
+    if S3_DIRECT_BOTO3:
+        import metaflow.plugins.datatools.s3.s3op_boto as s3op_boto_module
+
+        monkeypatch.setattr(s3op_boto_module, "S3_TRANSIENT_RETRY_COUNT", 0)
 
     test_prefix = f"test_retry_exhaustion_{uuid4().hex}"
     test_root = os.path.join(s3root, test_prefix)
@@ -1110,3 +1117,49 @@ def test_put_many_exhausted_retries(s3root, monkeypatch):
     with S3(s3root=test_root, inject_failure_rate=100) as s3:
         with pytest.raises(MetaflowS3Exception, match="failed"):
             s3.put_many(test_data)
+
+
+@pytest.mark.skipif(
+    not S3_DIRECT_BOTO3,
+    reason="Dedicated retry test for S3DirectClient _do_batch_op path only",
+)
+@pytest.mark.skipif(
+    os.environ.get("MINIO_TEST", "0") != "0",
+    reason="MinIO rejects '.' path components used by list_paths(['.'])",
+)
+def test_batch_retry_succeeds_under_transient_failures(s3root, monkeypatch):
+    """Test that batch operations succeed despite transient failures.
+
+    Verifies the _do_batch_op retry loop handles moderate failure injection
+    for put, get, list, and info operations.
+ """ + import metaflow.plugins.datatools.s3.s3 as s3_module + import metaflow.plugins.datatools.s3.s3op_boto as s3op_boto_module + + monkeypatch.setattr(s3_module, "S3_TRANSIENT_RETRY_COUNT", 7) + monkeypatch.setattr(s3op_boto_module, "S3_TRANSIENT_RETRY_COUNT", 7) + + test_prefix = f"test_retry_success_{uuid4().hex}" + test_root = os.path.join(s3root, test_prefix) + + test_data = [ + ("file1.txt", "content one"), + ("file2.txt", "content two"), + ("file3.txt", "content three"), + ] + + with S3(s3root=test_root, inject_failure_rate=50) as s3: + s3urls = s3.put_many(test_data) + assert len(s3urls) == 3 + + s3objs = s3.get_many(list(dict(test_data))) + assert len(s3objs) == 3 + for obj in s3objs: + assert obj.key in dict(test_data) + assert obj.blob == dict(test_data)[obj.key].encode("utf-8") + + infos = s3.info_many(list(dict(test_data))) + assert len(infos) == 3 + + listed = s3.list_paths(["."]) + assert len(listed) == 3 diff --git a/test/data/s3/test_s3op.py b/test/data/s3/test_s3op.py index f60e3b3ea40..859a0ed8937 100644 --- a/test/data/s3/test_s3op.py +++ b/test/data/s3/test_s3op.py @@ -97,6 +97,10 @@ def test_bucket_root_empty_path(): ) +@pytest.mark.skipif( + os.environ.get("MINIO_TEST", "0") != "0", + reason="MinIO rejects non-ASCII characters in object names", +) def test_long_filename_download_from_s3(): """ End-to-end integration test with real S3 for long filename handling.