Skip to content

Commit 112bc85

Browse files
committed
[BUGFIX] Narrow single-pass column_values.unique on SQLAlchemy
Project only the target column through the windowed subquery instead of every source column. The previous shape carried all source columns (including JSON/SUPER fields on Redshift) through the window operator, which intermittently tripped WLM "low_timeout" on wide tables. "unexpected_rows" and "unexpected_index_list" are overridden to use a narrow GROUP BY/HAVING dup-keys subquery joined back to source, so they no longer require the windowed selectable to carry every source column.
1 parent 6c1e971 commit 112bc85

3 files changed

Lines changed: 470 additions & 67 deletions

File tree

Lines changed: 286 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,103 +1,322 @@
11
from __future__ import annotations
22

3-
from great_expectations.compatibility import pyspark
3+
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
4+
5+
import great_expectations.exceptions as gx_exceptions
6+
from great_expectations.compatibility import pyspark, sqlalchemy
47
from great_expectations.compatibility.pyspark import functions as F
58
from great_expectations.compatibility.sqlalchemy import (
69
sqlalchemy as sa,
710
)
8-
from great_expectations.core.metric_function_types import MetricPartialFunctionTypes
11+
from great_expectations.compatibility.typing_extensions import override
12+
from great_expectations.constants import MAX_RESULT_RECORDS
13+
from great_expectations.core.metric_function_types import (
14+
MetricFunctionTypes,
15+
MetricPartialFunctionTypes,
16+
MetricPartialFunctionTypeSuffixes,
17+
SummarizationMetricNameSuffixes,
18+
)
919
from great_expectations.execution_engine import (
20+
ExecutionEngine,
1021
PandasExecutionEngine,
1122
SparkDFExecutionEngine,
1223
SqlAlchemyExecutionEngine,
1324
)
14-
from great_expectations.execution_engine.sqlalchemy_dialect import (
15-
GXSqlDialect,
16-
quote_str,
17-
)
1825
from great_expectations.expectations.metrics.map_metric_provider import (
1926
ColumnMapMetricProvider,
2027
column_condition_partial,
28+
column_function_partial,
2129
)
22-
from great_expectations.util import generate_temporary_table_name
30+
from great_expectations.expectations.registry import register_metric
31+
from great_expectations.util import get_sqlalchemy_selectable
32+
from great_expectations.validator.validation_graph import MetricConfiguration
33+
34+
if TYPE_CHECKING:
35+
from great_expectations.expectations.expectation_configuration import (
36+
ExpectationConfiguration,
37+
)
38+
39+
40+
_DUP_KEY_COUNT_LABEL = "_num_rows"
41+
_DUP_KEY_SUBQUERY_ALIAS = "column_values_count_per_value_subquery"
42+
43+
44+
def _named_source_subquery(selectable, table_columns: List[str]):
45+
"""Return a named subquery that explicitly projects "table_columns" from
46+
the source selectable.
47+
48+
"SqlAlchemyBatchData" exposes the source table as a metadata-less
49+
"sa.Table" shell (no reflected columns), so its ".c" accessor is empty.
50+
Wrapping it in an explicit projection gives us a subquery whose ".c"
51+
collection is populated and can be used to unambiguously reference
52+
source-side columns inside a join with the dup-keys subquery.
53+
"""
54+
base = selectable if isinstance(selectable, sa.Select) else sa.select(
55+
*[sa.column(c) for c in table_columns]
56+
).select_from(selectable)
57+
return base.subquery("column_values_unique_source")
58+
59+
60+
def _build_dup_keys_subquery(
61+
execution_engine: SqlAlchemyExecutionEngine,
62+
metric_domain_kwargs: Dict[str, Any],
63+
column_name: str,
64+
):
65+
"""Narrow GROUP BY/HAVING subquery: one row per duplicated value.
66+
67+
Reads only the target column from the source table; partial-aggregation
68+
friendly on distributed engines and avoids wide-row window sort.
69+
"""
70+
selectable = execution_engine.get_domain_records(domain_kwargs=metric_domain_kwargs)
71+
selectable = get_sqlalchemy_selectable(selectable)
72+
return (
73+
sa.select(sa.column(column_name))
74+
.select_from(selectable)
75+
.where(sa.column(column_name).is_not(None))
76+
.group_by(sa.column(column_name))
77+
.having(sa.func.count() >= 2) # noqa: PLR2004
78+
.subquery("column_values_unique_dup_keys")
79+
)
80+
81+
82+
def _sqlalchemy_unique_unexpected_rows(
83+
cls,
84+
execution_engine: SqlAlchemyExecutionEngine,
85+
metric_domain_kwargs: Dict[str, Any],
86+
metric_value_kwargs: Dict[str, Any],
87+
metrics: Dict[str, Any],
88+
**kwargs,
89+
) -> Sequence[Any]:
90+
"""Return full source rows for values that appear more than once.
91+
92+
Source is scanned twice (cheap narrow hash-aggregate + hash join back),
93+
but only when the caller requests "unexpected_rows" (typically COMPLETE
94+
result_format). The dominant "unexpected_count" path stays single-scan.
95+
"""
96+
column_name: str = metric_domain_kwargs["column"]
97+
table_columns: List[str] = metrics["table.columns"]
98+
source_selectable = _named_source_subquery(
99+
execution_engine.get_domain_records(domain_kwargs=metric_domain_kwargs),
100+
table_columns,
101+
)
102+
dup_keys = _build_dup_keys_subquery(
103+
execution_engine=execution_engine,
104+
metric_domain_kwargs=metric_domain_kwargs,
105+
column_name=column_name,
106+
)
107+
column_selector = [source_selectable.c[c] for c in table_columns]
108+
query = sa.select(*column_selector).select_from(
109+
source_selectable.join(
110+
dup_keys,
111+
source_selectable.c[column_name] == dup_keys.c[column_name],
112+
)
113+
)
114+
result_format = metric_value_kwargs["result_format"]
115+
if result_format["result_format"] != "COMPLETE":
116+
limit = min(result_format["partial_unexpected_count"], MAX_RESULT_RECORDS)
117+
query = query.limit(limit)
118+
try:
119+
return [
120+
row._asdict()
121+
for row in execution_engine.execute_query(query).fetchmany(MAX_RESULT_RECORDS)
122+
]
123+
except sqlalchemy.OperationalError as oe:
124+
raise gx_exceptions.InvalidMetricAccessorDomainKwargsKeyError(
125+
message=f"An SQL execution Exception occurred: {oe!s}."
126+
)
127+
128+
129+
def _sqlalchemy_unique_unexpected_index_list(
130+
cls,
131+
execution_engine: SqlAlchemyExecutionEngine,
132+
metric_domain_kwargs: Dict[str, Any],
133+
metric_value_kwargs: Dict[str, Any],
134+
metrics: Dict[str, Any],
135+
**kwargs,
136+
) -> Union[List[Dict[str, Any]], None]:
137+
"""Return specified index columns + target column for duplicate rows."""
138+
result_format = metric_value_kwargs["result_format"]
139+
unexpected_index_column_names = result_format.get("unexpected_index_column_names")
140+
if not unexpected_index_column_names:
141+
return None
142+
143+
column_name: str = metric_domain_kwargs["column"]
144+
all_table_columns: List[str] = metrics.get("table.columns", [])
145+
for idx_col in unexpected_index_column_names:
146+
if idx_col not in all_table_columns:
147+
raise gx_exceptions.InvalidMetricAccessorDomainKwargsKeyError(
148+
message=(
149+
f'Error: The unexpected_index_column: "{idx_col}" does not exist in '
150+
"SQL Table. Please check your configuration and try again."
151+
)
152+
)
153+
154+
source_selectable = _named_source_subquery(
155+
execution_engine.get_domain_records(domain_kwargs=metric_domain_kwargs),
156+
all_table_columns,
157+
)
158+
dup_keys = _build_dup_keys_subquery(
159+
execution_engine=execution_engine,
160+
metric_domain_kwargs=metric_domain_kwargs,
161+
column_name=column_name,
162+
)
163+
column_selector = [source_selectable.c[c] for c in unexpected_index_column_names]
164+
column_selector.append(source_selectable.c[column_name])
165+
query = (
166+
sa.select(*column_selector)
167+
.select_from(
168+
source_selectable.join(
169+
dup_keys,
170+
source_selectable.c[column_name] == dup_keys.c[column_name],
171+
)
172+
)
173+
.limit(result_format["partial_unexpected_count"])
174+
)
175+
exclude_unexpected_values: bool = result_format.get("exclude_unexpected_values", False)
176+
try:
177+
query_result = execution_engine.execute_query(query).fetchall()
178+
except sqlalchemy.OperationalError as oe:
179+
raise gx_exceptions.InvalidMetricAccessorDomainKwargsKeyError(
180+
message=f"An SQL execution Exception occurred: {oe!s}."
181+
)
182+
183+
if exclude_unexpected_values:
184+
return [
185+
{col: row[i] for i, col in enumerate(unexpected_index_column_names)}
186+
for row in query_result
187+
]
188+
return [
189+
{
190+
**{col: row[i] for i, col in enumerate(unexpected_index_column_names)},
191+
column_name: row[-1],
192+
}
193+
for row in query_result
194+
]
23195

