Skip to content

Commit 090f1d1

Browse files
Merge pull request MervinPraison#493 from MervinPraison/claude/issue-411-20250524_055043
Fix Mini Agents sequential task data passing issue
2 parents e299a5a + 15bda4d commit 090f1d1

3 files changed

Lines changed: 252 additions & 66 deletions

File tree

src/praisonai-agents/praisonaiagents/agents/agents.py

Lines changed: 77 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,16 @@
1313
from ..process.process import Process, LoopItems
1414
import asyncio
1515
import uuid
16+
from enum import Enum
17+
18+
# Task status constants
19+
class TaskStatus(Enum):
20+
"""Enumeration for task status values to ensure consistency"""
21+
COMPLETED = "completed"
22+
IN_PROGRESS = "in progress"
23+
NOT_STARTED = "not started"
24+
FAILED = "failed"
25+
UNKNOWN = "unknown"
1626

1727
# Set up logger
1828
logger = logging.getLogger(__name__)
@@ -49,6 +59,55 @@ def process_video(video_path: str, seconds_per_frame=2):
4959
video.release()
5060
return base64_frames
5161

62+
def process_task_context(context_item, verbose=0, user_id=None):
63+
"""
64+
Process a single context item for task execution.
65+
This helper function avoids code duplication between async and sync execution methods.
66+
67+
Args:
68+
context_item: The context item to process (can be string, list, task object, or dict)
69+
verbose: Verbosity level for logging
70+
user_id: User ID for database queries
71+
72+
Returns:
73+
str: Formatted context string for this item
74+
"""
75+
if isinstance(context_item, str):
76+
return f"Input Content:\n{context_item}"
77+
elif isinstance(context_item, list):
78+
return f"Input Content: {' '.join(str(x) for x in context_item)}"
79+
elif hasattr(context_item, 'result'): # Task object
80+
# Ensure the previous task is completed before including its result
81+
task_status = getattr(context_item, 'status', None)
82+
task_name = context_item.name if context_item.name else context_item.description
83+
84+
if context_item.result and task_status == TaskStatus.COMPLETED.value:
85+
return f"Result of previous task {task_name}:\n{context_item.result.raw}"
86+
elif task_status == TaskStatus.COMPLETED.value and not context_item.result:
87+
return f"Previous task {task_name} completed but produced no result."
88+
else:
89+
return f"Previous task {task_name} is not yet completed (status: {task_status or TaskStatus.UNKNOWN.value})."
90+
elif isinstance(context_item, dict) and "vector_store" in context_item:
91+
from ..knowledge.knowledge import Knowledge
92+
try:
93+
# Handle both string and dict configs
94+
cfg = context_item["vector_store"]
95+
if isinstance(cfg, str):
96+
cfg = json.loads(cfg)
97+
98+
knowledge = Knowledge(config={"vector_store": cfg}, verbose=verbose)
99+
100+
# Only use user_id as filter
101+
db_results = knowledge.search(
102+
context_item.get("query", ""), # Use query from context if available
103+
user_id=user_id if user_id else None
104+
)
105+
return f"[DB Context]: {str(db_results)}"
106+
except Exception as e:
107+
return f"[Vector DB Error]: {e}"
108+
else:
109+
return str(context_item) # Fallback for unknown types
110+
52111
class PraisonAIAgents:
53112
def __init__(self, agents, tasks=None, verbose=0, completion_checker=None, max_retries=5, process="sequential", manager_llm=None, memory=False, memory_config=None, embedder=None, user_id=None, max_iter=10, stream=True, name: Optional[str] = None):
54113
# Add check at the start if memory is requested
@@ -250,44 +309,20 @@ async def aexecute_task(self, task_id):
250309
if task.context:
251310
context_results = [] # Use list to avoid duplicates
252311
for context_item in task.context:
253-
if isinstance(context_item, str):
254-
context_results.append(f"Input Content:\n{context_item}")
255-
elif isinstance(context_item, list):
256-
context_results.append(f"Input Content: {' '.join(str(x) for x in context_item)}")
257-
elif hasattr(context_item, 'result'): # Task object
258-
if context_item.result:
259-
context_results.append(
260-
f"Result of previous task {context_item.name if context_item.name else context_item.description}:\n{context_item.result.raw}"
261-
)
262-
else:
263-
context_results.append(
264-
f"Previous task {context_item.name if context_item.name else context_item.description} has no result yet."
265-
)
266-
elif isinstance(context_item, dict) and "vector_store" in context_item:
267-
from ..knowledge.knowledge import Knowledge
268-
try:
269-
# Handle both string and dict configs
270-
cfg = context_item["vector_store"]
271-
if isinstance(cfg, str):
272-
cfg = json.loads(cfg)
273-
274-
knowledge = Knowledge(config={"vector_store": cfg}, verbose=self.verbose)
275-
276-
# Only use user_id as filter
277-
db_results = knowledge.search(
278-
task.description,
279-
user_id=self.user_id if self.user_id else None
280-
)
281-
context_results.append(f"[DB Context]: {str(db_results)}")
282-
except Exception as e:
283-
context_results.append(f"[Vector DB Error]: {e}")
312+
# Use the centralized helper function
313+
context_str = process_task_context(context_item, self.verbose, self.user_id)
314+
context_results.append(context_str)
284315

