diff --git a/src/taskgraph/create.py b/src/taskgraph/create.py
index ca876a80..c8e0c7b4 100644
--- a/src/taskgraph/create.py
+++ b/src/taskgraph/create.py
@@ -20,6 +20,18 @@
 testing = False
 
 
+class CreateTasksException(Exception):
+    """Exception raised when one or more tasks could not be created."""
+
+    def __init__(self, errors: dict[str, Exception]):
+        message = ""
+        for label, exc in errors.items():
+            message += f"\nERROR: Could not create '{label}':\n\n"
+            message += "\n".join(f"  {line}" for line in str(exc).splitlines()) + "\n"
+
+        super().__init__(message)
+
+
 def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task_id):
     taskid_to_label = {t: l for l, t in label_to_taskid.items()}
 
@@ -50,6 +62,9 @@ def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task
     session = get_session()
     with futures.ThreadPoolExecutor(concurrency) as e:
         fs = {}
+        fs_to_task = {}
+        skipped = set()
+        errors = {}
 
         # We can't submit a task until its dependencies have been submitted.
         # So our strategy is to walk the graph and submit tasks once all
@@ -57,11 +72,13 @@ def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task
         tasklist = set(taskgraph.graph.visit_postorder())
         alltasks = tasklist.copy()
 
-        def schedule_tasks():
-            # bail out early if any futures have failed
-            if any(f.done() and f.exception() for f in fs.values()):
-                return
+        def handle_exception(fut):
+            if exc := fut.exception():
+                task_id, label = fs_to_task[fut]
+                skipped.add(task_id)
+                errors[label] = exc
 
+        def schedule_tasks():
             to_remove = set()
             new = set()
 
@@ -69,14 +86,24 @@ def submit(task_id, label, task_def):
                 fut = e.submit(create_task, session, task_id, label, task_def)
                 new.add(fut)
                 fs[task_id] = fut
+                fs_to_task[fut] = (task_id, label)
+                fut.add_done_callback(handle_exception)
 
             for task_id in tasklist:
                 task_def = taskgraph.tasks[task_id].task
-                # If we haven't finished submitting all our dependencies yet,
-                # come back to this later.
                 # Some dependencies aren't in our graph, so make sure to filter
                 # those out
                 deps = set(task_def.get("dependencies", [])) & alltasks
+
+                # If one of the dependencies didn't get created, then
+                # don't attempt to submit as it would fail.
+                if any(d in skipped for d in deps):
+                    skipped.add(task_id)
+                    to_remove.add(task_id)
+                    continue
+
+                # If we haven't finished submitting all our dependencies yet,
+                # come back to this later.
                 if any((d not in fs or not fs[d].done()) for d in deps):
                     continue
 
@@ -90,16 +117,18 @@ def submit(task_id, label, task_def):
                     submit(slugid(), taskid_to_label[task_id], task_def)
             tasklist.difference_update(to_remove)
 
-            # as each of those futures complete, try to schedule more tasks
+            # As each of those futures complete, try to schedule more tasks.
             for f in futures.as_completed(new):
                 schedule_tasks()
 
-        # start scheduling tasks and run until everything is scheduled
+        # Start scheduling tasks and run until everything is scheduled.
         schedule_tasks()
 
-        # check the result of each future, raising an exception if it failed
-        for f in futures.as_completed(fs.values()):
-            f.result()
+        # Wait for all futures to complete.
+        futures.wait(fs.values())
+
+        if errors:
+            raise CreateTasksException(errors)
 
 
 def create_task(session, task_id, label, task_def):
diff --git a/test/test_create.py b/test/test_create.py
index 1b93c9b1..a72a89f3 100644
--- a/test/test_create.py
+++ b/test/test_create.py
@@ -2,32 +2,73 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
-
 
+import json
+import re
 import unittest
 from unittest import mock
 
+import responses
+
 from taskgraph import create
 from taskgraph.config import GraphConfig
