Skip to content

Commit 7d48f1a

Browse files
committed
add tests for ReconIntermediatePersist
1 parent e67e646 commit 7d48f1a

File tree

2 files changed

+63
-6
lines changed

2 files changed

+63
-6
lines changed

src/databricks/labs/lakebridge/reconcile/recon_capture.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ def write_and_read_df_with_volumes(
5454
class ReconIntermediatePersist(AbstractReconIntermediatePersist):
5555
def __init__(self, spark: SparkSession, metadata_config: ReconcileMetadataConfig):
5656
self._spark = spark
57+
self._metadata_config = metadata_config
5758
self._format = "delta" if self._is_databricks else "parquet"
58-
self._base_dir = self._get_uc_volume_path(metadata_config) if self._is_databricks else tempfile.gettempdir()
59+
self._base_dir = self._get_uc_volume_path if self._is_databricks else tempfile.gettempdir()
5960

6061
@cached_property
6162
def _is_databricks(self) -> bool:
@@ -65,11 +66,14 @@ def _is_databricks(self) -> bool:
6566
def base_dir(self) -> Path:
6667
return Path(self._base_dir)
6768

68-
@classmethod
69-
def _get_uc_volume_path(cls, metadata_config: ReconcileMetadataConfig):
70-
catalog = metadata_config.catalog
71-
schema = metadata_config.schema
72-
return f"/Volumes/{catalog}/{schema}/{metadata_config.volume}"
69+
@property
70+
def _get_uc_volume_path(self):
71+
return (
72+
f"/Volumes/"
73+
f"{self._metadata_config.catalog}/"
74+
f"{self._metadata_config.schema}/"
75+
f"{self._metadata_config.volume}"
76+
)
7377

7478
def _write_df_to_volumes(self, df: DataFrame, path: str) -> None:
7579
df.write.format(self._format).save(path)

tests/integration/reconcile/test_recon_capture.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from databricks.labs.lakebridge.reconcile.recon_capture import (
1313
ReconCapture,
1414
generate_final_reconcile_output,
15+
ReconIntermediatePersist,
1516
)
1617
from databricks.labs.lakebridge.reconcile.recon_output_config import (
1718
DataReconcileOutput,
@@ -990,3 +991,55 @@ def test_apply_threshold_for_only_threshold_mismatch_with_true_absolute(
990991
remorph_recon_metrics_df = spark.sql(f"select * from {recon_metadata.schema}.metrics")
991992
row = remorph_recon_metrics_df.collect()[0]
992993
assert row.run_metrics.status is True
994+
995+
996+
class ReconIntermediatePersistUnderTest(ReconIntermediatePersist):
997+
@property
998+
def is_databricks(self) -> bool:
999+
return self._is_databricks
1000+
1001+
@property
1002+
def format(self):
1003+
return self._format
1004+
1005+
1006+
def test_is_databricks_false(mock_spark):
1007+
conf = ReconcileMetadataConfig()
1008+
persist = ReconIntermediatePersistUnderTest(mock_spark, conf)
1009+
1010+
assert persist.is_databricks is False
1011+
1012+
1013+
def test_is_databricks_true(spark):
1014+
conf = ReconcileMetadataConfig()
1015+
persist = ReconIntermediatePersistUnderTest(spark, conf)
1016+
1017+
assert persist.is_databricks is True
1018+
1019+
1020+
def test_dir_uses_tempfile(mock_spark):
1021+
conf = ReconcileMetadataConfig()
1022+
persist = ReconIntermediatePersistUnderTest(mock_spark, conf)
1023+
1024+
assert str(persist.base_dir).startswith("/tmp/")
1025+
1026+
1027+
def test_format_uses_uc(spark):
1028+
conf = ReconcileMetadataConfig()
1029+
persist = ReconIntermediatePersistUnderTest(spark, conf)
1030+
1031+
assert str(persist.base_dir).startswith("/Volumes/")
1032+
1033+
1034+
def test_format_uses_parquet(mock_spark):
1035+
conf = ReconcileMetadataConfig()
1036+
persist = ReconIntermediatePersistUnderTest(mock_spark, conf)
1037+
1038+
assert persist.format == "parquet"
1039+
1040+
1041+
def test_format_uses_delta(spark):
1042+
conf = ReconcileMetadataConfig()
1043+
persist = ReconIntermediatePersistUnderTest(spark, conf)
1044+
1045+
assert persist.format == "delta"

0 commit comments

Comments
 (0)