Skip to content

Commit f112109

Browse files
committed
remove unused code after removing usage of ReconIntermediatePersist
1 parent 91b1e98 commit f112109

File tree

6 files changed

+9
-61
lines changed

6 files changed

+9
-61
lines changed

src/databricks/labs/lakebridge/reconcile/compare.py

Lines changed: 2 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -1,13 +1,10 @@
11
import logging
22
from functools import reduce
3-
from pyspark.sql import DataFrame, SparkSession
3+
from pyspark.sql import DataFrame
44
from pyspark.sql.functions import col, expr, lit
55

66
from databricks.labs.lakebridge.reconcile.connectors.dialect_utils import DialectUtils
77
from databricks.labs.lakebridge.reconcile.exception import ColumnMismatchException
8-
from databricks.labs.lakebridge.reconcile.recon_capture import (
9-
ReconIntermediatePersist,
10-
)
118
from databricks.labs.lakebridge.reconcile.recon_output_config import (
129
DataReconcileOutput,
1310
MismatchOutput,
@@ -58,8 +55,6 @@ def reconcile_data(
5855
target: DataFrame,
5956
key_columns: list[str],
6057
report_type: str,
61-
spark: SparkSession,
62-
path: str,
6358
) -> DataReconcileOutput:
6459
source_alias = "src"
6560
target_alias = "tgt"
@@ -414,13 +409,7 @@ def reconcile_agg_data_per_rule(
414409
return rule_reconcile_output
415410

416411

417-
def join_aggregate_data(
418-
source: DataFrame,
419-
target: DataFrame,
420-
key_columns: list[str] | None,
421-
spark: SparkSession,
422-
path: str,
423-
) -> DataFrame:
412+
def join_aggregate_data(source: DataFrame, target: DataFrame, key_columns: list[str] | None) -> DataFrame:
424413
# TODO: Integrate with reconcile_data function
425414

426415
source_alias = "src"

src/databricks/labs/lakebridge/reconcile/reconciliation.py

Lines changed: 0 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,6 @@
77
DatabaseConfig,
88
ReconcileMetadataConfig,
99
)
10-
from databricks.labs.lakebridge.reconcile import utils
1110
from databricks.labs.lakebridge.reconcile.compare import (
1211
capture_mismatch_data_and_columns,
1312
reconcile_data,
@@ -143,14 +142,11 @@ def _get_reconcile_output(
143142
options=table_conf.jdbc_reader_options,
144143
)
145144

146-
volume_path = utils.generate_volume_path(table_conf, self._metadata_config)
147145
return reconcile_data(
148146
source=src_data,
149147
target=tgt_data,
150148
key_columns=table_conf.join_columns,
151149
report_type=self._report_type,
152-
spark=self._spark,
153-
path=volume_path,
154150
)
155151

156152
def _get_reconcile_aggregate_output(
@@ -230,8 +226,6 @@ def _get_reconcile_aggregate_output(
230226
self._target,
231227
).build_queries()
232228

233-
volume_path = utils.generate_volume_path(table_conf, self._metadata_config)
234-
235229
table_agg_output: list[AggregateQueryOutput] = []
236230

237231
# Iterate over the grouped aggregates and reconcile the data
@@ -266,8 +260,6 @@ def _get_reconcile_aggregate_output(
266260
source=src_data,
267261
target=tgt_data,
268262
key_columns=src_query_with_rules.group_by_columns,
269-
spark=self._spark,
270-
path=f"{volume_path}{src_query_with_rules.group_by_columns_as_str}",
271263
)
272264
except DataSourceRuntimeException as e:
273265
data_source_exception = e

src/databricks/labs/lakebridge/reconcile/trigger_recon_aggregate_service.py

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -4,11 +4,9 @@
44
from databricks.sdk import WorkspaceClient
55

66
from databricks.labs.lakebridge.config import ReconcileConfig, TableRecon
7-
from databricks.labs.lakebridge.reconcile import utils
87
from databricks.labs.lakebridge.reconcile.exception import DataSourceRuntimeException, ReconciliationException
98
from databricks.labs.lakebridge.reconcile.normalize_recon_config_service import NormalizeReconConfigService
109
from databricks.labs.lakebridge.reconcile.recon_capture import (
11-
ReconIntermediatePersist,
1210
generate_final_reconcile_aggregate_output,
1311
)
1412
from databricks.labs.lakebridge.reconcile.recon_config import AGG_RECONCILE_OPERATION_NAME

src/databricks/labs/lakebridge/reconcile/trigger_recon_service.py

Lines changed: 7 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,6 @@
1313
from databricks.labs.lakebridge.reconcile.exception import DataSourceRuntimeException, ReconciliationException
1414
from databricks.labs.lakebridge.reconcile.recon_capture import (
1515
ReconCapture,
16-
ReconIntermediatePersist,
1716
generate_final_reconcile_output,
1817
)
1918
from databricks.labs.lakebridge.reconcile.recon_config import Table, Schema
@@ -48,7 +47,7 @@ def trigger_recon(
4847
)
4948

5049
for table_conf in table_recon.tables:
51-
TriggerReconService.recon_one(spark, reconciler, recon_capture, reconcile_config, table_conf)
50+
TriggerReconService.recon_one(reconciler, recon_capture, reconcile_config, table_conf)
5251

5352
return TriggerReconService.verify_successful_reconciliation(
5453
generate_final_reconcile_output(
@@ -105,7 +104,6 @@ def create_recon_dependencies(
105104

106105
@staticmethod
107106
def recon_one(
108-
spark: SparkSession,
109107
reconciler: Reconciliation,
110108
recon_capture: ReconCapture,
111109
reconcile_config: ReconcileConfig,
@@ -119,15 +117,12 @@ def recon_one(
119117
reconciler, reconcile_config, normalized_table_conf
120118
)
121119

122-
TriggerReconService.persist_delta_table(
123-
spark,
124-
reconciler,
125-
recon_capture,
126-
schema_reconcile_output,
127-
data_reconcile_output,
128-
reconcile_config,
129-
normalized_table_conf,
130-
recon_process_duration,
120+
recon_capture.start(
121+
data_reconcile_output=data_reconcile_output,
122+
schema_reconcile_output=schema_reconcile_output,
123+
table_conf=table_conf,
124+
recon_process_duration=recon_process_duration,
125+
record_count=reconciler.get_record_count(table_conf, reconciler.report_type),
131126
)
132127

133128
return schema_reconcile_output, data_reconcile_output
@@ -214,25 +209,6 @@ def _run_reconcile_data(
214209
except DataSourceRuntimeException as e:
215210
return DataReconcileOutput(exception=str(e))
216211

217-
@staticmethod
218-
def persist_delta_table(
219-
spark: SparkSession,
220-
reconciler: Reconciliation,
221-
recon_capture: ReconCapture,
222-
schema_reconcile_output: SchemaReconcileOutput,
223-
data_reconcile_output: DataReconcileOutput,
224-
reconcile_config: ReconcileConfig,
225-
table_conf: Table,
226-
recon_process_duration: ReconcileProcessDuration,
227-
):
228-
recon_capture.start(
229-
data_reconcile_output=data_reconcile_output,
230-
schema_reconcile_output=schema_reconcile_output,
231-
table_conf=table_conf,
232-
recon_process_duration=recon_process_duration,
233-
record_count=reconciler.get_record_count(table_conf, reconciler.report_type),
234-
)
235-
236212
@staticmethod
237213
def verify_successful_reconciliation(
238214
reconcile_output: ReconcileOutput, operation_name: str = "reconcile"

tests/integration/reconcile/test_compare.py

Lines changed: 0 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -44,8 +44,6 @@ def test_compare_data_for_report_all(
4444
target=target,
4545
key_columns=["s_suppkey", "s_nationkey"],
4646
report_type="all",
47-
spark=mock_spark,
48-
path=str(tmp_path),
4947
)
5048
expected = DataReconcileOutput(
5149
mismatch_count=1,
@@ -97,8 +95,6 @@ def test_compare_data_for_report_hash(mock_spark, tmp_path: Path):
9795
target=target,
9896
key_columns=["s_suppkey", "s_nationkey"],
9997
report_type="hash",
100-
spark=mock_spark,
101-
path=str(tmp_path),
10298
)
10399
expected = DataReconcileOutput(
104100
missing_in_src=missing_in_src,
@@ -281,8 +277,6 @@ def test_compare_data_special_column_names(mock_spark, tmp_path: Path):
281277
target=target,
282278
key_columns=["`s``supp#`", "`s_nation#`"],
283279
report_type="all",
284-
spark=mock_spark,
285-
path=str(tmp_path),
286280
)
287281
expected = DataReconcileOutput(
288282
mismatch_count=1,

tests/integration/reconcile/test_oracle_reconcile.py

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -76,7 +76,6 @@ def test_oracle_db_reconcile(mock_spark, mock_workspace_client, tmp_path):
7676
)
7777
with patch("databricks.labs.lakebridge.reconcile.utils.generate_volume_path", return_value=str(tmp_path)):
7878
_, data_reconcile_output = TriggerReconService.recon_one(
79-
spark=mock_spark,
8079
reconciler=recon,
8180
recon_capture=recon_capture,
8281
reconcile_config=reconcile_config,

0 commit comments

Comments (0)