fix(concurrency): isolate per-session state for concurrent agent safety #400

Open
oysteinkrog wants to merge 2 commits into BeehiveInnovations:main from oysteinkrog:fix/concurrent-agent-safety

Conversation

@oysteinkrog

Summary

Fixes #395 — ConsensusTool (and all workflow tools) share mutable instance state across concurrent callers on the singleton tool instances, causing one agent's data to silently overwrite another's.

  • tools/workflow/workflow_mixin.py: Add asyncio.Lock per tool instance; split execute_workflow into a locking wrapper + _execute_workflow_locked inner method so concurrent calls to the same tool type are serialized. Update store_initial_issue base signature to accept an optional continuation_id parameter.

  • tools/consensus.py: Add a per-session _sessions: dict[str, dict] keyed by continuation_id to fully isolate original_proposal, models_to_consult, and accumulated_responses per caller. Split execute_workflow into a locking wrapper + _execute_consensus_workflow_locked (calls super()._execute_workflow_locked to avoid re-entrant deadlock on the same asyncio.Lock). Clean up the session dict on final step completion.

  • tools/{analyze,codereview,planner,precommit,refactor,secaudit,testgen,tracer}.py: Add continuation_id: str | None = None to their store_initial_issue overrides to match the updated base class signature.
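The locking split described in the bullets above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the class names `WorkflowBase` and `ConsensusLike`, the `arguments` dict shape, and the returned fields are assumptions; only `_concurrency_lock`, `_sessions`, `execute_workflow`, `_execute_workflow_locked`, and `_execute_consensus_workflow_locked` are names taken from the summary.

```python
from __future__ import annotations

import asyncio


class WorkflowBase:
    """Hypothetical base class; only the locking shape mirrors the summary."""

    def __init__(self) -> None:
        self._concurrency_lock = asyncio.Lock()

    async def execute_workflow(self, arguments: dict) -> dict:
        # Public entry point: take the lock once, then run the unlocked body.
        async with self._concurrency_lock:
            return await self._execute_workflow_locked(arguments)

    async def _execute_workflow_locked(self, arguments: dict) -> dict:
        await asyncio.sleep(0)  # stands in for awaiting a model response
        return {"step": arguments["step"]}


class ConsensusLike(WorkflowBase):
    """Hypothetical subclass keeping per-caller state keyed by continuation_id."""

    def __init__(self) -> None:
        super().__init__()
        self._sessions: dict[str, dict] = {}

    async def execute_workflow(self, arguments: dict) -> dict:
        async with self._concurrency_lock:
            return await self._execute_consensus_workflow_locked(arguments)

    async def _execute_consensus_workflow_locked(self, arguments: dict) -> dict:
        session = self._sessions.setdefault(arguments["continuation_id"], {})
        session["original_proposal"] = arguments["step"]
        # Call the inner method, not execute_workflow: asyncio.Lock is not
        # re-entrant, so acquiring it again from this task would block forever.
        result = await super()._execute_workflow_locked(arguments)
        result["original_proposal"] = session["original_proposal"]
        return result


async def main() -> list[dict]:
    tool = ConsensusLike()  # created inside the running loop
    return await asyncio.gather(
        tool.execute_workflow({"continuation_id": "a", "step": "proposal A"}),
        tool.execute_workflow({"continuation_id": "b", "step": "proposal B"}),
    )


results = asyncio.run(main())
print(results)
```

In this sketch the lock serializes whole calls, while the session dict keeps each caller's proposal separate even if the lock were later relaxed.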

Root Cause

All tools are instantiated once in server.py as singletons (TOOLS = {"chat": ChatTool(), ...}). The asyncio event loop is single-threaded, but at every await yield point (waiting on model API responses) a second caller can enter the same method and overwrite self.original_proposal, self.work_history, etc. on the shared instance.
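A minimal reproduction of that interleaving, assuming a hypothetical `SharedTool` class in place of the real tools and `asyncio.sleep(0)` in place of a model API call:

```python
from __future__ import annotations

import asyncio


class SharedTool:
    """Hypothetical stand-in for a singleton workflow tool."""

    def __init__(self) -> None:
        self.original_proposal: str | None = None

    async def run(self, proposal: str) -> str | None:
        self.original_proposal = proposal  # write to shared instance state
        await asyncio.sleep(0)  # yield point, like awaiting a model response
        # Another caller may have overwritten the attribute while we waited.
        return self.original_proposal


async def main() -> list:
    tool = SharedTool()  # one instance shared by every caller
    return await asyncio.gather(tool.run("proposal A"), tool.run("proposal B"))


results = asyncio.run(main())
print(results)  # ['proposal B', 'proposal B']
```

The first caller gets the second caller's proposal back even though no threads are involved; the single `await` is enough to interleave the two coroutines.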

Test plan

  • All 866 existing unit tests pass (python -m pytest tests/ -v -m "not integration" — 866 passed, 4 skipped)
  • ./code_quality_checks.sh passes (ruff, black, isort, full unit suite)
  • Verify two concurrent consensus calls with different proposals do not corrupt each other (manual/integration test)

🤖 Generated with Claude Code

@gemini-code-assist
Contributor

Summary of Changes

Hello @oysteinkrog, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the concurrency safety of workflow tools, particularly the ConsensusTool, by addressing a critical issue where shared mutable instance state could be silently overwritten by concurrent agent calls. It achieves this by introducing a per-instance asyncio.Lock to serialize access to shared state and implementing a robust per-session state isolation mechanism using continuation_id for workflow-specific data. These changes ensure that multiple concurrent workflows can execute without data corruption, improving the reliability and scalability of the system.

Highlights

  • Concurrency Safety for Workflow Tools: Introduced a per-instance asyncio.Lock in the base WorkflowTool to serialize concurrent calls to the same tool singleton, preventing corruption of shared mutable state like work_history and consolidated_findings.
  • Per-Session State Isolation in ConsensusTool: Implemented a _sessions dictionary in ConsensusTool, keyed by continuation_id, to provide isolated storage for original_proposal, models_to_consult, and accumulated_responses, ensuring concurrent consensus workflows do not overwrite each other's data.
  • Refactored Workflow Execution: Split the execute_workflow method in WorkflowTool into a public locking wrapper and a private _execute_workflow_locked inner method, ensuring that the concurrency lock is acquired before executing the core workflow logic.
  • Updated store_initial_issue Signature: Modified the store_initial_issue method signature across all workflow tools to accept an optional continuation_id parameter, allowing subclasses like ConsensusTool to utilize per-session storage.
  • Session Cleanup: Added logic to ConsensusTool to clean up per-session state from the _sessions dictionary once a workflow completes, freeing up memory.

Changelog
  • tools/analyze.py
    • Updated store_initial_issue method signature to accept an optional continuation_id parameter.
  • tools/codereview.py
    • Updated store_initial_issue method signature to accept an optional continuation_id parameter.
  • tools/consensus.py
    • Initialized a _sessions dictionary for managing per-session state.
    • Added _get_session and _cleanup_session helper methods for session management.
    • Modified handle_work_completion to retrieve accumulated_responses from the session.
    • Modified handle_work_continuation to retrieve models_to_consult from the session.
    • Wrapped execute_workflow with _concurrency_lock and introduced _execute_consensus_workflow_locked for inner logic.
    • Updated store_initial_issue to store original_proposal in the session and accept continuation_id.
    • Adjusted _consult_model, customize_workflow_response, and _customize_consensus_metadata to use session-isolated data.
    • Ensured session cleanup upon workflow completion.
  • tools/planner.py
    • Updated store_initial_issue method signature to accept an optional continuation_id parameter.
  • tools/precommit.py
    • Updated store_initial_issue method signature to accept an optional continuation_id parameter.
  • tools/refactor.py
    • Updated store_initial_issue method signature to accept an optional continuation_id parameter.
  • tools/secaudit.py
    • Updated store_initial_issue method signature to accept an optional continuation_id parameter.
  • tools/testgen.py
    • Updated store_initial_issue method signature to accept an optional continuation_id parameter.
  • tools/tracer.py
    • Updated store_initial_issue method signature to accept an optional continuation_id parameter.
  • tools/workflow/workflow_mixin.py
    • Imported the asyncio module.
    • Added an _concurrency_lock (an asyncio.Lock) to the __init__ method.
    • Refactored execute_workflow to acquire the _concurrency_lock and call a new _execute_workflow_locked method.
    • Defined _execute_workflow_locked as the inner implementation of the workflow execution.
    • Modified the base store_initial_issue method to accept an optional continuation_id parameter.
Activity
  • No human activity (comments, reviews, etc.) has been recorded for this pull request yet.

chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 14914b5f58

Comment thread tools/consensus.py Outdated
Comment on lines +588 to +589
# Workflow complete - release per-session state to free memory.
self._cleanup_session(continuation_id)

P2: Move session cleanup after response customization

In the final consensus step, session state is deleted before customize_workflow_response() and _customize_consensus_metadata() run. Those methods call _get_session(request.continuation_id), which recreates a fresh empty session, so the final payload can lose accumulated_responses and report empty/zero model metadata (models_consulted, total_models) despite a completed workflow. This happens whenever a normal multi-step consensus run reaches its last step with a continuation_id.
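The ordering problem can be sketched in isolation. The `SessionStore` class and the `build_metadata`, `finish_cleanup_first`, and `finish_cleanup_last` helpers below are hypothetical; the only behavior assumed from the review is that looking up a missing session creates a fresh empty one.

```python
from __future__ import annotations


class SessionStore:
    """Hypothetical session store with get-or-create lookup."""

    def __init__(self) -> None:
        self._sessions: dict[str, dict] = {}

    def get_session(self, continuation_id: str) -> dict:
        # A missing key silently yields a fresh, empty session.
        return self._sessions.setdefault(
            continuation_id, {"accumulated_responses": []}
        )

    def cleanup_session(self, continuation_id: str) -> None:
        self._sessions.pop(continuation_id, None)

    def has_session(self, continuation_id: str) -> bool:
        return continuation_id in self._sessions


def build_metadata(store: SessionStore, continuation_id: str) -> dict:
    responses = store.get_session(continuation_id)["accumulated_responses"]
    return {"models_consulted": len(responses)}


def finish_cleanup_first(store: SessionStore, continuation_id: str) -> dict:
    store.cleanup_session(continuation_id)  # state is gone...
    return build_metadata(store, continuation_id)  # ...so this sees an empty session


def finish_cleanup_last(store: SessionStore, continuation_id: str) -> dict:
    metadata = build_metadata(store, continuation_id)  # read while state exists
    store.cleanup_session(continuation_id)
    return metadata


def completed_store() -> SessionStore:
    store = SessionStore()
    store.get_session("s1")["accumulated_responses"].extend(
        ["model-a reply", "model-b reply"]
    )
    return store


buggy_store = completed_store()
buggy = finish_cleanup_first(buggy_store, "s1")
fixed_store = completed_store()
fixed = finish_cleanup_last(fixed_store, "s1")
print(buggy, fixed)
```

In this sketch the cleanup-first ordering reports zero models and also leaves a recreated empty session behind, while the cleanup-last ordering reports both responses and leaves the store empty.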

Contributor

gemini-code-assist bot left a comment

Code Review

This pull request effectively addresses a concurrency issue in workflow tools by introducing an asyncio.Lock for serialization and, in the ConsensusTool, implementing per-session state management to ensure safety under concurrent asyncio callers. The changes are well-structured, splitting execute_workflow into a locking wrapper and an inner method, and isolating session-specific data in ConsensusTool. My review focuses on refining the implementation of session isolation. Specifically, I've suggested making updates to legacy instance variables conditional to prevent session state from leaking into the shared tool instance, which will make the concurrency fix more robust and less confusing. I also recommended clarifying a comment in _get_session to better explain its behavior for the legacy path. Overall, this is a solid improvement for the stability of concurrent agent operations.

Comment thread tools/consensus.py Outdated
Comment on lines +519 to +521
            # Keep legacy instance vars in sync (single-caller / no-continuation_id path).
            self.models_to_consult = session["models_to_consult"]
            self.accumulated_responses = session["accumulated_responses"]

high

These lines unconditionally update instance variables (self.models_to_consult, self.accumulated_responses) with session-specific data. While the lock prevents race conditions, this pollutes the shared instance state with data from a specific session, which undermines the goal of session isolation.

The comment indicates this is for the "single-caller / no-continuation_id path". To align with that and ensure true session isolation, these updates should be conditional.

            # Keep legacy instance vars in sync (single-caller / no-continuation_id path).
            if not continuation_id:
                self.models_to_consult = session["models_to_consult"]
                self.accumulated_responses = session["accumulated_responses"]

Comment thread tools/consensus.py
Comment on lines +895 to 903
    def store_initial_issue(self, step_description: str, continuation_id: str | None = None):
        """Store initial prompt for model consultations."""
        # Write into the per-session dict so concurrent callers stay isolated.
        session = self._get_session(continuation_id)
        session["original_proposal"] = step_description
        # Also keep legacy instance vars for any code that reads them directly.
        self.original_proposal = step_description
        self.initial_prompt = step_description  # Keep for backward compatibility

high

Similar to another comment, updating self.original_proposal and self.initial_prompt unconditionally pollutes the shared instance state with data from a specific session. This goes against the principle of session isolation you're introducing.

The comment "Also keep legacy instance vars for any code that reads them directly" suggests this is for backward compatibility. Therefore, it should only apply to the legacy path where continuation_id is not present.

    def store_initial_issue(self, step_description: str, continuation_id: str | None = None):
        """Store initial prompt for model consultations."""
        # Write into the per-session dict so concurrent callers stay isolated.
        session = self._get_session(continuation_id)
        session["original_proposal"] = step_description
        # Also keep legacy instance vars for any code that reads them directly.
        if not continuation_id:
            self.original_proposal = step_description
            self.initial_prompt = step_description  # Keep for backward compatibility

Comment thread tools/consensus.py
Comment on lines +157 to +162
        """Return the per-session state dict for *continuation_id*.

        When no continuation_id is provided (legacy / single-caller path) a
        transient dict backed by the instance variables is returned so the
        rest of the code works without special-casing.
        """

medium

The comment "callers can read/write through the same interface" is a bit misleading for the legacy path (continuation_id is None). The returned dictionary is a new dictionary containing references to the instance's mutable attributes (like lists).

  • In-place modifications to these mutable objects (e.g., session['accumulated_responses'].append(...)) will be reflected on the instance.
  • However, re-assigning a key in the returned dictionary (e.g., session['original_proposal'] = 'new') will not update the corresponding instance attribute.

This subtlety can be error-prone. While the current code seems to handle it correctly by manually syncing instance attributes, clarifying the comment would prevent future confusion.

        """Return the per-session state dict for *continuation_id*.

        When no continuation_id is provided (legacy / single-caller path), a
        transient dict is returned. Note that only in-place modifications to
        mutable objects (like lists) in this dict will affect the instance state.
        Direct assignments to keys will not be persisted back to the instance.
        """

@oysteinkrog
Author

All four review comments addressed in commit 68a7b14:

Codex P2 — cleanup before customize: Moved _cleanup_session() to after customize_workflow_response() and _add_workflow_metadata(). Both methods read the session's models_to_consult / accumulated_responses to build final metadata; cleaning up before them caused _get_session() to recreate a fresh empty session, losing the model list.

Gemini high (×2) — unconditional instance-var pollution: Wrapped both sync sites behind if not continuation_id::

  • self.models_to_consult / self.accumulated_responses in _execute_consensus_workflow_locked
  • self.original_proposal / self.initial_prompt in store_initial_issue

When a continuation_id is present (multi-agent path), one caller's session data must never overwrite the shared singleton state.

Gemini medium — _get_session docstring: Updated to clarify the snapshot semantics: in-place mutations on mutable values (lists) propagate to the instance, but scalar key reassignments do not — callers must sync those manually via the if not continuation_id: guards (which all do).
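The snapshot semantics can be checked with a small standalone example. The `LegacyTool` class and its `get_session` method are hypothetical stand-ins for the legacy (no continuation_id) path; they only assume that the returned dict is built fresh from the instance attributes on each call.

```python
class LegacyTool:
    """Hypothetical stand-in for the legacy path of a session lookup."""

    def __init__(self):
        self.original_proposal = "old"
        self.accumulated_responses = []

    def get_session(self):
        # A new dict each call; the values are references to instance attributes.
        return {
            "original_proposal": self.original_proposal,
            "accumulated_responses": self.accumulated_responses,
        }


tool = LegacyTool()
session = tool.get_session()

# In-place mutation of a shared list is visible on the instance.
session["accumulated_responses"].append("response 1")

# Rebinding a key only changes the transient dict, not the instance.
session["original_proposal"] = "new"

print(tool.accumulated_responses)  # ['response 1']
print(tool.original_proposal)      # old
```

This is why a scalar such as the proposal text needs an explicit write back to the instance on the legacy path, while the response list does not.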

Own review notes (for transparency):

  • Session leak on exception: if _consult_model raises, the session is never cleaned. Acceptable for this PR; try/finally cleanup can be a follow-up.
  • No MAX_SESSIONS cap: sessions are keyed by UUID, so collisions are not a practical concern, but abandoned sessions won't be evicted. Fine for now, given that the asyncio.Lock serializes callers and sessions are short-lived in practice.
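Both follow-ups could look roughly like the sketch below. Everything here is an assumption for illustration: the `BoundedSessions` class, the `run_final_step` helper, the least-recently-used eviction policy, and the tiny `MAX_SESSIONS` value are not part of this PR.

```python
from __future__ import annotations

import asyncio
from collections import OrderedDict

MAX_SESSIONS = 2  # hypothetical cap, kept tiny so eviction is easy to see


class BoundedSessions:
    """Hypothetical session store that evicts the least recently used entry."""

    def __init__(self, max_sessions: int = MAX_SESSIONS) -> None:
        self._max = max_sessions
        self._sessions: OrderedDict[str, dict] = OrderedDict()

    def get(self, continuation_id: str) -> dict:
        if continuation_id in self._sessions:
            self._sessions.move_to_end(continuation_id)  # mark as recently used
        else:
            self._sessions[continuation_id] = {"accumulated_responses": []}
            while len(self._sessions) > self._max:
                self._sessions.popitem(last=False)  # drop the oldest session
        return self._sessions[continuation_id]

    def cleanup(self, continuation_id: str) -> None:
        self._sessions.pop(continuation_id, None)

    def keys(self) -> list[str]:
        return list(self._sessions)


async def run_final_step(sessions: BoundedSessions, continuation_id: str, consult) -> int:
    session = sessions.get(continuation_id)
    try:
        session["accumulated_responses"].append(await consult())
        return len(session["accumulated_responses"])
    finally:
        # Runs on success and on exceptions, so a failed call cannot leak state.
        sessions.cleanup(continuation_id)


async def failing_consult() -> str:
    raise RuntimeError("model call failed")


async def main() -> tuple[bool, list[str]]:
    sessions = BoundedSessions()
    try:
        await run_final_step(sessions, "s1", failing_consult)
    except RuntimeError:
        pass
    leaked = "s1" in sessions.keys()
    for continuation_id in ("a", "b", "c"):
        sessions.get(continuation_id)
    return leaked, sessions.keys()


leaked, remaining = asyncio.run(main())
print(leaked, remaining)
```

The `finally` cleanup only makes sense on the last step of a workflow; intermediate steps would need to keep the session alive on success and drop it only on failure.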

oysteinkrog and others added 2 commits February 18, 2026 14:29
Pre-existing files that needed reformatting to satisfy the CI black check
after upgrading to black 26.1.0.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Fixes BeehiveInnovations#395. All workflow tools share a single singleton instance in
server.py. When multiple agents call the same tool concurrently, they
race on shared self.* mutable state at asyncio await yield points.

Changes:
- workflow_mixin.py: add asyncio.Lock per tool instance; split
  execute_workflow into _execute_workflow_locked so subclasses can
  call the locked body without re-acquiring the lock
- consensus.py: add _sessions dict keyed by continuation_id to isolate
  original_proposal, models_to_consult, accumulated_responses per caller;
  restore work_history/consolidated_findings from conversation memory on
  continuation steps (base class logic that the custom consensus loop
  bypassed); fix off-by-one in customize_workflow_response status
  condition (step_number < total_steps, not < total_steps - 1)
- all other workflow tools (analyze, codereview, planner, precommit,
  refactor, secaudit, testgen, tracer): update store_initial_issue call
  signature to match updated base class
oysteinkrog force-pushed the fix/concurrent-agent-safety branch from f770f5e to 86ea859 on February 18, 2026 13:30
jgautheron added a commit to jgautheron/pal-mcp-server that referenced this pull request Apr 7, 2026
…ty (from PR BeehiveInnovations#400)

Adds asyncio.Lock per tool instance and splits execute_workflow into
a locking wrapper to prevent concurrent callers from silently
overwriting each other's state.
Development

Successfully merging this pull request may close these issues.

Bug: ConsensusTool concurrent callers overwrite each other's proposal state (singleton mutable state)

1 participant