Skip to content

Commit 65654b3

Browse files
feat(executors): add workflow runner; support for workflow in server
1 parent 4b8647e commit 65654b3

File tree

8 files changed

+224
-99
lines changed

8 files changed

+224
-99
lines changed

libs/executors/garf/executors/__init__.py

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,42 +15,7 @@
1515

1616
from __future__ import annotations
1717

18-
import importlib
19-
20-
from garf.executors import executor, fetchers
2118
from garf.executors.api_executor import ApiExecutionContext, ApiQueryExecutor
22-
from garf.executors.telemetry import tracer
23-
24-
25-
@tracer.start_as_current_span('setup_executor')
26-
def setup_executor(
27-
source: str,
28-
fetcher_parameters: dict[str, str | int | bool],
29-
enable_cache: bool = False,
30-
cache_ttl_seconds: int = 3600,
31-
) -> type[executor.Executor]:
32-
"""Initializes executors based on a source and parameters."""
33-
if source == 'bq':
34-
bq_executor = importlib.import_module('garf.executors.bq_executor')
35-
query_executor = bq_executor.BigQueryExecutor(**fetcher_parameters)
36-
elif source == 'sqldb':
37-
sql_executor = importlib.import_module('garf.executors.sql_executor')
38-
query_executor = (
39-
sql_executor.SqlAlchemyQueryExecutor.from_connection_string(
40-
fetcher_parameters.get('connection_string')
41-
)
42-
)
43-
else:
44-
concrete_api_fetcher = fetchers.get_report_fetcher(source)
45-
query_executor = ApiQueryExecutor(
46-
fetcher=concrete_api_fetcher(
47-
**fetcher_parameters,
48-
enable_cache=enable_cache,
49-
cache_ttl_seconds=cache_ttl_seconds,
50-
)
51-
)
52-
return query_executor
53-
5419

