diff --git a/src/praisonai-agents/praisonaiagents/agent/concurrency.py b/src/praisonai-agents/praisonaiagents/agent/concurrency.py index 0bc586889..5c31e7da3 100644 --- a/src/praisonai-agents/praisonaiagents/agent/concurrency.py +++ b/src/praisonai-agents/praisonaiagents/agent/concurrency.py @@ -86,26 +86,27 @@ async def acquire(self, agent_name: str) -> None: def acquire_sync(self, agent_name: str) -> None: """Synchronous acquire — for non-async code paths. - Note: This creates/reuses an event loop internally. Prefer async acquire() when possible. + If called while an event loop is already running in the current thread, + this method raises RuntimeError to avoid deadlock. """ sem = self._get_semaphore(agent_name) if sem is None: return try: asyncio.get_running_loop() - # If we're in an async context, we can't block - # Just try_acquire or no-op with warning - if not sem._value > 0: - logger.warning( - f"Sync acquire for '{agent_name}' while async loop running and semaphore full. " - f"Consider using async acquire() instead." - ) - # Decrement manually for sync context - sem._value = max(0, sem._value - 1) except RuntimeError: - # No running loop — safe to use asyncio.run - asyncio.get_event_loop().run_until_complete(sem.acquire()) + # No running loop — safe to create one + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(sem.acquire()) + finally: + loop.close() + else: + raise RuntimeError( + f"acquire_sync('{agent_name}') cannot be called with a running event loop; " + "use async acquire() in async contexts." + ) def release(self, agent_name: str) -> None: """Release concurrency slot for agent. No-op if unlimited.""" diff --git a/src/praisonai-agents/praisonaiagents/agent/tool_execution.py b/src/praisonai-agents/praisonaiagents/agent/tool_execution.py index d1009cd7a..54b3b30c0 100644 --- a/src/praisonai-agents/praisonaiagents/agent/tool_execution.py +++ b/src/praisonai-agents/praisonaiagents/agent/tool_execution.py @@ -202,25 +202,19 @@ def execute_with_context(): with with_injection_context(state): return self._execute_tool_impl(function_name, arguments) - # Use explicit executor lifecycle to actually bound execution time - executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) + # Use reusable executor to prevent resource leaks + if not hasattr(self, '_tool_executor'): + self._tool_executor = concurrent.futures.ThreadPoolExecutor( + max_workers=2, thread_name_prefix=f"tool-{self.name}" + ) + + future = self._tool_executor.submit(ctx.run, execute_with_context) try: - future = executor.submit(ctx.run, execute_with_context) - try: - result = future.result(timeout=tool_timeout) - except concurrent.futures.TimeoutError: - # Cancel and shutdown immediately to avoid blocking - future.cancel() - executor.shutdown(wait=False, cancel_futures=True) - logging.warning(f"Tool {function_name} timed out after {tool_timeout}s") - result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True} - else: - # Normal completion - shutdown gracefully - executor.shutdown(wait=False) - finally: - # Ensure executor is always cleaned up - if not executor._shutdown: - executor.shutdown(wait=False) + result = future.result(timeout=tool_timeout) + except concurrent.futures.TimeoutError: + future.cancel() + logging.warning(f"Tool {function_name} timed out after {tool_timeout}s") + result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True} else: with with_injection_context(state): result = self._execute_tool_impl(function_name, arguments) diff --git a/src/praisonai-agents/praisonaiagents/plugins/__init__.py b/src/praisonai-agents/praisonaiagents/plugins/__init__.py index d1636f8d7..b428c440d 100644 --- a/src/praisonai-agents/praisonaiagents/plugins/__init__.py +++ b/src/praisonai-agents/praisonaiagents/plugins/__init__.py @@ -78,6 +78,9 @@ def my_plugin_func(hook_type, *args, **kwargs): # ============================================================================ # Global state for plugin system (lazy initialized) +import threading + +_plugins_lock = threading.Lock() _plugins_enabled: bool = False _enabled_plugin_names: list = None # None = all, list = specific @@ -108,8 +111,9 @@ def enable(plugins: list = None) -> None: """ global _plugins_enabled, _enabled_plugin_names - _plugins_enabled = True - _enabled_plugin_names = plugins # None = all, list = specific + with _plugins_lock: + _plugins_enabled = True + _enabled_plugin_names = plugins # None = all, list = specific # Get plugin manager and auto-discover from .manager import get_plugin_manager @@ -119,10 +123,14 @@ def enable(plugins: list = None) -> None: manager.auto_discover_plugins() manager.discover_entry_points() + # Snapshot the names under lock to avoid TOCTOU + with _plugins_lock: + target_plugins = list(_enabled_plugin_names) if _enabled_plugin_names is not None else None + # Enable specific plugins or all - if plugins is not None: + if target_plugins is not None: # Enable only specified plugins - for name in plugins: + for name in target_plugins: manager.enable(name) else: # Enable all discovered plugins @@ -158,8 +166,9 @@ def disable(plugins: list = None) -> None: manager.disable(name) else: # Disable all plugins - _plugins_enabled = False - _enabled_plugin_names = None + with _plugins_lock: + _plugins_enabled = False + _enabled_plugin_names = None for plugin_info in manager.list_plugins(): manager.disable(plugin_info.get("name", "")) @@ -225,10 +234,9 @@ def is_enabled(name: str = None) -> bool: Returns: True if enabled, False otherwise. """ - global _plugins_enabled - - if name is None: - return _plugins_enabled + with _plugins_lock: + if name is None: + return _plugins_enabled from .manager import get_plugin_manager manager = get_plugin_manager() diff --git a/src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py b/src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py index 0806f3b6d..c6c37c99b 100644 --- a/src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py +++ b/src/praisonai-agents/tests/unit/agent/test_agent_concurrency.py @@ -106,3 +106,18 @@ def test_sync_acquire_release(self): # Sync acquire should work reg.acquire_sync("sync_agent") reg.release("sync_agent") + + @pytest.mark.asyncio + async def test_sync_acquire_running_loop_noop(self): + """Sync acquire in async context should fail fast without changing state.""" + from praisonaiagents.agent.concurrency import ConcurrencyRegistry + reg = ConcurrencyRegistry() + reg.set_limit("loop_agent", 1) + await reg.acquire("loop_agent") + with pytest.raises(RuntimeError, match="running event loop"): + reg.acquire_sync("loop_agent") + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(reg.acquire("loop_agent"), timeout=0.05) + reg.release("loop_agent") + await asyncio.wait_for(reg.acquire("loop_agent"), timeout=0.05) + reg.release("loop_agent")