Changes from all commits

30 commits
fe944f2
feat: fix serverless compute compatibility
BesikiML Jan 7, 2026
06d7f61
Fixed detection of running on serverless compute
BesikiML Jan 7, 2026
60c46d2
Merge branch 'main' into 1438-feature-remorph-reconcile-fails-to-run-…
BesikiML Jan 7, 2026
2ab2352
fix: remove pylint disable and use specific exceptions
BesikiML Jan 7, 2026
a049243
refactor: use getAll() instead of conf.get() for serverless detection
BesikiML Jan 7, 2026
78f2253
fixed dict issue
BesikiML Jan 7, 2026
0d416c1
fixed getAll call
BesikiML Jan 7, 2026
670940b
Added AnalysisException in the except block
BesikiML Jan 7, 2026
8fc57b5
Optimised _is_serverless function
BesikiML Jan 7, 2026
d55fc20
Replace clusterType check with clusterNodeType which reliably
BesikiML Jan 9, 2026
bb37933
Merge branch 'main' into 1438-feature-remorph-reconcile-fails-to-run-…
BesikiML Jan 9, 2026
7c5b49c
Cleaned the code
BesikiML Jan 12, 2026
b82cb7d
Merge branch 'main' into 1438-feature-remorph-reconcile-fails-to-run-…
BesikiML Jan 12, 2026
909a6d8
Merge branch '1438-feature-remorph-reconcile-fails-to-run-on-serverle…
BesikiML Jan 12, 2026
8525599
Changed the logger info to debug
BesikiML Jan 12, 2026
50ff3f3
is_serverless changed to a @cached_property
BesikiML Jan 13, 2026
e8f93bb
Added classify_spark_runtime method which classifies spark, removed is_…
BesikiML Jan 16, 2026
8b0fac0
Formatted the code
BesikiML Jan 16, 2026
ba2f944
Fixed df cache for serverless
BesikiML Jan 16, 2026
c831b3e
Merge branch 'main' into 1438-feature-remorph-reconcile-fails-to-run-…
BesikiML Jan 16, 2026
977b9a8
Added SparkRuntimeType
BesikiML Jan 20, 2026
b5531a4
Merge branch '1438-feature-remorph-reconcile-fails-to-run-on-serverle…
BesikiML Jan 20, 2026
e6d4d6a
Merge branch 'main' into 1438-feature-remorph-reconcile-fails-to-run-…
BesikiML Jan 20, 2026
7e3488a
Added SparkSession for spark param
BesikiML Jan 21, 2026
5960bdc
Merge branch '1438-feature-remorph-reconcile-fails-to-run-on-serverle…
BesikiML Jan 21, 2026
48af7d8
Merge branch 'main' into 1438-feature-remorph-reconcile-fails-to-run-…
BesikiML Jan 21, 2026
de49b61
Changed classify_spark_runtime to private _classify_spark_runtime and…
BesikiML Jan 22, 2026
c2f025e
Added # pylint: disable=import-private-name
BesikiML Jan 22, 2026
d322579
removed the test case
BesikiML Jan 22, 2026
df8ce7b
Merge branch 'main' into 1438-feature-remorph-reconcile-fails-to-run-…
BesikiML Jan 22, 2026
4 changes: 3 additions & 1 deletion src/databricks/labs/lakebridge/reconcile/compare.py
@@ -7,6 +7,7 @@
 from databricks.labs.lakebridge.reconcile.exception import ColumnMismatchException
 from databricks.labs.lakebridge.reconcile.recon_capture import (
     ReconIntermediatePersist,
+    cache_df_or_not,
 )
 from databricks.labs.lakebridge.reconcile.recon_output_config import (
     DataReconcileOutput,
@@ -452,7 +453,8 @@ def join_aggregate_data(
     joined_df = df.select(*normalized_joined_cols)

     # Write the joined df to volume path
-    joined_volume_df = ReconIntermediatePersist(spark, path).write_and_read_unmatched_df_with_volumes(joined_df).cache()
+    joined_volume_df = ReconIntermediatePersist(spark, path).write_and_read_unmatched_df_with_volumes(joined_df)
+    joined_volume_df = cache_df_or_not(spark, joined_volume_df)
     logger.warning(f"Unmatched data is written to {path} successfully")

     return joined_volume_df
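The swap above from an unconditional `.cache()` to `cache_df_or_not` is the heart of the fix: the DataFrame caching API is not available on Databricks serverless compute, which is why reconcile failed there (issue 1438 in the branch names). Detection hinges on `SparkSession.sparkContext` existing only on classic, JVM-attached sessions; Spark Connect and serverless sessions raise `PySparkAttributeError` when it is accessed. A minimal standalone probe of that behaviour, mirroring the helper this PR adds to `recon_capture.py` below (illustrative sketch only; `has_jvm_context` is a hypothetical name, not part of the PR):

from pyspark.errors import PySparkAttributeError
from pyspark.sql import SparkSession


def has_jvm_context(spark: SparkSession) -> bool:
    # Classic clusters expose a JVM-backed SparkContext; Spark Connect and
    # Databricks serverless sessions raise PySparkAttributeError instead.
    try:
        _ = spark.sparkContext
        return True
    except PySparkAttributeError:
        return False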
32 changes: 30 additions & 2 deletions src/databricks/labs/lakebridge/reconcile/recon_capture.py
@@ -1,11 +1,12 @@
 import logging
 from datetime import datetime
-from functools import reduce
+from functools import reduce, lru_cache
+from typing import Literal

 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.functions import col, collect_list, create_map, lit
 from pyspark.sql.types import StringType, StructField, StructType
-from pyspark.errors import PySparkException
+from pyspark.errors import PySparkException, PySparkAttributeError
 from sqlglot import Dialect

 from databricks.labs.lakebridge.config import DatabaseConfig, Table, ReconcileMetadataConfig
@@ -79,6 +80,33 @@ def write_and_read_unmatched_df_with_volumes(
             raise ReadAndWriteWithVolumeException(message) from e


+SparkRuntimeType = Literal["DATABRICKS_SERVERLESS", "CLASSIC", "SPARK_CONNECT", "NO_JVM_UNKNOWN"]
+
+
+def _classify_spark_runtime(spark: SparkSession) -> SparkRuntimeType:
+    try:
+        _ = spark.sparkContext
+        return "CLASSIC"
+    except PySparkAttributeError as e:
+        msg = str(e).lower()
+
+        if "serverless" in msg:
+            return "DATABRICKS_SERVERLESS"
+
+        if "spark connect" in msg:
+            return "SPARK_CONNECT"
+
+        return "NO_JVM_UNKNOWN"
+
+
+@lru_cache(maxsize=1)
+def cache_df_or_not(spark: SparkSession, df: DataFrame) -> DataFrame:
+    cluster_type = _classify_spark_runtime(spark)
+    if cluster_type != "DATABRICKS_SERVERLESS":
+        df = df.cache()
+    return df


 def _write_df_to_delta(df: DataFrame, table_name: str, mode="append"):
     try:
         df.write.mode(mode).saveAsTable(table_name)
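Taken together, `_classify_spark_runtime` probes the session once per call and `cache_df_or_not` caches only where the API exists, returning the DataFrame untouched on serverless. Note that `lru_cache(maxsize=1)` keys on the `(spark, df)` argument pair, so each new DataFrame is a cache miss; the memoization only helps when the same frame is passed twice. A hedged usage sketch (assumes a live session and that the helper above is importable; the `spark.range` data is illustrative):

from pyspark.sql import SparkSession

from databricks.labs.lakebridge.reconcile.recon_capture import cache_df_or_not

spark = SparkSession.builder.getOrCreate()
df = spark.range(100)

# Classic compute: returns df.cache(); serverless: returns df unchanged,
# since the caching API would fail there.
df = cache_df_or_not(spark, df)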
@@ -28,6 +28,7 @@
 from databricks.labs.lakebridge.reconcile.query_builder.threshold_query import (
     ThresholdQueryBuilder,
 )
+from databricks.labs.lakebridge.reconcile.recon_capture import cache_df_or_not
 from databricks.labs.lakebridge.reconcile.recon_config import (
     Schema,
     Table,
@@ -303,6 +304,7 @@ def _get_sample_data(
             or reconcile_output.missing_in_src_count > 0
             or reconcile_output.missing_in_tgt_count > 0
         ):
+
             src_sampler = SamplingQueryBuilder(table_conf, src_schema, "source", self._source_engine, self._source)
             tgt_sampler = SamplingQueryBuilder(table_conf, tgt_schema, "target", self._target_engine, self._target)
             if reconcile_output.mismatch_count > 0:
@@ -370,7 +372,8 @@ def _get_mismatch_data(

         # Uses pre-calculated `mismatch_count` from `reconcile_output.mismatch_count` to avoid recomputing `mismatch` for RandomSampler.
         mismatch_sampler = SamplerFactory.get_sampler(sampling_options)
-        df = mismatch_sampler.sample(mismatch, mismatch_count, key_columns, sampling_model_target).cache()
+        df = mismatch_sampler.sample(mismatch, mismatch_count, key_columns, sampling_model_target)
+        df = cache_df_or_not(self._spark, df)

         src_mismatch_sample_query = src_sampler.build_query(df)
         tgt_mismatch_sample_query = tgt_sampler.build_query(df)