Skip to content

Commit 46ef080

Browse files
committed
fix: harden Rust runtime fail-closed handling
Signed-off-by: Mihai Criveti <crivetimihai@gmail.com>
1 parent 270db5e commit 46ef080

File tree

7 files changed

+235
-17
lines changed

7 files changed

+235
-17
lines changed

mcpgateway/transports/rust_mcp_runtime_proxy.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -96,6 +96,7 @@ async def handle_streamable_http(self, scope: Scope, receive: Receive, send: Sen
9696
content=_stream_request_body(receive) if method == "POST" else b"",
9797
headers=headers,
9898
timeout=timeout,
99+
follow_redirects=False,
99100
) as response:
100101
await send(
101102
{
@@ -144,7 +145,7 @@ async def _get_runtime_client(self) -> httpx.AsyncClient:
144145
transport=httpx.AsyncHTTPTransport(uds=uds_path),
145146
limits=get_http_limits(),
146147
timeout=httpx.Timeout(settings.experimental_rust_mcp_runtime_timeout_seconds),
147-
follow_redirects=True,
148+
follow_redirects=False,
148149
)
149150
return self._uds_client
150151

mcpgateway/transports/streamablehttp_transport.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -455,6 +455,7 @@ async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None)
455455
"ttlSeconds": self.ttl,
456456
},
457457
timeout=httpx.Timeout(settings.experimental_rust_mcp_runtime_timeout_seconds),
458+
follow_redirects=False,
458459
)
459460
response.raise_for_status()
460461
payload = response.json()
@@ -481,6 +482,7 @@ async def replay_events_after(self, last_event_id: EventId, send_callback: Event
481482
"keyPrefix": self.key_prefix,
482483
},
483484
timeout=httpx.Timeout(settings.experimental_rust_mcp_runtime_timeout_seconds),
485+
follow_redirects=False,
484486
)
485487
response.raise_for_status()
486488
payload = response.json()
@@ -515,7 +517,7 @@ async def _get_rust_event_store_client() -> httpx.AsyncClient:
515517
transport=httpx.AsyncHTTPTransport(uds=uds_path),
516518
limits=get_http_limits(),
517519
timeout=httpx.Timeout(settings.experimental_rust_mcp_runtime_timeout_seconds),
518-
follow_redirects=True,
520+
follow_redirects=False,
519521
)
520522
return _rust_event_store_client
521523

tests/unit/mcpgateway/transports/test_rust_mcp_runtime_proxy.py

Lines changed: 65 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -66,11 +66,12 @@ async def __aexit__(self, exc_type, exc, tb):
6666
return False
6767

6868
class FakeClient:
69-
def stream(self, method, url, *, content, headers, timeout): # noqa: ANN001
69+
def stream(self, method, url, *, content, headers, timeout, follow_redirects): # noqa: ANN001
7070
captured["method"] = method
7171
captured["url"] = url
7272
captured["headers"] = headers
7373
captured["timeout"] = timeout
74+
captured["follow_redirects"] = follow_redirects
7475
return FakeStreamContext(content=content)
7576

7677
monkeypatch.setattr("mcpgateway.transports.rust_mcp_runtime_proxy.settings.experimental_rust_mcp_runtime_url", "http://127.0.0.1:8787")
@@ -125,6 +126,7 @@ async def send(message):
125126
assert captured["method"] == "POST"
126127
assert captured["url"] == "http://127.0.0.1:8787/mcp/?session_id=abc123"
127128
assert captured["timeout"].connect == 17
129+
assert captured["follow_redirects"] is False
128130

129131
forwarded_headers = dict(captured["headers"])
130132
assert forwarded_headers["authorization"] == "Bearer test-token"
@@ -188,11 +190,12 @@ async def __aexit__(self, exc_type, exc, tb):
188190
return False
189191

190192
class FakeClient:
191-
def stream(self, method, url, *, content, headers, timeout): # noqa: ANN001
193+
def stream(self, method, url, *, content, headers, timeout, follow_redirects): # noqa: ANN001
192194
captured["method"] = method
193195
captured["url"] = url
194196
captured["headers"] = headers
195197
captured["timeout"] = timeout
198+
captured["follow_redirects"] = follow_redirects
196199
return FakeStreamContext(content=content)
197200

198201
async def receive():
@@ -233,6 +236,7 @@ async def send(message):
233236
assert captured["method"] == "POST"
234237
assert captured["url"] == "http://127.0.0.1:8787/mcp/"
235238
assert captured["content"] == b'{"jsonrpc":"2.0","id":1,"method":"ping","params":{}}'
239+
assert captured["follow_redirects"] is False
236240
assert events[-1] == {"type": "http.response.body", "body": b"", "more_body": False}
237241

