Skip to content

Commit 9c28284

Browse files
Merge pull request #1428 from TimothyZhang7/feature/parallel-fanout
feat: parallel execution framework
2 parents e1bea18 + 075e917 commit 9c28284

File tree

6 files changed

+1230
-165
lines changed

6 files changed

+1230
-165
lines changed

core/framework/graph/edge.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,11 @@ class GraphSpec(BaseModel):
412412
default_model: str = "claude-haiku-4-5-20251001"
413413
max_tokens: int = 1024
414414

415+
# Cleanup LLM for JSON extraction fallback (fast/cheap model preferred)
416+
# If not set, uses CEREBRAS_API_KEY -> cerebras/llama-3.3-70b or
417+
# ANTHROPIC_API_KEY -> claude-3-5-haiku as fallback
418+
cleanup_llm_model: str | None = None
419+
415420
# Execution limits
416421
max_steps: int = Field(default=100, description="Maximum node executions before timeout")
417422
max_retries_per_node: int = 3
@@ -449,6 +454,44 @@ def get_incoming_edges(self, node_id: str) -> list[EdgeSpec]:
449454
"""Get all edges entering a node."""
450455
return [e for e in self.edges if e.target == node_id]
451456

457+
def detect_fan_out_nodes(self) -> dict[str, list[str]]:
458+
"""
459+
Detect nodes that fan-out to multiple targets.
460+
461+
A fan-out occurs when a node has multiple outgoing edges with the same
462+
condition (typically ON_SUCCESS) that should execute in parallel.
463+
464+
Returns:
465+
Dict mapping source_node_id -> list of parallel target_node_ids
466+
"""
467+
fan_outs: dict[str, list[str]] = {}
468+
for node in self.nodes:
469+
outgoing = self.get_outgoing_edges(node.id)
470+
# Fan-out: multiple edges with ON_SUCCESS condition
471+
success_edges = [
472+
e for e in outgoing if e.condition == EdgeCondition.ON_SUCCESS
473+
]
474+
if len(success_edges) > 1:
475+
fan_outs[node.id] = [e.target for e in success_edges]
476+
return fan_outs
477+
478+
def detect_fan_in_nodes(self) -> dict[str, list[str]]:
479+
"""
480+
Detect nodes that receive from multiple sources (fan-in / convergence).
481+
482+
A fan-in occurs when a node has multiple incoming edges, meaning
483+
it should wait for all predecessor branches to complete.
484+
485+
Returns:
486+
Dict mapping target_node_id -> list of source_node_ids
487+
"""
488+
fan_ins: dict[str, list[str]] = {}
489+
for node in self.nodes:
490+
incoming = self.get_incoming_edges(node.id)
491+
if len(incoming) > 1:
492+
fan_ins[node.id] = [e.source for e in incoming]
493+
return fan_ins
494+
452495
def get_entry_point(self, session_state: dict | None = None) -> str:
453496
"""
454497
Get the appropriate entry point based on session state.

0 commit comments

Comments
 (0)