Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 0 additions & 35 deletions libs/executors/garf/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,7 @@

from __future__ import annotations

import importlib

from garf.executors import executor, fetchers
from garf.executors.api_executor import ApiExecutionContext, ApiQueryExecutor
from garf.executors.telemetry import tracer


@tracer.start_as_current_span('setup_executor')
def setup_executor(
source: str,
fetcher_parameters: dict[str, str | int | bool],
enable_cache: bool = False,
cache_ttl_seconds: int = 3600,
) -> type[executor.Executor]:
"""Initializes executors based on a source and parameters."""
if source == 'bq':
bq_executor = importlib.import_module('garf.executors.bq_executor')
query_executor = bq_executor.BigQueryExecutor(**fetcher_parameters)
elif source == 'sqldb':
sql_executor = importlib.import_module('garf.executors.sql_executor')
query_executor = (
sql_executor.SqlAlchemyQueryExecutor.from_connection_string(
fetcher_parameters.get('connection_string')
)
)
else:
concrete_api_fetcher = fetchers.get_report_fetcher(source)
query_executor = ApiQueryExecutor(
fetcher=concrete_api_fetcher(
**fetcher_parameters,
enable_cache=enable_cache,
cache_ttl_seconds=cache_ttl_seconds,
)
)
return query_executor


