-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathasync_chat_example.py
More file actions
109 lines (89 loc) · 3.97 KB
/
async_chat_example.py
File metadata and controls
109 lines (89 loc) · 3.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import asyncio
import logging
from typing import List, Dict, Optional, Any
from miniautogen.chat.chat import Chat
from miniautogen.agent.agent import Agent
from miniautogen.chat.chatadmin import ChatAdmin
from miniautogen.pipeline.pipeline import Pipeline
from miniautogen.pipeline.components.components import (
NextAgentSelectorComponent, AgentReplyComponent, TerminateChatComponent,
LLMResponseComponent
)
from miniautogen.llms.llm_client import LLMClientInterface
from miniautogen.storage.in_memory_repository import InMemoryChatRepository
# 1. Mock LLM Client to simulate responses without API keys
class MockLLMClient(LLMClientInterface):
async def get_model_response(self, prompt: List[Dict[str, str]], model_name: Optional[str] = None, temperature: float = 1.0) -> str:
# Simulate network latency
await asyncio.sleep(0.5)
# Determine response based on last message to simulate conversation flow
last_msg = prompt[-1]['content'] if prompt else ""
if "hello" in last_msg.lower():
return "Hi there! How can I help you structurally today?"
elif "bye" in last_msg.lower():
return "TERMINATE"
else:
return f"I processed: {last_msg}"
# 2. Setup Logging
logging.basicConfig(level=logging.INFO)
async def main():
print("--- Starting Async Architecture Demo ---")
# Initialize Repository and Chat
repo = InMemoryChatRepository()
chat = Chat(repository=repo)
# Initialize Mock LLM
mock_llm = MockLLMClient()
# Define Agents
# User Agent (simulated pipeline)
user_agent = Agent("user_1", "User", "User Role")
# For the user, we can just inject a predefined message via context or input component.
# But for this automated demo, let's just seed the chat with a message.
# Bot Agent with Pipeline
# Pipeline: Select Next -> Generate Reply -> Terminate Check
# Actually, Agent pipeline usually generates the CONTENT.
# The Admin pipeline manages the TURN.
# Let's fix the Agent Pipeline: It should take the conversation history and produce a reply string.
# We need a component that formats history for LLM + LLM Component.
class SimplePromptBuilder(Pipeline): # Simplified inline component/pipeline
pass
# We will use the existing LLMResponseComponent but we need to feed it a 'prompt' in the state.
# Let's make a custom component for this demo to glue things together.
from miniautogen.pipeline.components.pipelinecomponent import PipelineComponent
class HistoryToPromptComponent(PipelineComponent):
async def process(self, state: Any) -> Any:
chat = state.get_state().get('group_chat')
msgs = await chat.get_messages(limit=10)
# Format for LLM
prompt = [{"role": "user", "content": m.content} for m in msgs]
state.update_state(prompt=prompt)
return state
bot_pipeline = Pipeline([
HistoryToPromptComponent(),
LLMResponseComponent(mock_llm)
])
bot_agent = Agent("bot_1", "Assistant", "Helpful Assistant", pipeline=bot_pipeline)
chat.add_agent(user_agent)
chat.add_agent(bot_agent)
# Seed Chat
print("Seeding chat...")
await chat.add_message("user_1", "Hello assistant!")
# Setup Admin to manage the loop
# Admin Pipeline: Select Next Agent -> Execute Agent's Reply -> Check Termination
admin_pipeline = Pipeline([
NextAgentSelectorComponent(),
AgentReplyComponent(),
TerminateChatComponent()
])
admin = ChatAdmin("admin", "Admin", "Admin Role", admin_pipeline, chat, "Test Goal", max_rounds=5)
# Run the Loop
print("Running Admin Loop...")
await admin.run()
# Verification
msgs = await chat.get_messages()
print("\n--- Final Message History ---")
for m in msgs:
print(f"[{m.timestamp}] {m.sender_id}: {m.content}")
assert len(msgs) >= 2
print("\n--- Success: Async Flow Verified ---")
if __name__ == "__main__":
asyncio.run(main())