Is your feature request related to a problem? Please describe.
GPU plans on spark-rapids do not preserve within-partition row order across shuffles and hash-based operators. This is documented as expected behavior in docs/compatibility.md:20-41. However, for queries that end with a Parquet (or ORC) write on high-cardinality columns, the loss of within-partition clustering inflates output file size by 3-4× compared to the equivalent CPU run.
The cause is that Parquet's dictionary encoding emits column-chunk indices as a HYBRID RLE+BIT_PACKED stream: when consecutive rows hold the same dictionary index, the RLE half collapses runs aggressively (single-digit bits per row); when input is shuffled, every row pays the full bit-packed cost (10-20 bits per row depending on cardinality). Hash partitioning, hash joins, and GPU's multi-threaded shuffle I/O all destroy any natural same-key runs that the upstream pipeline produced.
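The encoding cost difference can be illustrated with a rough estimator. This is a minimal sketch under a simplified cost model, not cuDF's or parquet-mr's actual encoder: it assumes runs of 8 or more identical indices are stored as one RLE run (varint header plus a byte-aligned value) and everything else is bit-packed at the dictionary bit width, and it ignores bit-packed run headers and padding. The names estimate_hybrid_bits and min_rle_run are hypothetical.

```python
import random
from itertools import groupby


def varint_len(n: int) -> int:
    """Bytes needed to store n as an unsigned LEB128 varint."""
    length = 1
    while n >= 0x80:
        n >>= 7
        length += 1
    return length


def estimate_hybrid_bits(indices, cardinality, min_rle_run=8):
    """Approximate encoded size in bits of a dictionary-index stream.

    Simplified model (assumption): runs of >= min_rle_run identical values
    cost one RLE run; shorter runs cost bit_width bits per value.
    """
    bit_width = max(1, (cardinality - 1).bit_length())
    value_bytes = (bit_width + 7) // 8
    total_bits = 0
    for _, group in groupby(indices):
        run = sum(1 for _ in group)
        if run >= min_rle_run:
            # RLE run: header is varint(run << 1), then the repeated value.
            total_bits += 8 * (varint_len(run << 1) + value_bytes)
        else:
            total_bits += run * bit_width
    return total_bits


# 100K rows over 1K distinct indices: clustered vs. randomly scattered.
n, distinct = 100_000, 1_000
clustered = [i // (n // distinct) for i in range(n)]
scattered = clustered[:]
random.Random(0).shuffle(scattered)

for label, data in (("clustered", clustered), ("scattered", scattered)):
    bits = estimate_hybrid_bits(data, distinct)
    print(f"{label}: {bits / n:.2f} bits per row")
```

Under this model the clustered stream costs a small fraction of a bit per row, while the scattered stream pays close to the full bit width for every row. Real file sizes also depend on page layout and the compression codec.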
A minimal synthetic reproducer:
import pyspark.sql.functions as F

# Assumes an existing SparkSession named `spark`.
# Naturally clustered: rows with same k1 appear consecutively in id order
N, CLUSTER, P = 10_000_000, 10_000, 4
df = (spark.range(0, N)
      .select(
          F.floor(F.col("id") / CLUSTER).cast("long").alias("k1"),  # ~1K distinct, clustered
          (F.col("id") * 31).cast("long").alias("k2"),              # 10M distinct
          (F.col("id") % 50000).cast("long").alias("k3"),           # ~50K distinct
      ))

# Pattern A: scatter via repartition then write (typical ETL output)
df.repartition(P, F.rand()).write.mode("overwrite").parquet("/tmp/a")

# Pattern B: same plan + sortWithinPartitions before write
(df.repartition(P, F.rand())
   .sortWithinPartitions("k1", "k2", "k3")
   .write.mode("overwrite").parquet("/tmp/b"))
On the same data:
- CPU runs of pattern A produce small files (within-partition order preserved by SortShuffleManager).
- GPU runs of pattern A produce files several times larger.
- GPU runs of pattern B produce files within ~10% of the CPU baseline.
The output is correct — same rows, same row count — but the file size delta is consistently observed and reproducible.
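To compare the two outputs, the part files under each directory can be summed. This is a minimal sketch assuming the reproducer wrote to the local filesystem paths /tmp/a and /tmp/b. The helper name parquet_bytes is hypothetical, and outputs on HDFS or object storage would need that store's own listing API instead.

```python
import os


def parquet_bytes(path: str) -> int:
    """Total size in bytes of all *.parquet files under path (local FS only)."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            # Skips _SUCCESS markers and hidden .crc checksum files.
            if name.endswith(".parquet"):
                total += os.path.getsize(os.path.join(root, name))
    return total


# Paths match the reproducer above; a missing directory reports 0 bytes.
for label, path in (("A", "/tmp/a"), ("B", "/tmp/b")):
    print(f"pattern {label}: {parquet_bytes(path):,} bytes")
```

Running this after a CPU run and again after a GPU run of the same patterns gives the size ratios discussed above.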
Operators in the GPU plan that destroy within-partition clustering:
- GpuShuffleExchangeExec (sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala:439-507, prepareBatchShuffleDependency): partition assignment and the multi-threaded async shuffle write/read pipeline do not preserve within-partition row order across the network round-trip.
- GpuShuffledHashJoinExec / GpuBroadcastHashJoinExec (sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractGpuJoinIterator.scala:140-174): cuDF's hash-join gather maps return rows in hash-table iteration order, not probe-side input order.
- GpuHashAggregateExec: same mechanism; row order follows the hash table's internal bucketing.
Describe the solution you'd like
A new opt-in conf, spark.rapids.sql.shuffleOptimizeSort.enabled (default false). When a GpuDataWritingCommandExec is present, the rule walks the plan tree down from the write through nodes whose outputOrdering is empty and inserts a GpuSortExec above the deepest reachable GpuShuffleExchangeExec. The sort key would be selected the same way hashOptimizeSort's getOptimizedSortOrder (GpuTransitionOverrides.scala:621-628) does: from the output schema columns of the shuffle.
This is structurally a small extension to the existing insertHashOptimizeSorts rewrite rule (GpuTransitionOverrides.scala:591-619). The conf is opt-in because the sort cost is workload-dependent and not always a net win: small writes or low-cardinality data may not see a measurable file size reduction relative to the added sort time.
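The intended rewrite can be modeled on a toy plan tree. This is only an illustrative sketch of one reading of the proposal, written in Python to match the reproducer rather than in the plugin's Scala. PlanNode, insert_shuffle_sort, and chain are hypothetical names, the tree is restricted to single-child chains, and none of this is the plugin's real API.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    """Hypothetical stand-in for a SparkPlan node (not the plugin's classes)."""
    name: str
    children: List["PlanNode"] = field(default_factory=list)
    output_ordering: List[str] = field(default_factory=list)
    output_columns: List[str] = field(default_factory=list)


def insert_shuffle_sort(write: PlanNode) -> bool:
    """Insert a sort above the deepest shuffle reachable from the write.

    Walks down while nodes report no output ordering; returns True if a
    sort was inserted. Only single-child chains are followed (assumption).
    """
    parent = write
    node = write.children[0] if write.children else None
    target = None  # (parent, shuffle) for the deepest shuffle seen so far
    while node is not None and not node.output_ordering:
        if node.name == "GpuShuffleExchangeExec":
            target = (parent, node)
        if len(node.children) != 1:
            break
        parent, node = node, node.children[0]
    if target is None:
        return False
    shuffle_parent, shuffle = target
    keys = list(shuffle.output_columns)  # sort on the shuffle's output columns
    sort = PlanNode("GpuSortExec", children=[shuffle],
                    output_ordering=keys, output_columns=keys)
    shuffle_parent.children[0] = sort
    return True


def chain(node: PlanNode) -> List[str]:
    """Node names from the given node down its first-child chain."""
    names = []
    while node is not None:
        names.append(node.name)
        node = node.children[0] if node.children else None
    return names


cols = ["k1", "k2", "k3"]
scan = PlanNode("GpuRangeExec", output_columns=cols)
shuffle = PlanNode("GpuShuffleExchangeExec", children=[scan], output_columns=cols)
write = PlanNode("GpuDataWritingCommandExec", children=[shuffle])
insert_shuffle_sort(write)
print(" -> ".join(chain(write)))
```

The walk stops at any node that already reports an ordering, so a plan that already sorts before the write is left unchanged. When several shuffles sit on the path, which one receives the sort affects whether the ordering survives to the write; the sketch follows the wording above literally.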
Describe alternatives you've considered
- Recommend users add .sortWithinPartitions(...) manually. Works (verified to close the gap) but requires changing every query that hits this pattern. Many users won't notice the file-size problem until storage costs surprise them.
- Extend hashOptimizeSort to walk through GpuShuffleExchangeExec. Doesn't help the common repartition(N, F.rand()).write pattern because the sort gets inserted above the hash op below the shuffle, then the shuffle scrambles the sort's output before reaching the write.
- Force the shuffle to CPU. Disables a major GPU optimization for the whole stage. Much heavier than an opt-in sort.
- Fix it in cuDF's Parquet writer. Investigated: cuDF's HYBRID RLE+BIT_PACKED encoder is already correctly implemented (cudf/cpp/src/io/parquet/page_enc.cu:1143-1277). On clustered input it produces files slightly smaller than parquet-mr. The writer is not the bottleneck.
Additional context
Closely related existing conf: spark.rapids.sql.hashOptimizeSort.enabled (RapidsConf.scala:1125-1129, GpuTransitionOverrides.scala:591-619). The proposed conf is the shuffle-edge analogue of that rule.
Documentation gap: docs/compatibility.md notes that the plugin does not guarantee output order for hash ops, but does not call out the 3-4× file-size cost on writes. A docs update calling out this caveat (and pointing to either the manual workaround or this new conf if it lands) would help users diagnose unexpectedly large GPU-written files.
No existing tests cover row-ordering preservation across shuffles. HashSortOptimizeSuite.scala validates plan structure for hashOptimizeSort but does not assert anything about ordering or output file size.