__all__ = [
'ApiQueryExecutor',
Expand Down
56 changes: 7 additions & 49 deletions libs/executors/garf/executors/entrypoints/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@
import argparse
import logging
import pathlib
import re
import sys

import garf.executors
from garf.executors import config, exceptions, workflow
from garf.executors import config, exceptions, setup, workflow, workflow_runner
from garf.executors.entrypoints import utils
from garf.executors.entrypoints.tracer import initialize_tracer
from garf.executors.telemetry import tracer
Expand Down Expand Up @@ -86,52 +85,11 @@ def main():
if workflow_file := args.workflow:
wf_parent = pathlib.Path.cwd() / pathlib.Path(workflow_file).parent
execution_workflow = workflow.Workflow.from_file(workflow_file)
for i, step in enumerate(execution_workflow.steps, 1):
step_span_name = f'{i}-{step.fetcher}'
if step.alias:
step_span_name = f'{step_span_name}-{step.alias}'
with tracer.start_as_current_span(step_span_name):
query_executor = garf.executors.setup_executor(
source=step.fetcher,
fetcher_parameters=step.fetcher_parameters,
enable_cache=args.enable_cache,
cache_ttl_seconds=args.cache_ttl_seconds,
)
batch = {}
if not (queries := step.queries):
logger.error('Please provide one or more queries to run')
raise exceptions.GarfExecutorError(
'Please provide one or more queries to run'
)
for query in queries:
if isinstance(query, garf.executors.workflow.QueryPath):
query_path = query.full_path
if re.match(
'^(http|gs|s3|aruze|hdfs|webhdfs|ssh|scp|sftp)', query_path
):
batch[query.path] = reader_client.read(query_path)
else:
if not query.prefix:
query_path = wf_parent / pathlib.Path(query.path)
if not query_path.exists():
raise workflow.GarfWorkflowError(
f'Query: {query_path} not found'
)
batch[query.path] = reader_client.read(query_path)
elif isinstance(query, garf.executors.workflow.QueryFolder):
query_path = wf_parent / pathlib.Path(query.folder)
if not query_path.exists():
raise workflow.GarfWorkflowError(
f'Folder: {query_path} not found'
)
for p in query_path.rglob('*'):
if p.suffix == '.sql':
batch[p.stem] = reader_client.read(p)
else:
batch[query.query.title] = query.query.text
query_executor.execute_batch(
batch, step.context, args.parallel_threshold
)
workflow_runner.WorkflowRunner(
execution_workflow=execution_workflow, wf_parent=wf_parent
).run(
enable_cache=args.enable_cache, cache_ttl_seconds=args.cache_ttl_seconds
)
sys.exit()

if not args.query:
Expand Down Expand Up @@ -165,7 +123,7 @@ def main():
writer_parameters=writer_parameters,
fetcher_parameters=source_parameters,
)
query_executor = garf.executors.setup_executor(
query_executor = setup.setup_executor(
source=args.source,
fetcher_parameters=context.fetcher_parameters,
enable_cache=args.enable_cache,
Expand Down
20 changes: 9 additions & 11 deletions libs/executors/garf/executors/entrypoints/grpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,38 @@
import logging
from concurrent import futures

import garf.executors
import grpc
from garf.executors import garf_pb2, garf_pb2_grpc
from garf.executors import execution_context, garf_pb2, garf_pb2_grpc, setup
from garf.executors.entrypoints.tracer import initialize_tracer
from google.protobuf.json_format import MessageToDict
from grpc_reflection.v1alpha import reflection


class GarfService(garf_pb2_grpc.GarfService):
def Execute(self, request, context):
query_executor = garf.executors.setup_executor(
query_executor = setup.setup_executor(
request.source, request.context.fetcher_parameters
)
execution_context = garf.executors.execution_context.ExecutionContext(
**MessageToDict(request.context, preserving_proto_field_name=True)
)
result = query_executor.execute(
query=request.query,
title=request.title,
context=execution_context,
context=execution_context.ExecutionContext(
**MessageToDict(request.context, preserving_proto_field_name=True)
),
)
return garf_pb2.ExecuteResponse(results=[result])

def Fetch(self, request, context):
query_executor = garf.executors.setup_executor(
query_executor = setup.setup_executor(
request.source, request.context.fetcher_parameters
)
execution_context = garf.executors.execution_context.ExecutionContext(
query_args = execution_context.ExecutionContext(
**MessageToDict(request.context, preserving_proto_field_name=True)
)
).query_parameters
result = query_executor.fetcher.fetch(
query_specification=request.query,
title=request.title,
args=execution_context.query_parameters,
args=query_args,
)
return garf_pb2.FetchResponse(
columns=result.column_names, rows=result.to_list(row_type='dict')
Expand Down
18 changes: 15 additions & 3 deletions libs/executors/garf/executors/entrypoints/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import pydantic
import typer
import uvicorn
from garf.executors import exceptions
from garf.executors import exceptions, setup, workflow_runner
from garf.executors.entrypoints import utils
from garf.executors.entrypoints.tracer import initialize_tracer
from garf.io import reader
Expand Down Expand Up @@ -125,7 +125,7 @@ def execute(
request: ApiExecutorRequest,
dependencies: Annotated[GarfDependencies, fastapi.Depends(GarfDependencies)],
) -> ApiExecutorResponse:
query_executor = garf.executors.setup_executor(
query_executor = setup.setup_executor(
request.source, request.context.fetcher_parameters
)
result = query_executor.execute(request.query, request.title, request.context)
Expand All @@ -137,7 +137,7 @@ def execute_batch(
request: ApiExecutorRequest,
dependencies: Annotated[GarfDependencies, fastapi.Depends(GarfDependencies)],
) -> ApiExecutorResponse:
query_executor = garf.executors.setup_executor(
query_executor = setup.setup_executor(
request.source, request.context.fetcher_parameters
)
reader_client = reader.FileReader()
Expand All @@ -146,6 +146,18 @@ def execute_batch(
return ApiExecutorResponse(results=results)


@app.post('/api/execute:workflow')
def execute_workflow(
  workflow_file: str,
  dependencies: Annotated[GarfDependencies, fastapi.Depends(GarfDependencies)],
  enable_cache: bool = False,
  cache_ttl_seconds: int = 3600,
) -> list[str]:
  """Executes all steps of a workflow file.

  Args:
    workflow_file: Local or remote location of a workflow file.
    dependencies: Common Garf server dependencies.
    enable_cache: Whether to cache fetched reports.
    cache_ttl_seconds: How long fetched reports are kept in cache.

  Returns:
    Names of executed workflow steps.
  """
  runner = workflow_runner.WorkflowRunner.from_file(workflow_file)
  return runner.run(
    enable_cache=enable_cache, cache_ttl_seconds=cache_ttl_seconds
  )


@typer_app.command()
def main(
port: Annotated[int, typer.Option(help='Port to start the server')] = 8000,
Expand Down
52 changes: 52 additions & 0 deletions libs/executors/garf/executors/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Bootstraps executor based on provided parameters."""

from __future__ import annotations

import importlib

from garf.executors import executor, fetchers
from garf.executors.api_executor import ApiQueryExecutor
from garf.executors.telemetry import tracer


@tracer.start_as_current_span('setup_executor')
def setup_executor(
  source: str,
  fetcher_parameters: dict[str, str | int | bool],
  enable_cache: bool = False,
  cache_ttl_seconds: int = 3600,
) -> executor.Executor:
  """Initializes executors based on a source and parameters.

  Args:
    source: Identifier of the data source ('bq', 'sqldb', or the name of a
      registered API report fetcher).
    fetcher_parameters: Parameters forwarded to the concrete executor or
      fetcher constructor.
    enable_cache: Whether API fetchers should cache fetched reports.
    cache_ttl_seconds: Time-to-live of cached reports, in seconds.

  Returns:
    Concrete executor instance for the requested source.
  """
  # NOTE: the previous annotation `type[executor.Executor]` was incorrect —
  # every branch below returns an *instance*, not a class.
  if source == 'bq':
    # Imported lazily so BigQuery dependencies stay optional.
    bq_executor = importlib.import_module('garf.executors.bq_executor')
    query_executor = bq_executor.BigQueryExecutor(**fetcher_parameters)
  elif source == 'sqldb':
    # Imported lazily so SQLAlchemy dependencies stay optional.
    sql_executor = importlib.import_module('garf.executors.sql_executor')
    query_executor = (
      sql_executor.SqlAlchemyQueryExecutor.from_connection_string(
        fetcher_parameters.get('connection_string')
      )
    )
  else:
    concrete_api_fetcher = fetchers.get_report_fetcher(source)
    query_executor = ApiQueryExecutor(
      fetcher=concrete_api_fetcher(
        **fetcher_parameters,
        enable_cache=enable_cache,
        cache_ttl_seconds=cache_ttl_seconds,
      )
    )
  return query_executor
114 changes: 114 additions & 0 deletions libs/executors/garf/executors/workflow_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Runs garf workflow."""

from __future__ import annotations

import logging
import pathlib
import re
from typing import Final

from garf.executors import exceptions, setup, workflow
from garf.executors.telemetry import tracer
from garf.io import reader

logger = logging.getLogger(__name__)

_REMOTE_FILES_PATTERN: Final[str] = (
'^(http|gs|s3|aruze|hdfs|webhdfs|ssh|scp|sftp)'
)


class WorkflowRunner:
  """Runs garf workflow.

  Attributes:
    workflow: Workflow to execute.
    wf_parent: Directory used to resolve relative query paths.
    parallel_threshold: Max allowed parallelism for the queries in the workflow.
  """

  def __init__(
    self,
    execution_workflow: workflow.Workflow,
    wf_parent: pathlib.Path | str,
    parallel_threshold: int = 10,
  ) -> None:
    """Initializes WorkflowRunner."""
    self.workflow = execution_workflow
    # Normalize to Path: `run` joins with the `/` operator, which would
    # raise TypeError if wf_parent were left as a str.
    self.wf_parent = pathlib.Path(wf_parent)
    self.parallel_threshold = parallel_threshold

  @classmethod
  def from_file(cls, workflow_file: str | pathlib.Path) -> WorkflowRunner:
    """Initializes WorkflowRunner from a local or remote file."""
    if isinstance(workflow_file, str):
      workflow_file = pathlib.Path(workflow_file)
    execution_workflow = workflow.Workflow.from_file(workflow_file)
    return cls(
      execution_workflow=execution_workflow, wf_parent=workflow_file.parent
    )

  def run(
    self, enable_cache: bool = False, cache_ttl_seconds: int = 3600
  ) -> list[str]:
    """Executes every step of the workflow sequentially.

    Args:
      enable_cache: Whether to cache fetched reports.
      cache_ttl_seconds: How long fetched reports are kept in cache.

    Returns:
      Names of the executed steps ('<index>-<fetcher>[-<alias>]').

    Raises:
      GarfExecutorError: When a step contains no queries.
      GarfWorkflowError: When a referenced query file or folder is missing.
    """
    reader_client = reader.create_reader('file')
    execution_results = []
    for i, step in enumerate(self.workflow.steps, 1):
      step_name = f'{i}-{step.fetcher}'
      if step.alias:
        step_name = f'{step_name}-{step.alias}'
      with tracer.start_as_current_span(step_name):
        query_executor = setup.setup_executor(
          source=step.fetcher,
          fetcher_parameters=step.fetcher_parameters,
          enable_cache=enable_cache,
          cache_ttl_seconds=cache_ttl_seconds,
        )
        batch = self._build_batch(step, reader_client)
        query_executor.execute_batch(
          batch, step.context, self.parallel_threshold
        )
        execution_results.append(step_name)
    return execution_results

  def _build_batch(self, step, reader_client) -> dict:
    """Reads all queries of a step into a {title: query_text} mapping."""
    if not (queries := step.queries):
      logger.error('Please provide one or more queries to run')
      raise exceptions.GarfExecutorError(
        'Please provide one or more queries to run'
      )
    batch = {}
    for query in queries:
      if isinstance(query, workflow.QueryPath):
        query_path = query.full_path
        if re.match(_REMOTE_FILES_PATTERN, query_path):
          batch[query.path] = reader_client.read(query_path)
        else:
          if not query.prefix:
            query_path = self.wf_parent / pathlib.Path(query.path)
          # NOTE(review): when query.prefix is set, query_path stays as
          # query.full_path — presumably already a Path; confirm it
          # supports `.exists()`.
          if not query_path.exists():
            raise workflow.GarfWorkflowError(
              f'Query: {query_path} not found'
            )
          batch[query.path] = reader_client.read(query_path)
      elif isinstance(query, workflow.QueryFolder):
        query_path = self.wf_parent / pathlib.Path(query.folder)
        if not query_path.exists():
          raise workflow.GarfWorkflowError(
            f'Folder: {query_path} not found'
          )
        # Only .sql files in the folder (recursively) are picked up.
        for p in query_path.rglob('*'):
          if p.suffix == '.sql':
            batch[p.stem] = reader_client.read(p)
      else:
        # Inline query: title and text are embedded in the workflow itself.
        batch[query.query.title] = query.query.text
    return batch
1 change: 0 additions & 1 deletion libs/executors/tests/end-to-end/test_grpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.
import pathlib

import grpc
import pytest
from garf.executors import garf_pb2 as pb
from garf.executors import garf_pb2_grpc
Expand Down
Loading