Skip to content

Commit cc43495

Browse files
[executors] feat: unify bq and sqldb executors
* Add support for bq and sqldb executors in CLI and server. * Add `setup_executor` package-level function. * BQ and SqlAlchemy executors return GarfReport instead of DataFrame; if a writer is provided, the results are written to the specified writer destination, otherwise the results of execution are returned to the caller. * Add ExecutionContext class for all executors that contains writer, query_parameters, fetcher_parameters and writer_parameters. ApiExecutionContext contains a predefined 'console' writer.
1 parent 93d97fc commit cc43495

File tree

8 files changed

+194
-102
lines changed

8 files changed

+194
-102
lines changed

libs/garf_executors/garf_executors/__init__.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,32 @@
1515

1616
from __future__ import annotations
1717

18+
from garf_executors import bq_executor, exceptions, sql_executor
1819
from garf_executors.api_executor import ApiExecutionContext, ApiQueryExecutor
1920
from garf_executors.fetchers import FETCHERS
2021

22+
23+
def setup_executor(source: str, fetcher_parameters: dict[str, str]):
  """Builds the query executor that matches the requested source.

  Args:
    source: Either 'bq', 'sqldb' or a key registered in FETCHERS.
    fetcher_parameters: Keyword arguments for the executor or API fetcher;
      for 'sqldb' only the 'connection_string' entry is read.

  Returns:
    BigQueryExecutor for 'bq', SqlAlchemyQueryExecutor for 'sqldb',
    otherwise ApiQueryExecutor wrapping the fetcher registered for source.

  Raises:
    GarfExecutorError: If source is neither 'bq' nor 'sqldb' and FETCHERS
      has no usable entry for it.
  """
  if source == 'bq':
    return bq_executor.BigQueryExecutor(**fetcher_parameters)
  if source == 'sqldb':
    connection_string = fetcher_parameters.get('connection_string')
    return sql_executor.SqlAlchemyQueryExecutor.from_connection_string(
      connection_string
    )
  # Every other source has to be backed by a registered API fetcher.
  concrete_api_fetcher = FETCHERS.get(source)
  if not concrete_api_fetcher:
    raise exceptions.GarfExecutorError(f'Source {source} is not available.')
  return ApiQueryExecutor(concrete_api_fetcher(**fetcher_parameters))
