Skip to content

Add save_results_in_table function #319

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/databricks/labs/dqx/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,38 @@ def save_checks_in_installation(
)
installation.upload(run_config.checks_file, yaml.safe_dump(checks).encode('utf-8'))

def save_results_in_table(
self,
quarantine_df: DataFrame | None = None,
output_df: DataFrame | None = None,
run_config_name: str | None = "default",
product_name: str = "dqx",
assume_user: bool = True,
):
"""
Save quarantine and output data to the `quarantine_table` and `output_table` mentioned in the installation config file.

:param quarantine_df: Dataframe containing the quarantine data
:param output_df: Dataframe containing the output data
:param run_config_name: name of the run (config) to use
:param product_name: name of the product/installation directory
:param assume_user: if True, assume user installation
"""
installation = self._get_installation(assume_user, product_name)
run_config = self._load_run_config(installation, run_config_name)

if quarantine_df is not None and run_config.quarantine_table:
logger.info(
f"Saving quarantine data to {run_config.quarantine_table} table"
)
quarantine_df.write.format("delta").mode("overwrite").saveAsTable(run_config.quarantine_table)

if output_df is not None and run_config.output_table:
logger.info(
f"Saving output data to {run_config.output_table} table"
)
output_df.write.format("delta").mode("append").saveAsTable(run_config.output_table)

def save_checks_in_workspace_file(self, checks: list[dict], workspace_path: str):
"""Save checks (dq rules) to yaml file in the workspace.
This does not require installation of DQX in the workspace.
Expand Down
76 changes: 76 additions & 0 deletions tests/integration/test_save_results_in_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import pytest
from unittest.mock import MagicMock, patch
from pyspark.sql import DataFrame
from databricks.labs.dqx.engine import DQEngine

@pytest.fixture
def mock_engine():
engine = DQEngine(MagicMock())
engine._get_installation = MagicMock()
engine._load_run_config = MagicMock()
return engine

@pytest.fixture
def mock_dataframes():
quarantine_df = MagicMock(spec=DataFrame)
output_df = MagicMock(spec=DataFrame)
return quarantine_df, output_df

def test_save_results_in_table_quarantine_only(mock_engine, mock_dataframes):
quarantine_df, _ = mock_dataframes
mock_run_config = MagicMock()
mock_run_config.quarantine_table = "quarantine_table"
mock_run_config.output_table = None
mock_engine._load_run_config.return_value = mock_run_config

mock_engine.save_results_in_table(quarantine_df=quarantine_df, output_df=None)

mock_engine._get_installation.assert_called_once_with(True, "dqx")
mock_engine._load_run_config.assert_called_once()
quarantine_df.write.format.assert_called_once_with("delta")
quarantine_df.write.format().mode.assert_called_once_with("overwrite")
quarantine_df.write.format().mode().saveAsTable.assert_called_once_with("quarantine_table")

def test_save_results_in_table_output_only(mock_engine, mock_dataframes):
_, output_df = mock_dataframes
mock_run_config = MagicMock()
mock_run_config.quarantine_table = None
mock_run_config.output_table = "output_table"
mock_engine._load_run_config.return_value = mock_run_config

mock_engine.save_results_in_table(quarantine_df=None, output_df=output_df)

mock_engine._get_installation.assert_called_once_with(True, "dqx")
mock_engine._load_run_config.assert_called_once()
output_df.write.format.assert_called_once_with("delta")
output_df.write.format().mode.assert_called_once_with("append")
output_df.write.format().mode().saveAsTable.assert_called_once_with("output_table")

def test_save_results_in_table_both(mock_engine, mock_dataframes):
quarantine_df, output_df = mock_dataframes
mock_run_config = MagicMock()
mock_run_config.quarantine_table = "quarantine_table"
mock_run_config.output_table = "output_table"
mock_engine._load_run_config.return_value = mock_run_config

mock_engine.save_results_in_table(quarantine_df=quarantine_df, output_df=output_df)

mock_engine._get_installation.assert_called_once_with(True, "dqx")
mock_engine._load_run_config.assert_called_once()
quarantine_df.write.format.assert_called_once_with("delta")
quarantine_df.write.format().mode.assert_called_once_with("overwrite")
quarantine_df.write.format().mode().saveAsTable.assert_called_once_with("quarantine_table")
output_df.write.format.assert_called_once_with("delta")
output_df.write.format().mode.assert_called_once_with("append")
output_df.write.format().mode().saveAsTable.assert_called_once_with("output_table")

def test_save_results_in_table_no_data(mock_engine):
mock_run_config = MagicMock()
mock_run_config.quarantine_table = None
mock_run_config.output_table = None
mock_engine._load_run_config.return_value = mock_run_config

mock_engine.save_results_in_table(quarantine_df=None, output_df=None)

mock_engine._get_installation.assert_called_once_with(True, "dqx")
mock_engine._load_run_config.assert_called_once()
Loading