Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
)
from great_expectations.compatibility.sqlalchemy import sqlalchemy as sa
from great_expectations.compatibility.typing_extensions import override
from great_expectations.core.metric_function_types import (
SummarizationMetricNameSuffixes,
)
from great_expectations.core.suite_parameters import (
SuiteParameterDict, # noqa: TC001 # FIXME CoP
)
Expand All @@ -26,9 +29,13 @@
)
from great_expectations.expectations.expectation import (
ColumnMapExpectation,
_format_map_output,
_style_row_condition,
render_suite_parameter_string,
)
from great_expectations.expectations.expectation_configuration import (
parse_result_format,
)
from great_expectations.expectations.metadata_types import DataQualityIssues, SupportedDataSources
from great_expectations.expectations.model_field_descriptions import (
COLUMN_DESCRIPTION,
Expand Down Expand Up @@ -575,6 +582,42 @@ def get_validation_dependencies(
),
)

# Issue #11076: ExpectColumnValuesToBeOfType is a ColumnMapExpectation, so its
# result should include the standard map-result fields (element_count,
# unexpected_count, missing_count, etc.) regardless of backend. The aggregate
# paths (Spark, SqlAlchemy, non-object Pandas) need table.row_count and the
# column null count to populate those fields via _format_map_output.
row_count_metric_kwargs = get_metric_kwargs(
metric_name="table.row_count",
configuration=configuration,
runtime_configuration=runtime_configuration,
)
validation_dependencies.set_metric_configuration(
metric_name="table.row_count",
metric_configuration=MetricConfiguration(
metric_name="table.row_count",
metric_domain_kwargs=row_count_metric_kwargs["metric_domain_kwargs"],
Comment on lines +591 to +595
metric_value_kwargs=row_count_metric_kwargs["metric_value_kwargs"],
),
)

nonnull_unexpected_count_metric_name = (
f"column_values.nonnull.{SummarizationMetricNameSuffixes.UNEXPECTED_COUNT.value}"
)
nonnull_metric_kwargs = get_metric_kwargs(
metric_name=nonnull_unexpected_count_metric_name,
configuration=configuration,
runtime_configuration=runtime_configuration,
)
validation_dependencies.set_metric_configuration(
metric_name=nonnull_unexpected_count_metric_name,
metric_configuration=MetricConfiguration(
metric_name=nonnull_unexpected_count_metric_name,
metric_domain_kwargs=nonnull_metric_kwargs["metric_domain_kwargs"],
metric_value_kwargs=nonnull_metric_kwargs["metric_value_kwargs"],
),
)

return validation_dependencies

