2020testing = False
2121
2222
23+ class CreateTasksException (Exception ):
24+ """Exception raised when one or more tasks could not be created."""
25+
26+ def __init__ (self , errors : dict [str , Exception ]):
27+ message = ""
28+ for label , exc in errors .items ():
29+ message += f"\n ERROR: Could not create '{ label } ':\n \n "
30+ message += "\n " .join (f" { line } " for line in str (exc ).splitlines ()) + "\n "
31+
32+ super ().__init__ (message )
33+
34+
2335def create_tasks (graph_config , taskgraph , label_to_taskid , params , decision_task_id ):
2436 taskid_to_label = {t : l for l , t in label_to_taskid .items ()}
2537
@@ -50,6 +62,8 @@ def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task
5062 session = get_session ()
5163 with futures .ThreadPoolExecutor (concurrency ) as e :
5264 fs = {}
65+ fs_to_task = {}
66+ skipped = set ()
5367
5468 # We can't submit a task until its dependencies have been submitted.
5569 # So our strategy is to walk the graph and submit tasks once all
@@ -58,28 +72,33 @@ def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task
5872 alltasks = tasklist .copy ()
5973
6074 def schedule_tasks ():
61- # bail out early if any futures have failed
62- if any (f .done () and f .exception () for f in fs .values ()):
63- return
64-
6575 to_remove = set ()
6676 new = set ()
6777
6878 def submit (task_id , label , task_def ):
6979 fut = e .submit (create_task , session , task_id , label , task_def )
7080 new .add (fut )
7181 fs [task_id ] = fut
82+ fs_to_task [fut ] = (task_id , label )
7283
7384 for task_id in tasklist :
7485 task_def = taskgraph .tasks [task_id ].task
75- # If we haven't finished submitting all our dependencies yet,
76- # come back to this later.
7786 # Some dependencies aren't in our graph, so make sure to filter
7887 # those out
7988 deps = set (task_def .get ("dependencies" , [])) & alltasks
89+
90+ # If we haven't finished submitting all our dependencies yet,
91+ # come back to this later.
8092 if any ((d not in fs or not fs [d ].done ()) for d in deps ):
8193 continue
8294
95+ # If one of the dependencies didn't get created, then
96+ # don't attempt to submit as it would fail.
97+ if any (d in skipped for d in deps ):
98+ skipped .add (task_id )
99+ to_remove .add (task_id )
100+ continue
101+
83102 submit (task_id , taskid_to_label [task_id ], task_def )
84103 to_remove .add (task_id )
85104
@@ -90,16 +109,24 @@ def submit(task_id, label, task_def):
90109 submit (slugid (), taskid_to_label [task_id ], task_def )
91110 tasklist .difference_update (to_remove )
92111
93- # as each of those futures complete, try to schedule more tasks
112+ # As each of those futures complete, try to schedule more tasks.
94113 for f in futures .as_completed (new ):
95114 schedule_tasks ()
96115
97- # start scheduling tasks and run until everything is scheduled
116+ # Start scheduling tasks and run until everything is scheduled.
98117 schedule_tasks ()
99118
100- # check the result of each future, raising an exception if it failed
119+ # Check the result of each future and save the exception for later
120+ # printing if it failed.
121+ errors = {}
101122 for f in futures .as_completed (fs .values ()):
102- f .result ()
123+ if exc := f .exception ():
124+ task_id , label = fs_to_task [f ]
125+ skipped .add (task_id )
126+ errors [label ] = exc
127+
128+ if errors :
129+ raise CreateTasksException (errors )
103130
104131
105132def create_task (session , task_id , label , task_def ):
0 commit comments