Skip to content

Commit ad88031

Browse files
fix: lazily initialize BatchCoalescer in CoalescedShuffleReaderStream to avoid schema type mismatch (#24)
The BatchCoalescer inside CoalescedShuffleReaderStream was eagerly initialized with the declared schema from the execution plan. However, the actual IPC shuffle data may have different Arrow types (e.g., string columns declared as LargeUtf8 in the plan but written as Utf8 by the CSV reader). When InProgressPrimitiveArray<T>::copy_rows() tries to downcast the source array, the type mismatch causes a panic: Internal("primitive array"). This applies the same lazy initialization pattern used for RepartitionExec (spiceai/datafusion#135): defer BatchCoalescer creation until the first batch arrives and use the batch's actual schema.
1 parent e1153d7 commit ad88031

1 file changed

Lines changed: 36 additions & 8 deletions

File tree

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1298,7 +1298,13 @@ async fn fetch_partition_object_store_inner(
12981298
struct CoalescedShuffleReaderStream {
12991299
schema: SchemaRef,
13001300
input: SendableRecordBatchStream,
1301-
coalescer: LimitedBatchCoalescer,
1301+
/// The coalescer is lazily initialized from the first batch's actual schema
1302+
/// rather than the declared schema to avoid type mismatches (e.g., the plan
1303+
/// declares LargeUtf8 but the IPC shuffle data contains Utf8).
1304+
coalescer: Option<LimitedBatchCoalescer>,
1305+
/// Batch size and limit for lazy coalescer initialization.
1306+
batch_size: usize,
1307+
limit: Option<usize>,
13021308
completed: bool,
13031309
baseline_metrics: BaselineMetrics,
13041310
}
@@ -1313,9 +1319,11 @@ impl CoalescedShuffleReaderStream {
13131319
) -> Self {
13141320
let schema = input.schema();
13151321
Self {
1316-
schema: schema.clone(),
1322+
schema,
13171323
input,
1318-
coalescer: LimitedBatchCoalescer::new(schema, batch_size, limit),
1324+
coalescer: None,
1325+
batch_size,
1326+
limit,
13191327
completed: false,
13201328
baseline_metrics: BaselineMetrics::new(metrics, partition),
13211329
}
@@ -1334,7 +1342,9 @@ impl Stream for CoalescedShuffleReaderStream {
13341342

13351343
loop {
13361344
// If there is already a completed batch ready, return it directly
1337-
if let Some(batch) = self.coalescer.next_completed_batch() {
1345+
if let Some(ref mut coalescer) = self.coalescer
1346+
&& let Some(batch) = coalescer.next_completed_batch()
1347+
{
13381348
self.baseline_metrics.record_output(batch.num_rows());
13391349
return Poll::Ready(Some(Ok(batch)));
13401350
}
@@ -1346,26 +1356,44 @@ impl Stream for CoalescedShuffleReaderStream {
13461356

13471357
// Pull from upstream
13481358
match ready!(self.input.poll_next_unpin(cx)) {
1349-
// If upstream is completed, then flush remaning buffered batches
1359+
// If upstream is completed, then flush remaining buffered batches
13501360
None => {
13511361
self.completed = true;
1352-
if let Err(e) = self.coalescer.finish() {
1362+
if let Some(ref mut coalescer) = self.coalescer
1363+
&& let Err(e) = coalescer.finish()
1364+
{
13531365
return Poll::Ready(Some(Err(e)));
13541366
}
13551367
}
13561368
// If upstream is not completed, then push to coalescer
13571369
Some(Ok(batch)) => {
13581370
if batch.num_rows() > 0 {
1371+
// Lazily initialize the coalescer from the first
1372+
// batch's actual schema to avoid type mismatches
1373+
// (e.g., plan declares LargeUtf8 but IPC data has Utf8).
1374+
if self.coalescer.is_none() {
1375+
self.coalescer = Some(LimitedBatchCoalescer::new(
1376+
batch.schema(),
1377+
self.batch_size,
1378+
self.limit,
1379+
));
1380+
}
1381+
1382+
let coalescer =
1383+
self.coalescer.as_mut().expect("just initialized");
1384+
13591385
// Try to push to coalescer
1360-
match self.coalescer.push_batch(batch) {
1386+
match coalescer.push_batch(batch) {
13611387
// If push is successful, then continue
13621388
Ok(PushBatchStatus::Continue) => {
13631389
continue;
13641390
}
13651391
// If limit is reached, then finish coalescer and set completed to true
13661392
Ok(PushBatchStatus::LimitReached) => {
13671393
self.completed = true;
1368-
if let Err(e) = self.coalescer.finish() {
1394+
let coalescer =
1395+
self.coalescer.as_mut().expect("just initialized");
1396+
if let Err(e) = coalescer.finish() {
13691397
return Poll::Ready(Some(Err(e)));
13701398
}
13711399
}

0 commit comments

Comments
 (0)