238242

@@ -257,11 +261,12 @@ async def __aexit__(self, exc_type, exc, tb):
257261
return False
258262

259263
class FakeClient:
260-
def stream(self, method, url, *, content, headers, timeout): # noqa: ANN001
264+
def stream(self, method, url, *, content, headers, timeout, follow_redirects): # noqa: ANN001
261265
captured["method"] = method
262266
captured["url"] = url
263267
captured["headers"] = dict(headers)
264268
captured["timeout"] = timeout
269+
captured["follow_redirects"] = follow_redirects
265270
return FakeStreamContext()
266271

267272
monkeypatch.setattr("mcpgateway.transports.rust_mcp_runtime_proxy.settings.experimental_rust_mcp_runtime_url", "http://127.0.0.1:8787")
@@ -298,6 +303,7 @@ async def send(message):
298303

299304
assert captured["method"] == "GET"
300305
assert captured["url"] == "http://127.0.0.1:8787/mcp/?session_id=session-1"
306+
assert captured["follow_redirects"] is False
301307
assert captured["headers"]["x-contextforge-affinity-forwarded"] == "rust"
302308
assert "x-forwarded-internally" not in captured["headers"]
303309
assert "x-original-worker" not in captured["headers"]
@@ -332,12 +338,13 @@ async def __aexit__(self, exc_type, exc, tb):
332338
return False
333339

334340
class FakeClient:
335-
def stream(self, method, url, *, content, headers, timeout): # noqa: ANN001
341+
def stream(self, method, url, *, content, headers, timeout, follow_redirects): # noqa: ANN001
336342
captured["method"] = method
337343
captured["url"] = url
338344
captured["content"] = content
339345
captured["headers"] = headers
340346
captured["timeout"] = timeout
347+
captured["follow_redirects"] = follow_redirects
341348
return FakeStreamContext()
342349

343350
monkeypatch.setattr("mcpgateway.transports.rust_mcp_runtime_proxy.settings.experimental_rust_mcp_runtime_url", "http://127.0.0.1:8787")
@@ -371,6 +378,7 @@ async def send(message):
371378
assert captured["method"] == "GET"
372379
assert captured["url"] == "http://127.0.0.1:8787/mcp/?session_id=abc123"
373380
assert captured["content"] == b""
381+
assert captured["follow_redirects"] is False
374382
assert dict(captured["headers"])["x-contextforge-server-id"] == "123e4567-e89b-12d3-a456-426614174000"
375383
assert events[0]["status"] == 200
376384
assert (b"content-type", b"text/event-stream") in events[0]["headers"]
@@ -402,11 +410,12 @@ class FakeAsyncClient:
402410
def __init__(self, **kwargs):
403411
constructed["kwargs"] = kwargs
404412

405-
def stream(self, method, url, *, content, headers, timeout): # noqa: ANN001
413+
def stream(self, method, url, *, content, headers, timeout, follow_redirects): # noqa: ANN001
406414
constructed["method"] = method
407415
constructed["url"] = url
408416
constructed["headers"] = headers
409417
constructed["timeout"] = timeout
418+
constructed["follow_redirects"] = follow_redirects
410419
return FakeStreamContext()
411420

412421
get_http_client_mock = AsyncMock()
@@ -441,6 +450,57 @@ async def send(message):
441450
assert constructed["method"] == "POST"
442451
assert constructed["url"] == "http://localhost/mcp/"
443452
assert constructed["kwargs"]["transport"]._pool._uds == "/tmp/contextforge-mcp-rust.sock" # pylint: disable=protected-access
453+
assert constructed["kwargs"]["follow_redirects"] is False
454+
assert constructed["follow_redirects"] is False
455+
assert events[-1] == {"type": "http.response.body", "body": b"", "more_body": False}
456+
457+
458+
@pytest.mark.asyncio
459+
async def test_runtime_proxy_surfaces_redirect_without_following(monkeypatch):
460+
"""Internal runtime redirects should be surfaced directly and never auto-followed."""
461+
requests_seen = []
462+
463+
def handler(request: httpx.Request) -> httpx.Response:
464+
requests_seen.append(str(request.url))
465+
if request.url.path == "/mcp/":
466+
return httpx.Response(
467+
307,
468+
headers={"location": "http://127.0.0.1:8787/final"},
469+
request=request,
470+
)
471+
return httpx.Response(200, json={"jsonrpc": "2.0", "id": 1, "result": {"unexpected": True}}, request=request)
472+
473+
client = httpx.AsyncClient(transport=httpx.MockTransport(handler))
474+
monkeypatch.setattr("mcpgateway.transports.rust_mcp_runtime_proxy.settings.experimental_rust_mcp_runtime_url", "http://127.0.0.1:8787")
475+
monkeypatch.setattr("mcpgateway.transports.rust_mcp_runtime_proxy.get_http_client", AsyncMock(return_value=client))
476+
477+
fallback = AsyncMock()
478+
proxy = RustMCPRuntimeProxy(fallback)
479+
events = []
480+
481+
async def send(message):
482+
events.append(message)
483+
484+
try:
485+
await proxy.handle_streamable_http(
486+
{
487+
"type": "http",
488+
"method": "GET",
489+
"path": "/",
490+
"modified_path": "/mcp",
491+
"query_string": b"session_id=abc123",
492+
"headers": [],
493+
},
494+
_make_receive(b""),
495+
send,
496+
)
497+
finally:
498+
await client.aclose()
499+
500+
fallback.assert_not_awaited()
501+
assert requests_seen == ["http://127.0.0.1:8787/mcp/?session_id=abc123"]
502+
assert events[0]["status"] == 307
503+
assert (b"location", b"http://127.0.0.1:8787/final") in events[0]["headers"]
444504
assert events[-1] == {"type": "http.response.body", "body": b"", "more_body": False}
445505

