Skip to content

Commit 992997b

Browse files
[executors] feat: add support for batch execution of queries
1 parent 41bf70c commit 992997b

File tree

2 files changed

+63
-2
lines changed

2 files changed

+63
-2
lines changed

libs/garf_executors/garf_executors/entrypoints/server.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
"""FastAPI endpoint for executing queries."""
1616

17+
from concurrent import futures
18+
1719
import fastapi
1820
import pydantic
1921
import uvicorn
@@ -37,7 +39,7 @@ class ApiExecutorRequest(pydantic.BaseModel):
3739
source: str
3840
title: str | None = None
3941
query: str | None = None
40-
query_path: str | None = None
42+
query_path: str | list[str] | None = None
4143
context: garf_executors.api_executor.ApiExecutionContext
4244

4345
@pydantic.model_validator(mode='after')
@@ -49,7 +51,7 @@ def check_query_specified(self):
4951
return self
5052

5153
def model_post_init(self, __context__) -> None:
52-
if self.query_path:
54+
if self.query_path and isinstance(self.query_path, str):
5355
self.query = reader.FileReader().read(self.query_path)
5456
if not self.title:
5557
self.title = str(self.query_path)
@@ -76,6 +78,36 @@ async def execute(request: ApiExecutorRequest) -> dict[str, str]:
7678
)
7779

7880

81+
@router.post('/execute:batch')
82+
async def execute_batch(request: ApiExecutorRequest) -> dict[str, str]:
83+
if not (concrete_api_fetcher := garf_executors.FETCHERS.get(request.source)):
84+
raise exceptions.GarfExecutorError(
85+
f'Source {request.source} is not available.'
86+
)
87+
88+
query_executor = garf_executors.api_executor.ApiQueryExecutor(
89+
concrete_api_fetcher(**request.context.fetcher_parameters)
90+
)
91+
file_reader = reader.FileReader()
92+
results = []
93+
with futures.ThreadPoolExecutor() as executor:
94+
future_to_query = {
95+
executor.submit(
96+
query_executor.execute,
97+
file_reader.read(query),
98+
query,
99+
request.context,
100+
): query
101+
for query in request.query_path
102+
}
103+
for future in futures.as_completed(future_to_query):
104+
results.append(future.result())
105+
106+
return fastapi.responses.JSONResponse(
107+
content=fastapi.encoders.jsonable_encoder({'result': results})
108+
)
109+
110+
79111
if __name__ == '__main__':
80112
app = fastapi.FastAPI()
81113
app.include_router(router)

libs/garf_executors/tests/end-to-end/test_server.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,32 @@ def test_fake_source_from_query_path(self, tmp_path):
7373
assert response.status_code == fastapi.status.HTTP_200_OK
7474
expected_output = {'result': f'[CSV] - at {tmp_path}/query.csv'}
7575
assert response.json() == expected_output
76+
77+
def test_batch_fake_source_from_query_path(self, tmp_path):
78+
query_path = tmp_path / 'query.sql'
79+
with pathlib.Path.open(query_path, 'w', encoding='utf-8') as f:
80+
f.write(self.query)
81+
fake_data = _SCRIPT_PATH / 'test.json'
82+
request = {
83+
'source': 'fake',
84+
'query_path': [
85+
str(query_path),
86+
str(query_path),
87+
],
88+
'context': {
89+
'fetcher_parameters': {
90+
'data_location': str(fake_data),
91+
},
92+
'writer': 'csv',
93+
'writer_parameters': {'destination_folder': str(tmp_path)},
94+
},
95+
}
96+
response = client.post('/api/execute:batch', json=request)
97+
assert response.status_code == fastapi.status.HTTP_200_OK
98+
expected_output = {
99+
'result': [
100+
f'[CSV] - at {tmp_path}/query.csv',
101+
f'[CSV] - at {tmp_path}/query.csv',
102+
]
103+
}
104+
assert response.json() == expected_output

0 commit comments

Comments
 (0)