Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libs/executors/garf/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ def setup_executor(
'ApiExecutionContext',
]

__version__ = '1.0.5'
__version__ = '1.0.6'
48 changes: 32 additions & 16 deletions libs/executors/garf/executors/query_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""qQuery can be used as a parameter in garf queries."""

import contextlib

from garf.core import query_editor
from garf.executors import exceptions, execution_context
from garf.core import query_editor, query_parser
from garf.executors import execution_context


def process_gquery(
context: execution_context.ExecutionContext,
) -> execution_context.ExecutionContext:
for k, v in context.fetcher_parameters.items():
class GqueryError(query_parser.GarfQueryError):
"""Errors on incorrect qQuery syntax."""


def _handle_sub_context(context, sub_context):
for k, v in sub_context.items():
if isinstance(v, str) and v.startswith('gquery'):
no_writer_context = context.model_copy(update={'writer': None})
try:
_, alias, query = v.split(':', maxsplit=3)
_, alias, *query = v.split(':', maxsplit=3)
except ValueError:
raise exceptions.GarfExecutorError(
raise GqueryError(
f'Incorrect gquery format, should be gquery:alias:query, got {v}'
)
if not alias:
raise GqueryError(f'Missing alias in gquery: {v}')
if not query:
raise GqueryError(f'Missing query text in gquery: {v}')
if alias == 'sqldb':
from garf.executors import sql_executor

Expand All @@ -43,19 +51,27 @@ def process_gquery(
**context.fetcher_parameters
)
else:
raise exceptions.GarfExecutorError(
f'Unsupported alias for gquery: {alias}'
)
with contextlib.suppress(query_editor.GarfResourceError):
raise GqueryError(f'Unsupported alias {alias} for gquery: {v}')
with contextlib.suppress(
query_editor.GarfResourceError, query_parser.GarfVirtualColumnError
):
query = ':'.join(query)
query_spec = query_editor.QuerySpecification(
text=query, args=context.query_parameters
).generate()
if len(columns := [c for c in query_spec.column_names if c != '_']) > 1:
raise exceptions.GarfExecutorError(
f'Multiple columns in gquery: {columns}'
)
raise GqueryError(f'Multiple columns in gquery definition: {columns}')
res = gquery_executor.execute(
query=query, title='gquery', context=no_writer_context
)
context.fetcher_parameters[k] = res.to_list(row_type='scalar')
if len(columns := [c for c in res.column_names if c != '_']) > 1:
raise GqueryError(f'Multiple columns in gquery result: {columns}')
sub_context[k] = res.to_list(row_type='scalar')


def process_gquery(
context: execution_context.ExecutionContext,
) -> execution_context.ExecutionContext:
_handle_sub_context(context, context.fetcher_parameters)
_handle_sub_context(context, context.query_parameters.macro)
return context
19 changes: 13 additions & 6 deletions libs/executors/tests/unit/test_query_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,24 @@ def test_process_gquery_returns_unchanged_context():
('gquery', 'error_message'),
[
(
'gquery:SELECT 1',
'Incorrect gquery format, should be gquery:alias:query, '
'got gquery:SELECT 1',
'gquery',
'Incorrect gquery format, should be gquery:alias:query, got gquery',
),
(
'gquery::SELECT 1',
'Missing alias in gquery: gquery::SELECT 1',
),
(
'gquery:sqldb',
'Missing query text in gquery: gquery:sqldb',
),
(
'gquery:unknown_alias:SELECT 1',
'Unsupported alias for gquery: unknown_alias',
'Unsupported alias unknown_alias for gquery: gquery:unknown_alias:SELECT 1',
),
(
'gquery:sqldb:SELECT 1 AS id, 2 AS second_id',
"Multiple columns in gquery: ['id', 'second_id']",
"Multiple columns in gquery definition: ['id', 'second_id']",
),
],
)
Expand All @@ -47,7 +54,7 @@ def test_process_gquery_raises_error(gquery, error_message):
fetcher_parameters={'id': gquery}
)
with pytest.raises(
exceptions.GarfExecutorError,
query_processor.GqueryError,
match=re.escape(error_message),
):
query_processor.process_gquery(context)
Expand Down