Skip to content

Wrapper layer: async-sync bridge breaks in event loops, fractured plugin registries, framework logic leaking into core orchestrator #1762

@MervinPraison

Description

@MervinPraison

Scope

In-depth review of the wrapper layer at src/praisonai/praisonai/ against the stated philosophy (Simpler · More extensible · Faster · Agent-centric) and engineering principles (DRY, protocol-driven core, lazy imports, multi-agent + async safe by default, 3-way feature surface).

Out of scope: docs, tests, coverage, file sizes, line counts.

The three gaps below were validated by reading the actual source — file paths, line numbers, and snippets are taken verbatim from main.


1. AsyncSQLiteConversationStore sync wrappers raise RuntimeError inside any running event loop — async-safety claim is broken

Where

  • src/praisonai/praisonai/persistence/conversation/async_sqlite.py:123-127, 153-157, 179-183, 198-202, 257-261, 283-287, 343-347, 369-373, 383-386
  • src/praisonai/praisonai/_async_bridge.py:83-110

Evidence

async_sqlite.py exposes a "dual" API — every async method has a sync sibling that just calls run_sync(...):

# src/praisonai/praisonai/persistence/conversation/async_sqlite.py:123-127
def create_session(self, session: ConversationSession) -> ConversationSession:
    """Sync wrapper for create_session."""
    return run_sync(
        self.async_create_session(session)
    )

But run_sync explicitly refuses to run inside any running event loop:

# src/praisonai/praisonai/_async_bridge.py:102-110
try:
    asyncio.get_running_loop()
except RuntimeError:
    pass
else:
    raise RuntimeError(
        "run_sync() cannot be called from a running event loop; "
        "await the coroutine directly instead."
    )

Why it's a real gap

This violates the stated principle of "multi-agent + async safe by default":

  • The class advertises a sync API (create_session, get_session, list_sessions, add_message, …). Most callers treat it as a normal blocking store.
  • The moment such a caller lives inside an event loop — a FastAPI/Starlette handler, a Jupyter cell, the MCP server in mcp_server/, the WebSocket UI under ui_realtime/, an asyncio.run-driven script — every one of those methods raises RuntimeError.
  • The wrapper has 9+ entry points to this footgun and no protocol surface that tells the caller "this store cannot be used from inside an event loop." The sync surface is a lie in those contexts.
  • For purely sync callers, the design also forces async sqlite I/O onto a single background-thread event loop (_BackgroundLoop in _async_bridge.py:19-78), serialising every conversation-store call across every agent — the opposite of "multi-agent safe."

Proposed fix (before → after)

Before — single class trying to be both:

# persistence/conversation/async_sqlite.py
class AsyncSQLiteConversationStore(ConversationStore):
    async def async_create_session(self, session): ...

    def create_session(self, session):
        return run_sync(self.async_create_session(session))  # raises in any event loop

After — two stores, picked at construction, no hidden bridge:

# persistence/conversation/sqlite.py  (sync, blocking sqlite3)
class SyncSQLiteConversationStore(ConversationStore):
    def create_session(self, session: ConversationSession) -> ConversationSession:
        with self._conn:  # sqlite3 connection, per-call lock for multi-agent safety
            self._conn.execute(
                "INSERT INTO sessions (...) VALUES (...)", (...)
            )
        return session

# persistence/conversation/async_sqlite.py  (async-only, no sync wrappers)
class AsyncSQLiteConversationStore(ConversationStore):
    async def create_session(self, session: ConversationSession) -> ConversationSession:
        await self._conn.execute("INSERT INTO sessions (...) VALUES (...)", (...))
        await self._conn.commit()
        return session

# persistence/factory.py
def build_conversation_store(url: str, *, mode: Literal["sync","async"]):
    if mode == "async":
        return AsyncSQLiteConversationStore(url)
    return SyncSQLiteConversationStore(url)

And — separately — gate run_sync() so it can never be reached from a coroutine via the public API: keep it in _async_bridge.py for legacy CLI paths, but stop calling it from store/persistence classes that claim a sync surface.

This restores 3-way safety: CLI/YAML callers use SyncSQLiteConversationStore; the FastAPI/MCP/UI servers await the async store directly; nothing silently raises mid-request.


2. Four parallel adapter/integration subsystems with no unified registry — extension point is fractured

Where

  • src/praisonai/praisonai/framework_adapters/ (base.py, registry.py, crewai_adapter.py, autogen_adapter.py, praisonai_adapter.py, validators.py)
  • src/praisonai/praisonai/integrations/ (base.py, registry.py, claude_code.py, gemini_cli.py, codex_cli.py, cursor_cli.py, managed_agents.py, hosted_agent.py, local_agent.py, sandboxed_agent.py, …)
  • src/praisonai/praisonai/integration/ (host_app.py, gateway_host.py, context_files.py, _legacy_handlers.py, bridges/, pages/)
  • src/praisonai/praisonai/adapters/ (readers.py, rerankers.py, retrievers.py, vector_stores.py)

Evidence

integrations/ and framework_adapters/ each ship their own registry.py. integrations/__init__.py then implements a hand-maintained 70-line __getattr__ ladder:

# src/praisonai/praisonai/integrations/__init__.py:59-128
def __getattr__(name):
    """Lazy load integrations to minimize import overhead."""
    if name == 'BaseCLIIntegration':
        from .base import BaseCLIIntegration
        return BaseCLIIntegration
    elif name == 'CLIExecutionError':
        from .base import CLIExecutionError
        return CLIExecutionError
    elif name == 'ClaudeCodeIntegration':
        from .claude_code import ClaudeCodeIntegration
        return ClaudeCodeIntegration
    # ... 30+ more elif branches ...
    elif name == 'LocalAgentConfig':
        from .local_agent import LocalAgentConfig
        return LocalAgentConfig
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

Meanwhile adapters/ (RAG primitives: readers, rerankers, retrievers, vector_stores) and integration/ (UI host app, gateway, context files) live as separate top-level packages with their own discovery rules.

Why it's a real gap

  • Naming collisionintegration/ and integrations/ both exist at the same level. New contributors cannot tell which to extend (and several files in the codebase import from the wrong one — easy to do).
  • DRY violation — "lazy import + class lookup" pattern is implemented three different ways (manual __getattr__ ladder in integrations/, dataclass-based FrameworkAdapterRegistry in framework_adapters/registry.py, sibling ExternalAgentRegistry in integrations/registry.py).
  • No third-party extension point — there is no entry_points group or single register(...) API. Adding a new managed agent backend, a new framework, or a new vector store requires editing N files in N subsystems. This directly contradicts "More extensible" and the principle that "core protocols, heavy code in wrapper/tools."
  • Performance — every new integration adds an elif branch executed at import-time attribute lookup; the laziness is real but the dispatch table is O(n) per attribute access.

Proposed fix (before → after)

Before — manual ladder + multiple registries:

# integrations/__init__.py — 70 lines of elif branches
def __getattr__(name):
    if name == 'ClaudeCodeIntegration':
        from .claude_code import ClaudeCodeIntegration
        return ClaudeCodeIntegration
    elif name == ...:
        ...

After — one registry, data-driven, plugin-discoverable:

# src/praisonai/praisonai/_registry.py  (already exists — extend it)
@dataclass(frozen=True)
class PluginEntry:
    kind: Literal["framework", "cli_tool", "managed_agent", "vector_store", "reranker", ...]
    name: str
    module: str        # "praisonai.integrations.claude_code"
    attr: str          # "ClaudeCodeIntegration"

_BUILTINS: tuple[PluginEntry, ...] = (
    PluginEntry("cli_tool",       "claude_code",   "praisonai.integrations.claude_code",   "ClaudeCodeIntegration"),
    PluginEntry("cli_tool",       "gemini_cli",    "praisonai.integrations.gemini_cli",    "GeminiCLIIntegration"),
    PluginEntry("managed_agent",  "hosted",        "praisonai.integrations.hosted_agent",  "HostedAgent"),
    PluginEntry("framework",      "crewai",        "praisonai.framework_adapters.crewai_adapter",  "CrewAIAdapter"),
    # ...
)

def get(kind: str, name: str):
    entry = _resolve(kind, name)               # also scans entry_points("praisonai.plugins")
    mod = importlib.import_module(entry.module)
    return getattr(mod, entry.attr)

Then integrations/__init__.py collapses to:

def __getattr__(name: str):
    return _registry.get_by_attr(__name__, name)  # one line, table-driven

