Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5c15e63
[MAINTENANCE] Extract type comparison logic into dedicated module
wookasz Apr 9, 2026
4cf6d34
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 9, 2026
ccd4f71
[MAINTENANCE] Address review feedback on type comparison module
wookasz Apr 10, 2026
522c2c7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 10, 2026
eb4c5d6
Merge branch 'develop' into m/extract-type-comparison-module
wookasz Apr 10, 2026
231812c
[MAINTENANCE] Add dialect-specific type coverage tests
wookasz Apr 10, 2026
ee91779
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 10, 2026
063dab5
[MAINTENANCE] Comprehensive dialect-specific type tests for both scal…
wookasz Apr 10, 2026
2cc59e9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 10, 2026
b07e77c
[MAINTENANCE] Move type_comparison module to expectations/ package level
wookasz Apr 10, 2026
5a39b29
[MAINTENANCE] Move test_type_comparison.py to match module location
wookasz Apr 10, 2026
602d379
[MAINTENANCE] Fix xdist collection error and mypy issues
wookasz Apr 10, 2026
52edbcf
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 10, 2026
053d1bc
Merge branch 'develop' into m/extract-type-comparison-module
wookasz Apr 10, 2026
a2a528f
[MAINTENANCE] Address review: tighten return type, use casefold consi…
wookasz Apr 10, 2026
a36b3bb
[MAINTENANCE] Address review: document cross-type equality, add strin…
wookasz Apr 10, 2026
745f6dd
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 10, 2026
45b796e
Trigger CI
wookasz Apr 10, 2026
cd7112f
[MAINTENANCE] Fix mypy errors for optional dialect imports
wookasz Apr 10, 2026
5029fa6
[MAINTENANCE] Address review: teradatatypes fallback, stale docstring
wookasz Apr 10, 2026
92d6025
[MAINTENANCE] Suppress mypy arg-type for issubclass on dialect_module
wookasz Apr 10, 2026
a3361e5
[MAINTENANCE] Add expectation wiring tests for type comparison delega…
wookasz Apr 10, 2026
d260d8c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 10, 2026
4c6fb76
Trigger CI
wookasz Apr 10, 2026
9174a1d
[MAINTENANCE] Replace integration-style wiring tests with monkeypatch…
wookasz Apr 10, 2026
af838d5
[MAINTENANCE] Fix compare_column_type docstring to accurately describ…
wookasz Apr 10, 2026
b4d621a
[MAINTENANCE] Use mocker.patch with assert_called_once in wiring tests
wookasz Apr 10, 2026
a74796e
[MAINTENANCE] Use assert_called_once_with in wiring tests
wookasz Apr 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@
from great_expectations.core.suite_parameters import (
SuiteParameterDict, # noqa: TC001, RUF100 # FIXME CoP
)
from great_expectations.execution_engine.sqlalchemy_dialect import (
GXSqlDialect,
)
from great_expectations.expectations.core.expect_column_values_to_be_of_type import (
_get_potential_sqlalchemy_types,
_native_type_type_map,
)
from great_expectations.expectations.expectation import (
ColumnMapExpectation,
_style_row_condition,
Expand All @@ -30,6 +23,10 @@
FAILURE_SEVERITY_DESCRIPTION,
)
from great_expectations.expectations.registry import get_metric_kwargs
from great_expectations.expectations.type_comparison import (
compare_column_type_list,
native_type_type_map,
)
from great_expectations.render import LegacyRendererType, RenderedStringTemplateContent
from great_expectations.render.renderer.renderer import renderer
from great_expectations.render.renderer_configuration import (
Expand Down Expand Up @@ -432,7 +429,7 @@ def _validate_pandas( # noqa: C901, PLR0912 # FIXME CoP
except AttributeError:
pass

native_type = _native_type_type_map(type_)
native_type = native_type_type_map(type_)
if native_type is not None:
comp_types.extend(native_type)

Expand Down Expand Up @@ -467,45 +464,12 @@ def _validate_pandas( # noqa: C901, PLR0912 # FIXME CoP

def _validate_sqlalchemy(self, actual_column_type, expected_types_list, execution_engine):
if expected_types_list is None:
success = True
elif execution_engine.dialect_name in [
GXSqlDialect.DATABRICKS,
GXSqlDialect.POSTGRESQL,
GXSqlDialect.SNOWFLAKE,
GXSqlDialect.SQL_SERVER,
GXSqlDialect.TRINO,
]:
if isinstance(actual_column_type, str):
success = any(
actual_column_type.lower() == expected_type.lower()
for expected_type in expected_types_list
)
ret_type = actual_column_type
else:
ret_type = type(actual_column_type).__name__
success = any(
ret_type.lower() == expected_type.lower()
for expected_type in expected_types_list
)

return {
"success": success,
"result": {"observed_value": ret_type},
}
else:
types = []
for type_ in expected_types_list:
types.extend(
_get_potential_sqlalchemy_types(
execution_engine=execution_engine, expected_type=type_
)
)
success = isinstance(actual_column_type, tuple(types))

return {
"success": success,
"result": {"observed_value": type(actual_column_type).__name__},
}
observed = type(actual_column_type).__name__
return {"success": True, "result": {"observed_value": observed}}
success, observed_value = compare_column_type_list(
execution_engine, actual_column_type, expected_types_list
)
return {"success": success, "result": {"observed_value": observed_value}}

def _validate_spark(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,16 @@
from __future__ import annotations

import inspect
import logging
from types import ModuleType
from typing import TYPE_CHECKING, Any, ClassVar, Dict, Optional, Tuple, Type, Union

import numpy as np
import pandas as pd

from great_expectations.compatibility import aws, pydantic, pyspark, trino
from great_expectations.compatibility.bigquery import (
BIGQUERY_GEO_SUPPORT,
bigquery_types_tuple,
)
from great_expectations.compatibility.bigquery import (
sqlalchemy_bigquery as BigQueryDialect,
)
from great_expectations.compatibility.sqlalchemy import sqlalchemy as sa
from great_expectations.compatibility import pydantic, pyspark
from great_expectations.compatibility.typing_extensions import override
from great_expectations.core.suite_parameters import (
SuiteParameterDict, # noqa: TC001 # FIXME CoP
)
from great_expectations.execution_engine.sqlalchemy_dialect import (
GXSqlDialect, # noqa: TC001, RUF100 # FIXME CoP
)
from great_expectations.expectations.expectation import (
ColumnMapExpectation,
_style_row_condition,
Expand All @@ -35,6 +22,10 @@
FAILURE_SEVERITY_DESCRIPTION,
)
from great_expectations.expectations.registry import get_metric_kwargs
from great_expectations.expectations.type_comparison import (
compare_column_type,
native_type_type_map,
)
from great_expectations.render import LegacyRendererType, RenderedStringTemplateContent
from great_expectations.render.renderer.renderer import renderer
from great_expectations.render.renderer_configuration import (
Expand All @@ -46,10 +37,6 @@
parse_row_condition_string,
substitute_none_for_missing,
)
from great_expectations.util import (
get_clickhouse_sqlalchemy_potential_type,
get_pyathena_potential_type,
)
from great_expectations.validator.metric_configuration import MetricConfiguration

if TYPE_CHECKING:
Expand All @@ -63,19 +50,6 @@

logger = logging.getLogger(__name__)

try:
import teradatasqlalchemy.dialect
import teradatasqlalchemy.types as teradatatypes
except ImportError:
teradatasqlalchemy = None

try:
import clickhouse_sqlalchemy
import clickhouse_sqlalchemy.types as ch_types
except (ImportError, KeyError):
clickhouse_sqlalchemy = None
ch_types = None

EXPECTATION_SHORT_DESCRIPTION = "Expect a column to contain values of a specified data type."
TYPE__DESCRIPTION = """
A string representing the data type that each column should have as entries. \
Expand Down Expand Up @@ -411,7 +385,7 @@ def _validate_pandas(
except AttributeError:
pass

native_type = _native_type_type_map(expected_type)
native_type = native_type_type_map(expected_type)
if native_type is not None:
comp_types.extend(native_type)

Expand All @@ -423,49 +397,13 @@ def _validate_pandas(
}

def _validate_sqlalchemy(self, actual_column_type, expected_type, execution_engine):
# Our goal is to be as explicit as possible. We will match the dialect
# if that is possible. If there is no dialect available, we *will*
# match against a top-level SqlAlchemy type.
#
# This is intended to be a conservative approach.
#
# In particular, we *exclude* types that would be valid under an ORM
# such as "float" for postgresql with this approach

if expected_type is None:
success = True
elif execution_engine.dialect_name in [
GXSqlDialect.DATABRICKS,
GXSqlDialect.POSTGRESQL,
GXSqlDialect.SNOWFLAKE,
GXSqlDialect.SQL_SERVER,
GXSqlDialect.TRINO,
]:
# For these dialects, actual_column_type should be a string or CaseInsensitiveString
if isinstance(actual_column_type, str):
# CaseInsensitiveString objects will automatically do case-insensitive comparison
success = actual_column_type == expected_type
else:
# Handle the case where it's not a string type
# This should never happen, but we'll handle it just in case
# the column type should be converted to a CaseInsensitiveString
# for these three dialects in metrics/util.py:get_sqlalchemy_column_metadata
success = str(actual_column_type).lower() == expected_type.lower()

return {
"success": success,
"result": {"observed_value": actual_column_type},
}
else:
types = _get_potential_sqlalchemy_types(
execution_engine=execution_engine, expected_type=expected_type
)
success = isinstance(actual_column_type, tuple(types))

return {
"success": success,
"result": {"observed_value": type(actual_column_type).__name__},
}
observed = type(actual_column_type).__name__
return {"success": True, "result": {"observed_value": observed}}
success, observed_value = compare_column_type(
execution_engine, actual_column_type, expected_type
)
return {"success": success, "result": {"observed_value": observed_value}}

def _validate_spark(
self,
Expand Down Expand Up @@ -624,154 +562,3 @@ def _validate(
return self._validate_spark(
actual_column_type=actual_column_type, expected_type=expected_type
)


def _get_potential_sqlalchemy_types(execution_engine, expected_type):
    """Resolve ``expected_type`` to the SQLAlchemy types it may denote.

    The name is looked up on the object returned by
    ``_get_dialect_type_module`` for the engine, with dedicated handling for
    the PyAthena, ClickHouse and Redshift type modules.

    Args:
        execution_engine: Engine passed to ``_get_dialect_type_module``; its
            ``engine.dialect.name`` is also read for the BigQuery GEOGRAPHY check.
        expected_type: Name of the expected column type (a string).

    Returns:
        A list of candidate types. The list is empty when the name cannot be
        resolved, or when BigQuery GEOGRAPHY is requested without the optional
        geography support installed.
    """
    types = []
    type_module = _get_dialect_type_module(execution_engine=execution_engine)
    # `_get_dialect_type_module` can hand back `bigquery_types_tuple` instead of
    # a module; presumably such an object has no `__name__` (TODO confirm). Reading
    # the attribute directly inside the `try` below would then raise AttributeError
    # and silently report every type as unrecognized, so fall back to None and let
    # the generic `getattr` branch perform the lookup.
    module_name = getattr(type_module, "__name__", None)
    try:
        # bigquery geography requires installing an extra package
        if (
            expected_type.lower() == "geography"
            and execution_engine.engine.dialect.name.lower() == GXSqlDialect.BIGQUERY
            and not BIGQUERY_GEO_SUPPORT
        ):
            logger.warning(
                "BigQuery GEOGRAPHY type is not supported by default. "
                "To install support, please run:"
                " $ pip install 'sqlalchemy-bigquery[geography]'"
            )
        elif module_name == "pyathena.sqlalchemy_athena":
            potential_type = get_pyathena_potential_type(type_module, expected_type)
            # In the case of the PyAthena dialect we need to verify that
            # the type returned is indeed a type and not an instance.
            types.append(
                potential_type if inspect.isclass(potential_type) else type(potential_type)
            )
        elif module_name == "clickhouse_sqlalchemy.drivers.base":
            types.append(get_clickhouse_sqlalchemy_potential_type(type_module, expected_type))
        elif module_name == "sqlalchemy_redshift.dialect":
            types.extend(_get_redshift_sqlalchemy_types(type_module, expected_type))
        else:
            types.append(getattr(type_module, expected_type))
    except AttributeError:
        # The type name does not exist for this dialect; callers receive an empty list.
        logger.debug("Unrecognized type: %s", expected_type)
    if not types:
        logger.debug("No recognized sqlalchemy types in type_list for current dialect.")

    return types


def _get_redshift_sqlalchemy_types(
    type_module: ModuleType, expected_type: Any
) -> list[sa.sql.type_api.TypeEngine]:
    """Return the types accepted for ``expected_type`` on Redshift.

    Args:
        type_module: Object on which the attribute named ``expected_type`` is looked up.
        expected_type: Name of the expected column type.

    Returns:
        A list holding the attribute found on ``type_module`` and, when the
        name is ``decimal`` (compared in lower case), ``sa.sql.sqltypes.NUMERIC``
        as well.

    Raises:
        AttributeError: If ``type_module`` has no attribute named ``expected_type``.
    """
    candidates: list[sa.sql.type_api.TypeEngine] = [getattr(type_module, expected_type)]
    if expected_type.lower() != "decimal":
        return candidates

    # Redshift has no NUMERIC type of its own. It is supposed to be a synonym for
    # the official DECIMAL type, according to the docs:
    # https://docs.aws.amazon.com/redshift/latest/dg/c_Supported_data_types.html
    # In practice the raw sqltypes.[NUMERIC|Numeric] has been observed instead, so
    # it is accepted as an additional match.
    return [*candidates, sa.sql.sqltypes.NUMERIC]


def _get_dialect_type_module(  # noqa: C901, PLR0911 # FIXME CoP
    execution_engine,
):
    """Pick the object on which column type names are looked up for an engine.

    The checks run in a fixed order and the first match wins: no dialect
    module, Redshift, BigQuery, Teradata, ClickHouse, Trino. When none of them
    matches, the engine's own ``dialect_module`` is returned.

    Args:
        execution_engine: Object exposing ``dialect_module`` and, for the
            Trino check, ``dialect``.

    Returns:
        ``sa`` when the engine has no dialect module, otherwise the
        dialect-specific types object selected by the checks below or the
        dialect module itself.
    """
    dialect_module = execution_engine.dialect_module
    if dialect_module is None:
        logger.warning("No sqlalchemy dialect found; relying in top-level sqlalchemy types.")
        return sa

    # Redshift does not (yet) export types to top level; only recognize base SA types
    if aws.redshiftdialect and isinstance(dialect_module, aws.redshiftdialect.RedshiftDialect):
        return dialect_module.sa

    # Each optional dialect below may be absent or only partially importable, so a
    # TypeError/AttributeError raised by a probe just means "not this dialect".

    # Bigquery works with newer versions, but use a patch if we had to define bigquery_types_tuple
    try:
        is_bigquery = BigQueryDialect and isinstance(dialect_module, BigQueryDialect)
        if is_bigquery and bigquery_types_tuple is not None:
            return bigquery_types_tuple
    except (TypeError, AttributeError):
        pass

    # Teradata types module. The subclass test must stay ahead of the
    # `teradatatypes` check so the latter is only evaluated after a match.
    try:
        teradata_dialect = teradatasqlalchemy.dialect.TeradataDialect
        if issubclass(dialect_module, teradata_dialect) and teradatatypes is not None:
            return teradatatypes
    except (TypeError, AttributeError):
        pass

    # ClickHouse types module
    try:
        clickhouse_dialect = clickhouse_sqlalchemy.drivers.base.ClickHouseDialect
        if issubclass(dialect_module, clickhouse_dialect) and ch_types is not None:
            return ch_types
    except (TypeError, AttributeError):
        pass

    # Trino types module (matched against the dialect instance, not the module)
    try:
        trino_available = trino.trinodialect and trino.trinotypes
        if trino_available and isinstance(
            execution_engine.dialect, trino.trinodialect.TrinoDialect
        ):
            return trino.trinotypes
    except (TypeError, AttributeError):
        pass

    return dialect_module


# Lower-cased type name -> tuple of native Python types accepted for it.
# "long" and "string_types" are legacy aliases for int and str respectively.
_NATIVE_TYPES_BY_NAME = {
    "none": (type(None),),
    "bool": (bool,),
    "int": (int,),
    "long": (int,),
    "float": (float,),
    "bytes": (bytes,),
    "complex": (complex,),
    "str": (str,),
    "string_types": (str,),
    "list": (list,),
    "dict": (dict,),
}


def _native_type_type_map(type_):
    """Map a type name to the native Python types it stands for.

    Native Python types are allowed in cases where the underlying column type
    is "object".

    Args:
        type_: Type name; it is lower-cased before the lookup, so matching is
            case-insensitive.

    Returns:
        A tuple of Python types, or ``None`` when the name has no native
        equivalent (this includes ``"unicode"`` and any unknown name).

    Raises:
        AttributeError: If ``type_`` has no ``lower`` method (e.g. it is not a string).
    """
    # "unicode" is deliberately absent from the table: it resolves to None.
    return _NATIVE_TYPES_BY_NAME.get(type_.lower())
Loading
Loading