Changes from 4 commits

Commits (24)
b99063b
Replace S3 subprocess/multiprocessing with direct boto3 + ThreadPoolE…
Apr 24, 2026
af86e64
Add S3_DIRECT_BOTO3 feature flag for gradual rollout
Apr 24, 2026
a4d6621
style: fix black formatting in s3.py
Apr 24, 2026
427448a
refactor: extract direct boto3 ops into s3op_boto.py
Apr 24, 2026
820db67
refactor: minimal s3.py diff via early-return pattern
Apr 25, 2026
f381703
refactor: move dispatch logic into S3DirectClient.read_many
Apr 25, 2026
0be6d42
fix: address code review findings in s3op_boto
Apr 25, 2026
f44f3c2
fix(ci): replace minikube devstack with standalone minio container
Apr 26, 2026
063a49d
fix(ci): add numpy to minio test deps
Apr 26, 2026
924a428
Cap S3 retry count in minio CI to prevent multi-hour test runs
Apr 26, 2026
e282c2e
Increase minio CI timeout to 60 minutes
Apr 26, 2026
8c8e68b
Reduce CI retry count to 3 and increase timeout to 90min
Apr 26, 2026
f752aba
Fix get_recursive prefix loss, cap exponential backoff, bump CI retries
Apr 26, 2026
1362493
Optimize S3 tests: remove inject_failure_rate bloat, add dedicated re…
Apr 26, 2026
2332930
debug: fail-fast false, single Python version for minio CI
Apr 26, 2026
5d33ce4
debug: add -x --tb=long, skip slow put tests for traceback
Apr 26, 2026
4150c11
Fix read_many() generator bug: return → yield from
Apr 26, 2026
2e57c05
Fix list prefix filtering, retry monkeypatch, and minio test compat
Apr 26, 2026
17e8b91
Gate test changes on S3_DIRECT_BOTO3 flag
Apr 26, 2026
db449e5
Fix retry sleep bottleneck: skip backoff for injected failures
Apr 26, 2026
e7421a0
Fix retry exhaustion at high inject_failure_rates
Apr 26, 2026
dc132f6
Fix retry boost overriding exhausted-retries test
Apr 26, 2026
ff0ce85
Fix inject_failure_rate=100 not deterministic in exhaustion test
Apr 26, 2026
fde9a4d
Harden S3DirectClient error handling (Greptile P1 fixes)
Apr 26, 2026
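Commit 4150c11 above fixes a classic generator pitfall: inside a generator function, "return expr" does not emit anything, it just raises StopIteration, so the inner generator is silently discarded. A minimal, self-contained illustration of the bug and the fix (function names below are hypothetical, not taken from this PR):

# Illustration of the generator bug fixed in commit 4150c11
# ("return -> yield from"); names here are made up for the example.
def items():
    yield 1
    yield 2

def broken_read_many():
    # "return items()" inside a generator stops iteration immediately,
    # so callers see an empty generator and the items are lost.
    return items()
    yield  # unreachable; only makes this function a generator

def fixed_read_many():
    # "yield from" delegates to the inner generator and re-yields its items.
    yield from items()

assert list(broken_read_many()) == []
assert list(fixed_read_many()) == [1, 2]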
2 changes: 2 additions & 0 deletions metaflow/metaflow_config.py
@@ -158,6 +158,8 @@
    "S3_CLIENT_RETRY_CONFIG", {"max_attempts": 10, "mode": "adaptive"}
)

S3_DIRECT_BOTO3 = from_conf("S3_DIRECT_BOTO3", True)

# Threshold to start printing warnings for an AWS retry
RETRY_WARNING_THRESHOLD = 3
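The flag defaults to True, so the direct-boto3 path is active unless explicitly disabled. A sketch of the rollback path, assuming from_conf()'s usual METAFLOW_-prefixed environment lookup (how the string value is coerced to a boolean is not shown in this diff):

import os

# Hypothetical rollback: force the legacy s3op subprocess path.
# Must be set before metaflow is imported so the config picks it up;
# the exact boolean parsing of the value is an assumption here.
os.environ["METAFLOW_S3_DIRECT_BOTO3"] = "false"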
203 changes: 127 additions & 76 deletions metaflow/plugins/datatools/s3/s3.py
@@ -16,6 +16,7 @@
from metaflow.metaflow_current import current
from metaflow.metaflow_config import (
    DATATOOLS_S3ROOT,
    S3_DIRECT_BOTO3,
    S3_RETRY_COUNT,
    S3_TRANSIENT_RETRY_COUNT,
    S3_LOG_TRANSIENT_RETRIES,
@@ -1449,91 +1450,141 @@ def _jitter_sleep(
# and url_unquote.
def _read_many_files(self, op, prefixes_and_ranges, **options):
    prefixes_and_ranges = list(prefixes_and_ranges)
    if not prefixes_and_ranges:
        return
    if S3_DIRECT_BOTO3:
        from .s3op_boto import S3DirectClient

        direct = S3DirectClient(
            self._s3_client, self._tmpdir, self._s3_inject_failures
        )
        if op == "list":
            yield from direct.list_objects(
                prefixes_and_ranges,
                recursive=options.get("recursive", False),
            )
        elif op == "info":
            yield from direct.info_objects(prefixes_and_ranges)
        elif op == "get":
            recursive = options.get("recursive", False)
            if recursive:
                listed = list(
                    direct.list_objects(prefixes_and_ranges, recursive=True)
                )
                download_items = [(url, None) for _prefix, url, _size in listed]
                yield from direct.get_objects(
                    download_items,
                    allow_missing=options.get("allow_missing", False),
                    return_info=options.get("info", False),
                )
            else:
                yield from direct.get_objects(
                    prefixes_and_ranges,
                    allow_missing=options.get("allow_missing", False),
                    return_info=options.get("info", False),
                )
        else:
            raise MetaflowS3Exception("Unknown operation: %s" % op)
    else:
        with NamedTemporaryFile(
            dir=self._tmpdir,
            mode="wb",
            delete=not debug.s3client,
            prefix="metaflow.s3.inputs.",
        ) as inputfile:
            inputfile.write(
                b"\n".join(
                    [
                        b" ".join(
                            [url_quote(prefix)] + ([url_quote(r)] if r else [])
                        )
                        for prefix, r in prefixes_and_ranges
                    ]
                )
            )
            inputfile.flush()
            stdout_lines, stderr, err_code = self._s3op_with_retries(
                op, inputs=inputfile.name, **options
            )
            if stderr:
                from . import s3op

                # Raise the appropriate exception type based on the error code
                if err_code == s3op.ERROR_URL_NOT_FOUND:
                    raise MetaflowS3NotFound(stderr)
                elif err_code == s3op.ERROR_URL_ACCESS_DENIED:
                    raise MetaflowS3AccessDenied(stderr)
                elif err_code == s3op.ERROR_INVALID_RANGE:
                    raise MetaflowS3InvalidRange(stderr)
                else:
                    raise MetaflowS3Exception(
                        "Getting S3 files failed.\n"
                        "First prefix requested: %s\n"
                        "Error: %s" % (prefixes_and_ranges[0], stderr)
                    )
            else:
                for line in stdout_lines:
                    yield tuple(map(url_unquote, line.strip(b"\n").split(b" ")))
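s3op_boto.py itself is outside this diff view. Going by the call sites above and the first commit message ("direct boto3 + ThreadPoolExecutor"), the download side of S3DirectClient plausibly looks something like the sketch below; the constructor and method signatures come from the call sites, everything else is an assumption:

# Hypothetical sketch of the download path in s3op_boto.py; the real
# module is not part of this diff. Only the names used at the call sites
# above are taken from the PR.
import os
from concurrent.futures import ThreadPoolExecutor
from tempfile import mkstemp
from urllib.parse import urlparse

class S3DirectClientSketch:
    def __init__(self, s3_client, tmpdir, inject_failures=0):
        self._s3 = s3_client      # a shared boto3 S3 client
        self._tmpdir = tmpdir     # scratch directory for downloads
        self._inject_failures = inject_failures  # test-only failure injection

    def get_objects(self, urls_and_ranges, allow_missing=False, return_info=False):
        # Fetch many objects concurrently and yield results in input order,
        # mirroring the line-per-object output of the old s3op subprocess.
        def _fetch(item):
            url, byte_range = item
            parsed = urlparse(url)
            bucket, key = parsed.netloc, parsed.path.lstrip("/")
            extra = {"Range": byte_range} if byte_range else {}
            try:
                resp = self._s3.get_object(Bucket=bucket, Key=key, **extra)
            except self._s3.exceptions.NoSuchKey:
                if allow_missing:
                    return url, None
                raise
            fd, local_path = mkstemp(dir=self._tmpdir, prefix="metaflow.s3.")
            with os.fdopen(fd, "wb") as f:
                f.write(resp["Body"].read())
            # The real implementation presumably also returns size/metadata
            # when return_info is set; omitted here for brevity.
            return url, local_path

        with ThreadPoolExecutor() as pool:
            yield from pool.map(_fetch, urls_and_ranges)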

def _put_many_files(self, url_info, overwrite):
    url_info = list(url_info)
    if not url_info:
        return []
    if S3_DIRECT_BOTO3:
        from .s3op_boto import S3DirectClient

        direct = S3DirectClient(
            self._s3_client, self._tmpdir, self._s3_inject_failures
        )
        return direct.put_objects(url_info, overwrite)
    else:
        url_dicts = [
            dict(
                chain(
                    [("local", os.path.realpath(local)), ("url", url)],
                    info.items(),
                )
            )
            for local, url, info in url_info
        ]
        with NamedTemporaryFile(
            dir=self._tmpdir,
            mode="wb",
            delete=not debug.s3client,
            prefix="metaflow.s3.put_inputs.",
        ) as inputfile:
            lines = [to_bytes(json.dumps(x)) for x in url_dicts]
            inputfile.write(b"\n".join(lines))
            inputfile.flush()
            stdout_lines, stderr, err_code = self._s3op_with_retries(
                "put",
                inputs=inputfile.name,
                verbose=False,
                overwrite=overwrite,
                listing=True,
            )
            if stderr:
                from . import s3op

                # Raise the appropriate exception type based on the error code
                if err_code == s3op.ERROR_URL_NOT_FOUND:
                    raise MetaflowS3NotFound(stderr)
                elif err_code == s3op.ERROR_URL_ACCESS_DENIED:
                    raise MetaflowS3AccessDenied(stderr)
                elif err_code == s3op.ERROR_INVALID_RANGE:
                    raise MetaflowS3InvalidRange(stderr)
                else:
                    raise MetaflowS3Exception(
                        "Uploading S3 files failed.\n"
                        "First key: %s\n"
                        "Error: %s" % (url_info[0][2]["key"], stderr)
                    )
            else:
                urls = set()
                for line in stdout_lines:
                    url, _, _ = map(url_unquote, line.strip(b"\n").split(b" "))
                    urls.add(url)
                return [
                    (info["key"], url) for _, url, info in url_info if url in urls
                ]
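The hunk header above references _jitter_sleep, and several commits (924a428, f752aba) cap the retry backoff so CI runs stay bounded. A capped full-jitter backoff of the kind those messages describe might look like this; the helper below is illustrative, and its parameter names and defaults are assumptions, not the PR's actual values:

# Illustrative capped backoff with full jitter, in the spirit of the
# _jitter_sleep referenced above and the backoff cap from commit f752aba.
import random
import time

def jitter_sleep_sketch(attempt, base=1.0, cap=30.0):
    # Exponential growth capped at `cap` so large retry counts cannot
    # stretch one wait into minutes; full jitter decorrelates retries.
    delay = min(cap, base * (2 ** attempt))
    time.sleep(random.uniform(0, delay))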

def _s3op_with_retries(self, mode, **options):
    from . import s3op