v0.7.0
- Added end-to-end quality checking methods (#364). The library now includes end-to-end quality checking methods, allowing users to read data from a table or view, apply checks, and write the results to a table. The `DQEngine` class has been updated to use `InputConfig` and `OutputConfig` objects for input and output configuration, providing more flexibility in the quality checking flow. The `apply_checks_and_write_to_table` and `apply_checks_by_metadata_and_write_to_table` methods have been introduced to support this functionality, applying checks using DQX classes and configuration, respectively (see the first sketch after this list). Additionally, the profiler configuration options have been reorganized into `input_config` and `profiler_config` sections, making it easier to understand and customize the profiling process. The changes aim to provide a more streamlined and efficient way to perform end-to-end quality checking and data validation, with improved configuration flexibility and readability.
- Added equality checks for aggregate values and negate option for Foreign Key (#387). The library now includes two new checks, `is_aggr_equal` and `is_aggr_not_equal`, which enable equality checks on aggregate values such as count, sum, average, minimum, and maximum, verifying that an aggregation over a column or group of columns is equal or not equal to a specified limit. These checks can be configured with a criticality level of either `error` or `warn` and can be applied to specific columns or groups of columns. Additionally, the `foreign_key` check has been updated with a `negate` option, which inverts the condition so that the check fails when the foreign key values exist in the reference dataframe or table, rather than when they do not exist. This expanded functionality enhances the library's data quality checking capabilities, providing more flexibility in validating data integrity.
- Extend options for profiling multiple tables (#420). The profiler now supports wildcard patterns for profiling multiple tables, replacing the previous regex pattern support, and options can be passed as a list of dictionaries to apply different options to each table based on pattern matching. The profiler job is now set up with a cluster that has IO cache enabled.
- Improved quick demo to showcase defining checks using DQX classes and renamed DLT into Lakeflow Pipeline in docs (#399). `DLT` has been renamed to `Lakeflow Pipeline` in documentation and docstrings to maintain consistency in terminology. The quick demo has been enhanced to showcase defining checks using DQX classes (see the second sketch after this list), providing a more comprehensive approach to data quality validation. Additionally, performance information related to dataset-level checks has been added to the documentation, along with instructions on how to use the Environment to install DQX in Lakeflow Pipelines.
- Populate columns in the results from kwargs of the check if provided (#416). Additionally, `sql_expression` now supports an optional `columns` argument that is propagated to the results.
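
The first sketch below illustrates how the pieces above fit together: checks defined as metadata (including the new `is_aggr_equal` check, a `foreign_key` check with `negate`, and a `sql_expression` check with the optional `columns` argument) applied end to end with `apply_checks_by_metadata_and_write_to_table`. It is a minimal sketch based on this changelog; the import paths, check argument names, and keyword parameters shown are assumptions and may differ from the released API.

```python
# Minimal sketch of the new end-to-end flow (#364, #387, #416).
# Import paths, check argument names, and keyword parameters are assumptions
# based on this changelog, not a verbatim copy of the released API.
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.config import InputConfig, OutputConfig  # assumed module path
from databricks.labs.dqx.engine import DQEngine

dq_engine = DQEngine(WorkspaceClient())

# Checks defined as metadata; the same definitions could live in a YAML file or a checks table.
checks = [
    {   # new in 0.7.0: equality check on an aggregate value
        "criticality": "error",
        "check": {
            "function": "is_aggr_equal",
            "arguments": {"column": "id", "aggr_type": "count", "limit": 100},  # argument names assumed
        },
    },
    {   # foreign_key with the new negate option: fail when the key DOES exist in the reference table
        "criticality": "warn",
        "check": {
            "function": "foreign_key",
            "arguments": {
                "columns": ["customer_id"],              # argument names assumed
                "ref_columns": ["customer_id"],
                "ref_table": "main.crm.blocked_customers",
                "negate": True,
            },
        },
    },
    {   # sql_expression with the optional columns argument propagated to the results
        "criticality": "warn",
        "check": {
            "function": "sql_expression",
            "arguments": {"expression": "amount >= 0", "columns": ["amount"]},
        },
    },
]

# Read from the input, apply the checks, and write the annotated results to the output table.
dq_engine.apply_checks_by_metadata_and_write_to_table(
    checks=checks,
    input_config=InputConfig(location="main.iot.bronze", format="delta"),
    output_config=OutputConfig(location="main.iot.silver", mode="append"),
)
```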
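
For the class-based style showcased in the updated quick demo, the same flow might look like the second sketch below, pairing DQX rule classes with the new `apply_checks_and_write_to_table` method. The `DQRowRule` class, the `check_funcs` module, and the parameter names are assumptions based on the documentation this changelog refers to, not guaranteed API.

```python
# Rough sketch of defining checks with DQX classes (quick demo, #399) and applying
# them end to end with apply_checks_and_write_to_table (#364).
# DQRowRule, check_funcs, and the parameter/keyword names below are assumptions.
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx import check_funcs                        # assumed module
from databricks.labs.dqx.config import InputConfig, OutputConfig   # assumed module path
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRowRule                     # assumed class and module

# A single row-level rule expressed as a DQX class instead of metadata.
checks = [
    DQRowRule(
        name="id_is_not_null",
        criticality="error",
        check_func=check_funcs.is_not_null,  # assumed check function reference
        column="id",
    ),
]

dq_engine = DQEngine(WorkspaceClient())

# Applies the class-based checks to the configured input and writes the results to the output table.
dq_engine.apply_checks_and_write_to_table(
    checks=checks,
    input_config=InputConfig(location="main.iot.bronze", format="delta"),
    output_config=OutputConfig(location="main.iot.silver", mode="append"),
)
```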
BREAKING CHANGES!
- Only users using `config.yml` are affected. The breaking change is for the required field `output_location`, which is now stored inside `output_config.location`.
- The structure of `config.yml` has been improved to include configs for input, output and quarantine data instead of specifying the fields directly. New fields related to the profiler cluster and streaming have also been included. The installer has been updated accordingly.
Full example:
```yaml
log_level: INFO
version: 1
profiler_override_clusters: # <- optional dictionary mapping job cluster names to existing cluster IDs
  main: your-existing-cluster-id # <- existing cluster Id to use
profiler_spark_conf: # <- optional spark configuration to use for the profiler job
  spark.sql.ansi.enabled: true
run_configs:
- name: default # <- unique name of the run config (default used during installation)
  input_config: # <- optional input data configuration
    location: s3://iot-ingest/raw # <- input location of the data (table or cloud path)
    format: delta # <- format, required if cloud path provided
    is_streaming: false # <- whether the input data should be read using streaming (default is false)
    schema: col1 int, col2 string # <- schema of the input data (optional), applicable if reading csv and json files
    options: # <- additional options for reading from the input location (optional)
      versionAsOf: '0'
  output_config: # <- output data configuration
    location: main.iot.silver # <- output location (table), used as input for quality dashboard if quarantine location is not provided
    format: delta # <- format of the output table
    mode: append # <- write mode for the output table (append or overwrite)
    options: # <- additional options for writing to the output table (optional)
      mergeSchema: 'true'
      #checkpointLocation: /Volumes/catalog1/schema1/checkpoint # <- only applicable if input_config.is_streaming is enabled
    trigger: # <- streaming trigger, only applicable if input_config.is_streaming is enabled
      availableNow: true
  quarantine_config: # <- quarantine data configuration, if specified, bad data is written to quarantine table
    location: main.iot.silver_quarantine # <- quarantine location (table), used as input for quality dashboard
    format: delta # <- format of the quarantine table
    mode: append # <- write mode for the quarantine table (append or overwrite)
    options: # <- additional options for writing to the quarantine table (optional)
      mergeSchema: 'true'
      #checkpointLocation: /Volumes/catalog1/schema1/checkpoint # <- only applicable if input_config.is_streaming is enabled
    trigger: # <- streaming trigger, only applicable if input_config.is_streaming is enabled
      availableNow: true
  checks_file: iot_checks.yml # <- relative location of the quality rules (checks) defined in json or yaml file
  checks_table: main.iot.checks # <- table storing the quality rules (checks)
  profiler_config: # <- profiler configuration
    summary_stats_file: iot_summary_stats.yml # <- relative location of profiling summary stats
    sample_fraction: 0.3 # <- fraction of data to sample in the profiler (30%)
    sample_seed: 30 # <- optional seed for reproducible sampling
    limit: 1000 # <- limit the number of records to profile
  warehouse_id: your-warehouse-id # <- warehouse id for refreshing dashboard
- name: another_run_config # <- unique name of the run config
  ...
```

Contributors: @mwojtyczka, @ghanse