Skip to content

Commit 687c426

Browse files
[executors]: feat: add support for async execution
1 parent a178433 commit 687c426

File tree

9 files changed

+56
-23
lines changed

9 files changed

+56
-23
lines changed

docs/usage/executors.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,16 @@ query_executor.execute(
7474
context=context
7575
)
7676
```
77+
!!!note
78+
You can use `aexecute` method to run execute the query asynchronously
79+
```python
80+
await query_executor.aexecute(
81+
query=query_text,
82+
title="query",
83+
context=context
84+
)
85+
```
86+
7787
///
7888

7989
/// tab | server

docs/usage/fetcher.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ query = 'SELECT metric FROM resource'
7575
report = report_fetcher.fetch(query)
7676
```
7777

78+
!!!note
79+
You can use `afetch` method to run execute the query asynchronously
80+
```python
81+
report = await report_fetcher.afetch(query)
82+
```
83+
7884
`fetch` method returns `GarfReport` which can be [processed](reports.md) in Python
7985
or [written](writers.md) to local / remote storage.
8086

libs/core/garf_core/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,4 @@
2626
'ApiReportFetcher',
2727
]
2828

29-
__version__ = '0.4.0'
29+
__version__ = '0.4.1'

libs/core/garf_core/report_fetcher.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
from __future__ import annotations
2323

24+
import asyncio
2425
import logging
2526
from typing import Callable
2627

@@ -113,7 +114,9 @@ async def afetch(
113114
Returns:
114115
GarfReport with results of query execution.
115116
"""
116-
return self.fetch(query_specification, args, **kwargs)
117+
return await asyncio.to_thread(
118+
self.fetch, query_specification, args, **kwargs
119+
)
117120

118121
@tracer.start_as_current_span('fetch')
119122
def fetch(

libs/executors/garf_executors/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,4 @@ def setup_executor(
5050
'ApiExecutionContext',
5151
]
5252

53-
__version__ = '0.1.1'
53+
__version__ = '0.1.2'

libs/executors/garf_executors/api_executor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from __future__ import annotations
2222

23+
import asyncio
2324
import logging
2425

2526
from garf_core import report_fetcher
@@ -77,7 +78,7 @@ async def aexecute(
7778
Returns:
7879
Result of writing the report.
7980
"""
80-
return await self.execute(query, context, title, context)
81+
return await asyncio.to_thread(self.execute, query, title, context)
8182

8283
@tracer.start_as_current_span('api.execute')
8384
def execute(

libs/executors/garf_executors/entrypoints/cli.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def main():
5454
parser.add_argument('--dry-run', dest='dry_run', action='store_true')
5555
parser.add_argument('-v', '--version', dest='version', action='store_true')
5656
parser.add_argument(
57-
'--parallel-threshold', dest='parallel_threshold', default=None, type=int
57+
'--parallel-threshold', dest='parallel_threshold', default=10, type=int
5858
)
5959
parser.set_defaults(parallel_queries=True)
6060
parser.set_defaults(dry_run=False)
@@ -82,7 +82,7 @@ def main():
8282
args.source, context.fetcher_parameters
8383
)
8484
batch = {query: reader_client.read(query) for query in args.query}
85-
query_executor.execute_batch(batch, context, args.parallel_queries)
85+
query_executor.execute_batch(batch, context, args.parallel_threshold)
8686
else:
8787
extra_parameters = utils.ParamsParser(
8888
['source', args.output, 'macro', 'template']
@@ -104,7 +104,7 @@ def main():
104104
if args.parallel_queries:
105105
logger.info('Running queries in parallel')
106106
batch = {query: reader_client.read(query) for query in args.query}
107-
query_executor.execute_batch(batch, context, args.parallel_queries)
107+
query_executor.execute_batch(batch, context, args.parallel_threshold)
108108
else:
109109
logger.info('Running queries sequentially')
110110
for query in args.query:

libs/executors/garf_executors/entrypoints/server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ async def execute(request: ApiExecutorRequest) -> ApiExecutorResponse:
9494

9595

9696
@app.post('/api/execute:batch')
97-
async def execute_batch(request: ApiExecutorRequest) -> ApiExecutorResponse:
97+
def execute_batch(request: ApiExecutorRequest) -> ApiExecutorResponse:
9898
query_executor = garf_executors.setup_executor(
9999
request.source, request.context.fetcher_parameters
100100
)

libs/executors/garf_executors/executor.py

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414

1515
"""Defines common functionality between executors."""
1616

17-
from concurrent import futures
17+
import asyncio
18+
19+
from opentelemetry import trace
1820

1921
from garf_executors import execution_context
2022
from garf_executors.telemetry import tracer
@@ -40,17 +42,28 @@ def execute_batch(
4042
Returns:
4143
Results of execution.
4244
"""
43-
results = []
44-
with futures.ThreadPoolExecutor(max_workers=parallel_threshold) as executor:
45-
future_to_query = {
46-
executor.submit(
47-
self.execute,
48-
query,
49-
title,
50-
context,
51-
): query
52-
for title, query in batch.items()
53-
}
54-
for future in futures.as_completed(future_to_query):
55-
results.append(future.result())
56-
return results
45+
span = trace.get_current_span()
46+
span.set_attribute('api.parallel_threshold', parallel_threshold)
47+
return asyncio.run(
48+
self._run(
49+
batch=batch, context=context, parallel_threshold=parallel_threshold
50+
)
51+
)
52+
53+
async def _run(
54+
self,
55+
batch: dict[str, str],
56+
context: execution_context.ExecutionContext,
57+
parallel_threshold: int,
58+
):
59+
semaphore = asyncio.Semaphore(value=parallel_threshold)
60+
61+
async def run_with_semaphore(fn):
62+
async with semaphore:
63+
return await fn
64+
65+
tasks = [
66+
self.aexecute(query=query, title=title, context=context)
67+
for title, query in batch.items()
68+
]
69+
return await asyncio.gather(*(run_with_semaphore(task) for task in tasks))

0 commit comments

Comments
 (0)