Commit 1bf579c ("S3 bugfixes")
Author: npow
Parent: 71a5e8c

File tree: 4 files changed (+104, -31)

  metaflow/metaflow_config.py
  metaflow/plugins/aws/aws_client.py
  metaflow/plugins/datatools/s3/s3op.py
  test/data/s3/test_s3op.py

metaflow/metaflow_config.py (+3)

@@ -109,6 +109,9 @@
 # top-level retries)
 S3_TRANSIENT_RETRY_COUNT = from_conf("S3_TRANSIENT_RETRY_COUNT", 20)
 
+# S3 retry configuration used in the aws client
+S3_CLIENT_RETRY_CONFIG = from_conf("S3_CLIENT_RETRY_CONFIG", {"max_attempts": 10, "mode": "adaptive"})
+
 # Threshold to start printing warnings for an AWS retry
 RETRY_WARNING_THRESHOLD = 3
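For reference, this dict is handed to botocore unchanged, so it must have the shape botocore's Config accepts for retries. A minimal sketch of how such a dict is consumed (standard boto3/botocore usage, not part of this commit):

    import boto3
    from botocore.config import Config

    # Same shape as the new default above; "adaptive" mode adds client-side
    # rate limiting on top of exponential-backoff retries.
    retry_config = {"max_attempts": 10, "mode": "adaptive"}
    s3 = boto3.client("s3", config=Config(retries=retry_config))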

metaflow/plugins/aws/aws_client.py (+3, -1)

@@ -14,6 +14,7 @@ def get_client(
     AWS_SANDBOX_ENABLED,
     AWS_SANDBOX_STS_ENDPOINT_URL,
     AWS_SANDBOX_API_KEY,
+    S3_CLIENT_RETRY_CONFIG,
 )
 
 if session_vars is None:
@@ -40,7 +41,8 @@ def get_client(
     # Use the adaptive retry strategy by default -- do not set anything if
     # the user has already set something
     config = client_params.get("config", Config())
-    config.retries = {"max_attempts": 10, "mode": "adaptive"}
+    config.retries = S3_CLIENT_RETRY_CONFIG
+    client_params["config"] = config
 
 if AWS_SANDBOX_ENABLED:
     # role is ignored in the sandbox
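Two things change in the second hunk: the retry settings now come from S3_CLIENT_RETRY_CONFIG, and the Config object is written back into client_params. The write-back is the actual bugfix: when the caller supplied no config, a fresh Config was created, mutated, and then dropped. A simplified sketch of the old failure mode (an assumed simplification, not the full get_client):

    from botocore.config import Config

    client_params = {}  # caller passed no explicit config
    config = client_params.get("config", Config())  # creates a brand-new Config
    config.retries = {"max_attempts": 10, "mode": "adaptive"}
    # The old code stopped here: client_params still has no "config" key, so
    # the retry settings never reached session.client(**client_params).
    client_params["config"] = config  # the added line closes that gap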

metaflow/plugins/datatools/s3/s3op.py (+87, -30)

@@ -15,7 +15,10 @@
 from multiprocessing import Process, Queue
 from itertools import starmap, chain, islice
 
+from boto3.exceptions import RetriesExceededError, S3UploadFailedError
 from boto3.s3.transfer import TransferConfig
+from botocore.config import Config
+from botocore.exceptions import ClientError, SSLError
 
 try:
     # python2
@@ -46,11 +49,13 @@
 import metaflow.tracing as tracing
 from metaflow.metaflow_config import (
     S3_WORKER_COUNT,
+    S3_CLIENT_RETRY_CONFIG,
 )
 
 DOWNLOAD_FILE_THRESHOLD = 2 * TransferConfig().multipart_threshold
 DOWNLOAD_MAX_CHUNK = 2 * 1024 * 1024 * 1024 - 1
 
+DEFAULT_S3_CLIENT_PARAMS = {"config": Config(retries=S3_CLIENT_RETRY_CONFIG)}
 RANGE_MATCH = re.compile(r"bytes (?P<start>[0-9]+)-(?P<end>[0-9]+)/(?P<total>[0-9]+)")
 
 S3Config = namedtuple("S3Config", "role session_vars client_params")
@@ -147,6 +152,7 @@ def normalize_client_error(err):
         "LimitExceededException",
         "RequestThrottled",
         "EC2ThrottledException",
+        "InternalError",
     ):
         return 503
     return error_code
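With "InternalError" added to this tuple, S3's internal errors are normalized to 503 and handled as transient (retried) rather than raised. An illustrative check with a hand-built ClientError (values hypothetical):

    from botocore.exceptions import ClientError

    err = ClientError(
        {"Error": {"Code": "InternalError", "Message": "We encountered an internal error."}},
        "CompleteMultipartUpload",
    )
    assert normalize_client_error(err) == 503  # after this change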
@@ -254,21 +260,27 @@ def op_info(url):
             except client_error as err:
                 tmp.close()
                 os.unlink(tmp.name)
-                error_code = normalize_client_error(err)
-                if error_code == 404:
-                    result_file.write("%d %d\n" % (idx, -ERROR_URL_NOT_FOUND))
-                    continue
-                elif error_code == 403:
-                    result_file.write(
-                        "%d %d\n" % (idx, -ERROR_URL_ACCESS_DENIED)
-                    )
-                    continue
-                elif error_code == 503:
-                    result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
-                    continue
-                else:
-                    raise
-                # TODO specific error message for out of disk space
+                handle_client_error(err, idx, result_file)
+                continue
+            except RetriesExceededError as e:
+                tmp.close()
+                os.unlink(tmp.name)
+                err = convert_to_client_error(e)
+                handle_client_error(err, idx, result_file)
+                continue
+            except SSLError as e:
+                tmp.close()
+                os.unlink(tmp.name)
+                result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
+                result_file.flush()
+                continue
+            except Exception as e:
+                tmp.close()
+                os.unlink(tmp.name)
+                # assume anything else is transient
+                result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
+                result_file.flush()
+                continue
             # If we need the metadata, get it and write it out
             if pre_op_info:
                 with open("%s_meta" % url.local, mode="w") as f:
@@ -322,22 +334,61 @@ def op_info(url):
                 # We indicate that the file was uploaded
                 result_file.write("%d %d\n" % (idx, 0))
             except client_error as err:
-                error_code = normalize_client_error(err)
-                if error_code == 403:
-                    result_file.write(
-                        "%d %d\n" % (idx, -ERROR_URL_ACCESS_DENIED)
-                    )
-                    continue
-                elif error_code == 503:
-                    result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
-                    continue
-                else:
-                    raise
+                # shouldn't get here, but just in case
+                handle_client_error(err, idx, result_file)
+                continue
+            except S3UploadFailedError as e:
+                err = convert_to_client_error(e)
+                handle_client_error(err, idx, result_file)
+                continue
+            except SSLError as e:
+                result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
+                result_file.flush()
+                continue
+            except Exception as e:
+                # assume anything else is transient
+                result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
+                result_file.flush()
+                continue
         except:
             traceback.print_exc()
+            result_file.flush()
             sys.exit(ERROR_WORKER_EXCEPTION)
 
 
+def convert_to_client_error(e):
+    match = re.search(
+        r"An error occurred \((\w+)\) when calling the (\w+) operation.*: (.+)", str(e)
+    )
+    assert match, "Failed to parse error message"
+    error_code = match.group(1)
+    operation_name = match.group(2)
+    error_message = match.group(3)
+    response = {
+        "Error": {
+            "Code": error_code,
+            "Message": error_message,
+        }
+    }
+    return ClientError(response, operation_name)
+
+
+def handle_client_error(err, idx, result_file):
+    error_code = normalize_client_error(err)
+    if error_code == 404:
+        result_file.write("%d %d\n" % (idx, -ERROR_URL_NOT_FOUND))
+        result_file.flush()
+    elif error_code == 403:
+        result_file.write("%d %d\n" % (idx, -ERROR_URL_ACCESS_DENIED))
+        result_file.flush()
+    elif error_code == 503:
+        result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
+        result_file.flush()
+    else:
+        raise
+    # TODO specific error message for out of disk space
+
+
 def start_workers(mode, urls, num_workers, inject_failure, s3config):
     # We start the minimum of len(urls) or num_workers to avoid starting
     # workers that will definitely do nothing
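The new convert_to_client_error helper recovers a structured ClientError from a stringified boto3 exception (S3UploadFailedError and RetriesExceededError wrap the underlying ClientError, leaving its details embedded in the message text). The regex can be exercised directly on the message format boto3 produces; this sample string mirrors the one used in the new test below:

    import re

    msg = (
        "An error occurred (SlowDown) when calling the CompleteMultipartUpload "
        "operation (reached max retries: 4): Please reduce your request rate."
    )
    m = re.search(
        r"An error occurred \((\w+)\) when calling the (\w+) operation.*: (.+)", msg
    )
    assert m.group(1) == "SlowDown"                 # error code
    assert m.group(2) == "CompleteMultipartUpload"  # operation name
    assert m.group(3) == "Please reduce your request rate."  # message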
@@ -381,6 +432,12 @@ def start_workers(mode, urls, num_workers, inject_failure, s3config):
         if proc.exitcode is not None:
             if proc.exitcode != 0:
                 msg = "Worker process failed (exit code %d)" % proc.exitcode
+
+                # IMPORTANT: if a child process has put items on a queue, then that process will not
+                # terminate until all buffered items have been flushed to the pipe, causing a deadlock.
+                # `cancel_join_thread()` allows the subprocess to exit without flushing the queue.
+                queue.cancel_join_thread()
+
                 exit(msg, proc.exitcode)
     # Read the output file if all went well
     with open(out_path, "r") as out_file:
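The cancel_join_thread() call addresses a documented multiprocessing pitfall: a process that has put items on a Queue will, at exit, wait for its feeder thread to flush all buffered items into the pipe, and if no reader ever drains the other end, that wait never finishes. A standalone illustration of the hazard (illustrative code, not from Metaflow):

    from multiprocessing import Process, Queue

    def crashing_worker(q):
        raise RuntimeError("simulated worker failure")  # dies without draining q

    if __name__ == "__main__":
        q = Queue()
        for i in range(100_000):
            q.put(i)  # buffered by this process's feeder thread
        p = Process(target=crashing_worker, args=(q,))
        p.start()
        p.join()
        # Without this call, interpreter shutdown can block indefinitely waiting
        # for the feeder thread to flush the undrained buffer into the pipe.
        q.cancel_join_thread()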
@@ -745,7 +802,7 @@ def lst(
     s3config = S3Config(
         s3role,
         json.loads(s3sessionvars) if s3sessionvars else None,
-        json.loads(s3clientparams) if s3clientparams else None,
+        json.loads(s3clientparams) if s3clientparams else DEFAULT_S3_CLIENT_PARAMS,
     )
 
     urllist = []
@@ -878,7 +935,7 @@ def _make_url(idx, local, user_url, content_type, metadata, encryption):
     s3config = S3Config(
         s3role,
         json.loads(s3sessionvars) if s3sessionvars else None,
-        json.loads(s3clientparams) if s3clientparams else None,
+        json.loads(s3clientparams) if s3clientparams else DEFAULT_S3_CLIENT_PARAMS,
     )
 
     urls = list(starmap(_make_url, _files()))
@@ -1025,7 +1082,7 @@ def get(
     s3config = S3Config(
         s3role,
         json.loads(s3sessionvars) if s3sessionvars else None,
-        json.loads(s3clientparams) if s3clientparams else None,
+        json.loads(s3clientparams) if s3clientparams else DEFAULT_S3_CLIENT_PARAMS,
     )
 
     # Construct a list of URL (prefix) objects
@@ -1172,7 +1229,7 @@ def info(
     s3config = S3Config(
         s3role,
         json.loads(s3sessionvars) if s3sessionvars else None,
-        json.loads(s3clientparams) if s3clientparams else None,
+        json.loads(s3clientparams) if s3clientparams else DEFAULT_S3_CLIENT_PARAMS,
    )
 
     # Construct a list of URL (prefix) objects
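All four CLI entry points previously fell back to client_params=None when --s3clientparams was not given, leaving retry behavior to whatever the client factory supplied; they now fall back to DEFAULT_S3_CLIENT_PARAMS so the configured retries apply even without explicit client params. A sketch of the repeated expression as a standalone helper (the helper name is hypothetical; the expression itself is verbatim from the diff):

    import json
    from botocore.config import Config

    S3_CLIENT_RETRY_CONFIG = {"max_attempts": 10, "mode": "adaptive"}
    DEFAULT_S3_CLIENT_PARAMS = {"config": Config(retries=S3_CLIENT_RETRY_CONFIG)}

    def resolve_client_params(s3clientparams):
        # Explicit JSON from the CLI wins; otherwise use the retry-aware default.
        return json.loads(s3clientparams) if s3clientparams else DEFAULT_S3_CLIENT_PARAMS

    assert resolve_client_params(None) is DEFAULT_S3_CLIENT_PARAMS
    assert resolve_client_params('{"use_ssl": false}') == {"use_ssl": False}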

test/data/s3/test_s3op.py (+11, new file)

@@ -0,0 +1,11 @@
+from metaflow.plugins.datatools.s3.s3op import convert_to_client_error
+
+
+def test_convert_to_client_error():
+    s = "boto3.exceptions.S3UploadFailedError: Failed to upload /a/b/c/d.parquet to e/f/g/h.parquet: An error occurred (SlowDown) when calling the CompleteMultipartUpload operation (reached max retries: 4): Please reduce your request rate."
+    client_error = convert_to_client_error(s)
+    assert client_error.response["Error"]["Code"] == "SlowDown"
+    assert (
+        client_error.response["Error"]["Message"] == "Please reduce your request rate."
+    )
+    assert client_error.operation_name == "CompleteMultipartUpload"
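Note the test passes the stringified exception rather than an exception object; this works because convert_to_client_error only ever inspects str(e). The test can be run directly with pytest:

    pytest test/data/s3/test_s3op.py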
