Skip to content

Commit 7895426

Browse files
[executors] feat: add execute_batch method for executing queries in parallel
* Add common Executor class with execute_batch method * Inherit Api, Bq and SqlAlchemy executors from it * Replace parallel execution in CLI and server with execute_batch call
1 parent cc43495 commit 7895426

File tree

7 files changed

+81
-43
lines changed

7 files changed

+81
-43
lines changed

libs/garf_executors/garf_executors/api_executor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import logging
2424

2525
from garf_core import report_fetcher
26-
from garf_executors import exceptions, execution_context
26+
from garf_executors import exceptions, execution_context, executor
2727

2828
logger = logging.getLogger(__name__)
2929

@@ -34,7 +34,7 @@ class ApiExecutionContext(execution_context.ExecutionContext):
3434
writer: str = 'console'
3535

3636

37-
class ApiQueryExecutor:
37+
class ApiQueryExecutor(executor.Executor):
3838
"""Gets data from API and writes them to local/remote storage.
3939
4040
Attributes:

libs/garf_executors/garf_executors/bq_executor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from google.cloud import exceptions as google_cloud_exceptions
2929

3030
from garf_core import query_editor, report
31-
from garf_executors import exceptions, execution_context
31+
from garf_executors import exceptions, execution_context, executor
3232

3333
logger = logging.getLogger(__name__)
3434

@@ -37,7 +37,7 @@ class BigQueryExecutorError(exceptions.GarfExecutorError):
3737
"""Error when BigQueryExecutor fails to run query."""
3838

3939

40-
class BigQueryExecutor(query_editor.TemplateProcessorMixin):
40+
class BigQueryExecutor(executor.Executor, query_editor.TemplateProcessorMixin):
4141
"""Handles query execution in BigQuery.
4242
4343
Attributes:

libs/garf_executors/garf_executors/entrypoints/cli.py

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,9 @@
2121

2222
import argparse
2323
import sys
24-
from concurrent import futures
2524

2625
import garf_executors
27-
from garf_executors import bq_executor, exceptions, sql_executor
26+
from garf_executors import exceptions
2827
from garf_executors.entrypoints import utils
2928
from garf_io import reader
3029

@@ -86,18 +85,8 @@ def main():
8685
)
8786
if args.parallel_queries:
8887
logger.info('Running queries in parallel')
89-
with futures.ThreadPoolExecutor(args.parallel_threshold) as executor:
90-
future_to_query = {
91-
executor.submit(
92-
query_executor.execute,
93-
reader_client.read(query),
94-
query,
95-
context,
96-
): query
97-
for query in args.query
98-
}
99-
for future in futures.as_completed(future_to_query):
100-
future.result()
88+
batch = {query: reader_client.read(query) for query in args.query}
89+
query_executor.execute_batch(batch, context, args.parallel_queries)
10190
else:
10291
logger.info('Running queries sequentially')
10392
for query in args.query:

libs/garf_executors/garf_executors/entrypoints/server.py

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,12 @@
1414

1515
"""FastAPI endpoint for executing queries."""
1616

17-
from concurrent import futures
18-
1917
import fastapi
2018
import pydantic
2119
import uvicorn
2220

2321
import garf_executors
2422
from garf_executors import exceptions
25-
from garf_executors.entrypoints import utils
2623
from garf_io import reader
2724

2825

@@ -85,20 +82,9 @@ async def execute_batch(request: ApiExecutorRequest) -> ApiExecutorResponse:
8582
query_executor = garf_executors.setup_executor(
8683
request.source, request.context.fetcher_parameters
8784
)
88-
file_reader = reader.FileReader()
89-
results = []
90-
with futures.ThreadPoolExecutor() as executor:
91-
future_to_query = {
92-
executor.submit(
93-
query_executor.execute,
94-
file_reader.read(query),
95-
query,
96-
request.context,
97-
): query
98-
for query in request.query_path
99-
}
100-
for future in futures.as_completed(future_to_query):
101-
results.append(future.result())
85+
reader_client = reader.FileReader()
86+
batch = {query: reader_client.read(query) for query in request.query_path}
87+
results = query_executor.execute_batch(batch, request.context)
10288
return ApiExecutorResponse(results=results)
10389

