Skip to content

Conversation

@colin-ho
Copy link
Collaborator

@colin-ho colin-ho commented Jan 8, 2026

Changes Made

Stacked on #5976

This PR enables sources to be streaming sources in swordfish. Currently, all our sources (in memory / scan task / glob) require a fixed set of inputs, and this PR refactors them such that they can continuously accept inputs. This works by

  1. Separate inputs from the plan. LocalPhysicalPlans as well as Swordfish Pipeline nodes no longer store the inputs inline. Translation from logical -> physical now produces a separate Inputs struct, and the swordfish pipeline nodes now store a receiver, e.g. Receiver<(InputID, ScanTask)>, that the sources pull from when they are started.
  2. The NativeExecutor holds the senders into the new streaming sources, and is responsible for enqueuing the inputs.

FOR NOW, the pipelines themselves are still short lived, NativeExecutor.run accepts both plan and inputs, and immediately starts the plan and enqueues the inputs and thats it. In the next PR, I will implement the ability to have the pipelines be long lived with continuous input enqueuing. (this will require the input_id in the morsels as well as flushing mechanism).

Related Issues

@github-actions github-actions bot added the feat label Jan 8, 2026
@colin-ho colin-ho changed the base branch from main to colin/swordfish-task-builder January 8, 2026 01:02
@colin-ho colin-ho changed the title feat(swordfish): Stremaing sources feat(swordfish): Streaming sources Jan 8, 2026
@colin-ho colin-ho force-pushed the colin/stremaing-sources branch from 49bd008 to 23abefb Compare January 8, 2026 04:16
@colin-ho colin-ho changed the base branch from colin/swordfish-task-builder to main January 8, 2026 04:17
@colin-ho colin-ho force-pushed the colin/stremaing-sources branch from b09e4e3 to eb2d3e9 Compare January 8, 2026 05:29
@codecov
Copy link

codecov bot commented Jan 12, 2026

Codecov Report

❌ Patch coverage is 47.30813% with 920 lines in your changes missing coverage. Please review.
✅ Project coverage is 72.94%. Comparing base (ae14540) to head (6895fc4).

Files with missing lines Patch % Lines
src/daft-distributed/src/scheduling/task.rs 0.00% 111 Missing ⚠️
src/daft-distributed/src/pipeline_node/sort.rs 0.00% 88 Missing ⚠️
...t-distributed/src/pipeline_node/into_partitions.rs 0.00% 76 Missing ⚠️
src/daft-distributed/src/pipeline_node/limit.rs 0.00% 65 Missing ⚠️
...t-distributed/src/pipeline_node/join/cross_join.rs 0.00% 47 Missing ⚠️
src/daft-distributed/src/pipeline_node/mod.rs 0.00% 46 Missing ⚠️
...stributed/src/pipeline_node/join/broadcast_join.rs 0.00% 39 Missing ⚠️
...daft-distributed/src/pipeline_node/into_batches.rs 0.00% 38 Missing ⚠️
...tributed/src/pipeline_node/join/sort_merge_join.rs 0.00% 38 Missing ⚠️
src/daft-local-plan/src/translate.rs 90.35% 30 Missing ⚠️
... and 39 more
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #5978      +/-   ##
==========================================
+ Coverage   72.89%   72.94%   +0.04%     
==========================================
  Files         971      972       +1     
  Lines      125987   126179     +192     
==========================================
+ Hits        91842    92041     +199     
+ Misses      34145    34138       -7     
Files with missing lines Coverage Δ
daft/execution/native_executor.py 94.11% <100.00%> (+2.94%) ⬆️
daft/runners/native_runner.py 83.75% <100.00%> (ø)
src/common/scan-info/src/lib.rs 80.00% <100.00%> (+7.77%) ⬆️
.../daft-distributed/src/pipeline_node/materialize.rs 97.67% <100.00%> (ø)
src/daft-distributed/src/scheduling/dispatcher.rs 96.37% <100.00%> (ø)
src/daft-local-execution/src/channel.rs 100.00% <ø> (+1.85%) ⬆️
src/daft-local-execution/src/lib.rs 87.05% <ø> (ø)
...rc/daft-local-execution/src/sinks/blocking_sink.rs 83.33% <100.00%> (ø)
src/daft-local-execution/src/sinks/sort.rs 73.49% <100.00%> (+0.99%) ⬆️
src/daft-local-plan/src/plan.rs 45.96% <100.00%> (-0.52%) ⬇️
... and 49 more

... and 9 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants