211 changes: 157 additions & 54 deletions src/praisonai-agents/praisonaiagents/memory/memory.py
@@ -3,6 +3,7 @@
import json
import time
import shutil
import threading
from typing import Any, Dict, List, Optional, Union, Literal
import logging
from datetime import datetime
@@ -189,6 +190,16 @@ def __init__(self, config: Dict[str, Any], verbose: int = 0):
        self.cfg = config or {}
        self.verbose = verbose

        # Thread-local storage for SQLite connections (thread-safe)
        self._local = threading.local()

        # Write lock for serializing database modifications (thread-safe)
        self._write_lock = threading.Lock()

        # Connection registry for cleanup across threads (use regular set with careful cleanup)
        self._all_connections = set()
        self._connection_lock = threading.Lock()  # Protect the connection registry

        # Set logger level based on verbose
        if verbose >= 5:
            logger.setLevel(logging.INFO)
@@ -247,6 +258,50 @@ def __init__(self, config: Dict[str, Any], verbose: int = 0):
        elif self.use_rag:
            self._init_chroma()

    def _get_stm_conn(self):
        """Get thread-local short-term memory SQLite connection."""
        if not hasattr(self._local, 'stm_conn') or self._local.stm_conn is None:
            self._local.stm_conn = sqlite3.connect(
                self.short_db,
                check_same_thread=False,  # Allow cross-thread cleanup
                timeout=30.0  # 30 second timeout for lock contention
            )
            # Configure busy timeout for better contention handling
            self._local.stm_conn.execute("PRAGMA busy_timeout=30000")  # 30 seconds

            # Enable WAL mode for concurrent read/write without blocking
            result = self._local.stm_conn.execute("PRAGMA journal_mode=WAL").fetchone()
            if result and result[0].upper() != 'WAL':
                logger.warning(f"WAL mode not enabled for STM, got: {result[0]}")
            self._local.stm_conn.commit()

            # Register connection for cleanup
            with self._connection_lock:
                self._all_connections.add(self._local.stm_conn)
        return self._local.stm_conn
Comment on lines +262 to +281

Copilot AI Mar 30, 2026

WAL plus thread-local connections reduces contention, but concurrent writers can still raise sqlite3.OperationalError: database is locked when a connection is created with SQLite's default lock timeout and busy handling. Consider setting a higher timeout= on sqlite3.connect(...) and/or PRAGMA busy_timeout, and optionally adding a small retry/backoff (or an in-process write lock) around write transactions to make multi-agent concurrency robust.
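
As a rough sketch of that retry/backoff idea (the helper name, retry count, and delay constants are illustrative assumptions, not code from this PR):

import random
import sqlite3
import time

def execute_write_with_retry(conn, sql, params=(), retries=5, base_delay=0.05):
    """Illustrative only: retry a write that fails with 'database is locked'."""
    for attempt in range(retries):
        try:
            cur = conn.execute(sql, params)
            conn.commit()
            return cur
        except sqlite3.OperationalError as e:
            # Re-raise anything that is not lock contention, or if retries ran out
            if "locked" not in str(e).lower() or attempt == retries - 1:
                raise
            # Exponential backoff with jitter before the next attempt
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, base_delay))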


    def _get_ltm_conn(self):
        """Get thread-local long-term memory SQLite connection."""
        if not hasattr(self._local, 'ltm_conn') or self._local.ltm_conn is None:
            self._local.ltm_conn = sqlite3.connect(
                self.long_db,
                check_same_thread=False,  # Allow cross-thread cleanup
                timeout=30.0  # 30 second timeout for lock contention
            )
            # Configure busy timeout for better contention handling
            self._local.ltm_conn.execute("PRAGMA busy_timeout=30000")  # 30 seconds

            # Enable WAL mode for concurrent read/write without blocking
            result = self._local.ltm_conn.execute("PRAGMA journal_mode=WAL").fetchone()
            if result and result[0].upper() != 'WAL':
                logger.warning(f"WAL mode not enabled for LTM, got: {result[0]}")
            self._local.ltm_conn.commit()

            # Register connection for cleanup
            with self._connection_lock:
                self._all_connections.add(self._local.ltm_conn)
        return self._local.ltm_conn
Comment on lines +261 to +303
Contributor

medium

These two methods, _get_stm_conn and _get_ltm_conn, are nearly identical. To improve maintainability and reduce code duplication, you can extract the common logic into a single helper method.

    def _get_conn(self, db_path: str, conn_attr: str) -> sqlite3.Connection:
        """Get or create a thread-local SQLite connection."""
        if not hasattr(self._local, conn_attr) or getattr(self._local, conn_attr) is None:
            conn = sqlite3.connect(
                db_path,
                check_same_thread=False
            )
            # Enable WAL mode for concurrent read/write without blocking
            conn.execute("PRAGMA journal_mode=WAL")
            conn.commit()
            setattr(self._local, conn_attr, conn)
        return getattr(self._local, conn_attr)

    def _get_stm_conn(self):
        """Get thread-local short-term memory SQLite connection."""
        return self._get_conn(self.short_db, 'stm_conn')

    def _get_ltm_conn(self):
        """Get thread-local long-term memory SQLite connection."""
        return self._get_conn(self.long_db, 'ltm_conn')

Comment on lines +283 to +303

Copilot AI Mar 30, 2026

Same as STM: _get_ltm_conn() enables WAL but still uses default SQLite lock timeout. Under concurrent multi-agent writes this can still throw database is locked. Set timeout= / busy_timeout and consider retry/backoff or a write lock for long-term write operations.

coderabbitai[bot] marked this conversation as resolved.

Copilot AI Mar 30, 2026

This change switches SQLite usage from per-operation connect/close to long-lived connections. To avoid leaking file descriptors / keeping DB files open (especially in tests and Windows environments), consider adding automatic cleanup (e.g., __del__, context manager support, or ensuring callers like Session explicitly close). A concurrency-focused test that spins up multiple threads calling store_* would also help prevent regressions.

Suggested change
    def close(self) -> None:
        """
        Close any open SQLite connections held in this thread's local storage.

        This helps avoid leaking file descriptors and keeping database files
        open when Memory instances or threads are short-lived (e.g., in tests).
        """
        # Close short-term memory connection for this thread, if any
        try:
            conn = getattr(self._local, "stm_conn", None)
            if conn is not None:
                conn.close()
                self._local.stm_conn = None
        except Exception:
            # Best-effort cleanup; avoid raising during shutdown
            logger.exception("Error while closing short-term memory SQLite connection")

        # Close long-term memory connection for this thread, if any
        try:
            conn = getattr(self._local, "ltm_conn", None)
            if conn is not None:
                conn.close()
                self._local.ltm_conn = None
        except Exception:
            # Best-effort cleanup; avoid raising during shutdown
            logger.exception("Error while closing long-term memory SQLite connection")

    def __enter__(self):
        """
        Allow Memory to be used as a context manager.

        The caller is responsible for using the same thread within the context.
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Ensure connections are closed when leaving a context manager block."""
        self.close()

    def __del__(self):
        """
        Attempt to clean up any open SQLite connections when this instance
        is garbage-collected. Errors are suppressed to avoid issues during
        interpreter shutdown.
        """
        try:
            self.close()
        except Exception:
            pass
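
A minimal sketch of the suggested concurrency test, assuming pytest's tmp_path fixture, that Memory's config accepts short_db/long_db paths, and that store_short_term takes a single text argument (all assumptions to adjust against the real API):

import threading

from praisonaiagents.memory.memory import Memory

def test_concurrent_short_term_writes(tmp_path):
    # Config keys here are assumed for illustration; match Memory's real schema.
    mem = Memory({"short_db": str(tmp_path / "stm.db"), "long_db": str(tmp_path / "ltm.db")})
    errors = []

    def writer(worker_id):
        try:
            for i in range(50):
                mem.store_short_term(f"note {worker_id}-{i}")
        except Exception as exc:
            errors.append(exc)  # e.g. sqlite3.OperationalError: database is locked

    threads = [threading.Thread(target=writer, args=(n,)) for n in range(8)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    assert not errors, f"concurrent writes failed: {errors}"
    mem.close_connections()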

    def _log_verbose(self, msg: str, level: int = logging.INFO):
        """Only log if verbose >= 5"""
        if self.verbose >= 5:
@@ -276,7 +331,7 @@ def _emit_memory_event(self, event_type: str, memory_type: str,
    def _init_stm(self):
        """Creates or verifies short-term memory table."""
        os.makedirs(os.path.dirname(self.short_db) or ".", exist_ok=True)
-       conn = sqlite3.connect(self.short_db)
+       conn = self._get_stm_conn()
        c = conn.cursor()
        c.execute("""
            CREATE TABLE IF NOT EXISTS short_mem (
@@ -287,12 +342,11 @@ def _init_stm(self):
            )
        """)
        conn.commit()
-       conn.close()

    def _init_ltm(self):
        """Creates or verifies long-term memory table."""
        os.makedirs(os.path.dirname(self.long_db) or ".", exist_ok=True)
-       conn = sqlite3.connect(self.long_db)
+       conn = self._get_ltm_conn()
        c = conn.cursor()
        c.execute("""
            CREATE TABLE IF NOT EXISTS long_mem (
@@ -303,7 +357,6 @@ def _init_ltm(self):
            )
        """)
        conn.commit()
-       conn.close()

    def _init_mem0(self):
        """Initialize Mem0 client for agent or user memory with optional graph support."""
@@ -579,15 +632,15 @@ def store_short_term(
logger.error(f"Failed to store in MongoDB short-term memory: {e}")
raise

# Existing SQLite store logic
# Existing SQLite store logic (with write lock for concurrency safety)
try:
conn = sqlite3.connect(self.short_db)
conn.execute(
"INSERT INTO short_mem (id, content, meta, created_at) VALUES (?,?,?,?)",
(ident, text, json.dumps(metadata), created_at)
)
conn.commit()
conn.close()
conn = self._get_stm_conn()
with self._write_lock: # Serialize write operations
conn.execute(
"INSERT INTO short_mem (id, content, meta, created_at) VALUES (?,?,?,?)",
(ident, text, json.dumps(metadata), created_at)
)
conn.commit()
logger.info(f"Successfully stored in SQLite short-term memory with ID: {ident}")
except Exception as e:
logger.error(f"Failed to store in SQLite short-term memory: {e}")
@@ -713,13 +766,12 @@ def search_short_term(

        else:
            # Local fallback
-           conn = sqlite3.connect(self.short_db)
+           conn = self._get_stm_conn()
            c = conn.cursor()
            rows = c.execute(
                "SELECT id, content, meta FROM short_mem WHERE content LIKE ? LIMIT ?",
                (f"%{query}%", limit)
            ).fetchall()
-           conn.close()

            results = []
            for row in rows:
@@ -739,10 +791,10 @@

    def reset_short_term(self):
        """Completely clears short-term memory."""
-       conn = sqlite3.connect(self.short_db)
-       conn.execute("DELETE FROM short_mem")
-       conn.commit()
-       conn.close()
+       conn = self._get_stm_conn()
+       with self._write_lock:  # Serialize write operations
+           conn.execute("DELETE FROM short_mem")
+           conn.commit()

    # -------------------------------------------------------------------------
    # Long-Term Methods
@@ -813,15 +865,15 @@ def store_long_term(
logger.error(f"Failed to store in MongoDB long-term memory: {e}")
# Continue to SQLite fallback

# Store in SQLite
# Store in SQLite (with write lock for concurrency safety)
try:
conn = sqlite3.connect(self.long_db)
conn.execute(
"INSERT INTO long_mem (id, content, meta, created_at) VALUES (?,?,?,?)",
(ident, text, json.dumps(metadata), created)
)
conn.commit()
conn.close()
conn = self._get_ltm_conn()
with self._write_lock: # Serialize write operations
conn.execute(
"INSERT INTO long_mem (id, content, meta, created_at) VALUES (?,?,?,?)",
(ident, text, json.dumps(metadata), created)
)
conn.commit()
logger.info(f"Successfully stored in SQLite with ID: {ident}")
except Exception as e:
logger.error(f"Error storing in SQLite: {e}")
@@ -1002,13 +1054,12 @@ def search_long_term(
self._log_verbose(f"Error searching ChromaDB: {e}", logging.ERROR)

# Always try SQLite as fallback or additional source
conn = sqlite3.connect(self.long_db)
conn = self._get_ltm_conn()
c = conn.cursor()
rows = c.execute(
"SELECT id, content, meta, created_at FROM long_mem WHERE content LIKE ? LIMIT ?",
(f"%{query}%", limit)
).fetchall()
conn.close()

for row in rows:
meta = json.loads(row[2] or "{}")
@@ -1051,10 +1102,10 @@ def search_long_term(

    def reset_long_term(self):
        """Clear local LTM DB, plus Chroma, MongoDB, or mem0 if in use."""
-       conn = sqlite3.connect(self.long_db)
-       conn.execute("DELETE FROM long_mem")
-       conn.commit()
-       conn.close()
+       conn = self._get_ltm_conn()
+       with self._write_lock:  # Serialize write operations
+           conn.execute("DELETE FROM long_mem")
+           conn.commit()

        if self.use_mem0 and hasattr(self, "mem0_client"):
            # Mem0 has no universal reset API. Could implement partial or no-op.
@@ -1085,16 +1136,16 @@ def delete_short_term(self, memory_id: str) -> bool:
"""
deleted = False

# Delete from SQLite
# Delete from SQLite (with write lock for concurrency safety)
try:
conn = sqlite3.connect(self.short_db)
cursor = conn.execute(
"DELETE FROM short_mem WHERE id = ?", (memory_id,)
)
if cursor.rowcount > 0:
deleted = True
conn.commit()
conn.close()
conn = self._get_stm_conn()
with self._write_lock: # Serialize write operations
cursor = conn.execute(
"DELETE FROM short_mem WHERE id = ?", (memory_id,)
)
if cursor.rowcount > 0:
deleted = True
conn.commit()
except Exception as e:
self._log_verbose(f"Error deleting from SQLite short-term: {e}", logging.ERROR)

@@ -1126,16 +1177,16 @@ def delete_long_term(self, memory_id: str) -> bool:
"""
deleted = False

# Delete from SQLite
# Delete from SQLite (with write lock for concurrency safety)
try:
conn = sqlite3.connect(self.long_db)
cursor = conn.execute(
"DELETE FROM long_mem WHERE id = ?", (memory_id,)
)
if cursor.rowcount > 0:
deleted = True
conn.commit()
conn.close()
conn = self._get_ltm_conn()
with self._write_lock: # Serialize write operations
cursor = conn.execute(
"DELETE FROM long_mem WHERE id = ?", (memory_id,)
)
if cursor.rowcount > 0:
deleted = True
conn.commit()
except Exception as e:
self._log_verbose(f"Error deleting from SQLite long-term: {e}", logging.ERROR)

@@ -1790,10 +1841,9 @@ def get_all_memories(self) -> List[Dict[str, Any]]:

        try:
            # Get short-term memories
-           conn = sqlite3.connect(self.short_db)
+           conn = self._get_stm_conn()
            c = conn.cursor()
            rows = c.execute("SELECT id, content, meta, created_at FROM short_mem").fetchall()
-           conn.close()

            for row in rows:
                meta = json.loads(row[2] or "{}")
@@ -1806,10 +1856,9 @@ def get_all_memories(self) -> List[Dict[str, Any]]:
                })

            # Get long-term memories
-           conn = sqlite3.connect(self.long_db)
+           conn = self._get_ltm_conn()
            c = conn.cursor()
            rows = c.execute("SELECT id, content, meta, created_at FROM long_mem").fetchall()
-           conn.close()

            for row in rows:
                meta = json.loads(row[2] or "{}")
@@ -1873,3 +1922,57 @@ def get_learn_context(self) -> str:
        if self.learn is None:
            return ""
        return self.learn.to_system_prompt_context()

    def close_connections(self):
        """
        Close database connections.

        Closes the current thread's connections and attempts to close all known
        connections from other threads. Each thread should call this method before
        terminating to ensure proper cleanup.
        """
        # Close current thread's connections
        if hasattr(self._local, 'stm_conn') and self._local.stm_conn:
            try:
                self._local.stm_conn.close()
                self._local.stm_conn = None
            except Exception as e:
                logger.warning(f"Error closing current thread's STM connection: {e}")

        if hasattr(self._local, 'ltm_conn') and self._local.ltm_conn:
            try:
                self._local.ltm_conn.close()
                self._local.ltm_conn = None
            except Exception as e:
                logger.warning(f"Error closing current thread's LTM connection: {e}")

        # Close all known connections from the registry
        with self._connection_lock:  # Ensure thread safety during cleanup
            connections_to_close = list(self._all_connections)
            for conn in connections_to_close:
                try:
                    conn.close()
                except Exception as e:
                    logger.debug(f"Error closing registered connection: {e}")
            # Clear the registry
            self._all_connections.clear()

    def __enter__(self):
        """Allow Memory to be used as a context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Ensure connections are closed when leaving a context manager block."""
        self.close_connections()
Comment on lines +1960 to +1966
Contributor

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for Memory cleanup in Agent class
echo "=== Checking for close_connections calls in agent code ==="
rg -n "close_connections|__exit__|with.*Memory" src/praisonai-agents/praisonaiagents/agent/

echo ""
echo "=== Checking how Memory instances are managed ==="
rg -nA5 "_memory_instance" src/praisonai-agents/praisonaiagents/agent/agent.py | head -80

echo ""
echo "=== Checking for any cleanup/destroy methods in Agent ==="
rg -n "def (cleanup|destroy|close|__del__|__exit__)" src/praisonai-agents/praisonaiagents/agent/agent.py

Repository: MervinPraison/PraisonAI

Length of output: 4256


🏁 Script executed:

#!/bin/bash

echo "=== Check for __del__ in Agent class ==="
rg -n "__del__" src/praisonai-agents/praisonaiagents/agent/agent.py

echo ""
echo "=== Check Memory and FileMemory __del__ implementations ==="
rg -n "__del__" src/praisonai-agents/praisonaiagents/memory/

echo ""
echo "=== Check if Memory instances are used as context managers in tests/examples ==="
rg -n "with.*Memory\(" src/praisonai-agents/ | head -20

echo ""
echo "=== Check if close_connections is called anywhere in the codebase ==="
rg -n "\.close_connections\(\)" src/praisonai-agents/ | head -20

echo ""
echo "=== Check the Memory class signature for cleanup patterns ==="
grep -n "def close_connections" src/praisonai-agents/praisonaiagents/memory/memory.py
grep -n "def __del__" src/praisonai-agents/praisonaiagents/memory/memory.py

Repository: MervinPraison/PraisonAI

Length of output: 720


🏁 Script executed:

#!/bin/bash

echo "=== Memory.__del__ implementation ==="
sed -n '1968,1980p' src/praisonai-agents/praisonaiagents/memory/memory.py

echo ""
echo "=== Memory.close_connections implementation ==="
sed -n '1926,1945p' src/praisonai-agents/praisonaiagents/memory/memory.py

echo ""
echo "=== Check if Agent's _memory_instance is ever explicitly cleaned up ==="
rg -n "self._memory_instance\s*=" src/praisonai-agents/praisonaiagents/agent/agent.py | tail -20

Repository: MervinPraison/PraisonAI

Length of output: 2335


Add explicit cleanup for Memory instances in Agent class.

The Agent class creates Memory instances in _init_memory() (lines 3596, 3605, 3609, 3619, 3623, 3626, 3633, 3637, 3642, 3645) without explicit cleanup or context manager usage. While Memory provides __del__ to call close_connections() on garbage collection, relying on GC timing is unreliable for resource management, especially in multi-agent scenarios.

Agent lacks a __del__ or cleanup method to ensure _memory_instance.close_connections() is called before the agent is destroyed. This violates the async-safe execution requirement from the coding guidelines. Consider adding explicit cleanup (either __del__ method in Agent or documented cleanup patterns) to guarantee connections are properly closed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai-agents/praisonaiagents/memory/memory.py` around lines 1960 -
1966, Agent creates Memory instances in _init_memory() and never explicitly
closes them; add explicit cleanup to ensure Memory.close_connections() is called
when an Agent is destroyed. Implement a cleanup method on Agent (e.g.,
close_memory or __del__) that checks for self._memory_instance and calls
self._memory_instance.close_connections(); update places that manage Agent
lifecycle to call this cleanup (or implement __aexit__/__exit__ on Agent to use
context manager patterns) to avoid relying on Memory.__del__ and ensure
deterministic resource release.
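
For illustration, the Agent-side cleanup described above could look roughly like this (the cleanup method name is hypothetical; _memory_instance is the attribute referenced in the review):

    # Hypothetical sketch of an Agent cleanup hook; not part of this PR.
    def cleanup(self) -> None:
        memory = getattr(self, "_memory_instance", None)
        if memory is not None:
            try:
                memory.close_connections()
            finally:
                self._memory_instance = None

    def __del__(self):
        try:
            self.cleanup()
        except Exception:
            pass  # never raise during interpreter shutdown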


    def __del__(self):
        """
        Attempt to clean up any open SQLite connections when this instance
        is garbage-collected. Errors are suppressed to avoid issues during
        interpreter shutdown.
        """
        try:
            self.close_connections()
        except Exception:
            # Best-effort cleanup during garbage collection
            pass
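
Taken together, single-threaded callers can lean on the context-manager form, along these lines (the config keys shown are assumptions; see Memory.__init__ for the real schema):

# Sketch: config keys are illustrative, not the confirmed Memory config format.
with Memory({"short_db": ".praison/stm.db", "long_db": ".praison/ltm.db"}) as mem:
    mem.store_short_term("user prefers concise answers")
    hits = mem.search_short_term("concise", limit=5)
# __exit__ has closed this thread's SQLite connections at this point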