24196

25197
class ColumnValuesUnique(ColumnMapMetricProvider):
198+
"""Detects duplicate values in a column.
199+
200+
The "SqlAlchemyExecutionEngine" implementation materializes a *narrow* windowed
201+
subquery that exposes only the target column and a "_num_rows" count per value.
202+
Because the source table is scanned exactly once and the window operator carries
203+
only one column through the sort/partition phase, this avoids both:
204+
205+
* the "col NOT IN (dup_subquery)" double-scan pattern (original failure mode),
206+
* the "SELECT *table_columns, count() OVER ... FROM source" wide-row window that
207+
forced Redshift to materialize every column (including JSON/SUPER fields)
208+
through the sort, occasionally tripping the WLM "low_timeout" rule on
209+
column-store backends even after the double-scan was removed.
210+
211+
Auxiliary metrics that need the full source row ("unexpected_rows") or specific
212+
"unexpected_index_column_names" are served by a separate join-back path that
213+
re-reads only the necessary columns from the source table, keeping the common
214+
"BASIC" result_format (only "unexpected_count" requested) on the single-scan
215+
fast path.
216+
"""
217+
218+
function_metric_name = "column_values.count_per_value"
26219
condition_metric_name = "column_values.unique"
27220

28221
@column_condition_partial(engine=PandasExecutionEngine)
29222
def _pandas(cls, column, **kwargs):
30223
return ~column.duplicated(keep=False)
31224

