Skip to content

Commit e9d8602

Browse files
fix: concurrency safety gaps in Core SDK
- Replace unsafe asyncio.Semaphore private attribute manipulation with proper sync-to-async bridge in concurrency.py - Fix ThreadPoolExecutor resource leak by using reusable agent-level executor in tool_execution.py - Add thread lock protection for unprotected plugin state in plugins/__init__.py Fixes #1458 Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com>
1 parent e1642ee commit e9d8602

3 files changed

Lines changed: 48 additions & 40 deletions

File tree

src/praisonai-agents/praisonaiagents/agent/concurrency.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -86,26 +86,32 @@ async def acquire(self, agent_name: str) -> None:
8686
def acquire_sync(self, agent_name: str) -> None:
8787
"""Synchronous acquire — for non-async code paths.
8888
89-
Note: This creates/reuses an event loop internally.
89+
Uses a proper sync-to-async bridge to avoid private attribute manipulation.
9090
Prefer async acquire() when possible.
9191
"""
9292
sem = self._get_semaphore(agent_name)
9393
if sem is None:
9494
return
9595
try:
9696
asyncio.get_running_loop()
97-
# If we're in an async context, we can't block
98-
# Just try_acquire or no-op with warning
99-
if not sem._value > 0:
100-
logger.warning(
101-
f"Sync acquire for '{agent_name}' while async loop running and semaphore full. "
102-
f"Consider using async acquire() instead."
103-
)
104-
# Decrement manually for sync context
105-
sem._value = max(0, sem._value - 1)
97+
# We're inside an async loop — cannot block. Run acquire in a thread
98+
# with its own loop to go through the semaphore's proper acquire path.
99+
import concurrent.futures
100+
def _acquire_in_new_loop():
101+
loop = asyncio.new_event_loop()
102+
try:
103+
loop.run_until_complete(sem.acquire())
104+
finally:
105+
loop.close()
106+
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
107+
pool.submit(_acquire_in_new_loop).result(timeout=30)
106108
except RuntimeError:
107-
# No running loop — safe to use asyncio.run
108-
asyncio.get_event_loop().run_until_complete(sem.acquire())
109+
# No running loop — safe to create one
110+
loop = asyncio.new_event_loop()
111+
try:
112+
loop.run_until_complete(sem.acquire())
113+
finally:
114+
loop.close()
109115

110116
def release(self, agent_name: str) -> None:
111117
"""Release concurrency slot for agent. No-op if unlimited."""

src/praisonai-agents/praisonaiagents/agent/tool_execution.py

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -202,25 +202,19 @@ def execute_with_context():
202202
with with_injection_context(state):
203203
return self._execute_tool_impl(function_name, arguments)
204204

205-
# Use explicit executor lifecycle to actually bound execution time
206-
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
205+
# Use reusable executor to prevent resource leaks
206+
if not hasattr(self, '_tool_executor'):
207+
self._tool_executor = concurrent.futures.ThreadPoolExecutor(
208+
max_workers=2, thread_name_prefix=f"tool-{self.name}"
209+
)
210+
211+
future = self._tool_executor.submit(ctx.run, execute_with_context)
207212
try:
208-
future = executor.submit(ctx.run, execute_with_context)
209-
try:
210-
result = future.result(timeout=tool_timeout)
211-
except concurrent.futures.TimeoutError:
212-
# Cancel and shutdown immediately to avoid blocking
213-
future.cancel()
214-
executor.shutdown(wait=False, cancel_futures=True)
215-
logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
216-
result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
217-
else:
218-
# Normal completion - shutdown gracefully
219-
executor.shutdown(wait=False)
220-
finally:
221-
# Ensure executor is always cleaned up
222-
if not executor._shutdown:
223-
executor.shutdown(wait=False)
213+
result = future.result(timeout=tool_timeout)
214+
except concurrent.futures.TimeoutError:
215+
future.cancel()
216+
logging.warning(f"Tool {function_name} timed out after {tool_timeout}s")
217+
result = {"error": f"Tool timed out after {tool_timeout}s", "timeout": True}
224218
else:
225219
with with_injection_context(state):
226220
result = self._execute_tool_impl(function_name, arguments)

src/praisonai-agents/praisonaiagents/plugins/__init__.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ def my_plugin_func(hook_type, *args, **kwargs):
7878
# ============================================================================
7979

8080
# Global state for plugin system (lazy initialized)
81+
import threading
82+
83+
_plugins_lock = threading.Lock()
8184
_plugins_enabled: bool = False
8285
_enabled_plugin_names: list = None # None = all, list = specific
8386

@@ -108,8 +111,9 @@ def enable(plugins: list = None) -> None:
108111
"""
109112
global _plugins_enabled, _enabled_plugin_names
110113

111-
_plugins_enabled = True
112-
_enabled_plugin_names = plugins # None = all, list = specific
114+
with _plugins_lock:
115+
_plugins_enabled = True
116+
_enabled_plugin_names = plugins # None = all, list = specific
113117

114118
# Get plugin manager and auto-discover
115119
from .manager import get_plugin_manager
@@ -119,10 +123,14 @@ def enable(plugins: list = None) -> None:
119123
manager.auto_discover_plugins()
120124
manager.discover_entry_points()
121125

126+
# Snapshot the names under lock to avoid TOCTOU
127+
with _plugins_lock:
128+
target_plugins = list(_enabled_plugin_names) if _enabled_plugin_names is not None else None
129+
122130
# Enable specific plugins or all
123-
if plugins is not None:
131+
if target_plugins is not None:
124132
# Enable only specified plugins
125-
for name in plugins:
133+
for name in target_plugins:
126134
manager.enable(name)
127135
else:
128136
# Enable all discovered plugins
@@ -158,8 +166,9 @@ def disable(plugins: list = None) -> None:
158166
manager.disable(name)
159167
else:
160168
# Disable all plugins
161-
_plugins_enabled = False
162-
_enabled_plugin_names = None
169+
with _plugins_lock:
170+
_plugins_enabled = False
171+
_enabled_plugin_names = None
163172
for plugin_info in manager.list_plugins():
164173
manager.disable(plugin_info.get("name", ""))
165174

@@ -225,10 +234,9 @@ def is_enabled(name: str = None) -> bool:
225234
Returns:
226235
True if enabled, False otherwise.
227236
"""
228-
global _plugins_enabled
229-
230-
if name is None:
231-
return _plugins_enabled
237+
with _plugins_lock:
238+
if name is None:
239+
return _plugins_enabled
232240

233241
from .manager import get_plugin_manager
234242
manager = get_plugin_manager()

0 commit comments

Comments
 (0)