Skip to content

Commit 517620b

Browse files
committed
fix: handle non-zero null buffer offset in raw shuffle format
The null bitmap in Arrow arrays can have a non-zero bit offset even when ArrayData.offset() is 0 (e.g. after RecordBatch::slice). The raw shuffle writer was copying the bitmap bytes verbatim, but the reader assumes bits start at offset 0. This caused shifted null bitmaps, corrupting data during shuffle and producing wrong query results (e.g. TPC-DS q6 counts off by 1). Fix by detecting non-zero bitmap offsets and emitting a re-aligned copy. Add a roundtrip test with sliced batches to cover this case.
1 parent 38fe95c commit 517620b

1 file changed

Lines changed: 64 additions & 5 deletions

File tree

  • native/core/src/execution/shuffle

native/core/src/execution/shuffle/codec.rs

Lines changed: 64 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@ use simd_adler32::Adler32;
3131
use std::io::{Cursor, Read, Seek, SeekFrom, Write};
3232
use std::sync::Arc;
3333

34+
3435
#[derive(Debug, Clone)]
3536
pub enum CompressionCodec {
3637
None,
@@ -46,20 +47,39 @@ pub struct ShuffleBlockWriter {
4647
}
4748

4849
/// Recursively writes raw Arrow ArrayData buffers to the given writer.
50+
///
51+
/// The null buffer may have a non-zero bit offset (e.g. from `RecordBatch::slice`),
52+
/// even when `data.offset() == 0`. We emit a zero-offset copy of the bitmap so
53+
/// the reader can consume it without tracking offsets.
4954
fn write_array_data<W: Write>(data: &arrow::array::ArrayData, writer: &mut W) -> Result<()> {
5055
debug_assert_eq!(data.offset(), 0, "shuffle arrays must have offset 0");
5156

5257
// Write null_count
5358
let null_count = data.null_count() as u32;
5459
writer.write_all(&null_count.to_le_bytes())?;
5560

56-
// Write validity bitmap
61+
// Write validity bitmap (always emitted at bit-offset 0)
5762
if null_count > 0 {
5863
if let Some(bitmap) = data.nulls() {
59-
let bitmap_bytes = bitmap.buffer().as_slice();
60-
let len = bitmap_bytes.len() as u32;
61-
writer.write_all(&len.to_le_bytes())?;
62-
writer.write_all(bitmap_bytes)?;
64+
if bitmap.offset() == 0 {
65+
// Fast path: bitmap is already aligned, write raw bytes
66+
let bitmap_bytes = bitmap.buffer().as_slice();
67+
let len = bitmap_bytes.len() as u32;
68+
writer.write_all(&len.to_le_bytes())?;
69+
writer.write_all(bitmap_bytes)?;
70+
} else {
71+
// Bitmap has a non-zero bit offset — produce a zero-offset copy
72+
let num_bits = bitmap.len();
73+
let num_bytes = num_bits.div_ceil(8);
74+
writer.write_all(&(num_bytes as u32).to_le_bytes())?;
75+
let mut buf = vec![0u8; num_bytes];
76+
for i in 0..num_bits {
77+
if bitmap.is_valid(i) {
78+
buf[i / 8] |= 1 << (i % 8);
79+
}
80+
}
81+
writer.write_all(&buf)?;
82+
}
6383
} else {
6484
writer.write_all(&0u32.to_le_bytes())?;
6585
}
@@ -762,6 +782,45 @@ mod tests {
762782
}
763783
}
764784

785+
#[test]
786+
#[cfg_attr(miri, ignore)]
787+
fn test_roundtrip_sliced_batch() {
788+
// Roundtrip a batch produced by `RecordBatch::slice`, whose null bitmaps may start at a
789+
// non-zero bit offset. The writer only checks `data.offset() == 0` via debug_assert, which
790+
// does not cover the bitmap's own bit offset, so a misaligned bitmap could silently corrupt results.
791+
let schema = Arc::new(Schema::new(vec![
792+
Field::new("i", DataType::Int32, true),
793+
Field::new("s", DataType::Utf8, true),
794+
]));
795+
let full_batch = RecordBatch::try_new(
796+
Arc::clone(&schema),
797+
vec![
798+
Arc::new(Int32Array::from(vec![
799+
Some(1),
800+
None,
801+
Some(3),
802+
Some(4),
803+
None,
804+
Some(6),
805+
])),
806+
Arc::new(StringArray::from(vec![
807+
Some("a"),
808+
Some("bb"),
809+
None,
810+
Some("dddd"),
811+
Some("eeeee"),
812+
None,
813+
])),
814+
],
815+
)
816+
.unwrap();
817+
818+
// Take rows 2..5 so the sliced columns no longer begin at the first row of the original arrays
819+
let sliced = full_batch.slice(2, 3); // rows: [Some(3), Some(4), None] and [None, Some("dddd"), Some("eeeee")]
820+
assert_eq!(sliced.num_rows(), 3);
821+
roundtrip_test(schema, &sliced);
822+
}
823+
765824
#[test]
766825
fn test_empty_batch_returns_zero() {
767826
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

0 commit comments

Comments
 (0)