32-
# NOTE: 20201119 - JPC - We cannot split per-dialect into window and non-window functions
33-
# @column_condition_partial(
34-
# engine=SqlAlchemyExecutionEngine,
35-
# )
36-
# def _sqlalchemy(cls, column, _table, **kwargs):
37-
# dup_query = (
38-
# sa.select(column)
39-
# .select_from(_table)
40-
# .group_by(column)
41-
# .having(sa.func.count(column) > 1)
42-
# )
43-
#
44-
# return column.notin_(dup_query)
225+
@column_function_partial(engine=SqlAlchemyExecutionEngine)
226+
def _sqlalchemy_function(cls, column, _table, **kwargs):
227+
# Narrow projection: only the target column and the window count per value.
228+
# Auxiliary methods that consume this selectable (unexpected_count,
229+
# unexpected_values, unexpected_value_counts) only ever read these two
230+
# columns. Paths that need additional source columns ("unexpected_rows",
231+
# "unexpected_index_list") are overridden in _register_metric_functions
232+
# to join back to source.
233+
from_clause = _table.subquery() if isinstance(_table, sa.Select) else _table
234+
return (
235+
sa.select(
236+
sa.column(column.name),
237+
sa.func.count()
238+
.over(partition_by=sa.column(column.name))
239+
.label(_DUP_KEY_COUNT_LABEL),
240+
)
241+
.select_from(from_clause)
242+
.alias(_DUP_KEY_SUBQUERY_ALIAS)
243+
)
45244

46245
@column_condition_partial(
47246
engine=SqlAlchemyExecutionEngine,
48247
partial_fn_type=MetricPartialFunctionTypes.WINDOW_CONDITION_FN,
49248
)
50-
def _sqlalchemy_window(cls, column, _table, **kwargs):
51-
# MySQL and SingleStore cannot reference a temp table more than once in the
52-
# same query, and SingleStore disallows correlated subselects with GROUP BY.
53-
# Create a temp table copy of the column to avoid both issues.
54-
dialect = kwargs.get("_dialect")
55-
sql_engine = kwargs.get("_sqlalchemy_engine")
56-
execution_engine = kwargs.get("_execution_engine")
57-
try:
58-
dialect_name = dialect.dialect.name
59-
except AttributeError:
60-
try:
61-
dialect_name = dialect.name
62-
except AttributeError:
63-
dialect_name = ""
64-
if sql_engine and dialect and dialect_name in ("mysql", "singlestoredb"):
65-
gx_dialect = GXSqlDialect(dialect_name)
66-
quoted_col = quote_str(column.name, gx_dialect)
67-
temp_table_name = generate_temporary_table_name()
68-
if isinstance(_table, sa.Select):
69-
from_clause = _table.subquery().alias("tmp")
70-
else:
71-
from_clause = _table
72-
source_query = sa.select(sa.column(column.name)).select_from(from_clause)
73-
compiled = source_query.compile(
74-
dialect=sql_engine.dialect, compile_kwargs={"literal_binds": True}
75-
)
76-
temp_table_stmt = f"CREATE TEMPORARY TABLE {temp_table_name} AS {compiled}"
77-
execution_engine.execute_query_in_transaction(sa.text(temp_table_stmt))
78-
# SingleStore cannot handle subselects with GROUP BY/HAVING inside
79-
# expressions, so materialize duplicate values into a second temp table.
80-
dup_table_name = generate_temporary_table_name()
81-
dup_stmt = (
82-
f"CREATE TEMPORARY TABLE {dup_table_name} AS "
83-
f"SELECT {quoted_col} FROM {temp_table_name} "
84-
f"GROUP BY {quoted_col} HAVING count({quoted_col}) > 1"
85-
)
86-
execution_engine.execute_query_in_transaction(sa.text(dup_stmt))
87-
dup_query = sa.select(column).select_from(sa.text(dup_table_name))
88-
else:
89-
from_clause = _table.subquery() if isinstance(_table, sa.Select) else _table
90-
dup_query = (
91-
sa.select(column)
92-
.select_from(from_clause)
93-
.group_by(column)
94-
.having(sa.func.count(column) > 1)
95-
)
96-
return column.notin_(dup_query)
249+
def _sqlalchemy_condition(cls, column, **kwargs):
250+
metrics = kwargs.get("_metrics")
251+
count_per_value_query, _, _ = metrics[
252+
f"column_values.count_per_value.{MetricPartialFunctionTypeSuffixes.MAP.value}"
253+
]
254+
return count_per_value_query.c[_DUP_KEY_COUNT_LABEL] < 2 # noqa: PLR2004
97255

