Skip to content

Commit 2447fd9

Browse files
[executors] feat: add server endpoint
* Refactor ApiQueryExecutor:
  * Add ApiExecutionContext class to contain inputs for query execution
  * Use context parameter in execute and aexecute methods
* Update CLI entrypoint to work with the new context class and the new signature of the execute method
* Add new FastAPI server entrypoint that passes server requests to ApiQueryExecutor
1 parent 9e4cd1b commit 2447fd9

File tree

6 files changed

+170
-95
lines changed

6 files changed

+170
-95
lines changed

libs/garf_executors/garf_executors/api_executor.py

Lines changed: 76 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,47 @@
2222

2323
import logging
2424

25-
from garf_core import report_fetcher
26-
from garf_io.writers import abs_writer, console_writer
25+
import pydantic
26+
27+
from garf_core import query_editor, report_fetcher
28+
from garf_executors import exceptions
29+
from garf_io import writer
30+
from garf_io.writers import abs_writer
2731

2832
logger = logging.getLogger(__name__)
2933

3034

35+
class ApiExecutionContext(pydantic.BaseModel):
36+
"""Common context for executing one or more queries.
37+
38+
Attributes:
39+
query_parameters: Parameters to dynamically change query text.
40+
fetcher_parameters: Parameters to specify fetching setup.
41+
writer: Type of writer to use.
42+
writer_parameters: Optional parameters to setup writer.
43+
"""
44+
45+
query_parameters: query_editor.GarfQueryParameters | None = None
46+
fetcher_parameters: dict[str, str] | None = None
47+
writer: str = 'console'
48+
writer_parameters: dict[str, str] | None = None
49+
50+
def model_post_init(self, __context__) -> None:
51+
if self.fetcher_parameters is None:
52+
self.fetcher_parameters = {}
53+
if self.writer_parameters is None:
54+
self.writer_parameters = {}
55+
56+
@property
57+
def writer_client(self) -> abs_writer.AbsWriter:
58+
writer_client = writer.create_writer(self.writer, **self.writer_parameters)
59+
if self.writer == 'bq':
60+
_ = writer_client.create_or_get_dataset()
61+
if self.writer == 'sheet':
62+
writer_client.init_client()
63+
return writer_client
64+
65+
3166
class ApiQueryExecutor:
3267
"""Gets data from API and writes them to local/remote storage.
3368
@@ -36,63 +71,62 @@ class ApiQueryExecutor:
3671
"""
3772

3873
def __init__(self, fetcher: report_fetcher.ApiReportFetcher) -> None:
39-
"""Initializes QueryExecutor.
74+
"""Initializes ApiQueryExecutor.
4075
4176
Args:
4277
fetcher: Instantiated report fetcher.
4378
"""
4479
self.fetcher = fetcher
4580

4681
async def aexecute(
47-
self,
48-
query_text: str,
49-
query_name: str,
50-
writer_client: abs_writer.AbsWriter = console_writer.ConsoleWriter(),
51-
args: dict[str, str] | None = None,
52-
**kwargs: str,
82+
self, query: str, context: ApiExecutionContext, **kwargs: str
5383
) -> None:
5484
"""Reads query, extracts results and stores them in a specified location.
5585
5686
Args:
57-
query_text: Text for the query.
58-
query_name: Identifier of a query.
59-
customer_ids: All accounts for which query will be executed.
60-
writer_client: Client responsible for writing data to local/remote
61-
location.
62-
args: Arguments that need to be passed to the query.
63-
optimize_performance: strategy for speeding up query execution
64-
("NONE", "PROTOBUF", "BATCH", "BATCH_PROTOBUF").
87+
query: Text of the query to execute.
88+
context: Query execution context.
6589
"""
66-
self.execute(query_text, query_name, writer_client, args, **kwargs)
90+
self.execute(query, context, **kwargs)
6791

6892
def execute(
6993
self,
70-
query_text: str,
71-
query_name: str,
72-
writer_client: abs_writer.AbsWriter = console_writer.ConsoleWriter(),
73-
args: dict[str, str] | None = None,
74-
**kwargs: str,
94+
query: str,
95+
title: str,
96+
context: ApiExecutionContext,
7597
) -> None:
7698
"""Reads query, extracts results and stores them in a specified location.
7799
78100
Args:
79-
query_text: Text for the query.
80-
query_name: Identifier of a query.
81-
writer_client: Client responsible for writing data to local/remote
82-
location.
83-
args: Arguments that need to be passed to the query.
101+
query: Text of the query to execute.
102+
title: Name of the query.
103+
context: Query execution context.
104+
105+
Raises:
106+
GarfExecutorError: When failed to execute query.
84107
"""
85-
results = self.fetcher.fetch(
86-
query_specification=query_text, args=args, **kwargs
87-
)
88-
logger.debug(
89-
'Start writing data for query %s via %s writer',
90-
query_name,
91-
type(writer_client),
92-
)
93-
writer_client.write(results, query_name)
94-
logger.debug(
95-
'Finish writing data for query %s via %s writer',
96-
query_name,
97-
type(writer_client),
98-
)
108+
try:
109+
logger.debug('starting query %s', query)
110+
results = self.fetcher.fetch(
111+
query_specification=query,
112+
args=context.query_parameters,
113+
**context.fetcher_parameters,
114+
)
115+
writer_client = context.writer_client
116+
logger.debug(
117+
'Start writing data for query %s via %s writer',
118+
title,
119+
type(writer_client),
120+
)
121+
writer_client.write(results, title)
122+
logger.debug(
123+
'Finish writing data for query %s via %s writer',
124+
title,
125+
type(writer_client),
126+
)
127+
logger.info('%s executed successfully', title)
128+
except Exception as e:
129+
logger.error('%s generated an exception: %s', title, str(e))
130+
raise exceptions.GarfExecutorError(
131+
'%s generated an exception: %s', title, str(e)
132+
) from e

libs/garf_executors/garf_executors/entrypoints/cli.py

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,21 @@
2020
from __future__ import annotations
2121

2222
import argparse
23-
import functools
2423
import sys
2524
from concurrent import futures
2625

2726
import garf_executors
2827
from garf_executors import exceptions
2928
from garf_executors.entrypoints import utils
30-
from garf_io import reader, writer
29+
from garf_io import reader
3130

3231

3332
def main():
3433
parser = argparse.ArgumentParser()
3534
parser.add_argument('query', nargs='*')
3635
parser.add_argument('-c', '--config', dest='garf_config', default=None)
3736
parser.add_argument('--source', dest='source', default=None)
38-
parser.add_argument('--output', dest='output', default=None)
37+
parser.add_argument('--output', dest='output', default='console')
3938
parser.add_argument('--input', dest='input', default='file')
4039
parser.add_argument('--log', '--loglevel', dest='loglevel', default='info')
4140
parser.add_argument('--logger', dest='logger', default='local')
@@ -72,7 +71,6 @@ def main():
7271
raise exceptions.GarfExecutorError(
7372
'Please provide one or more queries to run'
7473
)
75-
7674
config = utils.ConfigBuilder('garf').build(vars(args), kwargs)
7775
logger.debug('config: %s', config)
7876

@@ -82,17 +80,17 @@ def main():
8280

8381
extra_parameters = utils.ParamsParser(['source']).parse(kwargs)
8482
source_parameters = extra_parameters.get('source', {})
83+
reader_client = reader.create_reader(args.input)
84+
85+
context = garf_executors.api_executor.ApiExecutionContext(
86+
query_parameters=config.params,
87+
writer=args.output,
88+
writer_parameters=config.writer_params,
89+
fetcher_parameters=source_parameters,
90+
)
8591
query_executor = garf_executors.api_executor.ApiQueryExecutor(
8692
concrete_api_fetcher(**source_parameters)
8793
)
88-
reader_client = reader.create_reader(args.input)
89-
90-
writer_client = writer.create_writer(config.output, **config.writer_params)
91-
if config.output == 'bq':
92-
_ = writer_client.create_or_get_dataset()
93-
if config.output == 'sheet':
94-
writer_client.init_client()
95-
9694
if args.parallel_queries:
9795
logger.info('Running queries in parallel')
9896
with futures.ThreadPoolExecutor(args.parallel_threshold) as executor:
@@ -101,27 +99,16 @@ def main():
10199
query_executor.execute,
102100
reader_client.read(query),
103101
query,
104-
writer_client,
105-
config.params,
106-
**source_parameters,
102+
context,
107103
): query
108104
for query in args.query
109105
}
110106
for future in futures.as_completed(future_to_query):
111-
query = future_to_query[future]
112-
utils.garf_runner(query, future.result, logger)
107+
future.result()
113108
else:
114109
logger.info('Running queries sequentially')
115110
for query in args.query:
116-
callback = functools.partial(
117-
query_executor.execute,
118-
reader_client.read(query),
119-
query,
120-
writer_client,
121-
config.params,
122-
**source_parameters,
123-
)
124-
utils.garf_runner(query, callback, logger)
111+
query_executor.execute(reader_client.read(query), query, context)
125112

126113

127114
if __name__ == '__main__':
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""FastAPI endpoint for executing queries."""
16+
17+
import fastapi
18+
import pydantic
19+
import uvicorn
20+
21+
import garf_executors
22+
from garf_executors import exceptions
23+
24+
25+
class ApiExecutorRequest(pydantic.BaseModel):
26+
"""Request for executing a query.
27+
28+
Attributes:
29+
source: Type of API to interact with.
30+
query: Query to execute.
31+
title: Name of the query, passed to the writer as the name of the written output.
32+
context: Execution context.
33+
"""
34+
35+
source: str
36+
query: str
37+
title: str
38+
context: garf_executors.api_executor.ApiExecutionContext
39+
40+
41+
router = fastapi.APIRouter(prefix='/api')
42+
43+
44+
@router.post('/execute')
45+
async def execute(request: ApiExecutorRequest) -> dict[str, str]:
46+
if not (concrete_api_fetcher := garf_executors.FETCHERS.get(request.source)):
47+
raise exceptions.GarfExecutorError(
48+
f'Source {request.source} is not available.'
49+
)
50+
51+
query_executor = garf_executors.api_executor.ApiQueryExecutor(
52+
concrete_api_fetcher(**request.context.fetcher_parameters)
53+
)
54+
55+
query_executor.execute(request.query, request.title, request.context)
56+
57+
return fastapi.responses.JSONResponse(
58+
content=fastapi.encoders.jsonable_encoder({'result': 'success'})
59+
)
60+
61+
62+
if __name__ == '__main__':
63+
app = fastapi.FastAPI()
64+
app.include_router(router)
65+
uvicorn.run(app)

