Skip to content

Commit b7df18a

Browse files
committed
add retries
1 parent 9d7ec35 commit b7df18a

File tree

2 files changed

+35
-11
lines changed

2 files changed

+35
-11
lines changed

metaflow/plugins/aws/step_functions/dynamo_db_client.py

+26-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import os
2-
2+
import time
33
import requests
44

55
from metaflow.metaflow_config import SFN_DYNAMO_DB_TABLE
@@ -27,12 +27,31 @@ def save_foreach_cardinality(self, foreach_split_task_id, foreach_cardinality, t
2727
def save_parent_task_id_for_foreach_join(
2828
self, foreach_split_task_id, foreach_join_parent_task_id
2929
):
30-
return self._client.update_item(
31-
TableName=self.name,
32-
Key={"pathspec": {"S": foreach_split_task_id}},
33-
UpdateExpression="ADD parent_task_ids_for_foreach_join :val",
34-
ExpressionAttributeValues={":val": {"SS": [foreach_join_parent_task_id]}},
35-
)
30+
ex = None
31+
for attempt in range(10):
32+
try:
33+
return self._client.update_item(
34+
TableName=self.name,
35+
Key={"pathspec": {"S": foreach_split_task_id}},
36+
UpdateExpression="ADD parent_task_ids_for_foreach_join :val",
37+
ExpressionAttributeValues={
38+
":val": {"SS": [foreach_join_parent_task_id]}
39+
},
40+
)
41+
except self._client.exceptions.ClientError as error:
42+
ex = error
43+
if (
44+
error.response["Error"]["Code"]
45+
== "ProvisionedThroughputExceededException"
46+
):
47+
# Back off so that AWS has time to scale up. If the retries are
48+
# still exhausted, ensure sufficient on-demand throughput for
49+
# DynamoDB is provisioned ahead of time.
50+
sleep_time = min((2**attempt) * 10, 60)
51+
time.sleep(sleep_time)
52+
else:
53+
raise
54+
raise ex
3655

3756
def get_parent_task_ids_for_foreach_join(self, foreach_split_task_id):
3857
response = self._client.get_item(

metaflow/plugins/aws/step_functions/step_functions.py

+9-4
Original file line numberDiff line numberDiff line change
@@ -545,10 +545,6 @@ def _batch(self, node):
545545
"Parallel steps are not supported yet with AWS step functions."
546546
)
547547

548-
# Inherit the run id from the parent and pass it along to children.
549-
attrs["metaflow.run_id.$"] = "$.Parameters.run_id"
550-
attrs["run_id.$"] = "$.Parameters.run_id"
551-
552548
# Handle foreach join.
553549
if (
554550
node.type == "join"
@@ -572,6 +568,9 @@ def _batch(self, node):
572568
env["METAFLOW_SPLIT_PARENT_TASK_ID"] = (
573569
"$.Parameters.split_parent_task_id_%s" % node.split_parents[-1]
574570
)
571+
# Inherit the run id from the parent and pass it along to children.
572+
attrs["metaflow.run_id.$"] = "$.Parameters.run_id"
573+
attrs["run_id.$"] = "$.Parameters.run_id"
575574
else:
576575
# Set appropriate environment variables for runtime replacement.
577576
if len(node.in_funcs) == 1:
@@ -580,6 +579,9 @@ def _batch(self, node):
580579
% node.in_funcs[0]
581580
)
582581
env["METAFLOW_PARENT_TASK_ID"] = "$.JobId"
582+
# Inherit the run id from the parent and pass it along to children.
583+
attrs["metaflow.run_id.$"] = "$.Parameters.run_id"
584+
attrs["run_id.$"] = "$.Parameters.run_id"
583585
else:
584586
# Generate the input paths in a quasi-compressed format.
585587
# See util.decompress_list for why this is written the way
@@ -589,6 +591,9 @@ def _batch(self, node):
589591
"${METAFLOW_PARENT_%s_TASK_ID}" % (idx, idx)
590592
for idx, _ in enumerate(node.in_funcs)
591593
)
594+
# Inherit the run id from the parent and pass it along to children.
595+
attrs["metaflow.run_id.$"] = "$.[0].Parameters.run_id"
596+
attrs["run_id.$"] = "$.[0].Parameters.run_id"
592597
for idx, _ in enumerate(node.in_funcs):
593598
env["METAFLOW_PARENT_%s_TASK_ID" % idx] = "$.[%s].JobId" % idx
594599
env["METAFLOW_PARENT_%s_STEP" % idx] = (

0 commit comments

Comments
 (0)