fix: lazily initialize BatchCoalescer in CoalescedShuffleReaderStream to avoid schema type mismatch #24
Merged
phillipleblanc merged 1 commit on Mar 11, 2026
Pull request overview
This PR fixes a panic in Ballista’s shuffle reader path by avoiding eager LimitedBatchCoalescer initialization with the execution plan’s declared schema when the actual Arrow IPC shuffle batches contain differing concrete Arrow types.
Changes:
- Make `CoalescedShuffleReaderStream` lazily initialize its `LimitedBatchCoalescer` from the first non-empty incoming `RecordBatch` schema.
- Store `batch_size`/`limit` on the stream to support lazy coalescer creation, and adjust the polling logic to handle `Option<LimitedBatchCoalescer>`.
… to avoid schema type mismatch
The BatchCoalescer inside CoalescedShuffleReaderStream was eagerly initialized
with the declared schema from the execution plan. However, the actual IPC shuffle
data may have different Arrow types (e.g., string columns declared as LargeUtf8
in the plan but written as Utf8 by the CSV reader). When
InProgressPrimitiveArray<T>::copy_rows() tries to downcast the source array, the
type mismatch causes a panic: Internal("primitive array").
This applies the same lazy initialization pattern used for RepartitionExec
(spiceai/datafusion#135): defer BatchCoalescer creation until the first batch
arrives and use the batch's actual schema.
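
The downcast failure described in this commit message can be modeled without Arrow. The sketch below is a std-only illustration, not the actual Arrow code: `Utf8Array`, `LargeUtf8Array`, and `InProgressLargeUtf8` are hypothetical stand-ins, and the model returns an `Err` where the real code path panics.

```rust
use std::any::Any;

/// Hypothetical stand-in for an Arrow `Utf8` array (32-bit offsets).
struct Utf8Array(Vec<String>);

/// Hypothetical stand-in for an Arrow `LargeUtf8` array (64-bit offsets).
struct LargeUtf8Array(Vec<String>);

/// Models an in-progress output column whose concrete type was fixed up
/// front from the declared plan schema (here: LargeUtf8).
struct InProgressLargeUtf8 {
    rows: Vec<String>,
}

impl InProgressLargeUtf8 {
    /// Copies rows from a type-erased source array. Fails when the source's
    /// concrete type differs from the type this builder was created for.
    fn copy_rows(&mut self, source: &dyn Any) -> Result<(), String> {
        let typed = source
            .downcast_ref::<LargeUtf8Array>()
            .ok_or_else(|| "source array is not LargeUtf8".to_string())?;
        self.rows.extend(typed.0.iter().cloned());
        Ok(())
    }
}
```

A builder created for `LargeUtf8Array` rejects a `Utf8Array` source even though both hold strings, which is the shape of the mismatch between the declared schema and the shuffle data.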
sgrebnov approved these changes on Mar 11, 2026
phillipleblanc added a commit to spiceai/spiceai that referenced this pull request on Mar 11, 2026
…anic in distributed queries
Updates datafusion-ballista from e1153d7b to ad88031f which includes spiceai/datafusion-ballista#24: lazily initialize BatchCoalescer in CoalescedShuffleReaderStream using the first batch's actual schema instead of the declared plan schema. Fixes distributed queries with GROUP BY on string columns panicking with Internal("primitive array") when the plan declares LargeUtf8 but the IPC shuffle data contains Utf8.
github-merge-queue bot pushed a commit to spiceai/spiceai that referenced this pull request on Mar 12, 2026
…anic in distributed queries (#9716)
Updates datafusion-ballista from e1153d7b to ad88031f which includes spiceai/datafusion-ballista#24: lazily initialize BatchCoalescer in CoalescedShuffleReaderStream using the first batch's actual schema instead of the declared plan schema. Fixes distributed queries with GROUP BY on string columns panicking with Internal("primitive array") when the plan declares LargeUtf8 but the IPC shuffle data contains Utf8.
Summary
The `BatchCoalescer` inside `CoalescedShuffleReaderStream` was eagerly initialized with the declared schema from the execution plan. However, the actual IPC shuffle data may have different Arrow types. For example, string columns may be declared as `LargeUtf8` in the plan schema but written as `Utf8` by the CSV reader. When `InProgressPrimitiveArray<T>::copy_rows()` tries to downcast the source array, the type mismatch causes a panic: `Internal("primitive array")`.
Root Cause
In distributed (Ballista) query execution, shuffle data is written as Arrow IPC by Stage N and read by Stage N+1 via `ShuffleReaderExec`. The `CoalescedShuffleReaderStream` wraps the IPC reader output and coalesces small batches. It eagerly creates a `LimitedBatchCoalescer` using `input.schema()`, the declared schema from the plan. But the actual Arrow arrays in the IPC files may use different concrete types than the plan declares, causing the `BatchCoalescer` to create `InProgressPrimitiveArray<T>` with the wrong `T`.
Fix
Lazily initialize the `BatchCoalescer` from the first actual batch's schema instead of the declared schema. This is the same pattern already applied to `RepartitionExec` in spiceai/datafusion#135.
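
The lazy initialization pattern can be sketched as follows. This is a simplified, std-only model rather than the code from this PR: `Batch`, `Coalescer`, and `LazyCoalescingReader` are hypothetical types, schemas are reduced to a string tag, and `limit` handling is omitted.

```rust
/// Hypothetical stand-in for a record batch: a schema tag plus a row count.
struct Batch {
    schema: String,
    rows: usize,
}

/// Hypothetical coalescer that is bound to one schema at construction time.
struct Coalescer {
    schema: String,
    batch_size: usize,
    buffered_rows: usize,
}

impl Coalescer {
    fn new(schema: &str, batch_size: usize) -> Self {
        Coalescer { schema: schema.to_string(), batch_size, buffered_rows: 0 }
    }

    /// Buffers the batch and returns how many full output batches are ready.
    /// Rejects batches whose schema differs from the one fixed at creation.
    fn push(&mut self, batch: &Batch) -> Result<usize, String> {
        if batch.schema != self.schema {
            return Err(format!("expected {}, got {}", self.schema, batch.schema));
        }
        self.buffered_rows += batch.rows;
        let full = self.buffered_rows / self.batch_size;
        self.buffered_rows %= self.batch_size;
        Ok(full)
    }
}

/// Stream-side state: `batch_size` is stored so the coalescer can be built
/// later; the coalescer itself starts as `None`.
struct LazyCoalescingReader {
    batch_size: usize,
    coalescer: Option<Coalescer>,
}

impl LazyCoalescingReader {
    fn new(batch_size: usize) -> Self {
        LazyCoalescingReader { batch_size, coalescer: None }
    }

    /// Creates the coalescer from the first non-empty batch's own schema,
    /// then forwards every non-empty batch to it.
    fn on_batch(&mut self, batch: &Batch) -> Result<usize, String> {
        if batch.rows == 0 {
            return Ok(0); // empty batches never trigger initialization
        }
        let batch_size = self.batch_size;
        let coalescer = self
            .coalescer
            .get_or_insert_with(|| Coalescer::new(&batch.schema, batch_size));
        coalescer.push(batch)
    }
}
```

Because the coalescer takes its schema from the data it actually receives, a plan that declares `LargeUtf8` while the shuffle files contain `Utf8` no longer produces a mismatch at the first `push`.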