Skip to content

Commit 391b2e5

Browse files
committed
feat: Add helper methods for creating message chunks and error handling in AgentOrchestrator
1 parent f6ba3e0 commit 391b2e5

1 file changed

Lines changed: 38 additions & 28 deletions

File tree

python/valuecell/core/coordinate/orchestrator.py

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,33 @@ def __init__(self):
2727

2828
self.planner = ExecutionPlanner(self.agent_connections)
2929

30+
def _create_message_chunk(
31+
self,
32+
content: str,
33+
session_id: str,
34+
user_id: str,
35+
kind: MessageDataKind = MessageDataKind.TEXT,
36+
is_final: bool = False,
37+
) -> MessageChunk:
38+
"""Create a MessageChunk with common metadata"""
39+
return MessageChunk(
40+
content=content,
41+
kind=kind,
42+
meta=MessageChunkMetadata(session_id=session_id, user_id=user_id),
43+
is_final=is_final,
44+
)
45+
46+
def _create_error_message_chunk(
47+
self, error_msg: str, session_id: str, user_id: str
48+
) -> MessageChunk:
49+
"""Create an error MessageChunk with standardized format"""
50+
return self._create_message_chunk(
51+
content=f"(Error): {error_msg}",
52+
session_id=session_id,
53+
user_id=user_id,
54+
is_final=True,
55+
)
56+
3057
async def process_user_input(
3158
self, user_input: UserInput
3259
) -> AsyncGenerator[MessageChunk, None]:
@@ -54,13 +81,8 @@ async def process_user_input(
5481
except Exception as e:
5582
error_msg = f"Error processing request: {str(e)}"
5683
await self.session_manager.add_message(session_id, Role.SYSTEM, error_msg)
57-
yield MessageChunk(
58-
content=f"(Error): {error_msg}",
59-
kind=MessageDataKind.TEXT,
60-
meta=MessageChunkMetadata(
61-
session_id=session_id, user_id=user_input.meta.user_id
62-
),
63-
is_final=True,
84+
yield self._create_error_message_chunk(
85+
error_msg, session_id, user_input.meta.user_id
6486
)
6587

6688
async def _execute_plan(
@@ -70,11 +92,8 @@ async def _execute_plan(
7092

7193
session_id, user_id = metadata["session_id"], metadata["user_id"]
7294
if not plan.tasks:
73-
yield MessageChunk(
74-
content="No tasks found for this request.",
75-
kind=MessageDataKind.TEXT,
76-
meta=MessageChunkMetadata(session_id=session_id, user_id=user_id),
77-
is_final=True,
95+
yield self._create_message_chunk(
96+
"No tasks found for this request.", session_id, user_id, is_final=True
7897
)
7998
return
8099

@@ -90,19 +109,14 @@ async def _execute_plan(
90109

91110
except Exception as e:
92111
error_msg = f"Error executing {task.agent_name}: {str(e)}"
93-
yield MessageChunk(
94-
content=f"(Error): {error_msg}",
95-
kind=MessageDataKind.TEXT,
96-
meta=MessageChunkMetadata(session_id=session_id, user_id=user_id),
97-
is_final=True,
98-
)
112+
yield self._create_error_message_chunk(error_msg, session_id, user_id)
99113

100114
# Check if no results were produced
101115
if not plan.tasks:
102-
yield MessageChunk(
103-
content="No agents were able to process this request.",
104-
kind=MessageDataKind.TEXT,
105-
meta=MessageChunkMetadata(session_id=session_id, user_id=user_id),
116+
yield self._create_message_chunk(
117+
"No agents were able to process this request.",
118+
session_id,
119+
user_id,
106120
is_final=True,
107121
)
108122

@@ -147,12 +161,8 @@ async def _execute_task(
147161
logger.info(f"Task status update: {event.status.state}")
148162
continue
149163
if isinstance(event, TaskArtifactUpdateEvent):
150-
yield MessageChunk(
151-
content=event.artifact.parts[0].root.text,
152-
kind=MessageDataKind.TEXT,
153-
meta=MessageChunkMetadata(
154-
session_id=task.session_id, user_id=task.user_id
155-
),
164+
yield self._create_message_chunk(
165+
event.artifact.parts[0].root.text, task.session_id, task.user_id
156166
)
157167

158168
# Complete task

0 commit comments

Comments
 (0)