diff --git a/src/praisonai/praisonai/_registry.py b/src/praisonai/praisonai/_registry.py index ba642ec4a..c2978b9bd 100644 --- a/src/praisonai/praisonai/_registry.py +++ b/src/praisonai/praisonai/_registry.py @@ -225,4 +225,59 @@ def list_all_names(self) -> list[str]: Sorted list of all registered names and aliases """ with self._lock: - return sorted(list(self._items.keys()) + list(self._aliases.keys())) \ No newline at end of file + return sorted(list(self._items.keys()) + list(self._aliases.keys())) + + def get_by_attr(self, module_name: str, attr_name: str) -> Type[T]: + """Get a plugin by attribute name for __getattr__ dispatch. + + Args: + module_name: Module name requesting the attribute (for error messages) + attr_name: Attribute name to resolve + + Returns: + Plugin class + + Raises: + AttributeError: If plugin is not found + """ + try: + return self.resolve(attr_name) + except ValueError: + raise AttributeError(f"module {module_name!r} has no attribute {attr_name!r}") + + +def create_lazy_getattr(registry: PluginRegistry[T]) -> Callable[[str], T]: + """Create a __getattr__ function backed by a PluginRegistry. + + This replaces manual if/elif ladders in __init__.py files with a data-driven + approach using the plugin registry. + + Args: + registry: The plugin registry to use for resolution + + Returns: + Function that can be used as __getattr__ in a module + + Example: + # In __init__.py: + from ._registry import create_lazy_getattr + + # Assuming you have a registry instance + __getattr__ = create_lazy_getattr(my_registry) + """ + def __getattr__(name: str) -> T: + try: + plugin_class = registry.resolve(name) + return plugin_class + except ValueError: + # Get the calling module name for error context + import inspect + frame = inspect.currentframe() + if frame and frame.f_back: + module_name = frame.f_back.f_globals.get('__name__', 'unknown') + else: + module_name = 'unknown' + + raise AttributeError(f"module {module_name!r} has no attribute {name!r}") + + return __getattr__ \ No newline at end of file diff --git a/src/praisonai/praisonai/agents_generator.py b/src/praisonai/praisonai/agents_generator.py index 4d086f77b..557c352fb 100644 --- a/src/praisonai/praisonai/agents_generator.py +++ b/src/praisonai/praisonai/agents_generator.py @@ -639,45 +639,28 @@ def generate_crew_and_kickoff(self): self.logger.debug("tools folder exists in the root directory") framework = self.framework or config.get('framework', 'crewai') - - # Determine AutoGen version if needed (keeping compatibility logic) - if framework == "autogen": - autogen_version = os.environ.get("AUTOGEN_VERSION", "auto").lower() - autogen_v4_adapter = self._get_framework_adapter("autogen_v4") - autogen_v2_adapter = self._get_framework_adapter("autogen") - - use_v4 = False - if autogen_version == "v0.4" and autogen_v4_adapter.is_available(): - use_v4 = True - elif autogen_version == "v0.2" and autogen_v2_adapter.is_available(): - use_v4 = False - elif autogen_version == "auto": - use_v4 = autogen_v4_adapter.is_available() - else: - use_v4 = autogen_v4_adapter.is_available() and not autogen_v2_adapter.is_available() - - framework = "autogen_v4" if use_v4 else "autogen" - - # Initialize AgentOps if available - try: - import agentops - agentops_api_key = os.getenv("AGENTOPS_API_KEY") - if agentops_api_key: - agentops.init(agentops_api_key, default_tags=[framework]) - except ImportError: - pass - - # Update framework adapter if framework changed (e.g., AutoGen version selection) - if framework != self.framework: - self.framework = framework - self.framework_adapter = self._get_framework_adapter(framework) + + # Get initial adapter and resolve to concrete variant + initial_adapter = self._get_framework_adapter(framework) + adapter = initial_adapter.resolve() + + # Initialize observability hooks + from .observability.hooks import init_observability + init_observability(adapter.name) + + # Run adapter setup hooks + adapter.setup(framework_tag=adapter.name) + + # Update framework reference if resolution changed it + self.framework = adapter.name + self.framework_adapter = adapter # Validate framework availability for non-CLI callers from .framework_adapters.validators import assert_framework_available - assert_framework_available(framework) + assert_framework_available(adapter.name) - self.logger.info(f"Using framework: {framework}") - return self.framework_adapter.run( + self.logger.info(f"Using framework: {adapter.name}") + return adapter.run( config, self.config_list, topic, diff --git a/src/praisonai/praisonai/framework_adapters/autogen_adapter.py b/src/praisonai/praisonai/framework_adapters/autogen_adapter.py index 1fe5f587d..4dd8f0c50 100644 --- a/src/praisonai/praisonai/framework_adapters/autogen_adapter.py +++ b/src/praisonai/praisonai/framework_adapters/autogen_adapter.py @@ -5,6 +5,7 @@ """ import logging +import os from typing import Dict, List, Any, Optional, Callable from .base import BaseFrameworkAdapter @@ -12,7 +13,7 @@ class AutoGenAdapter(BaseFrameworkAdapter): - """Adapter for AutoGen v0.2 framework.""" + """Adapter for AutoGen v0.2 framework with version resolution.""" name = "autogen" install_hint = 'pip install "praisonai[autogen]"' # v0.2 only @@ -26,6 +27,37 @@ def is_available(self) -> bool: except ImportError: return False + def resolve(self) -> "BaseFrameworkAdapter": + """Pick the concrete AutoGen adapter variant based on environment and availability.""" + autogen_version = os.environ.get("AUTOGEN_VERSION", "auto").lower() + + # Import the specific adapters + v4_adapter = AutoGenV4Adapter() + v2_adapter = self # Current instance is v0.2 + + if autogen_version == "v0.4" and v4_adapter.is_available(): + logger.info("AutoGen version resolution: Using v0.4 (explicitly requested)") + return v4_adapter + elif autogen_version == "v0.2" and v2_adapter.is_available(): + logger.info("AutoGen version resolution: Using v0.2 (explicitly requested)") + return v2_adapter + elif autogen_version == "auto": + # Auto-detect: prefer v0.4 if available, fallback to v0.2 + if v4_adapter.is_available(): + logger.info("AutoGen version resolution: Using v0.4 (auto-detected)") + return v4_adapter + else: + logger.info("AutoGen version resolution: Using v0.2 (auto-detected fallback)") + return v2_adapter + else: + # Invalid version or neither available, try both + if v4_adapter.is_available() and not v2_adapter.is_available(): + logger.info("AutoGen version resolution: Using v0.4 (only available version)") + return v4_adapter + else: + logger.info("AutoGen version resolution: Using v0.2 (default fallback)") + return v2_adapter + def run( self, config: Dict[str, Any], diff --git a/src/praisonai/praisonai/framework_adapters/base.py b/src/praisonai/praisonai/framework_adapters/base.py index 1e5470ab2..1d8169517 100644 --- a/src/praisonai/praisonai/framework_adapters/base.py +++ b/src/praisonai/praisonai/framework_adapters/base.py @@ -20,6 +20,22 @@ def is_available(self) -> bool: """Check if the framework is available for import.""" ... + def resolve(self) -> "FrameworkAdapter": + """Pick the concrete adapter variant (e.g. autogen v0.2 vs v0.4). + + Returns: + The resolved adapter instance (self or a different adapter) + """ + ... + + def setup(self, *, framework_tag: str) -> None: + """Framework-specific pre-run hooks (observability, sdk init, etc.). + + Args: + framework_tag: Framework name for observability tagging + """ + ... + def run( self, config: Dict[str, Any], @@ -87,6 +103,14 @@ def _format_template(self, template: str, **kwargs) -> str: logger.warning("Template formatting error: %s; returning original template", e) return template + def resolve(self) -> "FrameworkAdapter": + """Default implementation returns self.""" + return self + + def setup(self, *, framework_tag: str) -> None: + """Default implementation does nothing.""" + pass + def cleanup(self) -> None: """Clean up resources - default implementation does nothing.""" pass diff --git a/src/praisonai/praisonai/integrations/__init__.py b/src/praisonai/praisonai/integrations/__init__.py index 59458eedf..ca623aa09 100644 --- a/src/praisonai/praisonai/integrations/__init__.py +++ b/src/praisonai/praisonai/integrations/__init__.py @@ -56,73 +56,7 @@ ] -def __getattr__(name): - """Lazy load integrations to minimize import overhead.""" - if name == 'BaseCLIIntegration': - from .base import BaseCLIIntegration - return BaseCLIIntegration - elif name == 'CLIExecutionError': - from .base import CLIExecutionError - return CLIExecutionError - elif name == 'ClaudeCodeIntegration': - from .claude_code import ClaudeCodeIntegration - return ClaudeCodeIntegration - elif name == 'GeminiCLIIntegration': - from .gemini_cli import GeminiCLIIntegration - return GeminiCLIIntegration - elif name == 'CodexCLIIntegration': - from .codex_cli import CodexCLIIntegration - return CodexCLIIntegration - elif name == 'CursorCLIIntegration': - from .cursor_cli import CursorCLIIntegration - return CursorCLIIntegration - elif name in ('ManagedAgent', 'ManagedAgentIntegration'): - from .managed_agents import ManagedAgent - return ManagedAgent - elif name == 'AnthropicManagedAgent': - from .managed_agents import AnthropicManagedAgent - return AnthropicManagedAgent - elif name == 'LocalManagedAgent': - from .managed_local import LocalManagedAgent - return LocalManagedAgent - elif name == 'LocalManagedConfig': - from .managed_local import LocalManagedConfig - return LocalManagedConfig - elif name == 'SandboxedAgent': - from .sandboxed_agent import SandboxedAgent - return SandboxedAgent - elif name == 'SandboxedAgentConfig': - from .sandboxed_agent import SandboxedAgentConfig - return SandboxedAgentConfig - elif name in ('ManagedConfig', 'ManagedBackendConfig'): - from .managed_agents import ManagedConfig - return ManagedConfig - elif name == 'get_available_integrations': - from .base import get_available_integrations - return get_available_integrations - elif name == 'ExternalAgentRegistry': - from .registry import ExternalAgentRegistry - return ExternalAgentRegistry - elif name == 'get_registry': - from .registry import get_registry - return get_registry - elif name == 'register_integration': - from .registry import register_integration - return register_integration - elif name == 'create_integration': - from .registry import create_integration - return create_integration - # New canonical agent backends - elif name == 'HostedAgent': - from .hosted_agent import HostedAgent - return HostedAgent - elif name == 'HostedAgentConfig': - from .hosted_agent import HostedAgentConfig - return HostedAgentConfig - elif name == 'LocalAgent': - from .local_agent import LocalAgent - return LocalAgent - elif name == 'LocalAgentConfig': - from .local_agent import LocalAgentConfig - return LocalAgentConfig - raise AttributeError(f"module {__name__!r} has no attribute {name!r}") +def __getattr__(name: str): + """Lazy load integrations using unified registry.""" + from ._unified_registry import INTEGRATIONS_REGISTRY + return INTEGRATIONS_REGISTRY.get_by_attr(__name__, name) diff --git a/src/praisonai/praisonai/integrations/_unified_registry.py b/src/praisonai/praisonai/integrations/_unified_registry.py new file mode 100644 index 000000000..25d4aea88 --- /dev/null +++ b/src/praisonai/praisonai/integrations/_unified_registry.py @@ -0,0 +1,193 @@ +""" +Unified registry for all integration types in PraisonAI. + +This replaces the manual __getattr__ ladder with a data-driven registry approach, +supporting CLI tools, managed agents, framework adapters, and RAG components. +""" + +from typing import Any, Callable, Dict +from .._registry import PluginRegistry + + +class IntegrationRegistry(PluginRegistry[Any]): + """Registry for all integration types with lazy loading.""" + + def __init__(self): + super().__init__(entry_point_group="praisonai.integrations") + self._loaders: Dict[str, Callable[[], Any]] = {} + self._register_builtin_integrations() + + def register_lazy( + self, + name: str, + loader: Callable[[], Any], + *, + aliases: list[str] | None = None, + ) -> None: + """Register a lazily loaded integration.""" + with self._lock: + canonical_name = name.lower() + self._loaders[canonical_name] = loader + + if aliases: + for alias in aliases: + self._aliases[alias.lower()] = canonical_name + + def resolve(self, name: str) -> Any: + """Resolve integration name with lazy loader materialization.""" + with self._lock: + normalized_name = name.lower() + canonical_name = self._aliases.get(normalized_name, normalized_name) + + item = self._items.get(canonical_name) + if item is None: + loader = self._loaders.get(canonical_name) + if loader is not None: + item = loader() + self._items[canonical_name] = item + + if item is not None: + return item + + available_snapshot = sorted( + set(self._items.keys()) | set(self._loaders.keys()) | set(self._aliases.keys()) + ) + + raise ValueError( + f"Unknown {self._entry_point_group} plugin: {name!r}. " + f"Available: {available_snapshot}" + ) + + def _register_builtin_integrations(self): + """Register all built-in integrations with lazy loading.""" + + # CLI Tools + def _claude_code(): + from .claude_code import ClaudeCodeIntegration + return ClaudeCodeIntegration + + def _gemini_cli(): + from .gemini_cli import GeminiCLIIntegration + return GeminiCLIIntegration + + def _codex_cli(): + from .codex_cli import CodexCLIIntegration + return CodexCLIIntegration + + def _cursor_cli(): + from .cursor_cli import CursorCLIIntegration + return CursorCLIIntegration + + # Base classes + def _base_cli_integration(): + from .base import BaseCLIIntegration + return BaseCLIIntegration + + def _cli_execution_error(): + from .base import CLIExecutionError + return CLIExecutionError + + # Managed agents + def _managed_agent(): + from .managed_agents import ManagedAgent + return ManagedAgent + + def _anthropic_managed_agent(): + from .managed_agents import AnthropicManagedAgent + return AnthropicManagedAgent + + def _managed_config(): + from .managed_agents import ManagedConfig + return ManagedConfig + + # Local agents + def _local_managed_agent(): + from .managed_local import LocalManagedAgent + return LocalManagedAgent + + def _local_managed_config(): + from .managed_local import LocalManagedConfig + return LocalManagedConfig + + # Sandboxed agents + def _sandboxed_agent(): + from .sandboxed_agent import SandboxedAgent + return SandboxedAgent + + def _sandboxed_agent_config(): + from .sandboxed_agent import SandboxedAgentConfig + return SandboxedAgentConfig + + # Canonical agent backends + def _hosted_agent(): + from .hosted_agent import HostedAgent + return HostedAgent + + def _hosted_agent_config(): + from .hosted_agent import HostedAgentConfig + return HostedAgentConfig + + def _local_agent(): + from .local_agent import LocalAgent + return LocalAgent + + def _local_agent_config(): + from .local_agent import LocalAgentConfig + return LocalAgentConfig + + # Registry functions + def _get_available_integrations(): + from .base import get_available_integrations + return get_available_integrations + + def _external_agent_registry(): + from .registry import ExternalAgentRegistry + return ExternalAgentRegistry + + def _get_registry(): + from .registry import get_registry + return get_registry + + def _register_integration(): + from .registry import register_integration + return register_integration + + def _create_integration(): + from .registry import create_integration + return create_integration + + # Register all with appropriate aliases + self.register_lazy("BaseCLIIntegration", _base_cli_integration) + self.register_lazy("CLIExecutionError", _cli_execution_error) + + self.register_lazy("ClaudeCodeIntegration", _claude_code) + self.register_lazy("GeminiCLIIntegration", _gemini_cli) + self.register_lazy("CodexCLIIntegration", _codex_cli) + self.register_lazy("CursorCLIIntegration", _cursor_cli) + + self.register_lazy("ManagedAgent", _managed_agent, + aliases=["ManagedAgentIntegration"]) + self.register_lazy("AnthropicManagedAgent", _anthropic_managed_agent) + self.register_lazy("ManagedConfig", _managed_config, + aliases=["ManagedBackendConfig"]) + + self.register_lazy("LocalManagedAgent", _local_managed_agent) + self.register_lazy("LocalManagedConfig", _local_managed_config) + + self.register_lazy("SandboxedAgent", _sandboxed_agent) + self.register_lazy("SandboxedAgentConfig", _sandboxed_agent_config) + + self.register_lazy("HostedAgent", _hosted_agent) + self.register_lazy("HostedAgentConfig", _hosted_agent_config) + self.register_lazy("LocalAgent", _local_agent) + self.register_lazy("LocalAgentConfig", _local_agent_config) + + self.register_lazy("get_available_integrations", _get_available_integrations) + self.register_lazy("ExternalAgentRegistry", _external_agent_registry) + self.register_lazy("get_registry", _get_registry) + self.register_lazy("register_integration", _register_integration) + self.register_lazy("create_integration", _create_integration) + + +# Global registry instance +INTEGRATIONS_REGISTRY = IntegrationRegistry() \ No newline at end of file diff --git a/src/praisonai/praisonai/observability/hooks.py b/src/praisonai/praisonai/observability/hooks.py new file mode 100644 index 000000000..0172eb73d --- /dev/null +++ b/src/praisonai/praisonai/observability/hooks.py @@ -0,0 +1,51 @@ +""" +Observability hooks for PraisonAI. + +Centralizes observability initialization (AgentOps, etc.) so it can be +used consistently across all entry points without duplicating logic. +""" + +import os +import logging +from typing import List, Optional + +logger = logging.getLogger(__name__) + + +def init_observability(framework_tag: str, *, tags: Optional[List[str]] = None) -> None: + """ + Initialize observability providers (AgentOps, etc.) if available. + + Args: + framework_tag: Primary framework tag (e.g., "crewai", "autogen_v4") + tags: Additional tags to include + """ + # Try to initialize AgentOps if available + _init_agentops(framework_tag, tags or []) + + # Future: Add other observability providers here + # _init_langfuse(framework_tag, tags) + # _init_wandb(framework_tag, tags) + + +def _init_agentops(framework_tag: str, additional_tags: List[str]) -> None: + """Initialize AgentOps if available.""" + try: + import agentops + agentops_api_key = os.getenv("AGENTOPS_API_KEY") + if agentops_api_key: + all_tags = [framework_tag] + additional_tags + agentops.init(agentops_api_key, default_tags=all_tags) + logger.debug("Initialized AgentOps with tags: %s", all_tags) + except ImportError: + logger.debug("AgentOps not available, skipping initialization") + except Exception as e: + logger.warning("Failed to initialize AgentOps: %s", e) + + +# Constants for checking availability +try: + import agentops + AGENTOPS_AVAILABLE = True +except ImportError: + AGENTOPS_AVAILABLE = False \ No newline at end of file diff --git a/src/praisonai/praisonai/persistence/conversation/async_sqlite.py b/src/praisonai/praisonai/persistence/conversation/async_sqlite.py index 5da13d8b0..2079b0386 100644 --- a/src/praisonai/praisonai/persistence/conversation/async_sqlite.py +++ b/src/praisonai/praisonai/persistence/conversation/async_sqlite.py @@ -12,7 +12,7 @@ from typing import List, Optional from .base import ConversationStore, ConversationSession, ConversationMessage, validate_identifier -from ..._async_bridge import run_sync +# Note: This store is async-only. For sync operations, use sync_sqlite.SyncSQLiteConversationStore logger = logging.getLogger(__name__) @@ -120,11 +120,6 @@ async def async_create_session(self, session: ConversationSession) -> Conversati return session - def create_session(self, session: ConversationSession) -> ConversationSession: - """Sync wrapper for create_session.""" - return run_sync( - self.async_create_session(session) - ) async def async_get_session(self, session_id: str) -> Optional[ConversationSession]: """Get a session by ID asynchronously.""" @@ -150,11 +145,6 @@ async def async_get_session(self, session_id: str) -> Optional[ConversationSessi ) return None - def get_session(self, session_id: str) -> Optional[ConversationSession]: - """Sync wrapper for get_session.""" - return run_sync( - self.async_get_session(session_id) - ) async def async_update_session(self, session: ConversationSession) -> ConversationSession: """Update an existing session asynchronously.""" @@ -176,11 +166,6 @@ async def async_update_session(self, session: ConversationSession) -> Conversati return session - def update_session(self, session: ConversationSession) -> ConversationSession: - """Sync wrapper for update_session.""" - return run_sync( - self.async_update_session(session) - ) async def async_delete_session(self, session_id: str) -> bool: """Delete a session asynchronously.""" @@ -195,11 +180,6 @@ async def async_delete_session(self, session_id: str) -> bool: return cursor.rowcount > 0 - def delete_session(self, session_id: str) -> bool: - """Sync wrapper for delete_session.""" - return run_sync( - self.async_delete_session(session_id) - ) async def async_list_sessions( self, @@ -248,17 +228,6 @@ async def async_list_sessions( for row in rows ] - def list_sessions( - self, - user_id: Optional[str] = None, - agent_id: Optional[str] = None, - limit: int = 100, - offset: int = 0 - ) -> List[ConversationSession]: - """Sync wrapper for list_sessions.""" - return run_sync( - self.async_list_sessions(user_id, agent_id, limit, offset) - ) async def async_add_message(self, session_id: str, message: ConversationMessage) -> ConversationMessage: """Add a message asynchronously.""" @@ -280,11 +249,6 @@ async def async_add_message(self, session_id: str, message: ConversationMessage) return message - def add_message(self, session_id: str, message: ConversationMessage) -> ConversationMessage: - """Sync wrapper for add_message.""" - return run_sync( - self.async_add_message(session_id, message) - ) async def async_get_messages( self, @@ -334,17 +298,6 @@ async def async_get_messages( for row in rows ] - def get_messages( - self, - session_id: str, - limit: Optional[int] = None, - before: Optional[float] = None, - after: Optional[float] = None - ) -> List[ConversationMessage]: - """Sync wrapper for get_messages.""" - return run_sync( - self.async_get_messages(session_id, limit, before, after) - ) async def async_delete_messages(self, session_id: str, message_ids: Optional[List[str]] = None) -> int: """Delete messages asynchronously.""" @@ -366,11 +319,6 @@ async def async_delete_messages(self, session_id: str, message_ids: Optional[Lis await self._conn.commit() return cursor.rowcount - def delete_messages(self, session_id: str, message_ids: Optional[List[str]] = None) -> int: - """Sync wrapper for delete_messages.""" - return run_sync( - self.async_delete_messages(session_id, message_ids) - ) async def async_close(self) -> None: """Close the connection.""" @@ -379,7 +327,3 @@ async def async_close(self) -> None: self._conn = None self._initialized = False - def close(self) -> None: - """Sync wrapper for close.""" - if self._conn: - run_sync(self.async_close()) diff --git a/src/praisonai/praisonai/persistence/conversation/sync_sqlite.py b/src/praisonai/praisonai/persistence/conversation/sync_sqlite.py new file mode 100644 index 000000000..1d185c009 --- /dev/null +++ b/src/praisonai/praisonai/persistence/conversation/sync_sqlite.py @@ -0,0 +1,346 @@ +""" +Synchronous SQLite implementation of ConversationStore. + +Provides blocking database operations using the standard sqlite3 library. +Safe for multi-agent use via per-operation connection locking. + +Example: + store = SyncSQLiteConversationStore(path="./conversations.db") + store.init() +""" + +import json +import logging +import sqlite3 +import threading +import time +from typing import List, Optional + +from .base import ConversationStore, ConversationSession, ConversationMessage, validate_identifier + +logger = logging.getLogger(__name__) + + +class SyncSQLiteConversationStore(ConversationStore): + """ + Synchronous SQLite conversation store using sqlite3. + + Provides blocking database operations with per-call locking + for multi-agent safety. + """ + + def __init__( + self, + path: str = "praisonai_conversations.db", + table_prefix: str = "praison_", + ): + """ + Initialize sync SQLite store. + + Args: + path: Path to SQLite database file + table_prefix: Prefix for table names + """ + self.path = path + validate_identifier(table_prefix, "table_prefix") + self.table_prefix = table_prefix + self._lock = threading.Lock() + self._initialized = False + + def init(self): + """Initialize connection and create tables.""" + with self._lock: + if self._initialized: + return + + conn = sqlite3.connect(self.path) + conn.row_factory = sqlite3.Row + + try: + self._create_tables(conn) + self._initialized = True + finally: + conn.close() + + def _create_tables(self, conn: sqlite3.Connection): + """Create database tables.""" + sessions_table = f"{self.table_prefix}sessions" + messages_table = f"{self.table_prefix}messages" + + conn.executescript(f""" + CREATE TABLE IF NOT EXISTS {sessions_table} ( + session_id TEXT PRIMARY KEY, + user_id TEXT, + agent_id TEXT, + name TEXT, + state TEXT, + metadata TEXT, + created_at REAL, + updated_at REAL + ); + + CREATE TABLE IF NOT EXISTS {messages_table} ( + id TEXT PRIMARY KEY, + session_id TEXT, + role TEXT, + content TEXT, + tool_calls TEXT, + tool_call_id TEXT, + metadata TEXT, + created_at REAL, + FOREIGN KEY (session_id) REFERENCES {sessions_table}(session_id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_{self.table_prefix}sessions_user_agent + ON {sessions_table}(user_id, agent_id); + + CREATE INDEX IF NOT EXISTS idx_{self.table_prefix}messages_session + ON {messages_table}(session_id, created_at); + """) + conn.commit() + + def _get_connection(self) -> sqlite3.Connection: + """Get a thread-safe connection.""" + if not self._initialized: + self.init() + + conn = sqlite3.connect(self.path) + conn.row_factory = sqlite3.Row + return conn + + def create_session(self, session: ConversationSession) -> ConversationSession: + """Create a new session.""" + with self._lock: + conn = self._get_connection() + try: + table = f"{self.table_prefix}sessions" + conn.execute(f""" + INSERT INTO {table} (session_id, user_id, agent_id, name, state, metadata, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, ( + session.session_id, + session.user_id, + session.agent_id, + session.name, + json.dumps(session.state) if session.state else None, + json.dumps(session.metadata) if session.metadata else None, + session.created_at, + session.updated_at + )) + conn.commit() + return session + finally: + conn.close() + + def get_session(self, session_id: str) -> Optional[ConversationSession]: + """Get a session by ID.""" + conn = self._get_connection() + try: + table = f"{self.table_prefix}sessions" + cursor = conn.execute(f""" + SELECT * FROM {table} WHERE session_id = ? + """, (session_id,)) + row = cursor.fetchone() + + if row: + return ConversationSession( + session_id=row['session_id'], + user_id=row['user_id'], + agent_id=row['agent_id'], + name=row['name'], + state=json.loads(row['state']) if row['state'] else None, + metadata=json.loads(row['metadata']) if row['metadata'] else None, + created_at=row['created_at'], + updated_at=row['updated_at'] + ) + return None + finally: + conn.close() + + def update_session(self, session: ConversationSession) -> ConversationSession: + """Update an existing session.""" + with self._lock: + conn = self._get_connection() + try: + table = f"{self.table_prefix}sessions" + session.updated_at = time.time() + + conn.execute(f""" + UPDATE {table} + SET user_id = ?, agent_id = ?, name = ?, state = ?, metadata = ?, updated_at = ? + WHERE session_id = ? + """, ( + session.user_id, + session.agent_id, + session.name, + json.dumps(session.state) if session.state else None, + json.dumps(session.metadata) if session.metadata else None, + session.updated_at, + session.session_id + )) + conn.commit() + return session + finally: + conn.close() + + def delete_session(self, session_id: str) -> bool: + """Delete a session and all its messages.""" + with self._lock: + conn = self._get_connection() + try: + table = f"{self.table_prefix}sessions" + cursor = conn.execute(f""" + DELETE FROM {table} WHERE session_id = ? + """, (session_id,)) + conn.commit() + return cursor.rowcount > 0 + finally: + conn.close() + + def list_sessions( + self, + user_id: Optional[str] = None, + agent_id: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None + ) -> List[ConversationSession]: + """List sessions with optional filtering.""" + conn = self._get_connection() + try: + table = f"{self.table_prefix}sessions" + conditions = [] + params = [] + + if user_id: + conditions.append("user_id = ?") + params.append(user_id) + + if agent_id: + conditions.append("agent_id = ?") + params.append(agent_id) + + where_clause = " WHERE " + " AND ".join(conditions) if conditions else "" + if limit is not None: + params.append(limit) + limit_clause = " LIMIT ?" + else: + limit_clause = "" + if offset is not None: + params.append(offset) + offset_clause = " OFFSET ?" + else: + offset_clause = "" + + cursor = conn.execute(f""" + SELECT * FROM {table}{where_clause} + ORDER BY updated_at DESC{limit_clause}{offset_clause} + """, params) + + sessions = [] + for row in cursor.fetchall(): + sessions.append(ConversationSession( + session_id=row['session_id'], + user_id=row['user_id'], + agent_id=row['agent_id'], + name=row['name'], + state=json.loads(row['state']) if row['state'] else None, + metadata=json.loads(row['metadata']) if row['metadata'] else None, + created_at=row['created_at'], + updated_at=row['updated_at'] + )) + + return sessions + finally: + conn.close() + + def add_message(self, message: ConversationMessage) -> ConversationMessage: + """Add a message to the conversation.""" + with self._lock: + conn = self._get_connection() + try: + table = f"{self.table_prefix}messages" + conn.execute(f""" + INSERT INTO {table} (id, session_id, role, content, tool_calls, tool_call_id, metadata, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, ( + message.id, + message.session_id, + message.role, + message.content, + json.dumps(message.tool_calls) if message.tool_calls else None, + message.tool_call_id, + json.dumps(message.metadata) if message.metadata else None, + message.created_at + )) + conn.commit() + return message + finally: + conn.close() + + def get_messages( + self, + session_id: str, + limit: Optional[int] = None, + offset: Optional[int] = None + ) -> List[ConversationMessage]: + """Get messages for a session.""" + conn = self._get_connection() + try: + table = f"{self.table_prefix}messages" + params: list = [session_id] + if limit is not None: + params.append(limit) + limit_clause = " LIMIT ?" + else: + limit_clause = "" + if offset is not None: + params.append(offset) + offset_clause = " OFFSET ?" + else: + offset_clause = "" + + cursor = conn.execute(f""" + SELECT * FROM {table} WHERE session_id = ? + ORDER BY created_at{limit_clause}{offset_clause} + """, params) + + messages = [] + for row in cursor.fetchall(): + messages.append(ConversationMessage( + id=row['id'], + session_id=row['session_id'], + role=row['role'], + content=row['content'], + tool_calls=json.loads(row['tool_calls']) if row['tool_calls'] else None, + tool_call_id=row['tool_call_id'], + metadata=json.loads(row['metadata']) if row['metadata'] else None, + created_at=row['created_at'] + )) + + return messages + finally: + conn.close() + + def delete_messages(self, session_id: str, message_ids: Optional[List[str]] = None) -> int: + """Delete messages from a session.""" + with self._lock: + conn = self._get_connection() + try: + table = f"{self.table_prefix}messages" + if message_ids is None: + cursor = conn.execute(f""" + DELETE FROM {table} WHERE session_id = ? + """, (session_id,)) + else: + placeholders = ','.join(['?' for _ in message_ids]) + cursor = conn.execute(f""" + DELETE FROM {table} WHERE session_id = ? AND id IN ({placeholders}) + """, [session_id] + message_ids) + conn.commit() + return cursor.rowcount + finally: + conn.close() + + def clear_session(self, session_id: str) -> bool: + """Clear all messages from a session (keep session metadata).""" + return self.delete_messages(session_id) > 0 \ No newline at end of file diff --git a/src/praisonai/praisonai/persistence/factory.py b/src/praisonai/praisonai/persistence/factory.py index 71d607c32..b9945eb7d 100644 --- a/src/praisonai/praisonai/persistence/factory.py +++ b/src/praisonai/praisonai/persistence/factory.py @@ -6,7 +6,7 @@ """ import logging -from typing import Any, Dict, Optional +from typing import Any, Dict, Literal, Optional from .config import PersistenceConfig from .registry import CONVERSATION_STORES, KNOWLEDGE_STORES, STATE_STORES @@ -17,6 +17,7 @@ def create_conversation_store( backend: str, url: Optional[str] = None, + mode: Literal["sync", "async", "auto"] = "auto", **options: Any ): """ @@ -25,18 +26,40 @@ def create_conversation_store( Args: backend: Backend type (postgres, mysql, sqlite, etc.) url: Connection URL + mode: "sync" for blocking, "async" for async-only, "auto" for legacy behavior **options: Backend-specific options Returns: ConversationStore instance Example: + # Async-safe SQLite store store = create_conversation_store( - "postgres", - url="postgresql://localhost:5432/praisonai" + "sqlite", + path="./conversations.db", + mode="async" + ) + + # Sync-only SQLite store (multi-agent safe) + store = create_conversation_store( + "sqlite", + path="./conversations.db", + mode="sync" ) """ - return CONVERSATION_STORES.create(backend, url=url, **options) + + # Handle sync/async mode routing + if mode == "sync" and backend in ("sqlite", "postgres", "mysql"): + backend_name = f"sync_{backend}" + elif mode == "async" and backend in ("sqlite", "postgres", "mysql"): + backend_name = f"async_{backend}" + elif mode == "auto": + # Legacy behavior - use existing backend as-is + backend_name = backend + else: + backend_name = backend + + return CONVERSATION_STORES.create(backend_name, url=url, **options) def create_knowledge_store( @@ -103,10 +126,15 @@ def create_stores_from_config(config: PersistenceConfig) -> Dict[str, Any]: } if config.conversation_store: + # Extract mode from conversation_options if present + conversation_options = dict(config.conversation_options) + mode = conversation_options.pop("mode", "auto") + stores["conversation"] = create_conversation_store( config.conversation_store, url=config.conversation_url, - **config.conversation_options + mode=mode, + **conversation_options ) if config.knowledge_store: diff --git a/src/praisonai/praisonai/persistence/registry.py b/src/praisonai/praisonai/persistence/registry.py index 0cfa7724d..d54cf99ef 100644 --- a/src/praisonai/praisonai/persistence/registry.py +++ b/src/praisonai/praisonai/persistence/registry.py @@ -121,6 +121,10 @@ def _sqlite(url=None, path=None, **kwargs): from .conversation.sqlite import SQLiteConversationStore return SQLiteConversationStore(path=url or path, **kwargs) + def _sync_sqlite(url=None, path=None, **kwargs): + from .conversation.sync_sqlite import SyncSQLiteConversationStore + return SyncSQLiteConversationStore(path=url or path, **kwargs) + def _async_sqlite(url=None, path=None, **kwargs): from .conversation.async_sqlite import AsyncSQLiteConversationStore return AsyncSQLiteConversationStore(path=url or path, **kwargs) @@ -154,6 +158,8 @@ def _turso(url=None, **kwargs): registry.register("async_mysql", _async_mysql, aliases=("aiomysql", "mysql_async")) registry.register("sqlite", _sqlite) + registry.register("sync_sqlite", _sync_sqlite, + aliases=("sqlite_sync",)) registry.register("async_sqlite", _async_sqlite, aliases=("aiosqlite", "sqlite_async")) registry.register("json", _json) diff --git a/src/praisonai/tests/unit/integrations/test_codex_cli.py b/src/praisonai/tests/unit/integrations/test_codex_cli.py index a5f51d499..3c7814ba5 100644 --- a/src/praisonai/tests/unit/integrations/test_codex_cli.py +++ b/src/praisonai/tests/unit/integrations/test_codex_cli.py @@ -195,6 +195,12 @@ def test_build_command_with_output_schema(self): class TestIntegrationExports: """Tests for integration module exports.""" + def test_exported_codex_integration_is_class(self): + """Test exported CodexCLIIntegration resolves to the class, not a loader.""" + from praisonai.integrations import CodexCLIIntegration + + assert inspect.isclass(CodexCLIIntegration) + def test_get_available_integrations_export_is_sync(self): """Test exported get_available_integrations remains synchronous.""" from praisonai.integrations import get_available_integrations diff --git a/src/praisonai/tests/unit/persistence/test_sync_sqlite_parameterization.py b/src/praisonai/tests/unit/persistence/test_sync_sqlite_parameterization.py new file mode 100644 index 000000000..e6df00138 --- /dev/null +++ b/src/praisonai/tests/unit/persistence/test_sync_sqlite_parameterization.py @@ -0,0 +1,53 @@ +"""Regression tests for sync SQLite query parameterization.""" + +from praisonai.persistence.conversation.sync_sqlite import SyncSQLiteConversationStore + + +class _TestSyncSQLiteConversationStore(SyncSQLiteConversationStore): + def close(self): + pass + + +class _Cursor: + def fetchall(self): + return [] + + +class _Connection: + def __init__(self): + self.query = "" + self.params = None + + def execute(self, query, params): + self.query = query + self.params = params + return _Cursor() + + def close(self): + pass + + +def test_list_sessions_uses_bound_limit_and_offset(monkeypatch): + store = _TestSyncSQLiteConversationStore(path=":memory:") + store._initialized = True + conn = _Connection() + monkeypatch.setattr(store, "_get_connection", lambda: conn) + + store.list_sessions(user_id="u1", agent_id="a1", limit=5, offset=2) + + assert "LIMIT ?" in conn.query + assert "OFFSET ?" in conn.query + assert conn.params == ["u1", "a1", 5, 2] + + +def test_get_messages_uses_bound_limit_and_offset(monkeypatch): + store = _TestSyncSQLiteConversationStore(path=":memory:") + store._initialized = True + conn = _Connection() + monkeypatch.setattr(store, "_get_connection", lambda: conn) + + store.get_messages(session_id="s1", limit=10, offset=3) + + assert "LIMIT ?" in conn.query + assert "OFFSET ?" in conn.query + assert conn.params == ["s1", 10, 3]