446506

tests/unit/mcpgateway/transports/test_streamablehttp_transport.py

Lines changed: 37 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -168,8 +168,8 @@ def json(self):
168168
return self._payload
169169

170170
class FakeClient:
171-
async def post(self, url, json=None, timeout=None): # noqa: A002
172-
captured_requests.append((url, json, timeout.read))
171+
async def post(self, url, json=None, timeout=None, follow_redirects=None): # noqa: A002
172+
captured_requests.append((url, json, timeout.read, follow_redirects))
173173
if url.endswith("/store"):
174174
return FakeResponse({"eventId": "event-123"})
175175
return FakeResponse(
@@ -208,11 +208,43 @@ async def collector(msg):
208208
"ttlSeconds": 321,
209209
}
210210
assert captured_requests[0][2] == 17
211+
assert captured_requests[0][3] is False
211212
assert captured_requests[1][0] == "http://127.0.0.1:8787/_internal/event-store/replay"
212213
assert captured_requests[1][1] == {
213214
"lastEventId": "event-123",
214215
"keyPrefix": "mcpgw:eventstore:test",
215216
}
217+
assert captured_requests[1][3] is False
218+
219+
220+
@pytest.mark.asyncio
221+
async def test_rust_event_store_replay_rejects_redirects_without_following(monkeypatch):
222+
"""Replay requests should fail closed on redirects from the Rust sidecar."""
223+
requests_seen = []
224+
225+
def handler(request: httpx.Request) -> httpx.Response:
226+
requests_seen.append(str(request.url))
227+
if request.url.path.endswith("/replay"):
228+
return httpx.Response(
229+
307,
230+
headers={"location": "http://127.0.0.1:8787/final"},
231+
request=request,
232+
)
233+
return httpx.Response(200, json={"streamId": "unexpected", "events": []}, request=request)
234+
235+
client = httpx.AsyncClient(transport=httpx.MockTransport(handler))
236+
monkeypatch.setattr(tr, "_get_rust_event_store_client", AsyncMock(return_value=client))
237+
monkeypatch.setattr(tr.settings, "experimental_rust_mcp_runtime_url", "http://127.0.0.1:8787")
238+
239+
store = tr.RustEventStore()
240+
241+
try:
242+
with pytest.raises(httpx.HTTPStatusError, match="307 Temporary Redirect"):
243+
await store.replay_events_after("event-123", AsyncMock())
244+
finally:
245+
await client.aclose()
246+
247+
assert requests_seen == ["http://127.0.0.1:8787/_internal/event-store/replay"]
216248

217249

