
Wrapper architecture: deprecated event-loop API, fork-drift between sync/async schedulers, unsafe global singletons in jobs server #1770

@MervinPraison

Description


Scope

This issue documents three validated architectural gaps found in the wrapper package at src/praisonai/praisonai/. Each one directly contradicts a stated pillar of the PraisonAI philosophy (async-safe by default, no global singletons, 3-way feature surface, DRY), and each has a clear, minimal fix.

The findings below are not stylistic. They are correctness and safety issues that bite in production.

All file paths, line numbers and snippets below were read from the tree at claude/bold-bohr-MOE3G (HEAD d5f1114 Release v4.6.48).


1. Pervasive use of the deprecated asyncio.get_event_loop(): unsafe inside _BackgroundLoop, broken on Python 3.14+

Where it is

grep -rn "asyncio.get_event_loop()" src/praisonai/praisonai returns 36 call sites across 14 modules. The most damaging ones:

a. cli/commands/standardise.py:381 and :491 — sync entry path calling run_until_complete() on a non-running loop.

# src/praisonai/praisonai/cli/commands/standardise.py:381
        # Start runtime in background
        asyncio.get_event_loop().run_until_complete(runtime.start())
# src/praisonai/praisonai/cli/commands/standardise.py:491
        try:
            asyncio.get_event_loop().run_until_complete(runtime.stop())
        except Exception:
            pass

b. sandbox/e2b.py — 7 calls inside async def methods where get_running_loop() is the correct API.

# src/praisonai/praisonai/sandbox/e2b.py:96-105
    async def stop(self) -> None:
        """Stop/cleanup the sandbox environment."""
        if not self._is_running:
            return
        if self._sandbox:
            try:
                await asyncio.get_event_loop().run_in_executor(
                    None, self._sandbox.kill
                )
            except Exception as e:
                logger.warning(f"Failed to kill E2B sandbox: {e}")

Other identical patterns in e2b.py at lines 164, 222, 230, 236, 321, 339.

c. jobs/executor.py — 3 calls inside the async job runner.

# src/praisonai/praisonai/jobs/executor.py:285-290
        # Execute the recipe
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(
            None,
            lambda: execute_resolved_recipe(resolved)
        )

Identical pattern at lines 326 and 363.

d. Other affected modules (verified by grep):
bots/email.py:235, 475, 555 · gateway/server.py:897, 971, 2143 · api/agent_invoke.py:224, 363 · acp/server.py:420 · ui/callbacks.py:37 · recipe/operations.py:215 · cli/features/interactive_tools.py:255 · cli_backends/claude.py:177, 181 · browser/cli.py (multiple).

Why this is an architectural gap

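  1. It is deprecated, and broken outright on the newest Python. asyncio.get_event_loop() has been deprecated for calls made without a running loop since Python 3.10 (current releases emit DeprecationWarning whenever it has to create a loop implicitly), and as of Python 3.14 it raises RuntimeError instead of creating one. The two cli/commands/standardise.py call sites are pure sync entry points with no running loop, so on 3.12/3.13 they warn and create a loop behind the caller's back, and on 3.14+ they raise on first invocation. The sync-boundary replacement is asyncio.run(runtime.start()), or the bridge's run_sync().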

  2. It interacts dangerously with the wrapper's own background loop. praisonai/_async_bridge.py:19-80 defines _BackgroundLoop, a thread-bound asyncio loop that runs coroutines submitted from sync callers. Any code reached via run_sync() executes inside that background loop's thread. If helper code then calls asyncio.get_event_loop() from a different thread (e.g. a callback fired from a sync worker), the result depends on the thread: in any non-main thread with no loop set it raises RuntimeError, and in the main thread (before 3.14) it creates and installs a fresh loop that is not the bridge's loop, so nothing scheduled on it runs unless something else drives that loop. Both outcomes violate the "multi-agent + async safe by default" pillar.

  3. It defeats the protocol-driven, lightweight design. The whole point of _async_bridge.py is to centralize loop ownership. Each of these 36 sites re-derives a loop ad-hoc instead of using asyncio.get_running_loop() (inside async def) or asyncio.run() / run_sync() (at the sync boundary).
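A minimal, PraisonAI-independent sketch of the thread dependence described in point 2: from a fresh worker thread with no loop set, asyncio.get_event_loop() raises, while a coroutine running on a background loop can always find that loop with asyncio.get_running_loop(). The background-loop helper below is a stripped-down stand-in for _BackgroundLoop, not its actual code.

```python
import asyncio
import threading


def get_event_loop_from_worker_thread() -> str:
    """Call the deprecated API from a non-main thread that has no loop set."""
    outcome = {}

    def worker() -> None:
        try:
            asyncio.get_event_loop()
            outcome["result"] = "got a loop"
        except RuntimeError as exc:
            # The default policy only creates loops implicitly in the main thread.
            outcome["result"] = f"RuntimeError: {exc}"

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return outcome["result"]


def running_loop_on_background_thread() -> bool:
    """Drive a loop on its own thread (a stripped-down stand-in for _BackgroundLoop)."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()

    async def which_loop() -> asyncio.AbstractEventLoop:
        # get_running_loop() always returns the loop executing this coroutine.
        return asyncio.get_running_loop()

    future = asyncio.run_coroutine_threadsafe(which_loop(), loop)
    same = future.result(timeout=5) is loop
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
    return same


print(get_event_loop_from_worker_thread())  # starts with "RuntimeError"
print(running_loop_on_background_thread())  # True
```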

How to resolve

Two simple, mechanical rewrites — no API change:

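Inside async def (most of the 36 sites): replace run_in_executor(None, ...) with asyncio.to_thread(...) (Python 3.9+, the modern equivalent of "off-load a sync call to a worker thread"). Sites that need the loop object itself, e.g. to pass a dedicated executor, should call asyncio.get_running_loop() instead: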

- await asyncio.get_event_loop().run_in_executor(None, self._sandbox.kill)
+ await asyncio.to_thread(self._sandbox.kill)
- loop = asyncio.get_event_loop()
- result = await loop.run_in_executor(None, lambda: execute_resolved_recipe(resolved))
+ result = await asyncio.to_thread(execute_resolved_recipe, resolved)
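A self-contained sketch of both async-side forms; blocking_recipe is a hypothetical stand-in for a blocking call such as execute_resolved_recipe. asyncio.to_thread runs it on the default executor and, unlike a bare run_in_executor, also propagates contextvars; the get_running_loop() variant covers sites that need a specific executor.

```python
import asyncio
import concurrent.futures
import time


def blocking_recipe(name: str) -> str:
    """Hypothetical stand-in for a blocking call such as execute_resolved_recipe."""
    time.sleep(0.05)
    return f"ran {name}"


async def via_to_thread(name: str) -> str:
    # Preferred form: off-load to the default thread pool.
    return await asyncio.to_thread(blocking_recipe, name)


async def via_custom_executor(name: str, pool: concurrent.futures.Executor) -> str:
    # When a dedicated executor is required, ask for the *running* loop.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, blocking_recipe, name)


async def main() -> list:
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        return await asyncio.gather(
            via_to_thread("a"),
            via_custom_executor("b", pool),
        )


print(asyncio.run(main()))  # ['ran a', 'ran b']
```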

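At sync entry points (cli/commands/standardise.py, etc.): use asyncio.run, or route through praisonai._async_bridge.run_sync if the wrapper's shared loop is needed. One caveat: asyncio.run() cancels leftover tasks and closes its loop on return, so if runtime.start() leaves background tasks that runtime.stop() (line 491) expects to find, the pair needs one shared loop (run_sync, or a single asyncio.Runner on Python 3.11+) rather than two asyncio.run() calls. The one-shot form is: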

- asyncio.get_event_loop().run_until_complete(runtime.start())
+ asyncio.run(runtime.start())
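A sketch of the shared-loop variant for a start/stop pair, assuming runtime.start() leaves a task running (the original comment reads "Start runtime in background"). Both calls run on one explicitly created loop, so the task survives from start() to stop(); between the two calls the loop is not running, so the task is paused rather than gone. On Python 3.11+ a single asyncio.Runner does the same job. DemoRuntime and run_command are hypothetical.

```python
import asyncio
from typing import Optional


class DemoRuntime:
    """Hypothetical runtime whose start() leaves a task running on the loop."""

    def __init__(self) -> None:
        self._heartbeat: Optional[asyncio.Task] = None
        self.ticks = 0
        self.stopped = False

    async def _beat(self) -> None:
        while True:
            self.ticks += 1
            await asyncio.sleep(0.01)

    async def start(self) -> None:
        self._heartbeat = asyncio.create_task(self._beat())
        await asyncio.sleep(0)  # let the heartbeat run once

    async def stop(self) -> None:
        # Must run on the loop that owns the heartbeat task.
        if self._heartbeat is None:
            return
        self._heartbeat.cancel()
        try:
            await self._heartbeat
        except asyncio.CancelledError:
            pass
        self.stopped = True


def run_command() -> DemoRuntime:
    runtime = DemoRuntime()
    loop = asyncio.new_event_loop()  # one loop, owned explicitly for the whole command
    try:
        loop.run_until_complete(runtime.start())
        # ... synchronous command work would happen here ...
        loop.run_until_complete(runtime.stop())
    finally:
        loop.close()
    return runtime


rt = run_command()
print(rt.stopped, rt.ticks >= 1)  # True True
```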

A CI guard makes this a one-time fix:

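# Add to lint / pre-commit (-F: match the literal string, not a regex)
if grep -rnF "asyncio.get_event_loop()" src/praisonai/praisonai --include="*.py"; then
  echo "ERROR: deprecated asyncio.get_event_loop() calls found" >&2
  exit 1
fi
echo "OK: no deprecated get_event_loop() calls"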
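The grep guard matches literal text only, so it misses from asyncio import get_event_loop or an aliased module import (import asyncio as aio), and it also flags mentions inside comments. An AST-based check avoids both; this is a hypothetical script sketch, not an existing tool in the repository.

```python
import ast
from pathlib import Path
from typing import List, Tuple


def find_get_event_loop_calls(source: str) -> List[int]:
    """Return line numbers of calls to asyncio.get_event_loop, however it was imported."""
    tree = ast.parse(source)
    module_aliases = {"asyncio"}
    function_aliases = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name == "asyncio":
                    module_aliases.add(alias.asname or "asyncio")
        elif isinstance(node, ast.ImportFrom) and node.module == "asyncio":
            for alias in node.names:
                if alias.name == "get_event_loop":
                    function_aliases.add(alias.asname or "get_event_loop")
    hits = []
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # asyncio.get_event_loop() or aio.get_event_loop()
        if (isinstance(func, ast.Attribute) and func.attr == "get_event_loop"
                and isinstance(func.value, ast.Name) and func.value.id in module_aliases):
            hits.append(node.lineno)
        # get_event_loop() imported directly from asyncio (possibly renamed)
        elif isinstance(func, ast.Name) and func.id in function_aliases:
            hits.append(node.lineno)
    return sorted(hits)


def main(root: str) -> int:
    """Scan every .py file under root; return a non-zero exit code on any hit."""
    failures: List[Tuple[Path, int]] = []
    for path in Path(root).rglob("*.py"):
        for lineno in find_get_event_loop_calls(path.read_text(encoding="utf-8")):
            failures.append((path, lineno))
    for path, lineno in failures:
        print(f"{path}:{lineno}: asyncio.get_event_loop() is deprecated")
    return 1 if failures else 0
```

In CI this could be wired up as, for example, python -c "import sys, check_loops; sys.exit(check_loops.main('src/praisonai/praisonai'))", where check_loops is a hypothetical module name for the file above.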

2. AsyncAgentScheduler is a degraded, drift-prone fork of AgentScheduler — no timeout, no budget cap, no YAML / recipe constructors

Where it is

Two near-parallel implementations:

  • src/praisonai/praisonai/scheduler/agent_scheduler.py — 554 lines, full-featured sync scheduler.
  • src/praisonai/praisonai/scheduler/async_agent_scheduler.py — 468 lines, async-native but the author left the safety-critical features as inline TODOs.

The drift, in the author's own words

# src/praisonai/praisonai/scheduler/async_agent_scheduler.py:65-95
class AsyncAgentScheduler:
    """
    Async-native scheduler for running PraisonAI agents periodically.
    ...
    - Timeout support (TODO: needs porting from sync version)
    - Budget tracking (TODO: needs porting from sync version)
    - YAML/recipe constructors (TODO: needs porting from sync version)
    """
    def __init__(
        self,
        agent,
        task: str,
        config: Optional[Dict[str, Any]] = None,
        on_success: Optional[Callable] = None,
        on_failure: Optional[Callable] = None,
        # TODO: Add these missing features from sync version:
        # timeout: Optional[int] = None,
        # max_cost: Optional[float] = 1.00
    ):
# src/praisonai/praisonai/scheduler/async_agent_scheduler.py:113-116
        # TODO: Add these missing features from sync version:
        # self.timeout = timeout
        # self.max_cost = max_cost
        # self._total_cost = 0.0
# src/praisonai/praisonai/scheduler/async_agent_scheduler.py:352-396 (excerpt)
    async def _execute_with_retry(self, max_retries: int):
        """Execute agent with retry logic.

        TODO: Port missing features from sync version:
        - Timeout support per execution
        - Budget tracking and limits
        - Daemon state updates (_update_state_if_daemon)
        """
        ...
        # TODO: Add budget check from sync version:
        # if self.max_cost and self._total_cost >= self.max_cost:
        #     logger.warning(f"Budget limit reached: ${self._total_cost:.4f} >= ${self.max_cost}")
        #     return
        ...
        # TODO: Add timeout support from sync version:
        # if self.timeout:
        #     result = await asyncio.wait_for(
        #         self._executor.execute(self.task),
        #         timeout=self.timeout
        #     )
        # else:
        result = await self._executor.execute(self.task)

Compare the sync side, which actually enforces both:

# src/praisonai/praisonai/scheduler/agent_scheduler.py:38-77 (excerpt)
    def __init__(
        self,
        agent,
        task: str,
        ...
        timeout: Optional[int] = None,
        max_cost: Optional[float] = 1.00
    ):
        self.timeout = timeout
        self.max_cost = max_cost
        ...
        self._total_cost = 0.0
        self._stats_lock = threading.Lock()
# src/praisonai/praisonai/scheduler/agent_scheduler.py:215-220 (excerpt)
    def _run_schedule(self, interval: int, max_retries: int):
        while not self._stop_event.is_set():
            # Check budget limit
            if self.max_cost and self._total_cost >= self.max_cost:
                ...

Why this is an architectural gap

  1. It violates the 3-way feature surface pillar. The promise is "every feature ships with CLI + YAML + Python". A user who picks the async API for a long-running 24/7 scheduler loses the budget ceiling and the per-run timeout — the two features that make scheduled agents safe to leave running. There is no warning at the API surface; the parameters simply do not exist on the async class.

  2. It violates DRY. ~1000 lines of scheduler logic split across two files where ~70% of the bodies (schedule parsing, retry policy, stats counters, daemon state, callbacks, success/failure dispatch) are identical. Every future bugfix has to be applied twice and tends to diverge — which is exactly what already happened to timeout and max_cost.

  3. It violates safe by default. An async user who follows the documented example (AsyncAgentScheduler(agent, task="…"), then await scheduler.start("hourly")) gets:

    • no per-execution timeout — a hung LLM call wedges the schedule indefinitely;
    • no budget cap — runaway cost is silent;
    • no daemon state file updates — praisonai scheduler list/status doesn't reflect async daemons correctly.

How to resolve

Lift the shared behaviour into a common base, then make the sync and async classes thin specializations of the I/O strategy only. Sketch:

# src/praisonai/praisonai/scheduler/base.py
import threading


class _SchedulerCore:
    """Framework-free scheduler state; no I/O, no loop assumptions."""
    def __init__(self, agent, task, *, timeout=None, max_cost=1.00,
                 on_success=None, on_failure=None, config=None):
        self.agent = agent
        self.task = task
        self.timeout = timeout
        self.max_cost = max_cost
        self.on_success = on_success
        self.on_failure = on_failure
        self.config = config or {}
        self._execution_count = 0
        self._success_count = 0
        self._failure_count = 0
        self._total_cost = 0.0
        # The sync scheduler updates stats from worker threads (it holds _stats_lock today).
        self._stats_lock = threading.Lock()

    def over_budget(self) -> bool:
        return bool(self.max_cost) and self._total_cost >= self.max_cost

    def record_success(self, result, cost=0.0) -> None:
        with self._stats_lock:
            self._execution_count += 1
            self._success_count += 1
            self._total_cost += cost

    def record_failure(self) -> None:
        with self._stats_lock:
            self._execution_count += 1
            self._failure_count += 1

Then AgentScheduler provides the threading + concurrent.futures timeout, and AsyncAgentScheduler provides the asyncio.wait_for timeout — both calling into the shared _SchedulerCore for accounting:

# src/praisonai/praisonai/scheduler/async_agent_scheduler.py (after refactor)
class AsyncAgentScheduler(_SchedulerCore):
    def __init__(self, agent, task, *, timeout=None, max_cost=1.00, **kw):
        super().__init__(agent, task, timeout=timeout, max_cost=max_cost, **kw)
        ...

    async def _execute_with_retry(self, max_retries):
        if self.over_budget():
            logger.warning("Budget reached: $%.4f >= $%s", self._total_cost, self.max_cost)
            return
        for attempt in range(max_retries):
            try:
                coro = self._executor.execute(self.task)
                result = (await asyncio.wait_for(coro, timeout=self.timeout)
                          if self.timeout else await coro)
                self.record_success(result, cost=self._estimate_cost(result))
                safe_call(self.on_success, result)
                return
            except (asyncio.TimeoutError, Exception) as e:
                ...

This:

  • restores async/sync feature parity (timeout + max_cost work identically);
  • removes ~300 lines of duplicated logic;
  • makes the next feature addition automatically apply to both classes.
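A trimmed, runnable illustration of the same split, under stated assumptions: SchedulerCore, AsyncRunner, SyncRunner and run_once are illustrative names, not the real scheduler API, and each job returns its own cost. The accounting lives once in the core; each subclass contributes only its timeout mechanism.

```python
import asyncio
import concurrent.futures
import threading
import time
from typing import Awaitable, Callable, Optional


class SchedulerCore:
    """Shared accounting: budget and counters only, no I/O, no loop assumptions."""

    def __init__(self, *, timeout: Optional[float] = None, max_cost: Optional[float] = 1.00):
        self.timeout = timeout
        self.max_cost = max_cost
        self.success_count = 0
        self.failure_count = 0
        self.total_cost = 0.0
        self._lock = threading.Lock()  # needed by the threaded runner, harmless under asyncio

    def over_budget(self) -> bool:
        return bool(self.max_cost) and self.total_cost >= self.max_cost

    def record_success(self, cost: float) -> None:
        with self._lock:
            self.success_count += 1
            self.total_cost += cost

    def record_failure(self) -> None:
        with self._lock:
            self.failure_count += 1


class AsyncRunner(SchedulerCore):
    """Async I/O strategy: per-run timeout via asyncio.wait_for."""

    async def run_once(self, job: Callable[[], Awaitable[float]]) -> str:
        if self.over_budget():
            return "skipped: budget"
        try:
            # With timeout=None, wait_for simply awaits the coroutine.
            cost = await asyncio.wait_for(job(), timeout=self.timeout)
        except asyncio.TimeoutError:
            self.record_failure()
            return "failed: timeout"
        self.record_success(cost)
        return "ok"


class SyncRunner(SchedulerCore):
    """Threaded I/O strategy: per-run timeout via concurrent.futures."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    def run_once(self, job: Callable[[], float]) -> str:
        if self.over_budget():
            return "skipped: budget"
        future = self._pool.submit(job)
        try:
            cost = future.result(timeout=self.timeout)
        except concurrent.futures.TimeoutError:
            # A thread cannot be killed: the timed-out job keeps running in the pool.
            self.record_failure()
            return "failed: timeout"
        self.record_success(cost)
        return "ok"


async def hung_llm_call() -> float:
    await asyncio.sleep(10)  # simulates a call that does not return in time
    return 0.10


async def cheap_call() -> float:
    return 0.60


def slow_sync_job() -> float:
    time.sleep(1.0)
    return 0.10


runner = AsyncRunner(timeout=0.05, max_cost=1.00)
print(asyncio.run(runner.run_once(hung_llm_call)))  # failed: timeout
print(asyncio.run(runner.run_once(cheap_call)))     # ok (total 0.60)
print(asyncio.run(runner.run_once(cheap_call)))     # ok (total 1.20)
print(asyncio.run(runner.run_once(cheap_call)))     # skipped: budget

sync_runner = SyncRunner(timeout=0.2, max_cost=1.00)
print(sync_runner.run_once(slow_sync_job))          # failed: timeout
```

Because the budget is checked before each run, a single run can overshoot the cap (the total above reaches 1.20 against a 1.00 limit), matching the check-before-run shape of the sync excerpt. The threaded variant also shows why the sync side's timeout is weaker: it stops waiting, but cannot stop the work.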

3. jobs/server.py initialises shared JobStore / JobExecutor via unlocked global singletons — race on cold start

Where it is

# src/praisonai/praisonai/jobs/server.py:21-43
# Global instances (for single-process deployment)
_store: Optional[JobStore] = None
_executor: Optional[JobExecutor] = None


def get_store() -> JobStore:
    """Get or create the job store."""
    global _store
    if _store is None:
        _store = InMemoryJobStore(max_jobs=1000)
    return _store


def get_executor() -> JobExecutor:
    """Get or create the job executor."""
    global _executor
    if _executor is None:
        _executor = JobExecutor(
            store=get_store(),
            max_concurrent=int(os.environ.get("PRAISONAI_MAX_CONCURRENT_JOBS", "10")),
            default_timeout=int(os.environ.get("PRAISONAI_JOB_TIMEOUT", "3600"))
        )
    return _executor

get_store() and get_executor() are both called from the FastAPI lifespan and from request handlers (server.py:121, server.py:130-131):

# src/praisonai/praisonai/jobs/server.py:121
    jobs_router = create_router(get_store(), get_executor())
# src/praisonai/praisonai/jobs/server.py:130-131
    "store": get_store().__class__.__name__,
    "executor_stats": get_executor().get_stats()

Why this is an architectural gap

  1. It contains a textbook TOCTOU race. Whenever these getters run on more than one thread (FastAPI runs plain def route handlers and dependencies in a thread pool, and background threads may call them too), two callers on a cold worker can both pass if _store is None: (or if _executor is None:) before either assignment lands, and each constructs its own instance. Pure asyncio code cannot trigger this, because the check-and-assign contains no await. The "winner" overwrites the loser, but the loser's JobExecutor may already have started its own background asyncio tasks via await executor.start() on the lifespan path; those tasks are then orphaned and write to a detached InMemoryJobStore that no router holds. Submitted jobs land in the orphaned store and appear lost.

  2. The module's own comment admits the gap. Line 21 reads # Global instances (for single-process deployment). This directly contradicts the philosophy ("no global singletons", "multi-agent + async safe by default") and forecloses any multi-worker (uvicorn --workers N) deployment: each worker process silently creates its own in-process store, so a job is visible only to the worker that accepted it, and a status request routed to any other worker cannot find it.

  3. The codebase already has the right pattern next door. src/praisonai/praisonai/cli_backends/registry.py does this correctly with a lock and double-checked locking:

    # cli_backends/registry.py (reference pattern)
    _registry_lock = threading.Lock()
    
    def _get_cli_backend_registry() -> PluginRegistry:
        global _cli_backend_registry
        if _cli_backend_registry is None:
            with _registry_lock:
                if _cli_backend_registry is None:
                    _cli_backend_registry = PluginRegistry(...)
        return _cli_backend_registry

    The jobs server simply hasn't adopted the same convention.

How to resolve

Two complementary changes — neither is invasive.

a. Make the lazy init atomic (eliminates the race today):

 # src/praisonai/praisonai/jobs/server.py
+import threading
+
 _store: Optional[JobStore] = None
 _executor: Optional[JobExecutor] = None
+_store_lock = threading.Lock()
+_executor_lock = threading.Lock()

 def get_store() -> JobStore:
     global _store
-    if _store is None:
-        _store = InMemoryJobStore(max_jobs=1000)
+    if _store is None:
+        with _store_lock:
+            if _store is None:
+                _store = InMemoryJobStore(max_jobs=1000)
     return _store

 def get_executor() -> JobExecutor:
     global _executor
-    if _executor is None:
-        _executor = JobExecutor(
-            store=get_store(),
-            max_concurrent=int(os.environ.get("PRAISONAI_MAX_CONCURRENT_JOBS", "10")),
-            default_timeout=int(os.environ.get("PRAISONAI_JOB_TIMEOUT", "3600"))
-        )
+    if _executor is None:
+        with _executor_lock:
+            if _executor is None:
+                _executor = JobExecutor(
+                    store=get_store(),
+                    max_concurrent=int(os.environ.get("PRAISONAI_MAX_CONCURRENT_JOBS", "10")),
+                    default_timeout=int(os.environ.get("PRAISONAI_JOB_TIMEOUT", "3600"))
+                )
     return _executor
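A stand-alone sketch of why the second check inside the lock matters. FakeStore and hammer are hypothetical test scaffolding, and get_store_locked mirrors the diff above. A threading.Barrier releases all threads against a cold store at once, and the deliberately slow constructor widens the window in which several of them can pass the first None check.

```python
import threading
import time
from typing import Optional


class FakeStore:
    """Hypothetical stand-in for InMemoryJobStore with a slow constructor."""
    instances = 0

    def __init__(self) -> None:
        time.sleep(0.01)  # widen the race window so it is easy to observe
        FakeStore.instances += 1


_store: Optional[FakeStore] = None
_store_lock = threading.Lock()


def get_store_unlocked() -> FakeStore:
    global _store
    if _store is None:
        _store = FakeStore()
    return _store


def get_store_locked() -> FakeStore:
    global _store
    if _store is None:              # fast path: no lock once initialised
        with _store_lock:
            if _store is None:      # re-check: another thread may have won
                _store = FakeStore()
    return _store


def hammer(getter, n_threads: int = 20) -> int:
    """Reset state, call getter from n_threads at once, return the constructor count."""
    global _store
    _store = None
    FakeStore.instances = 0
    barrier = threading.Barrier(n_threads)

    def worker() -> None:
        barrier.wait()  # release all threads together against a cold store
        getter()

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return FakeStore.instances


print("unlocked constructions:", hammer(get_store_unlocked))  # usually more than 1
print("locked constructions:", hammer(get_store_locked))      # 1
```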

b. Prefer dependency injection through app.state (eliminates the global, makes multi-tenant deployment possible). The factory create_app() already accepts store / executor parameters; stash them on app.state and have the router read from there instead of from module-level globals:

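import os
from contextlib import asynccontextmanager
from fastapi import FastAPI  # JobStore, InMemoryJobStore, JobExecutor: imported as in jobs/server.py

@asynccontextmanager
async def lifespan(app: FastAPI):
    await app.state.executor.start()  # this app's executor, not a module-level global
    yield

def create_app(store=None, executor=None, cors_origins=None) -> FastAPI:
    app = FastAPI(lifespan=lifespan)
    # "is not None" rather than "or": an injected empty store may be falsy if it defines __len__
    app.state.store = store if store is not None else InMemoryJobStore(max_jobs=1000)
    app.state.executor = executor if executor is not None else JobExecutor(
        store=app.state.store,
        max_concurrent=int(os.environ.get("PRAISONAI_MAX_CONCURRENT_JOBS", "10")),
        default_timeout=int(os.environ.get("PRAISONAI_JOB_TIMEOUT", "3600")),
    )
    app.include_router(create_router_from_state())  # hypothetical: handlers read request.app.state
    return app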

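After this, the _store / _executor module globals can be deleted entirely, satisfying the no global singletons pillar. It is also the prerequisite for horizontal scale: each worker can then be handed a shared, out-of-process JobStore implementation, whereas one InMemoryJobStore per worker would still keep jobs worker-local.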


Suggested ordering

  1. #3 (jobs server singletons) first: smallest diff, eliminates a production race on the documented API.
  2. #1 (get_event_loop()) next: mostly mechanical (get_event_loop() → asyncio.to_thread / asyncio.run), guardable in CI.
  3. #2 (scheduler fork-drift) last: biggest refactor, but the only one that restores 3-way parity for scheduled agents.

All three are local to src/praisonai/praisonai/ and require no changes in praisonaiagents (the core SDK), so they fit cleanly under the "heavy code lives in the wrapper" layering rule.

Metadata


Assignees

No one assigned

    Labels

    bug (Something isn't working) · claude (Auto-trigger Claude analysis)

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