5520
__all__ = [
5621
'ApiQueryExecutor',

libs/executors/garf/executors/entrypoints/cli.py

Lines changed: 7 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@
2222
import argparse
2323
import logging
2424
import pathlib
25-
import re
2625
import sys
2726

2827
import garf.executors
29-
from garf.executors import config, exceptions, workflow
28+
from garf.executors import config, exceptions, setup, workflow, workflow_runner
3029
from garf.executors.entrypoints import utils
3130
from garf.executors.entrypoints.tracer import initialize_tracer
3231
from garf.executors.telemetry import tracer
@@ -86,52 +85,11 @@ def main():
8685
if workflow_file := args.workflow:
8786
wf_parent = pathlib.Path.cwd() / pathlib.Path(workflow_file).parent
8887
execution_workflow = workflow.Workflow.from_file(workflow_file)
89-
for i, step in enumerate(execution_workflow.steps, 1):
90-
step_span_name = f'{i}-{step.fetcher}'
91-
if step.alias:
92-
step_span_name = f'{step_span_name}-{step.alias}'
93-
with tracer.start_as_current_span(step_span_name):
94-
query_executor = garf.executors.setup_executor(
95-
source=step.fetcher,
96-
fetcher_parameters=step.fetcher_parameters,
97-
enable_cache=args.enable_cache,
98-
cache_ttl_seconds=args.cache_ttl_seconds,
99-
)
100-
batch = {}
101-
if not (queries := step.queries):
102-
logger.error('Please provide one or more queries to run')
103-
raise exceptions.GarfExecutorError(
104-
'Please provide one or more queries to run'
105-
)
106-
for query in queries:
107-
if isinstance(query, garf.executors.workflow.QueryPath):
108-
query_path = query.full_path
109-
if re.match(
110-
'^(http|gs|s3|aruze|hdfs|webhdfs|ssh|scp|sftp)', query_path
111-
):
112-
batch[query.path] = reader_client.read(query_path)
113-
else:
114-
if not query.prefix:
115-
query_path = wf_parent / pathlib.Path(query.path)
116-
if not query_path.exists():
117-
raise workflow.GarfWorkflowError(
118-
f'Query: {query_path} not found'
119-
)
120-
batch[query.path] = reader_client.read(query_path)
121-
elif isinstance(query, garf.executors.workflow.QueryFolder):
122-
query_path = wf_parent / pathlib.Path(query.folder)
123-
if not query_path.exists():
124-
raise workflow.GarfWorkflowError(
125-
f'Folder: {query_path} not found'
126-
)
127-
for p in query_path.rglob('*'):
128-
if p.suffix == '.sql':
129-
batch[p.stem] = reader_client.read(p)
130-
else:
131-
batch[query.query.title] = query.query.text
132-
query_executor.execute_batch(
133-
batch, step.context, args.parallel_threshold
134-
)
88+
workflow_runner.WorkflowRunner(
89+
execution_workflow=execution_workflow, wf_parent=wf_parent
90+
).run(
91+
enable_cache=args.enable_cache, cache_ttl_seconds=args.cache_ttl_seconds
92+
)
13593
sys.exit()
13694

13795
if not args.query:
@@ -165,7 +123,7 @@ def main():
165123
writer_parameters=writer_parameters,
166124
fetcher_parameters=source_parameters,
167125
)
168-
query_executor = garf.executors.setup_executor(
126+
query_executor = setup.setup_executor(
169127
source=args.source,
170128
fetcher_parameters=context.fetcher_parameters,
171129
enable_cache=args.enable_cache,

libs/executors/garf/executors/entrypoints/grpc_server.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,40 +18,38 @@
1818
import logging
1919
from concurrent import futures
2020

21-
import garf.executors
2221
import grpc
23-
from garf.executors import garf_pb2, garf_pb2_grpc
22+
from garf.executors import execution_context, garf_pb2, garf_pb2_grpc, setup
2423
from garf.executors.entrypoints.tracer import initialize_tracer
2524
from google.protobuf.json_format import MessageToDict
2625
from grpc_reflection.v1alpha import reflection
2726

2827

2928
class GarfService(garf_pb2_grpc.GarfService):
3029
def Execute(self, request, context):
31-
query_executor = garf.executors.setup_executor(
30+
query_executor = setup.setup_executor(
3231
request.source, request.context.fetcher_parameters
3332
)
34-
execution_context = garf.executors.execution_context.ExecutionContext(
35-
**MessageToDict(request.context, preserving_proto_field_name=True)
36-
)
3733
result = query_executor.execute(
3834
query=request.query,
3935
title=request.title,
40-
context=execution_context,
36+
context=execution_context.ExecutionContext(
37+
**MessageToDict(request.context, preserving_proto_field_name=True)
38+
),
4139
)
4240
return garf_pb2.ExecuteResponse(results=[result])
4341

4442
def Fetch(self, request, context):
45-
query_executor = garf.executors.setup_executor(
43+
query_executor = setup.setup_executor(
4644
request.source, request.context.fetcher_parameters
4745
)
48-
execution_context = garf.executors.execution_context.ExecutionContext(
46+
query_args = execution_context.ExecutionContext(
4947
**MessageToDict(request.context, preserving_proto_field_name=True)
50-
)
48+
).query_parameters
5149
result = query_executor.fetcher.fetch(
5250
query_specification=request.query,
5351
title=request.title,
54-
args=execution_context.query_parameters,
52+
args=query_args,
5553
)
5654
return garf_pb2.FetchResponse(
5755
columns=result.column_names, rows=result.to_list(row_type='dict')

libs/executors/garf/executors/entrypoints/server.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import pydantic
2222
import typer
2323
import uvicorn
24-
from garf.executors import exceptions
24+
from garf.executors import exceptions, setup, workflow_runner
2525
from garf.executors.entrypoints import utils
2626
from garf.executors.entrypoints.tracer import initialize_tracer
2727
from garf.io import reader
@@ -125,7 +125,7 @@ def execute(
125125
request: ApiExecutorRequest,
126126
dependencies: Annotated[GarfDependencies, fastapi.Depends(GarfDependencies)],
127127
) -> ApiExecutorResponse:
128-
query_executor = garf.executors.setup_executor(
128+
query_executor = setup.setup_executor(
129129
request.source, request.context.fetcher_parameters
130130
)
131131
result = query_executor.execute(request.query, request.title, request.context)
@@ -137,7 +137,7 @@ def execute_batch(
137137
request: ApiExecutorRequest,
138138
dependencies: Annotated[GarfDependencies, fastapi.Depends(GarfDependencies)],
139139
) -> ApiExecutorResponse:
140-
query_executor = garf.executors.setup_executor(
140+
query_executor = setup.setup_executor(
141141
request.source, request.context.fetcher_parameters
142142
)
143143
reader_client = reader.FileReader()
@@ -146,6 +146,18 @@ def execute_batch(
146146
return ApiExecutorResponse(results=results)
147147

148148

149+
@app.post('/api/execute:workflow')
150+
def execute_workflow(
151+
workflow_file: str,
152+
dependencies: Annotated[GarfDependencies, fastapi.Depends(GarfDependencies)],
153+
enable_cache: bool = False,
154+
cache_ttl_seconds: int = 3600,
155+
) -> list[str]:
156+
return workflow_runner.WorkflowRunner.from_file(workflow_file).run(
157+
enable_cache=enable_cache, cache_ttl_seconds=cache_ttl_seconds
158+
)
159+
160+
149161
@typer_app.command()
150162
def main(
151163
port: Annotated[int, typer.Option(help='Port to start the server')] = 8000,
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Bootstraps executor based on provided parameters."""
15+
16+
from __future__ import annotations
17+
18+
import importlib
19+
20+
from garf.executors import executor, fetchers
21+
from garf.executors.api_executor import ApiQueryExecutor
22+
from garf.executors.telemetry import tracer
23+
24+
25+
@tracer.start_as_current_span('setup_executor')
26+
def setup_executor(
27+
source: str,
28+
fetcher_parameters: dict[str, str | int | bool],
29+
enable_cache: bool = False,
30+
cache_ttl_seconds: int = 3600,
31+
) -> type[executor.Executor]:
32+
"""Initializes executors based on a source and parameters."""
33+
if source == 'bq':
34+
bq_executor = importlib.import_module('garf.executors.bq_executor')
35+
query_executor = bq_executor.BigQueryExecutor(**fetcher_parameters)
36+
elif source == 'sqldb':
37+
sql_executor = importlib.import_module('garf.executors.sql_executor')
38+
query_executor = (
39+
sql_executor.SqlAlchemyQueryExecutor.from_connection_string(
40+
fetcher_parameters.get('connection_string')
41+
)
42+
)
43+
else:
44+
concrete_api_fetcher = fetchers.get_report_fetcher(source)
45+
query_executor = ApiQueryExecutor(
46+
fetcher=concrete_api_fetcher(
47+
**fetcher_parameters,
48+
enable_cache=enable_cache,
49+
cache_ttl_seconds=cache_ttl_seconds,
50+
)
51+
)
52+
return query_executor
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Runs garf workflow."""
15+
16+
from __future__ import annotations
17+
18+
import logging
19+
import pathlib
20+
import re
21+
from typing import Final
22+
23+
from garf.executors import exceptions, setup, workflow
24+
from garf.executors.telemetry import tracer
25+
from garf.io import reader
26+
27+
logger = logging.getLogger(__name__)
28+
29+
_REMOTE_FILES_PATTERN: Final[str] = (
30+
'^(http|gs|s3|aruze|hdfs|webhdfs|ssh|scp|sftp)'
31+
)
32+
33+
34+
class WorkflowRunner:
35+
"""Runs garf workflow.
36+
37+
Attributes:
38+
workflow: Workflow to execute.
39+
wf_parent: Optional location of a workflow file.
40+
parallel_threshold: Max allowed parallelism for the queries in the workflow.
41+
"""
42+
43+
def __init__(
44+
self,
45+
execution_workflow: workflow.Workflow,
46+
wf_parent: pathlib.Path | str,
47+
parallel_threshold: int = 10,
48+
) -> None:
49+
"""Initializes WorkflowRunner."""
50+
self.workflow = execution_workflow
51+
self.wf_parent = wf_parent
52+
self.parallel_threshold = parallel_threshold
53+
54+
@classmethod
55+
def from_file(cls, workflow_file: str | pathlib.Path) -> WorkflowRunner:
56+
"""Initializes WorkflowRunner from a local or remote file."""
57+
if isinstance(workflow_file, str):
58+
workflow_file = pathlib.Path(workflow_file)
59+
execution_workflow = workflow.Workflow.from_file(workflow_file)
60+
return cls(
61+
execution_workflow=execution_workflow, wf_parent=workflow_file.parent
62+
)
63+
64+
def run(
65+
self, enable_cache: bool = False, cache_ttl_seconds: int = 3600
66+
) -> list[str]:
67+
reader_client = reader.create_reader('file')
68+
execution_results = []
69+
for i, step in enumerate(self.workflow.steps, 1):
70+
step_name = f'{i}-{step.fetcher}'
71+
if step.alias:
72+
step_name = f'{step_name}-{step.alias}'
73+
with tracer.start_as_current_span(step_name):
74+
query_executor = setup.setup_executor(
75+
source=step.fetcher,
76+
fetcher_parameters=step.fetcher_parameters,
77+
enable_cache=enable_cache,
78+
cache_ttl_seconds=cache_ttl_seconds,
79+
)
80+
batch = {}
81+
if not (queries := step.queries):
82+
logger.error('Please provide one or more queries to run')
83+
raise exceptions.GarfExecutorError(
84+
'Please provide one or more queries to run'
85+
)
86+
for query in queries:
87+
if isinstance(query, workflow.QueryPath):
88+
query_path = query.full_path
89+
if re.match(_REMOTE_FILES_PATTERN, query_path):
90+
batch[query.path] = reader_client.read(query_path)
91+
else:
92+
if not query.prefix:
93+
query_path = self.wf_parent / pathlib.Path(query.path)
94+
if not query_path.exists():
95+
raise workflow.GarfWorkflowError(
96+
f'Query: {query_path} not found'
97+
)
98+
batch[query.path] = reader_client.read(query_path)
99+
elif isinstance(query, workflow.QueryFolder):
100+
query_path = self.wf_parent / pathlib.Path(query.folder)
101+
if not query_path.exists():
102+
raise workflow.GarfWorkflowError(
103+
f'Folder: {query_path} not found'
104+
)
105+
for p in query_path.rglob('*'):
106+
if p.suffix == '.sql':
107+
batch[p.stem] = reader_client.read(p)
108+
else:
109+
batch[query.query.title] = query.query.text
110+
query_executor.execute_batch(
111+
batch, step.context, self.parallel_threshold
112+
)
113+
execution_results.append(step_name)
114+
return execution_results

libs/executors/tests/end-to-end/test_grpc_server.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
# limitations under the License.
1414
import pathlib
1515

16-
import grpc
1716
import pytest
1817
from garf.executors import garf_pb2 as pb
1918
from garf.executors import garf_pb2_grpc

0 commit comments

Comments
 (0)