libs/garf_executors/garf_executors/entrypoints/utils.py

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@
2020
import logging
2121
import os
2222
import sys
23-
import traceback
2423
from collections.abc import MutableSequence, Sequence
25-
from typing import Any, Callable, TypedDict
24+
from typing import Any, TypedDict
2625

2726
import smart_open
2827
import yaml
@@ -429,25 +428,6 @@ def _remove_empty_values(dict_object: dict[str, Any]) -> dict[str, Any]:
429428
return dict_object
430429

431430

432-
def garf_runner(query: str, callback: Callable, logger) -> None:
433-
try:
434-
logger.debug('starting query %s', query)
435-
callback()
436-
logger.info('%s executed successfully', query)
437-
except Exception as e:
438-
traceback.print_tb(e.__traceback__)
439-
logger.error('%s generated an exception: %s', query, str(e))
440-
441-
442-
def postprocessor_runner(query: str, callback: Callable, logger) -> None:
443-
try:
444-
logger.debug('starting query %s', query)
445-
callback()
446-
logger.info('%s executed successfully', query)
447-
except Exception as e:
448-
logger.error('%s generated an exception: %s', query, str(e))
449-
450-
451431
def init_logging(
452432
loglevel: str = 'INFO', logger_type: str = 'local', name: str = __name__
453433
) -> logging.Logger:

libs/garf_executors/pyproject.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ dependencies = [
88
"garf-core",
99
"garf-io",
1010
"pyyaml",
11+
"pydantic",
1112
]
1213
authors = [
1314
{name = "Google Inc. (gTech gPS CSE team)", email = "no-reply@google.com"},
15+
{name = "Andrei Markin", email = "andrey.markin.ppc@gmail.com"},
1416
]
1517
requires-python = ">=3.8"
1618
description = "Executes queries against API and writes data to local/remote storage."
@@ -43,8 +45,11 @@ sql=[
4345
"garf-io[sqlalchemy]",
4446
"pandas",
4547
]
48+
server=[
49+
"fastapi[standard]",
50+
]
4651
all = [
47-
"garf-executors[bq,sql]"
52+
"garf-executors[bq,sql,server]"
4853
]
4954
[project.scripts]
5055
garf="garf_executors.entrypoints.cli:main"

0 commit comments

Comments
 (0)