+from taskgraph.create import CreateTasksException
 from taskgraph.graph import Graph
 from taskgraph.task import Task
 from taskgraph.taskgraph import TaskGraph
+from taskgraph.util import taskcluster as tc_util
 
 GRAPH_CONFIG = GraphConfig({"trust-domain": "domain"}, "/var/empty")
 
 
-class TestCreate(unittest.TestCase):
-    def setUp(self):
-        self.created_tasks = {}
-        self.old_create_task = create.create_task
-        create.create_task = self.fake_create_task
+def mock_taskcluster_api(
+    created_tasks=None, error_status=None, error_message=None, error_task_ids=None
+):
+    """Mock the Taskcluster Queue API for create task calls."""
+
+    def request_callback(request):
+        task_id = request.url.split("/")[-1]
 
-    def tearDown(self):
-        create.create_task = self.old_create_task
+        # Check if this task should error
+        if error_status is not None:
+            if error_task_ids is None or task_id in error_task_ids:
+                # Support per-task error messages
+                if isinstance(error_message, dict):
+                    message = error_message.get(task_id, "error")
+                else:
+                    message = error_message or "error"
+                return (error_status, {}, f'{{"message": "{message}"}}')
 
-    def fake_create_task(self, session, task_id, label, task_def):
-        self.created_tasks[task_id] = task_def
+        # Success case - capture task definition if requested
+        if created_tasks is not None:
+            task_def = json.loads(request.body)
+            created_tasks[task_id] = task_def
+        return (200, {}, f'{{"status": {{"taskId": "{task_id}"}}}}')
+
+    responses.add_callback(
+        responses.PUT,
+        re.compile(r"https://tc\.example\.com/api/queue/v1/task/.*"),
+        callback=request_callback,
+        content_type="application/json",
+    )
+
+
+class TestCreate(unittest.TestCase):
+    def setUp(self):
+        # Clear cached Taskcluster clients/sessions since we're mocking the environment
+        tc_util.get_taskcluster_client.cache_clear()
+        tc_util.get_session.cache_clear()
+
+    @responses.activate
+    @mock.patch.dict(
+        "os.environ",
+        {"TASKCLUSTER_ROOT_URL": "https://tc.example.com"},
+        clear=True,
+    )
     def test_create_tasks(self):
