225 changes: 164 additions & 61 deletions src/praisonai-agents/praisonaiagents/agent/agent.py
@@ -6,8 +6,12 @@
import contextlib
import threading
from typing import List, Optional, Any, Dict, Union, Literal, TYPE_CHECKING, Callable, Generator
from collections import OrderedDict
import inspect

# Module-level logger for thread safety errors and debugging
logger = logging.getLogger(__name__)

# ============================================================================
# Performance: Lazy imports for heavy dependencies
# Rich, LLM, and display utilities are only imported when needed (output=verbose)
@@ -1511,9 +1515,12 @@ def __init__(
self.embedder_config = embedder_config
self.knowledge = knowledge
self.use_system_prompt = use_system_prompt
# Thread-safe chat_history with lazy lock for concurrent access
# Thread-safe chat_history with eager lock initialization
self.chat_history = []
self.__history_lock = None # Lazy initialized
self.__history_lock = threading.Lock() # Eager initialization to prevent race conditions
Comment on lines +1518 to +1520
⚠️ Potential issue | 🔴 Critical

chat_history is still a racy public list.

These helpers only protect the custom-LLM path. The standard OpenAI, async, streaming, restore/autosave paths in this file still read/append/replace self.chat_history directly, and src/praisonai-agents/praisonaiagents/session.py / src/praisonai-agents/praisonaiagents/llm/llm.py also bypass _history_lock. In concurrent runs that still allows lost turns, mid-iteration mutation, and stale references after slice assignment. To actually fix this, chat_history needs to be fully encapsulated behind a thread-safe API/container instead of exposing the raw list.

Also applies to: 1785-1807, 6362-6412

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai-agents/praisonaiagents/agent/agent.py` around lines 1514 -
1516, self.chat_history is a public list that remains racy despite the eager
__history_lock; replace direct list access with a thread-safe encapsulation and
migrate all consumers to it: create a ThreadSafeChatHistory (or HistoryBuffer)
class used by the agent (referencing self.chat_history and self.__history_lock)
that internally holds the list and exposes atomic methods like
append_turn/add_message, replace_range/replace_slice, get_snapshot/get_slice,
iterate/iter_snapshot, clear, and length, each guarded by the internal Lock;
update all places in this file (including code paths around lines noted: the
OpenAI/async/streaming/restore/autosave paths) and the other modules (session.py
and llm/llm.py) to call those methods instead of reading/writing the raw list,
and remove direct uses of self.__history_lock outside the new class so no module
accesses the underlying list directly.
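
For illustration, a minimal sketch of the kind of container this describes — the class name ThreadSafeChatHistory and its method names are assumptions for this example, not names taken from the codebase:

import threading

class ThreadSafeChatHistory:
    """Sketch: holds the message list privately and guards every access with one lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._messages = []

    def append(self, role, content):
        with self._lock:
            self._messages.append({"role": role, "content": content})

    def snapshot(self):
        # Hand out a shallow copy so callers never iterate the live list.
        with self._lock:
            return list(self._messages)

    def truncate(self, length):
        # Mutate in place so references held elsewhere stay valid after rollback.
        with self._lock:
            del self._messages[length:]

    def __len__(self):
        with self._lock:
            return len(self._messages)

Callers would then go through history.append(...) and history.snapshot() instead of touching a raw list, which closes the lost-turn and mid-iteration-mutation windows described above.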


# Thread-safe snapshot/redo stack lock - always available even when autonomy is disabled
self.__snapshot_lock = threading.Lock()
self.markdown = markdown
self.stream = stream
self.metrics = metrics
@@ -1632,10 +1639,11 @@ def __init__(
# P8/G11: Tool timeout - prevent slow tools from blocking
self._tool_timeout = tool_timeout

# Cache for system prompts and formatted tools with lazy thread-safe lock
self._system_prompt_cache = {}
self._formatted_tools_cache = {}
self.__cache_lock = None # Lazy initialized RLock
# Cache for system prompts and formatted tools with eager thread-safe lock
# Use OrderedDict for LRU behavior
self._system_prompt_cache = OrderedDict()
self._formatted_tools_cache = OrderedDict()
Comment on lines +1642 to +1645

Copilot AI Mar 30, 2026

OrderedDict is imported inside __init__, which reintroduces per-instance import work despite this module explicitly moving lightweight imports out of __init__ for performance. Import OrderedDict at module scope (top of file) and reference it directly here.

self.__cache_lock = threading.RLock() # Eager initialization to prevent race conditions
# Limit cache size to prevent unbounded growth
self._max_cache_size = 100

@@ -1747,20 +1755,106 @@ def _telemetry(self):

@property
def _history_lock(self):
"""Lazy-loaded history lock for thread-safe chat history access."""
if self.__history_lock is None:
import threading
self.__history_lock = threading.Lock()
"""Thread-safe chat history lock."""
return self.__history_lock

@property
def _cache_lock(self):
"""Lazy-loaded cache lock for thread-safe cache access."""
if self.__cache_lock is None:
import threading
self.__cache_lock = threading.RLock()
"""Thread-safe cache lock."""
return self.__cache_lock

@property
def _snapshot_lock(self):
"""Thread-safe snapshot/redo stack lock."""
return self.__snapshot_lock
Comment on lines +1768 to +1769

Copilot AI Mar 30, 2026

_snapshot_lock returns self.__snapshot_lock, but __snapshot_lock is only initialized in _init_autonomy when autonomy is enabled. When autonomy is disabled, _init_autonomy returns early without setting __snapshot_lock, so any accidental access to _snapshot_lock would raise AttributeError. Initialize self.__snapshot_lock eagerly in __init__ (or set it in the autonomy-disabled branch too), or make the property defensive.

Suggested change
"""Thread-safe snapshot/redo stack lock."""
return self.__snapshot_lock
"""Thread-safe snapshot/redo stack lock.
This property is defensive: if the underlying lock was not
initialized (e.g., because autonomy was disabled and
`_init_autonomy` exited early), it is created on first access.
"""
try:
return self.__snapshot_lock
except AttributeError:
# Fallback initialization to avoid AttributeError when autonomy is disabled
self.__snapshot_lock = threading.Lock()
return self.__snapshot_lock


def _cache_put(self, cache_dict, key, value):
"""Thread-safe LRU cache put operation.

Args:
cache_dict: The cache dictionary (OrderedDict)
key: Cache key
value: Value to cache
"""
with self._cache_lock:
# Move to end if already exists (LRU update)
if key in cache_dict:
del cache_dict[key]

# Add new entry
cache_dict[key] = value
Comment on lines +1781 to +1785
medium

The current implementation updates the LRU cache by using del and re-assignment to move an existing key to the end. A more idiomatic and efficient approach with OrderedDict is to update the value and then call move_to_end(), an O(1) operation that expresses the intent more clearly than deleting and re-inserting the key.

Suggested change
-            if key in cache_dict:
-                del cache_dict[key]
-            # Add new entry
-            cache_dict[key] = value
+            cache_dict[key] = value
+            cache_dict.move_to_end(key)


# Evict oldest if over limit
while len(cache_dict) > self._max_cache_size:
cache_dict.popitem(last=False) # Remove oldest (FIFO)

def _add_to_chat_history(self, role, content):
"""Thread-safe method to add messages to chat history.

Args:
role: Message role ("user", "assistant", "system")
content: Message content
"""
with self._history_lock:
self.chat_history.append({"role": role, "content": content})
Comment on lines +1795 to +1799

Copilot AI Mar 30, 2026

Thread-safety for chat_history is only enforced if all call sites use these helpers (or take _history_lock). There are still many direct self.chat_history.append(...) usages elsewhere in this file (e.g., at lines 5994, 6030, 6475, 6530, 6809, 8131, etc.), which means races can still occur even with the new lock. To fully address #1164, replace the remaining direct mutations/rollbacks with _add_to_chat_history / _truncate_chat_history, or wrap them in a with self._history_lock: block.

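As an illustration of that migration, a hedged sketch of a turn that rolls back on failure using only the locked helpers added in this diff (the wrapper function and call_llm are hypothetical stand-ins):

def run_turn_with_rollback(agent, prompt, call_llm):
    # Record a checkpoint, add the user turn, and roll back atomically on failure.
    checkpoint = agent._get_chat_history_length()
    agent._add_to_chat_history_if_not_duplicate("user", prompt)
    try:
        reply = call_llm(prompt)
    except Exception:
        # Truncate through the helper instead of slicing the raw list directly.
        agent._truncate_chat_history(checkpoint)
        raise
    agent._add_to_chat_history("assistant", reply)
    return reply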

def _add_to_chat_history_if_not_duplicate(self, role, content):
"""Thread-safe method to add messages to chat history only if not duplicate.

Atomically checks for duplicate and adds message under the same lock to prevent TOCTOU races.

Args:
role: Message role ("user", "assistant", "system")
content: Message content

Returns:
bool: True if message was added, False if duplicate was detected
"""
with self._history_lock:
# Check for duplicate within the same critical section
if (self.chat_history and
self.chat_history[-1].get("role") == role and
self.chat_history[-1].get("content") == content):
return False

# Not a duplicate, add the message
self.chat_history.append({"role": role, "content": content})
return True

def _get_chat_history_length(self):
"""Thread-safe method to get chat history length."""
with self._history_lock:
return len(self.chat_history)

def _truncate_chat_history(self, length):
"""Thread-safe method to truncate chat history to specified length.

Args:
length: Target length for chat history
"""
with self._history_lock:
self.chat_history = self.chat_history[:length]

def _cache_get(self, cache_dict, key):
"""Thread-safe LRU cache get operation.

Args:
cache_dict: The cache dictionary (OrderedDict)
key: Cache key

Returns:
Value if found, None otherwise
"""
with self._cache_lock:
if key not in cache_dict:
return None

# Move to end (mark as recently used)
value = cache_dict[key]
del cache_dict[key]
cache_dict[key] = value
return value
Comment on lines +1853 to +1856
medium

When retrieving an item from the LRU cache, you are using del and re-assignment to move it to the end. This can be made more efficient by using OrderedDict.move_to_end(), which is an O(1) operation and more clearly expresses the intent.

Suggested change
-            value = cache_dict[key]
-            del cache_dict[key]
-            cache_dict[key] = value
-            return value
+            value = cache_dict[key]
+            cache_dict.move_to_end(key)
+            return value
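
Taken together, a self-contained sketch of the move_to_end variant of both operations (the module-level lock and size limit below are stand-ins for the agent's _cache_lock and _max_cache_size):

import threading
from collections import OrderedDict

_cache_lock = threading.RLock()
_max_cache_size = 100

def cache_put(cache_dict: OrderedDict, key, value):
    with _cache_lock:
        cache_dict[key] = value
        cache_dict.move_to_end(key)          # O(1) mark-as-most-recent
        while len(cache_dict) > _max_cache_size:
            cache_dict.popitem(last=False)   # Evict the least recently used entry

def cache_get(cache_dict: OrderedDict, key):
    with _cache_lock:
        if key not in cache_dict:
            return None
        cache_dict.move_to_end(key)          # Refresh recency on reads as well
        return cache_dict[key]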


@property
def auto_memory(self):
"""AutoMemory instance for automatic memory extraction."""
@@ -2220,19 +2314,23 @@ def undo(self) -> bool:
result = agent.start("Refactor utils.py")
agent.undo() # Restore original files
"""
if self._file_snapshot is None or not self._snapshot_stack:
return False
try:
target_hash = self._snapshot_stack.pop()
# Get current hash before restore (for redo)
current_hash = self._file_snapshot.get_current_hash()
if current_hash:
self._redo_stack.append(current_hash)
self._file_snapshot.restore(target_hash)
return True
except Exception as e:
logger.debug(f"Undo failed: {e}")
if self._file_snapshot is None:
return False

with self._snapshot_lock:
if not self._snapshot_stack:
return False
try:
target_hash = self._snapshot_stack.pop()
# Get current hash before restore (for redo)
current_hash = self._file_snapshot.get_current_hash()
if current_hash:
self._redo_stack.append(current_hash)
self._file_snapshot.restore(target_hash)
return True
except Exception as e:
logger.debug(f"Undo failed: {e}")
return False
Comment on lines +2320 to +2333
⚠️ Potential issue | 🔴 Critical

undo()/redo() error handling currently raises NameError.

logger is never defined in this module, so any failure inside these blocks hits a second exception on the debug line instead of returning False. The adjacent logger.debug(...) calls at Line 2258 and Line 2362 have the same problem.

🐛 Minimal fix
-                logger.debug(f"Undo failed: {e}")
+                logging.debug(f"Undo failed: {e}")
                 return False
@@
-                logger.debug(f"Redo failed: {e}")
+                logging.debug(f"Redo failed: {e}")
                 return False

Also applies to: 2318-2330

🧰 Tools
🪛 Ruff (0.15.7)

[warning] 2303-2303: Do not catch blind exception: Exception

(BLE001)


[error] 2304-2304: Undefined name logger

(F821)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai-agents/praisonaiagents/agent/agent.py` around lines 2292 -
2305, The debug calls in undo() and redo() (and the nearby logger.debug at other
locations) reference an undefined name logger causing a NameError on exception;
fix by introducing a module-level logger (e.g., import logging and assign logger
= logging.getLogger(__name__)) at the top of the module and leave the existing
logger.debug(...) calls as-is so exceptions inside the with self._snapshot_lock
blocks (and surrounding handlers) log correctly and return False. Ensure the new
logger is available before methods like undo(), redo(), and any other routines
that call logger.debug are defined.


def redo(self) -> bool:
"""Redo a previously undone set of file changes.
@@ -2242,18 +2340,22 @@ def redo(self) -> bool:
Returns:
True if redo was successful, False if nothing to redo.
"""
if self._file_snapshot is None or not self._redo_stack:
return False
try:
target_hash = self._redo_stack.pop()
current_hash = self._file_snapshot.get_current_hash()
if current_hash:
self._snapshot_stack.append(current_hash)
self._file_snapshot.restore(target_hash)
return True
except Exception as e:
logger.debug(f"Redo failed: {e}")
if self._file_snapshot is None:
return False

with self._snapshot_lock:
if not self._redo_stack:
return False
try:
target_hash = self._redo_stack.pop()
current_hash = self._file_snapshot.get_current_hash()
if current_hash:
self._snapshot_stack.append(current_hash)
self._file_snapshot.restore(target_hash)
return True
except Exception as e:
logger.debug(f"Redo failed: {e}")
return False

def diff(self, from_hash: Optional[str] = None):
"""Get file diffs from autonomous execution.
@@ -2279,8 +2381,11 @@
return []
try:
base = from_hash
if base is None and self._snapshot_stack:
base = self._snapshot_stack[0]
if base is None:
# Protect snapshot stack read with lock to prevent TOCTOU with undo/redo
with self._snapshot_lock:
if self._snapshot_stack:
base = self._snapshot_stack[0]
if base is None:
return []
return self._file_snapshot.diff(base)
@@ -2477,8 +2582,9 @@ def run_autonomous(
if self._file_snapshot is not None and self.autonomy_config.get("snapshot", False):
try:
snap_info = self._file_snapshot.track(message="pre-autonomous")
self._snapshot_stack.append(snap_info.commit_hash)
self._redo_stack.clear()
with self._snapshot_lock:
self._snapshot_stack.append(snap_info.commit_hash)
self._redo_stack.clear()
except Exception as e:
logging.debug(f"Pre-autonomous snapshot failed: {e}")

@@ -4304,8 +4410,9 @@ def _build_system_prompt(self, tools=None):
tools_key = self._get_tools_cache_key(tools)
cache_key = f"{self.role}:{self.goal}:{tools_key}"

if cache_key in self._system_prompt_cache:
return self._system_prompt_cache[cache_key]
cached_prompt = self._cache_get(self._system_prompt_cache, cache_key)
if cached_prompt is not None:
return cached_prompt
else:
cache_key = None # Don't cache when memory is enabled

@@ -4371,9 +4478,9 @@ def _build_system_prompt(self, tools=None):
system_prompt += "\n\nExplain Before Acting: Before calling a tool, provide a brief one-sentence explanation of what you are about to do and why. Skip explanations only for repetitive low-level operations where narration would be noisy. When performing a batch of similar operations (e.g. searching for multiple items), explain the group once rather than narrating each call individually."

# Cache the generated system prompt (only if cache_key is set, i.e., memory not enabled)
# Simple cache size limit to prevent unbounded growth
if cache_key and len(self._system_prompt_cache) < self._max_cache_size:
self._system_prompt_cache[cache_key] = system_prompt
# Use LRU eviction to prevent unbounded growth
if cache_key:
self._cache_put(self._system_prompt_cache, cache_key, system_prompt)
return system_prompt

def _build_response_format(self, schema_model):
@@ -4567,8 +4674,9 @@ def _format_tools_for_completion(self, tools=None):

# Check cache first
tools_key = self._get_tools_cache_key(tools)
if tools_key in self._formatted_tools_cache:
return self._formatted_tools_cache[tools_key]
cached_tools = self._cache_get(self._formatted_tools_cache, tools_key)
if cached_tools is not None:
return cached_tools

formatted_tools = []
for tool in tools:
@@ -4619,10 +4727,8 @@ def _format_tools_for_completion(self, tools=None):
logging.error(f"Tools are not JSON serializable: {e}")
return []

# Cache the formatted tools
# Simple cache size limit to prevent unbounded growth
if len(self._formatted_tools_cache) < self._max_cache_size:
self._formatted_tools_cache[tools_key] = formatted_tools
# Cache the formatted tools with LRU eviction
self._cache_put(self._formatted_tools_cache, tools_key, formatted_tools)
return formatted_tools

def generate_task(self) -> 'Task':
@@ -6279,12 +6385,9 @@ def _chat_impl(self, prompt, temperature, tools, output_json, output_pydantic, r
# Extract text from multimodal prompts
normalized_content = next((item["text"] for item in prompt if item.get("type") == "text"), "")

# Prevent duplicate messages
if not (self.chat_history and
self.chat_history[-1].get("role") == "user" and
self.chat_history[-1].get("content") == normalized_content):
# Add user message to chat history BEFORE LLM call so handoffs can access it
self.chat_history.append({"role": "user", "content": normalized_content})
# Add user message to chat history BEFORE LLM call so handoffs can access it
# Use atomic check-then-act to prevent TOCTOU race conditions
if self._add_to_chat_history_if_not_duplicate("user", normalized_content):
# Persist user message to DB
self._persist_message("user", normalized_content)

@@ -6334,7 +6437,7 @@ def _chat_impl(self, prompt, temperature, tools, output_json, output_pydantic, r

response_text = self.llm_instance.get_response(**llm_kwargs)

self.chat_history.append({"role": "assistant", "content": response_text})
self._add_to_chat_history("assistant", response_text)
# Persist assistant message to DB
self._persist_message("assistant", response_text)

@@ -8595,12 +8698,12 @@ async def handle_agent_query(request: Request, query_data: Optional[AgentQuery]

print(f"🚀 Agent '{self.name}' available at http://{host}:{port}")

# Start the server if it's not already running for this port
# Check and mark server as started atomically to prevent race conditions
should_start = not _server_started.get(port, False)
if should_start:
_server_started[port] = True

# Server start/wait outside the lock to avoid holding it during sleep
# Server start/wait outside the lock to avoid holding it during sleep
if should_start:
Comment on lines +8701 to 8707
⚠️ Potential issue | 🟠 Major

The port can be marked started before any server is actually available.

_server_started[port] flips before the background thread is created and long before uvicorn is listening. A concurrent launch() will then skip startup even if thread creation or server boot fails, leaving that port wedged until restart. The shared _registered_agents / _shared_apps maps are also still read outside _server_lock, so the global launch state remains racy. Based on learnings: Never maintain shared mutable global state between agents; use explicit channels (EventBus, handoff) for multi-agent communication.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai-agents/praisonaiagents/agent/agent.py` around lines 8673 -
8679, The code sets _server_started[port] = True before the background server
actually begins, causing race conditions in launch() and leaving ports wedged if
thread/startup fails; fix by only marking the port started after the server is
confirmed listening (e.g., set _server_started[port] = True inside the
background thread once uvicorn reports ready or after a readiness Event is set),
ensure the background thread communicates success/failure back to the launcher
via a threading.Event or similar handoff, and also protect all accesses to
shared maps _registered_agents and _shared_apps with _server_lock (or eliminate
these globals and use explicit event/hand-off channels) so reads/writes are
atomic and the global launch state is not racy.
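
A rough sketch of the readiness-handoff pattern the prompt describes; run_uvicorn_server and its on_ready callback are hypothetical stand-ins for the actual server start path, and the state map mirrors _server_started:

import threading

_server_lock = threading.Lock()
_server_state = {}  # port -> "starting" or "ready", only touched under _server_lock

def launch_once(port, run_uvicorn_server):
    with _server_lock:
        if port in _server_state:
            return  # Another caller is already starting (or has started) this port.
        _server_state[port] = "starting"

    ready = threading.Event()

    def run_server():
        try:
            # The stand-in is expected to call on_ready once uvicorn is listening.
            run_uvicorn_server(port, on_ready=ready.set)
        finally:
            if not ready.is_set():
                # Boot failed before becoming ready: free the slot so a later launch can retry.
                with _server_lock:
                    _server_state.pop(port, None)

    threading.Thread(target=run_server, daemon=True).start()

    # Only mark the port started once the server has actually reported readiness.
    if ready.wait(timeout=30):
        with _server_lock:
            _server_state[port] = "ready"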

# Start the server in a separate thread
def run_server():