42+
43+
2144
__all__ = [
2245
'FETCHERS',
2346
'ApiQueryExecutor',

libs/garf_executors/garf_executors/api_executor.py

Lines changed: 14 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -22,45 +22,16 @@
2222

2323
import logging
2424

25-
import pydantic
26-
27-
from garf_core import query_editor, report_fetcher
28-
from garf_executors import exceptions
29-
from garf_io import writer
30-
from garf_io.writers import abs_writer
25+
from garf_core import report_fetcher
26+
from garf_executors import exceptions, execution_context
3127

3228
logger = logging.getLogger(__name__)
3329

3430

35-
class ApiExecutionContext(pydantic.BaseModel):
36-
"""Common context for executing one or more queries.
37-
38-
Attributes:
39-
query_parameters: Parameters to dynamically change query text.
40-
fetcher_parameters: Parameters to specify fetching setup.
41-
writer: Type of writer to use.
42-
writer_parameters: Optional parameters to setup writer.
43-
"""
31+
class ApiExecutionContext(execution_context.ExecutionContext):
32+
"""Common context for executing one or more queries."""
4433

45-
query_parameters: query_editor.GarfQueryParameters | None = None
46-
fetcher_parameters: dict[str, str] | None = None
4734
writer: str = 'console'
48-
writer_parameters: dict[str, str] | None = None
49-
50-
def model_post_init(self, __context__) -> None:
51-
if self.fetcher_parameters is None:
52-
self.fetcher_parameters = {}
53-
if self.writer_parameters is None:
54-
self.writer_parameters = {}
55-
56-
@property
57-
def writer_client(self) -> abs_writer.AbsWriter:
58-
writer_client = writer.create_writer(self.writer, **self.writer_parameters)
59-
if self.writer == 'bq':
60-
_ = writer_client.create_or_get_dataset()
61-
if self.writer == 'sheet':
62-
writer_client.init_client()
63-
return writer_client
6435

6536

6637
class ApiQueryExecutor:
@@ -79,15 +50,22 @@ def __init__(self, fetcher: report_fetcher.ApiReportFetcher) -> None:
7950
self.fetcher = fetcher
8051

8152
async def aexecute(
  self,
  query: str,
  title: str,
  context: ApiExecutionContext,
) -> str:
  """Performs query execution asynchronously.

  Runs the synchronous `execute` method in a worker thread so that the
  event loop is not blocked while the query is processed.

  Args:
    query: Location of the query.
    title: Name of the query.
    context: Query execution context.

  Returns:
    Result of writing the report.
  """
  # Imported locally because the module import block is not part of this
  # definition.
  import asyncio

  # `execute` is a regular (non-async) method, so its result cannot be
  # awaited directly; arguments are forwarded as (query, title, context).
  return await asyncio.to_thread(self.execute, query, title, context)
9169

9270
def execute(
9371
self,

libs/garf_executors/garf_executors/bq_executor.py

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,16 @@
2525

2626
import logging
2727

28-
import pandas as pd
2928
from google.cloud import exceptions as google_cloud_exceptions
3029

31-
from garf_core import query_editor
30+
from garf_core import query_editor, report
31+
from garf_executors import exceptions, execution_context
3232

3333
logger = logging.getLogger(__name__)
3434

3535

36-
class BigQueryExecutorError(Exception):
37-
"""Error when executor fails to run query."""
36+
class BigQueryExecutorError(exceptions.GarfExecutorError):
37+
"""Error when BigQueryExecutor fails to run query."""
3838

3939

4040
class BigQueryExecutor(query_editor.TemplateProcessorMixin):
@@ -62,26 +62,48 @@ def client(self) -> bigquery.Client:
6262
return bigquery.Client(self.project_id)
6363

6464
def execute(
  self,
  query: str,
  title: str,
  context: execution_context.ExecutionContext | None = None,
) -> report.GarfReport:
  """Executes query in BigQuery.

  Args:
    query: Text of the query; `context.query_parameters` are applied to it
      via `replace_params_template` before it is sent to BigQuery.
    title: Name of the query, used in log messages and passed to the writer.
    context: Query execution context. When omitted, a fresh default
      ExecutionContext is created for this call.

  Returns:
    Report with data if query returns some data otherwise empty Report.
    When the context specifies a writer and the report is non-empty, the
    report is written and the value returned by the writer is returned
    instead.

  Raises:
    BigQueryExecutorError: If a GoogleCloudError is raised while waiting
      for the job result or while writing the report.
  """
  if context is None:
    # Build the default per call instead of once at definition time, so
    # that calls never share a single context instance.
    context = execution_context.ExecutionContext()
  query_text = self.replace_params_template(query, context.query_parameters)
  job = self.client.query(query_text)
  try:
    result = job.result()
    logger.debug('%s launched successfully', title)
    if result.total_rows:
      results = report.GarfReport.from_pandas(result.to_dataframe())
    else:
      results = report.GarfReport()
    if context.writer and results:
      writer_client = context.writer_client
      logger.debug(
        'Start writing data for query %s via %s writer',
        title,
        type(writer_client),
      )
      writing_result = writer_client.write(results, title)
      logger.debug(
        'Finish writing data for query %s via %s writer',
        title,
        type(writer_client),
      )
      logger.info('%s executed successfully', title)
      return writing_result
    return results
  except google_cloud_exceptions.GoogleCloudError as e:
    raise BigQueryExecutorError(e) from e
87109

@@ -92,7 +114,7 @@ def create_datasets(self, macros: dict | None) -> None:
92114
are treated as dataset names.
93115
94116
Args:
95-
macros: Mapping containing data for query execution.
117+
macros: Mapping containing data for query execution.
96118
"""
97119
if macros and (datasets := extract_datasets(macros)):
98120
for dataset in datasets:

libs/garf_executors/garf_executors/entrypoints/cli.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from concurrent import futures
2525

2626
import garf_executors
27-
from garf_executors import exceptions
27+
from garf_executors import bq_executor, exceptions, sql_executor
2828
from garf_executors.entrypoints import utils
2929
from garf_io import reader
3030

@@ -56,13 +56,6 @@ def main():
5656
if args.version:
5757
print(garf_executors.__version__)
5858
sys.exit()
59-
if not (source := args.source):
60-
raise exceptions.GarfExecutorError(
61-
f'Select one of available sources: {list(garf_executors.FETCHERS.keys())}'
62-
)
63-
if not (concrete_api_fetcher := garf_executors.FETCHERS.get(source)):
64-
raise exceptions.GarfExecutorError(f'Source {source} is not available.')
65-
6659
logger = utils.init_logging(
6760
loglevel=args.loglevel.upper(), logger_type=args.logger
6861
)
@@ -88,8 +81,8 @@ def main():
8881
writer_parameters=config.writer_params,
8982
fetcher_parameters=source_parameters,
9083
)
91-
query_executor = garf_executors.api_executor.ApiQueryExecutor(
92-
concrete_api_fetcher(**source_parameters)
84+
query_executor = garf_executors.setup_executor(
85+
args.source, context.fetcher_parameters
9386
)
9487
if args.parallel_queries:
9588
logger.info('Running queries in parallel')

libs/garf_executors/garf_executors/entrypoints/server.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import garf_executors
2424
from garf_executors import exceptions
25+
from garf_executors.entrypoints import utils
2526
from garf_io import reader
2627

2728

@@ -72,28 +73,17 @@ class ApiExecutorResponse(pydantic.BaseModel):
7273

7374
@router.post('/execute')
7475
async def execute(request: ApiExecutorRequest) -> ApiExecutorResponse:
75-
if not (concrete_api_fetcher := garf_executors.FETCHERS.get(request.source)):
76-
raise exceptions.GarfExecutorError(
77-
f'Source {request.source} is not available.'
78-
)
79-
80-
query_executor = garf_executors.ApiQueryExecutor(
81-
concrete_api_fetcher(**request.context.fetcher_parameters)
76+
query_executor = garf_executors.setup_executor(
77+
request.source, request.context.fetcher_parameters
8278
)
83-
8479
result = query_executor.execute(request.query, request.title, request.context)
8580
return ApiExecutorResponse(results=[result])
8681

8782

8883
@router.post('/execute:batch')
8984
async def execute_batch(request: ApiExecutorRequest) -> ApiExecutorResponse:
90-
if not (concrete_api_fetcher := garf_executors.FETCHERS.get(request.source)):
91-
raise exceptions.GarfExecutorError(
92-
f'Source {request.source} is not available.'
93-
)
94-
95-
query_executor = garf_executors.ApiQueryExecutor(
96-
concrete_api_fetcher(**request.context.fetcher_parameters)
85+
query_executor = garf_executors.setup_executor(
86+
request.source, request.context.fetcher_parameters
9787
)
9888
file_reader = reader.FileReader()
9989
results = []
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# pylint: disable=C0330, g-bad-import-order, g-multiple-import
16+
17+
from __future__ import annotations
18+
19+
import pydantic
20+
21+
from garf_core import query_editor
22+
from garf_io import writer
23+
from garf_io.writers import abs_writer
24+
25+
26+
class ExecutionContext(pydantic.BaseModel):
  """Settings shared by the queries of a single execution.

  Attributes:
    query_parameters: Values used to parametrize the query text.
    fetcher_parameters: Options for setting up data fetching.
    writer: Identifier of the writer to create; None when not set.
    writer_parameters: Extra options passed when creating the writer.
  """

  query_parameters: query_editor.GarfQueryParameters | None = pydantic.Field(
    default_factory=dict
  )
  fetcher_parameters: dict[str, str] | None = pydantic.Field(
    default_factory=dict
  )
  writer: str | None = None
  writer_parameters: dict[str, str] | None = pydantic.Field(
    default_factory=dict
  )

  def model_post_init(self, __context__) -> None:
    """Replaces explicit None fetcher / writer parameters with empty dicts."""
    for field_name in ('fetcher_parameters', 'writer_parameters'):
      if getattr(self, field_name) is None:
        setattr(self, field_name, {})

  @property
  def writer_client(self) -> abs_writer.AbsWriter:
    """Creates the configured writer and runs its writer-specific setup.

    A new writer is created on every access. For the 'bq' writer
    `create_or_get_dataset` is called, for the 'sheet' writer
    `init_client` is called.
    """
    client = writer.create_writer(self.writer, **self.writer_parameters)
    if self.writer == 'bq':
      client.create_or_get_dataset()
    elif self.writer == 'sheet':
      client.init_client()
    return client

0 commit comments

Comments
 (0)