218250
@pytest.mark.asyncio
@@ -290,11 +322,12 @@ async def test_get_rust_event_store_client_uses_shared_http_client_without_uds(m
290322
@pytest.mark.asyncio
291323
async def test_get_rust_event_store_client_reuses_uds_client(monkeypatch):
292324
"""UDS-backed Rust event-store client should be created once and then reused."""
293-
constructed = {"count": 0}
325+
constructed = {"count": 0, "kwargs": None}
294326

295327
class FakeAsyncClient:
296328
def __init__(self, **_kwargs):
297329
constructed["count"] += 1
330+
constructed["kwargs"] = _kwargs
298331

299332
monkeypatch.setattr(tr, "_rust_event_store_client", None)
300333
monkeypatch.setattr(tr.settings, "experimental_rust_mcp_runtime_uds", "/tmp/contextforge-mcp-rust.sock")
@@ -305,6 +338,7 @@ def __init__(self, **_kwargs):
305338

306339
assert first is second
307340
assert constructed["count"] == 1
341+
assert constructed["kwargs"]["follow_redirects"] is False
308342

309343

310344
def test_get_streamable_http_auth_context_returns_empty_for_non_dict_context():

tools_rust/mcp_runtime/FOLLOWUPS.md

Lines changed: 34 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -528,7 +528,36 @@ Recommended next step:
528528
- Add explicit shutdown cleanup on the Rust side and a `close()`/shutdown hook
529529
for the Python proxy's cached UDS client in a separate follow-up.
530530

531-
#### 19. Session-auth reuse still trades freshness for fewer auth round-trips
531+
#### 19. Redis hot-path round-trip and cache single-flight polish
532+
533+
Status:
534+
- Deferred performance polish
535+
536+
Observed behavior:
537+
- Runtime-session refresh in Redis still uses `GET` followed by `EXPIRE`
538+
instead of a single atomic `GETEX`-style refresh that reads the value and resets its TTL in one round-trip.
539+
- Event-store replay still fetches replay payloads one entry at a time from
540+
the Redis hash instead of batching those lookups.
541+
- A few in-process caches still use simple double-checked locking rather than
542+
a stronger single-flight pattern, so duplicate initialization work is still
543+
possible under contention.
544+
545+
Why this matters:
546+
- These are performance and efficiency opportunities, not current correctness
547+
regressions.
548+
- They are most visible under heavy load or when many workers race to populate
549+
the same hot cache entries.
550+
551+
Likely area:
552+
- `tools_rust/mcp_runtime/src/lib.rs`
553+
554+
Recommended next step:
555+
- Revisit the Redis/runtime hot paths in a focused performance pass and assess:
556+
- `GETEX` or equivalent atomic session-touch semantics
557+
- `HMGET`/pipeline replay fetches for event batches
558+
- `OnceCell` or another single-flight pattern for expensive cache fills
559+
560+
#### 20. Session-auth reuse still trades freshness for fewer auth round-trips
532561

533562
Status:
534563
- Deferred Rust-specific design follow-up
@@ -552,7 +581,7 @@ Recommended next step:
552581
should consume a revocation/invalidation signal from Python to drop cached
553582
auth state immediately.
554583

555-
#### 20. Legacy migration suites are still red
584+
#### 21. Legacy migration suites are still red
556585

557586
Status:
558587
- Deferred broader release/upgrade follow-up
@@ -581,7 +610,7 @@ Recommended next step:
581610
fix the legacy migration/data-retention issues independently of the Rust MCP
582611
transport PR.
583612

584-
#### 21. Live PostgreSQL TLS validation is still unexecuted
613+
#### 22. Live PostgreSQL TLS validation is still unexecuted
585614

586615
Status:
587616
- Deferred release-validation follow-up
@@ -609,7 +638,7 @@ Recommended next step:
609638
- Rust with `sslrootcert=/path/to/ca.pem`
610639
- explicit failure for unsupported `sslcert` / `sslkey`
611640

612-
#### 22. Minikube clean reinstall flow still looks unhealthy
641+
#### 23. Minikube clean reinstall flow still looks unhealthy
613642

614643
Status:
615644
- Deferred Helm/deployment follow-up
@@ -635,7 +664,7 @@ Recommended next step:
635664
issue is in Helm invocation, namespace lifecycle timing, or local Minikube
636665
state.
637666

638-
#### 23. Optional `2025-11-25-report` surface is not release-clean
667+
#### 24. Optional `2025-11-25-report` surface is not release-clean
639668

640669
Status:
641670
- Deferred protocol-surface follow-up

tools_rust/mcp_runtime/src/lib.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3446,8 +3446,19 @@ async fn replay_events_from_rust_event_store(
34463446
continue;
34473447
};
34483448

3449-
let message = serde_json::from_str::<Value>(&message_json).unwrap_or(Value::Null);
3450-
events.push(EventStoreReplayEvent { event_id, message });
3449+
match serde_json::from_str::<Value>(&message_json) {
3450+
Ok(message) => events.push(EventStoreReplayEvent { event_id, message }),
3451+
Err(err) => {
3452+
error!(
3453+
"Rust event store replay decode failed for stream {} event {}: {err}",
3454+
index_record.stream_id, event_id
3455+
);
3456+
return Err(json_response(
3457+
StatusCode::BAD_GATEWAY,
3458+
json!({"detail": "Rust event store replay decode failed"}),
3459+
));
3460+
}
3461+
}
34513462
}
34523463

34533464
Ok(EventStoreReplayResponse {

0 commit comments

Comments
 (0)