98256
@column_condition_partial(
99257
engine=SparkDFExecutionEngine,
100258
partial_fn_type=MetricPartialFunctionTypes.WINDOW_CONDITION_FN,
101259
)
102260
def _spark(cls, column, **kwargs):
103261
return F.count(F.lit(1)).over(pyspark.Window.partitionBy(column)) <= 1
262+
263+
@classmethod
264+
@override
265+
def _get_evaluation_dependencies(
266+
cls,
267+
metric: MetricConfiguration,
268+
configuration: Optional[ExpectationConfiguration] = None,
269+
execution_engine: Optional[ExecutionEngine] = None,
270+
runtime_configuration: Optional[dict] = None,
271+
):
272+
dependencies: dict = super()._get_evaluation_dependencies(
273+
metric=metric,
274+
configuration=configuration,
275+
execution_engine=execution_engine,
276+
runtime_configuration=runtime_configuration,
277+
)
278+
279+
if isinstance(execution_engine, SqlAlchemyExecutionEngine) and (
280+
metric.metric_name
281+
== f"column_values.unique.{MetricPartialFunctionTypeSuffixes.CONDITION.value}"
282+
):
283+
dependencies[
284+
f"column_values.count_per_value.{MetricPartialFunctionTypeSuffixes.MAP.value}"
285+
] = MetricConfiguration(
286+
metric_name=f"column_values.count_per_value.{MetricPartialFunctionTypeSuffixes.MAP.value}",
287+
metric_domain_kwargs=metric.metric_domain_kwargs,
288+
metric_value_kwargs=None,
289+
)
290+
291+
return dependencies
292+
293+
@classmethod
294+
@override
295+
def _register_metric_functions(cls):
296+
super()._register_metric_functions()
297+
# The narrow windowed subquery (above) carries only the target column.
298+
# The default map-condition auxiliary methods for row-retrieval paths
299+
# assume the selectable carries every table column (compound_columns.unique
300+
# pattern), which would re-introduce the wide-row window on Redshift.
301+
# Override those two paths with a single narrow dup-keys subquery joined
302+
# back to source.
303+
register_metric(
304+
metric_name=f"{cls.condition_metric_name}."
305+
f"{SummarizationMetricNameSuffixes.UNEXPECTED_ROWS.value}",
306+
metric_domain_keys=cls.condition_domain_keys,
307+
metric_value_keys=(*cls.condition_value_keys, "result_format"),
308+
execution_engine=SqlAlchemyExecutionEngine,
309+
metric_class=cls,
310+
metric_provider=_sqlalchemy_unique_unexpected_rows,
311+
metric_fn_type=MetricFunctionTypes.VALUE,
312+
)
313+
register_metric(
314+
metric_name=f"{cls.condition_metric_name}."
315+
f"{SummarizationMetricNameSuffixes.UNEXPECTED_INDEX_LIST.value}",
316+
metric_domain_keys=cls.condition_domain_keys,
317+
metric_value_keys=(*cls.condition_value_keys, "result_format"),
318+
execution_engine=SqlAlchemyExecutionEngine,
319+
metric_class=cls,
320+
metric_provider=_sqlalchemy_unique_unexpected_index_list,
321+
metric_fn_type=MetricFunctionTypes.VALUE,
322+
)

great_expectations/expectations/metrics/map_metric_provider/is_sqlalchemy_metric_selectable.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
SQLALCHEMY_SELECTABLE_METRICS: Set[str] = {
1515
"compound_columns.count",
1616
"compound_columns.unique",
17+
"column_values.unique",
1718
}
1819

1920

0 commit comments

Comments
 (0)