Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@
'GoogleAdsApiReportFetcher',
]

__version__ = '1.0.0'
__version__ = '1.0.1'
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class GoogleAdsApiQuery(query_editor.QuerySpecification):

def generate(self):
base_query = super().generate()
if not base_query.resource_name:
raise query_editor.GarfResourceError(
f'No resource found in query: {base_query.text}'
)
for field in base_query.fields:
field = _format_type_field_name(field)
for customizer in base_query.customizers.values():
Expand Down
11 changes: 10 additions & 1 deletion libs/community/google/ads/tests/unit/test_query_editor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import pytest
from garf.community.google.ads.query_editor import GoogleAdsApiQuery
from garf.core import query_editor


class TestGoogleAdsApiQuery:
Expand All @@ -38,3 +39,11 @@ def test_generate(self):
spec.customizers.get('asset_policy_type').value
== 'policy_topic_entries.type_'
)

def test_generate_raises_error_on_missing_resource(self):
query = 'SELECT campaign.type'
with pytest.raises(
query_editor.GarfResourceError,
match='No resource found in query: SELECT campaign.type',
):
GoogleAdsApiQuery(text=query).generate()
2 changes: 1 addition & 1 deletion libs/core/garf/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@
'ApiReportFetcher',
]

__version__ = '1.0.0'
__version__ = '1.0.1'
20 changes: 10 additions & 10 deletions libs/core/garf/core/query_editor.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ def replace_params_template(
else:
query_text = self.expand_jinja(query_text, {})
if macros := params.macro:
query_text = query_text.format(
**macros, **CommonParametersMixin().common_params
)
joined_macros = CommonParametersMixin().common_params
joined_macros.update(macros)
query_text = query_text.format(**joined_macros)
logger.debug('Query text after macro substitution:\n%s', query_text)
else:
query_text = self.expand_jinja(query_text, {})
Expand Down Expand Up @@ -237,7 +237,9 @@ def generate(self) -> BaseQueryElements:
.extract_virtual_columns()
.extract_customizers()
)
if self.query.resource_name.startswith('builtin'):
if self.query.resource_name and self.query.resource_name.startswith(
'builtin'
):
self.query.title = self.query.resource_name.replace('builtin.', '')
self.query.is_builtin_query = True
return self.query
Expand Down Expand Up @@ -283,16 +285,14 @@ def extract_resource_name(self) -> Self:

Returns:
Found resource.

Raises:
GarfResourceException: If resource_name isn't found.
"""
if resource_name := re.findall(
r'FROM\s+([\w.]+)', self.query.text, flags=re.IGNORECASE
):
self.query.resource_name = str(resource_name[0]).strip()
return self
raise GarfResourceError(f'No resource found in query: {self.query.text}')
else:
self.query.resource_name = None
return self

def extract_fields(self) -> Self:
for line in self._extract_query_lines():
Expand Down Expand Up @@ -379,7 +379,7 @@ def convert_date(date_string: str) -> str:
GarfMacroError:
If dynamic lookback value (:YYYYMMDD-N) is incorrect.
"""
if isinstance(date_string, list) or date_string.find(':Y') == -1:
if isinstance(date_string, list) or str(date_string).find(':Y') == -1:
return date_string
current_date = datetime.date.today()
base_date, *date_customizer = re.split('\\+|-', date_string)
Expand Down
2 changes: 1 addition & 1 deletion libs/executors/garf/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ def setup_executor(
'ApiExecutionContext',
]

__version__ = '1.0.0'
__version__ = '1.0.1'
9 changes: 8 additions & 1 deletion libs/executors/garf/executors/api_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@
import logging

from garf.core import report_fetcher
from garf.executors import exceptions, execution_context, executor, fetchers
from garf.executors import (
exceptions,
execution_context,
executor,
fetchers,
query_processor,
)
from garf.executors.telemetry import tracer
from opentelemetry import trace

