Skip to content

Commit 65d1c83

Browse files
feat(executors): add support for including / exclusing worklow steps
1 parent d6d0d75 commit 65d1c83

File tree

2 files changed

+37
-3
lines changed

2 files changed

+37
-3
lines changed

libs/executors/garf/executors/entrypoints/cli.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ def main():
6868
default=3600,
6969
type=int,
7070
)
71+
parser.add_argument('--workflow-skip', dest='workflow_skip', default=None)
72+
parser.add_argument(
73+
'--workflow-include', dest='workflow_include', default=None
74+
)
7175
parser.set_defaults(parallel_queries=True)
7276
parser.set_defaults(enable_cache=False)
7377
parser.set_defaults(dry_run=False)
@@ -86,10 +90,15 @@ def main():
8690
if workflow_file := args.workflow:
8791
wf_parent = pathlib.Path.cwd() / pathlib.Path(workflow_file).parent
8892
execution_workflow = workflow.Workflow.from_file(workflow_file)
93+
workflow_skip = args.workflow_skip if args.workflow_skip else None
94+
workflow_include = args.workflow_include if args.workflow_include else None
8995
workflow_runner.WorkflowRunner(
9096
execution_workflow=execution_workflow, wf_parent=wf_parent
9197
).run(
92-
enable_cache=args.enable_cache, cache_ttl_seconds=args.cache_ttl_seconds
98+
enable_cache=args.enable_cache,
99+
cache_ttl_seconds=args.cache_ttl_seconds,
100+
selected_aliases=workflow_include,
101+
skipped_aliases=workflow_skip,
93102
)
94103
sys.exit()
95104

libs/executors/garf/executors/workflows/workflow_runner.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@ def __init__(
5555
self.parallel_threshold = parallel_threshold
5656

5757
@classmethod
58-
def from_file(cls, workflow_file: str | pathlib.Path) -> WorkflowRunner:
58+
def from_file(
59+
cls,
60+
workflow_file: str | pathlib.Path,
61+
) -> WorkflowRunner:
5962
"""Initialized Workflow runner from a local or remote file."""
6063
if isinstance(workflow_file, str):
6164
workflow_file = pathlib.Path(workflow_file)
@@ -65,15 +68,37 @@ def from_file(cls, workflow_file: str | pathlib.Path) -> WorkflowRunner:
6568
)
6669

6770
def run(
68-
self, enable_cache: bool = False, cache_ttl_seconds: int = 3600
71+
self,
72+
enable_cache: bool = False,
73+
cache_ttl_seconds: int = 3600,
74+
selected_aliases: list[str] | None = None,
75+
skipped_aliases: list[str] | None = None,
6976
) -> list[str]:
77+
skipped_aliases = skipped_aliases or []
78+
selected_aliases = selected_aliases or []
7079
reader_client = reader.create_reader('file')
7180
execution_results = []
7281
logger.info('Starting Garf Workflow...')
7382
for i, step in enumerate(self.workflow.steps, 1):
7483
step_name = f'{i}-{step.fetcher}'
7584
if step.alias:
7685
step_name = f'{step_name}-{step.alias}'
86+
if step.alias in skipped_aliases:
87+
logger.warning(
88+
'Skipping step %d, fetcher: %s, alias: %s',
89+
i,
90+
step.fetcher,
91+
step.alias,
92+
)
93+
continue
94+
if selected_aliases and step.alias not in selected_aliases:
95+
logger.warning(
96+
'Skipping step %d, fetcher: %s, alias: %s',
97+
i,
98+
step.fetcher,
99+
step.alias,
100+
)
101+
continue
77102
with tracer.start_as_current_span(step_name):
78103
logger.info(
79104
'Running step %d, fetcher: %s, alias: %s', i, step.fetcher, step.alias

0 commit comments

Comments
 (0)