Support kudo GPU shuffle reads in the plugin #13489
Signed-off-by: Zach Puller <zpuller@nvidia.com>
build
Hi @zpuller, can you point out which workload has the 10% improvement, and can you attach a detailed perf report?
I left my comment, @zpuller. Also, FYI: we recently enabled kudo GPU write by accident in our customer environment and found that many queries failed at shuffle. We're still investigating why. That won't be a showstopper for this PR, since all GPU kudo features are disabled by default.
Ok, thanks. I tested the GPU kudo writes against NDS and didn't see any failures or incorrect results, but if there are other failures we can investigate.
Greptile Overview
Greptile Summary
This PR implements GPU-side Kudo deserialization for shuffle reads, splitting the previous SHUFFLE_KUDO_MODE config into separate read and write modes. The implementation includes safety checks to prevent incompatible configurations (async reads + GPU deserialization), comprehensive refactoring of the shuffle coalesce iterator hierarchy, and proper handling of edge cases like zero-column batches.
Key changes:
- Added SHUFFLE_KUDO_READ_MODE config (defaulting to CPU for backward compatibility)
- Implemented KudoGpuTableOperator and KudoGpuShuffleCoalesceIterator for GPU deserialization
- Refactored iterator hierarchy with new base classes CoalesceIteratorBase and GpuCoalesceIteratorBase
- Added resolveKudoMode() logic to prevent conflicts between async reads and GPU deserialization (downgrades to CPU with warning)
- Updated all references from old SHUFFLE_KUDO_MODE to SHUFFLE_KUDO_WRITE_MODE
- Added comprehensive unit test validating the full GPU shuffle round-trip
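The downgrade behavior attributed to resolveKudoMode() can be illustrated with a minimal sketch. The types, the signature, and the warning text below are hypothetical stand-ins chosen for illustration; the plugin's actual code may look different.

```scala
object KudoModeSketch {
  // Hypothetical stand-in for the plugin's read-mode setting.
  sealed trait KudoMode
  case object CPU extends KudoMode
  case object GPU extends KudoMode

  // Returns the effective mode plus an optional warning.
  // Assumption: GPU deserialization cannot be combined with async reads,
  // so the GPU request is downgraded to CPU in that case.
  def resolveKudoMode(requested: KudoMode, useAsync: Boolean): (KudoMode, Option[String]) =
    (requested, useAsync) match {
      case (GPU, true) =>
        (CPU, Some("Async shuffle read is enabled; using CPU Kudo deserialization instead of GPU"))
      case _ =>
        (requested, None)
    }
}
```

Returning the warning instead of logging it keeps the sketch free of a logging dependency; a real implementation would presumably log it directly.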
Previous review issues addressed:
All issues from previous review comments have been fixed in subsequent commits:
- ✅ Doc string corrected to say "deserialize shuffle inputs"
- ✅ Test now sets both write and read mode to GPU
- ✅ Zero-column batch case properly handled in KudoGpuTableOperator
- ✅ Async read incompatibility resolved via resolveKudoMode() logic
Confidence Score: 5/5
- This PR is safe to merge with high confidence - all previous issues have been addressed and the implementation includes proper safety checks
- The code demonstrates thorough engineering: all previous review comments have been addressed, async read conflicts are handled gracefully with downgrade logic, edge cases like zero-column batches are properly handled, comprehensive testing validates the GPU deserialization path, and the refactoring maintains clear separation between host and GPU paths with appropriate type safety through proper iterator casting
- No files require special attention - all previous issues have been resolved and the implementation is complete
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 5/5 | Adds new SHUFFLE_KUDO_READ_MODE config to control GPU/CPU deserialization, splitting from previous combined SHUFFLE_KUDO_MODE |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala | 5/5 | Updates shuffle coalesce logic to handle GPU-deserialized batches, returning tuple to track whether batches are host-serialized or GPU batches |
| tests/src/test/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceSuite.scala | 5/5 | Adds comprehensive test for GPU kudo deserialization path, properly configuring both read and write modes |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala | 5/5 | Major refactoring adding GPU deserialization support: new KudoGpuTableOperator, KudoGpuShuffleCoalesceIterator, and GpuCoalesceIteratorBase, plus safety logic in CoalesceReadOption.resolveKudoMode() |
Sequence Diagram
sequenceDiagram
participant User as User Query
participant Exec as GpuShuffleCoalesceExec
participant Config as CoalesceReadOption
participant KudoIter as KudoGpuShuffleCoalesceIterator
participant Operator as KudoGpuTableOperator
participant GPU as GPU/cuDF
User->>Exec: Execute shuffle read
Exec->>Config: Create CoalesceReadOption(conf)
Config->>Config: resolveKudoMode(kudoMode, useAsync)
alt useAsync=true && kudoMode=GPU
Config-->>Config: Downgrade to CPU mode (log warning)
end
Config-->>Exec: readOption with resolved mode
alt kudoEnabled && kudoMode=GPU
Exec->>KudoIter: Create GPU coalesce iterator
KudoIter->>KudoIter: Buffer KudoSerializedTableColumn batches
KudoIter->>Operator: concat(columns)
alt numCols = 0
Operator-->>KudoIter: Empty ColumnarBatch
else
Operator->>GPU: Copy to device memory
Operator->>GPU: KudoGpuSerializer.assembleFromDeviceRaw()
GPU-->>Operator: cuDF Table
Operator-->>KudoIter: ColumnarBatch (GPU)
end
KudoIter-->>Exec: GPU ColumnarBatch
else kudoEnabled && kudoMode=CPU
Exec->>Exec: Use host deserialization path
end
Exec-->>User: Return deserialized batches
3 files reviewed, no comments
build
The known issue is #13663, which is merged, so I think this PR is good to go.
Greptile Overview
Greptile Summary
This PR adds GPU-based Kudo deserialization to complement the existing CPU-based approach. The feature is disabled by default via the new spark.rapids.shuffle.kudo.serializer.read.mode configuration (defaults to CPU).
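As a rough illustration of a disabled-by-default setting, the sketch below looks up the read mode from a plain key/value map and falls back to CPU when the key is absent. Only the key name comes from the summary above; the helper, the case-insensitive parsing, and the error handling are assumptions, not the plugin's RapidsConf code.

```scala
import java.util.Locale

object KudoReadModeConf {
  // Key name as quoted in the review summary.
  val ReadModeKey = "spark.rapids.shuffle.kudo.serializer.read.mode"

  // Hypothetical helper: returns "CPU" or "GPU", defaulting to CPU.
  // Assumption: values are matched case-insensitively and anything else is rejected.
  def readMode(conf: Map[String, String]): String =
    conf.getOrElse(ReadModeKey, "CPU").trim.toUpperCase(Locale.ROOT) match {
      case m @ ("CPU" | "GPU") => m
      case other =>
        throw new IllegalArgumentException(s"Unsupported Kudo read mode: $other")
    }
}
```

With this shape, a job that never sets the key keeps the existing CPU path, and opting in is a single configuration entry.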
Key Changes
- Configuration split: Renamed SHUFFLE_KUDO_MODE to SHUFFLE_KUDO_WRITE_MODE and added separate SHUFFLE_KUDO_READ_MODE config for independent control of serialization vs deserialization
- New iterator classes: Added KudoGpuShuffleCoalesceIterator and GpuGpuShuffleCoalesceIterator to handle GPU deserialization path, producing ColumnarBatch directly on GPU rather than CoalescedHostResult
- Type safety: Refactored iterator hierarchy with proper type parameters to distinguish between host (CoalescedHostResult) and GPU (ColumnarBatch) results
- Conflict resolution: Added resolveKudoMode() to automatically override GPU mode to CPU when async read is enabled, preventing incompatible configuration combinations
- Hash join integration: Updated GpuShuffledHashJoinExec to handle both GPU batches and host results with proper type discrimination via isHostSerialized flag
Test Coverage
New GpuShuffleCoalesceSuite validates the GPU deserialization path by performing a full shuffle round-trip with GPU mode enabled and comparing results against expected output.
Confidence Score: 4/5
- This PR is safe to merge with minor style improvements recommended
- The implementation is well-designed with proper type safety, conflict resolution, and test coverage. Previous critical issues (test configuration, zero-column handling, async/GPU conflicts) have all been addressed. Only one minor style issue found (missing space in doc string). The feature is disabled by default, reducing risk.
- No files require special attention - all previously identified issues have been resolved
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 4/5 | Adds new SHUFFLE_KUDO_READ_MODE config to control GPU/CPU deserialization separately from write mode. Renames SHUFFLE_KUDO_MODE to SHUFFLE_KUDO_WRITE_MODE and adds helper method shuffleKudoGpuSerializerReadEnabled. Minor doc formatting issue found. |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala | 4/5 | Major refactoring to support GPU-based Kudo deserialization. Adds new iterator classes (KudoGpuShuffleCoalesceIterator, GpuGpuShuffleCoalesceIterator), new table operator (KudoGpuTableOperator), and refactored base classes. Includes conflict resolution logic between async mode and GPU mode. |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala | 4/5 | Updates shuffle coalesce iterator logic to handle both GPU batches and host results. Renames getHostShuffleCoalesceIterator to getShuffleCoalesceIterator and returns tuple with boolean flag indicating result type. Properly integrates GPU Kudo mode into hash join execution. |
| tests/src/test/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceSuite.scala | 5/5 | New test suite validating GPU Kudo deserialization. Test properly configures both write and read modes to GPU, performs full shuffle round-trip, and validates data correctness. Well-structured test with proper resource management. |
Sequence Diagram
sequenceDiagram
participant Exec as GpuShuffleCoalesceExec
participant Utils as GpuShuffleCoalesceUtils
participant ReadOpt as CoalesceReadOption
participant KudoGpuIter as KudoGpuShuffleCoalesceIterator
participant TableOp as KudoGpuTableOperator
participant GpuIter as GpuGpuShuffleCoalesceIterator
participant GPU as GPU Memory
Exec->>ReadOpt: apply(conf)
ReadOpt->>ReadOpt: resolveKudoMode()
Note over ReadOpt: Overrides GPU to CPU if async enabled
ReadOpt-->>Exec: CoalesceReadOption
Exec->>Utils: getGpuShuffleCoalesceIterator()
alt kudoMode == GPU
Utils->>KudoGpuIter: new KudoGpuShuffleCoalesceIterator()
Note over KudoGpuIter: Produces ColumnarBatch (GPU)
else kudoMode == CPU
Utils->>Utils: new KudoHostShuffleCoalesceIterator()
Note over Utils: Produces CoalescedHostResult
end
alt prefetchFirstBatch
Utils->>Utils: Buffer first batch
end
alt useAsync == true
Utils->>Utils: new GpuShuffleAsyncCoalesceIterator()
Note over Utils: Always uses CoalescedHostResult
else kudoMode == GPU
Utils->>GpuIter: new GpuGpuShuffleCoalesceIterator()
else
Utils->>Utils: new GpuShuffleCoalesceIterator()
end
loop For each shuffle partition
GpuIter->>KudoGpuIter: next()
KudoGpuIter->>TableOp: concat(KudoSerializedTableColumn[])
TableOp->>GPU: Copy to device memory
TableOp->>GPU: assembleFromDeviceRaw()
GPU-->>TableOp: GPU Table
TableOp-->>KudoGpuIter: ColumnarBatch
KudoGpuIter-->>GpuIter: ColumnarBatch (GPU)
GpuIter-->>Exec: ColumnarBatch
end
3 files reviewed, 1 comment
Greptile Overview
Greptile Summary
Adds GPU-based Kudo deserialization as an optional disabled-by-default feature for shuffle reads, complementing existing CPU-based deserialization.
Key changes:
- Splits SHUFFLE_KUDO_MODE config into separate SHUFFLE_KUDO_WRITE_MODE and SHUFFLE_KUDO_READ_MODE configs to allow independent control
- Implements KudoGpuTableOperator and KudoGpuShuffleCoalesceIterator for GPU-side deserialization
- Adds conflict resolution to disable async reads when GPU kudo mode is enabled (with warning)
- Refactors iterator hierarchy with CoalesceIteratorBase and GpuMetricIteratorBase to reduce duplication
- Updates test suite to verify GPU deserialization path
All issues from previous comments have been addressed:
- Doc string corrected to say "deserialize shuffle inputs"
- Test now sets both write and read modes to GPU
- Zero-column batch handling implemented in KudoGpuTableOperator.concat()
- Async/GPU mode conflict resolved with resolveUseAsync() function
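In this revision the conflict is resolved the other way around: GPU mode wins and async reads are turned off. A minimal sketch of that rule follows; the signature, the stand-in mode type, and the warning callback are illustrative assumptions, not the plugin's actual resolveUseAsync().

```scala
object UseAsyncSketch {
  // Hypothetical stand-in for the plugin's read-mode setting.
  sealed trait KudoMode
  case object CPU extends KudoMode
  case object GPU extends KudoMode

  // Returns the effective async setting.
  // Assumption: when GPU Kudo reads are requested, async is disabled and a warning is emitted.
  def resolveUseAsync(kudoMode: KudoMode, useAsync: Boolean, warn: String => Unit): Boolean =
    if (kudoMode == GPU && useAsync) {
      warn("GPU Kudo read mode is enabled; disabling async shuffle read")
      false
    } else {
      useAsync
    }
}
```

Passing the warning sink as a parameter is only a convenience for the sketch; it makes the behavior easy to observe without a logger.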
Confidence Score: 5/5
- Safe to merge - all previously identified issues have been resolved
- All critical issues from previous review comments have been fixed: doc strings corrected, test properly configured for GPU read mode, zero-column batches handled, and async/GPU mode conflict resolved with proper warning. The implementation follows existing patterns with good abstractions.
- No files require special attention
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala | 5/5 | Adds GPU kudo deserialization support with proper async conflict resolution. Previous comments addressed the async/GPU mode conflict and test coverage. |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 5/5 | Splits kudo serializer mode into separate read/write configs. Doc string already corrected in previous comments. |
Sequence Diagram
sequenceDiagram
participant Exec as GpuShuffleCoalesceExec
participant ReadOpt as CoalesceReadOption
participant Utils as GpuShuffleCoalesceUtils
participant KudoGpuIter as KudoGpuShuffleCoalesceIterator
participant KudoGpuOp as KudoGpuTableOperator
participant MetricIter as GpuColumnarBatchMetricIterator
Exec->>ReadOpt: apply(conf)
ReadOpt->>ReadOpt: resolveUseAsync(kudoMode, useAsync)
Note over ReadOpt: If kudoMode=GPU & useAsync=true<br/>logs warning, sets useAsync=false
ReadOpt-->>Exec: CoalesceReadOption(kudoEnabled, kudoMode, useAsync)
Exec->>Utils: getGpuShuffleCoalesceIterator(iter, targetSize, dataTypes, readOption, metricsMap)
alt kudoEnabled && kudoMode == GPU
Utils->>KudoGpuIter: new KudoGpuShuffleCoalesceIterator(...)
KudoGpuIter->>KudoGpuOp: new KudoGpuTableOperator(dataTypes)
alt useAsync == false (enforced by resolveUseAsync)
Utils->>MetricIter: new GpuColumnarBatchMetricIterator(iter)
Note over MetricIter: Wraps GPU coalesced batches<br/>with metrics tracking
end
else kudoEnabled && kudoMode == CPU
Utils->>Utils: new KudoHostShuffleCoalesceIterator(...)
Note over Utils: Host-side kudo deserialization
else !kudoEnabled
Utils->>Utils: new HostShuffleCoalesceIterator(...)
Note over Utils: Traditional JCudf deserialization
end
Note over KudoGpuIter,KudoGpuOp: GPU Deserialization Path
KudoGpuIter->>KudoGpuIter: bufferNextBatch()
KudoGpuIter->>KudoGpuIter: concatenateTablesInGpu()
KudoGpuIter->>KudoGpuOp: concat(kudoTables[])
KudoGpuOp->>KudoGpuOp: Copy headers & data to host buffers
KudoGpuOp->>KudoGpuOp: Copy to device buffers
KudoGpuOp->>KudoGpuOp: KudoGpuSerializer.assembleFromDeviceRaw()
KudoGpuOp-->>KudoGpuIter: ColumnarBatch (GPU)
KudoGpuIter-->>MetricIter: ColumnarBatch
MetricIter->>MetricIter: Track output metrics
MetricIter-->>Exec: ColumnarBatch (GPU)
2 files reviewed, no comments
Greptile Overview
Greptile Summary
This PR adds GPU-based Kudo deserialization for shuffle reads, allowing the plugin to deserialize shuffle data directly on the GPU rather than on the CPU. The implementation introduces a new KudoGpuShuffleCoalesceIterator and KudoGpuTableOperator that handle GPU-side deserialization, mirroring the existing CPU-based Kudo implementation.
Key changes:
- Added KudoBuffers case class to manage paired data/offsets buffers for Kudo format
- Introduced KudoGpuTableOperator that performs deserialization on GPU using KudoGpuSerializer.assembleFromDeviceRaw()
- Refactored iterator hierarchy with new base classes CoalesceIteratorBase and GpuMetricIteratorBase to share common logic
- Added resolveUseAsync() to prevent incompatible async+GPU-kudo configuration (logs warning and disables async)
- Created GpuColumnarBatchMetricIterator to track metrics for GPU-deserialized batches
- Properly handles edge cases: zero-column batches, resource cleanup, type safety with explicit casts
The implementation follows the existing patterns in the codebase and includes comprehensive test coverage that validates the full serialization/deserialization round-trip.
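The idea of a shared base class that buffers serialized tables up to a target size and leaves the merge step to subclasses can be sketched as follows. Everything here is a simplified stand-in: SerializedTable, mergeGroup, and the grouping rule are assumptions for illustration, and the real CoalesceIteratorBase also deals with resources, metrics, and the GPU semaphore.

```scala
object CoalesceSketch {
  // Hypothetical stand-in for a serialized shuffle table; only its size matters here.
  final case class SerializedTable(sizeInBytes: Long)

  // Groups input tables until adding the next one would exceed targetSize,
  // then hands the group to a subclass-specific merge (host or GPU in the real code).
  abstract class CoalesceIteratorBase[T](input: Iterator[SerializedTable], targetSize: Long)
      extends Iterator[T] {
    private val pending = input.buffered

    protected def mergeGroup(tables: Seq[SerializedTable]): T

    override def hasNext: Boolean = pending.hasNext

    override def next(): T = {
      if (!pending.hasNext) throw new NoSuchElementException("no more shuffle data")
      // Always take at least one table, even if it alone exceeds the target.
      val first = pending.next()
      val group = scala.collection.mutable.ArrayBuffer(first)
      var bytes = first.sizeInBytes
      while (pending.hasNext && bytes + pending.head.sizeInBytes <= targetSize) {
        val t = pending.next()
        bytes += t.sizeInBytes
        group += t
      }
      mergeGroup(group.toSeq)
    }
  }

  // Toy subclass: "merging" just reports the total size of each group.
  final class SizeSummingIterator(input: Iterator[SerializedTable], targetSize: Long)
      extends CoalesceIteratorBase[Long](input, targetSize) {
    override protected def mergeGroup(tables: Seq[SerializedTable]): Long =
      tables.map(_.sizeInBytes).sum
  }
}
```

The type parameter is what lets one buffering loop serve both result kinds: a host subclass would produce a host-side result, a GPU subclass a device batch.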
Confidence Score: 5/5
- This PR is safe to merge with minimal risk
- The implementation demonstrates excellent engineering practices: proper edge case handling (zero-column batches, async mode conflicts), clean refactoring with well-defined abstractions (base classes for code reuse), appropriate resource management (AutoCloseable, withResource), and comprehensive test coverage validating the full shuffle round-trip. The code correctly prevents the async+GPU-kudo combination through resolveUseAsync(), ensuring type safety throughout the pipeline.
- No files require special attention
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala | 5/5 | Adds GPU-based Kudo deserialization with proper handling of edge cases (zero-column batches, async mode conflicts), well-structured class hierarchy, and appropriate resource management |
Sequence Diagram
sequenceDiagram
participant User as Shuffle Read
participant Utils as GpuShuffleCoalesceUtils
participant GPU_Iter as KudoGpuShuffleCoalesceIterator
participant GPU_Op as KudoGpuTableOperator
participant Metric_Iter as GpuColumnarBatchMetricIterator
participant Host_Iter as KudoHostShuffleCoalesceIterator
participant Host_Op as KudoTableOperator
User->>Utils: getGpuShuffleCoalesceIterator(kudoMode=GPU)
Utils->>Utils: Check kudoEnabled && kudoMode==GPU
alt GPU Kudo Mode
Utils->>GPU_Iter: Create KudoGpuShuffleCoalesceIterator
GPU_Iter->>GPU_Iter: Collect batches up to targetSize
GPU_Iter->>GPU_Op: concat(serialized columns)
GPU_Op->>GPU_Op: Allocate host buffers
GPU_Op->>GPU_Op: Copy to device buffers
GPU_Op->>GPU_Op: KudoGpuSerializer.assembleFromDeviceRaw()
GPU_Op-->>GPU_Iter: ColumnarBatch
GPU_Iter-->>Utils: Iterator[ColumnarBatch]
Utils->>Metric_Iter: Wrap in GpuColumnarBatchMetricIterator
Metric_Iter-->>User: Iterator[ColumnarBatch] with metrics
else CPU Kudo Mode
Utils->>Host_Iter: Create KudoHostShuffleCoalesceIterator
Host_Iter->>Host_Iter: Collect batches up to targetSize
Host_Iter->>Host_Op: concat(serialized columns)
Host_Op->>Host_Op: KudoSerializer.mergeOnHost()
Host_Op-->>Host_Iter: CoalescedHostResult
Host_Iter-->>Utils: Iterator[CoalescedHostResult]
Utils-->>User: Transfer to GPU in next stage
end
1 file reviewed, no comments
Greptile Overview
Greptile Summary
This PR implements GPU-side Kudo deserialization for shuffle reads, providing a new execution path that deserializes shuffle data directly on the GPU rather than on the host.
Key Changes
- New GPU deserialization path: Introduces KudoGpuTableOperator and KudoGpuShuffleCoalesceIterator that perform Kudo deserialization entirely on the GPU by copying serialized data to device memory and using KudoGpuSerializer.assembleFromDeviceRaw()
- Configuration: Adds SHUFFLE_KUDO_READ_MODE config (CPU/GPU) separate from write mode, with proper validation that disables async reads when GPU mode is enabled (they're incompatible)
- Refactored iterator hierarchy: Extracts common coalescing logic into CoalesceIteratorBase, with specialized subclasses for host (HostCoalesceIteratorBase) and GPU (GpuCoalesceIteratorBase) processing
- Join integration: Updates GpuShuffledHashJoinExec to handle both host and GPU deserialization paths by tracking whether batches are host-serialized vs already on GPU
- Testing: Comprehensive new test suite validates end-to-end GPU deserialization with proper configuration
Issues Found
All critical issues from previous review comments have been addressed:
- ✅ Doc string corrected (SHUFFLE_KUDO_READ_MODE now says "deserialize shuffle inputs")
- ✅ Test now sets both write and read modes to GPU
- ✅ Async + GPU mode conflict resolved with resolveUseAsync() that disables async and logs warning
- ✅ Zero-column batches properly handled in KudoGpuTableOperator.concat()
The implementation correctly separates host and GPU paths throughout the codebase with appropriate type handling.
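One way to picture the join integration is an iterator of AutoCloseable results paired with a flag that says which concrete type it carries. The sketch below uses toy stand-ins for CoalescedHostResult and ColumnarBatch and a hypothetical toGpuBatches helper; it is not the code in GpuShuffledHashJoinExec.

```scala
object JoinInputSketch {
  // Toy stand-ins for the two result types named in the review.
  final case class ColumnarBatch(rows: Int) extends AutoCloseable {
    override def close(): Unit = ()
  }
  final case class CoalescedHostResult(rows: Int) extends AutoCloseable {
    // Stands in for the host-to-device transfer done in the real plugin.
    def toGpuBatch: ColumnarBatch = ColumnarBatch(rows)
    override def close(): Unit = ()
  }

  // The Boolean tells the consumer which cast is valid for every element.
  def toGpuBatches(input: (Iterator[AutoCloseable], Boolean)): Iterator[ColumnarBatch] = {
    val (iter, isHostSerialized) = input
    if (isHostSerialized) {
      iter.map { r =>
        val host = r.asInstanceOf[CoalescedHostResult]
        try host.toGpuBatch finally host.close()
      }
    } else {
      // Already deserialized on the GPU: pass the batches through unchanged.
      iter.map(_.asInstanceOf[ColumnarBatch])
    }
  }
}
```

A sealed result type would avoid the casts, but the flag-plus-cast shape matches what the summary describes.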
Confidence Score: 4/5
- This PR is safe to merge with minor risk - all previously identified critical issues have been resolved
- Score of 4 reflects that this is a complex feature addition touching shuffle execution paths, but the implementation is well-structured with proper error handling. All critical bugs from previous reviews (async/GPU incompatibility, test configuration, zero-column handling) have been fixed. The feature is disabled by default which reduces risk. Minor deduction because this introduces a new execution path that should be thoroughly performance-tested before enabling by default.
- Pay close attention to GpuShuffleCoalesceExec.scala due to the complex iterator hierarchy refactoring and new GPU deserialization logic
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala | 4/5 | Adds GPU-side Kudo deserialization with new KudoGpuTableOperator and KudoGpuShuffleCoalesceIterator classes; refactors iterator hierarchy; properly handles async/GPU mode conflicts. Previous critical issues (zero-column handling, test configuration) have been fixed. |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala | 5/5 | Updates join execution to support both host and GPU kudo deserialization paths by returning tuple (Iterator[AutoCloseable], Boolean) to distinguish between host and GPU batches; correctly handles type casting based on isHostSerialized flag. |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 5/5 | Adds new SHUFFLE_KUDO_READ_MODE configuration to control Kudo deserialization location (CPU vs GPU); splits previous shuffleKudoMode into separate read/write modes; doc strings corrected in recent commits. |
| tests/src/test/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceSuite.scala | 5/5 | New test suite with comprehensive test for GPU kudo deserialization end-to-end; correctly configures both SHUFFLE_KUDO_WRITE_MODE and SHUFFLE_KUDO_READ_MODE to GPU; validates round-trip serialization/deserialization. |
Sequence Diagram
sequenceDiagram
participant Exec as GpuShuffleCoalesceExec
participant CoalOpt as CoalesceReadOption
participant KudoGpuIter as KudoGpuShuffleCoalesceIterator
participant KudoGpuOp as KudoGpuTableOperator
participant Host as HostMemoryBuffer
participant Device as DeviceMemoryBuffer
participant Kudo as KudoGpuSerializer
participant Output as ColumnarBatch
Note over Exec,CoalOpt: Configuration Phase
Exec->>CoalOpt: apply(conf)
CoalOpt->>CoalOpt: resolveUseAsync()<br/>(disables async if GPU mode)
CoalOpt-->>Exec: CoalesceReadOption(kudoMode=GPU, useAsync=false)
Note over Exec,KudoGpuIter: Iterator Creation
Exec->>KudoGpuIter: new KudoGpuShuffleCoalesceIterator(iter, targetSize, dataTypes)
KudoGpuIter->>KudoGpuOp: new KudoGpuTableOperator(dataTypes)
Note over KudoGpuIter,Output: Batch Processing Loop
loop For each batch
KudoGpuIter->>KudoGpuIter: bufferNextBatch()<br/>(collect tables up to target size)
KudoGpuIter->>KudoGpuIter: extractAndUpdateBatch()
KudoGpuIter->>KudoGpuOp: concat(kudoTables[])
alt numCols == 0
KudoGpuOp-->>Output: new ColumnarBatch(empty, rowCount)
else numCols > 0
KudoGpuOp->>Host: allocate data and offsets buffers
KudoGpuOp->>Host: copy kudo headers and table data
KudoGpuOp->>Device: allocate device buffers
KudoGpuOp->>Device: copyFromHostBuffer()
KudoGpuOp->>Kudo: assembleFromDeviceRaw(schema, dataDev, offsetsDev)
Kudo-->>KudoGpuOp: cudf Table
KudoGpuOp->>Output: GpuColumnVector.from(table, dataTypes)
KudoGpuOp-->>KudoGpuIter: ColumnarBatch (on GPU)
end
KudoGpuIter-->>Exec: ColumnarBatch (GPU-side)
end
1 file reviewed, no comments
Greptile Overview
Greptile Summary
This PR implements GPU-side Kudo deserialization for shuffle reads, adding a disabled-by-default feature controlled by spark.rapids.shuffle.kudo.serializer.read.mode.
Key Changes:
- Introduces KudoGpuTableOperator that performs deserialization directly on the GPU
- Adds KudoGpuShuffleCoalesceIterator and supporting iterator base classes to handle GPU-path coalescing
- Creates KudoBuffers case class to manage paired host/device buffers for Kudo format
- Implements conflict resolution between async read and GPU kudo mode (GPU mode takes precedence with warning)
- Properly handles zero-column batches
- Adds comprehensive unit test that validates end-to-end GPU deserialization
Previous Review Feedback:
Previous comments flagged potential issues with async read compatibility and test configuration. These have been properly addressed:
- Async/GPU kudo conflict is handled via resolveUseAsync() which disables async when GPU mode is enabled (GpuShuffleCoalesceExec.scala:117-126)
- Test correctly configures both write and read modes to GPU (GpuShuffleCoalesceSuite.scala:108-109)
- Zero-column handling is implemented (GpuShuffleCoalesceExec.scala:358-360)
Confidence Score: 4/5
- This PR is safe to merge with minor risk - implementation is sound and previous concerns have been addressed
- The implementation properly handles GPU deserialization with good separation of concerns. The async/GPU kudo conflict resolution prevents runtime issues. Zero-column edge cases are handled. The comprehensive test validates the full serialization round-trip. Previous reviewer concerns have been properly addressed. Score is 4 instead of 5 because this is a complex feature touching critical shuffle code paths, and while well-implemented, would benefit from additional performance validation in production.
- No files require special attention - the implementation is well-structured and previous issues have been resolved
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala | 4/5 | Adds GPU-side Kudo deserialization with new KudoGpuTableOperator and iterator classes. Includes proper async conflict resolution and zero-column handling. Previous reviewer concerns about async compatibility have been addressed. |
Sequence Diagram
sequenceDiagram
participant Client as Shuffle Reader
participant Utils as GpuShuffleCoalesceUtils
participant Coalesce as KudoGpuShuffleCoalesceIterator
participant Op as KudoGpuTableOperator
participant GPU as GPU Memory/Serializer
Client->>Utils: getGpuShuffleCoalesceIterator(kudoMode=GPU)
Utils->>Utils: Check readOption.kudoMode
Utils->>Coalesce: Create KudoGpuShuffleCoalesceIterator
loop For each batch request
Client->>Coalesce: next()
Coalesce->>Coalesce: bufferNextBatch() - collect tables up to target size
Coalesce->>Coalesce: acquireGpuSemaphore()
Coalesce->>Op: concat(columns)
Op->>Op: Check if numCols == 0
alt Zero columns
Op->>Client: Return empty ColumnarBatch
else Has columns
Op->>Op: Calculate buffer sizes
Op->>Op: Allocate host buffers (data + offsets)
Op->>Op: Serialize headers and data to host
Op->>GPU: Copy to device buffers
GPU->>GPU: KudoGpuSerializer.assembleFromDeviceRaw()
GPU->>Op: Return Table
Op->>Coalesce: Return ColumnarBatch
Coalesce->>Client: Return deserialized batch
end
end
1 file reviewed, no comments
build
abellina left a comment
Looking good, one last one.
…alesceExec.scala Co-authored-by: Alessandro Bellina <abellina@gmail.com>
build
Greptile Overview
Greptile Summary
This PR implements GPU-based Kudo deserialization for shuffle reads, providing a performance optimization that shows a 10% improvement on test queries.
Key Changes:
- Added new KudoGpuTableOperator and KudoGpuShuffleCoalesceIterator to perform deserialization directly on GPU
- Introduced SHUFFLE_KUDO_READ_MODE configuration (separate from write mode) with CPU/GPU options, defaulting to CPU
- Refactored iterator hierarchy: created CoalesceIteratorBase base class and split into HostCoalesceIteratorBase and GpuCoalesceIteratorBase
- Added KudoBuffers case class to manage host/device memory buffers for serialization
- Implemented conflict resolution between async read and GPU kudo mode (GPU mode takes precedence with warning)
- Added GpuMetricIteratorBase and GpuColumnarBatchMetricIterator for proper metrics tracking when GPU deserialization is used
- Updated delta-lake modules to use the new separate SHUFFLE_KUDO_WRITE_MODE config
The implementation properly handles the interaction between async reads and GPU kudo mode by disabling async when GPU mode is active (via resolveUseAsync), preventing type safety issues.
Previous Review Comments:
All previously identified issues have been addressed in the current code.
Confidence Score: 4/5
- This PR is safe to merge with minor risk from complexity
- The implementation is well-structured with proper resource management, comprehensive refactoring, and conflict resolution between async and GPU modes. The 10% performance improvement validates the approach. Score is 4/5 rather than 5/5 due to the significant complexity added with new iterator hierarchies and the interaction between multiple configuration modes that could have subtle edge cases in production.
- No files require special attention - the refactoring is comprehensive and the previously identified issues have been resolved
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala | 4/5 | Adds GPU-based Kudo deserialization with proper async handling and comprehensive refactoring of coalesce iterators |
Sequence Diagram
sequenceDiagram
participant App as Spark Application
participant Exec as GpuShuffleCoalesceExec
participant ReadOpt as CoalesceReadOption
participant HostIter as KudoHostShuffleCoalesceIterator
participant GpuIter as KudoGpuShuffleCoalesceIterator
participant MetricIter as GpuColumnarBatchMetricIterator
participant DevMem as GPU Memory
App->>Exec: executeColumnar
Exec->>ReadOpt: Create from conf
ReadOpt->>ReadOpt: resolveUseAsync
alt GPU Kudo Mode and Async Enabled
ReadOpt->>ReadOpt: Log warning disable async
end
ReadOpt-->>Exec: ReadOption with resolved settings
alt Kudo Enabled and GPU Mode
Exec->>GpuIter: Create iterator
GpuIter->>GpuIter: bufferNextBatch collect tables
GpuIter->>GpuIter: concatenateTablesInGpu
GpuIter->>DevMem: Copy to device memory
GpuIter->>DevMem: assembleFromDeviceRaw
GpuIter-->>Exec: ColumnarBatch on GPU
alt Sync Mode no async
Exec->>MetricIter: Wrap for metrics
MetricIter->>MetricIter: Track metrics only
MetricIter-->>App: ColumnarBatch
end
else Kudo Enabled and CPU Mode
Exec->>HostIter: Create iterator
HostIter->>HostIter: bufferNextBatch collect tables
HostIter->>HostIter: concatenateTablesInHost
HostIter-->>Exec: CoalescedHostResult
Exec->>Exec: Transfer to GPU
Exec-->>App: ColumnarBatch
end
1 file reviewed, no comments
build
Fixes #12968.
Description
This PR adds a disabled-by-default optional feature to perform Kudo deserialization on the GPU. It also adds a unit test to validate this new behavior.
I'm currently planning to merge this and then open a follow-up PR to either enable the behavior by default or enable it conditionally based on internal metrics, depending on the remaining performance testing results.
Checklists
The test query of interest shows a 10% performance gain with this feature enabled.