-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathexecutor.py
More file actions
188 lines (154 loc) · 6.92 KB
/
executor.py
File metadata and controls
188 lines (154 loc) · 6.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import logging
import warnings
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, TypeVar
from rich.console import Console
from rich.table import Table
from .common.logging import log_exception
from .common.registrable import Registrable
from .common.util import import_extra_module
from .step_graph import StepGraph
from .workspace import Workspace
if TYPE_CHECKING:
from .step import Step
logger = logging.getLogger(__name__)
T = TypeVar("T")
@dataclass
class ExecutionMetadata:
logs_location: Optional[str] = None
"""
Path or URL to the logs for the step's execution.
"""
result_location: Optional[str] = None
"""
Path or URL to the result of the step's execution.
"""
@dataclass
class ExecutorOutput:
"""
Describes the outcome of the execution.
"""
successful: Dict[str, ExecutionMetadata] = field(default_factory=dict)
"""Steps which ran successfully or were found in the cache."""
failed: Dict[str, ExecutionMetadata] = field(default_factory=dict)
"""Steps that failed."""
not_run: Dict[str, ExecutionMetadata] = field(default_factory=dict)
"""Steps that were ignored (usually because of failed dependencies)."""
def display(self) -> None:
table = Table(caption_style="")
table.add_column("Step Name", justify="left", style="cyan")
table.add_column("Status", justify="left")
table.add_column("Results", justify="left")
all_steps = dict(self.successful)
all_steps.update(self.failed)
all_steps.update(self.not_run)
for step_name in sorted(all_steps):
status_str: str
result_str: str = "[grey62]N/A[/]"
if step_name in self.failed:
status_str = "[red]\N{ballot x} failed[/]"
execution_metadata = self.failed[step_name]
if execution_metadata.logs_location is not None:
result_str = f"[cyan]{execution_metadata.logs_location}[/]"
elif step_name in self.not_run:
status_str = "[yellow]- not run[/]"
elif step_name in self.successful:
status_str = "[green]\N{check mark} succeeded[/]"
execution_metadata = self.successful[step_name]
if execution_metadata.result_location is not None:
result_str = f"[cyan]{execution_metadata.result_location}[/]"
elif execution_metadata.logs_location is not None:
result_str = f"[cyan]{execution_metadata.logs_location}[/]"
else:
continue
table.add_row(step_name, status_str, result_str)
caption_parts: List[str] = []
if self.failed:
caption_parts.append(f"[red]\N{ballot x}[/] [italic]{len(self.failed)} failed[/]")
if self.successful:
caption_parts.append(
f"[green]\N{check mark}[/] [italic]{len(self.successful)} succeeded[/]"
)
if self.not_run:
caption_parts.append(f"[italic]{len(self.not_run)} not run[/]")
table.caption = ", ".join(caption_parts)
console = Console()
console.print(table)
class Executor(Registrable):
"""
An ``Executor`` is a class that is responsible for running steps and caching their results.
This is the base class and default implementation, registered as "default".
.. note::
The ``parallelism`` parameter has no effect with this default :class:`Executor`,
but is part of the API because most subclass implementations allow configuring
parallelism.
"""
default_implementation = "default"
def __init__(
self,
workspace: Workspace,
include_package: Optional[Sequence[str]] = None,
parallelism: Optional[int] = None,
) -> None:
self.workspace = workspace
self.include_package = include_package
self.parallelism = parallelism
def execute_step(self, step: "Step") -> None:
# Import included packages to find registered components.
if self.include_package is not None:
for package_name in self.include_package:
import_extra_module(package_name)
if step.cache_results:
step.ensure_result(self.workspace)
else:
step.result(self.workspace)
def execute_step_graph(
self, step_graph: StepGraph, run_name: Optional[str] = None
) -> ExecutorOutput:
"""
Execute a :class:`~tango.step_graph.StepGraph`. This attempts to execute
every step in order. If a step fails, its dependent steps are not run,
but unrelated steps are still executed. Step failures will be logged, but
no exceptions will be raised.
"""
if self.parallelism is not None:
warnings.warn(
"The 'parallelism' parameter has no effect with the default Executor. "
"If you want to run steps in parallel, consider using the MulticoreExecutor.",
UserWarning,
)
successful: Dict[str, ExecutionMetadata] = {}
failed: Dict[str, ExecutionMetadata] = {}
not_run: Dict[str, ExecutionMetadata] = {}
uncacheable_leaf_steps = step_graph.uncacheable_leaf_steps()
for step in step_graph.values():
if not step.cache_results and step not in uncacheable_leaf_steps:
# If a step is uncacheable and required for another step, it will be
# executed as part of the downstream step's execution.
continue
if any(dep.name in failed for dep in step.recursive_dependencies):
not_run[step.name] = ExecutionMetadata()
else:
try:
self.execute_step(step)
successful[step.name] = ExecutionMetadata(
result_location=self.workspace.step_info(step).result_location
)
except Exception as exc:
failed[step.name] = ExecutionMetadata()
log_exception(exc, logger)
return ExecutorOutput(successful=successful, failed=failed, not_run=not_run)
# NOTE: The reason for having this method instead of just using `execute_step()` to run
# a single step is that the certain executors, such as the BeakerExecutor, need to
# serialize steps somehow, and the easiest way to serialize a step is by serializing the
# whole step config (which can be accessed via the step graph).
def execute_sub_graph_for_step(
self, step_graph: StepGraph, step_name: str, run_name: Optional[str] = None
) -> ExecutorOutput:
"""
Execute the sub-graph associated with a particular step in a
:class:`~tango.step_graph.StepGraph`.
"""
sub_graph = step_graph.sub_graph(step_name)
return self.execute_step_graph(sub_graph, run_name=run_name)
Executor.register("default")(Executor)