Skip to content

Commit fd76a12

Browse files
antiguruclaude
andcommitted
Extract fusion into GraphPass plugin API
Move ~600 lines of operator fusion code out of subgraph.rs into two new modules: graph_pass.rs defines a GraphPass trait that receives children and edges for in-place transformation, and fusion.rs implements it with the existing detect_groups/fuse_group/GroupScheduler logic. SubgraphBuilder gains a graph_passes Vec and add_graph_pass() method. The build() method runs registered passes sequentially before building the reachability tracker. worker.rs and child.rs register FusionPass when fuse_chain_length >= 2. PerOperatorState fields are now pub(crate) so the fusion module can access them. The forward_to field and tombstone mechanism remain as a general consequence of graph pass output, not fusion-specific. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 583c570 commit fd76a12

File tree

6 files changed

+745
-619
lines changed

6 files changed

+745
-619
lines changed

timely/src/dataflow/scopes/child.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,17 @@ where
148148
};
149149
func(&mut builder)
150150
};
151-
let subscope = subscope.into_inner().build(self);
151+
let mut subscope = subscope.into_inner();
152+
153+
// Register the fusion pass if enabled.
154+
let fuse_chain_length = self.parent.config().fuse_chain_length;
155+
if fuse_chain_length >= 2 {
156+
subscope.add_graph_pass(Box::new(
157+
crate::progress::fusion::FusionPass::new(fuse_chain_length)
158+
));
159+
}
160+
161+
let subscope = subscope.build(self);
152162

153163
self.add_operator_with_indices(Box::new(subscope), index, identifier);
154164

0 commit comments

Comments
 (0)