Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/backend/base/langflow/api/v1/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
handle_mcp_errors,
handle_read_resource,
)
from langflow.services.deps import get_settings_service

router = APIRouter(prefix="/mcp", tags=["mcp"])

Expand Down Expand Up @@ -159,6 +160,8 @@ async def handle_messages(request: Request):
################################################################################
# Streamable HTTP Transport
################################################################################


class StreamableHTTP:
def __init__(self):
self.session_manager: StreamableHTTPSessionManager | None = None
Expand All @@ -185,14 +188,17 @@ async def _start_session_manager(self) -> None:
self._mgr_ready.set() # type: ignore[union-attr] # unblock listeners
self._started = False

async def start(self, *, stateless: bool = True) -> None:
async def start(self) -> None:
"""Create and enter the Streamable HTTP session manager lifecycle."""
async with self._start_stop_lock:
if self._started:
await logger.adebug("Streamable HTTP session manager already running; skipping start")
return
try:
self.session_manager = StreamableHTTPSessionManager(server, stateless=stateless)
settings = get_settings_service().settings
self.session_manager = StreamableHTTPSessionManager(
app=server, stateless=settings.mcp_streamable_http_stateless
)
self._mgr_ready = asyncio.Event()
self._mgr_close = asyncio.Event()
self._mgr_task = asyncio.create_task(self._start_session_manager())
Expand Down Expand Up @@ -237,8 +243,8 @@ def _cleanup(self) -> None:
_streamable_http = StreamableHTTP()


async def start_streamable_http_manager(stateless: bool = True) -> None: # noqa: FBT001, FBT002
await _streamable_http.start(stateless=stateless)
async def start_streamable_http_manager() -> None:
await _streamable_http.start()


def get_streamable_http_manager() -> StreamableHTTPSessionManager:
Expand Down
6 changes: 4 additions & 2 deletions src/backend/base/langflow/api/v1/mcp_projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1205,8 +1205,10 @@ class ProjectMCPServer:
def __init__(self, project_id: UUID):
self.project_id = project_id
self.server = Server(f"langflow-mcp-project-{project_id}")
# TODO: implement an environment variable to enable/disable stateless mode
self.session_manager = StreamableHTTPSessionManager(self.server, stateless=True)
settings = get_settings_service().settings
self.session_manager = StreamableHTTPSessionManager(
app=self.server, stateless=settings.mcp_streamable_http_stateless
)
# since we lazily initialize the session manager's lifecycle
# via .run(), which can only be called once, otherwise an error is raised,
# we use the lock to prevent race conditions on concurrent requests to prevent such an error
Expand Down
2 changes: 1 addition & 1 deletion src/lfx/src/lfx/_assets/component_index.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/lfx/src/lfx/services/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ class Settings(BaseSettings):
"""If set to False, Langflow will not start the MCP Composer service."""
mcp_composer_version: str = "==0.1.0.8.10"
"""Version constraint for mcp-composer when using uvx. Uses PEP 440 syntax."""
mcp_streamable_http_stateless: bool = True
"""If set to True, the Langflow MCP server will use its Streamable HTTP Session Manager in stateless mode."""

# Agentic Experience
agentic_experience: bool = False
Expand Down
Loading