Skip to content

Commit 6a90cc0

Browse files
fix: Async correctness - avoid asyncio.run() from sync context reachable by async callers (#1176)
* fix: async correctness in tool approval - prevent RuntimeError in async workflows * Add event loop detection in _check_tool_approval_sync() * Use thread pool fallback when already in async context * Prevents 'asyncio.run() cannot be called from a running event loop' crash * Maintains backward compatibility with sync usage * Follows existing pattern from registry.py and backends.py Fixes #1165 Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com> * fix: address reviewer feedback on async correctness fix - Add concurrent.futures import to top of agent.py (per Gemini review) - Create shared approval/utils.py to eliminate code duplication (per Qodo/Coderabbit) - Fix timeout configuration to respect agent's _approval_timeout semantics (per all reviewers) - Replace ThreadPoolExecutor context manager with proper shutdown handling (per Coderabbit) - Remove redundant loop.is_running() check (per Copilot) - Consolidate async-to-sync bridging logic across approval system Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com> --------- Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
1 parent ffb9bd1 commit 6a90cc0

4 files changed

Lines changed: 92 additions & 23 deletions

File tree

src/praisonai-agents/praisonaiagents/agent/agent.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import asyncio
66
import contextlib
77
import threading
8+
import concurrent.futures
89
from typing import List, Optional, Any, Dict, Union, Literal, TYPE_CHECKING, Callable, Generator
910
from collections import OrderedDict
1011
import inspect
@@ -4918,7 +4919,6 @@ def _execute_tool_with_context(self, function_name, arguments, state, tool_call_
49184919
# P8/G11: Apply tool timeout if configured
49194920
tool_timeout = getattr(self, '_tool_timeout', None)
49204921
if tool_timeout and tool_timeout > 0:
4921-
import concurrent.futures
49224922
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
49234923
future = executor.submit(self._execute_tool_impl, function_name, arguments)
49244924
try:
@@ -5170,7 +5170,22 @@ def _check_tool_approval_sync(self, function_name, arguments):
51705170
if hasattr(backend, 'request_approval_sync'):
51715171
decision = backend.request_approval_sync(request)
51725172
else:
5173-
decision = asyncio.run(backend.request_approval(request))
5173+
# Use the shared utility to avoid code duplication and handle timeout correctly
5174+
from ..approval.utils import run_coroutine_safely
5175+
5176+
# Compute effective timeout from agent configuration
5177+
if cfg_timeout is None:
5178+
effective_timeout = None # indefinite wait
5179+
elif cfg_timeout > 0:
5180+
effective_timeout = cfg_timeout
5181+
else:
5182+
# cfg_timeout == 0: use backend default or fallback
5183+
effective_timeout = getattr(backend, '_timeout', 60)
5184+
5185+
decision = run_coroutine_safely(
5186+
backend.request_approval(request),
5187+
timeout=effective_timeout
5188+
)
51745189
finally:
51755190
if orig_timeout is not None and hasattr(backend, '_timeout'):
51765191
backend._timeout = orig_timeout

src/praisonai-agents/praisonaiagents/approval/backends.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -237,18 +237,8 @@ async def request_approval(self, request: ApprovalRequest) -> ApprovalDecision:
237237

238238
def request_approval_sync(self, request: ApprovalRequest) -> ApprovalDecision:
239239
"""Synchronous wrapper."""
240-
try:
241-
loop = asyncio.get_running_loop()
242-
except RuntimeError:
243-
loop = None
244-
245-
if loop and loop.is_running():
246-
import concurrent.futures
247-
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
248-
future = pool.submit(asyncio.run, self.request_approval(request))
249-
return future.result(timeout=60)
250-
else:
251-
return asyncio.run(self.request_approval(request))
240+
from .utils import run_coroutine_safely
241+
return run_coroutine_safely(self.request_approval(request), timeout=60)
252242

253243

254244
class CallbackBackend:

src/praisonai-agents/praisonaiagents/approval/registry.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -210,15 +210,12 @@ def approve_sync(
210210
if hasattr(backend, "request_approval_sync"):
211211
decision = backend.request_approval_sync(request)
212212
else:
213-
# Fallback: run async method
214-
try:
215-
decision = asyncio.run(backend.request_approval(request))
216-
except RuntimeError:
217-
# Already in an event loop — fall back to thread
218-
import concurrent.futures
219-
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
220-
future = pool.submit(asyncio.run, backend.request_approval(request))
221-
decision = future.result(timeout=self.timeout)
213+
# Use shared utility for consistent async-to-sync bridging
214+
from .utils import run_coroutine_safely
215+
decision = run_coroutine_safely(
216+
backend.request_approval(request),
217+
timeout=self.timeout
218+
)
222219

223220
if decision.approved:
224221
self.mark_approved(tool_name)
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
"""
2+
Utility functions for approval handling.
3+
4+
Provides reusable async-to-sync bridging logic to prevent code duplication
5+
across the approval system.
6+
"""
7+
8+
import asyncio
9+
import concurrent.futures
10+
from typing import Any, Awaitable, Callable, Optional, TypeVar
11+
12+
T = TypeVar('T')
13+
14+
15+
def run_coroutine_safely(
16+
coro: Awaitable[T],
17+
timeout: Optional[float] = None
18+
) -> T:
19+
"""
20+
Run a coroutine safely, handling both sync and async contexts.
21+
22+
This function detects if an event loop is already running and uses a
23+
ThreadPoolExecutor as a fallback to avoid RuntimeError. It respects
24+
timeout semantics consistently across both code paths.
25+
26+
Args:
27+
coro: The coroutine to execute
28+
timeout: Timeout in seconds. None means indefinite wait.
29+
30+
Returns:
31+
The result of the coroutine
32+
33+
Raises:
34+
TimeoutError: If the operation times out
35+
Any exception raised by the coroutine
36+
"""
37+
try:
38+
loop = asyncio.get_running_loop()
39+
except RuntimeError:
40+
loop = None
41+
42+
if loop and loop.is_running():
43+
# We're in an async context - use thread pool to avoid RuntimeError
44+
pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
45+
46+
# Wrap the coroutine with timeout handling inside the thread
47+
def run_with_timeout():
48+
if timeout is not None and timeout > 0:
49+
return asyncio.run(asyncio.wait_for(coro, timeout=timeout))
50+
else:
51+
return asyncio.run(coro)
52+
53+
future = pool.submit(run_with_timeout)
54+
try:
55+
# Don't use timeout on Future.result() since we handle timeout
56+
# inside the coroutine via asyncio.wait_for
57+
result = future.result(timeout=None if timeout is None or timeout == 0 else timeout)
58+
return result
59+
finally:
60+
# Properly shut down the executor without waiting for threads
61+
pool.shutdown(wait=False, cancel_futures=True)
62+
else:
63+
# No running event loop - use asyncio.run directly
64+
if timeout is not None and timeout > 0:
65+
return asyncio.run(asyncio.wait_for(coro, timeout=timeout))
66+
else:
67+
return asyncio.run(coro)

0 commit comments

Comments
 (0)