-
Notifications
You must be signed in to change notification settings - Fork 2
feat(agents): Replace openai-agents with pydantic-ai implementation #139
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 6 commits
e135e0b
d9731ed
2df604e
e84bca4
96667a1
bea1241
7d21084
d733a08
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,9 @@ | ||
# Copyright (c) 2025 Airbyte, Inc., all rights reserved. | ||
"""Agent implementations for the Airbyte connector builder.""" | ||
|
||
from collections.abc import Callable | ||
from pydantic_ai import Agent, RunContext | ||
from pydantic_ai.common_tools.duckduckgo import duckduckgo_search_tool | ||
|
||
from agents import Agent as OpenAIAgent | ||
from agents import ( | ||
WebSearchTool, | ||
handoff, | ||
) | ||
from pydantic.main import BaseModel | ||
|
||
# from agents import OpenAIConversationsSession | ||
from .guidance import get_default_developer_prompt, get_default_manager_prompt | ||
from .tools import ( | ||
SessionState, | ||
|
@@ -33,53 +26,49 @@ def create_developer_agent( | |
additional_instructions: str, | ||
session_state: SessionState, | ||
mcp_servers: list, | ||
) -> OpenAIAgent: | ||
) -> Agent: | ||
"""Create the developer agent that executes specific phases.""" | ||
return OpenAIAgent( | ||
developer_agent = Agent( | ||
model, | ||
name="MCP Connector Developer", | ||
instructions=get_default_developer_prompt( | ||
deps_type=SessionState, | ||
system_prompt=get_default_developer_prompt( | ||
api_name=api_name, | ||
instructions=additional_instructions, | ||
project_directory=session_state.workspace_dir.absolute(), | ||
), | ||
mcp_servers=mcp_servers, | ||
model=model, | ||
tools=[ | ||
create_log_progress_milestone_from_developer_tool(session_state), | ||
create_log_problem_encountered_by_developer_tool(session_state), | ||
create_log_tool_failure_tool(session_state), | ||
WebSearchTool(), | ||
duckduckgo_search_tool(), | ||
], | ||
) | ||
|
||
for mcp_server in mcp_servers: | ||
developer_agent.toolsets.append(mcp_server) | ||
|
||
return developer_agent | ||
|
||
|
||
def create_manager_agent( | ||
developer_agent: OpenAIAgent, | ||
developer_agent: Agent, | ||
model: str, | ||
api_name: str, | ||
additional_instructions: str, | ||
session_state: SessionState, | ||
mcp_servers: list, | ||
) -> OpenAIAgent: | ||
) -> Agent: | ||
"""Create the manager agent that orchestrates the 3-phase workflow.""" | ||
return OpenAIAgent( | ||
manager_agent = Agent( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Devin, these new Agent constructors don't get their MCP Servers. |
||
model, | ||
name="Connector Builder Manager", | ||
instructions=get_default_manager_prompt( | ||
deps_type=SessionState, | ||
system_prompt=get_default_manager_prompt( | ||
api_name=api_name, | ||
instructions=additional_instructions, | ||
project_directory=session_state.workspace_dir.absolute(), | ||
), | ||
handoffs=[ | ||
handoff( | ||
agent=developer_agent, | ||
tool_name_override="delegate_to_developer", | ||
tool_description_override="Delegating work to the developer agent", | ||
input_type=DelegatedDeveloperTask, | ||
on_handoff=create_on_developer_delegation(session_state), | ||
), | ||
], | ||
mcp_servers=mcp_servers, | ||
model=model, | ||
tools=[ | ||
create_mark_job_success_tool(session_state), | ||
create_mark_job_failed_tool(session_state), | ||
|
@@ -91,77 +80,72 @@ def create_manager_agent( | |
], | ||
) | ||
|
||
|
||
class DelegatedDeveloperTask(BaseModel): | ||
"""Input data for handoff from manager to developer.""" | ||
|
||
api_name: str | ||
assignment_title: str | ||
assignment_description: str | ||
|
||
|
||
class ManagerHandoffInput(BaseModel): | ||
"""Input data for handoff from developer back to manager.""" | ||
|
||
short_status: str | ||
detailed_progress_update: str | ||
is_full_success: bool | ||
is_partial_success: bool | ||
is_blocked: bool | ||
|
||
|
||
def create_on_developer_delegation(session_state: SessionState) -> Callable: | ||
"""Create an on_developer_delegation callback bound to a specific session state.""" | ||
|
||
async def on_developer_delegation(ctx, input_data: DelegatedDeveloperTask) -> None: | ||
for mcp_server in mcp_servers: | ||
manager_agent.toolsets.append(mcp_server) | ||
|
||
@manager_agent.tool | ||
async def delegate_to_developer( | ||
ctx: RunContext[SessionState], | ||
assignment_title: str, | ||
assignment_description: str, | ||
) -> str: | ||
"""Delegate work to the developer agent. | ||
|
||
Args: | ||
assignment_title: Short title or key for this developer assignment. | ||
assignment_description: Detailed description of the task assigned to the developer, | ||
including all context and success criteria they need to complete it. | ||
""" | ||
update_progress_log( | ||
f"🤝 [MANAGER → DEVELOPER] Manager delegating task to developer agent." | ||
f"\n Task Name: {input_data.assignment_title}" | ||
f"\n Task Description: {input_data.assignment_description}", | ||
session_state, | ||
f"\n Task Name: {assignment_title}" | ||
f"\n Task Description: {assignment_description}", | ||
ctx.deps, | ||
) | ||
|
||
return on_developer_delegation | ||
|
||
result = await developer_agent.run( | ||
assignment_description, | ||
message_history=ctx.deps.message_history, | ||
deps=ctx.deps, | ||
) | ||
|
||
def create_on_manager_handback(session_state: SessionState): | ||
"""Create an on_manager_handback callback bound to a specific session state.""" | ||
update_progress_log( | ||
f"🤝 [DEVELOPER → MANAGER] Developer completed task: {assignment_title}" | ||
f"\n Result: {result.output}", | ||
ctx.deps, | ||
) | ||
|
||
async def on_manager_handback(ctx, input_data: ManagerHandoffInput) -> None: | ||
ctx.deps.message_history.extend(result.new_messages()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ❓ Verification inconclusiveMonitor unbounded message history growth. The message history is extended indefinitely without pruning or size limits. In long-running sessions with many delegations, this could lead to excessive memory consumption and slower performance as the context grows. As noted in the PR objectives checklist, message history growth should be monitored. Consider implementing a mechanism to prune old messages or set a maximum history size. Verify whether there's a history management mechanism elsewhere in the codebase: 🏁 Script executed: #!/bin/bash
# Search for message_history pruning, cleanup, or size management
rg -n -C5 'message_history.*(?:prune|clear|trim|limit|max|size)' --type py Length of output: 0 #!/bin/bash
# Search for all occurrences of message_history and any pruning/size-management logic
rg -n 'message_history' --type py
rg -nP '(?:message_history|history).*\b(prune|trim|limit|max|size)\b' --type py -C5 Limit message history growth. In connector_builder_agents/src/agents.py:118, 🤖 Prompt for AI Agents
|
||
|
||
return str(result.output) | ||
|
||
@developer_agent.tool | ||
async def report_back_to_manager( | ||
ctx: RunContext[SessionState], | ||
short_status: str, | ||
detailed_progress_update: str, | ||
is_full_success: bool = False, | ||
is_partial_success: bool = False, | ||
is_blocked: bool = False, | ||
) -> str: | ||
"""Report progress or issues back to the manager agent. | ||
|
||
Args: | ||
short_status: One sentence summary of what was accomplished. | ||
detailed_progress_update: A detailed update on progress and next steps. | ||
is_full_success: True if the phase is fully completed. | ||
is_partial_success: True if partially done. | ||
is_blocked: True if encountered a blocker. | ||
""" | ||
update_progress_log( | ||
f"🤝 [DEVELOPER → MANAGER] Developer handing back control to manager." | ||
f"\n Summary of status: {input_data.short_status}" | ||
f"\n Partial success: {input_data.is_partial_success}" | ||
f"\n Full success: {input_data.is_full_success}" | ||
f"\n Blocked: {input_data.is_blocked}" | ||
f"\n Detailed progress update: {input_data.detailed_progress_update}", | ||
session_state, | ||
f"\n Summary of status: {short_status}" | ||
f"\n Partial success: {is_partial_success}" | ||
f"\n Full success: {is_full_success}" | ||
f"\n Blocked: {is_blocked}" | ||
f"\n Detailed progress update: {detailed_progress_update}", | ||
ctx.deps, | ||
) | ||
return "Status reported to manager" | ||
|
||
return on_manager_handback | ||
|
||
|
||
def add_handback_to_manager( | ||
developer_agent: OpenAIAgent, | ||
manager_agent: OpenAIAgent, | ||
session_state: SessionState, | ||
) -> None: | ||
"""Add a handoff from the developer back to the manager to report progress.""" | ||
developer_agent.handoffs.extend( | ||
[ | ||
handoff( | ||
agent=manager_agent, | ||
tool_name_override="report_back_to_manager", | ||
tool_description_override="Report progress or issues back to the manager agent", | ||
input_type=ManagerHandoffInput, | ||
on_handoff=create_on_manager_handback(session_state), | ||
), | ||
handoff( | ||
agent=manager_agent, | ||
tool_name_override="report_task_completion_to_manager", | ||
tool_description_override="Report task completion to the manager agent", | ||
input_type=ManagerHandoffInput, | ||
on_handoff=create_on_manager_handback(session_state), | ||
), | ||
] | ||
) | ||
return manager_agent |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the duplicate Pydantic AI distributions.
pydantic-ai
andpydantic-ai-slim[...]
both ship the samepydantic_ai
package; installing both forces pip to pick one wheel and the other will repeatedly reinstall/overwrite during builds. Choose a single distribution (likely the slim extra) to avoid installation failures."python-dotenv>=1.1.1", - "pydantic-ai>=0.0.14,<1.0", "pydantic-ai-slim[openai,duckduckgo]>=0.0.14,<1.0",
📝 Committable suggestion
🤖 Prompt for AI Agents