Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions ballista/core/src/execution_plans/shuffle_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1298,7 +1298,13 @@ async fn fetch_partition_object_store_inner(
struct CoalescedShuffleReaderStream {
schema: SchemaRef,
input: SendableRecordBatchStream,
coalescer: LimitedBatchCoalescer,
/// The coalescer is lazily initialized from the first batch's actual schema
/// rather than the declared schema to avoid type mismatches (e.g., the plan
/// declares LargeUtf8 but the IPC shuffle data contains Utf8).
coalescer: Option<LimitedBatchCoalescer>,
/// Batch size and limit for lazy coalescer initialization.
batch_size: usize,
limit: Option<usize>,
completed: bool,
baseline_metrics: BaselineMetrics,
}
Expand All @@ -1313,9 +1319,11 @@ impl CoalescedShuffleReaderStream {
) -> Self {
let schema = input.schema();
Self {
schema: schema.clone(),
schema,
input,
coalescer: LimitedBatchCoalescer::new(schema, batch_size, limit),
coalescer: None,
batch_size,
limit,
completed: false,
baseline_metrics: BaselineMetrics::new(metrics, partition),
}
Expand All @@ -1334,7 +1342,9 @@ impl Stream for CoalescedShuffleReaderStream {

loop {
// If there is already a completed batch ready, return it directly
if let Some(batch) = self.coalescer.next_completed_batch() {
if let Some(ref mut coalescer) = self.coalescer
&& let Some(batch) = coalescer.next_completed_batch()
{
self.baseline_metrics.record_output(batch.num_rows());
return Poll::Ready(Some(Ok(batch)));
}
Expand All @@ -1346,26 +1356,44 @@ impl Stream for CoalescedShuffleReaderStream {

// Pull from upstream
match ready!(self.input.poll_next_unpin(cx)) {
// If upstream is completed, then flush remaning buffered batches
// If upstream is completed, then flush remaining buffered batches
None => {
self.completed = true;
if let Err(e) = self.coalescer.finish() {
if let Some(ref mut coalescer) = self.coalescer
&& let Err(e) = coalescer.finish()
{
return Poll::Ready(Some(Err(e)));
}
}
// If upstream is not completed, then push to coalescer
Some(Ok(batch)) => {
if batch.num_rows() > 0 {
// Lazily initialize the coalescer from the first
// batch's actual schema to avoid type mismatches
// (e.g., plan declares LargeUtf8 but IPC data has Utf8).
if self.coalescer.is_none() {
self.coalescer = Some(LimitedBatchCoalescer::new(
batch.schema(),
Comment thread
phillipleblanc marked this conversation as resolved.
self.batch_size,
self.limit,
));
Comment thread
phillipleblanc marked this conversation as resolved.
}

let coalescer =
self.coalescer.as_mut().expect("just initialized");

// Try to push to coalescer
match self.coalescer.push_batch(batch) {
match coalescer.push_batch(batch) {
// If push is successful, then continue
Ok(PushBatchStatus::Continue) => {
continue;
}
// If limit is reached, then finish coalescer and set completed to true
Ok(PushBatchStatus::LimitReached) => {
self.completed = true;
if let Err(e) = self.coalescer.finish() {
let coalescer =
self.coalescer.as_mut().expect("just initialized");
if let Err(e) = coalescer.finish() {
return Poll::Ready(Some(Err(e)));
}
}
Expand Down
Loading