Skip to content

Commit 1a20853

Browse files
committed
chore: refactor sampler runners
1 parent a26e110 commit 1a20853

15 files changed

+2635
-681
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
"""Run coroutines from synchronous sampler code.
2+
3+
This module centralizes the event-loop bridging used by sampler execution
4+
paths so synchronous orchestration can safely invoke async helpers in both
5+
normal scripts and already-running event loops.
6+
"""
7+
8+
import asyncio
9+
import threading
10+
from typing import Any, Callable
11+
12+
13+
def run_coroutine_from_sync(coroutine_factory: Callable[[], Any]) -> Any:
14+
"""Run an async workflow from synchronous code.
15+
16+
This helper executes the coroutine directly when no event loop is active.
17+
If the caller already runs inside an event loop, it executes the coroutine
18+
in a dedicated worker thread and re-raises any exception from that thread.
19+
20+
Args:
21+
coroutine_factory (Callable[[], Any]): Factory returning the coroutine
22+
to execute.
23+
24+
Returns:
25+
Any: The value returned by the coroutine.
26+
"""
27+
28+
try:
29+
asyncio.get_running_loop()
30+
except RuntimeError:
31+
return asyncio.run(coroutine_factory())
32+
33+
result: dict[str, Any] = {}
34+
error: dict[str, BaseException] = {}
35+
36+
def runner() -> None:
37+
try:
38+
result["value"] = asyncio.run(coroutine_factory())
39+
except BaseException as exc: # pragma: no cover - passthrough
40+
error["value"] = exc
41+
42+
thread = threading.Thread(target=runner, daemon=True)
43+
thread.start()
44+
thread.join()
45+
if "value" in error:
46+
raise error["value"]
47+
return result["value"]

0 commit comments

Comments
 (0)