Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/usage/executors.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ query_executor.execute(
context=context
)
```
!!!note
You can use `aexecute` method to run execute the query asynchronously
```python
await query_executor.aexecute(
query=query_text,
title="query",
context=context
)
```

///

/// tab | server
Expand Down
6 changes: 6 additions & 0 deletions docs/usage/fetcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ query = 'SELECT metric FROM resource'
report = report_fetcher.fetch(query)
```

!!!note
You can use `afetch` method to run execute the query asynchronously
```python
report = await report_fetcher.afetch(query)
```

`fetch` method returns `GarfReport` which can be [processed](reports.md) in Python
or [written](writers.md) to local / remote storage.

Expand Down
2 changes: 1 addition & 1 deletion libs/core/garf_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@
'ApiReportFetcher',
]

__version__ = '0.4.0'
__version__ = '0.4.1'
5 changes: 4 additions & 1 deletion libs/core/garf_core/report_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from __future__ import annotations

import asyncio
import logging
from typing import Callable

Expand Down Expand Up @@ -113,7 +114,9 @@ async def afetch(
Returns:
GarfReport with results of query execution.
"""
return self.fetch(query_specification, args, **kwargs)
return await asyncio.to_thread(
self.fetch, query_specification, args, **kwargs
)

@tracer.start_as_current_span('fetch')
def fetch(
Expand Down
2 changes: 1 addition & 1 deletion libs/executors/garf_executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ def setup_executor(
'ApiExecutionContext',
]

__version__ = '0.1.1'
__version__ = '0.1.2'
3 changes: 2 additions & 1 deletion libs/executors/garf_executors/api_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from __future__ import annotations

import asyncio
import logging

from garf_core import report_fetcher
Expand Down Expand Up @@ -77,7 +78,7 @@ async def aexecute(
Returns:
Result of writing the report.
"""
return await self.execute(query, context, title, context)
return await asyncio.to_thread(self.execute, query, title, context)

@tracer.start_as_current_span('api.execute')
def execute(
Expand Down
6 changes: 3 additions & 3 deletions libs/executors/garf_executors/entrypoints/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def main():
parser.add_argument('--dry-run', dest='dry_run', action='store_true')
parser.add_argument('-v', '--version', dest='version', action='store_true')
parser.add_argument(
'--parallel-threshold', dest='parallel_threshold', default=None, type=int
'--parallel-threshold', dest='parallel_threshold', default=10, type=int
)
parser.set_defaults(parallel_queries=True)
parser.set_defaults(dry_run=False)
Expand Down Expand Up @@ -82,7 +82,7 @@ def main():
args.source, context.fetcher_parameters
)
batch = {query: reader_client.read(query) for query in args.query}
query_executor.execute_batch(batch, context, args.parallel_queries)
query_executor.execute_batch(batch, context, args.parallel_threshold)
else:
extra_parameters = utils.ParamsParser(
['source', args.output, 'macro', 'template']
Expand All @@ -104,7 +104,7 @@ def main():
if args.parallel_queries:
logger.info('Running queries in parallel')
batch = {query: reader_client.read(query) for query in args.query}
query_executor.execute_batch(batch, context, args.parallel_queries)
query_executor.execute_batch(batch, context, args.parallel_threshold)
else:
logger.info('Running queries sequentially')
for query in args.query:
Expand Down
2 changes: 1 addition & 1 deletion libs/executors/garf_executors/entrypoints/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async def execute(request: ApiExecutorRequest) -> ApiExecutorResponse:


@app.post('/api/execute:batch')
async def execute_batch(request: ApiExecutorRequest) -> ApiExecutorResponse:
def execute_batch(request: ApiExecutorRequest) -> ApiExecutorResponse:
query_executor = garf_executors.setup_executor(
request.source, request.context.fetcher_parameters
)
Expand Down
43 changes: 28 additions & 15 deletions libs/executors/garf_executors/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

"""Defines common functionality between executors."""

from concurrent import futures
import asyncio

from opentelemetry import trace

from garf_executors import execution_context
from garf_executors.telemetry import tracer
Expand All @@ -40,17 +42,28 @@ def execute_batch(
Returns:
Results of execution.
"""
results = []
with futures.ThreadPoolExecutor(max_workers=parallel_threshold) as executor:
future_to_query = {
executor.submit(
self.execute,
query,
title,
context,
): query
for title, query in batch.items()
}
for future in futures.as_completed(future_to_query):
results.append(future.result())
return results
span = trace.get_current_span()
span.set_attribute('api.parallel_threshold', parallel_threshold)
return asyncio.run(
self._run(
batch=batch, context=context, parallel_threshold=parallel_threshold
)
)

async def _run(
self,
batch: dict[str, str],
context: execution_context.ExecutionContext,
parallel_threshold: int,
):
semaphore = asyncio.Semaphore(value=parallel_threshold)

async def run_with_semaphore(fn):
async with semaphore:
return await fn

tasks = [
self.aexecute(query=query, title=title, context=context)
for title, query in batch.items()
]
return await asyncio.gather(*(run_with_semaphore(task) for task in tasks))