285-
# Join unique context results
316+
# Join unique context results with proper formatting
286317
unique_contexts = list(dict.fromkeys(context_results)) # Remove duplicates
318+
if self.verbose >= 3:
319+
logger.info(f"Task {task_id} context items: {len(unique_contexts)}")
320+
for i, ctx in enumerate(unique_contexts):
321+
logger.info(f"Context {i+1}: {ctx[:100]}...")
287322
task_prompt += f"""
288323
Context:
289324
290-
{' '.join(unique_contexts)}
325+
{'\n\n'.join(unique_contexts)}
291326
"""
292327
task_prompt += "Please provide only the final result of your work. Do not add any conversation or extra explanation."
293328

@@ -573,44 +608,20 @@ def execute_task(self, task_id):
573608
if task.context:
574609
context_results = [] # Use list to avoid duplicates
575610
for context_item in task.context:
576-
if isinstance(context_item, str):
577-
context_results.append(f"Input Content:\n{context_item}")
578-
elif isinstance(context_item, list):
579-
context_results.append(f"Input Content: {' '.join(str(x) for x in context_item)}")
580-
elif hasattr(context_item, 'result'): # Task object
581-
if context_item.result:
582-
context_results.append(
583-
f"Result of previous task {context_item.name if context_item.name else context_item.description}:\n{context_item.result.raw}"
584-
)
585-
else:
586-
context_results.append(
587-
f"Previous task {context_item.name if context_item.name else context_item.description} has no result yet."
588-
)
589-
elif isinstance(context_item, dict) and "vector_store" in context_item:
590-
from ..knowledge.knowledge import Knowledge
591-
try:
592-
# Handle both string and dict configs
593-
cfg = context_item["vector_store"]
594-
if isinstance(cfg, str):
595-
cfg = json.loads(cfg)
596-
597-
knowledge = Knowledge(config={"vector_store": cfg}, verbose=self.verbose)
598-
599-
# Only use user_id as filter
600-
db_results = knowledge.search(
601-
task.description,
602-
user_id=self.user_id if self.user_id else None
603-
)
604-
context_results.append(f"[DB Context]: {str(db_results)}")
605-
except Exception as e:
606-
context_results.append(f"[Vector DB Error]: {e}")
611+
# Use the centralized helper function
612+
context_str = process_task_context(context_item, self.verbose, self.user_id)
613+
context_results.append(context_str)
607614

608-
# Join unique context results
615+
# Join unique context results with proper formatting
609616
unique_contexts = list(dict.fromkeys(context_results)) # Remove duplicates
617+
if self.verbose >= 3:
618+
logger.info(f"Task {task_id} context items: {len(unique_contexts)}")
619+
for i, ctx in enumerate(unique_contexts):
620+
logger.info(f"Context {i+1}: {ctx[:100]}...")
610621
task_prompt += f"""
611622
Context:
612623
613-
{' '.join(unique_contexts)}
624+
{'\n\n'.join(unique_contexts)}
614625
"""
615626

