Race conditions in lazy async primitive init, hard-capped parallel workers, and silent exception swallowing in praisonaiagents core SDK #1475

@MervinPraison

Description

Summary

In-depth analysis of src/praisonai-agents/praisonaiagents uncovered three architectural gaps that violate the project's stated principles of being multi-agent safe, async-safe, and production-ready. Each is validated with exact file paths, line numbers, and code.


Gap 1: Systemic Race Conditions in Lazy Async Primitive Initialization

Problem

Seven locations across the core SDK lazily initialize asyncio.Lock() or asyncio.Semaphore() using the pattern:

```python
if self._lock is None:
    self._lock = asyncio.Lock()
return self._lock
```

This has no synchronization. When two concurrent async tasks (or threads) call the method simultaneously, both pass the None check, both create separate primitives, and only the last assignment persists — the other caller holds a reference to a discarded lock that protects nothing.
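The failure mode can be reproduced in a self-contained sketch (LazyLock is a hypothetical stand-in, not SDK code). Here an explicit await asyncio.sleep(0) simulates the suspension point between the check and the assignment; in the real code the interleaving comes from threads, or from any future await added inside that window:

```python
import asyncio

class LazyLock:
    """Minimal reproduction of the unsynchronized lazy-init pattern."""
    def __init__(self):
        self._lock = None

    async def get_lock(self):
        if self._lock is None:
            # Simulated suspension point: lets a second task pass the
            # None check before the first task assigns.
            await asyncio.sleep(0)
            self._lock = asyncio.Lock()
        return self._lock

async def main():
    obj = LazyLock()
    # Both tasks pass the None check, so each creates its own Lock.
    return await asyncio.gather(obj.get_lock(), obj.get_lock())

lock_a, lock_b = asyncio.run(main())
print(lock_a is lock_b)  # → False: the two callers hold different locks
```

Any code that later acquires lock_a gets no mutual exclusion against code holding lock_b, which is exactly the corruption scenario described here.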

Affected Locations

| File | Lines | Method | Primitive |
|------|-------|--------|-----------|
| process/process.py | 67–71 | _get_state_lock() | asyncio.Lock |
| llm/rate_limiter.py | 110–114 | _get_lock() | asyncio.Lock |
| llm/rate_limiter.py | 116–120 | _get_api_tokens_lock() | asyncio.Lock |
| agent/handoff.py | 465–466 | inline in execute() | asyncio.Semaphore (class-level!) |
| agents/delegator.py | 157–158 | _get_semaphore() | asyncio.Semaphore |
| background/runner.py | 59–63 | _get_semaphore() | asyncio.Semaphore |
| storage/base.py | 445–450 | _get_async_lock() | asyncio.Lock |

Worst Case: handoff.py:465-466 (Class-Level Variable)

```python
# agent/handoff.py lines 465-466
if self.config.max_concurrent > 0 and Handoff._semaphore is None:
    Handoff._semaphore = asyncio.Semaphore(self.config.max_concurrent)
```

This is a class-level variable (Handoff._semaphore) with zero synchronization. Multiple concurrent handoff calls from different agents will race to initialize it — each creating a separate Semaphore with potentially different max_concurrent values. Only the last write wins, and earlier callers hold references to orphaned semaphores.

Another Example: process/process.py:67-71

```python
# process/process.py lines 67-71
async def _get_state_lock(self):
    """Get or create the async state lock (must be called from async context)."""
    if self._state_lock is None:           # Task A checks: None ✓
        # Task A suspended here by event loop
        # Task B checks: still None ✓
        self._state_lock = asyncio.Lock()  # Task A creates Lock #1
        # Task B creates Lock #2, overwrites Lock #1
    return self._state_lock                # Task A returns Lock #1, Task B returns Lock #2
```

Note: process.py already has a _state_lock_init = threading.Lock() (line 49) that was clearly intended for this purpose but is never used in _get_state_lock().

Suggested Fix

Use double-checked locking with the existing threading lock (for process.py), or add one where missing:

```python
# process/process.py — fixed
async def _get_state_lock(self):
    if self._state_lock is None:
        with self._state_lock_init:          # Use the existing threading.Lock
            if self._state_lock is None:     # Double-check after acquiring
                self._state_lock = asyncio.Lock()
    return self._state_lock
```

For all other locations, add a threading.Lock guard:

```python
# Generic fix pattern for rate_limiter.py, delegator.py, runner.py, storage/base.py
_init_lock = threading.Lock()

def _get_lock(self) -> asyncio.Lock:
    if self._lock is None:
        with self._init_lock:
            if self._lock is None:
                self._lock = asyncio.Lock()
    return self._lock
```

For handoff.py class-level semaphore:

```python
# agent/handoff.py — fixed
# Class attribute on Handoff, alongside _semaphore:
_semaphore_lock = threading.Lock()

# In execute():
if self.config.max_concurrent > 0 and Handoff._semaphore is None:
    with Handoff._semaphore_lock:
        if Handoff._semaphore is None:
            Handoff._semaphore = asyncio.Semaphore(self.config.max_concurrent)
```

This pattern is already used correctly in agent/agent.py (lines 258–298) for _agent_counter and _env_cache_lock. The fix is to apply the same established pattern consistently.


Gap 2: Workflow Parallel Execution Hard-Capped at 3 Workers, Silently Ignoring User Configuration

Problem

The Parallel workflow pattern and Loop(parallel=True) both hard-cap concurrency to 3 workers maximum, silently overriding any user-specified max_workers. The Parallel class doesn't even accept a max_workers parameter.

Evidence

Parallel class — no concurrency control at all (workflows.py:186-200):

```python
@dataclass
class Parallel:
    """Execute multiple steps concurrently and combine results."""
    steps: List = field(default_factory=list)

    def __init__(self, steps: List):
        self.steps = steps
    # No max_workers parameter exists
```

_execute_parallel() — hard-coded cap (workflows.py:2350):

```python
effective_workers = min(3, len(parallel_step.steps))  # Cap at 3 to prevent rate limits
```

No way for users to override this. Even if you have 10 independent steps doing non-LLM work (file processing, API calls), you're limited to 3.

Loop.max_workers — silently overridden (workflows.py:2439):

```python
max_workers = min(loop_step.max_workers or num_items, 3)  # Cap at 3 to prevent rate limits
```

The loop() helper documents max_workers=4 as a valid example (lines 227–228):

```python
# Parallel with limited workers
workflow = Workflow(
    steps=[loop(processor, over="items", parallel=True, max_workers=4)],  # <-- User sets 4
    variables={"items": ["a", "b", "c"]}
)
```

But the implementation silently reduces it to 3. The user's max_workers=4 is accepted without error or warning, then ignored.

Method docstring is stale (workflows.py:2315):

```python
def _execute_parallel(self, ...):
    """Execute steps in parallel (simulated with sequential for now)."""
    # ^ This docstring says "simulated with sequential" but implementation
    #   actually uses ThreadPoolExecutor — the docstring is stale/misleading
```

Suggested Fix

1. Add a max_workers parameter to the Parallel class:

```python
@dataclass
class Parallel:
    steps: List = field(default_factory=list)
    max_workers: Optional[int] = None  # None = use system default
```

2. Make the cap configurable with a sensible default, and warn when user config exceeds it:

```python
import logging
logger = logging.getLogger(__name__)

DEFAULT_MAX_PARALLEL_WORKERS = 3

def _execute_parallel(self, parallel_step, ...):
    user_max = getattr(parallel_step, 'max_workers', None)
    system_default = DEFAULT_MAX_PARALLEL_WORKERS

    if user_max is not None:
        effective_workers = min(user_max, len(parallel_step.steps))
        if user_max > system_default:
            logger.info(
                f"Parallel max_workers={user_max} exceeds default {system_default}. "
                f"Consider rate limiting if using LLM-backed agents."
            )
    else:
        effective_workers = min(system_default, len(parallel_step.steps))

    with concurrent.futures.ThreadPoolExecutor(max_workers=effective_workers) as executor:
        ...
```

3. Apply the same pattern to _execute_loop:

```python
def _execute_loop(self, loop_step, ...):
    if loop_step.parallel and num_items > 1:
        user_max = loop_step.max_workers
        if user_max is not None:
            max_workers = min(user_max, num_items)
        else:
            max_workers = min(DEFAULT_MAX_PARALLEL_WORKERS, num_items)
```

4. Fix the stale docstring on _execute_parallel (line 2315).
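The worker-resolution logic suggested above can be factored into a single helper. This standalone sketch (resolve_workers is a hypothetical name, not part of the SDK) shows the intended behavior: honor the user's setting, warn rather than clamp past the default, and never exceed the number of items:

```python
import logging

logger = logging.getLogger(__name__)

DEFAULT_MAX_PARALLEL_WORKERS = 3

def resolve_workers(user_max, num_items):
    """Honor user-configured concurrency; warn (don't clamp) past the default."""
    if user_max is not None:
        if user_max > DEFAULT_MAX_PARALLEL_WORKERS:
            logger.info("max_workers=%d exceeds default %d; consider rate "
                        "limiting for LLM-backed steps",
                        user_max, DEFAULT_MAX_PARALLEL_WORKERS)
        return min(user_max, num_items)
    return min(DEFAULT_MAX_PARALLEL_WORKERS, num_items)

print(resolve_workers(4, 10))     # → 4: the user's setting is honored
print(resolve_workers(None, 10))  # → 3: default cap applies only when unset
print(resolve_workers(8, 2))      # → 2: never more workers than items
```

With this in place, the documented max_workers=4 example would actually run four workers instead of being silently reduced to three.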

Gap 3: Silent Exception Swallowing in Hook Execution and LLM Response Handling

Problem

Critical execution paths catch exceptions and silently discard them, via except Exception: pass or similarly broad handlers. This violates the project principle of execute + verify mode ("no guessing — proof of done required"). When hooks fail silently, users have no way to know their lifecycle callbacks never ran. When response parsing fails silently, the agent may proceed with corrupt or missing data.

Location 1: Hook Execution — chat_mixin.py:511-524

```python
# chat_mixin.py lines 511-524
if _compactor.needs_compaction(messages):
    try:
        self._hook_runner.execute_sync(_HookEvent.BEFORE_COMPACTION, None)
    except Exception:
        pass                    # <-- Hook failure silently swallowed
    compacted_msgs, _cr = _compactor.compact(messages)
    messages[:] = compacted_msgs
    # ...
    try:
        self._hook_runner.execute_sync(_HookEvent.AFTER_COMPACTION, None)
    except Exception:
        pass                    # <-- Hook failure silently swallowed
```

If BEFORE_COMPACTION or AFTER_COMPACTION hooks fail, the user's registered callback is silently skipped. Hooks are an explicit contract — "run my code at this lifecycle point" — and silently breaking that contract can cause data integrity issues in production (e.g., a hook that logs compaction events for compliance).

Location 2: LLM Response Extraction — chat_mixin.py:460-478

```python
# chat_mixin.py lines 460-478
def _extract_llm_response_content(self, response) -> Optional[str]:
    """Return assistant message text, a tool-call summary, or str(response) as fallback."""
    if not response:
        return None
    try:
        if hasattr(response, "choices") and response.choices:
            choice = response.choices[0]
            msg = getattr(choice, "message", None)
            if msg is not None:
                content = getattr(msg, "content", None)
                if content:
                    return content
                tool_calls = getattr(msg, "tool_calls", None) or []
                if tool_calls:
                    names = [getattr(tc.function, "name", "?") for tc in tool_calls]
                    return f"[tool_calls: {', '.join(names)}]"
    except (AttributeError, IndexError, TypeError):
        pass                    # <-- Response parsing error silently swallowed
    return str(response)        # Falls back to raw str() of entire response object
```

If the response object has an unexpected structure (e.g., a new LLM provider format), the exception is silently caught and str(response) is returned — which could be a massive, unparseable string like <ModelResponse object at 0x...> that gets injected into the agent's context, polluting subsequent reasoning.

Location 3: Structured Output Capability Check — chat_mixin.py:234-238

```python
# chat_mixin.py lines 234-238
def _supports_native_structured_output(self):
    try:
        from ..llm.model_capabilities import supports_structured_outputs
        return supports_structured_outputs(self.llm)
    except Exception:
        return False            # <-- Returns wrong default on import/runtime errors
```

An ImportError or any runtime error (e.g., malformed model string) silently falls back to False, causing the agent to use the slower text-based JSON extraction path even when the model actually supports native structured output. This is a silent performance degradation that's impossible to diagnose without stepping through code.

Suggested Fix

Replace bare pass with at minimum logging.debug(), and for hooks specifically, let users opt into strict mode:

```python
# chat_mixin.py — hook execution fix
try:
    self._hook_runner.execute_sync(_HookEvent.BEFORE_COMPACTION, None)
except Exception as e:
    logging.warning(f"BEFORE_COMPACTION hook failed: {e}")
    if getattr(self, '_strict_hooks', False):
        raise
```

```python
# chat_mixin.py — response extraction fix
except (AttributeError, IndexError, TypeError) as e:
    logging.warning(
        f"Failed to extract LLM response content (falling back to str): {e}"
    )
    # Fallback to str(response) is still fine, but now it's visible
```

```python
# chat_mixin.py — capability check fix
except ImportError:
    return False  # Module genuinely not available — acceptable
except Exception as e:
    logging.warning(f"Structured output capability check failed: {e}")
    return False
```
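The opt-in strict mode can be demonstrated in isolation. In this sketch, HookRunner is a hypothetical stand-in for the SDK's hook runner; only the _strict_hooks flag name comes from the suggested fix:

```python
import logging

class HookRunner:
    """Minimal sketch of the opt-in strict-hooks behavior (hypothetical class)."""
    def __init__(self, strict_hooks=False):
        self._strict_hooks = strict_hooks

    def run(self, hook):
        try:
            hook()
        except Exception as e:
            logging.warning("hook failed: %s", e)  # failure is now visible
            if self._strict_hooks:
                raise  # strict mode: surface the error to the caller

def failing_hook():
    raise RuntimeError("compliance logger unavailable")

HookRunner().run(failing_hook)  # default: warning logged, execution continues

raised = False
try:
    HookRunner(strict_hooks=True).run(failing_hook)
except RuntimeError:
    raised = True
print(raised)  # → True: strict mode re-raised the hook failure
```

Default behavior stays backward compatible (log and continue), while deployments that treat hooks as a hard contract can fail fast instead.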

Impact

| Gap | Impact | Conditions |
|-----|--------|------------|
| Race conditions in async primitives | Two tasks bypass the same lock, causing concurrent state mutation → data corruption | Multi-agent workflows, parallel handoffs, concurrent rate-limited calls |
| Hard-capped parallel workers | User-configured concurrency silently reduced to 3, no override possible | Any workflow with >3 parallel steps or parallel loops |
| Silent exception swallowing | Hook contracts broken silently, response parsing failures hidden, performance silently degraded | Production deployments relying on hooks for compliance/logging, non-standard LLM providers |

All three gaps compound in production multi-agent scenarios: agents race through unprotected locks, are bottlenecked by an invisible concurrency cap, and failures along the way are silently swallowed — making debugging nearly impossible.
