Skip to content

Commit

Permalink
Improve Modal backups implementation (#238)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwhite authored Jun 29, 2023
1 parent 647a68b commit 11facd9
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions cubed/runtime/executors/modal_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@

# We need map_unordered for the use_backups implementation
async def map_unordered(
app_function, input, use_backups=False, return_stats=False, name=None, **kwargs
app_function,
input,
use_backups=False,
backup_function=None,
return_stats=False,
name=None,
**kwargs,
):
"""
Apply a function to items of an input list, yielding results as they are completed
Expand All @@ -42,6 +48,8 @@ async def map_unordered(
yield result
return

backup_function = backup_function or app_function

tasks = {
asyncio.ensure_future(app_function.call.aio(i, **kwargs)): i for i in input
}
Expand All @@ -55,12 +63,14 @@ async def map_unordered(
finished, pending = await asyncio.wait(
pending, return_when=asyncio.FIRST_COMPLETED, timeout=2
)
# print("finished", finished)
# print("pending", pending)

for task in finished:
# TODO: use exception groups in Python 3.11 to handle case of multiple task exceptions
if task.exception():
# if the task has a backup that is not done, or is done with no exception, then don't raise this exception
backup = backups.get(task, None)
if backup:
if not backup.done() or not backup.exception():
continue
raise task.exception()
end_times[task] = time.monotonic()
if return_stats:
Expand All @@ -76,9 +86,11 @@ async def map_unordered(
if use_backups:
backup = backups.get(task, None)
if backup:
pending.remove(backup)
if backup in pending:
pending.remove(backup)
del backups[task]
del backups[backup]
backup.cancel()

if use_backups:
now = time.monotonic()
Expand All @@ -87,8 +99,11 @@ async def map_unordered(
task, now, start_times, end_times
):
# launch backup task
print("Launching backup task")
i = tasks[task]
new_task = asyncio.ensure_future(app_function.call.aio(i, **kwargs))
new_task = asyncio.ensure_future(
backup_function.call.aio(i, **kwargs)
)
tasks[new_task] = i
start_times[new_task] = time.monotonic()
pending.add(new_task)
Expand Down

0 comments on commit 11facd9

Please sign in to comment.