Same registry serves framework_adapters/, integrations/, adapters/ (RAG), db/ backends — DRY across all four. Third parties register via entry_points group praisonai.plugins. Rename integration/host/ (it's the UI host layer, not a plugin layer) to eliminate the name collision.


3. Framework-specific logic baked into generate_crew_and_kickoff — adapter pattern is violated by its own orchestrator

Where

src/praisonai/praisonai/agents_generator.py:641-688

Evidence

The core orchestrator dispatches via self.framework_adapter.run(...) — but immediately before that call, it hard-codes AutoGen version selection and AgentOps initialisation inline:

# src/praisonai/praisonai/agents_generator.py:641-688
framework = self.framework or config.get('framework', 'crewai')

# Determine AutoGen version if needed (keeping compatibility logic)
if framework == "autogen":
    autogen_version = os.environ.get("AUTOGEN_VERSION", "auto").lower()
    autogen_v4_adapter = self._get_framework_adapter("autogen_v4")
    autogen_v2_adapter = self._get_framework_adapter("autogen")

    use_v4 = False
    if autogen_version == "v0.4" and autogen_v4_adapter.is_available():
        use_v4 = True
    elif autogen_version == "v0.2" and autogen_v2_adapter.is_available():
        use_v4 = False
    elif autogen_version == "auto":
        use_v4 = autogen_v4_adapter.is_available()
    else:
        use_v4 = autogen_v4_adapter.is_available() and not autogen_v2_adapter.is_available()

    framework = "autogen_v4" if use_v4 else "autogen"

# Initialize AgentOps if available
try:
    import agentops
    agentops_api_key = os.getenv("AGENTOPS_API_KEY")
    if agentops_api_key:
        agentops.init(agentops_api_key, default_tags=[framework])
except ImportError:
    pass

# Update framework adapter if framework changed (e.g., AutoGen version selection)
if framework != self.framework:
    self.framework = framework
    self.framework_adapter = self._get_framework_adapter(framework)

# Validate framework availability for non-CLI callers
from .framework_adapters.validators import assert_framework_available
assert_framework_available(framework)

self.logger.info(f"Using framework: {framework}")
return self.framework_adapter.run(
    config,
    self.config_list,
    topic,
    tools_dict=tools_dict,
    agent_callback=getattr(self, 'agent_callback', None),
    task_callback=getattr(self, 'task_callback', None),
    cli_config=getattr(self, 'cli_config', None),
)

Why it's a real gap

The whole point of framework_adapters/ is that the orchestrator stays framework-agnostic. This block does the opposite:

  • The orchestrator hard-codes the string "autogen" and knows about AUTOGEN_VERSION, autogen_v4, v0.2, v0.4. Adding AutoGen v0.5 — or any framework with internal version routing (CrewAI 1.x vs 2.x, PraisonAIAgents minor versions) — requires editing the wrapper's core, not adding an adapter.
  • AgentOps init is a cross-cutting observability concern; placing it in the orchestrator means every other entry point (auto.py, scheduler, MCP server) either duplicates this block or silently drops AgentOps tagging. Confirmed: AGENTOPS_AVAILABLE is already imported at agents_generator.py:46 but the init call is duplicated inline rather than gated through one observability hook.
  • It contradicts the stated philosophy: "Core SDK: protocols/hooks/adapters only; heavy code in wrapper/tools." Today the wrapper's "core" knows the names of specific framework versions.
  • It blocks an async-native adapter (e.g., AutoGen v0.4 is async-first) because the orchestrator's surrounding code is sync and the version switch happens after sync-only setup.

Proposed fix (before → after)

Before — orchestrator does framework's job:

# agents_generator.py
if framework == "autogen":
    # ... 15 lines of version probing ...
    framework = "autogen_v4" if use_v4 else "autogen"

try:
    import agentops
    if os.getenv("AGENTOPS_API_KEY"):
        agentops.init(os.getenv("AGENTOPS_API_KEY"), default_tags=[framework])
except ImportError:
    pass

return self.framework_adapter.run(...)

After — adapter owns its own resolution + setup:

# framework_adapters/base.py
class FrameworkAdapter(Protocol):
    name: str
    def resolve(self) -> "FrameworkAdapter":
        """Pick the concrete adapter variant (e.g. autogen v0.2 vs v0.4)."""
        return self
    def setup(self, *, framework_tag: str) -> None:
        """Framework-specific pre-run hooks (observability, sdk init, etc.)."""
    def run(self, config, config_list, topic, **kwargs): ...

# framework_adapters/autogen_adapter.py
class AutoGenAdapter(FrameworkAdapter):
    name = "autogen"
    def resolve(self) -> FrameworkAdapter:
        v = os.environ.get("AUTOGEN_VERSION", "auto").lower()
        v4 = _AutoGenV4Adapter(); v2 = _AutoGenV2Adapter()
        if v == "v0.4" and v4.is_available(): return v4
        if v == "v0.2" and v2.is_available(): return v2
        if v == "auto" and v4.is_available(): return v4
        return v2 if v2.is_available() else v4

# observability/hooks.py  (one place, every entry point uses it)
def init_observability(framework_tag: str) -> None:
    if not AGENTOPS_AVAILABLE: return
    key = os.getenv("AGENTOPS_API_KEY")
    if key:
        import agentops
        agentops.init(key, default_tags=[framework_tag])

# agents_generator.py — framework-agnostic again
adapter = self._get_framework_adapter(framework).resolve()
init_observability(adapter.name)
adapter.setup(framework_tag=adapter.name)
assert_framework_available(adapter.name)
return adapter.run(config, self.config_list, topic, ...)

After this change, the orchestrator no longer names any framework; the same call site supports new frameworks/versions purely by adding an adapter — restoring the stated "protocols/hooks/adapters only" boundary.


Summary

# Gap Principle violated Where
1 AsyncSQLiteConversationStore sync wrappers raise inside any event loop; serialise all multi-agent traffic through one background thread Multi-agent + async safe by default persistence/conversation/async_sqlite.py:123-373, _async_bridge.py:83-110
2 Four parallel adapter/integration subsystems (integration/, integrations/, adapters/, framework_adapters/) with two registries, a 70-line manual __getattr__ ladder, and no third-party extension point DRY, More extensible, protocol-driven core integrations/__init__.py:59-128 + sibling registries
3 generate_crew_and_kickoff hard-codes AutoGen version routing and AgentOps init inline, defeating the adapter pattern Protocols/hooks/adapters only in core; 3-way feature surface agents_generator.py:641-688

Each fix is self-contained and can ship independently. Happy to break these out into separate PRs.

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't workingclaudeAuto-trigger Claude analysisdocumentationImprovements or additions to documentationperformance

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions