Skip to content

Conversation

@colin-ho
Copy link
Collaborator

@colin-ho colin-ho commented Jan 9, 2026

Changes Made

There is currently a bug with IntermediateOps where the worker that produces a HasMoreOutput is not being polled from again, in the maintain_order = True case.

Recap

Quick recap, what is actually going in with intermediate ops, and why does this only affect HasMoreOutput.

image

We have intermediate op workers, that are responsible for actually executing the op on input morsels, a dispatcher that dispatches work to them, and a receiver that receives results. in the maintain order = True case, we use a round robin dispatcher that dispatches work in round robin fashion, and an OrderingAwareReceiver that receives results in a round robin fashion.

Problem

If a worker produces a result with HasMoreOutput, the round robin receiver SHOULD PULL FROM THAT WORKER AGAIN, BUT IT DOES NOT and instead it moves on to the next worker.

This can create a deadlock where the worker that produces has more output wants to send again, but the round robin reciever is not awaiting it, and is instead awaiting from some other worker that could be, say, awaiting it's own input.

Solution

Just give the OrderingAwareReceiver info that it should pull from a certain receiver again. Bleh, this is now so messy.

So, lets just get rid of the channels altogether. No more channels.

We can remodel the intermediate op to a single concurrent state machine.

while has_input or has_active_tasks {
    tokio::select {
        new_input = input_rx.recv() => active_tasks.spawn(execute(new_input)),
        result = active_tasks.join_next() => process_result(result) # if has more output just spawn again.
    }
}

This allows us to get rid channels within the intermediate op altogether, and only have a single channel connecting intermdiate ops.

Results

This script simulates a long pipeline of connecting intermediate ops. Theoretically less channels means less intermediate data.

import daft
import os


@daft.func
def generate_10_mb_data(x: int) -> bytes:
    return os.urandom(10 * 1024 * 1024)


@daft.func
def noop(x: bytes) -> bytes:
    return x


df = daft.from_pydict({"id": [i for i in range(100)]}).into_batches(1)

# Generate 10 MB of data for each row
df = df.with_column("data_0", generate_10_mb_data(daft.col("id")))

# Add some noop udfs to add a lot of intermdiate ops
for i in range(1, 10):
    df = df.with_column(f"data_{i}", noop(daft.col(f"data_{i-1}")))

# Send them to the void
for p in df.iter_partitions():
    pass

Before:
Screenshot 2026-01-09 at 1 37 21 PM

After:
Screenshot 2026-01-09 at 1 36 47 PM

We reduced peak memory by half.

Future

This HasMoreOutput bug affects streaming sinks as well actually. So if this fix works we should implement it on streaming sink as well, and then might as well do so for blocking sinks and we can get rid of the intra-op channels altogether.

Related Issues

@github-actions github-actions bot added the feat label Jan 9, 2026
fn op_type(&self) -> NodeType;
fn multiline_display(&self) -> Vec<String>;
fn make_state(&self) -> DaftResult<Self::State>;
fn make_state(&self) -> Self::State;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a drive by, make_state should not need to be fallible

@colin-ho colin-ho marked this pull request as ready for review January 10, 2026 00:01
@greptile-apps
Copy link
Contributor

greptile-apps bot commented Jan 10, 2026

Greptile Overview

Greptile Summary

This PR fundamentally refactors the IntermediateOp execution model to fix a critical bug where workers producing HasMoreOutput were not being polled again in the maintain_order = True case, which could cause deadlocks.

Key Changes

Architecture Overhaul:

  • Removed internal dispatcher and worker channels - The old architecture had a dispatcher sending work to multiple workers via channels, with another channel collecting results. This has been completely eliminated for intermediate ops.
  • New single state machine - Replaced with a tokio::select! based state machine in process_input() that directly manages task execution and input reception without intermediate channels.
  • Proper HasMoreOutput handling - When a task returns HasMoreOutput, it now immediately respawns with the same input and state, fixing the deadlock bug.

HasMoreOutput API Change:

  • Changed from HasMoreOutput(Arc<MicroPartition>) to HasMoreOutput { input: Arc<MicroPartition>, output: Arc<MicroPartition> } to support the new execution model.

State Management:

  • States are now pre-allocated in a pool and reused across inputs, which is more efficient for operators with expensive state initialization (like UDF process handles).
  • make_state() signature simplified from DaftResult<Self::State> to Self::State since state creation is now infallible for all operators.

Code Sharing:

  • Moved JoinSet and OrderedJoinSet from daft-distributed to common_runtime for reuse.
  • Added new OrderingAwareJoinSet enum that wraps both ordered and unordered variants.

Impact

Performance: The PR demonstrates a 50% reduction in peak memory usage for long pipelines of intermediate ops, as shown in the benchmark with chained UDFs.

Correctness: Fixes the HasMoreOutput deadlock that could occur with operations like CrossJoin when maintain_order=True.

Future Work: PR description notes that streaming sinks and blocking sinks still use the old dispatcher/channel architecture and will be refactored in future work to get similar benefits.

Review Findings

The implementation is sound and correct. The state machine logic properly handles:

  • Task completion with NeedMoreInput (returns state to pool)
  • Task completion with HasMoreOutput (respawns task, keeps state)
  • Input buffering and batching
  • Remaining buffered data after input closes
  • Ordered vs unordered execution modes

All intermediate operators have been correctly updated to the new API. The change is internal to the crate and does not break external APIs.

Confidence Score: 5/5

  • This PR is safe to merge with minimal risk - the refactoring is well-architected and thoroughly tested
  • Score reflects comprehensive analysis of the state machine logic, verification of all operator updates, and validation that the HasMoreOutput bug fix is correctly implemented. The removal of internal channels is a significant simplification that reduces complexity and memory usage. All edge cases in the tokio::select branches have been verified correct. No logic bugs or race conditions were found.
  • No files require special attention - all changes are consistent and correct

Important Files Changed

File Analysis

Filename Score Overview
src/daft-local-execution/src/intermediate_ops/intermediate_op.rs 5/5 Complete rewrite of intermediate op execution model. Removed dispatcher and worker channels in favor of single state machine using tokio::select. HasMoreOutput now correctly handled by respawning tasks with same input. Logic appears sound.
src/common/runtime/src/joinset.rs 5/5 Moved from daft-distributed, added OrderingAwareJoinSet wrapper. OrderedJoinSet correctly maintains order by buffering out-of-order completions. Comprehensive tests included.
src/daft-local-execution/src/intermediate_ops/cross_join.rs 5/5 Updated HasMoreOutput to include both input and output fields. Logic correctly returns HasMoreOutput for intermediate combinations and NeedMoreInput when all combinations complete.
src/daft-local-execution/src/intermediate_ops/udf.rs 5/5 Refactored to move use_process logic from make_state to try_new. UdfHandle lifecycle managed correctly through Drop trait. State changes from DaftResult to direct return are safe.
src/daft-local-execution/src/dispatcher.rs 5/5 Minor import cleanup. Dispatcher still used by sinks, not removed entirely as claimed for future work.
src/daft-local-execution/src/lib.rs 5/5 Removed channel-related re-exports that are no longer needed for intermediate ops.

Sequence Diagram