Expand Down Expand Up @@ -94,6 +100,7 @@ def execute(
Raises:
GarfExecutorError: When failed to execute query.
"""
context = query_processor.process_gquery(context)
span = trace.get_current_span()
span.set_attribute('fetcher.class', self.fetcher.__class__.__name__)
span.set_attribute(
Expand Down
1 change: 1 addition & 0 deletions libs/executors/garf/executors/bq_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(
self,
project_id: str | None = os.getenv('GOOGLE_CLOUD_PROJECT'),
location: str | None = None,
**kwargs: str,
) -> None:
"""Initializes BigQueryExecutor.

Expand Down
3 changes: 2 additions & 1 deletion libs/executors/garf/executors/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import Optional

from garf.core import report_fetcher
from garf.executors import execution_context
from garf.executors import execution_context, query_processor
from garf.executors.telemetry import tracer
from opentelemetry import trace

Expand Down Expand Up @@ -112,6 +112,7 @@ def _handle_processors(
processors: dict[str, report_fetcher.Processor],
context: execution_context.ExecutionContext,
) -> None:
context = query_processor.process_gquery(context)
for k, processor in processors.items():
processor_signature = list(inspect.signature(processor).parameters.keys())
if k in context.fetcher_parameters:
Expand Down
61 changes: 61 additions & 0 deletions libs/executors/garf/executors/query_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib

from garf.core import query_editor
from garf.executors import exceptions, execution_context


def process_gquery(
context: execution_context.ExecutionContext,
) -> execution_context.ExecutionContext:
for k, v in context.fetcher_parameters.items():
if isinstance(v, str) and v.startswith('gquery'):
no_writer_context = context.model_copy(update={'writer': None})
try:
_, alias, query = v.split(':', maxsplit=3)
except ValueError:
raise exceptions.GarfExecutorError(
f'Incorrect gquery format, should be gquery:alias:query, got {v}'
)
if alias == 'sqldb':
from garf.executors import sql_executor

gquery_executor = sql_executor.SqlAlchemyQueryExecutor(
**context.fetcher_parameters
)
elif alias == 'bq':
from garf.executors import bq_executor

gquery_executor = bq_executor.BigQueryExecutor(
**context.fetcher_parameters
)
else:
raise exceptions.GarfExecutorError(
f'Unsupported alias for gquery: {alias}'
)
with contextlib.suppress(query_editor.GarfResourceError):
query_spec = query_editor.QuerySpecification(
text=query, args=context.query_parameters
).generate()
if len(columns := [c for c in query_spec.column_names if c != '_']) > 1:
raise exceptions.GarfExecutorError(
f'Multiple columns in gquery: {columns}'
)
res = gquery_executor.execute(
query=query, title='gquery', context=no_writer_context
)
context.fetcher_parameters[k] = res.to_list(row_type='scalar')
return context
10 changes: 6 additions & 4 deletions libs/executors/garf/executors/sql_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,26 @@ class SqlAlchemyQueryExecutor(
engine: Initialized Engine object to operated on a given database.
"""

def __init__(self, engine: sqlalchemy.engine.base.Engine) -> None:
def __init__(
self, engine: sqlalchemy.engine.base.Engine | None = None, **kwargs: str
) -> None:
"""Initializes executor with a given engine.

Args:
engine: Initialized Engine object to operated on a given database.
"""
self.engine = engine
self.engine = engine or sqlalchemy.create_engine('sqlite://')
super().__init__()

@classmethod
def from_connection_string(
cls, connection_string: str
cls, connection_string: str | None
) -> SqlAlchemyQueryExecutor:
"""Creates executor from SqlAlchemy connection string.

https://docs.sqlalchemy.org/en/20/core/engines.html
"""
engine = sqlalchemy.create_engine(connection_string)
engine = sqlalchemy.create_engine(connection_string or 'sqlite://')
return cls(engine)

@tracer.start_as_current_span('sql.execute')
Expand Down
64 changes: 64 additions & 0 deletions libs/executors/tests/unit/test_query_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re

import pytest
from garf.executors import exceptions, execution_context, query_processor


def test_process_gquery_returns_unchanged_context():
context = execution_context.ExecutionContext(fetcher_parameters={'id': 1})
processed_context = query_processor.process_gquery(context)
assert processed_context == context


@pytest.mark.parametrize(
('gquery', 'error_message'),
[
(
'gquery:SELECT 1',
'Incorrect gquery format, should be gquery:alias:query, '
'got gquery:SELECT 1',
),
(
'gquery:unknown_alias:SELECT 1',
'Unsupported alias for gquery: unknown_alias',
),
(
'gquery:sqldb:SELECT 1 AS id, 2 AS second_id',
"Multiple columns in gquery: ['id', 'second_id']",
),
],
)
def test_process_gquery_raises_error(gquery, error_message):
context = execution_context.ExecutionContext(
fetcher_parameters={'id': gquery}
)
with pytest.raises(
exceptions.GarfExecutorError,
match=re.escape(error_message),
):
query_processor.process_gquery(context)


def test_process_gquery_returns_processed_context():
context = execution_context.ExecutionContext(
fetcher_parameters={'id': 'gquery:sqldb:SELECT 1 AS id'}
)
processed_context = query_processor.process_gquery(context)
expected_context = execution_context.ExecutionContext(
fetcher_parameters={'id': [1]}
)
assert processed_context == expected_context