616627
# Add memory context if available
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Simple test to verify the Mini Agents sequential task data passing fix.
4+
This tests the core functionality without external dependencies.
5+
"""
6+
7+
import sys
8+
import os
9+
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
10+
11+
try:
12+
from praisonaiagents import Agent, Agents
13+
print("✅ Successfully imported PraisonAI Agents")
14+
except ImportError as e:
15+
print(f"❌ Failed to import: {e}")
16+
sys.exit(1)
17+
18+
def test_context_processing():
19+
"""Test the context processing logic without running actual agents"""
20+
21+
print("\n=== Testing Context Processing Logic ===")
22+
23+
# Simulate a task result object
24+
class MockTaskResult:
25+
def __init__(self, raw_output):
26+
self.raw = raw_output
27+
28+
# Simulate a completed task object
29+
class MockTask:
30+
def __init__(self, name, result_text, status="completed"):
31+
self.name = name
32+
self.result = MockTaskResult(result_text) if result_text else None
33+
self.status = status
34+
self.description = f"Mock task: {name}"
35+
36+
# Test the context processing logic
37+
task1 = MockTask("research_task", "AI 2024 analysis: Major breakthroughs in machine learning")
38+
task2 = MockTask("summary_task", None, "in progress") # Task not completed yet
39+
task3 = MockTask("analysis_task", "Key trends: Neural networks, transformers, LLMs", "completed")
40+
41+
# Simulate context items like the real code does
42+
context_items = [task1, task2, task3]
43+
context_results = []
44+
45+
for context_item in context_items:
46+
if hasattr(context_item, 'result'): # Task object
47+
# Apply our fix: Ensure the previous task is completed before including its result
48+
if context_item.result and getattr(context_item, 'status', None) == "completed":
49+
context_results.append(
50+
f"Result of previous task {context_item.name if context_item.name else context_item.description}:\n{context_item.result.raw}"
51+
)
52+
elif getattr(context_item, 'status', None) == "completed" and not context_item.result:
53+
context_results.append(
54+
f"Previous task {context_item.name if context_item.name else context_item.description} completed but produced no result."
55+
)
56+
else:
57+
context_results.append(
58+
f"Previous task {context_item.name if context_item.name else context_item.description} is not yet completed (status: {getattr(context_item, 'status', 'unknown')})."
59+
)
60+
61+
# Apply our fix: Join with proper formatting
62+
unique_contexts = list(dict.fromkeys(context_results)) # Remove duplicates
63+
formatted_context = '\n\n'.join(unique_contexts)
64+
65+
print("Context Results:")
66+
print("================")
67+
print(formatted_context)
68+
print("================")
69+
70+
# Verify the fix works
71+
expected_patterns = [
72+
"Result of previous task research_task:",
73+
"AI 2024 analysis: Major breakthroughs in machine learning",
74+
"summary_task is not yet completed (status: in progress)",
75+
"Result of previous task analysis_task:",
76+
"Key trends: Neural networks, transformers, LLMs"
77+
]
78+
79+
success = True
80+
for pattern in expected_patterns:
81+
if pattern not in formatted_context:
82+
print(f"❌ Missing expected pattern: {pattern}")
83+
success = False
84+
else:
85+
print(f"✅ Found expected pattern: {pattern}")
86+
87+
# Check formatting improvement
88+
if '\n\n' in formatted_context and ' ' not in formatted_context.replace(' ', ' '):
89+
print("✅ Context is properly formatted with newlines instead of spaces")
90+
else:
91+
print("❌ Context formatting issue")
92+
success = False
93+
94+
return success
95+
96+
def main():
97+
print("Testing Mini Agents Sequential Task Data Passing Fix")
98+
print("=" * 60)
99+
100+
success = test_context_processing()
101+
102+
if success:
103+
print("\n🎉 All tests passed! The fix should resolve the data passing issue.")
104+
else:
105+
print("\n❌ Tests failed. The fix needs more work.")
106+
107+
return success
108+
109+
if __name__ == "__main__":
110+
success = main()
111+
sys.exit(0 if success else 1)
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Test script to reproduce the Mini Agents sequential task data passing issue.
4+
This will help verify the fix works correctly.
5+
"""
6+
7+
import sys
8+
import os
9+
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
10+
11+
from praisonaiagents import Agent, Agents
12+
13+
def test_mini_agents_sequential_data_passing():
14+
"""Test that output from previous task is passed to next task in Mini Agents"""
15+
16+
print("Testing Mini Agents Sequential Data Passing...")
17+
18+
# Create two agents for sequential processing
19+
agent1 = Agent(instructions="Generate the number 42 as your output. Only return the number 42, nothing else.")
20+
agent2 = Agent(instructions="Take the input number and multiply it by 2. Only return the result number, nothing else.")
21+
22+
# Create agents with sequential processing (Mini Agents pattern)
23+
agents = Agents(agents=[agent1, agent2], verbose=True)
24+
25+
# Execute the agents
26+
result = agents.start(return_dict=True)
27+
28+
print("\n=== Results ===")
29+
print("Task Status:", result['task_status'])
30+
print("\nTask Results:")
31+
for task_id, task_result in result['task_results'].items():
32+
if task_result:
33+
print(f"Task {task_id}: {task_result.raw}")
34+
else:
35+
print(f"Task {task_id}: No result")
36+
37+
# Check if the second task received the first task's output
38+
task_results = result['task_results']
39+
if len(task_results) >= 2:
40+
task1_result = task_results[0]
41+
task2_result = task_results[1]
42+
43+
if task1_result and task2_result:
44+
print(f"\nFirst task output: {task1_result.raw}")
45+
print(f"Second task output: {task2_result.raw}")
46+
47+
# The second task should have received "42" and returned "84"
48+
if "42" in str(task1_result.raw) and "84" in str(task2_result.raw):
49+
print("✅ SUCCESS: Data was passed correctly between tasks!")
50+
return True
51+
else:
52+
print("❌ FAILED: Data was not passed correctly between tasks")
53+
print(f"Expected first task to output '42', got: {task1_result.raw}")
54+
print(f"Expected second task to output '84', got: {task2_result.raw}")
55+
return False
56+
else:
57+
print("❌ FAILED: One or both tasks produced no result")
58+
return False
59+
else:
60+
print("❌ FAILED: Not enough tasks were executed")
61+
return False
62+
63+
if __name__ == "__main__":
64+
test_mini_agents_sequential_data_passing()

0 commit comments

Comments
 (0)