2020testing = False
2121
2222
23+ class CreateTasksException (Exception ):
24+ """Exception raised when one or more tasks could not be created."""
25+
26+ def __init__ (self , errors : dict [str , Exception ]):
27+ message = ""
28+ for label , exc in errors .items ():
29+ message += f"\n ERROR: Could not create '{ label } ':\n \n "
30+ message += "\n " .join (f" { line } " for line in str (exc ).splitlines ()) + "\n "
31+
32+ super ().__init__ (message )
33+
34+
2335def create_tasks (graph_config , taskgraph , label_to_taskid , params , decision_task_id ):
2436 taskid_to_label = {t : l for l , t in label_to_taskid .items ()}
2537
@@ -50,33 +62,48 @@ def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task
5062 session = get_session ()
5163 with futures .ThreadPoolExecutor (concurrency ) as e :
5264 fs = {}
65+ fs_to_task = {}
66+ skipped = set ()
67+ errors = {}
5368
5469 # We can't submit a task until its dependencies have been submitted.
5570 # So our strategy is to walk the graph and submit tasks once all
5671 # their dependencies have been submitted.
5772 tasklist = set (taskgraph .graph .visit_postorder ())
5873 alltasks = tasklist .copy ()
5974
60- def schedule_tasks ():
61- # bail out early if any futures have failed
62- if any (f .done () and f .exception () for f in fs .values ()):
63- return
75+ def handle_exception (fut ):
76+ if exc := fut .exception ():
77+ task_id , label = fs_to_task [fut ]
78+ skipped .add (task_id )
79+ errors [label ] = exc
6480
81+ def schedule_tasks ():
6582 to_remove = set ()
6683 new = set ()
6784
6885 def submit (task_id , label , task_def ):
6986 fut = e .submit (create_task , session , task_id , label , task_def )
7087 new .add (fut )
7188 fs [task_id ] = fut
89+ fs_to_task [fut ] = (task_id , label )
90+ fut .add_done_callback (handle_exception )
7291
7392 for task_id in tasklist :
7493 task_def = taskgraph .tasks [task_id ].task
75- # If we haven't finished submitting all our dependencies yet,
76- # come back to this later.
7794 # Some dependencies aren't in our graph, so make sure to filter
7895 # those out
7996 deps = set (task_def .get ("dependencies" , [])) & alltasks
97+
98+ # If one of the dependencies didn't get created, then
99+ # don't attempt to submit as it would fail.
100+ if any (d in skipped for d in deps ):
101+ skipped .add (task_id )
102+ to_remove .add (task_id )
103+ continue
104+
105+ # If we haven't finished submitting all our dependencies yet,
106+ # come back to this later.
80107 if any ((d not in fs or not fs [d ].done ()) for d in deps ):
81108 continue
82109
@@ -90,16 +117,18 @@ def submit(task_id, label, task_def):
90117 submit (slugid (), taskid_to_label [task_id ], task_def )
91118 tasklist .difference_update (to_remove )
92119
93- # as each of those futures complete, try to schedule more tasks
120+ # As each of those futures complete, try to schedule more tasks.
94121 for f in futures .as_completed (new ):
95122 schedule_tasks ()
96123
97- # start scheduling tasks and run until everything is scheduled
124+ # Start scheduling tasks and run until everything is scheduled.
98125 schedule_tasks ()
99126
100- # check the result of each future, raising an exception if it failed
101- for f in futures .as_completed (fs .values ()):
102- f .result ()
127+ # Wait for all futures to complete.
128+ futures .wait (fs .values ())
129+
130+ if errors :
131+ raise CreateTasksException (errors )
103132
104133
105134def create_task (session , task_id , label , task_def ):
0 commit comments