Skip to content

Commit 4b18669

Browse files
feat(executors): add workflow runner; support for workflow in server
1 parent 4b8647e commit 4b18669

File tree

5 files changed

+168
-87
lines changed

5 files changed

+168
-87
lines changed

libs/executors/garf/executors/__init__.py

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,42 +15,7 @@
1515

1616
from __future__ import annotations
1717

18-
import importlib
19-
20-
from garf.executors import executor, fetchers
2118
from garf.executors.api_executor import ApiExecutionContext, ApiQueryExecutor
22-
from garf.executors.telemetry import tracer
23-
24-
25-
@tracer.start_as_current_span('setup_executor')
26-
def setup_executor(
27-
source: str,
28-
fetcher_parameters: dict[str, str | int | bool],
29-
enable_cache: bool = False,
30-
cache_ttl_seconds: int = 3600,
31-
) -> type[executor.Executor]:
32-
"""Initializes executors based on a source and parameters."""
33-
if source == 'bq':
34-
bq_executor = importlib.import_module('garf.executors.bq_executor')
35-
query_executor = bq_executor.BigQueryExecutor(**fetcher_parameters)
36-
elif source == 'sqldb':
37-
sql_executor = importlib.import_module('garf.executors.sql_executor')
38-
query_executor = (
39-
sql_executor.SqlAlchemyQueryExecutor.from_connection_string(
40-
fetcher_parameters.get('connection_string')
41-
)
42-
)
43-
else:
44-
concrete_api_fetcher = fetchers.get_report_fetcher(source)
45-
query_executor = ApiQueryExecutor(
46-
fetcher=concrete_api_fetcher(
47-
**fetcher_parameters,
48-
enable_cache=enable_cache,
49-
cache_ttl_seconds=cache_ttl_seconds,
50-
)
51-
)
52-
return query_executor
53-
5419

