Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from ...state import TeamState
from ._chat_agent_container import ChatAgentContainer
from ._events import (
GroupChatGetThread,
GroupChatPause,
GroupChatReset,
GroupChatResume,
Expand Down Expand Up @@ -745,6 +746,61 @@ async def resume(self) -> None:
recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),
)

async def get_thread(self) -> List[BaseAgentEvent | BaseChatMessage]:
"""Get all messages exchanged so far in the group chat.

This method retrieves the current message thread from the group chat manager,
which contains all messages that have been exchanged between participants
since the team was initialized or last reset.

Returns:
A list of all messages in the current message thread.

Raises:
RuntimeError: If the team has not been initialized.

Example:

.. code-block:: python

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
model_client = OpenAIChatCompletionClient(model="gpt-4o")

agent1 = AssistantAgent("Assistant1", model_client=model_client)
agent2 = AssistantAgent("Assistant2", model_client=model_client)
termination = MaxMessageTermination(3)
team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

# Run the team
result = await team.run(task="Count from 1 to 10, respond one at a time.")

# Get the message thread
messages = await team.get_thread()
print(f"Total messages exchanged: {len(messages)}")
for msg in messages:
print(f"{msg.source}: {msg.content}")


asyncio.run(main())

"""
if not self._initialized:
raise RuntimeError("Team has not been initialized. Call reset() first.")

# Send a get thread request to the group chat manager.
result = await self._runtime.send_message(
GroupChatGetThread(),
recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),
)
return result # type: ignore

async def save_state(self) -> Mapping[str, Any]:
"""Save the state of the group chat team.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ._events import (
GroupChatAgentResponse,
GroupChatError,
GroupChatGetThread,
GroupChatMessage,
GroupChatPause,
GroupChatRequestPublish,
Expand Down Expand Up @@ -56,6 +57,7 @@ def __init__(
GroupChatTeamResponse,
GroupChatMessage,
GroupChatReset,
GroupChatGetThread,
],
)
if max_turns is not None and max_turns <= 0:
Expand Down Expand Up @@ -285,6 +287,19 @@ async def handle_resume(self, message: GroupChatResume, ctx: MessageContext) ->
"""Resume the group chat manager. This is a no-op in the base class."""
pass

@rpc
async def handle_get_thread(self, message: GroupChatGetThread, ctx: MessageContext) -> Any:
"""Return all messages exchanged so far in the group chat.

Args:
message: The GroupChatGetThread request message.
ctx: The message context.

Returns:
A list of all messages in the current message thread (List[BaseAgentEvent | BaseChatMessage]).
"""
return self._message_thread

@abstractmethod
async def validate_group_state(self, messages: List[BaseChatMessage] | None) -> None:
"""Validate the state of the group chat given the start messages.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,9 @@ class GroupChatError(BaseModel):

error: SerializableException
"""The error that occurred."""


class GroupChatGetThread(BaseModel):
"""A request to get the current message thread from the group chat."""

...
128 changes: 128 additions & 0 deletions python/packages/autogen-agentchat/tests/test_group_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -1944,3 +1944,131 @@ async def test_selector_group_chat_streaming(runtime: AgentRuntime | None) -> No

# Content-based verification instead of index-based
# Note: The streaming test verifies the streaming behavior, not the final result content


@pytest.mark.asyncio
async def test_round_robin_group_chat_get_thread(runtime: AgentRuntime | None) -> None:
"""Test the get_thread() method to retrieve message history."""

# Create simple echo agents for testing
agent1 = _EchoAgent("agent1", "Echo agent 1")
agent2 = _EchoAgent("agent2", "Echo agent 2")

# Create an agent that will stop at the 3rd turn
stop_agent = _StopAgent("stop_agent", "Stop agent", stop_at=3)

# Create team
termination = StopMessageTermination()
team = RoundRobinGroupChat(
participants=[agent1, agent2, stop_agent],
termination_condition=termination,
runtime=runtime,
)

# Test that get_thread() raises error before team.reset()
with pytest.raises(RuntimeError, match="Team has not been initialized"):
await team.get_thread()

# Run the team
result = await team.run(task="Hello, world!")

# Test new feature: get message thread
thread = await team.get_thread()

# Verify: thread should contain all messages
assert thread is not None
assert isinstance(thread, list)
assert len(thread) > 0

# Verify: thread messages should match result.messages
assert len(thread) == len(result.messages)

# Verify: message content should match
for thread_msg, result_msg in zip(thread, result.messages, strict=True):
assert compare_messages(thread_msg, result_msg)

# Verify: first message should be the task message
assert isinstance(thread[0], TextMessage)
assert thread[0].content == "Hello, world!"

# Verify: last message should be a stop message
assert isinstance(thread[-1], StopMessage)
assert thread[-1].content == "TERMINATE"

# Test get_thread() after reset - should return empty list
await team.reset()
thread_after_reset = await team.get_thread()
assert thread_after_reset == []


@pytest.mark.asyncio
async def test_selector_group_chat_get_thread(runtime: AgentRuntime | None) -> None:
"""Test get_thread() with SelectorGroupChat."""

# Use ReplayClient to simulate LLM selection
model_client = ReplayChatCompletionClient(
[
"agent1", # First round: select agent1
"agent2", # Second round: select agent2
"agent1", # Third round: select agent1 (will stop)
]
)

agent1 = _StopAgent("agent1", "Agent 1", stop_at=3)
agent2 = _EchoAgent("agent2", "Agent 2")

termination = StopMessageTermination()
team = SelectorGroupChat(
participants=[agent1, agent2],
model_client=model_client,
termination_condition=termination,
runtime=runtime,
)

# Run the team
result = await team.run(task="Test message")

# Test get_thread
thread = await team.get_thread()

# Verify
assert thread is not None
assert len(thread) == len(result.messages)
assert isinstance(thread[0], TextMessage)
assert thread[0].content == "Test message"


@pytest.mark.asyncio
async def test_swarm_get_thread(runtime: AgentRuntime | None) -> None:
"""Test get_thread() with Swarm."""

# Create a simple handoff scenario
def transfer_to_agent2() -> Handoff:
return Handoff(target="agent2")

agent1 = _EchoAgent("agent1", "Agent 1")
agent2 = _StopAgent("agent2", "Agent 2", stop_at=2)

# Add handoff to agent1
agent1._handoffs = [transfer_to_agent2] # type: ignore

termination = HandoffTermination(target="agent2")
team = Swarm(
participants=[agent1, agent2],
termination_condition=termination,
runtime=runtime,
)

# Run the team
await team.run(task="Start task")

# Test get_thread
thread = await team.get_thread()

# Verify
assert thread is not None
assert isinstance(thread, list)
assert len(thread) > 0
# First message should be the task
assert isinstance(thread[0], TextMessage)
assert thread[0].content == "Start task"