-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync_runner.py
More file actions
50 lines (38 loc) · 1.51 KB
/
async_runner.py
File metadata and controls
50 lines (38 loc) · 1.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
"""Run coroutines from synchronous sampler code.
This module centralizes the event-loop bridging used by sampler execution
paths so synchronous orchestration can safely invoke async helpers in both
normal scripts and already-running event loops.
"""
import asyncio
import threading
from typing import Any, Callable, NoReturn
def run_coroutine_from_sync(coroutine_factory: Callable[[], Any]) -> Any:
"""Run an async workflow from synchronous code.
This helper executes the coroutine directly when no event loop is active.
If the caller already runs inside an event loop, it executes the coroutine
in a dedicated worker thread and re-raises any exception from that thread.
Args:
coroutine_factory (Callable[[], Any]): Factory returning the coroutine
to execute.
Returns:
Any: The value returned by the coroutine.
"""
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(coroutine_factory())
result: dict[str, Any] = {}
error: dict[str, BaseException] = {}
def runner() -> None:
try:
result["value"] = asyncio.run(coroutine_factory())
except BaseException as exc: # pragma: no cover - passthrough
error["value"] = exc
def raise_worker_error(exc: BaseException) -> NoReturn:
raise exc
thread = threading.Thread(target=runner, daemon=True)
thread.start()
thread.join()
if "value" in error:
raise_worker_error(error["value"])
return result["value"]