diff --git a/libs/executors/garf/executors/__init__.py b/libs/executors/garf/executors/__init__.py
index 445de15..fb2c708 100644
--- a/libs/executors/garf/executors/__init__.py
+++ b/libs/executors/garf/executors/__init__.py
@@ -15,42 +15,7 @@
 from __future__ import annotations
 
-import importlib
-
-from garf.executors import executor, fetchers
 from garf.executors.api_executor import ApiExecutionContext, ApiQueryExecutor
-from garf.executors.telemetry import tracer
-
-
-@tracer.start_as_current_span('setup_executor')
-def setup_executor(
-  source: str,
-  fetcher_parameters: dict[str, str | int | bool],
-  enable_cache: bool = False,
-  cache_ttl_seconds: int = 3600,
-) -> type[executor.Executor]:
-  """Initializes executors based on a source and parameters."""
-  if source == 'bq':
-    bq_executor = importlib.import_module('garf.executors.bq_executor')
-    query_executor = bq_executor.BigQueryExecutor(**fetcher_parameters)
-  elif source == 'sqldb':
-    sql_executor = importlib.import_module('garf.executors.sql_executor')
-    query_executor = (
-      sql_executor.SqlAlchemyQueryExecutor.from_connection_string(
-        fetcher_parameters.get('connection_string')
-      )
-    )
-  else:
-    concrete_api_fetcher = fetchers.get_report_fetcher(source)
-    query_executor = ApiQueryExecutor(
-      fetcher=concrete_api_fetcher(
-        **fetcher_parameters,
-        enable_cache=enable_cache,
-        cache_ttl_seconds=cache_ttl_seconds,
-      )
-    )
-  return query_executor
-
 
 
 __all__ = [
   'ApiQueryExecutor',
diff --git a/libs/executors/garf/executors/entrypoints/cli.py b/libs/executors/garf/executors/entrypoints/cli.py
index 7d30462..656d6de 100644
--- a/libs/executors/garf/executors/entrypoints/cli.py
+++ b/libs/executors/garf/executors/entrypoints/cli.py
@@ -22,11 +22,10 @@
 import argparse
 import logging
 import pathlib
-import re
 import sys
 
 import garf.executors
-from garf.executors import config, exceptions, workflow
+from garf.executors import config, exceptions, setup, workflow, workflow_runner
 from garf.executors.entrypoints import utils
 from garf.executors.entrypoints.tracer import initialize_tracer
 from garf.executors.telemetry import tracer
@@ -86,52 +85,11 @@ def main():
   if workflow_file := args.workflow:
     wf_parent = pathlib.Path.cwd() / pathlib.Path(workflow_file).parent
     execution_workflow = workflow.Workflow.from_file(workflow_file)
-    for i, step in enumerate(execution_workflow.steps, 1):
-      step_span_name = f'{i}-{step.fetcher}'
-      if step.alias:
-        step_span_name = f'{step_span_name}-{step.alias}'
-      with tracer.start_as_current_span(step_span_name):
-        query_executor = garf.executors.setup_executor(
-          source=step.fetcher,
-          fetcher_parameters=step.fetcher_parameters,
-          enable_cache=args.enable_cache,
-          cache_ttl_seconds=args.cache_ttl_seconds,
-        )
-        batch = {}
-        if not (queries := step.queries):
-          logger.error('Please provide one or more queries to run')
-          raise exceptions.GarfExecutorError(
-            'Please provide one or more queries to run'
-          )
-        for query in queries:
-          if isinstance(query, garf.executors.workflow.QueryPath):
-            query_path = query.full_path
-            if re.match(
-              '^(http|gs|s3|aruze|hdfs|webhdfs|ssh|scp|sftp)', query_path
-            ):
-              batch[query.path] = reader_client.read(query_path)
-            else:
-              if not query.prefix:
-                query_path = wf_parent / pathlib.Path(query.path)
-              if not query_path.exists():
-                raise workflow.GarfWorkflowError(
-                  f'Query: {query_path} not found'
-                )
-              batch[query.path] = reader_client.read(query_path)
-          elif isinstance(query, garf.executors.workflow.QueryFolder):
-            query_path = wf_parent / pathlib.Path(query.folder)
-            if not query_path.exists():
-              raise workflow.GarfWorkflowError(
-                f'Folder: {query_path} not found'
-              )
-            for p in query_path.rglob('*'):
-              if p.suffix == '.sql':
-                batch[p.stem] = reader_client.read(p)
-          else:
-            batch[query.query.title] = query.query.text
-        query_executor.execute_batch(
-          batch, step.context, args.parallel_threshold
-        )
+    workflow_runner.WorkflowRunner(
+      execution_workflow, wf_parent, parallel_threshold=args.parallel_threshold
+    ).run(
+      enable_cache=args.enable_cache, cache_ttl_seconds=args.cache_ttl_seconds
+    )
     sys.exit()
 
   if not args.query:
@@ -165,7 +123,7 @@ def main():
       writer_parameters=writer_parameters,
       fetcher_parameters=source_parameters,
     )
-  query_executor = garf.executors.setup_executor(
+  query_executor = setup.setup_executor(
     source=args.source,
     fetcher_parameters=context.fetcher_parameters,
     enable_cache=args.enable_cache,
diff --git a/libs/executors/garf/executors/entrypoints/grpc_server.py b/libs/executors/garf/executors/entrypoints/grpc_server.py
index f4aad2d..20e1b9a 100644
--- a/libs/executors/garf/executors/entrypoints/grpc_server.py
+++ b/libs/executors/garf/executors/entrypoints/grpc_server.py
@@ -18,9 +18,8 @@
 import logging
 from concurrent import futures
 
-import garf.executors
 import grpc
-from garf.executors import garf_pb2, garf_pb2_grpc
+from garf.executors import execution_context, garf_pb2, garf_pb2_grpc, setup
 from garf.executors.entrypoints.tracer import initialize_tracer
 from google.protobuf.json_format import MessageToDict
 from grpc_reflection.v1alpha import reflection
@@ -28,30 +27,29 @@
 
 class GarfService(garf_pb2_grpc.GarfService):
   def Execute(self, request, context):
-    query_executor = garf.executors.setup_executor(
+    query_executor = setup.setup_executor(
       request.source, request.context.fetcher_parameters
     )
-    execution_context = garf.executors.execution_context.ExecutionContext(
-      **MessageToDict(request.context, preserving_proto_field_name=True)
-    )
     result = query_executor.execute(
       query=request.query,
       title=request.title,
-      context=execution_context,
+      context=execution_context.ExecutionContext(
+        **MessageToDict(request.context, preserving_proto_field_name=True)
+      ),
     )
     return garf_pb2.ExecuteResponse(results=[result])
 
   def Fetch(self, request, context):
-    query_executor = garf.executors.setup_executor(
+    query_executor = setup.setup_executor(
       request.source, request.context.fetcher_parameters
     )
-    execution_context = garf.executors.execution_context.ExecutionContext(
+    query_args = execution_context.ExecutionContext(
       **MessageToDict(request.context, preserving_proto_field_name=True)
-    )
+    ).query_parameters
     result = query_executor.fetcher.fetch(
       query_specification=request.query,
       title=request.title,
-      args=execution_context.query_parameters,
+      args=query_args,
     )
     return garf_pb2.FetchResponse(
       columns=result.column_names, rows=result.to_list(row_type='dict')
diff --git a/libs/executors/garf/executors/entrypoints/server.py b/libs/executors/garf/executors/entrypoints/server.py
index 05cb470..f7b4f34 100644
--- a/libs/executors/garf/executors/entrypoints/server.py
+++ b/libs/executors/garf/executors/entrypoints/server.py
@@ -21,7 +21,7 @@
 import pydantic
 import typer
 import uvicorn
-from garf.executors import exceptions
+from garf.executors import exceptions, setup, workflow_runner
 from garf.executors.entrypoints import utils
 from garf.executors.entrypoints.tracer import initialize_tracer
 from garf.io import reader
@@ -125,7 +125,7 @@ def execute(
   request: ApiExecutorRequest,
   dependencies: Annotated[GarfDependencies, fastapi.Depends(GarfDependencies)],
 ) -> ApiExecutorResponse:
-  query_executor = garf.executors.setup_executor(
+  query_executor = setup.setup_executor(
     request.source, request.context.fetcher_parameters
   )
   result = query_executor.execute(request.query, request.title, request.context)
@@ -137,7 +137,7 @@ def execute_batch(
   request: ApiExecutorRequest,
   dependencies: Annotated[GarfDependencies, fastapi.Depends(GarfDependencies)],
 ) -> ApiExecutorResponse:
-  query_executor = garf.executors.setup_executor(
+  query_executor = setup.setup_executor(
     request.source, request.context.fetcher_parameters
   )
   reader_client = reader.FileReader()
@@ -146,6 +146,18 @@
   return ApiExecutorResponse(results=results)
 
 
+@app.post('/api/execute:workflow')
+def execute_workflow(
+  workflow_file: str,
+  dependencies: Annotated[GarfDependencies, fastapi.Depends(GarfDependencies)],
+  enable_cache: bool = False,
+  cache_ttl_seconds: int = 3600,
+) -> list[str]:
+  return workflow_runner.WorkflowRunner.from_file(workflow_file).run(
+    enable_cache=enable_cache, cache_ttl_seconds=cache_ttl_seconds
+  )
+
+
 @typer_app.command()
 def main(
   port: Annotated[int, typer.Option(help='Port to start the server')] = 8000,
diff --git a/libs/executors/garf/executors/setup.py b/libs/executors/garf/executors/setup.py
new file mode 100644
index 0000000..0d62dc4
--- /dev/null
+++ b/libs/executors/garf/executors/setup.py
@@ -0,0 +1,52 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Bootstraps executor based on provided parameters.""" + +from __future__ import annotations + +import importlib + +from garf.executors import executor, fetchers +from garf.executors.api_executor import ApiQueryExecutor +from garf.executors.telemetry import tracer + + +@tracer.start_as_current_span('setup_executor') +def setup_executor( + source: str, + fetcher_parameters: dict[str, str | int | bool], + enable_cache: bool = False, + cache_ttl_seconds: int = 3600, +) -> type[executor.Executor]: + """Initializes executors based on a source and parameters.""" + if source == 'bq': + bq_executor = importlib.import_module('garf.executors.bq_executor') + query_executor = bq_executor.BigQueryExecutor(**fetcher_parameters) + elif source == 'sqldb': + sql_executor = importlib.import_module('garf.executors.sql_executor') + query_executor = ( + sql_executor.SqlAlchemyQueryExecutor.from_connection_string( + fetcher_parameters.get('connection_string') + ) + ) + else: + concrete_api_fetcher = fetchers.get_report_fetcher(source) + query_executor = ApiQueryExecutor( + fetcher=concrete_api_fetcher( + **fetcher_parameters, + enable_cache=enable_cache, + cache_ttl_seconds=cache_ttl_seconds, + ) + ) + return query_executor diff --git a/libs/executors/garf/executors/workflow_runner.py b/libs/executors/garf/executors/workflow_runner.py new file mode 100644 index 0000000..9e67dd1 --- /dev/null +++ b/libs/executors/garf/executors/workflow_runner.py @@ -0,0 +1,114 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Runs garf workflow.""" + +from __future__ import annotations + +import logging +import pathlib +import re +from typing import Final + +from garf.executors import exceptions, setup, workflow +from garf.executors.telemetry import tracer +from garf.io import reader + +logger = logging.getLogger(__name__) + +_REMOTE_FILES_PATTERN: Final[str] = ( + '^(http|gs|s3|aruze|hdfs|webhdfs|ssh|scp|sftp)' +) + + +class WorkflowRunner: + """Runs garf workflow. + + Attributes: + workflow: Workflow to execute. + wf_parent: Optional location of a workflow file. + parallel_threshold: Max allowed parallelism for the queries in the workflow. 
+ """ + + def __init__( + self, + execution_workflow: workflow.Workflow, + wf_parent: pathlib.Path | str, + parallel_threshold: int = 10, + ) -> None: + """Initializes WorkflowRunner.""" + self.workflow = execution_workflow + self.wf_parent = wf_parent + self.parallel_threshold = parallel_threshold + + @classmethod + def from_file(cls, workflow_file: str | pathlib.Path) -> WorkflowRunner: + """Initialized Workflow runner from a local or remote file.""" + if isinstance(workflow_file, str): + workflow_file = pathlib.Path(workflow_file) + execution_workflow = workflow.Workflow.from_file(workflow_file) + return cls( + execution_workflow=execution_workflow, wf_parent=workflow_file.parent + ) + + def run( + self, enable_cache: bool = False, cache_ttl_seconds: int = 3600 + ) -> list[str]: + reader_client = reader.create_reader('file') + execution_results = [] + for i, step in enumerate(self.workflow.steps, 1): + step_name = f'{i}-{step.fetcher}' + if step.alias: + step_name = f'{step_name}-{step.alias}' + with tracer.start_as_current_span(step_name): + query_executor = setup.setup_executor( + source=step.fetcher, + fetcher_parameters=step.fetcher_parameters, + enable_cache=enable_cache, + cache_ttl_seconds=cache_ttl_seconds, + ) + batch = {} + if not (queries := step.queries): + logger.error('Please provide one or more queries to run') + raise exceptions.GarfExecutorError( + 'Please provide one or more queries to run' + ) + for query in queries: + if isinstance(query, workflow.QueryPath): + query_path = query.full_path + if re.match(_REMOTE_FILES_PATTERN, query_path): + batch[query.path] = reader_client.read(query_path) + else: + if not query.prefix: + query_path = self.wf_parent / pathlib.Path(query.path) + if not query_path.exists(): + raise workflow.GarfWorkflowError( + f'Query: {query_path} not found' + ) + batch[query.path] = reader_client.read(query_path) + elif isinstance(query, workflow.QueryFolder): + query_path = self.wf_parent / pathlib.Path(query.folder) + if not query_path.exists(): + raise workflow.GarfWorkflowError( + f'Folder: {query_path} not found' + ) + for p in query_path.rglob('*'): + if p.suffix == '.sql': + batch[p.stem] = reader_client.read(p) + else: + batch[query.query.title] = query.query.text + query_executor.execute_batch( + batch, step.context, self.parallel_threshold + ) + execution_results.append(step_name) + return execution_results diff --git a/libs/executors/tests/end-to-end/test_grpc_server.py b/libs/executors/tests/end-to-end/test_grpc_server.py index cb1b656..6a1a8bb 100644 --- a/libs/executors/tests/end-to-end/test_grpc_server.py +++ b/libs/executors/tests/end-to-end/test_grpc_server.py @@ -13,7 +13,6 @@ # limitations under the License. import pathlib -import grpc import pytest from garf.executors import garf_pb2 as pb from garf.executors import garf_pb2_grpc diff --git a/libs/executors/tests/unit/test_workflow_runner.py b/libs/executors/tests/unit/test_workflow_runner.py new file mode 100644 index 0000000..5ece665 --- /dev/null +++ b/libs/executors/tests/unit/test_workflow_runner.py @@ -0,0 +1,27 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import pathlib
+
+from garf.executors import workflow_runner
+
+_SCRIPT_PATH = pathlib.Path(__file__).parent
+
+
+class TestWorkflowRunner:
+  def test_run_returns_executed_step_names(self):
+    runner = workflow_runner.WorkflowRunner.from_file(
+      _SCRIPT_PATH / '../end-to-end/test_workflow.yaml'
+    )
+    results = runner.run()
+    assert results == ['1-fake-test']
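
Usage sketch (reviewer note, not part of the diff): the refactor makes the workflow loop and executor bootstrap importable, so they can be driven programmatically as well as via the CLI and the new POST /api/execute:workflow endpoint. A minimal sketch under stated assumptions — 'workflow.yaml' is a hypothetical local file, and the 'project' key is an illustrative fetcher parameter, not a documented BigQueryExecutor argument:

from garf.executors import setup, workflow_runner

# Programmatic equivalent of `--workflow workflow.yaml` in the CLI;
# from_file() accepts a str or pathlib.Path and resolves relative query
# paths against the file's parent directory.
runner = workflow_runner.WorkflowRunner.from_file('workflow.yaml')

# run() executes every step and returns the executed step names,
# formatted as f'{i}-{fetcher}' or f'{i}-{fetcher}-{alias}'.
step_names = runner.run(enable_cache=True, cache_ttl_seconds=600)

# One-off executor bootstrap: 'bq' maps to BigQueryExecutor, 'sqldb' to
# SqlAlchemyQueryExecutor, anything else to an ApiQueryExecutor built
# from a registered report fetcher. Parameter keys are fetcher-specific;
# 'project' here is a hypothetical placeholder.
query_executor = setup.setup_executor(
  source='bq', fetcher_parameters={'project': 'my-project'}
)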