Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libs/executors/garf_executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ def setup_executor(
'ApiExecutionContext',
]

__version__ = '0.2.1'
__version__ = '0.2.2'
24 changes: 18 additions & 6 deletions libs/executors/garf_executors/api_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,22 @@ def __init__(self, fetcher: report_fetcher.ApiReportFetcher) -> None:

@classmethod
def from_fetcher_alias(
cls, source: str, fetcher_parameters: dict[str, str] | None = None
cls,
source: str,
fetcher_parameters: dict[str, str] | None = None,
enable_cache: bool = False,
cache_ttl_seconds: int = 3600,
) -> ApiQueryExecutor:
if not fetcher_parameters:
fetcher_parameters = {}
concrete_api_fetcher = fetchers.get_report_fetcher(source)
return ApiQueryExecutor(fetcher=concrete_api_fetcher(**fetcher_parameters))
return ApiQueryExecutor(
fetcher=concrete_api_fetcher(
**fetcher_parameters,
enable_cache=enable_cache,
cache_ttl_seconds=cache_ttl_seconds,
)
)

@tracer.start_as_current_span('api.execute')
def execute(
Expand All @@ -86,11 +96,13 @@ def execute(
GarfExecutorError: When failed to execute query.
"""
span = trace.get_current_span()
span.set_attribute('fetcher', self.fetcher.__class__.__name__)
span.set_attribute('api_client', self.fetcher.api_client.__class__.__name__)
span.set_attribute('fetcher.class', self.fetcher.__class__.__name__)
span.set_attribute(
'api.client.class', self.fetcher.api_client.__class__.__name__
)
try:
span.set_attribute('query_title', title)
span.set_attribute('query_text', query)
span.set_attribute('query.title', title)
span.set_attribute('query.text', query)
logger.debug('starting query %s', query)
results = self.fetcher.fetch(
query_specification=query,
Expand Down
12 changes: 11 additions & 1 deletion libs/executors/garf_executors/entrypoints/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import fastapi
import pydantic
import typer
import uvicorn
from garf_io import reader
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from typing_extensions import Annotated

import garf_executors
from garf_executors import exceptions
Expand All @@ -29,6 +31,7 @@
initialize_tracer()
app = fastapi.FastAPI()
FastAPIInstrumentor.instrument_app(app)
typer_app = typer.Typer()


class ApiExecutorRequest(pydantic.BaseModel):
Expand Down Expand Up @@ -104,5 +107,12 @@ def execute_batch(request: ApiExecutorRequest) -> ApiExecutorResponse:
return ApiExecutorResponse(results=results)


@typer_app.command()
def main(
port: Annotated[int, typer.Option(help='Port to start the server')] = 8000,
):
uvicorn.run(app, port=port)


if __name__ == '__main__':
uvicorn.run(app)
typer_app()
3 changes: 2 additions & 1 deletion libs/executors/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies = [
"pydantic",
"opentelemetry-api",
"opentelemetry-sdk",
"opentelemetry-exporter-otlp",
]
authors = [
{name = "Google Inc. (gTech gPS CSE team)", email = "no-reply@google.com"},
Expand Down Expand Up @@ -51,7 +52,7 @@ sql=[
server=[
"fastapi[standard]",
"opentelemetry-instrumentation-fastapi",
"opentelemetry-exporter-otlp",
"typer",
]
all = [
"garf-executors[bq,sql,server]"
Expand Down