Skip to content

Commit 60a41b1

Browse files
feat(executors): add support for garf query expansion for fetcher parameters
* add query_processor module that changes values in context.fetcher_parameters is gquery is included * executor and api_executor to modify context before execution query / batch * make resource optional in `garf.core.query_editor` and propagate check to community libraries (i.e. `libs/community/google/ads`) * SQL & BQ executor can accept kwargs to handle modified context * SQL executor by default works with in-memory db is no connection_string is provided
1 parent 3d70018 commit 60a41b1

File tree

12 files changed

+166
-17
lines changed

12 files changed

+166
-17
lines changed

libs/community/google/ads/garf/community/google/ads/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@
2222
'GoogleAdsApiReportFetcher',
2323
]
2424

25-
__version__ = '1.0.0'
25+
__version__ = '1.0.1'

libs/community/google/ads/garf/community/google/ads/query_editor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ class GoogleAdsApiQuery(query_editor.QuerySpecification):
2323

2424
def generate(self):
2525
base_query = super().generate()
26+
if not base_query.resource_name:
27+
raise query_editor.GarfResourceError(
28+
f'No resource found in query: {base_query.text}'
29+
)
2630
for field in base_query.fields:
2731
field = _format_type_field_name(field)
2832
for customizer in base_query.customizers.values():

libs/community/google/ads/tests/unit/test_query_editor.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
15+
import pytest
1616
from garf.community.google.ads.query_editor import GoogleAdsApiQuery
17+
from garf.core import query_editor
1718

1819

1920
class TestGoogleAdsApiQuery:
@@ -38,3 +39,11 @@ def test_generate(self):
3839
spec.customizers.get('asset_policy_type').value
3940
== 'policy_topic_entries.type_'
4041
)
42+
43+
def test_generate_raises_error_on_missing_resource(self):
44+
query = 'SELECT campaign.type'
45+
with pytest.raises(
46+
query_editor.GarfResourceError,
47+
match='No resource found in query: SELECT campaign.type',
48+
):
49+
GoogleAdsApiQuery(text=query).generate()

libs/core/garf/core/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,4 @@
2626
'ApiReportFetcher',
2727
]
2828

29-
__version__ = '1.0.0'
29+
__version__ = '1.0.1'

libs/core/garf/core/query_editor.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,9 @@ def generate(self) -> BaseQueryElements:
237237
.extract_virtual_columns()
238238
.extract_customizers()
239239
)
240-
if self.query.resource_name.startswith('builtin'):
240+
if self.query.resource_name and self.query.resource_name.startswith(
241+
'builtin'
242+
):
241243
self.query.title = self.query.resource_name.replace('builtin.', '')
242244
self.query.is_builtin_query = True
243245
return self.query
@@ -283,16 +285,14 @@ def extract_resource_name(self) -> Self:
283285
284286
Returns:
285287
Found resource.
286-
287-
Raises:
288-
GarfResourceException: If resource_name isn't found.
289288
"""
290289
if resource_name := re.findall(
291290
r'FROM\s+([\w.]+)', self.query.text, flags=re.IGNORECASE
292291
):
293292
self.query.resource_name = str(resource_name[0]).strip()
294-
return self
295-
raise GarfResourceError(f'No resource found in query: {self.query.text}')
293+
else:
294+
self.query.resource_name = None
295+
return self
296296

297297
def extract_fields(self) -> Self:
298298
for line in self._extract_query_lines():
@@ -379,7 +379,7 @@ def convert_date(date_string: str) -> str:
379379
GarfMacroError:
380380
If dynamic lookback value (:YYYYMMDD-N) is incorrect.
381381
"""
382-
if isinstance(date_string, list) or date_string.find(':Y') == -1:
382+
if isinstance(date_string, list) or str(date_string).find(':Y') == -1:
383383
return date_string
384384
current_date = datetime.date.today()
385385
base_date, *date_customizer = re.split('\\+|-', date_string)

