base: main
Fix serverless compatibility by replacing cache() with conditional persistence #2218
Conversation
Use Unity Catalog volumes instead of .cache() for serverless. Auto-detects compute type. Fixes: [NOT_SUPPORTED_WITH_SERVERLESS]
✅ 130/130 passed, 8 flaky, 5 skipped, 9m40s total
Running from acceptance #3493
Codecov Report
❌ Patch coverage is … (additional details and impacted files below)

@@ Coverage Diff @@
##             main    #2218      +/-   ##
==========================================
- Coverage   63.95%   63.78%   -0.17%
==========================================
  Files          99       99
  Lines        8644     8666      +22
  Branches      890      893       +3
==========================================
  Hits         5528     5528
- Misses       2944     2966      +22
  Partials      172      172
…on-serverless-compute
Use specific exception types instead of broad Exception catch to satisfy CI linter rules. Add cluster ID check for improved detection.
Avoids CONFIG_NOT_AVAILABLE exceptions by fetching all configs at once. Passes all linter checks.
Distinguishes serverless (CONFIG_NOT_AVAILABLE) from classic clusters.
…on-serverless-compute
…on-serverless-compute
m-abulazm
left a comment
Good job. We should always add type annotations to any new code, especially to a public method. Once this comment is addressed, we can merge this down.
    raise ReadAndWriteWithVolumeException(message) from e

    def classify_spark_runtime(spark):
Please add a type annotation.
Suggested change:
- def classify_spark_runtime(spark):
+ SparkRuntimeType = Literal["DATABRICKS_SERVERLESS", "CLASSIC", .....]
+ def classify_spark_runtime(spark: SparkSession) -> SparkRuntimeType:
done
    def test_classify_spark_runtime(spark):
        assert classify_spark_runtime(spark) != "DATABRICKS_SERVERLESS"
Please add a comment noting that our sandbox environment uses Spark Connect, so that if this changes in the future, the comment explains why it was originally done this way.
done
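For illustration, a minimal sketch of what the test with the requested comment could look like (the exact wording and placement in the repo may differ):

    def test_classify_spark_runtime(spark):
        # Our sandbox CI environment runs on Spark Connect rather than serverless
        # compute, so we can only assert that the runtime is not classified as
        # serverless here. If the sandbox setup changes, revisit this assertion.
        assert classify_spark_runtime(spark) != "DATABRICKS_SERVERLESS"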
…ss-compute' of github.com:databrickslabs/lakebridge into 1438-feature-remorph-reconcile-fails-to-run-on-serverless-compute
…on-serverless-compute
  # Write the joined df to volume path
- joined_volume_df = ReconIntermediatePersist(spark, path).write_and_read_unmatched_df_with_volumes(joined_df).cache()
+ joined_volume_df = ReconIntermediatePersist(spark, path).write_and_read_unmatched_df_with_volumes(joined_df)
+ cluster_type = classify_spark_runtime(spark)
This method is used inside a loop, and we should not recalculate it every time. I would either cache this value or compute it at the caller before any loops and pass it in here.
We can create ReconIntermediatePersist(spark, path) before the loop and pass it in, but since joined_df changes with the loop parameters, how can we cache it ahead of time?
I do think a ReconIntermediatePersist#cache_if_supported() method is a good idea
Another question, though: why are we caching here after writing to storage? What is the improvement? Otherwise we can just remove it from here.
This (cache) approach was already there; I didn’t add it.
done
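For reference, a rough sketch of the helper idea discussed above (the names and details here are illustrative; the PR ultimately adds a cache_df_or_not method, see the later commit):

    from typing import Literal

    from pyspark.sql import DataFrame

    SparkRuntimeType = Literal["DATABRICKS_SERVERLESS", "CLASSIC", "SPARK_CONNECT", "NO_JVM_UNKNOWN"]

    def cache_if_supported(df: DataFrame, runtime: SparkRuntimeType) -> DataFrame:
        # Serverless compute rejects PERSIST TABLE, so return the DataFrame
        # unchanged there; cache on every other runtime (existing behavior).
        if runtime == "DATABRICKS_SERVERLESS":
            return df
        return df.cache()

Computing the runtime once before the reconciliation loop and passing it in keeps the per-iteration cost at a single comparison.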
    SparkRuntimeType = Literal["DATABRICKS_SERVERLESS", "CLASSIC", "SPARK_CONNECT", "NO_JVM_UNKNOWN"]

    def classify_spark_runtime(spark) -> SparkRuntimeType:
Suggested change:
- def classify_spark_runtime(spark) -> SparkRuntimeType:
+ def classify_spark_runtime(spark: SparkSession) -> SparkRuntimeType:
done
…ss-compute' of github.com:databrickslabs/lakebridge into 1438-feature-remorph-reconcile-fails-to-run-on-serverless-compute
…on-serverless-compute
… added cache_df_or_not method
…on-serverless-compute
🐛 Problem
Reconciliation fails on Databricks serverless compute with:
[NOT_SUPPORTED_WITH_SERVERLESS] PERSIST TABLE is not supported on serverless compute
🔍 Root Cause
The reconciliation process uses .cache() for performance optimization, but serverless compute does not support DataFrame caching operations.
✅ Solution
Implemented serverless detection and a conditional caching strategy:
Changes Made
- New _is_serverless() method checks for the clusterNodeType config:
  - Classic clusters: config exists → returns False
  - Serverless: config throws CONFIG_NOT_AVAILABLE → returns True
- Classic clusters: use .cache() for performance (existing behavior)
- Serverless: skip caching to avoid runtime errors
Technical Details
Detection Method:
node_type = self._spark.conf.get("spark.databricks.clusterUsageTags.clusterNodeType")
- ✅ Classic: returns the node type (e.g., i3.2xlarge)
- ❌ Serverless: throws AnalysisException with CONFIG_NOT_AVAILABLE
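A minimal sketch of this detection logic, assuming PySpark surfaces the missing config as an AnalysisException as described above (the actual classify_spark_runtime added in this PR is more elaborate and distinguishes more runtime types):

    from pyspark.errors import AnalysisException
    from pyspark.sql import SparkSession

    def is_serverless(spark: SparkSession) -> bool:
        # Classic clusters expose clusterUsageTags configs; serverless compute
        # raises CONFIG_NOT_AVAILABLE when the config is requested.
        try:
            spark.conf.get("spark.databricks.clusterUsageTags.clusterNodeType")
            return False  # config exists -> classic cluster (e.g. i3.2xlarge)
        except AnalysisException:
            return True  # CONFIG_NOT_AVAILABLE -> serverless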
Fixed issue: #1438
Tests