diff --git a/src/praisonai-agents/praisonaiagents/agent/agent.py b/src/praisonai-agents/praisonaiagents/agent/agent.py index 7abe75c7c..45b193d7b 100644 --- a/src/praisonai-agents/praisonaiagents/agent/agent.py +++ b/src/praisonai-agents/praisonaiagents/agent/agent.py @@ -487,6 +487,7 @@ def __init__( approval: Optional[Union[bool, str, Dict[str, Any], 'ApprovalConfig', 'ApprovalProtocol']] = None, tool_timeout: Optional[int] = None, # P8/G11: Timeout in seconds for each tool call learn: Optional[Union[bool, str, Dict[str, Any], 'LearnConfig']] = None, # Continuous learning (peer to memory) + backend: Optional[Any] = None, # External managed agent backend (e.g., ManagedAgentIntegration) ): """Initialize an Agent instance. @@ -574,6 +575,11 @@ def __init__( - LearnConfig: Custom configuration Learning is a first-class citizen, peer to memory. It captures patterns, preferences, and insights from interactions to improve future responses. + backend: External managed agent backend for hybrid execution. Accepts: + - ManagedAgentIntegration: External managed agent service + - None: Use local execution (default) + When provided, agent can delegate execution to managed infrastructure + for long-running tasks or when local resources are constrained. Raises: ValueError: If all of name, role, goal, backstory, and instructions are None. 
@@ -1798,6 +1804,9 @@ def __init__( self._output_file = output_file if _output_config else None self._output_template = output_template if _output_config else None + # Backend - external managed agent backend for hybrid execution + self.backend = backend + # Telemetry - lazy initialized via property for performance self.__telemetry = None self.__telemetry_initialized = False diff --git a/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py b/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py index 422f83202..a2276362b 100644 --- a/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py +++ b/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py @@ -1049,6 +1049,27 @@ def chat(self, prompt: str, temperature: float = 1.0, tools: Optional[List[Any]] 'required' forces the LLM to call a tool before responding. ...other args... """ + # Check if external managed backend is configured + if hasattr(self, 'backend') and self.backend is not None: + # Extract kwargs for delegation, excluding 'self' and function locals + delegation_kwargs = { + 'temperature': temperature, + 'tools': tools, + 'output_json': output_json, + 'output_pydantic': output_pydantic, + 'reasoning_steps': reasoning_steps, + 'stream': stream, + 'task_name': task_name, + 'task_description': task_description, + 'task_id': task_id, + 'config': config, + 'force_retrieval': force_retrieval, + 'skip_retrieval': skip_retrieval, + 'attachments': attachments, + 'tool_choice': tool_choice + } + return self._delegate_to_backend(prompt, **delegation_kwargs) + # Emit context trace event (zero overhead when not set) from ..trace.context_events import get_context_emitter _trace_emitter = get_context_emitter() diff --git a/src/praisonai-agents/praisonaiagents/agent/execution_mixin.py b/src/praisonai-agents/praisonaiagents/agent/execution_mixin.py index ccbf1a1e6..71513037f 100644 --- a/src/praisonai-agents/praisonaiagents/agent/execution_mixin.py +++ 
b/src/praisonai-agents/praisonaiagents/agent/execution_mixin.py @@ -251,6 +251,10 @@ def run(self, prompt: str, **kwargs: Any) -> Optional[str]: - Background processing - API endpoints """ + # Check if external managed backend is configured + if hasattr(self, 'backend') and self.backend is not None: + return self._delegate_to_backend(prompt, **kwargs) + # Production defaults: no streaming, no display if 'stream' not in kwargs: kwargs['stream'] = False @@ -274,6 +278,168 @@ def run(self, prompt: str, **kwargs: Any) -> Optional[str]: return result + def _delegate_to_backend(self, prompt: str, **kwargs) -> Optional[str]: + """Delegate execution to external managed backend (e.g., ManagedAgentIntegration).""" + import asyncio + + # Check if backend has required methods + if not hasattr(self.backend, 'execute'): + raise RuntimeError(f"Backend {type(self.backend).__name__} does not support execute() method") + + # Handle streaming vs non-streaming + stream_requested = kwargs.get('stream', False) + + if stream_requested: + # For streaming, delegate to backend's stream method if available + if hasattr(self.backend, 'stream'): + return self._delegate_streaming_to_backend(prompt, **kwargs) + else: + # Fallback: execute non-streaming even if stream was requested + return self._execute_backend_sync(prompt, **kwargs) + else: + # Non-streaming execution + return self._execute_backend_sync(prompt, **kwargs) + + def _execute_backend_sync(self, prompt: str, **kwargs) -> Optional[str]: + """Execute backend in sync mode, handling async backends.""" + try: + # Try to run in existing event loop + loop = asyncio.get_running_loop() + # If we're already in an async context, we can't use asyncio.run() + # Create a new task instead + import concurrent.futures + import threading + + def run_async(): + new_loop = asyncio.new_event_loop() + asyncio.set_event_loop(new_loop) + try: + return new_loop.run_until_complete(self.backend.execute(prompt, **kwargs)) + finally: + new_loop.close() + + with 
concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(run_async) + return future.result() + + except RuntimeError: + # No event loop running, safe to use asyncio.run() + return asyncio.run(self.backend.execute(prompt, **kwargs)) + + def _delegate_streaming_to_backend(self, prompt: str, **kwargs): + """Delegate to backend's streaming method.""" + try: + # For streaming, we need to return an iterator/generator + # The backend's stream method is async, so we need to handle that + import asyncio + + async def stream_wrapper(): + async for chunk in self.backend.stream(prompt, **kwargs): + yield chunk + + # Convert async generator to sync generator + def sync_stream(): + try: + loop = asyncio.get_running_loop() + # Already in async context - need to handle differently + import concurrent.futures + import threading + import queue + + result_queue = queue.Queue() + exception_holder = [None] + + def run_in_thread(): + new_loop = asyncio.new_event_loop() + asyncio.set_event_loop(new_loop) + try: + async def collect(): + try: + async for item in self.backend.stream(prompt, **kwargs): + result_queue.put(('item', item)) + result_queue.put(('done', None)) + except Exception as e: + exception_holder[0] = e + result_queue.put(('error', e)) + + new_loop.run_until_complete(collect()) + finally: + new_loop.close() + + thread = threading.Thread(target=run_in_thread) + thread.start() + + while True: + msg_type, data = result_queue.get() + if msg_type == 'item': + # For managed backends, we might get event objects + # Convert to string format expected by Agent + if isinstance(data, dict): + if data.get('type') == 'agent.message': + content = data.get('content', []) + if isinstance(content, list): + text_parts = [] + for block in content: + if isinstance(block, dict) and block.get('type') == 'text': + text_parts.append(block.get('text', '')) + elif isinstance(block, str): + text_parts.append(block) + if text_parts: + yield ''.join(text_parts) + elif 
isinstance(content, str): + yield content + # Skip other event types (session.status_idle, etc.) + elif isinstance(data, str): + yield data + elif msg_type == 'done': + break + elif msg_type == 'error': + raise data + + thread.join() + + except RuntimeError: + # No event loop - can run directly + async def run_stream(): + async for item in self.backend.stream(prompt, **kwargs): + yield item + + # Use asyncio.run for each item (not ideal but works) + async_gen = run_stream() + + async def collect_all(): + results = [] + async for item in async_gen: + results.append(item) + return results + + results = asyncio.run(collect_all()) + for item in results: + # Similar conversion logic + if isinstance(item, dict): + if item.get('type') == 'agent.message': + content = item.get('content', []) + if isinstance(content, list): + text_parts = [] + for block in content: + if isinstance(block, dict) and block.get('type') == 'text': + text_parts.append(block.get('text', '')) + elif isinstance(block, str): + text_parts.append(block) + if text_parts: + yield ''.join(text_parts) + elif isinstance(content, str): + yield content + elif isinstance(item, str): + yield item + + return sync_stream() + + except Exception as e: + # Fallback to non-streaming + logger.warning(f"Backend streaming failed, falling back to non-streaming: {e}") + return self._execute_backend_sync(prompt, **kwargs) + def _get_planning_agent(self): """Lazy load PlanningAgent for planning mode.""" if self._planning_agent is None and self.planning: @@ -453,6 +619,10 @@ def start(self, prompt: Optional[str] = None, **kwargs: Any) -> Union[str, Gener from praisonaiagents.utils.variables import substitute_variables prompt = substitute_variables(prompt, {}) + # Check if external managed backend is configured + if hasattr(self, 'backend') and self.backend is not None: + return self._delegate_to_backend(prompt, **kwargs) + # ───────────────────────────────────────────────────────────────────── # UNIFIED AUTONOMY API: If 
class ManagedBackendProtocol(ABC):
    """
    Abstract interface for managed agent backend providers.

    Every provider (Anthropic, ...) implements this contract so that
    ManagedAgentIntegration can drive it without knowing provider details.
    Lifecycle: create_agent -> create_environment -> create_session, then
    send_message / stream_events / collect_response against the session.
    """

    @abstractmethod
    async def create_agent(self, instructions: str, **config) -> str:
        """Register an agent definition; return the provider's agent id."""
        ...

    @abstractmethod
    async def create_environment(self, **config) -> str:
        """Provision an execution environment; return its id."""
        ...

    @abstractmethod
    async def create_session(self, agent_id: str, environment_id: str) -> str:
        """Open a session binding an agent to an environment; return its id."""
        ...

    @abstractmethod
    async def send_message(self, session_id: str, prompt: str) -> None:
        """Deliver a user prompt to an open session."""
        ...

    @abstractmethod
    async def stream_events(self, session_id: str) -> AsyncIterator[Dict[str, Any]]:
        """Yield raw event dicts from the session as they arrive."""
        ...

    @abstractmethod
    async def collect_response(self, session_id: str) -> str:
        """Block until the session finishes and return the full text reply."""
        ...
class AnthropicManagedProvider(ManagedBackendProtocol):
    """
    Anthropic Managed Agents API provider.

    Implements ManagedBackendProtocol over Anthropic's REST + SSE endpoints.
    Reference: https://platform.claude.com/docs/en/managed-agents/overview
    """

    def __init__(self, api_key: str, base_url: str = "https://api.anthropic.com/v1"):
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.session = None  # lazily-created aiohttp.ClientSession

    async def _get_session(self):
        """Create the shared aiohttp session on first use (reused thereafter)."""
        if self.session is None:
            headers = {
                "x-api-key": self.api_key,
                "Content-Type": "application/json",
                "anthropic-version": "2023-06-01",
                "anthropic-beta": "managed-agents-2026-04-01",
            }
            self.session = aiohttp.ClientSession(headers=headers)
        return self.session

    async def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Issue one JSON request; return the decoded body. Raises on HTTP errors."""
        session = await self._get_session()
        url = f"{self.base_url}{endpoint}"
        async with session.request(method, url, **kwargs) as response:
            response.raise_for_status()
            return await response.json()

    async def create_agent(self, instructions: str, **config) -> str:
        """Create an agent configuration and return its id."""
        payload = {"instructions": instructions, **config}
        result = await self._request("POST", "/agents", json=payload)
        return result["id"]

    async def create_environment(self, **config) -> str:
        """Create a container environment and return its id."""
        payload = {"type": "container", **config}
        result = await self._request("POST", "/environments", json=payload)
        return result["id"]

    async def create_session(self, agent_id: str, environment_id: str) -> str:
        """Create a session binding the agent to the environment."""
        # NOTE(review): key names are asymmetric ("agent" vs "environment_id")
        # — verify against the current Managed Agents API schema.
        payload = {"agent": agent_id, "environment_id": environment_id}
        result = await self._request("POST", "/sessions", json=payload)
        return result["id"]

    async def send_message(self, session_id: str, prompt: str) -> None:
        """Append a user.message event to the session."""
        payload = {"events": [{"type": "user.message", "content": prompt}]}
        await self._request("POST", f"/sessions/{session_id}/events", json=payload)

    async def stream_events(self, session_id: str) -> AsyncIterator[Dict[str, Any]]:
        """Stream session events via SSE, yielding each decoded 'data:' payload."""
        session = await self._get_session()
        url = f"{self.base_url}/sessions/{session_id}/stream"
        async with session.get(url) as response:
            response.raise_for_status()
            while True:
                line_bytes = await response.content.readline()
                if not line_bytes:
                    break  # connection closed by the server
                line = line_bytes.decode('utf-8').strip()
                if line.startswith('data: '):
                    try:
                        yield json.loads(line[6:])  # strip 'data: ' prefix
                    except json.JSONDecodeError:
                        continue  # ignore malformed / keep-alive frames

    async def collect_response(self, session_id: str) -> str:
        """Accumulate agent.message text until the session completes.

        Handles all three content shapes the API may emit: a plain string, a
        single {"text": ...} dict, or a LIST of content blocks. The previous
        version silently dropped list-shaped content, even though the
        streaming path in execution_mixin (added in the same change) handles
        it — this keeps the two paths consistent.
        """
        response_parts = []
        async for event in self.stream_events(session_id):
            if event.get("type") == "agent.message":
                content = event.get("content", {})
                if isinstance(content, str):
                    response_parts.append(content)
                elif isinstance(content, dict) and "text" in content:
                    response_parts.append(content["text"])
                elif isinstance(content, list):
                    # List of content blocks: concatenate the text pieces.
                    for block in content:
                        if isinstance(block, dict) and block.get("type") == "text":
                            response_parts.append(block.get("text", ""))
                        elif isinstance(block, str):
                            response_parts.append(block)
            elif event.get("type") == "session.complete":
                break
        return "".join(response_parts)

    async def close(self):
        """Close the underlying HTTP session (safe to call repeatedly)."""
        if self.session:
            await self.session.close()
            self.session = None
+ api_key: API key for the provider + config: Provider-specific configuration + backend: The managed backend provider instance + agent_id: Created agent ID (cached) + environment_id: Created environment ID (cached) + """ + + def __init__( + self, + provider: str = "anthropic", + api_key: Optional[str] = None, + config: Optional[Dict[str, Any]] = None, + workspace: str = ".", + timeout: int = 600, # Managed agents may take longer than CLI tools + instructions: str = "You are a helpful AI assistant.", + ): + """ + Initialize managed agent integration. + + Args: + provider: Provider name ("anthropic", etc.) + api_key: API key for the provider + config: Provider-specific configuration + workspace: Working directory (for compatibility with base class) + timeout: Timeout in seconds for operations + instructions: Default instructions for agent creation + """ + super().__init__(workspace=workspace, timeout=timeout) + + self.provider = provider + self.api_key = api_key + self.config = config or {} + self.instructions = instructions + + # Initialize provider + self.backend = self._create_provider(provider, api_key) + + # Cached IDs for reuse + self.agent_id: Optional[str] = None + self.environment_id: Optional[str] = None + self._session_cache: Dict[str, str] = {} + + @property + def cli_command(self) -> str: + """Return the CLI command name for compatibility.""" + return f"managed-{self.provider}" + + @property + def is_available(self) -> bool: + """Check if the managed agent service is available.""" + # For managed services, we check if we have an API key and aiohttp + return ( + aiohttp is not None and + self.api_key is not None and + self.backend is not None + ) + + def _create_provider(self, provider: str, api_key: str) -> Optional[ManagedBackendProtocol]: + """Create a provider instance based on the provider name.""" + if aiohttp is None: + # Return None so is_available will be False + return None + + if api_key is None: + # Try to get from environment + if provider == 
"anthropic": + api_key = os.getenv("ANTHROPIC_API_KEY") or os.getenv("CLAUDE_API_KEY") + self.api_key = api_key + + if api_key is None: + return None + + # Persist resolved key so is_available reflects correct state + self.api_key = api_key + + if provider == "anthropic": + return AnthropicManagedProvider(api_key) + else: + raise ValueError(f"Unsupported provider: {provider}") + + async def _ensure_agent(self) -> str: + """Ensure agent exists and return agent_id.""" + if self.agent_id is None: + self.agent_id = await self.backend.create_agent( + self.instructions, + **self.config + ) + return self.agent_id + + async def _ensure_environment(self) -> str: + """Ensure environment exists and return environment_id.""" + if self.environment_id is None: + self.environment_id = await self.backend.create_environment() + return self.environment_id + + async def execute(self, prompt: str, **options) -> str: + """ + Execute agent in managed infrastructure. + + Args: + prompt: The prompt/query to send to the agent + **options: Additional options (session_id for reuse, etc.) 
async def execute(self, prompt: str, **options) -> str:
    """
    Execute a prompt in the managed infrastructure and return the reply text.

    Reuses a cached session per ``session_key`` (default "default"); creates
    agent, environment, and session on first use. The WHOLE exchange —
    session setup, message send, and response collection — is bounded by
    ``self.timeout``: the previous implementation only timed the
    response-collection phase, so a stalled create/send call could hang
    forever.

    Args:
        prompt: The prompt/query to send to the agent.
        **options: ``session_key`` selects/reuses a session; other options
            are currently unused.

    Returns:
        str: The agent's complete textual response.

    Raises:
        RuntimeError: If the service is unavailable or the timeout elapses.
    """
    if not self.is_available:
        raise RuntimeError(f"Managed agent service ({self.provider}) is not available")

    async def _exchange() -> str:
        # Resolve (or lazily create) the session for this conversation key.
        session_key = options.get('session_key', 'default')
        session_id = self._session_cache.get(session_key)
        if session_id is None:
            agent_id = await self._ensure_agent()
            environment_id = await self._ensure_environment()
            session_id = await self.backend.create_session(agent_id, environment_id)
            self._session_cache[session_key] = session_id

        await self.backend.send_message(session_id, prompt)
        return await self.backend.collect_response(session_id)

    try:
        return await asyncio.wait_for(_exchange(), timeout=self.timeout)
    except asyncio.TimeoutError as err:
        raise RuntimeError(f"Managed agent execution timed out after {self.timeout}s") from err
async def stream(self, prompt: str, **options) -> AsyncIterator[Dict[str, Any]]:
    """
    Stream raw events from the managed agent.

    Session handling mirrors execute(): one cached session per
    ``session_key``. The deadline is enforced WHILE waiting for the next
    event (via asyncio.wait_for), so a silent, stalled stream times out —
    the previous version only checked the deadline after an event arrived,
    which never fires on a hung connection. Also uses
    ``asyncio.get_running_loop()`` instead of the deprecated
    ``get_event_loop()`` inside a coroutine.

    Args:
        prompt: The prompt/query to send to the agent.
        **options: ``session_key`` selects/reuses a session.

    Yields:
        dict: Provider events (agent.message, session.*, ...).

    Raises:
        RuntimeError: If the service is unavailable or the timeout elapses.
    """
    if not self.is_available:
        raise RuntimeError(f"Managed agent service ({self.provider}) is not available")

    # Get or create session (same caching scheme as execute()).
    session_key = options.get('session_key', 'default')
    session_id = self._session_cache.get(session_key)
    if session_id is None:
        agent_id = await self._ensure_agent()
        environment_id = await self._ensure_environment()
        session_id = await self.backend.create_session(agent_id, environment_id)
        self._session_cache[session_key] = session_id

    await self.backend.send_message(session_id, prompt)

    loop = asyncio.get_running_loop()  # safe: we are inside a coroutine
    deadline = loop.time() + self.timeout
    events = self.backend.stream_events(session_id).__aiter__()
    while True:
        remaining = deadline - loop.time()
        try:
            event = await asyncio.wait_for(events.__anext__(), timeout=max(remaining, 0))
        except StopAsyncIteration:
            break
        except asyncio.TimeoutError as err:
            raise RuntimeError(f"Managed agent streaming timed out after {self.timeout}s") from err
        yield event

def reset_session(self, session_key: str = 'default'):
    """Drop one cached session id so the next call creates a fresh session."""
    self._session_cache.pop(session_key, None)

def reset_all_sessions(self):
    """Drop every cached session id."""
    self._session_cache.clear()

async def close(self):
    """Release provider resources (HTTP session) and forget cached sessions."""
    if hasattr(self.backend, 'close'):
        await self.backend.close()
    self.reset_all_sessions()


# Managed-agent built-in tool names -> closest PraisonAI tool equivalents.
TOOL_MAPPING = {
    "bash": "execute_command",
    "read": "read_file",
    "write": "write_file",
    "edit": "apply_diff",
    "glob": "list_files",
    "grep": "search_file",
    "web_fetch": "web_fetch",
    "search": "search_web",
}


def map_managed_tools(managed_tools: List[str]) -> List[str]:
    """
    Map managed agent tool names to PraisonAI tool names.

    Unknown names pass through unchanged so provider-side additions do not
    break callers.

    Args:
        managed_tools: List of managed agent tool names.

    Returns:
        List of corresponding PraisonAI tool names.
    """
    return [TOOL_MAPPING.get(tool, tool) for tool in managed_tools]
"""Test that Agent class supports the backend parameter.""" + # Mock aiohttp to avoid import issues + with patch('praisonai.integrations.managed_agents.aiohttp', None): + from praisonai.integrations.managed_agents import ManagedAgentIntegration + from praisonaiagents import Agent + + # Create a managed backend instance + managed = ManagedAgentIntegration(provider="anthropic", api_key="test_key") + + # Create agent with backend parameter + agent = Agent( + name="test_agent", + instructions="You are a test agent.", + backend=managed + ) + + # Verify backend is stored + assert agent.backend == managed + + +def test_agent_backend_delegation(): + """Test that Agent properly delegates execution to backend.""" + import asyncio + from typing import Dict, Any, AsyncIterator + + class MockManagedBackend: + """Mock backend to test delegation.""" + + def __init__(self): + self.executed_prompts = [] + self.execution_kwargs = [] + + async def execute(self, prompt: str, **kwargs) -> str: + self.executed_prompts.append(prompt) + self.execution_kwargs.append(kwargs) + return f"Backend response: {prompt}" + + async def stream(self, prompt: str, **kwargs) -> AsyncIterator[Dict[str, Any]]: + self.executed_prompts.append(prompt) + self.execution_kwargs.append(kwargs) + yield { + 'type': 'agent.message', + 'content': [{'type': 'text', 'text': f"Backend streamed: {prompt}"}] + } + + # Create mock backend + mock_backend = MockManagedBackend() + + # Create agent with backend + agent = Agent( + name="test-agent", + instructions="Test agent", + backend=mock_backend + ) + + # Test run() delegation + result = agent.run("Test run prompt") + assert result == "Backend response: Test run prompt" + assert len(mock_backend.executed_prompts) == 1 + assert mock_backend.executed_prompts[0] == "Test run prompt" + + # Test start() delegation + result = agent.start("Test start prompt") + assert result == "Backend response: Test start prompt" + assert len(mock_backend.executed_prompts) == 2 + assert 
mock_backend.executed_prompts[1] == "Test start prompt" + + # Test chat() delegation + result = agent.chat("Test chat prompt") + assert result == "Backend response: Test chat prompt" + assert len(mock_backend.executed_prompts) == 3 + assert mock_backend.executed_prompts[2] == "Test chat prompt" + + # Test that Agent without backend doesn't delegate + local_agent = Agent(name="local", instructions="Local agent") + assert not hasattr(local_agent, 'backend') or local_agent.backend is None + + +def test_managed_backend_protocol(): + """Test the ManagedBackendProtocol interface.""" + from praisonai.integrations.managed_agents import ManagedBackendProtocol + + # Test that the protocol has the expected abstract methods + expected_methods = [ + 'create_agent', + 'create_environment', + 'create_session', + 'send_message', + 'stream_events', + 'collect_response' + ] + + for method_name in expected_methods: + assert hasattr(ManagedBackendProtocol, method_name) + + +@patch('praisonai.integrations.managed_agents.aiohttp') +def test_anthropic_provider_creation(mock_aiohttp): + """Test creating an Anthropic provider.""" + from praisonai.integrations.managed_agents import ManagedAgentIntegration + + # Mock aiohttp to be available + mock_aiohttp.__bool__ = lambda: True + + managed = ManagedAgentIntegration( + provider="anthropic", + api_key="test_key" + ) + + assert managed.provider == "anthropic" + assert managed.api_key == "test_key" + assert managed.backend is not None + assert managed.is_available + + +def test_unsupported_provider(): + """Test creating integration with unsupported provider.""" + with patch('praisonai.integrations.managed_agents.aiohttp'): + from praisonai.integrations.managed_agents import ManagedAgentIntegration + + with pytest.raises(ValueError, match="Unsupported provider: unknown"): + ManagedAgentIntegration(provider="unknown", api_key="test_key") + + +def test_session_caching(): + """Test that session IDs are cached correctly (regression test for #357 
bug).""" + with patch('praisonai.integrations.managed_agents.aiohttp'): + from praisonai.integrations.managed_agents import ManagedAgentIntegration + + managed = ManagedAgentIntegration(provider="anthropic", api_key="test_key") + + # Simulate adding session to cache + managed._session_cache["test_session"] = "session_id_123" + + # Verify the correct session ID is cached (not the key) + assert managed._session_cache["test_session"] == "session_id_123" + assert managed._session_cache["test_session"] != "test_session" + + +def test_api_key_persistence(): + """Test that API keys from environment are persisted (regression test).""" + with patch('praisonai.integrations.managed_agents.aiohttp'), \ + patch('os.getenv', return_value="env_api_key"): + + from praisonai.integrations.managed_agents import ManagedAgentIntegration + + # Create without explicit API key to trigger env lookup + managed = ManagedAgentIntegration(provider="anthropic", api_key=None) + + # Should have stored the env key back to api_key + assert managed.api_key == "env_api_key" \ No newline at end of file diff --git a/test_backend_delegation.py b/test_backend_delegation.py new file mode 100644 index 000000000..2da27b4f5 --- /dev/null +++ b/test_backend_delegation.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 +""" +Test script to verify that Agent backend delegation is working correctly. 
+""" + +import sys +import asyncio +from typing import Dict, Any, AsyncIterator + +# Add the praisonai-agents source to the path +sys.path.insert(0, '/home/runner/work/PraisonAI/PraisonAI/src/praisonai-agents') + +from praisonaiagents import Agent + + +class MockManagedBackend: + """Mock backend to test delegation without real API calls.""" + + def __init__(self): + self.executed_prompts = [] + self.streamed_prompts = [] + + async def execute(self, prompt: str, **kwargs) -> str: + self.executed_prompts.append(prompt) + return f"Mock response for: {prompt}" + + async def stream(self, prompt: str, **kwargs) -> AsyncIterator[Dict[str, Any]]: + self.streamed_prompts.append(prompt) + # Simulate streaming response with agent.message event + yield { + 'type': 'agent.message', + 'content': [{'type': 'text', 'text': f"Mock streamed response for: {prompt}"}] + } + yield { + 'type': 'session.status_idle', + 'reason': 'completed' + } + + +def test_backend_delegation(): + """Test that Agent properly delegates to external backend.""" + + print("Testing Agent backend delegation...") + + # Create a mock backend + mock_backend = MockManagedBackend() + + # Create agent with backend + agent = Agent(name="test", instructions="You are a test agent", backend=mock_backend) + + # Test 1: run() method delegation + print("\n1. Testing run() method delegation...") + result = agent.run("Hello, test run") + print(f"Result: {result}") + + # Verify the backend was called + assert len(mock_backend.executed_prompts) == 1 + assert mock_backend.executed_prompts[0] == "Hello, test run" + assert "Mock response for: Hello, test run" in result + print("✅ run() delegation working!") + + # Test 2: start() method delegation (non-streaming) + print("\n2. 
Testing start() method delegation (non-streaming)...") + result = agent.start("Hello, test start") + print(f"Result: {result}") + + # Verify the backend was called again + assert len(mock_backend.executed_prompts) == 2 + assert mock_backend.executed_prompts[1] == "Hello, test start" + assert "Mock response for: Hello, test start" in result + print("✅ start() delegation working!") + + # Test 3: chat() method delegation + print("\n3. Testing chat() method delegation...") + result = agent.chat("Hello, test chat") + print(f"Result: {result}") + + # Verify the backend was called again + assert len(mock_backend.executed_prompts) == 3 + assert mock_backend.executed_prompts[2] == "Hello, test chat" + assert "Mock response for: Hello, test chat" in result + print("✅ chat() delegation working!") + + # Test 4: Test that Agent WITHOUT backend still works locally + print("\n4. Testing Agent without backend (fallback to local)...") + local_agent = Agent(name="local", instructions="You are a local agent") + + # This should fail with a real LLM call since we don't have API keys + # But it should at least try to execute locally (not delegate) + try: + # We just want to make sure it doesn't try to delegate + # We expect this to fail due to missing API keys, which is fine + result = local_agent.run("Test local execution") + except Exception as e: + # Expected - no API keys configured + print(f"Expected error for local agent (no API keys): {type(e).__name__}") + print("✅ Local agent execution attempted (not delegated)!") + + print("\n🎉 All tests passed! Backend delegation is working correctly.") + + return True + + +if __name__ == "__main__": + test_backend_delegation() \ No newline at end of file diff --git a/test_delegation_unit.py b/test_delegation_unit.py new file mode 100644 index 000000000..1556c810f --- /dev/null +++ b/test_delegation_unit.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 +""" +Unit test for backend delegation functionality. 
+""" + +import sys +sys.path.insert(0, '/home/runner/work/PraisonAI/PraisonAI/src/praisonai-agents') + +from praisonaiagents import Agent +import asyncio +from typing import Dict, Any, AsyncIterator + +class MockManagedBackend: + """Mock backend to test delegation.""" + + def __init__(self): + self.executed_prompts = [] + self.execution_kwargs = [] + + async def execute(self, prompt: str, **kwargs) -> str: + self.executed_prompts.append(prompt) + self.execution_kwargs.append(kwargs) + return f"Backend response: {prompt}" + + async def stream(self, prompt: str, **kwargs) -> AsyncIterator[Dict[str, Any]]: + self.executed_prompts.append(prompt) + self.execution_kwargs.append(kwargs) + yield { + 'type': 'agent.message', + 'content': [{'type': 'text', 'text': f"Backend streamed: {prompt}"}] + } + +def test_agent_backend_delegation(): + """Test that Agent properly delegates execution to backend.""" + + # Create mock backend + mock_backend = MockManagedBackend() + + # Create agent with backend + agent = Agent( + name="test-agent", + instructions="Test agent", + backend=mock_backend + ) + + # Test run() delegation + result = agent.run("Test run prompt") + assert result == "Backend response: Test run prompt" + assert len(mock_backend.executed_prompts) == 1 + assert mock_backend.executed_prompts[0] == "Test run prompt" + print("✅ run() delegation test passed") + + # Test start() delegation + result = agent.start("Test start prompt") + assert result == "Backend response: Test start prompt" + assert len(mock_backend.executed_prompts) == 2 + assert mock_backend.executed_prompts[1] == "Test start prompt" + print("✅ start() delegation test passed") + + # Test chat() delegation + result = agent.chat("Test chat prompt") + assert result == "Backend response: Test chat prompt" + assert len(mock_backend.executed_prompts) == 3 + assert mock_backend.executed_prompts[2] == "Test chat prompt" + print("✅ chat() delegation test passed") + + # Test that Agent without backend doesn't delegate 
+ local_agent = Agent(name="local", instructions="Local agent") + assert not hasattr(local_agent, 'backend') or local_agent.backend is None + print("✅ Non-backend agent test passed") + + print("\n🎉 All unit tests passed!") + +if __name__ == "__main__": + test_agent_backend_delegation() \ No newline at end of file