@override
Expand Down Expand Up @@ -611,19 +654,77 @@ def _validate(
]:
# this calls ColumnMapMetric._validate
return super()._validate(metrics, runtime_configuration, execution_engine)
return self._validate_pandas(
base_result = self._validate_pandas(
actual_column_type=actual_column_type, expected_type=expected_type
)
elif isinstance(execution_engine, SqlAlchemyExecutionEngine):
return self._validate_sqlalchemy(
base_result = self._validate_sqlalchemy(
actual_column_type=actual_column_type,
expected_type=expected_type,
execution_engine=execution_engine,
)
elif isinstance(execution_engine, SparkDFExecutionEngine):
return self._validate_spark(
base_result = self._validate_spark(
actual_column_type=actual_column_type, expected_type=expected_type
)
else:
return None

# Issue #11076: Augment the aggregate-style result with the standard
# ColumnMapExpectation map-result fields (element_count, unexpected_count,
# missing_count, etc.) so callers see a consistent result shape regardless of
# backend. The type check is column-aggregate, so either all non-null rows pass
# or all non-null rows are unexpected.
return self._build_map_result(
base_result=base_result,
metrics=metrics,
runtime_configuration=runtime_configuration,
)

def _build_map_result(
    self,
    base_result: Dict[str, Any],
    metrics: Dict,
    runtime_configuration: Optional[dict] = None,
) -> Dict[str, Any]:
    """Wrap an aggregate-style type-check result in the map-expectation result shape.

    Args:
        base_result: Backend-specific result dict; its ``"success"`` flag and
            ``result["observed_value"]`` are the only entries read here.
        metrics: Resolved metrics keyed by metric name. ``table.row_count`` and
            the ``column_values.nonnull`` unexpected-count metric are looked up
            with ``.get`` and may be absent.
        runtime_configuration: Optional runtime overrides, forwarded to
            ``self._get_result_format``.

    Returns:
        The dict produced by ``_format_map_output``, with ``observed_value``
        copied into its ``"result"`` entry when the base result supplied one.
    """
    passed = bool(base_result.get("success", False))
    observed = base_result.get("result", {}).get("observed_value")

    parsed_format = parse_result_format(
        self._get_result_format(runtime_configuration=runtime_configuration)
    )

    row_total: Optional[int] = metrics.get("table.row_count")
    # Values that fail the non-null condition are treated as the null count.
    null_metric_name = (
        f"column_values.nonnull.{SummarizationMetricNameSuffixes.UNEXPECTED_COUNT.value}"
    )
    nulls: Optional[int] = metrics.get(null_metric_name)

    # The non-null count is only derivable when both metrics were resolved.
    populated: Optional[int] = (
        row_total - nulls if row_total is not None and nulls is not None else None
    )

    mismatched: Optional[int]
    if passed:
        mismatched = 0
    else:
        # The type check is column-wide: on failure every non-null value counts
        # as unexpected. Fall back to the row total (possibly None) when the
        # non-null count could not be derived.
        mismatched = populated if populated is not None else row_total

    output = _format_map_output(
        result_format=parsed_format,
        success=passed,
        element_count=row_total,
        nonnull_count=populated,
        unexpected_count=mismatched,
        unexpected_list=[],
    )

    # Carry the backend's observed type over into the formatted result.
    if observed is not None:
        output.setdefault("result", {})["observed_value"] = observed

    return output


def _get_potential_sqlalchemy_types(execution_engine, expected_type):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,48 @@ def test_success_with_suite_param_type_(
assert result.success == expected_result


@parameterize_batch_for_data_sources(
    data_source_configs=[
        SparkFilesystemCsvDatasourceTestConfig(
            column_types=SPARK_COLUMN_TYPES,
        )
    ],
    data=DATA,
)
def test_result_format_contains_map_fields_on_spark(batch_for_datasource: Batch) -> None:
    """Regression test for community issue #11076.

    ExpectColumnValuesToBeOfType is a ColumnMapExpectation, and the public docs
    (https://greatexpectations.io/expectations/expect_column_values_to_be_of_type/)
    list the standard map-result fields for it. This test validates an integer
    column on a Spark CSV data source with the SUMMARY result format and asserts
    that every one of those fields is present in the result, rather than only
    ``observed_value``.
    """
    # Standard map-result fields the SUMMARY result is required to expose.
    required_fields = {
        "element_count",
        "unexpected_count",
        "unexpected_percent",
        "partial_unexpected_list",
        "missing_count",
        "missing_percent",
        "unexpected_percent_total",
        "unexpected_percent_nonmissing",
    }

    type_expectation = gxe.ExpectColumnValuesToBeOfType(
        column=INTEGER_COLUMN, type_="IntegerType"
    )
    validation = batch_for_datasource.validate(
        type_expectation, result_format=ResultFormat.SUMMARY
    )
    assert validation.success

    result_dict = validation["result"]
    missing = required_fields.difference(result_dict.keys())
    assert not missing, (
        f"ExpectColumnValuesToBeOfType result missing standard map fields {missing}; "
        f"got result={result_dict}"
    )
Comment thread
joshua-stauffer marked this conversation as resolved.


@parameterize_batch_for_data_sources(data_source_configs=JUST_PANDAS_DATA_SOURCES, data=DATA)
def test_include_unexpected_rows_pandas(batch_for_datasource: Batch) -> None:
"""Test include_unexpected_rows for ExpectColumnValuesToBeOfType with pandas data sources."""
Expand Down
Loading