8 changes: 8 additions & 0 deletions docs/dqx/docs/reference/benchmarks.mdx
@@ -51,7 +51,15 @@ sidebar_position: 13
| test_benchmark_foreach_sql_expression[n_rows_100000000_n_columns_5] | 0.895489 | 0.888982 | 0.853895 | 0.950998 | 0.041479 | 0.071722 | 0.858589 | 0.930311 | 5 | 0 | 2 | 1.12 |
| test_benchmark_foreach_sql_query[n_rows_100000000_n_columns_5] | 4.578799 | 4.602143 | 4.442396 | 4.644892 | 0.083901 | 0.113694 | 4.530776 | 4.644470 | 5 | 0 | 1 | 0.22 |
| test_benchmark_foreign_key | 31.784272 | 31.787610 | 31.414708 | 32.123221 | 0.269713 | 0.386951 | 31.597198 | 31.984149 | 5 | 0 | 2 | 0.03 |
| test_benchmark_has_area_equal_to | 0.209381 | 0.207647 | 0.205255 | 0.216179 | 0.004471 | 0.006593 | 0.206066 | 0.212659 | 5 | 0 | 1 | 4.78 |
| test_benchmark_has_area_greater_than | 0.171868 | 0.166867 | 0.161877 | 0.204155 | 0.015957 | 0.003194 | 0.164123 | 0.167316 | 6 | 1 | 1 | 5.82 |
| test_benchmark_has_area_less_than | 0.177230 | 0.179352 | 0.161536 | 0.190875 | 0.010356 | 0.013261 | 0.169503 | 0.182763 | 6 | 0 | 2 | 5.64 |
| test_benchmark_has_area_not_equal_to | 0.208875 | 0.207436 | 0.203626 | 0.217694 | 0.005257 | 0.004513 | 0.206265 | 0.210778 | 5 | 1 | 1 | 4.79 |
| test_benchmark_has_dimension | 0.215338 | 0.213285 | 0.210530 | 0.223131 | 0.005056 | 0.007086 | 0.211819 | 0.218905 | 5 | 0 | 1 | 4.64 |
| test_benchmark_has_num_points_equal_to | 0.213472 | 0.208326 | 0.200840 | 0.228556 | 0.011595 | 0.018574 | 0.205502 | 0.224076 | 5 | 0 | 2 | 4.68 |
| test_benchmark_has_num_points_greater_than | 0.159204 | 0.157405 | 0.151457 | 0.175503 | 0.008775 | 0.008935 | 0.152260 | 0.161195 | 6 | 1 | 1 | 6.28 |
| test_benchmark_has_num_points_less_than | 0.162069 | 0.161908 | 0.149400 | 0.178192 | 0.010833 | 0.014197 | 0.154168 | 0.168365 | 5 | 0 | 2 | 6.17 |
| test_benchmark_has_num_points_not_equal_to | 0.211439 | 0.212084 | 0.200625 | 0.223375 | 0.008900 | 0.013585 | 0.204124 | 0.217709 | 5 | 0 | 2 | 4.73 |
| test_benchmark_has_valid_schema | 0.172078 | 0.172141 | 0.163793 | 0.181081 | 0.006715 | 0.009295 | 0.167010 | 0.176305 | 6 | 0 | 2 | 5.81 |
| test_benchmark_has_x_coordinate_between | 0.217192 | 0.213656 | 0.209310 | 0.236233 | 0.011150 | 0.012638 | 0.209410 | 0.222048 | 5 | 0 | 1 | 4.60 |
| test_benchmark_has_y_coordinate_between | 0.218497 | 0.219630 | 0.209352 | 0.234111 | 0.010103 | 0.013743 | 0.209584 | 0.223327 | 5 | 0 | 1 | 4.58 |
156 changes: 155 additions & 1 deletion docs/dqx/docs/reference/quality_checks.mdx

Large diffs are not rendered by default.
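
Since the quality_checks.mdx diff is not rendered above, here is a minimal usage sketch of the new geo checks. It is not taken from those docs: it assumes a hypothetical DataFrame with a WKT string column named `geom`, a cluster on Databricks serverless compute or Runtime 17.1+, and uses only the functions added in this PR plus PySpark.

from pyspark.sql import SparkSession
from databricks.labs.dqx.geo.check_funcs import has_area_greater_than, has_num_points_equal_to

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))",)], ["geom"])

# Each check returns a condition Column: it should carry an error message for failing rows
# and NULL for passing (or NULL-input) rows, following DQX's make_condition convention.
flagged = df.select(
    "geom",
    has_area_greater_than("geom", 1000.0).alias("area_issue"),
    has_num_points_equal_to("geom", 5).alias("num_points_issue"),
)
flagged.show(truncate=False)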

339 changes: 338 additions & 1 deletion src/databricks/labs/dqx/geo/check_funcs.py
@@ -1,7 +1,11 @@
from collections.abc import Callable
import operator as py_operator

from pyspark.sql import Column
import pyspark.sql.functions as F

from databricks.labs.dqx.rule import register_rule
from databricks.labs.dqx.check_funcs import make_condition, _get_normalized_column_and_expr
from databricks.labs.dqx.check_funcs import make_condition, _get_normalized_column_and_expr, _get_limit_expr

POINT_TYPE = "ST_Point"
LINESTRING_TYPE = "ST_LineString"
@@ -10,6 +14,7 @@
MULTILINESTRING_TYPE = "ST_MultiLineString"
MULTIPOLYGON_TYPE = "ST_MultiPolygon"
GEOMETRYCOLLECTION_TYPE = "ST_GeometryCollection"
DEFAULT_SRID = 4326


@register_rule("row")
@@ -448,3 +453,335 @@ def has_y_coordinate_between(column: str | Column, min_value: float, max_value:
F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)),
f"{col_str_norm}_has_y_coordinates_outside_range",
)


@register_rule("row")
def has_area_equal_to(
column: str | Column, value: int | float | str | Column, srid: int | None = 3857, geodesic: bool = False
) -> Column:
"""
Checks if the areas of values in a geometry or geography column are equal to a specified value. By default, the 2D
Cartesian area in WGS84 (Pseudo-Mercator) with units of meters squared is used. An SRID can be specified to
transform the input values and compute areas with specific units of measure.

Args:
column: Column to check; can be a string column name or a column expression
value: Value to use in the condition as number, column name or sql expression
srid: Optional integer SRID to use for computing the area of the geometry or geography value (default `3857`).
If an SRID is provided, the input value is transformed and the area is calculated using the units of measure of
the specified coordinate reference system (e.g. meters squared for `srid=3857`).
geodesic: Whether to use the 2D geodesic area (default `False`).

Returns:
Column object indicating whether the areas of the geometries in the input column are equal to the provided value

Note:
This function requires Databricks serverless compute or runtime 17.1 or above.
"""
return _compare_sql_function_result(
column,
value,
spatial_function="st_area",
spatial_quantity_label="area",
spatial_quantity_name="area",
compare_op=py_operator.ne,
compare_op_label="not equal to",
compare_op_name="not_equal_to",
srid=srid,
geodesic=geodesic,
)


@register_rule("row")
def has_area_not_equal_to(
column: str | Column, value: int | float | str | Column, srid: int | None = 3857, geodesic: bool = False
) -> Column:
"""
Checks if the areas of values in a geometry column are not equal to a specified value. By default, the 2D
Cartesian area in WGS84 (Pseudo-Mercator) with units of meters squared is used. An SRID can be specified to
transform the input values and compute areas with specific units of measure.

Args:
column: Column to check; can be a string column name or a column expression
value: Value to use in the condition as number, column name or sql expression
srid: Optional integer SRID to use for computing the area of the geometry or geography value (default `3857`).
If an SRID is provided, the input value is transformed and the area is calculated using the units of measure of
the specified coordinate reference system (e.g. meters squared for `srid=3857`).
geodesic: Whether to use the 2D geodesic area (default `False`).

Returns:
Column object indicating whether the areas of the geometries in the input column are not equal to the provided value

Note:
This function requires Databricks serverless compute or runtime 17.1 or above.
"""
return _compare_sql_function_result(
column,
value,
spatial_function="st_area",
spatial_quantity_label="area",
spatial_quantity_name="area",
compare_op=py_operator.eq,
compare_op_label="equal to",
compare_op_name="equal_to",
srid=srid,
geodesic=geodesic,
)


@register_rule("row")
def has_area_less_than(
column: str | Column, value: int | float | str | Column, srid: int | None = 3857, geodesic: bool = False
) -> Column:
"""
Checks if the areas of values in a geometry column are not greater than a specified limit. By default, the 2D
Cartesian area in WGS84 (Pseudo-Mercator) with units of meters squared is used. An SRID can be specified to
transform the input values and compute areas with specific units of measure.

Args:
column: Column to check; can be a string column name or a column expression
value: Value to use in the condition as number, column name or sql expression
srid: Optional integer SRID to use for computing the area of the geometry or geography value (default `3857`).
If an SRID is provided, the input value is transformed and the area is calculated using the units of measure of
the specified coordinate reference system (e.g. meters squared for `srid=3857`).
geodesic: Whether to use the 2D geodesic area (default `False`).

Returns:
Column object indicating whether the areas of the geometries in the input column are greater than the provided value

Note:
This function requires Databricks serverless compute or runtime 17.1 or above.
"""
return _compare_sql_function_result(
column,
value,
spatial_function="st_area",
spatial_quantity_label="area",
spatial_quantity_name="area",
compare_op=py_operator.gt,
compare_op_label="greater than",
compare_op_name="greater_than",
srid=srid,
geodesic=geodesic,
)


@register_rule("row")
def has_area_greater_than(
column: str | Column, value: int | float | str | Column, srid: int | None = 3857, geodesic: bool = False
) -> Column:
"""
Checks if the areas of values in a geometry column are not less than a specified limit. By default, the 2D
Cartesian area in WGS84 (Pseudo-Mercator) with units of meters squared is used. An SRID can be specified to
transform the input values and compute areas with specific units of measure.

Args:
column: Column to check; can be a string column name or a column expression
value: Value to use in the condition as number, column name or sql expression
srid: Optional integer SRID to use for computing the area of the geometry or geography value (default `3857`).
If an SRID is provided, the input value is transformed and the area is calculated using the units of measure of
the specified coordinate reference system (e.g. meters squared for `srid=3857`).
geodesic: Whether to use the 2D geodesic area (default `False`).

Returns:
Column object indicating whether the areas of the geometries in the input column are less than the provided value

Note:
This function requires Databricks serverless compute or runtime 17.1 or above.
"""
return _compare_sql_function_result(
column,
value,
spatial_function="st_area",
spatial_quantity_label="area",
spatial_quantity_name="area",
compare_op=py_operator.lt,
compare_op_label="less than",
compare_op_name="less_than",
srid=srid,
geodesic=geodesic,
)
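

# Illustrative sketch, not part of this PR: how the `srid` and `geodesic` options change the area
# computation for a hypothetical WKT column named `geom` (column name and limits are made up):
#
#   has_area_less_than("geom", 10_000.0)                 # planar area in EPSG:3857 (meters squared), the default
#   has_area_less_than("geom", 10_000.0, srid=32633)     # planar area after st_transform to UTM zone 33N
#   has_area_greater_than("geom", 1.0, geodesic=True)    # geodesic area via try_to_geography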


@register_rule("row")
def has_num_points_equal_to(column: str | Column, value: int | float | str | Column) -> Column:
"""
Checks if the number of coordinate pairs in values of a geometry column is equal to a specified value.

Args:
column: Column to check; can be a string column name or a column expression
value: Value to use in the condition as number, column name or sql expression

Returns:
Column object indicating whether the number of coordinate pairs in the geometries of the input column is
equal to the provided value

Note:
This function requires Databricks serverless compute or runtime 17.1 or above.
"""
return _compare_sql_function_result(
column,
value,
spatial_function="st_npoints",
spatial_quantity_label="number of coordinates",
spatial_quantity_name="num_points",
compare_op=py_operator.ne,
compare_op_label="not equal to",
compare_op_name="not_equal_to",
)


@register_rule("row")
def has_num_points_not_equal_to(column: str | Column, value: int | float | str | Column) -> Column:
"""
Checks if the number of coordinate pairs in values of a geometry column is not equal to a specified value.

Args:
column: Column to check; can be a string column name or a column expression
value: Value to use in the condition as number, column name or sql expression

Returns:
Column object indicating whether the number of coordinate pairs in the geometries of the input column is not
equal to the provided value

Note:
This function requires Databricks serverless compute or runtime 17.1 or above.
"""
return _compare_sql_function_result(
column,
value,
spatial_function="st_npoints",
spatial_quantity_label="number of coordinates",
spatial_quantity_name="num_points",
compare_op=py_operator.eq,
compare_op_label="equal to",
compare_op_name="equal_to",
)


@register_rule("row")
def has_num_points_less_than(column: str | Column, value: int | float | str | Column) -> Column:
"""
Checks if the number of coordinate pairs in the values of a geometry column is not greater than a specified limit.

Args:
column: Column to check; can be a string column name or a column expression
value: Value to use in the condition as number, column name or sql expression

Returns:
Column object indicating whether the number of coordinate pairs in the geometries of the input column is
greater than the provided value

Note:
This function requires Databricks serverless compute or runtime 17.1 or above.
"""
return _compare_sql_function_result(
column,
value,
spatial_function="st_npoints",
spatial_quantity_label="number of coordinates",
spatial_quantity_name="num_points",
compare_op=py_operator.gt,
compare_op_label="greater than",
compare_op_name="greater_than",
)


@register_rule("row")
def has_num_points_greater_than(column: str | Column, value: int | float | str | Column) -> Column:
"""
Checks if the number of coordinate pairs in values of a geometry column is not less than a specified limit.

Args:
column: Column to check; can be a string column name or a column expression
value: Value to use in the condition as number, column name or sql expression

Returns:
Column object indicating whether the number of coordinate pairs in the geometries of the input column is
less than the provided value

Note:
This function requires Databricks serverless compute or runtime 17.1 or above.
"""
return _compare_sql_function_result(
column,
value,
spatial_function="st_npoints",
spatial_quantity_label="number of coordinates",
spatial_quantity_name="num_points",
compare_op=py_operator.lt,
compare_op_label="less than",
compare_op_name="less_than",
)
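

# Worked example, not part of this PR, for a hypothetical WKT column `geom` holding
# "LINESTRING (0 0, 1 1, 2 2)", i.e. 3 coordinate pairs:
#
#   has_num_points_less_than("geom", 5)      # passes: 3 is not greater than 5
#   has_num_points_greater_than("geom", 5)   # flagged: 3 is less than 5
#   has_num_points_equal_to("geom", 3)       # passes: st_npoints equals 3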


def _compare_sql_function_result(
column: str | Column,
value: int | float | str | Column,
spatial_function: str,
spatial_quantity_label: str,
spatial_quantity_name: str,
compare_op: Callable[[Column, Column], Column],
compare_op_label: str,
compare_op_name: str,
srid: int | None = None,
geodesic: bool = False,
) -> Column:
"""
Compares the results from applying a spatial SQL function (e.g. `st_area`) on a geometry column against a limit
using the specified comparison operator.

Args:
column: Column to check; can be a string column name or a column expression
value: Value to use in the condition as number, column name or sql expression
spatial_function: Spatial SQL function as a string (e.g. `st_npoints`)
spatial_quantity_label: Spatial quantity label (e.g. `number of coordinates`)
spatial_quantity_name: Spatial quantity identifier (e.g. `num_points`)
compare_op: Comparison operator (e.g., `operator.gt`, `operator.lt`).
compare_op_label: Human-readable label for the comparison (e.g., 'greater than').
compare_op_name: Name identifier for the comparison (e.g., 'greater_than').
srid: Optional integer SRID for computing measurements on the converted geometry or geography value (default `None`).
geodesic: Whether to convert the input column to a geography type for computing geodesic measurements (default `False`).

Returns:
Column object indicating whether the result of the spatial function applied to the geometries in the input column satisfies the comparison against the provided limit (or the value is not a valid geometry or geography)

Note:
This function requires Databricks serverless compute or runtime 17.1 or above.
"""
col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column)
value_expr = _get_limit_expr(value)
# NOTE: This check is currently only available on Databricks runtime 17.1 or above, serverless compute, or
# Databricks SQL, due to the use of `try_to_geometry`/`try_to_geography` and spatial functions such as `st_area`.
if geodesic:
spatial_conversion_expr = f"try_to_geography({col_str_norm})"
spatial_data_type = "geography"
elif srid:
spatial_conversion_expr = f"st_transform(st_setsrid(try_to_geometry({col_str_norm}), {DEFAULT_SRID}), {srid})"
spatial_data_type = "geometry"
else:
spatial_conversion_expr = f"try_to_geometry({col_str_norm})"
spatial_data_type = "geometry"

is_valid_cond = F.expr(f"{spatial_conversion_expr} IS NULL")
is_valid_message = F.concat_ws(
"",
F.lit("value `"),
col_expr.cast("string"),
F.lit(f"` in column `{col_expr_str}` is not a valid {spatial_data_type}"),
)
compare_cond = compare_op(F.expr(f"{spatial_function}({spatial_conversion_expr})"), value_expr)
compare_message = F.concat_ws(
"",
F.lit("value `"),
col_expr.cast("string"),
F.lit(f"` in column `{col_expr_str}` has {spatial_quantity_label} {compare_op_label} value: "),
value_expr.cast("string"),
)
condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(is_valid_cond | compare_cond)

return make_condition(
condition,
F.when(is_valid_cond, is_valid_message).otherwise(compare_message),
f"{col_str_norm}_{spatial_quantity_name}_{compare_op_name}_limit",
)
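

# Illustrative trace, not part of this PR: for has_area_greater_than("geom", 100.0) with the
# default srid=3857, the helper above flags a non-NULL row when
#   st_transform(st_setsrid(try_to_geometry(geom), 4326), 3857) IS NULL
#   OR st_area(st_transform(st_setsrid(try_to_geometry(geom), 4326), 3857)) < 100.0
# (NULL inputs are not flagged), and names the message column `geom_area_less_than_limit`,
# i.e. the column name describes the violation, matching the existing checks above.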