5520
__all__ = [
5621
'ApiQueryExecutor',

libs/executors/garf/executors/entrypoints/cli.py

Lines changed: 7 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@
2222
import argparse
2323
import logging
2424
import pathlib
25-
import re
2625
import sys
2726

2827
import garf.executors
29-
from garf.executors import config, exceptions, workflow
28+
from garf.executors import config, exceptions, setup, workflow, workflow_runner
3029
from garf.executors.entrypoints import utils
3130
from garf.executors.entrypoints.tracer import initialize_tracer
3231
from garf.executors.telemetry import tracer
@@ -86,52 +85,11 @@ def main():
8685
if workflow_file := args.workflow:
8786
wf_parent = pathlib.Path.cwd() / pathlib.Path(workflow_file).parent
8887
execution_workflow = workflow.Workflow.from_file(workflow_file)
89-
for i, step in enumerate(execution_workflow.steps, 1):
90-
step_span_name = f'{i}-{step.fetcher}'
91-
if step.alias:
92-
step_span_name = f'{step_span_name}-{step.alias}'
93-
with tracer.start_as_current_span(step_span_name):
94-
query_executor = garf.executors.setup_executor(
95-
source=step.fetcher,
96-
fetcher_parameters=step.fetcher_parameters,
97-
enable_cache=args.enable_cache,
98-
cache_ttl_seconds=args.cache_ttl_seconds,
99-
)
100-
batch = {}
101-
if not (queries := step.queries):
102-
logger.error('Please provide one or more queries to run')
103-
raise exceptions.GarfExecutorError(
104-
'Please provide one or more queries to run'
105-
)
106-
for query in queries:
107-
if isinstance(query, garf.executors.workflow.QueryPath):
108-
query_path = query.full_path
109-
if re.match(
110-
'^(http|gs|s3|aruze|hdfs|webhdfs|ssh|scp|sftp)', query_path
111-
):
112-
batch[query.path] = reader_client.read(query_path)
113-
else:
114-
if not query.prefix:
115-
query_path = wf_parent / pathlib.Path(query.path)
116-
if not query_path.exists():
117-
raise workflow.GarfWorkflowError(
118-
f'Query: {query_path} not found'
119-
)
120-
batch[query.path] = reader_client.read(query_path)
121-
elif isinstance(query, garf.executors.workflow.QueryFolder):
122-
query_path = wf_parent / pathlib.Path(query.folder)
123-
if not query_path.exists():
124-
raise workflow.GarfWorkflowError(
125-
f'Folder: {query_path} not found'
126-
)
127-
for p in query_path.rglob('*'):
128-
if p.suffix == '.sql':
129-
batch[p.stem] = reader_client.read(p)
130-
else:
131-
batch[query.query.title] = query.query.text
132-
query_executor.execute_batch(
133-
batch, step.context, args.parallel_threshold
134-
)
88+
workflow_runner.WorkflowRunner(
89+
execution_workflow=execution_workflow, wf_parent=wf_parent
90+
).run(
91+
enable_cache=args.enable_cache, cache_ttl_seconds=args.cache_ttl_seconds
92+
)
13593
sys.exit()
13694

13795
if not args.query:
@@ -165,7 +123,7 @@ def main():
165123
writer_parameters=writer_parameters,
166124
fetcher_parameters=source_parameters,
167125
)
168-
query_executor = garf.executors.setup_executor(
126+
query_executor = setup.setup_executor(
169127
source=args.source,
170128
fetcher_parameters=context.fetcher_parameters,
171129
enable_cache=args.enable_cache,

libs/executors/garf/executors/entrypoints/server.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import pydantic
2222
import typer
2323
import uvicorn
24-
from garf.executors import exceptions
24+
from garf.executors import exceptions, setup, workflow_runner
2525
from garf.executors.entrypoints import utils
2626
from garf.executors.entrypoints.tracer import initialize_tracer
2727
from garf.io import reader
@@ -125,7 +125,7 @@ def execute(
125125
request: ApiExecutorRequest,
126126
dependencies: Annotated[GarfDependencies, fastapi.Depends(GarfDependencies)],
127127
) -> ApiExecutorResponse:
128-
query_executor = garf.executors.setup_executor(
128+
query_executor = setup.setup_executor(
129129
request.source, request.context.fetcher_parameters
130130
)
131131
result = query_executor.execute(request.query, request.title, request.context)
@@ -137,7 +137,7 @@ def execute_batch(
137137
request: ApiExecutorRequest,
138138
dependencies: Annotated[GarfDependencies, fastapi.Depends(GarfDependencies)],
139139
) -> ApiExecutorResponse:
140-
query_executor = garf.executors.setup_executor(
140+
query_executor = setup.setup_executor(
141141
request.source, request.context.fetcher_parameters
142142
)
143143
reader_client = reader.FileReader()
@@ -146,6 +146,19 @@ def execute_batch(
146146
return ApiExecutorResponse(results=results)
147147

148148

149+
@app.post('/api/execute:workflow')
def execute_workflow(
  workflow_file: str,
  dependencies: Annotated[GarfDependencies, fastapi.Depends(GarfDependencies)],
  enable_cache: bool = False,
  cache_ttl_seconds: int = 3600,
) -> str:
  """Runs the workflow described in `workflow_file` end-to-end.

  Builds a WorkflowRunner from the given workflow file and executes all
  of its steps; caching options are forwarded to each step's executor.
  """
  runner = workflow_runner.WorkflowRunner.from_file(workflow_file)
  runner.run(enable_cache=enable_cache, cache_ttl_seconds=cache_ttl_seconds)
  return 'success'
160+
161+
149162
@typer_app.command()
150163
def main(
151164
port: Annotated[int, typer.Option(help='Port to start the server')] = 8000,
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
from __future__ import annotations
15+
16+
import importlib
17+
18+
from garf.executors import executor, fetchers
19+
from garf.executors.api_executor import ApiQueryExecutor
20+
from garf.executors.telemetry import tracer
21+
22+
23+
@tracer.start_as_current_span('setup_executor')
def setup_executor(
  source: str,
  fetcher_parameters: dict[str, str | int | bool],
  enable_cache: bool = False,
  cache_ttl_seconds: int = 3600,
) -> executor.Executor:
  """Initializes an executor based on a source and parameters.

  Args:
    source: Data source identifier — 'bq' for BigQuery, 'sqldb' for a
      SQLAlchemy-backed database, anything else is resolved as a
      registered API report fetcher.
    fetcher_parameters: Keyword parameters forwarded to the concrete
      executor / fetcher constructor; for 'sqldb' it must contain a
      'connection_string' entry.
    enable_cache: Whether API fetcher responses should be cached
      (ignored for 'bq' and 'sqldb').
    cache_ttl_seconds: Time-to-live of cached responses, in seconds
      (ignored for 'bq' and 'sqldb').

  Returns:
    An initialized executor *instance* for the requested source.
  """
  if source == 'bq':
    # Imported lazily so the BigQuery extra stays an optional dependency.
    bq_executor = importlib.import_module('garf.executors.bq_executor')
    return bq_executor.BigQueryExecutor(**fetcher_parameters)
  if source == 'sqldb':
    # Imported lazily so the SQLAlchemy extra stays an optional dependency.
    sql_executor = importlib.import_module('garf.executors.sql_executor')
    return sql_executor.SqlAlchemyQueryExecutor.from_connection_string(
      fetcher_parameters.get('connection_string')
    )
  # Default: look up a concrete API fetcher registered under `source`.
  concrete_api_fetcher = fetchers.get_report_fetcher(source)
  return ApiQueryExecutor(
    fetcher=concrete_api_fetcher(
      **fetcher_parameters,
      enable_cache=enable_cache,
      cache_ttl_seconds=cache_ttl_seconds,
    )
  )
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
from __future__ import annotations
15+
16+
import logging
17+
import pathlib
18+
import re
19+
20+
from garf.executors import exceptions, setup, workflow
21+
from garf.executors.telemetry import tracer
22+
from garf.io import reader
23+
24+
logger = logging.getLogger(__name__)
25+
26+
27+
class WorkflowRunner:
  """Executes every step of a workflow sequentially.

  Each step runs inside its own tracing span with its own executor;
  the step's queries (remote URLs, local files, folders of *.sql files,
  or inline query text) are collected into one batch and executed.
  """

  def __init__(
    self,
    execution_workflow: workflow.Workflow,
    wf_parent: pathlib.Path | str,
    parallel_threshold: int = 10,
  ) -> None:
    """Initializes WorkflowRunner.

    Args:
      execution_workflow: Parsed workflow with the steps to run.
      wf_parent: Directory relative query paths are resolved against.
      parallel_threshold: Max queries executed in parallel per step.
    """
    self.workflow = execution_workflow
    # Normalize to Path so the `self.wf_parent / ...` joins below work
    # even when a plain string was supplied (str has no `/` operator).
    self.wf_parent = pathlib.Path(wf_parent)
    self.parallel_threshold = parallel_threshold

  @classmethod
  def from_file(cls, workflow_file: str | pathlib.Path) -> WorkflowRunner:
    """Builds a runner from a workflow file.

    Relative query paths in the workflow are resolved against the
    workflow file's parent directory.
    """
    workflow_file = pathlib.Path(workflow_file)
    execution_workflow = workflow.Workflow.from_file(workflow_file)
    return cls(
      execution_workflow=execution_workflow, wf_parent=workflow_file.parent
    )

  def run(
    self, enable_cache: bool = False, cache_ttl_seconds: int = 3600
  ) -> None:
    """Runs all workflow steps in order.

    Args:
      enable_cache: Whether API fetcher responses should be cached.
      cache_ttl_seconds: Time-to-live of cached responses, in seconds.

    Raises:
      GarfExecutorError: When a step defines no queries.
      GarfWorkflowError: When a referenced query file/folder is missing.
    """
    reader_client = reader.create_reader('file')
    for i, step in enumerate(self.workflow.steps, 1):
      step_span_name = f'{i}-{step.fetcher}'
      if step.alias:
        step_span_name = f'{step_span_name}-{step.alias}'
      with tracer.start_as_current_span(step_span_name):
        query_executor = setup.setup_executor(
          source=step.fetcher,
          fetcher_parameters=step.fetcher_parameters,
          enable_cache=enable_cache,
          cache_ttl_seconds=cache_ttl_seconds,
        )
        batch = self._build_batch(step, reader_client)
        query_executor.execute_batch(
          batch, step.context, self.parallel_threshold
        )

  def _build_batch(self, step, reader_client) -> dict[str, str]:
    """Reads every query of a step into a {title: query_text} mapping."""
    if not (queries := step.queries):
      logger.error('Please provide one or more queries to run')
      raise exceptions.GarfExecutorError(
        'Please provide one or more queries to run'
      )
    batch = {}
    for query in queries:
      if isinstance(query, workflow.QueryPath):
        query_path = query.full_path
        # 'aruze' was an evident typo for the 'azure' scheme.
        if re.match(
          '^(http|gs|s3|azure|hdfs|webhdfs|ssh|scp|sftp)', query_path
        ):
          # Remote location: delegate fetching to the reader.
          batch[query.path] = reader_client.read(query_path)
        else:
          if query.prefix:
            # full_path is a string here; wrap it so `.exists()`
            # below doesn't fail with AttributeError.
            query_path = pathlib.Path(query_path)
          else:
            query_path = self.wf_parent / pathlib.Path(query.path)
          if not query_path.exists():
            raise workflow.GarfWorkflowError(
              f'Query: {query_path} not found'
            )
          batch[query.path] = reader_client.read(query_path)
      elif isinstance(query, workflow.QueryFolder):
        folder_path = self.wf_parent / pathlib.Path(query.folder)
        if not folder_path.exists():
          raise workflow.GarfWorkflowError(
            f'Folder: {folder_path} not found'
          )
        for file in folder_path.rglob('*'):
          if file.suffix == '.sql':
            batch[file.stem] = reader_client.read(file)
      else:
        # Inline query: title and text are embedded in the workflow.
        batch[query.query.title] = query.query.text
    return batch

0 commit comments

Comments
 (0)