Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/databricks/labs/remorph/reconcile/recon_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ def _insert_into_metrics_table(
f"""
select {recon_table_id} as recon_table_id,
named_struct(
'source_record_count', cast({record_count.source} as bigint),
'target_record_count', cast({record_count.target} as bigint),
'row_comparison', case when '{self.report_type.lower()}' in ('all', 'row', 'data')
and '{exception_msg}' = '' then
named_struct(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ SELECT main.recon_id,
CONCAT(main.target_table.catalog, '.', main.target_table.schema, '.', main.target_table.table_name) AS target_table,
metrics.run_metrics.status AS status,
metrics.run_metrics.exception_message AS exception,
metrics.recon_metrics.source_record_count AS source_record_count,
metrics.recon_metrics.target_record_count AS target_record_count,
metrics.recon_metrics.row_comparison.missing_in_source AS missing_in_source,
metrics.recon_metrics.row_comparison.missing_in_target AS missing_in_target,
metrics.recon_metrics.column_comparison.absolute_mismatch AS absolute_mismatch,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
CREATE TABLE IF NOT EXISTS metrics (
recon_table_id BIGINT NOT NULL,
recon_metrics STRUCT<
source_record_count: BIGINT,
target_record_count: BIGINT,
row_comparison: STRUCT<
missing_in_source: BIGINT,
missing_in_target: BIGINT
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# pylint: disable=invalid-name

import logging

from databricks.labs.blueprint.installation import Installation
from databricks.sdk import WorkspaceClient

from databricks.labs.remorph.contexts.application import ApplicationContext
from databricks.labs.remorph.helpers import db_sql

logger = logging.getLogger(__name__)


def _upgrade_reconcile_metadata_metrics_table(
    installation: Installation, ws: WorkspaceClient, app_context: ApplicationContext
):
    """Add the record-count fields to the reconcile ``metrics`` table.

    Issues one ``ALTER TABLE ... ADD COLUMN`` statement per new nested field
    (``recon_metrics.source_record_count`` and
    ``recon_metrics.target_record_count``) against the table
    ``<catalog>.<schema>.metrics`` named by the reconcile metadata config, then
    saves the reconcile config through ``installation``.

    Args:
        installation: Installation used to save the reconcile config once the
            statements have been executed.
        ws: Workspace client handed to ``db_sql.get_sql_backend`` to obtain the
            SQL backend that runs the statements.
        app_context: Application context whose ``recon_config`` supplies the
            metadata catalog and schema.

    Raises:
        AssertionError: If ``app_context.recon_config`` is falsy.
    """
    reconcile_config = app_context.recon_config
    assert reconcile_config, "Reconcile config must be present to upgrade the reconcile metadata metrics table"

    metadata_config = reconcile_config.metadata_config
    table_name = "metrics"
    table_identifier = f"{metadata_config.catalog}.{metadata_config.schema}.{table_name}"

    statements: list[str] = [
        f"ALTER TABLE {table_identifier} ADD COLUMN recon_metrics.source_record_count BIGINT",
        f"ALTER TABLE {table_identifier} ADD COLUMN recon_metrics.target_record_count BIGINT",
    ]

    # The backend does not depend on the statement being run, so obtain it once
    # instead of rebuilding it on every iteration.
    sql_backend = db_sql.get_sql_backend(ws)
    for statement in statements:
        # Lazy %-style arguments: the message is only formatted when DEBUG is enabled.
        logger.debug("Executing SQL to upgrade metrics table fields: \n%s", statement)
        sql_backend.execute(statement)

    installation.save(reconcile_config)
    logger.debug("Upgraded Reconcile metrics table")


def upgrade(installation: Installation, ws: WorkspaceClient):
    """Run the metrics-table upgrade when a reconcile config is present.

    Builds an ``ApplicationContext`` from ``ws``; if its ``recon_config`` is
    ``None`` nothing is done, otherwise the reconcile metadata metrics table
    upgrade is invoked with that context.
    """
    context = ApplicationContext(ws)
    if context.recon_config is None:
        # No reconcile config available: there is nothing to upgrade.
        return
    _upgrade_reconcile_metadata_metrics_table(installation, ws, context)
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ def report_tables_schema():
"recon_metrics",
StructType(
[
StructField("source_record_count", IntegerType()),
StructField("target_record_count", IntegerType()),
StructField(
"row_comparison",
StructType(
Expand Down
16 changes: 8 additions & 8 deletions tests/integration/reconcile/query_builder/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ def test_recon_for_report_type_is_data(
data=[
(
11111,
((1, 1), (1, 0, "s_address,s_phone"), None),
(3, 3, (1, 1), (1, 0, "s_address,s_phone"), None),
(False, "remorph", ""),
datetime(2024, 5, 23, 9, 21, 25, 122185),
)
Expand Down Expand Up @@ -975,7 +975,7 @@ def test_recon_for_report_type_schema(
schema=recon_schema,
)
expected_remorph_recon_metrics = mock_spark.createDataFrame(
data=[(22222, (None, None, True), (True, "remorph", ""), datetime(2024, 5, 23, 9, 21, 25, 122185))],
data=[(22222, (0, 0, None, None, True), (True, "remorph", ""), datetime(2024, 5, 23, 9, 21, 25, 122185))],
schema=metrics_schema,
)
expected_remorph_recon_details = mock_spark.createDataFrame(
Expand Down Expand Up @@ -1187,7 +1187,7 @@ def test_recon_for_report_type_all(
data=[
(
33333,
((1, 1), (1, 0, "s_address,s_phone"), False),
(3, 3, (1, 1), (1, 0, "s_address,s_phone"), False),
(False, "remorph", ""),
datetime(2024, 5, 23, 9, 21, 25, 122185),
)
Expand Down Expand Up @@ -1458,7 +1458,7 @@ def test_recon_for_report_type_is_row(
data=[
(
33333,
((2, 2), None, None),
(3, 3, (2, 2), None, None),
(False, "remorph", ""),
datetime(2024, 5, 23, 9, 21, 25, 122185),
)
Expand Down Expand Up @@ -1599,7 +1599,7 @@ def test_schema_recon_with_data_source_exception(
data=[
(
33333,
(None, None, None),
(0, 0, None, None, None),
(
False,
"remorph",
Expand Down Expand Up @@ -1669,7 +1669,7 @@ def test_schema_recon_with_general_exception(
data=[
(
33333,
(None, None, None),
(0, 0, None, None, None),
(
False,
"remorph",
Expand Down Expand Up @@ -1740,7 +1740,7 @@ def test_data_recon_with_general_exception(
data=[
(
33333,
(None, None, None),
(3, 3, None, None, None),
(
False,
"remorph",
Expand Down Expand Up @@ -1811,7 +1811,7 @@ def test_data_recon_with_source_exception(
data=[
(
33333,
(None, None, None),
(3, 3, None, None, None),
(
False,
"remorph",
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/reconcile/test_recon_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ def test_recon_capture_start_snowflake_all(mock_workspace_client, mock_spark):
remorph_recon_metrics_df = spark.sql("select * from DEFAULT.metrics")
row = remorph_recon_metrics_df.collect()[0]
assert remorph_recon_metrics_df.count() == 1
assert row.recon_metrics.source_record_count == 5
assert row.recon_metrics.target_record_count == 5
assert row.recon_metrics.row_comparison.missing_in_source == 3
assert row.recon_metrics.row_comparison.missing_in_target == 4
assert row.recon_metrics.column_comparison.absolute_mismatch == 2
Expand Down
Loading