+        created_tasks = {}
+        mock_taskcluster_api(created_tasks=created_tasks)
+
         tasks = {
             "tid-a": Task(
                 kind="test", label="a", attributes={}, task={"payload": "hello world"}
@@ -48,7 +89,8 @@ def test_create_tasks(self):
             decision_task_id="decisiontask",
         )
 
-        for tid, task in self.created_tasks.items():
+        assert created_tasks
+        for tid, task in created_tasks.items():
             self.assertEqual(task["payload"], "hello world")
             self.assertEqual(task["schedulerId"], "domain-level-4")
             # make sure the dependencies exist, at least
@@ -56,10 +98,19 @@ def test_create_tasks(self):
                 if depid == "decisiontask":
                     # Don't look for decisiontask here
                     continue
-                self.assertIn(depid, self.created_tasks)
-
+                self.assertIn(depid, created_tasks)
+
+    @responses.activate
+    @mock.patch.dict(
+        "os.environ",
+        {"TASKCLUSTER_ROOT_URL": "https://tc.example.com"},
+        clear=True,
+    )
     def test_create_task_without_dependencies(self):
         "a task with no dependencies depends on the decision task"
+        created_tasks = {}
+        mock_taskcluster_api(created_tasks=created_tasks)
+
         tasks = {
             "tid-a": Task(
                 kind="test", label="a", attributes={}, task={"payload": "hello world"}
@@ -77,12 +128,20 @@ def test_create_task_without_dependencies(self):
             decision_task_id="decisiontask",
         )
 
-        for tid, task in self.created_tasks.items():
+        assert created_tasks
+        for tid, task in created_tasks.items():
             self.assertEqual(task.get("dependencies"), ["decisiontask"])
 
-    @mock.patch("taskgraph.create.create_task")
-    def test_create_tasks_fails_if_create_fails(self, create_task):
-        "creat_tasks fails if a single create_task call fails"
+    @responses.activate
+    @mock.patch.dict(
+        "os.environ",
+        {"TASKCLUSTER_ROOT_URL": "https://tc.example.com"},
+        clear=True,
+    )
+    def test_create_tasks_fails_if_create_fails(self):
+        "create_tasks fails if a single create_task call fails"
+        mock_taskcluster_api(error_status=403, error_message="oh no!")
+
         tasks = {
             "tid-a": Task(
                 kind="test", label="a", attributes={}, task={"payload": "hello world"}
@@ -92,13 +151,45 @@ def test_create_tasks_fails_if_create_fails(self, create_task):
         graph = Graph(nodes={"tid-a"}, edges=set())
         taskgraph = TaskGraph(tasks, graph)
 
-        def fail(*args):
-            print("UHOH")
-            raise RuntimeError("oh no!")
+        with self.assertRaises(CreateTasksException):
+            create.create_tasks(
+                GRAPH_CONFIG,
+                taskgraph,
+                label_to_taskid,
+                {"level": "4"},
+                decision_task_id="decisiontask",
+            )
+
+    @responses.activate
+    @mock.patch.dict(
+        "os.environ",
+        {"TASKCLUSTER_ROOT_URL": "https://tc.example.com"},
+        clear=True,
+    )
+    def test_create_tasks_collects_multiple_errors(self):
+        "create_tasks collects all errors from multiple failing tasks"
+        mock_taskcluster_api(
+            error_status=409,
+            error_message={
+                "tid-a": "scope error for task a",
+                "tid-b": "scope error for task b",
+            },
+            error_task_ids={"tid-a", "tid-b"},
+        )
 
-        create_task.side_effect = fail
+        tasks = {
+            "tid-a": Task(
+                kind="test", label="a", attributes={}, task={"payload": "hello world"}
+            ),
+            "tid-b": Task(
+                kind="test", label="b", attributes={}, task={"payload": "hello world"}
+            ),
+        }
+        label_to_taskid = {"a": "tid-a", "b": "tid-b"}
+        graph = Graph(nodes={"tid-a", "tid-b"}, edges=set())
+        taskgraph = TaskGraph(tasks, graph)
 
-        with self.assertRaises(RuntimeError):
+        with self.assertRaises(CreateTasksException) as cm:
             create.create_tasks(
                 GRAPH_CONFIG,
                 taskgraph,
@@ -106,3 +197,8 @@ def fail(*args):
                 {"level": "4"},
                 decision_task_id="decisiontask",
             )
+
+        # Verify both errors are in the exception message
+        exception_message = str(cm.exception)
+        self.assertIn("Could not create 'a'", exception_message)
+        self.assertIn("Could not create 'b'", exception_message)
diff --git a/test/test_docker.py b/test/test_docker.py
index c75957bb..5cd5c5b7 100644
--- a/test/test_docker.py
+++ b/test/test_docker.py
@@ -8,6 +8,7 @@
 from taskgraph import docker
 from taskgraph.config import GraphConfig
 from taskgraph.transforms.docker_image import IMAGE_BUILDER_IMAGE
+from taskgraph.util import taskcluster as tc_util
 from taskgraph.util.vcs import get_repository
 
 from .conftest import nowin
@@ -22,6 +23,9 @@ def root_url():
 def mock_environ(monkeypatch, root_url):
     # Ensure user specified environment variables don't interfere with URLs.
     monkeypatch.setattr(os, "environ", {"TASKCLUSTER_ROOT_URL": root_url})
+    # Clear cached Taskcluster clients/sessions since we're mocking the environment
+    tc_util.get_taskcluster_client.cache_clear()
+    tc_util.get_session.cache_clear()
 
 
 @pytest.fixture(autouse=True, scope="module")