libs/executors/garf/executors/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,4 @@ def setup_executor(
5757
'ApiExecutionContext',
5858
]
5959

60-
__version__ = '1.0.0'
60+
__version__ = '1.0.1'

libs/executors/garf/executors/api_executor.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,13 @@
2323
import logging
2424

2525
from garf.core import report_fetcher
26-
from garf.executors import exceptions, execution_context, executor, fetchers
26+
from garf.executors import (
27+
exceptions,
28+
execution_context,
29+
executor,
30+
fetchers,
31+
query_processor,
32+
)
2733
from garf.executors.telemetry import tracer
2834
from opentelemetry import trace
2935

@@ -94,6 +100,7 @@ def execute(
94100
Raises:
95101
GarfExecutorError: When failed to execute query.
96102
"""
103+
context = query_processor.process_gquery(context)
97104
span = trace.get_current_span()
98105
span.set_attribute('fetcher.class', self.fetcher.__class__.__name__)
99106
span.set_attribute(

libs/executors/garf/executors/bq_executor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ def __init__(
5454
self,
5555
project_id: str | None = os.getenv('GOOGLE_CLOUD_PROJECT'),
5656
location: str | None = None,
57+
**kwargs: str,
5758
) -> None:
5859
"""Initializes BigQueryExecutor.
5960

libs/executors/garf/executors/executor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from typing import Optional
2020

2121
from garf.core import report_fetcher
22-
from garf.executors import execution_context
22+
from garf.executors import execution_context, query_processor
2323
from garf.executors.telemetry import tracer
2424
from opentelemetry import trace
2525

@@ -112,6 +112,7 @@ def _handle_processors(
112112
processors: dict[str, report_fetcher.Processor],
113113
context: execution_context.ExecutionContext,
114114
) -> None:
115+
context = query_processor.process_gquery(context)
115116
for k, processor in processors.items():
116117
processor_signature = list(inspect.signature(processor).parameters.keys())
117118
if k in context.fetcher_parameters:
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import contextlib
16+
17+
from garf.core import query_editor
18+
from garf.executors import exceptions, execution_context
19+
20+
21+
def process_gquery(
22+
context: execution_context.ExecutionContext,
23+
) -> execution_context.ExecutionContext:
24+
for k, v in context.fetcher_parameters.items():
25+
if isinstance(v, str) and v.startswith('gquery'):
26+
no_writer_context = context.model_copy(update={'writer': None})
27+
try:
28+
_, alias, query = v.split(':', maxsplit=3)
29+
except ValueError:
30+
raise exceptions.GarfExecutorError(
31+
f'Incorrect gquery format, should be gquery:alias:query, got {v}'
32+
)
33+
if alias == 'sqldb':
34+
from garf.executors import sql_executor
35+
36+
gquery_executor = sql_executor.SqlAlchemyQueryExecutor(
37+
**context.fetcher_parameters
38+
)
39+
elif alias == 'bq':
40+
from garf.executors import bq_executor
41+
42+
gquery_executor = bq_executor.BigQueryExecutor(
43+
**context.fetcher_parameters
44+
)
45+
else:
46+
raise exceptions.GarfExecutorError(
47+
f'Unsupported alias for gquery: {alias}'
48+
)
49+
with contextlib.suppress(query_editor.GarfResourceError):
50+
query_spec = query_editor.QuerySpecification(
51+
text=query, args=context.query_parameters
52+
).generate()
53+
if len(columns := [c for c in query_spec.column_names if c != '_']) > 1:
54+
raise exceptions.GarfExecutorError(
55+
f'Multiple columns in gquery: {columns}'
56+
)
57+
res = gquery_executor.execute(
58+
query=query, title='gquery', context=no_writer_context
59+
)
60+
context.fetcher_parameters[k] = res.to_list(row_type='scalar')
61+
return context

0 commit comments

Comments
 (0)