[Bug]: workflow_as_mcp captures a single Workflow instance, sharing self.* state across every MCP client #22071

@Fr3ya

Bug Description

workflow_as_mcp in llama-index-tools-mcp/llama_index/tools/mcp/utils.py takes a workflow: Workflow argument, registers an @app.tool(...) closure over it, and calls workflow.run(...) on that exact instance for every MCP client request:

def workflow_as_mcp(workflow: Workflow, ...) -> FastMCP:
    app = FastMCP(**fastmcp_init_kwargs)
    ...
    @app.tool(name=workflow_name, description=workflow_description)
    async def _workflow_tool(run_args, context):
        ...
        handler = workflow.run(start_event=start_event)   # closed-over instance
        ...

Workflow.run() gives each call its own Context, but anything user code stores on self.* (a counter, a cache, a memory store, a tenant id, a cached LLM client) is shared across every MCP client of the resulting server. The sibling helper AGUIWorkflowRouter takes a workflow_factory: Callable[[], Workflow] and constructs a fresh Workflow per request precisely to avoid this; workflow_as_mcp gives the operator no way to opt into per-request isolation. This matters because the documented LlamaIndex Workflow subclassing pattern stores objects such as self.llm and self.tools on the instance, so user workflows commonly carry mutable instance state.
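To make the difference concrete, here is a minimal stdlib-only sketch contrasting the two registration styles. It does not use llama_index or FastMCP: ToyWorkflow, make_tool_shared and make_tool_with_factory are hypothetical stand-ins invented for illustration, and the factory variant is only one possible shape for a fix, not the library's API.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


class ToyWorkflow:
    """Stand-in for a Workflow subclass that keeps mutable state on self."""

    def __init__(self) -> None:
        self.history: List[str] = []

    async def run(self, tenant_id: str) -> Dict[str, Any]:
        self.history.append(tenant_id)
        return {"this_tenant": tenant_id, "history_visible": list(self.history)}


Tool = Callable[[str], Awaitable[Dict[str, Any]]]


def make_tool_shared(workflow: ToyWorkflow) -> Tool:
    # Mirrors the reported behavior: the closure captures one instance,
    # so every caller mutates the same self.history.
    async def tool(tenant_id: str) -> Dict[str, Any]:
        return await workflow.run(tenant_id)

    return tool


def make_tool_with_factory(workflow_factory: Callable[[], ToyWorkflow]) -> Tool:
    # Hypothetical alternative: build a fresh instance for each request.
    async def tool(tenant_id: str) -> Dict[str, Any]:
        return await workflow_factory().run(tenant_id)

    return tool


async def demo() -> Dict[str, List[str]]:
    shared = make_tool_shared(ToyWorkflow())
    isolated = make_tool_with_factory(ToyWorkflow)
    await shared("alice")
    await isolated("alice")
    shared_bob = await shared("bob")
    isolated_bob = await isolated("bob")
    return {
        "shared": shared_bob["history_visible"],
        "isolated": isolated_bob["history_visible"],
    }


result = asyncio.run(demo())
print(result)
```

With the shared closure, bob's reply still contains alice; with the factory, each request sees only its own state. The trade-off is that anything expensive built in __init__ is rebuilt per request unless the factory injects shared read-only dependencies.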

Version

llama-index-tools-mcp==0.4.8

Steps to Reproduce

import asyncio
from llama_index.core.workflow import Context, StartEvent, StopEvent, Workflow, step
from llama_index.tools.mcp.utils import workflow_as_mcp

class TenantStart(StartEvent):
    tenant_id: str

class CountingWorkflow(Workflow):
    def __init__(self, **kw):
        super().__init__(**kw)
        self.call_count = 0
        self.history    = []

    @step
    async def echo(self, ctx: Context, ev: TenantStart) -> StopEvent:
        self.call_count += 1
        self.history.append(ev.tenant_id)
        return StopEvent(result={
            "call_index": self.call_count,
            "this_tenant": ev.tenant_id,
            "history_visible": list(self.history),
        })

async def main():
    wf  = CountingWorkflow(timeout=30)
    app = workflow_as_mcp(wf)
    # Two MCP clients hit the same server.
    a = await app.call_tool("CountingWorkflow", {"run_args": {"tenant_id": "alice"}})
    b = await app.call_tool("CountingWorkflow", {"run_args": {"tenant_id": "bob"}})
    print("alice:", a[1] if isinstance(a, tuple) else a)
    print("bob  :", b[1] if isinstance(b, tuple) else b)
    print("workflow.history (shared instance):", wf.history)

asyncio.run(main())
# CONFIRMED BUG: workflow.history shows both alice and bob; bob's reply
# contains alice in history_visible.

Relevant Logs/Tracebacks

alice: {'call_index': 1, 'this_tenant': 'alice', 'history_visible': ['alice']}
bob  : {'call_index': 2, 'this_tenant': 'bob',   'history_visible': ['alice', 'bob']}
workflow.history (shared instance): ['alice', 'bob']


Bob's MCP response leaks alice's tenant identity, and `wf.call_count` / `wf.history` accumulate across all callers. The concurrent variant (`asyncio.gather(...)`) shows the same shared-instance interleaving.
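For the concurrent case, a rough stdlib-only illustration follows. It is not the actual repro against workflow_as_mcp: ToyWorkflow is a hypothetical stand-in, and asyncio.sleep(0) stands in for any awaited call (for example an LLM request) that lets another request run in between.

```python
import asyncio
from typing import Any, Dict, List


class ToyWorkflow:
    """Stand-in for a Workflow subclass with mutable instance state."""

    def __init__(self) -> None:
        self.history: List[str] = []

    async def run(self, tenant_id: str) -> Dict[str, Any]:
        self.history.append(tenant_id)
        # Yield to the event loop so the other request can run before
        # this one reads self.history back.
        await asyncio.sleep(0)
        return {"this_tenant": tenant_id, "history_visible": list(self.history)}


async def demo() -> List[Dict[str, Any]]:
    wf = ToyWorkflow()  # one instance shared by both concurrent requests
    return await asyncio.gather(wf.run("alice"), wf.run("bob"))


results = asyncio.run(demo())
for reply in results:
    print(reply)
```

In this sketch even the first caller's reply includes the second caller's tenant id, because both requests append to the same list before either one reads it.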

Labels: bug, triage
