diff --git a/demos/dqx_demo_library.py b/demos/dqx_demo_library.py
index e9922749..b069c649 100644
--- a/demos/dqx_demo_library.py
+++ b/demos/dqx_demo_library.py
@@ -62,12 +62,15 @@
 user_name = spark.sql("select current_user() as user").collect()[0]["user"]
 checks_file = f"/Workspace/Users/{user_name}/dqx_demo_checks.yml"
 dq_engine = DQEngine(ws)
-dq_engine.save_checks_in_workspace_file(checks, workspace_path=checks_file)
+dq_engine.save_checks_in_workspace_file(checks=checks, workspace_path=checks_file)
+
+# save generated checks in a Delta table
+dq_engine.save_checks_in_table(checks=checks, table_name="main.default.dqx_checks_table", mode="overwrite")
 
 # COMMAND ----------
 
 # MAGIC %md
-# MAGIC ## Loading and applying quality checks
+# MAGIC ## Loading and applying quality checks from a file
 
 # COMMAND ----------
 
@@ -76,7 +79,7 @@
 input_df = spark.createDataFrame([[1, 3, 3, 2], [3, 3, None, 1]], schema)
 
-# load checks
+# load checks from a file
 dq_engine = DQEngine(WorkspaceClient())
 checks = dq_engine.load_checks_from_workspace_file(workspace_path=checks_file)
 
@@ -91,6 +94,31 @@
 
 # COMMAND ----------
 
+# MAGIC %md
+# MAGIC ## Loading and applying quality checks from a Delta table
+
+# COMMAND ----------
+
+from databricks.labs.dqx.engine import DQEngine
+from databricks.sdk import WorkspaceClient
+
+input_df = spark.createDataFrame([[1, 3, 3, 2], [3, 3, None, 1]], schema)
+
+# load checks from a Delta table
+dq_engine = DQEngine(WorkspaceClient())
+checks = dq_engine.load_checks_from_table(table_name="main.default.dqx_checks_table")
+
+# Option 1: apply quality rules and quarantine invalid records
+valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
+display(valid_df)
+display(quarantined_df)
+
+# Option 2: apply quality rules and flag invalid records as additional columns (`_warning` and `_error`)
+valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
+display(valid_and_quarantined_df)
+
+# COMMAND ----------
+
 # MAGIC %md
 # MAGIC ## Validating syntax of quality checks defined in yaml
 
diff --git a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx
index d06bea29..947e14cd 100644
--- a/docs/dqx/docs/guide.mdx
+++ b/docs/dqx/docs/guide.mdx
@@ -160,17 +160,41 @@ Fields:
 - `criticality`: either "error" (data going only into "bad/quarantine" dataframe) or "warn" (data going into both "good" and "bad" dataframes). If not provided, the default is "error".
 - `check`: column expression containing "function" (check function to apply), "arguments" (check function arguments), and "col_name" (column name as `str` or sql expression the check will be applied for) or "col_names" (column names as `array` the check will be applied for).
 - (optional) `name` for the check: autogenerated if not provided.
+- (optional) `filter` to filter the rows for which the check is applied (e.g. `"business_unit = 'Finance'"`)
+
+### Quality rules configured in a Delta table
+
+Quality rules can also be stored in a Delta table in Unity Catalog. Each row represents a check, with column values for `name`, `criticality`, `check`, `filter`, and `run_config_name`.
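+
+For example, such a table can be populated from a list of check definitions using `save_checks_in_table` (a minimal sketch; the table name is illustrative):
+
+```python
+from databricks.labs.dqx.engine import DQEngine
+from databricks.sdk import WorkspaceClient
+
+checks = [
+    {
+        "name": "city_is_null",
+        "criticality": "warn",
+        "check": {"function": "is_not_null", "arguments": {"col_name": "city"}},
+        "filter": "country = 'Poland'",
+    },
+]
+
+dq_engine = DQEngine(WorkspaceClient())
+dq_engine.save_checks_in_table(checks=checks, table_name="main.default.dqx_checks_table", mode="overwrite")
+```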
+
+```python
+# The checks table will contain the following columns:
+# +------------------+-----------------------+---------------------------------------+----------------------+--------------------+
+# | name             | criticality           | check                                 | filter               | run_config_name    |
+# +------------------+-----------------------+---------------------------------------+----------------------+--------------------+
+# | "city_is_null"   | "warn"                | {function: 'is_not_null',             | "country = 'Poland'" | "default"          |
+# |                  |                       |  arguments: {'col_name': 'city'}}     |                      |                    |
+# | ...              | ...                   | ...                                   | ...                  | ...                |
+# +------------------+-----------------------+---------------------------------------+----------------------+--------------------+
+```
+Fields:
+- `criticality`: either "error" (data going only into "bad/quarantine" dataframe) or "warn" (data going into both "good" and "bad" dataframes). If not provided, the default is "error".
+- `check`: a `StructType` value with the following fields:
+  - `function`: Name of the DQX check function to apply
+  - `arguments`: A `MapType` value with the function's keyword arguments as key-value pairs
+- (optional) `name`: Name to use for the check
+- (optional) `filter`: Spark expression to filter the rows for which the check is applied (e.g. `"business_unit = 'Finance'"`)
+- `run_config_name`: A run or workflow name. Can be used to load and apply a subset of checks to specific workflows. Default value is `"default"`.
 
 ### Loading and execution methods
 
-Checks can be loaded from a file in the installation folder, workspace, or local file system. The engine will raise an error if the checks file contains invalid JSON or YAML definition.
+Checks can be loaded from a Delta table in Unity Catalog or from a file in the installation folder, workspace, or local file system. The engine will raise an error if the stored checks (either from a file or table) contain an invalid definition.
 
-Checks loaded from a file can be applied using one of the following methods:
+Checks loaded from a file or table can be applied using one of the following methods:
 * `apply_checks_by_metadata_and_split`: splits the input data into valid and invalid (quarantined) dataframes.
 * `apply_checks_by_metadata`: report issues as additional columns.
 
 Syntax of the loaded checks is validated automatically as part of these methods.
-In addition, you can also perform a standalone syntax validation of the checks as described [here](#validating-syntax-of-quality-checks-defined-in-yamljson).
+In addition, you can also perform a standalone syntax validation of the checks as described [here](#validating-syntax-of-quality-checks-defined-in-configuration).
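+
+For example, a standalone validation could look like the following (a minimal sketch, assuming the `validate_checks` API described in the linked section and a `checks` list loaded as shown in the methods below):
+
+```python
+from databricks.labs.dqx.engine import DQEngine
+
+# `checks` is a list of check dictionaries loaded from a file or table
+status = DQEngine.validate_checks(checks)
+assert not status.has_errors
+```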
 
 #### Method 1: Loading checks from a workspace file in the installation folder
@@ -239,6 +263,34 @@ In addition, you can also perform a standalone syntax validation of the checks a
 
+#### Method 4: Loading checks from a Delta table
+
+
+
+    ```python
+    from databricks.labs.dqx.engine import DQEngine
+    from databricks.sdk import WorkspaceClient
+
+    dq_engine = DQEngine(WorkspaceClient())
+
+    # Load all checks with the "default" `run_config_name`:
+    default_checks = dq_engine.load_checks_from_table("dq.config.checks_table")
+
+    # Load checks with the "workflow_001" `run_config_name`:
+    workflow_checks = dq_engine.load_checks_from_table("dq.config.checks_table", "workflow_001")
+
+    checks = default_checks + workflow_checks
+    input_df = spark.read.table("catalog1.schema1.table1")
+
+    # Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
+    valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
+
+    # Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
+    valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
+    ```
+
+
+
 ### Quality rules defined in code
 
 #### Method 1: Using DQX classes
 
@@ -427,9 +479,9 @@ The DQX integration with DLT does not use DLT Expectations but DQX's own methods
 
-## Validating syntax of quality checks defined in yaml/json
+## Validating syntax of quality checks defined in configuration
 
-You can validate the syntax of checks defined as metadata in `yaml` or `json` format before applying them. This validation ensures that the checks are correctly defined and can be interpreted by the DQX engine.
+You can validate the syntax of checks defined as metadata in a Delta table or file (either `yaml` or `json`) before applying them. This validation ensures that the checks are correctly defined and can be interpreted by the DQX engine.
 
 The validation cannot be used for checks defined using [DQX classes](#method-1-using-dqx-classes). When checks are defined with DQX classes, syntax validation is unnecessary because the application will fail to interpret them if the DQX objects are constructed incorrectly.
 
diff --git a/pyproject.toml b/pyproject.toml
index 2cfcfc61..d5c32d20 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -413,7 +413,7 @@ max-args = 10
 max-positional-arguments=10
 
 # Maximum number of attributes for a class (see R0902).
-max-attributes = 15
+max-attributes = 16
 
 # Maximum number of boolean expressions in an if statement (see R0916).
 max-bool-expr = 5
diff --git a/src/databricks/labs/dqx/config.py b/src/databricks/labs/dqx/config.py
index c59476f6..be3bfc79 100644
--- a/src/databricks/labs/dqx/config.py
+++ b/src/databricks/labs/dqx/config.py
@@ -17,6 +17,7 @@ class RunConfig:
     output_table: str | None = None  # output data table
     quarantine_table: str | None = None  # quarantined data table
     checks_file: str | None = "checks.yml"  # file containing quality rules / checks
+    checks_table: str | None = None  # table containing quality rules / checks
    profile_summary_stats_file: str | None = "profile_summary_stats.yml"  # file containing profile summary statistics
     override_clusters: dict[str, str] | None = None  # cluster configuration for jobs
     spark_conf: dict[str, str] | None = None  # extra spark configs
diff --git a/src/databricks/labs/dqx/engine.py b/src/databricks/labs/dqx/engine.py
index b9d3c222..f7ae4072 100644
--- a/src/databricks/labs/dqx/engine.py
+++ b/src/databricks/labs/dqx/engine.py
@@ -3,12 +3,13 @@
 import functools as ft
 import inspect
 import itertools
+import warnings
 from pathlib import Path
 from collections.abc import Callable
 from typing import Any
 import yaml
 import pyspark.sql.functions as F
-from pyspark.sql import DataFrame
+from pyspark.sql import DataFrame, SparkSession
 
 from databricks.labs.blueprint.installation import Installation
 from databricks.labs.dqx import row_checks
@@ -30,6 +31,7 @@ from databricks.sdk import WorkspaceClient
 
 logger = logging.getLogger(__name__)
+COLLECT_LIMIT_WARNING = 500
 
 
 class DQEngineCore(DQEngineCoreBase):
@@ -142,6 +144,72 @@ def save_checks_in_local_file(checks: list[dict], filepath: str):
             msg = f"Checks file {filepath} missing"
             raise FileNotFoundError(msg) from None
 
+    @staticmethod
+    def build_quality_rules_from_dataframe(df: DataFrame, run_config_name: str = "default") -> list[dict]:
+        """Build checks from a Spark DataFrame based on check specifications, i.e. function name plus arguments.
+
+        :param df: Spark DataFrame with data quality check rules. Each row should define a check. Rows should
+        have the following columns:
+        * `name` - Name that will be given to a resulting column. Autogenerated if not provided
+        * `criticality` (optional) - Possible values are `error` (data going only into "bad" dataframe) and `warn` (data going into both dataframes)
+        * `check` - A `StructType` column defining the data quality check: the DQX check function and its arguments
+        * `filter` (optional) - Expression for filtering the rows the check is applied to
+        * `run_config_name` (optional) - Run configuration name for storing checks across runs
+        :param run_config_name: Run configuration name for filtering quality rules
+        :return: List of data quality check specifications as Python dictionaries
+        """
+        check_rows = df.where(f"run_config_name = '{run_config_name}'").collect()
+        if len(check_rows) > COLLECT_LIMIT_WARNING:
+            warnings.warn(
+                f"Collecting large number of rows from Spark DataFrame: {len(check_rows)}",
+                category=UserWarning,
+                stacklevel=2,
+            )
+        checks = []
+        for row in check_rows:
+            check = {"name": row.name, "criticality": row.criticality, "check": row.check.asDict()}
+            if row.filter is not None:
+                check["filter"] = row.filter
+            checks.append(check)
+        return checks
+
+    @staticmethod
+    def build_dataframe_from_quality_rules(
+        checks: list[dict], run_config_name: str = "default", spark: SparkSession | None = None
+    ) -> DataFrame:
+        """Build a Spark DataFrame from a set of check specifications, i.e. function name plus arguments.
+
+        :param checks: list of check specifications as Python dictionaries. Each check consists of the following fields:
+        * `check` - the check specification: the name of the DQX check function to apply and its keyword arguments
+        * `name` - Name that will be given to a resulting column. Autogenerated if not provided
+        * `criticality` (optional) - Possible values are `error` (data going only into "bad" dataframe) and `warn` (data going into both dataframes)
+        * `filter` (optional) - Expression for filtering the rows the check is applied to
+        :param run_config_name: Run configuration name for storing quality checks across runs
+        :param spark: SparkSession to use for DataFrame operations
+        :return: Spark DataFrame with data quality check rules
+        """
+        if spark is None:
+            spark = SparkSession.builder.getOrCreate()
+        schema = "name STRING, criticality STRING, check STRUCT<function STRING, arguments MAP<STRING, STRING>>, filter STRING, run_config_name STRING"
+        dq_rule_checks = DQEngineCore.build_checks_by_metadata(checks)
+        dq_rule_rows = []
+        for dq_rule_check in dq_rule_checks:
+            arguments = dq_rule_check.check_func_kwargs
+            if isinstance(dq_rule_check, DQColSetRule):
+                arguments["col_names"] = dq_rule_check.columns
+            if isinstance(dq_rule_check, DQColRule):
+                arguments["col_name"] = dq_rule_check.col_name
+            dq_rule_rows.append(
+                [
+                    dq_rule_check.name,
+                    dq_rule_check.criticality,
+                    {"function": dq_rule_check.check_func.__name__, "arguments": arguments},
+                    dq_rule_check.filter,
+                    run_config_name,
+                ]
+            )
+        return spark.createDataFrame(dq_rule_rows, schema)
+
     @staticmethod
     def build_checks_by_metadata(checks: list[dict], custom_checks: dict[str, Any] | None = None) -> list[DQColRule]:
         """Build checks based on check specification, i.e. function name plus arguments.
@@ -590,6 +658,23 @@ def load_checks_from_installation(
             raise ValueError(f"Invalid or no checks in workspace file: {installation.install_folder()}/{filename}")
         return parsed_checks
 
+    def load_checks_from_table(
+        self, table_name: str, run_config_name: str = "default", spark: SparkSession | None = None
+    ) -> list[dict]:
+        """
+        Load checks (dq rules) from a Delta table in the workspace.
+        :param table_name: Unity Catalog or Hive metastore table name
+        :param run_config_name: Run configuration name for filtering checks
+        :param spark: Optional SparkSession
+        :return: List of dq rules or raise an error if the checks table is missing or invalid.
+        """
+        logger.info(f"Loading quality rules (checks) from table {table_name}")
+        if not self.ws.tables.exists(table_name).table_exists:
+            raise NotFound(f"Table {table_name} does not exist in the workspace")
+        if spark is None:
+            spark = SparkSession.builder.getOrCreate()
+        return DQEngine._load_checks_from_table(table_name, run_config_name, spark)
+
     @staticmethod
     def save_checks_in_local_file(checks: list[dict], path: str):
         return DQEngineCore.save_checks_in_local_file(checks, path)
@@ -633,6 +718,20 @@ def save_checks_in_workspace_file(self, checks: list[dict], workspace_path: str)
             workspace_path, yaml.safe_dump(checks).encode('utf-8'), format=ImportFormat.AUTO, overwrite=True
         )
 
+    @staticmethod
+    def save_checks_in_table(
+        checks: list[dict], table_name: str, run_config_name: str = "default", mode: str = "append"
+    ):
+        """
+        Save checks to a Delta table in the workspace.
+        :param checks: list of dq rules to save
+        :param table_name: Unity Catalog or Hive metastore table name
+        :param run_config_name: Run configuration name for identifying groups of checks
+        :param mode: Output mode for writing checks to Delta (e.g. `append` or `overwrite`)
+        """
+        logger.info(f"Saving quality rules (checks) to table {table_name}")
+        DQEngine._save_checks_in_table(checks, table_name, run_config_name, mode)
+
     def load_run_config(
         self, run_config_name: str | None = "default", assume_user: bool = True, product_name: str = "dqx"
     ) -> RunConfig:
@@ -670,3 +769,15 @@ def _load_checks_from_file(installation: Installation, filename: str) -> list[di
         except NotFound:
             msg = f"Checks file {filename} missing"
             raise NotFound(msg) from None
+
+    @staticmethod
+    def _load_checks_from_table(table_name: str, run_config_name: str, spark: SparkSession | None = None) -> list[dict]:
+        if spark is None:
+            spark = SparkSession.builder.getOrCreate()
+        rules_df = spark.read.table(table_name)
+        return DQEngineCore.build_quality_rules_from_dataframe(rules_df, run_config_name=run_config_name)
+
+    @staticmethod
+    def _save_checks_in_table(checks: list[dict], table_name: str, run_config_name: str, mode: str):
+        rules_df = DQEngineCore.build_dataframe_from_quality_rules(checks, run_config_name=run_config_name)
+        rules_df.write.saveAsTable(table_name, mode=mode)
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index a21ddbff..a995bc84 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -80,12 +80,7 @@ def config(self) -> WorkspaceConfig:
 class MockInstallationContext(MockRuntimeContext):
     __test__ = False
 
-    def __init__(
-        self,
-        env_or_skip_fixture,
-        ws,
-        check_file,
-    ):
+    def __init__(self, env_or_skip_fixture, ws, check_file):
         super().__init__(env_or_skip_fixture, ws)
         self.check_file = check_file
 
@@ -170,16 +165,8 @@ def workspace_installation(self) -> WorkspaceInstallation:
 
 @pytest.fixture
-def installation_ctx(
-    ws,
-    env_or_skip,
-    check_file="checks.yml",
-) -> Generator[MockInstallationContext, None, None]:
-    ctx = MockInstallationContext(
-        env_or_skip,
-        ws,
-        check_file,
-    )
+def installation_ctx(ws, env_or_skip, check_file="checks.yml") -> Generator[MockInstallationContext, None, None]:
+    ctx = MockInstallationContext(env_or_skip, ws, check_file)
     yield ctx.replace(workspace_client=ws)
     ctx.workspace_installation.uninstall()
 
diff --git a/tests/integration/test_load_checks_from_table.py b/tests/integration/test_load_checks_from_table.py
new file mode 100644
index 00000000..b834e732
--- /dev/null
+++ b/tests/integration/test_load_checks_from_table.py
@@ -0,0 +1,73 @@
+import pytest
+from databricks.labs.dqx.engine import DQEngine
+from databricks.sdk.errors import NotFound
+
+
+TEST_CHECKS = [
+    {
+        "name": "column_is_not_null",
+        "criticality": "error",
+        "check": {"function": "is_not_null", "arguments": {"col_name": "col_1"}},
+    },
+    {
+        "name": "column_not_less_than",
+        "criticality": "warn",
+        "check": {"function": "is_not_less_than", "arguments": {"col_name": "col_1", "limit": "0"}},
+    },
+]
+
+
+def test_load_checks_when_checks_table_does_not_exist(installation_ctx, make_schema, make_random, spark):
+    client = installation_ctx.workspace_client
+    catalog_name = "main"
+    schema_name = make_schema(catalog_name=catalog_name).name
+    table_name = f"{catalog_name}.{schema_name}.{make_random(6).lower()}"
+
+    with pytest.raises(NotFound, match=f"Table {table_name} does not exist in the workspace"):
+        engine = DQEngine(client)
+        engine.load_checks_from_table(table_name, spark=spark)
+
+
+def test_load_checks_from_table(installation_ctx, make_schema, make_random, spark):
+    client = installation_ctx.workspace_client
+    catalog_name = "main"
+    schema_name = make_schema(catalog_name=catalog_name).name
+    table_name = f"{catalog_name}.{schema_name}.{make_random(6).lower()}"
+
+    engine = DQEngine(client)
+    DQEngine.save_checks_in_table(TEST_CHECKS, table_name)
+    checks = engine.load_checks_from_table(table_name, spark=spark)
+    assert checks == TEST_CHECKS, "Checks were not loaded correctly."
+
+
+def test_load_checks_from_table_with_run_config(installation_ctx, make_schema, make_random, spark):
+    client = installation_ctx.workspace_client
+    catalog_name = "main"
+    schema_name = make_schema(catalog_name=catalog_name).name
+    table_name = f"{catalog_name}.{schema_name}.{make_random(6).lower()}"
+
+    engine = DQEngine(client)
+    run_config_name = "workflow_001"
+    DQEngine.save_checks_in_table(TEST_CHECKS[:1], table_name, run_config_name=run_config_name)
+    checks = engine.load_checks_from_table(table_name, run_config_name=run_config_name, spark=spark)
+    assert checks == TEST_CHECKS[:1], "Checks were not loaded correctly for workflow run config."
+
+    DQEngine.save_checks_in_table(TEST_CHECKS[1:], table_name)
+    checks = engine.load_checks_from_table(table_name, spark=spark)
+    assert checks == TEST_CHECKS[1:], "Checks were not loaded correctly for default run config."
+
+
+def test_save_checks_to_table_output_modes(installation_ctx, make_schema, make_random, spark):
+    client = installation_ctx.workspace_client
+    catalog_name = "main"
+    schema_name = make_schema(catalog_name=catalog_name).name
+    table_name = f"{catalog_name}.{schema_name}.{make_random(6).lower()}"
+
+    engine = DQEngine(client)
+    engine.save_checks_in_table(TEST_CHECKS[:1], table_name, mode="append")
+    checks = engine.load_checks_from_table(table_name, spark=spark)
+    assert checks == TEST_CHECKS[:1], "Checks were not loaded correctly after appending."
+
+    engine.save_checks_in_table(TEST_CHECKS[1:], table_name, mode="overwrite")
+    checks = engine.load_checks_from_table(table_name, spark=spark)
+    assert checks == TEST_CHECKS[1:], "Checks were not loaded correctly after overwriting."
diff --git a/tests/unit/test_build_rules.py b/tests/unit/test_build_rules.py
index 0d1d63f7..05a5bdb6 100644
--- a/tests/unit/test_build_rules.py
+++ b/tests/unit/test_build_rules.py
@@ -441,3 +441,58 @@ def test_deprecated_warning_dqrule_class():
 def test_deprecated_warning_dqrulecolset_class():
     with pytest.warns(DeprecationWarning, match="DQRuleColSet is deprecated and will be removed in a future version"):
         DQRuleColSet(criticality="error", check_func=is_not_null, columns=["col1"])
+
+
+def test_build_quality_rules_from_dataframe(spark_local):
+    test_checks = [
+        {
+            "name": "column_is_not_null",
+            "criticality": "error",
+            "check": {"function": "is_not_null", "arguments": {"col_name": "test_col"}},
+        },
+        {
+            "name": "column_is_not_null_or_empty",
+            "criticality": "warn",
+            "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "test_col"}},
+        },
+        {
+            "name": "column_not_less_than",
+            "criticality": "warn",
+            "check": {"function": "is_not_less_than", "arguments": {"col_name": "test_col", "limit": "5"}},
+        },
+    ]
+    df = DQEngineCore.build_dataframe_from_quality_rules(test_checks, spark=spark_local)
+    checks = DQEngineCore.build_quality_rules_from_dataframe(df)
+    assert checks == test_checks, "The loaded checks do not match the expected checks."
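+
+
+def test_build_quality_rules_from_dataframe_with_filter(spark_local):
+    # Illustrative round-trip sketch: a rule carrying a `filter`, as documented in the guide,
+    # is expected to survive the DataFrame conversion in both directions.
+    test_checks = [
+        {
+            "name": "column_is_not_null",
+            "criticality": "error",
+            "check": {"function": "is_not_null", "arguments": {"col_name": "test_col"}},
+            "filter": "business_unit = 'Finance'",
+        },
+    ]
+    df = DQEngineCore.build_dataframe_from_quality_rules(test_checks, spark=spark_local)
+    checks = DQEngineCore.build_quality_rules_from_dataframe(df)
+    assert checks == test_checks, "Checks with a filter were not round-tripped correctly."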
+
+
+def test_build_quality_rules_from_dataframe_with_run_config(spark_local):
+    default_checks = [
+        {
+            "name": "column_is_not_null",
+            "criticality": "error",
+            "check": {"function": "is_not_null", "arguments": {"col_name": "test_col"}},
+        },
+        {
+            "name": "column_is_not_null_or_empty",
+            "criticality": "warn",
+            "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "test_col"}},
+        },
+    ]
+    workflow_checks = [
+        {
+            "name": "column_not_less_than",
+            "criticality": "warn",
+            "check": {"function": "is_not_less_than", "arguments": {"col_name": "test_col", "limit": "5"}},
+        },
+    ]
+    default_checks_df = DQEngineCore.build_dataframe_from_quality_rules(default_checks, spark=spark_local)
+    workflow_checks_df = DQEngineCore.build_dataframe_from_quality_rules(
+        workflow_checks, run_config_name="workflow_001", spark=spark_local
+    )
+    df = default_checks_df.union(workflow_checks_df)
+
+    checks = DQEngineCore.build_quality_rules_from_dataframe(df, run_config_name="workflow_001")
+    assert checks == workflow_checks, "The loaded checks do not match the expected workflow checks."
+
+    checks = DQEngineCore.build_quality_rules_from_dataframe(df)
+    assert checks == default_checks, "The loaded checks do not match the expected default checks."