Load and save checks from a Delta table #339
base: main
Conversation
✅ 149/149 passed, 1 flaky, 1 skipped, 1h13m36s total (running from acceptance #724)
src/databricks/labs/dqx/engine.py (outdated)
```python
@staticmethod
def _load_checks_from_table(
    table_name: str, query: str | None = None, spark: SparkSession | None = None
```
Why do we need the query?
In case users want to filter checks on-load (e.g. `where criticality <> "warning"`).
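For illustration, a minimal sketch of what that on-load filtering could look like, assuming `query` is an optional SQL predicate applied while reading the checks table (this is not the PR's code, and `query` was later removed):

```python
from databricks.labs.dqx.engine import DQEngineCore
from pyspark.sql import SparkSession

def _load_checks_from_table(
    table_name: str, query: str | None = None, spark: SparkSession | None = None
) -> list[dict]:
    # Sketch only: read the checks table and apply the optional predicate.
    if spark is None:
        spark = SparkSession.builder.getOrCreate()
    rules_df = spark.read.table(table_name)
    if query is not None:
        rules_df = rules_df.where(query)  # e.g. 'criticality <> "warning"'
    return DQEngineCore.build_quality_rules_from_dataframe(rules_df)
```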
IMO it would be more useful to be able to load a set of checks for a particular use case / run config. We already have the concept of a run config. I would use `run_config_name` instead of `query` when saving and loading, using the `default` run config by default. I would not make it too flexible because it increases usage complexity.
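A sketch of that alternative, assuming the checks table carries a `run_config_name` column and that `"default"` names the default run config (both column name and default value are assumptions):

```python
from databricks.labs.dqx.engine import DQEngineCore
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def load_checks_from_table(
    table_name: str, run_config_name: str = "default", spark: SparkSession | None = None
) -> list[dict]:
    # Sketch only: select the persisted checks belonging to one run config
    # instead of accepting an arbitrary query string.
    if spark is None:
        spark = SparkSession.builder.getOrCreate()
    rules_df = spark.read.table(table_name).where(
        F.col("run_config_name") == run_config_name
    )
    return DQEngineCore.build_quality_rules_from_dataframe(rules_df)
```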
I removed `query`. We can add `run_config_name` if needed.
Pull Request Overview
This PR introduces the ability to persist data quality checks to and from Delta tables, enhances the core engine with DataFrame-based serialization/deserialization, and updates configuration and tests to support a `checks_table`.
- Added `save_checks_in_dataframe`, `load_checks_from_dataframe`, `save_checks_in_table`, and `load_checks_from_table` in `DQEngineCore`/`DQEngine`.
- Extended `RunConfig` with a `checks_table` setting and updated integration fixtures.
- Bumped linter `max-attributes` to accommodate the new config field.
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| tests/unit/test_load_checks_from_dataframe.py | Unit tests for `load_checks_from_dataframe`, including the warning case. |
| tests/integration/test_load_checks_from_table.py | Integration tests for loading checks from a Delta table. |
| tests/integration/conftest.py | Updated `MockInstallationContext` to inject `checks_table`. |
| src/databricks/labs/dqx/engine.py | Implemented DataFrame/table save/load methods and warning logic. |
| src/databricks/labs/dqx/config.py | Added `checks_table` field to `RunConfig`. |
| pyproject.toml | Increased `max-attributes` limit to 16. |
Pull Request Overview
Adds Delta table persistence for data quality checks by introducing DataFrame conversion utilities and new DQEngine methods for saving/loading to tables.
- Implemented `build_dataframe_from_quality_rules` and `build_quality_rules_from_dataframe` in `DQEngineCore` (see the sketch after this list)
- Added `save_checks_in_table` and `load_checks_from_table` in both `DQEngineCore` and `DQEngine`
- Updated configuration and tests to support table-based check persistence
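For illustration, a minimal round-trip sketch of these conversion utilities, assuming the check-dict format used elsewhere in DQX (exact signatures, including the `spark` keyword, are assumptions):

```python
from databricks.labs.dqx.engine import DQEngineCore
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

checks = [
    {
        "name": "col_a_is_not_null",
        "criticality": "error",
        "check": {"function": "is_not_null", "arguments": {"col_name": "col_a"}},
    }
]

# Sketch only: list of check dicts -> DataFrame -> list of check dicts
# should be a lossless round trip.
df = DQEngineCore.build_dataframe_from_quality_rules(checks, spark=spark)
restored = DQEngineCore.build_quality_rules_from_dataframe(df)
assert restored == checks
```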
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/databricks/labs/dqx/engine.py | Added build/save/load methods for DataFrame and Delta table persistence of checks |
| src/databricks/labs/dqx/config.py | Introduced `checks_table` field in `RunConfig` |
| tests/unit/test_build_rules.py | Added unit test for DataFrame-based quality rules build |
| tests/integration/test_load_checks_from_table.py | Added integration tests for loading checks from a Delta table |
| tests/integration/conftest.py | Adjusted `installation_ctx` fixture signature formatting |
| pyproject.toml | Increased `max-attributes` limit to accommodate new methods |
Comments suppressed due to low confidence (1)

tests/unit/test_build_rules.py:446
Test covers basic DataFrame conversion but does not verify behavior when a `filter` column is present or when a large-DataFrame warning is emitted. Consider adding tests for those scenarios.

```python
def test_build_quality_rules_from_dataframe(spark_local):
```
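A hedged sketch of the suggested extra coverage; the column layout and the round-trip of the `filter` value into each check dict are assumptions based on the surrounding review comments, not the PR's actual schema:

```python
from databricks.labs.dqx.engine import DQEngineCore

def test_build_quality_rules_from_dataframe_with_filter(spark_local):
    # Sketch only: assumes the checks DataFrame may carry a `filter` column
    # that ends up in each resulting check dict.
    df = spark_local.createDataFrame(
        [
            (
                "a_not_null",
                "error",
                '{"function": "is_not_null", "arguments": {"col_name": "a"}}',
                "b > 0",
            )
        ],
        "name string, criticality string, check string, filter string",
    )
    checks = DQEngineCore.build_quality_rules_from_dataframe(df)
    assert checks[0]["filter"] == "b > 0"
```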
```python
    * `filter` - Expression for filtering data quality checks
    :return: list of data quality check specifications as a Python dictionary
    """
    num_check_rows = df.count()
```
Calling `df.count()` before `df.collect()` triggers two separate Spark jobs. Consider collecting rows once (e.g., `rows = df.collect()`) and using `len(rows)` to avoid the extra scan.
```diff
-    num_check_rows = df.count()
+    rows = df.collect()
+    num_check_rows = len(rows)
```
```python
    * `name` - name that will be given to a resulting column. Autogenerated if not provided
    * `criticality` (optional) - possible values are `error` (data going only into "bad" dataframe),
      and `warn` (data is going into both dataframes)
    * `check_function` - DQX check function used in the check
```
The docstring refers to a `check_function` column, but the actual DataFrame schema uses a `check` column. Update the description to match the implementation.

```diff
-    * `check_function` - DQX check function used in the check
+    * `check` - DQX check function used in the check
```
```python
    if spark is None:
        spark = SparkSession.builder.getOrCreate()
    rules_df = spark.read.table(table_name)
    return DQEngineCore.build_quality_rules_from_dataframe(rules_df)
```
[nitpick] This static method duplicates functionality in `DQEngineCore`. Consider consolidating with the core implementation to reduce code duplication.
```diff
-    if spark is None:
-        spark = SparkSession.builder.getOrCreate()
-    rules_df = spark.read.table(table_name)
-    return DQEngineCore.build_quality_rules_from_dataframe(rules_df)
+    """
+    Load checks from a Delta table in the workspace.
+    :param table_name: Unity catalog or Hive metastore table name
+    :param spark: Optional SparkSession instance
+    :return: List of quality rules (checks)
+    """
+    return DQEngineCore.load_checks_from_table(table_name, spark)
```
Changes
Added the following methods to the `DQEngine` class to save and load checks to a Delta table in a Databricks workspace (usage sketch below):
- `save_checks_in_dataframe`
- `load_checks_from_dataframe`
- `save_checks_in_table`
- `load_checks_from_table`
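For illustration, a usage sketch of the new table-backed persistence; the table name is a placeholder and the positional argument order is an assumption:

```python
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

engine = DQEngine(WorkspaceClient())

checks = [
    {
        "criticality": "error",
        "check": {"function": "is_not_null", "arguments": {"col_name": "col_a"}},
    }
]

# Sketch only: persist checks to a Delta table, then load them back.
engine.save_checks_in_table(checks, "main.dqx.checks")
loaded = engine.load_checks_from_table("main.dqx.checks")
assert loaded == checks
```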
Linked issues
Resolves #299
Tests
Added unit and integration tests.