sequenceDiagram
    participant Child as Child Node
    participant IntermediateOp as IntermediateNode
    participant ProcessInput as process_input
    participant TaskSet as OrderingAwareJoinSet
    participant StatePool as State Pool
    participant Buffer as RowBasedBuffer
    participant Output as Output Channel

    Child->>IntermediateOp: start()
    IntermediateOp->>StatePool: Initialize states (max_concurrency)
    IntermediateOp->>ProcessInput: spawn task
    
    loop Until input_closed AND task_set empty
        alt Task completion available
            TaskSet-->>ProcessInput: join_next() returns result
            ProcessInput->>ProcessInput: handle_task_completion()
            alt NeedMoreInput(Some(mp))
                ProcessInput->>Output: send(mp)
                ProcessInput->>StatePool: return state
            end
            alt NeedMoreInput(None)
                ProcessInput->>StatePool: return state
            end
            alt HasMoreOutput{input, output}
                ProcessInput->>Output: send(output)
                ProcessInput->>TaskSet: spawn_execution_task(same input, same state)
                Note over ProcessInput,TaskSet: State NOT returned - will be reused
            end
            ProcessInput->>Buffer: spawn_ready_batches()
            loop While states available AND batches ready
                Buffer-->>ProcessInput: next_batch_if_ready()
                StatePool-->>ProcessInput: get state
                ProcessInput->>TaskSet: spawn_execution_task()
            end
        else Input available
            Child-->>ProcessInput: recv() returns morsel
            ProcessInput->>Buffer: push(morsel)
            ProcessInput->>Buffer: spawn_ready_batches()
            loop While states available AND batches ready
                Buffer-->>ProcessInput: next_batch_if_ready()
                StatePool-->>ProcessInput: get state
                ProcessInput->>TaskSet: spawn_execution_task()
            end
        end
    end
    
    opt Remaining buffered data
        Buffer-->>ProcessInput: pop_all()
        StatePool-->>ProcessInput: get state
        ProcessInput->>TaskSet: spawn_execution_task()
        loop Until all tasks complete
            TaskSet-->>ProcessInput: join_next()
            ProcessInput->>ProcessInput: handle_task_completion()
            opt HasMoreOutput
                ProcessInput->>TaskSet: spawn again
                Note over ProcessInput,TaskSet: Loop continues until NeedMoreInput
            end
        end
    end
    
    ProcessInput->>IntermediateOp: Done
Loading

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No files reviewed, no comments

Edit Code Review Agent Settings | Greptile

@colin-ho colin-ho requested a review from srilman January 10, 2026 00:22
@codecov
Copy link

codecov bot commented Jan 10, 2026

Codecov Report

❌ Patch coverage is 95.23810% with 17 lines in your changes missing coverage. Please review.
✅ Project coverage is 72.86%. Comparing base (c4adb76) to head (96e2ab8).
⚠️ Report is 8 commits behind head on main.

Files with missing lines Patch % Lines
...-execution/src/intermediate_ops/intermediate_op.rs 95.87% 8 Missing ⚠️
src/common/runtime/src/joinset.rs 95.60% 4 Missing ⚠️
...intermediate_ops/distributed_actor_pool_project.rs 0.00% 3 Missing ⚠️
src/daft-local-execution/src/dispatcher.rs 75.00% 1 Missing ⚠️
src/daft-local-execution/src/lib.rs 88.88% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #5999      +/-   ##
==========================================
+ Coverage   72.69%   72.86%   +0.16%     
==========================================
  Files         970      970              
  Lines      127333   126113    -1220     
==========================================
- Hits        92570    91889     -681     
+ Misses      34763    34224     -539     
Files with missing lines Coverage Δ
src/common/error/src/error.rs 76.47% <ø> (ø)
src/common/runtime/src/lib.rs 87.57% <100.00%> (ø)
...rc/daft-distributed/src/pipeline_node/actor_udf.rs 25.12% <ø> (ø)
...t-distributed/src/pipeline_node/into_partitions.rs 0.00% <ø> (ø)
.../daft-distributed/src/pipeline_node/materialize.rs 97.67% <ø> (ø)
src/daft-distributed/src/pipeline_node/vllm.rs 0.00% <ø> (ø)
src/daft-distributed/src/plan/mod.rs 75.75% <ø> (ø)
src/daft-distributed/src/plan/runner.rs 7.00% <ø> (ø)
src/daft-distributed/src/scheduling/dispatcher.rs 96.37% <ø> (ø)
...ibuted/src/scheduling/scheduler/scheduler_actor.rs 89.95% <ø> (ø)
... and 22 more

... and 24 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants