Commit db15f96

[executors] feat: add workflow entity
1 parent: 6692a05

File tree

6 files changed: +235 -3 lines


libs/executors/garf_executors/config.py

Lines changed: 3 additions & 1 deletion

@@ -47,5 +47,7 @@ def from_file(cls, path: str | pathlib.Path | os.PathLike[str]) -> Config:
   def save(self, path: str | pathlib.Path | os.PathLike[str]) -> str:
     """Saves config to local or remote yaml file."""
     with smart_open.open(path, 'w', encoding='utf-8') as f:
-      yaml.dump(self.model_dump().get('sources'), f, encoding='utf-8')
+      yaml.dump(
+        self.model_dump(exclude_none=True).get('sources'), f, encoding='utf-8'
+      )
     return f'Config is saved to {str(path)}'
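
The only functional change here is exclude_none=True: fields that were never set (None) no longer end up as null entries in the saved YAML. A minimal sketch of the effect, using a stand-in pydantic model rather than the real Config (whose full field set is not part of this diff):

import pydantic
import yaml


class Source(pydantic.BaseModel):
  # Stand-in model: field names are illustrative, not Config's.
  writer: str = 'console'
  writer_parameters: dict | None = None


source = Source()
print(yaml.dump(source.model_dump()))
# writer: console
# writer_parameters: null
print(yaml.dump(source.model_dump(exclude_none=True)))
# writer: console

The same exclude_none=True call is reused by Workflow.save in the new module below.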

libs/executors/garf_executors/entrypoints/cli.py

Lines changed: 33 additions & 2 deletions

@@ -24,9 +24,10 @@
 import sys

 from garf_io import reader
+from opentelemetry import trace

 import garf_executors
-from garf_executors import config, exceptions
+from garf_executors import config, exceptions, workflow
 from garf_executors.entrypoints import utils
 from garf_executors.entrypoints.tracer import initialize_tracer
 from garf_executors.telemetry import tracer

@@ -39,6 +40,7 @@ def main():
   parser = argparse.ArgumentParser()
   parser.add_argument('query', nargs='*')
   parser.add_argument('-c', '--config', dest='config', default=None)
+  parser.add_argument('-w', '--workflow', dest='workflow', default=None)
   parser.add_argument('--source', dest='source', default=None)
   parser.add_argument('--output', dest='output', default='console')
   parser.add_argument('--input', dest='input', default='file')

@@ -70,18 +72,47 @@ def main():
   parser.set_defaults(dry_run=False)
   args, kwargs = parser.parse_known_args()

+  span = trace.get_current_span()
+  command_args = ' '.join(sys.argv[1:])
+  span.set_attribute('cli.command', f'garf {command_args}')
   if args.version:
     print(garf_executors.__version__)
     sys.exit()
   logger = utils.init_logging(
     loglevel=args.loglevel.upper(), logger_type=args.logger, name=args.log_name
   )
+  reader_client = reader.create_reader(args.input)
+  if workflow_file := args.workflow:
+    execution_workflow = workflow.Workflow.from_file(workflow_file)
+    for i, step in enumerate(execution_workflow.steps, 1):
+      with tracer.start_as_current_span(f'{i}-{step.fetcher}'):
+        query_executor = garf_executors.setup_executor(
+          source=step.fetcher,
+          fetcher_parameters=step.fetcher_parameters,
+          enable_cache=args.enable_cache,
+          cache_ttl_seconds=args.cache_ttl_seconds,
+        )
+        batch = {}
+        if not (queries := step.queries):
+          logger.error('Please provide one or more queries to run')
+          raise exceptions.GarfExecutorError(
+            'Please provide one or more queries to run'
+          )
+        for query in queries:
+          if isinstance(query, garf_executors.workflow.QueryPath):
+            batch[query.path] = reader_client.read(query.path)
+          else:
+            batch[query.query.title] = query.query.text
+        query_executor.execute_batch(
+          batch, step.context, args.parallel_threshold
+        )
+    sys.exit()
+
   if not args.query:
     logger.error('Please provide one or more queries to run')
     raise exceptions.GarfExecutorError(
       'Please provide one or more queries to run'
     )
-  reader_client = reader.create_reader(args.input)
   if config_file := args.config:
     execution_config = config.Config.from_file(config_file)
     if not (context := execution_config.sources.get(args.source)):
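
The new --workflow branch is self-contained: it loads the YAML workflow, builds an executor per step, reads file-based queries through the reader client, keys inline queries by their title, runs the batch, and exits, so it never reaches the config/--source path below. Condensed into plain Python, the branch amounts to roughly the following sketch; it assumes the caching and parallelism arguments the CLI forwards (omitted here) have usable defaults:

import garf_executors
from garf_executors import workflow
from garf_io import reader

execution_workflow = workflow.Workflow.from_file('workflow.yaml')
reader_client = reader.create_reader('file')
for step in execution_workflow.steps:
  query_executor = garf_executors.setup_executor(
    source=step.fetcher,
    fetcher_parameters=step.fetcher_parameters,
  )
  batch = {}
  for query in step.queries or []:
    if isinstance(query, workflow.QueryPath):
      # Queries referenced by path are read from disk, keyed by that path.
      batch[query.path] = reader_client.read(query.path)
    else:
      # Inline queries are keyed by their title.
      batch[query.query.title] = query.query.text
  # step.context bundles writer/query/fetcher parameters for the batch.
  query_executor.execute_batch(batch, step.context)

The equivalent CLI invocation, exercised by the end-to-end test further down, is garf -w workflow.yaml.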
libs/executors/garf_executors/workflow.py

Lines changed: 96 additions & 0 deletions

@@ -0,0 +1,96 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+import os
+import pathlib
+
+import pydantic
+import smart_open
+import yaml
+
+from garf_executors.execution_context import ExecutionContext
+
+
+class QueryPath(pydantic.BaseModel):
+  """Path to a file with a query."""
+
+  path: str
+
+
+class QueryDefinition(pydantic.BaseModel):
+  """Definition of a query."""
+
+  query: Query
+
+
+class Query(pydantic.BaseModel):
+  """Query elements.
+
+  Attributes:
+    text: Query text.
+    title: Name of the query.
+  """
+
+  text: str
+  title: str
+
+
+class ExecutionStep(ExecutionContext):
+  """Common context for executing one or more queries.
+
+  Attributes:
+    fetcher: Name of a fetcher used to get data from an API.
+    alias: Optional alias to identify the execution step.
+    queries: Queries to run for a particular fetcher.
+    context: Execution context for queries and fetcher.
+  """
+
+  fetcher: str | None = None
+  alias: str | None = None
+  queries: list[QueryPath | QueryDefinition] | None = None
+
+  @property
+  def context(self) -> ExecutionContext:
+    return ExecutionContext(
+      writer=self.writer,
+      writer_parameters=self.writer_parameters,
+      query_parameters=self.query_parameters,
+      fetcher_parameters=self.fetcher_parameters,
+    )
+
+
+class Workflow(pydantic.BaseModel):
+  """Orchestrates execution of queries for multiple fetchers.
+
+  Attributes:
+    steps: Contains one or several fetcher executions.
+  """
+
+  steps: list[ExecutionStep]
+
+  @classmethod
+  def from_file(cls, path: str | pathlib.Path | os.PathLike[str]) -> Workflow:
+    """Builds workflow from local or remote yaml file."""
+    with smart_open.open(path, 'r', encoding='utf-8') as f:
+      data = yaml.safe_load(f)
+    return Workflow(steps=data.get('steps'))
+
+  def save(self, path: str | pathlib.Path | os.PathLike[str]) -> str:
+    """Saves workflow to local or remote yaml file."""
+    with smart_open.open(path, 'w', encoding='utf-8') as f:
+      yaml.dump(
+        self.model_dump(exclude_none=True).get('steps'), f, encoding='utf-8'
+      )
+    return f'Workflow is saved to {str(path)}'
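
A step's queries accept two shapes: a QueryPath ({'path': ...}) pointing at a query file, and a QueryDefinition ({'query': {'title': ..., 'text': ...}}) carrying the text inline. A small usage sketch of the new entity, with illustrative step values modelled on the unit test added further down:

from garf_executors.workflow import Workflow

wf = Workflow(
  steps=[
    {
      'fetcher': 'api',
      'queries': [
        {'path': 'example.sql'},                             # QueryPath
        {'query': {'title': 'inline_query', 'text': 'SELECT 1'}},  # QueryDefinition
      ],
      'writer': 'csv',
    }
  ]
)
print(wf.save('/tmp/workflow.yaml'))  # Workflow is saved to /tmp/workflow.yaml
restored = Workflow.from_file('/tmp/workflow.yaml')
assert restored == wf  # round-trips through YAML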

libs/executors/tests/end-to-end/test_cli.py

Lines changed: 25 additions & 0 deletions

@@ -123,3 +123,28 @@ def test_fake_source_from_config(self, tmp_path):

     assert result.returncode == 0
     assert json.loads(result.stdout) == self.expected_output
+
+  def test_fake_source_from_workflow(self, tmp_path):
+    workflow_path = _SCRIPT_PATH / 'test_workflow.yaml'
+    with open(workflow_path, 'r', encoding='utf-8') as f:
+      workflow_data = yaml.safe_load(f)
+    original_data_location = workflow_data['steps'][0]['fetcher_parameters'][
+      'data_location'
+    ]
+    workflow_data['steps'][0]['fetcher_parameters']['data_location'] = str(
+      _SCRIPT_PATH / original_data_location
+    )
+    tmp_workflow = tmp_path / 'workflow.yaml'
+    with open(tmp_workflow, 'w', encoding='utf-8') as f:
+      yaml.dump(workflow_data, f, encoding='utf-8')
+    command = f'garf -w {str(tmp_workflow)} --loglevel ERROR'
+    result = subprocess.run(
+      command,
+      shell=True,
+      check=False,
+      capture_output=True,
+      text=True,
+    )
+
+    assert result.returncode == 0
+    assert json.loads(result.stdout) == self.expected_output
libs/executors/tests/end-to-end/test_workflow.yaml

Lines changed: 17 additions & 0 deletions

@@ -0,0 +1,17 @@
+steps:
+- alias: test
+  fetcher: fake
+  queries:
+  - query:
+      title: test_query
+      text: |
+        SELECT
+          resource,
+          dimensions.name AS name,
+          metrics.clicks AS clicks
+        FROM resource
+  writer: console
+  writer_parameters:
+    format: json
+  fetcher_parameters:
+    data_location: test.json
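
Parsed with Workflow.from_file, this file yields a single ExecutionStep whose writer settings are exposed through the context property defined in workflow.py above. A short sketch of the parsed step, assuming it is run from the directory containing test_workflow.yaml; the values in the comments follow from the YAML:

from garf_executors.workflow import Workflow

step = Workflow.from_file('test_workflow.yaml').steps[0]
print(step.alias, step.fetcher)      # test fake
print(step.queries[0].query.title)   # test_query
context = step.context              # ExecutionContext carrying writer='console'
                                    # and writer_parameters={'format': 'json'}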
Lines changed: 61 additions & 0 deletions

@@ -0,0 +1,61 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import yaml
+from garf_executors.workflow import Workflow
+
+
+class TestWorkflow:
+  data = {
+    'steps': [
+      {
+        'fetcher': 'api',
+        'queries': [
+          {'path': 'example.sql'},
+          {'query': {'text': 'SELECT 1', 'title': 'example2'}},
+        ],
+        'query_parameters': {
+          'macro': {
+            'start_date': '2025-01-01',
+          },
+          'template': {
+            'cohorts': 1,
+          },
+        },
+        'fetcher_parameters': {
+          'id': [1, 2, 3],
+        },
+        'writer': 'csv',
+        'writer_parameters': {
+          'destination_folder': '/tmp',
+        },
+      }
+    ]
+  }
+
+  def test_from_file_returns_correct_context_from_data(self, tmp_path):
+    tmp_workflow = tmp_path / 'workflow.yaml'
+    with open(tmp_workflow, 'w', encoding='utf-8') as f:
+      yaml.dump(self.data, f, encoding='utf-8')
+    workflow = Workflow.from_file(tmp_workflow)
+    expected_workflow = Workflow(steps=self.data.get('steps'))
+    assert workflow == expected_workflow
+
+  def test_save_returns_correct_data(self, tmp_path):
+    tmp_workflow = tmp_path / 'workflow.yaml'
+    workflow = Workflow(steps=self.data.get('steps'))
+    workflow.save(tmp_workflow)
+    with open(tmp_workflow, 'r', encoding='utf-8') as f:
+      workflow_data = yaml.safe_load(f)
+    assert workflow_data == self.data.get('steps')