correctly handle corner case barrier rdd errors #993
Conversation
… are passed in Signed-off-by: Erik Ordentlich <[email protected]>
Signed-off-by: Erik Ordentlich <[email protected]>
Greptile Overview
Greptile Summary
Fixes barrier RDD errors when input DataFrames have ancestors with different partition counts before shuffle boundaries.
- Two-pronged approach: Retry with repartitioning for algorithms that collect results (KMeans, etc.), and mandatory upfront repartitioning for algorithms returning lazy DataFrames (KNN, DBSCAN)
- core.py: Added try-except in `_fit_internal` to catch `BarrierJobUnsupportedRDDChainException` and retry with explicit hash repartitioning (see the sketch after this list)
- knn.py & clustering.py: Force repartition before barrier RDD creation, since lazy evaluation prevents a try-except from catching the error
- test_kmeans.py: New test verifies the retry logic by creating a DataFrame with mismatched ancestor partitions
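A minimal sketch of the retry strategy described above, not the PR's actual code. The helper name `fit_with_barrier_retry`, the `call_cuml_fit_func` and `num_workers` parameters, and detecting the JVM-side exception by matching its class name in the Python error message are all assumptions based on this summary.

```python
# Hedged sketch of the retry path for algorithms that collect barrier RDD results.
import logging

logger = logging.getLogger(__name__)

def fit_with_barrier_retry(call_cuml_fit_func, dataset, param_maps, num_workers):
    try:
        # First attempt: may skip repartitioning when partition counts already match.
        return call_cuml_fit_func(dataset, param_maps)
    except Exception as exc:
        if "BarrierJobUnsupportedRDDChainException" not in str(exc):
            raise  # unrelated failure: re-raise with the original traceback
        logger.warning(
            "Input DataFrame has an RDD chain unsupported by barrier execution; "
            "retrying with an explicit repartition(%d).", num_workers
        )
        # An explicit repartition inserts a shuffle boundary, making the barrier stage valid.
        return call_cuml_fit_func(dataset.repartition(num_workers), param_maps)
```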
Confidence Score: 4/5
- This PR is safe to merge with one minor style improvement recommended
- The implementation correctly addresses the barrier RDD corner case with appropriate strategies for different execution patterns. The only issue is using `raise e` instead of `raise`, which loses traceback information.
- python/src/spark_rapids_ml/core.py has a minor exception handling style issue
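To make the style point concrete, a small illustrative snippet (not from the PR); it intentionally ends by re-raising:

```python
# Inside an except block, a bare `raise` re-raises the currently active exception
# with its original traceback unchanged, whereas `raise e` also points the top of
# the report at the re-raise site.
def risky():
    return 1 / 0

try:
    risky()
except ZeroDivisionError:
    # ... logging or cleanup could go here ...
    raise  # preferred form: the original traceback is preserved unchanged
```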
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| python/src/spark_rapids_ml/core.py | 4/5 | Added try-except wrapper in _fit_internal to catch barrier RDD errors and retry with explicit repartitioning |
| python/src/spark_rapids_ml/knn.py | 5/5 | Added mandatory repartition before barrier RDD creation to prevent unsupported RDD chain errors |
| python/src/spark_rapids_ml/clustering.py | 5/5 | Added mandatory repartition before barrier RDD creation to prevent unsupported RDD chain errors |
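For the knn.py and clustering.py rows above, a hedged sketch of the mandatory-repartition idea (not the PR's code; `num_workers` follows the review's terminology):

```python
# Unconditional repartition before building the barrier RDD: the shuffle boundary
# guarantees every ancestor RDD inside the barrier stage has exactly num_workers
# partitions, which barrier scheduling requires.
def prepare_barrier_input(dataset, num_workers):
    return dataset.repartition(num_workers)
```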
Sequence Diagram
```mermaid
sequenceDiagram
participant User
participant Estimator
participant Core as _CumlEstimator
participant RDD as Barrier RDD
Note over User,RDD: Case 1: Algorithms that collect results (e.g., KMeans)
User->>Estimator: fit(dataset)
Estimator->>Core: _fit_internal(dataset, paramMaps)
Core->>Core: _call_cuml_fit_func(dataset)
alt Dataset partitions == num_workers
Core->>Core: Skip repartition (optimization)
else Dataset partitions != num_workers
Core->>Core: Repartition dataset
end
Core->>RDD: barrier().collect()
alt BarrierJobUnsupportedRDDChainException
RDD-->>Core: Exception (ancestor partitions differ)
Core->>Core: Log warning
Core->>Core: _call_cuml_fit_func(dataset.repartition(num_workers))
Core->>RDD: barrier().collect() (with shuffle boundary)
RDD-->>Core: Success
else Success
RDD-->>Core: Results
end
Core-->>User: Trained model
Note over User,RDD: Case 2: Algorithms returning barrier RDD (KNN, DBSCAN)
User->>Estimator: kneighbors(query) / fit_predict(dataset)
Estimator->>Estimator: union_df / dataset.repartition(num_workers)
Note over Estimator: Always repartition to create shuffle boundary
Estimator->>RDD: _call_cuml_fit_func(repartitioned_df)
RDD->>Estimator: Lazy RDD (no execution yet)
Estimator-->>User: DataFrame (lazy)
User->>RDD: .collect() / .show()
RDD-->>User: Results (no barrier errors)
```
4 files reviewed, 1 comment
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Greptile Overview
Greptile Summary
Adds error handling for barrier RDD incompatibility issues when input DataFrames have ancestors with different partition counts before shuffle boundaries.
Key Changes:
- core.py (`_fit_internal`): Wraps barrier RDD collection in try-except to detect `BarrierJobUnsupportedRDDChainException` and retries with an explicit `repartition()` to create a shuffle boundary
- Strategy: For algorithms that collect barrier RDD results (KMeans, etc.), the retry mechanism handles the corner case gracefully; for KNN/DBSCAN, which return the barrier RDD directly to user code, forced repartitioning is applied upfront to prevent user-facing errors
Test Coverage:
- New test case `test_kmeans_basic_repartition` validates the retry logic by creating a DataFrame with `coalesce()` that triggers the barrier error (a hedged sketch of such a test follows below)
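A sketch of what such a test could look like. This is not the actual `test_kmeans_basic_repartition` from the PR; the test name, `spark` fixture, partition counts, and constructor parameters are assumptions.

```python
# Hypothetical test sketch: a coalesce() ancestor has a different partition count,
# so the skip-repartition optimization would otherwise hand Spark an unsupported
# barrier RDD chain; fit() should succeed via the retry instead of failing.
from spark_rapids_ml.clustering import KMeans


def test_kmeans_retry_on_coalesced_input(spark):  # hypothetical name; `spark` fixture assumed
    num_workers = 1
    data = [(float(i % 5), float(i % 3)) for i in range(100)]
    df = (
        spark.createDataFrame(data, ["f1", "f2"])
        .repartition(8)          # ancestor with 8 partitions
        .coalesce(num_workers)   # narrow coalesce down to num_workers; no shuffle boundary
    )
    kmeans = KMeans(num_workers=num_workers, k=3).setFeaturesCols(["f1", "f2"])
    model = kmeans.fit(df)       # should trigger the retry path, not a barrier error
    assert model is not None
```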
Minor Issue:
- Exception re-raising uses `raise` (good practice) to preserve the traceback
Confidence Score: 4/5
- This PR is safe to merge with minimal risk - it fixes a corner case bug without breaking existing functionality
- The changes are well-thought-out with appropriate error handling for different algorithm types. The retry logic is sound and preserves existing behavior while handling the edge case. Test coverage validates the fix. Only minor style improvement suggested (already noted in previous comments).
- No files require special attention - changes are straightforward and well-tested
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| python/src/spark_rapids_ml/core.py | 4/5 | Added try-catch retry logic for barrier RDD errors in _fit_internal. Catches BarrierJobUnsupportedRDDChainException and retries with explicit repartitioning. Minor style issue with exception re-raising pattern. |
Sequence Diagram
```mermaid
sequenceDiagram
participant User
participant Estimator as _CumlEstimator
participant CallFunc as _call_cuml_fit_func
participant RDD as Barrier RDD
User->>Estimator: fit(dataset)
Estimator->>Estimator: _fit_internal(dataset, paramMaps)
Note over Estimator: First attempt (optimized path)
Estimator->>CallFunc: _call_cuml_fit_func(dataset, partially_collect=True)
alt dataset.rdd.getNumPartitions() == num_workers
Note over CallFunc: Skip repartitioning (optimization)
CallFunc->>RDD: Create barrier RDD without shuffle
else dataset.rdd.getNumPartitions() != num_workers
Note over CallFunc: Repartition to num_workers
CallFunc->>RDD: Create barrier RDD with shuffle boundary
end
CallFunc-->>Estimator: Return pipelined_rdd
Estimator->>RDD: collect()
alt Barrier RDD compatible
RDD-->>Estimator: Success - return rows
Estimator-->>User: Return trained models
else BarrierJobUnsupportedRDDChainException
RDD-->>Estimator: Throw exception (incompatible ancestor partitions)
Note over Estimator: Catch BarrierJobUnsupportedRDDChainException
Estimator->>Estimator: Log warning
Note over Estimator: Retry with forced repartitioning
Estimator->>CallFunc: _call_cuml_fit_func(dataset.repartition(num_workers))
CallFunc->>RDD: Create barrier RDD with explicit shuffle boundary
CallFunc-->>Estimator: Return pipelined_rdd
Estimator->>RDD: collect()
RDD-->>Estimator: Success - return rows
Estimator-->>User: Return trained models
end
```
1 file reviewed, no comments
We currently have an optimization where, if input DataFrames have the same number of partitions as num_workers, we skip the repartition. This can be problematic when a DataFrame has ancestors with a different number of partitions before a shuffle boundary, which is incompatible with barrier RDD logic. There are two fixes:
- For algorithms that collect the barrier RDD results during fit (e.g., KMeans), catch `BarrierJobUnsupportedRDDChainException` and retry with an explicit repartition to num_workers.
- For algorithms that return the barrier RDD lazily to user code (KNN, DBSCAN), always repartition the input up front, since the error would otherwise surface only when the user triggers an action.
The underlying Spark constraint can be reproduced outside this library, as in the sketch below.
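A standalone PySpark sketch of that constraint, illustrative only (the local master, partition counts, and the expectation that the error surfaces at collect time are assumptions, and none of this is code from the PR):

```python
# The coalesced RDD already has num_workers partitions, but a narrow ancestor has a
# different count, which barrier scheduling rejects; an explicit repartition
# (shuffle boundary) fixes it.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
num_workers = 2

# 8-partition ancestor, then a narrow coalesce down to num_workers (no shuffle boundary).
rdd = spark.sparkContext.parallelize(range(1000), 8).coalesce(num_workers)

def count_partition(it):
    # A real barrier task could call BarrierTaskContext.get() here; omitted for brevity.
    yield sum(1 for _ in it)

try:
    rdd.barrier().mapPartitions(count_partition).collect()
except Exception as exc:
    # Expected: Spark rejects the RDD chain (BarrierJobUnsupportedRDDChainException
    # on the JVM side), and only when the job is actually submitted.
    print("barrier job rejected:", type(exc).__name__)

# Fix: force a shuffle boundary so every ancestor within the barrier stage has
# num_workers partitions.
counts = rdd.repartition(num_workers).barrier().mapPartitions(count_partition).collect()
print(counts)
```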