feat: add External Managed Agent Backends support #1359
Changes from all commits: dfcb1c8, 56c4f7d, 635ab50, fb6d8ed, 40e4017
```diff
@@ -487,6 +487,7 @@ def __init__(
         approval: Optional[Union[bool, str, Dict[str, Any], 'ApprovalConfig', 'ApprovalProtocol']] = None,
         tool_timeout: Optional[int] = None,  # P8/G11: Timeout in seconds for each tool call
         learn: Optional[Union[bool, str, Dict[str, Any], 'LearnConfig']] = None,  # Continuous learning (peer to memory)
+        backend: Optional[Any] = None,  # External managed agent backend (e.g., ManagedAgentIntegration)
     ):
         """Initialize an Agent instance.
```
```diff
@@ -574,6 +575,11 @@ def __init__(
                 - LearnConfig: Custom configuration
                 Learning is a first-class citizen, peer to memory. It captures patterns,
                 preferences, and insights from interactions to improve future responses.
+            backend: External managed agent backend for hybrid execution. Accepts:
+                - ManagedAgentIntegration: External managed agent service
+                - None: Use local execution (default)
+                When provided, agent can delegate execution to managed infrastructure
+                for long-running tasks or when local resources are constrained.

         Raises:
             ValueError: If all of name, role, goal, backstory, and instructions are None.
```
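For orientation, a minimal usage sketch of the new parameter follows. The top-level `Agent` import and the backend class are assumptions for illustration; per `_delegate_to_backend` further down, any object with an async `execute()` method should satisfy the contract.

```python
# Hypothetical usage sketch; the import path and backend class are assumptions.
from praisonaiagents import Agent

class MyManagedBackend:
    """Stand-in for ManagedAgentIntegration: only an async execute() is required."""
    async def execute(self, prompt: str, **kwargs) -> str:
        # A real backend would submit the prompt to managed infrastructure.
        return f"[managed] {prompt}"

local_agent = Agent(instructions="You are helpful.")  # backend=None: local execution
managed_agent = Agent(instructions="You are helpful.", backend=MyManagedBackend())
```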
```diff
@@ -1798,6 +1804,9 @@ def __init__(
         self._output_file = output_file if _output_config else None
         self._output_template = output_template if _output_config else None

+        # Backend - external managed agent backend for hybrid execution
+        self.backend = backend
+
         # Telemetry - lazy initialized via property for performance
         self.__telemetry = None
         self.__telemetry_initialized = False
```

Review comment on lines +1807 to +1808: The integration is incomplete until the execution path (e.g., `chat()`) actually consults `self.backend`.
Next file:

```diff
@@ -1049,6 +1049,27 @@ def chat(self, prompt: str, temperature: float = 1.0, tools: Optional[List[Any]]
             'required' forces the LLM to call a tool before responding.
             ...other args...
         """
+        # Check if external managed backend is configured
+        if hasattr(self, 'backend') and self.backend is not None:
+            # Extract kwargs for delegation, excluding 'self' and function locals
+            delegation_kwargs = {
+                'temperature': temperature,
+                'tools': tools,
+                'output_json': output_json,
+                'output_pydantic': output_pydantic,
+                'reasoning_steps': reasoning_steps,
+                'stream': stream,
+                'task_name': task_name,
+                'task_description': task_description,
+                'task_id': task_id,
+                'config': config,
+                'force_retrieval': force_retrieval,
+                'skip_retrieval': skip_retrieval,
+                'attachments': attachments,
+                'tool_choice': tool_choice
+            }
+            return self._delegate_to_backend(prompt, **delegation_kwargs)

         # Emit context trace event (zero overhead when not set)
         from ..trace.context_events import get_context_emitter
         _trace_emitter = get_context_emitter()
```

Review comment on lines +1052 to +1071 (Contributor): Don't bypass the normal `chat()` pipeline. This early return skips the rate limiter, BEFORE/AFTER_AGENT hooks, run tracking, guardrails, and ….
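One way to honor that comment is to make delegation an execution step inside the pipeline rather than an early return. The sketch below is structural only; every `_apply_*`, `_run_hooks`, and `_chat_local` helper is a hypothetical stand-in, not this PR's API:

```python
# Structural sketch only; every helper here is hypothetical, not this PR's API.
class PipelinePreservingAgent:
    def __init__(self, backend=None):
        self.backend = backend

    # -- stubbed pipeline pieces, stand-ins for the real Agent internals --
    def _apply_rate_limit(self): pass
    def _run_hooks(self, name): pass
    def _apply_guardrails(self, result): return result
    def _chat_local(self, prompt, **kwargs): return f"[local] {prompt}"
    def _delegate_to_backend(self, prompt, **kwargs): return f"[managed] {prompt}"

    def chat(self, prompt, **kwargs):
        """Delegation happens inside the pipeline, not as an early return."""
        self._apply_rate_limit()
        self._run_hooks("BEFORE_AGENT")
        try:
            if self.backend is not None:
                result = self._delegate_to_backend(prompt, **kwargs)
            else:
                result = self._chat_local(prompt, **kwargs)
            return self._apply_guardrails(result)
        finally:
            self._run_hooks("AFTER_AGENT")
```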
Next file:

```diff
@@ -251,6 +251,10 @@ def run(self, prompt: str, **kwargs: Any) -> Optional[str]:
         - Background processing
         - API endpoints
         """
+        # Check if external managed backend is configured
+        if hasattr(self, 'backend') and self.backend is not None:
+            return self._delegate_to_backend(prompt, **kwargs)

         # Production defaults: no streaming, no display
         if 'stream' not in kwargs:
             kwargs['stream'] = False
```
```diff
@@ -274,6 +278,168 @@ def run(self, prompt: str, **kwargs: Any) -> Optional[str]:

         return result

+    def _delegate_to_backend(self, prompt: str, **kwargs) -> Optional[str]:
+        """Delegate execution to external managed backend (e.g., ManagedAgentIntegration)."""
+        import asyncio
+
+        # Check if backend has required methods
+        if not hasattr(self.backend, 'execute'):
+            raise RuntimeError(f"Backend {type(self.backend).__name__} does not support execute() method")
+
+        # Handle streaming vs non-streaming
+        stream_requested = kwargs.get('stream', False)
+
+        if stream_requested:
+            # For streaming, delegate to backend's stream method if available
+            if hasattr(self.backend, 'stream'):
+                return self._delegate_streaming_to_backend(prompt, **kwargs)
+            else:
+                # Fallback: execute non-streaming even if stream was requested
+                return self._execute_backend_sync(prompt, **kwargs)
+        else:
+            # Non-streaming execution
+            return self._execute_backend_sync(prompt, **kwargs)
```
Review comment on lines +281 to +301 (Contributor): Add an async backend path for …. This helper only serves the new synchronous delegation branches. Async callers still go through the local execution stack, so ….
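For reference, the duck-typed contract these helpers check is small: an async `execute()` method is required, and an async `stream()` method is optional. A toy conforming backend (illustrative only, not part of the PR):

```python
from typing import Any, AsyncIterator, Optional

class EchoBackend:
    """Toy backend satisfying the duck-typed contract checked above; not part of the PR."""

    async def execute(self, prompt: str, **kwargs: Any) -> Optional[str]:
        # A real managed backend would submit the prompt to remote infrastructure.
        return f"[managed] {prompt}"

    async def stream(self, prompt: str, **kwargs: Any) -> AsyncIterator[str]:
        # Optional: consulted only when callers pass stream=True.
        for word in prompt.split():
            yield word + " "
```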
```diff
+    def _execute_backend_sync(self, prompt: str, **kwargs) -> Optional[str]:
+        """Execute backend in sync mode, handling async backends."""
+        try:
+            # Try to run in existing event loop
+            loop = asyncio.get_running_loop()
+            # If we're already in an async context, we can't use asyncio.run()
+            # Create a new task instead
+            import concurrent.futures
+            import threading
+
+            def run_async():
+                new_loop = asyncio.new_event_loop()
+                asyncio.set_event_loop(new_loop)
+                try:
+                    return new_loop.run_until_complete(self.backend.execute(prompt, **kwargs))
+                finally:
+                    new_loop.close()
+
+            with concurrent.futures.ThreadPoolExecutor() as executor:
+                future = executor.submit(run_async)
+                return future.result()
+
+        except RuntimeError:
+            # No event loop running, safe to use asyncio.run()
+            return asyncio.run(self.backend.execute(prompt, **kwargs))
```
Review comment on lines +305 to +327 (Contributor): Narrow the `try`/`except RuntimeError` so it wraps only `asyncio.get_running_loop()`. As written, a `RuntimeError` raised by `self.backend.execute()` is re-raised by `future.result()` inside the `try`, caught by the broad `except`, and the call is silently retried via `asyncio.run()`.

🛠️ Safer structure:

```diff
 def _execute_backend_sync(self, prompt: str, **kwargs) -> Optional[str]:
-    try:
-        # Try to run in existing event loop
-        loop = asyncio.get_running_loop()
-        # If we're already in an async context, we can't use asyncio.run()
-        # Create a new task instead
-        import concurrent.futures
-        import threading
+    try:
+        asyncio.get_running_loop()
+    except RuntimeError:
+        return asyncio.run(self.backend.execute(prompt, **kwargs))
+
+    import concurrent.futures
-        def run_async():
-            new_loop = asyncio.new_event_loop()
-            asyncio.set_event_loop(new_loop)
-            try:
-                return new_loop.run_until_complete(self.backend.execute(prompt, **kwargs))
-            finally:
-                new_loop.close()
-
-        with concurrent.futures.ThreadPoolExecutor() as executor:
-            future = executor.submit(run_async)
-            return future.result()
-
-    except RuntimeError:
-        # No event loop running, safe to use asyncio.run()
-        return asyncio.run(self.backend.execute(prompt, **kwargs))
+    def run_async():
+        new_loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(new_loop)
+        try:
+            return new_loop.run_until_complete(self.backend.execute(prompt, **kwargs))
+        finally:
+            new_loop.close()
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
+        future = executor.submit(run_async)
+        return future.result()
```
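To make the hazard concrete, here is a small self-contained reproduction. The backend class is hypothetical; only the try/except shape mirrors the PR's helper. Inside a running loop, the backend's own `RuntimeError` escapes `future.result()`, is caught by the broad `except`, and the retry then fails with an unrelated "cannot be called from a running event loop" error that masks the original failure:

```python
import asyncio
import concurrent.futures

class FlakyBackend:
    """Hypothetical backend whose execute() fails with RuntimeError."""
    async def execute(self, prompt, **kwargs):
        raise RuntimeError("simulated backend failure")

def execute_backend_sync(backend, prompt):
    # Same broad try/except shape as the PR's helper.
    try:
        asyncio.get_running_loop()  # succeeds: we're called from inside a loop

        def run_async():
            return asyncio.run(backend.execute(prompt))

        with concurrent.futures.ThreadPoolExecutor() as ex:
            return ex.submit(run_async).result()  # re-raises the backend error here
    except RuntimeError:
        # Intended for "no running loop", but also catches the backend's error;
        # this retry fails because a loop IS running in this thread (and Python
        # warns that the freshly created coroutine was never awaited).
        return asyncio.run(backend.execute(prompt))

async def main():
    try:
        execute_backend_sync(FlakyBackend(), "hi")
    except RuntimeError as exc:
        print(exc)  # "asyncio.run() cannot be called from a running event loop"

asyncio.run(main())
```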
```diff
+    def _delegate_streaming_to_backend(self, prompt: str, **kwargs):
+        """Delegate to backend's streaming method."""
+        try:
+            # For streaming, we need to return an iterator/generator
+            # The backend's stream method is async, so we need to handle that
+            import asyncio
+
+            async def stream_wrapper():
+                async for chunk in self.backend.stream(prompt, **kwargs):
+                    yield chunk
+
+            # Convert async generator to sync generator
+            def sync_stream():
+                try:
+                    loop = asyncio.get_running_loop()
+                    # Already in async context - need to handle differently
+                    import concurrent.futures
+                    import threading
+                    import queue
+
+                    result_queue = queue.Queue()
+                    exception_holder = [None]
+
+                    def run_in_thread():
+                        new_loop = asyncio.new_event_loop()
+                        asyncio.set_event_loop(new_loop)
+                        try:
+                            async def collect():
+                                try:
+                                    async for item in self.backend.stream(prompt, **kwargs):
+                                        result_queue.put(('item', item))
+                                    result_queue.put(('done', None))
+                                except Exception as e:
+                                    exception_holder[0] = e
+                                    result_queue.put(('error', e))
+
+                            new_loop.run_until_complete(collect())
+                        finally:
+                            new_loop.close()
+
+                    thread = threading.Thread(target=run_in_thread)
+                    thread.start()
+
+                    while True:
+                        msg_type, data = result_queue.get()
+                        if msg_type == 'item':
+                            # For managed backends, we might get event objects
+                            # Convert to string format expected by Agent
+                            if isinstance(data, dict):
+                                if data.get('type') == 'agent.message':
+                                    content = data.get('content', [])
+                                    if isinstance(content, list):
+                                        text_parts = []
+                                        for block in content:
+                                            if isinstance(block, dict) and block.get('type') == 'text':
+                                                text_parts.append(block.get('text', ''))
+                                            elif isinstance(block, str):
+                                                text_parts.append(block)
+                                        if text_parts:
+                                            yield ''.join(text_parts)
+                                    elif isinstance(content, str):
+                                        yield content
+                                # Skip other event types (session.status_idle, etc.)
+                            elif isinstance(data, str):
+                                yield data
+                        elif msg_type == 'done':
+                            break
+                        elif msg_type == 'error':
+                            raise data
+
+                    thread.join()
+
+                except RuntimeError:
+                    # No event loop - can run directly
+                    async def run_stream():
+                        async for item in self.backend.stream(prompt, **kwargs):
+                            yield item
+
+                    # Use asyncio.run for each item (not ideal but works)
+                    async_gen = run_stream()
+
+                    async def collect_all():
+                        results = []
+                        async for item in async_gen:
+                            results.append(item)
+                        return results
+
+                    results = asyncio.run(collect_all())
+                    for item in results:
+                        # Similar conversion logic
+                        if isinstance(item, dict):
+                            if item.get('type') == 'agent.message':
+                                content = item.get('content', [])
+                                if isinstance(content, list):
+                                    text_parts = []
+                                    for block in content:
+                                        if isinstance(block, dict) and block.get('type') == 'text':
+                                            text_parts.append(block.get('text', ''))
+                                        elif isinstance(block, str):
+                                            text_parts.append(block)
+                                    if text_parts:
+                                        yield ''.join(text_parts)
+                                elif isinstance(content, str):
+                                    yield content
+                        elif isinstance(item, str):
+                            yield item
+
+            return sync_stream()
+
+        except Exception as e:
+            # Fallback to non-streaming
+            logger.warning(f"Backend streaming failed, falling back to non-streaming: {e}")
+            return self._execute_backend_sync(prompt, **kwargs)

     def _get_planning_agent(self):
         """Lazy load PlanningAgent for planning mode."""
         if self._planning_agent is None and self.planning:
```

Review comment on lines +401 to +417 (Contributor): The sync streaming fallback buffers the whole stream. When no loop is running, which is the normal case, `collect_all()` gathers every item via `asyncio.run()` before a single chunk is yielded, so callers get no incremental output.
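If incremental output is wanted in the no-loop case as well, one possible shape (a sketch, not the PR's code) is to drive the async generator step by step on a private event loop instead of collecting everything first:

```python
import asyncio

def iterate_async_gen(agen):
    """Drive an async generator from sync code, yielding each item as it
    arrives instead of buffering the whole stream first."""
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                yield loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break
    finally:
        loop.run_until_complete(agen.aclose())
        loop.close()

# e.g. the no-loop fallback could become:
#     yield from iterate_async_gen(self.backend.stream(prompt, **kwargs))
```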
```diff
@@ -453,6 +619,10 @@ def start(self, prompt: Optional[str] = None, **kwargs: Any) -> Union[str, Generator]:
         from praisonaiagents.utils.variables import substitute_variables
         prompt = substitute_variables(prompt, {})

+        # Check if external managed backend is configured
+        if hasattr(self, 'backend') and self.backend is not None:
+            return self._delegate_to_backend(prompt, **kwargs)

         # ─────────────────────────────────────────────────────────────────────
         # UNIFIED AUTONOMY API: If autonomy is enabled, route to run_autonomous
         # This allows: Agent(autonomy=True) + agent.start("Task") to just work!
```
Review comment: `backend` is only stored, never used. These changes add a public `backend` option and document delegated execution, but none of the execution paths in this file consult `self.backend`. `Agent(backend=managed)` therefore still runs locally, so the feature is effectively unimplemented.

Also applies to: 578-582, 1807-1808
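A quick way to check that finding locally (a hypothetical sketch; the top-level `Agent` import and constructor usage are assumptions):

```python
# Hypothetical verification sketch, not part of the PR.
from praisonaiagents import Agent

class RecordingBackend:
    """Records every prompt it is asked to execute."""
    def __init__(self):
        self.calls = []

    async def execute(self, prompt, **kwargs):
        self.calls.append(prompt)
        return "ok"

backend = RecordingBackend()
agent = Agent(instructions="You are a test agent.", backend=backend)
agent.chat("hello")
print(backend.calls or "backend was stored but never consulted")
```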