Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 181 additions & 2 deletions app/flow/planning.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class PlanningFlow(BaseFlow):
executor_keys: List[str] = Field(default_factory=list)
active_plan_id: str = Field(default_factory=lambda: f"plan_{int(time.time())}")
current_step_index: Optional[int] = None
max_retry_per_step: int = Field(default=2) # Max retries before replanning
max_replan_count: int = Field(default=3) # Max replanning attempts to prevent infinite loop
replan_counts: Dict[int, int] = Field(default_factory=dict) # Track replans per step
step_retry_counts: Dict[int, int] = Field(default_factory=dict) # Track retries per step

def __init__(
self, agents: Union[BaseAgent, List[BaseAgent], Dict[str, BaseAgent]], **data
Expand Down Expand Up @@ -124,6 +128,21 @@ async def execute(self, input_text: str) -> str:
step_result = await self._execute_step(executor, step_info)
result += step_result + "\n"

# Check if step is blocked (failed after max retries), trigger replanning
# Keep completed steps, replan the blocked step and subsequent steps
# Also check max replan count to prevent infinite loop
retry_count = self.step_retry_counts.get(self.current_step_index, 0)
replan_count = self.replan_counts.get(self.current_step_index, 0)
if retry_count >= self.max_retry_per_step:
if replan_count >= self.max_replan_count:
logger.warning(f"Step {self.current_step_index} exceeded max replan attempts, marking as failed...")
result += f"\n--- Task Failed ---\nStep {self.current_step_index} failed after {replan_count} replanning attempts.\nOriginal error: {step_result}\n--- End ---\n"
break
logger.info(f"Step {self.current_step_index} is blocked, triggering replanning...")
self.replan_counts[self.current_step_index] = replan_count + 1
replan_result = await self._replan(step_result)
result += f"\n--- Replanning ---\n{replan_result}\n--- End Replanning ---\n"

# Check if agent wants to terminate
if hasattr(executor, "state") and executor.state == AgentState.FINISHED:
break
Expand Down Expand Up @@ -295,19 +314,63 @@ async def _execute_step(self, executor: BaseAgent, step_info: dict) -> str:
try:
step_result = await executor.run(step_prompt)

# Mark the step as completed after successful execution
await self._mark_step_completed()
# Check if the step execution indicates a failure (Error: prefix)
result_lower = step_result.strip().lower()
has_failure = result_lower.startswith("error:")

if has_failure:
# Increment retry count
retry_count = self.step_retry_counts.get(self.current_step_index, 0) + 1
self.step_retry_counts[self.current_step_index] = retry_count

# Only mark as blocked after max retries reached
if retry_count >= self.max_retry_per_step:
await self._mark_step_blocked(step_result, f"Failed after {retry_count} retries")
else:
# Still in progress, will retry
logger.info(f"Step {self.current_step_index} failed, retry {retry_count}/{self.max_retry_per_step}")
else:
# Mark the step as completed after successful execution
await self._mark_step_completed()

return step_result
except Exception as e:
logger.error(f"Error executing step {self.current_step_index}: {e}")
# Increment retry count
retry_count = self.step_retry_counts.get(self.current_step_index, 0) + 1
self.step_retry_counts[self.current_step_index] = retry_count

# Only mark as blocked after max retries reached
if retry_count >= self.max_retry_per_step:
await self._mark_step_blocked(str(e), f"Exception after {retry_count} retries")

return f"Error executing step {self.current_step_index}: {str(e)}"

async def _mark_step_blocked(self, reason: str = "", notes: str = "") -> None:
    """Flag the step at ``current_step_index`` as BLOCKED in the active plan.

    Args:
        reason: Text recorded as the step note when ``notes`` is empty.
        notes: Preferred step note; takes precedence over ``reason``.

    Does nothing when no step is currently selected. A failure of the
    planning tool call is logged as a warning and not propagated.
    """
    step_index = self.current_step_index
    if step_index is None:
        return

    try:
        mark_request = {
            "command": "mark_step",
            "plan_id": self.active_plan_id,
            "step_index": step_index,
            "step_status": PlanStepStatus.BLOCKED.value,
            # An explicit note wins; otherwise fall back to the raw reason.
            "step_notes": notes or reason,
        }
        await self.planning_tool.execute(**mark_request)
    except Exception as e:
        logger.warning(f"Failed to mark step as blocked: {e}")

async def _mark_step_completed(self) -> None:
"""Mark the current step as completed."""
if self.current_step_index is None:
return

# Clean up retry counts for completed step
self.step_retry_counts.pop(self.current_step_index, None)
self.replan_counts.pop(self.current_step_index, None)

try:
# Mark the step as completed
await self.planning_tool.execute(
Expand All @@ -334,6 +397,122 @@ async def _mark_step_completed(self) -> None:
step_statuses[self.current_step_index] = PlanStepStatus.COMPLETED.value
plan_data["step_statuses"] = step_statuses

async def _replan(self, step_result: str) -> str:
    """
    Rebuild the rest of the active plan after the current step became blocked.

    Completed steps that precede the blocked step are kept. The LLM is asked
    for replacement steps as JSON, and the plan is then updated directly
    through ``self.planning_tool`` (not via an LLM tool call). If parsing the
    reply or updating the plan fails, a fallback plan is written instead.

    Args:
        step_result: Output or error text of the failed step. It is sent to
            the LLM and echoed in the fallback message.

    Returns:
        A human-readable summary of what was done to the plan.
    """
    # Get current plan info
    plan_text = await self._get_plan_text()
    plan_data = self.planning_tool.plans.get(self.active_plan_id)

    # Partition the plan around the blocked step.
    completed_steps = []
    current_step_text = ""
    subsequent_steps = []

    if plan_data and self.current_step_index is not None:
        steps = plan_data.get("steps", [])
        step_statuses = plan_data.get("step_statuses", [])

        for i, step in enumerate(steps):
            if i < self.current_step_index:
                # Earlier steps are kept only if they actually completed.
                if i < len(step_statuses) and step_statuses[i] == PlanStepStatus.COMPLETED.value:
                    completed_steps.append(step)
            elif i == self.current_step_index:
                # The blocked step itself - its text is shown to the LLM.
                current_step_text = step
            else:
                # Remaining work; reused by the fallback plan below.
                subsequent_steps.append(step)

    # Ask the LLM for replacement steps (blocked step and anything after it).
    system_msg = Message.system_message(
        "You are a planning assistant. Analyze the failure and provide a new plan. "
        "Respond with JSON only: {\"title\": \"new title\", \"steps\": [\"step1\", \"step2\", ...]}"
    )

    # Tell the LLM which steps are already done so it does not repeat them.
    completed_context = ""
    if completed_steps:
        completed_context = "\n\nCOMPLETED STEPS (keep these, do not repeat):\n" + "\n".join(
            f"- {s}" for s in completed_steps
        )

    user_msg = Message.user_message(
        f"Current plan:\n{plan_text}\n"
        f"{completed_context}\n\n"
        f"BLOCKED STEP (needs to be replanned): {current_step_text}\n\n"
        f"Failure result:\n{step_result}\n\n"
        f"Please analyze why this step failed and provide an updated plan. "
        f"IMPORTANT: Do NOT include already completed steps in your response. "
        f"Only provide new steps to replace the blocked step and handle any remaining work."
    )

    response = await self.llm.ask(messages=[user_msg], system_msgs=[system_msg])

    try:
        # Scan for the first JSON object that carries a list-valued "steps".
        # Using the real JSON decoder (instead of a brace-matching regex)
        # copes with arbitrary nesting and with braces inside step strings.
        decoder = json.JSONDecoder()
        new_plan = None
        brace_at = response.find("{")
        while brace_at != -1:
            try:
                candidate, _ = decoder.raw_decode(response[brace_at:])
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict) and isinstance(candidate.get("steps"), list):
                new_plan = candidate
                break
            brace_at = response.find("{", brace_at + 1)

        if new_plan is None:
            raise ValueError("LLM response missing 'steps' field")

        # Combine completed steps with new steps from LLM
        new_steps_from_llm = new_plan["steps"]
        final_steps = completed_steps + new_steps_from_llm

        # Kept steps stay COMPLETED; everything from the LLM starts fresh.
        new_step_statuses = (
            [PlanStepStatus.COMPLETED.value] * len(completed_steps)
            + [PlanStepStatus.NOT_STARTED.value] * len(new_steps_from_llm)
        )

        # Directly update the plan using PlanningTool
        await self.planning_tool.execute(
            command="update",
            plan_id=self.active_plan_id,
            title=new_plan.get("title"),
            steps=final_steps
        )

        # Update step statuses
        if self.active_plan_id in self.planning_tool.plans:
            self.planning_tool.plans[self.active_plan_id]["step_statuses"] = new_step_statuses

        # The replacement step gets a fresh retry budget. The replan counter
        # is deliberately NOT reset here: the caller increments it before
        # calling this method and compares it against max_replan_count, so
        # zeroing it would let a step be replanned without bound. It is
        # cleared when the step finally completes.
        self.step_retry_counts[self.current_step_index] = 0

        return f"Plan updated successfully. Completed: {len(completed_steps)}, New steps: {new_steps_from_llm}"
    except Exception as e:
        logger.error(f"Failed to parse LLM response or update plan: {e}")
        # Fallback: keep completed steps, record the blocked step, add a
        # retry step, and preserve the steps that were still pending so the
        # remaining work is not lost.
        original_error = step_result.strip() if step_result else "Unknown error"
        if plan_data:
            new_steps = completed_steps + [
                f"[BLOCKED] {current_step_text}",
                f"[RETRY] Alternative approach for: {current_step_text}"
            ] + subsequent_steps
            await self.planning_tool.execute(
                command="update",
                plan_id=self.active_plan_id,
                steps=new_steps
            )
        return f"Plan updated with fallback strategy. Original error: {original_error}. Parse error: {str(e)}"

async def _get_plan_text(self) -> str:
"""Get the current plan as formatted text."""
try:
Expand Down