Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion libs/executors/garf/executors/entrypoints/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ def main():
default=3600,
type=int,
)
parser.add_argument('--workflow-skip', dest='workflow_skip', default=None)
parser.add_argument(
'--workflow-include', dest='workflow_include', default=None
)
parser.set_defaults(parallel_queries=True)
parser.set_defaults(enable_cache=False)
parser.set_defaults(dry_run=False)
Expand All @@ -86,10 +90,15 @@ def main():
if workflow_file := args.workflow:
wf_parent = pathlib.Path.cwd() / pathlib.Path(workflow_file).parent
execution_workflow = workflow.Workflow.from_file(workflow_file)
workflow_skip = args.workflow_skip if args.workflow_skip else None
workflow_include = args.workflow_include if args.workflow_include else None
workflow_runner.WorkflowRunner(
execution_workflow=execution_workflow, wf_parent=wf_parent
).run(
enable_cache=args.enable_cache, cache_ttl_seconds=args.cache_ttl_seconds
enable_cache=args.enable_cache,
cache_ttl_seconds=args.cache_ttl_seconds,
selected_aliases=workflow_include,
skipped_aliases=workflow_skip,
)
sys.exit()

Expand Down
29 changes: 27 additions & 2 deletions libs/executors/garf/executors/workflows/workflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ def __init__(
self.parallel_threshold = parallel_threshold

@classmethod
def from_file(cls, workflow_file: str | pathlib.Path) -> WorkflowRunner:
def from_file(
cls,
workflow_file: str | pathlib.Path,
) -> WorkflowRunner:
"""Initialized Workflow runner from a local or remote file."""
if isinstance(workflow_file, str):
workflow_file = pathlib.Path(workflow_file)
Expand All @@ -65,15 +68,37 @@ def from_file(cls, workflow_file: str | pathlib.Path) -> WorkflowRunner:
)

def run(
self, enable_cache: bool = False, cache_ttl_seconds: int = 3600
self,
enable_cache: bool = False,
cache_ttl_seconds: int = 3600,
selected_aliases: list[str] | None = None,
skipped_aliases: list[str] | None = None,
) -> list[str]:
skipped_aliases = skipped_aliases or []
selected_aliases = selected_aliases or []
reader_client = reader.create_reader('file')
execution_results = []
logger.info('Starting Garf Workflow...')
for i, step in enumerate(self.workflow.steps, 1):
step_name = f'{i}-{step.fetcher}'
if step.alias:
step_name = f'{step_name}-{step.alias}'
if step.alias in skipped_aliases:
logger.warning(
'Skipping step %d, fetcher: %s, alias: %s',
i,
step.fetcher,
step.alias,
)
continue
if selected_aliases and step.alias not in selected_aliases:
logger.warning(
'Skipping step %d, fetcher: %s, alias: %s',
i,
step.fetcher,
step.alias,
)
continue
with tracer.start_as_current_span(step_name):
logger.info(
'Running step %d, fetcher: %s, alias: %s', i, step.fetcher, step.alias
Expand Down