Implement SSE backpressure to prevent memory exhaustion from slow SSE clients.
Problem
Location: mcpgateway/transports/sse_transport.py line 112
    class SSETransport:
        def __init__(self, base_url: str = None):
            self._base_url = base_url or f"http://{settings.host}:{settings.port}"
            self._connected = False
            self._message_queue = asyncio.Queue()  # Unbounded - no maxsize!
            self._client_gone = asyncio.Event()
            self._session_id = str(uuid.uuid4())

        # Line 215: Message sending
        async def send_message(self, message: dict) -> None:
            await self._message_queue.put(message)  # Always succeeds, never blocks
Issues
- Unbounded queue - No limit on queued messages
- No slow client detection - Can't identify problematic clients
- No backpressure - Producers never slowed
- Memory exhaustion - Slow clients accumulate messages indefinitely
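The behaviour behind these issues can be reproduced in isolation. The following is a minimal sketch, not the gateway's code: `fill_unbounded` is a hypothetical helper that plays the role of a producer writing to a client that never reads.

```python
import asyncio


async def fill_unbounded(n: int) -> int:
    """Queue n messages with no consumer and report how many are held."""
    queue: asyncio.Queue = asyncio.Queue()  # maxsize=0 (the default) means unbounded
    for i in range(n):
        # With no capacity limit, put() never has to wait for free space.
        await queue.put({"seq": i})
    return queue.qsize()


queued = asyncio.run(fill_unbounded(10_000))
print(queued)
```

Every message stays in memory until something drains the queue, so a stalled client simply accumulates them.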
Memory Impact
| Slow Clients | Messages/Client | Memory Growth |
|---|---|---|
| 1 | 10,000 | ~10MB |
| 10 | 10,000 | ~100MB |
| 100 | 10,000 | ~1GB |
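These figures are consistent with an average of roughly 1 KB per queued message. That per-message size is an assumption inferred from the table, not a measurement, and `estimate_queue_memory_mb` is a hypothetical helper used only to show the arithmetic:

```python
# Assumed average in-memory size of one queued message (inferred, not measured).
ASSUMED_BYTES_PER_MESSAGE = 1_000


def estimate_queue_memory_mb(
    slow_clients: int,
    messages_per_client: int,
    bytes_per_message: int = ASSUMED_BYTES_PER_MESSAGE,
) -> float:
    """Approximate memory held by queued messages, in MB (10^6 bytes)."""
    return slow_clients * messages_per_client * bytes_per_message / 1_000_000


for clients in (1, 10, 100):
    print(clients, estimate_queue_memory_mb(clients, 10_000))
```

Growth is linear in both the number of slow clients and the queue depth, so capping the queue depth caps the per-client cost.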
Proposed Solution
Bounded Queue with Backpressure
    class SSETransport:
        def __init__(self, max_queue_size: int = 1000):
            self._message_queue = asyncio.Queue(maxsize=max_queue_size)
            self._client_slow = False

        async def send_message(self, message, timeout: float = 5.0):
            try:
                await asyncio.wait_for(
                    self._message_queue.put(message),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                self._client_slow = True
                raise SlowClientError(f"Client queue full after {timeout}s")
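A self-contained sketch of the same idea, runnable outside the gateway. `BoundedSender` is an illustrative stand-in for `SSETransport`, and `SlowClientError` is defined here as a plain exception because its real definition is not shown in this issue. The tiny queue size and timeout are chosen only to make the demo finish quickly.

```python
import asyncio


class SlowClientError(Exception):
    """Hypothetical: raised when a client's queue stays full past the timeout."""


class BoundedSender:
    """Illustrative stand-in for SSETransport; only the queueing logic is shown."""

    def __init__(self, max_queue_size: int = 1000):
        self._message_queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
        self._client_slow = False

    async def send_message(self, message: dict, timeout: float = 5.0) -> None:
        try:
            # put() waits while the queue is full; wait_for bounds that wait.
            await asyncio.wait_for(self._message_queue.put(message), timeout=timeout)
        except asyncio.TimeoutError:
            self._client_slow = True
            raise SlowClientError(f"Client queue full after {timeout}s") from None


async def demo() -> bool:
    sender = BoundedSender(max_queue_size=2)
    await sender.send_message({"n": 1}, timeout=0.05)
    await sender.send_message({"n": 2}, timeout=0.05)
    try:
        # Nothing drains the queue, so the third put cannot complete in time.
        await sender.send_message({"n": 3}, timeout=0.05)
    except SlowClientError:
        return sender._client_slow
    return False


slow_detected = asyncio.run(demo())
print(slow_detected)
```

The producer is slowed for at most `timeout` seconds per message and then gets an explicit error, which gives the caller a place to decide whether to drop the message or disconnect the client.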
Configuration
SSE_MAX_QUEUE_SIZE=1000
SSE_QUEUE_TIMEOUT=5.0
SSE_SLOW_CLIENT_DISCONNECT=true
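One way these variables could be read, as a sketch only. How the gateway actually loads its settings is not shown in this issue; `SSEBackpressureSettings` and `load_sse_settings` are hypothetical names, and the defaults mirror the values listed above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class SSEBackpressureSettings:
    max_queue_size: int
    queue_timeout: float
    slow_client_disconnect: bool


def load_sse_settings(env: Optional[Mapping[str, str]] = None) -> SSEBackpressureSettings:
    """Read the three SSE_* variables, falling back to the defaults listed above."""
    env = os.environ if env is None else env
    disconnect_raw = env.get("SSE_SLOW_CLIENT_DISCONNECT", "true")
    return SSEBackpressureSettings(
        max_queue_size=int(env.get("SSE_MAX_QUEUE_SIZE", "1000")),
        queue_timeout=float(env.get("SSE_QUEUE_TIMEOUT", "5.0")),
        # Accepting "1"/"true"/"yes" is an assumption about the boolean format.
        slow_client_disconnect=disconnect_raw.strip().lower() in {"1", "true", "yes"},
    )


settings = load_sse_settings({"SSE_MAX_QUEUE_SIZE": "500"})
print(settings)
```

Passing a mapping instead of reading `os.environ` directly keeps the loader easy to test.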
Acceptance Criteria
make verify