10490

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Defines common functionality between executors."""
16+
17+
from concurrent import futures
18+
19+
from garf_executors import execution_context
20+
21+
22+
class Executor:
  """Defines common functionality between executors."""

  def execute_batch(
    self,
    batch: dict[str, str],
    context: 'execution_context.ExecutionContext',
    parallel_threshold: int = 10,
  ) -> list[str]:
    """Executes batch of queries for a common context.

    Queries run concurrently in a thread pool, but results are returned
    in the same order as the entries of `batch` so callers (e.g. the
    server endpoint returning one result per requested query path) get a
    deterministic, input-aligned list.

    Args:
      batch: Mapping between query_title and its text.
      context: Execution context.
      parallel_threshold: Number of queries to execute in parallel.

    Returns:
      Results of execution, one per batch entry, in batch order.
    """
    with futures.ThreadPoolExecutor(max_workers=parallel_threshold) as executor:
      # Submit everything first so queries run in parallel.
      running = [
        executor.submit(self.execute, query, title, context)
        for title, query in batch.items()
      ]
      # Collect in submission order rather than completion order
      # (futures.as_completed) so the result list deterministically
      # matches the input batch; result() re-raises any worker error.
      return [future.result() for future in running]

libs/garf_executors/garf_executors/sql_executor.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import pandas as pd
3030

3131
from garf_core import query_editor, report
32-
from garf_executors import exceptions, execution_context
32+
from garf_executors import exceptions, execution_context, executor
3333

3434
logger = logging.getLogger(__name__)
3535

@@ -38,7 +38,9 @@ class SqlAlchemyQueryExecutorError(exceptions.GarfExecutorError):
3838
"""Error when SqlAlchemyQueryExecutor fails to run query."""
3939

4040

41-
class SqlAlchemyQueryExecutor(query_editor.TemplateProcessorMixin):
41+
class SqlAlchemyQueryExecutor(
42+
executor.Executor, query_editor.TemplateProcessorMixin
43+
):
4244
"""Handles query execution via SqlAlchemy.
4345
4446
Attributes:
@@ -57,6 +59,10 @@ def __init__(self, engine: sqlalchemy.engine.base.Engine) -> None:
5759
def from_connection_string(
5860
cls, connection_string: str
5961
) -> SqlAlchemyQueryExecutor:
62+
"""Creates executor from SqlAlchemy connection string.
63+
64+
https://docs.sqlalchemy.org/en/20/core/engines.html
65+
"""
6066
engine = sqlalchemy.create_engine(connection_string)
6167
return cls(engine)
6268

libs/garf_executors/tests/end-to-end/test_server.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,18 @@ def test_fake_source_from_query_path(self, tmp_path):
7575
assert response.json() == expected_output
7676

7777
def test_batch_fake_source_from_query_path(self, tmp_path):
78-
query_path = tmp_path / 'query.sql'
79-
with pathlib.Path.open(query_path, 'w', encoding='utf-8') as f:
78+
query_path1 = tmp_path / 'query1.sql'
79+
with pathlib.Path.open(query_path1, 'w', encoding='utf-8') as f:
80+
f.write(self.query)
81+
query_path2 = tmp_path / 'query2.sql'
82+
with pathlib.Path.open(query_path2, 'w', encoding='utf-8') as f:
8083
f.write(self.query)
8184
fake_data = _SCRIPT_PATH / 'test.json'
8285
request = {
8386
'source': 'fake',
8487
'query_path': [
85-
str(query_path),
86-
str(query_path),
88+
str(query_path1),
89+
str(query_path2),
8790
],
8891
'context': {
8992
'fetcher_parameters': {
@@ -97,8 +100,8 @@ def test_batch_fake_source_from_query_path(self, tmp_path):
97100
assert response.status_code == fastapi.status.HTTP_200_OK
98101
expected_output = {
99102
'results': [
100-
f'[CSV] - at {tmp_path}/query.csv',
101-
f'[CSV] - at {tmp_path}/query.csv',
103+
f'[CSV] - at {tmp_path}/query1.csv',
104+
f'[CSV] - at {tmp_path}/query2.csv',
102105
]
103106
}
104107
assert response.json() == expected_output

0 commit comments

Comments
 (0)