Skip to content

Commit 4055121

Browse files
avi-starkwareclaude
andcommitted
starknet_transaction_prover,tower_ohttp: OHTTP-unlinkable request-id for decapsulated content
Tags downstream content logs with a request-id via a new `RequestSpanLayer` placed below the OHTTP layer. For plaintext it reuses the envelope id from `RequestLogLayer`; for an OHTTP-decapsulated request (marked with a new `tower_ohttp::Decapsulated` extension) it mints a FRESH UUID and discards any client-supplied inner id. The fresh inner id is never echoed back, so the relay-visible envelope id and the gateway's content-log id cannot be joined — preserving OHTTP unlinkability while still giving every request's downstream logs a correlatable id. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 49a7855 commit 4055121

7 files changed

Lines changed: 245 additions & 7 deletions

File tree

crates/starknet_transaction_prover/src/server.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,14 @@ pub mod log_redact;
3434
#[cfg(test)]
3535
pub mod mock_rpc;
3636
pub mod request_log;
37+
pub mod request_span;
3738
pub mod rpc_api;
3839
pub mod rpc_impl;
3940
pub mod tls;
4041

4142
pub use health::{HealthLayer, HEALTH_PATH};
4243
pub use request_log::{RequestLogLayer, REQUEST_ID_HEADER};
44+
pub use request_span::RequestSpanLayer;
4345

4446
#[cfg(test)]
4547
mod rpc_spec_test;
@@ -83,13 +85,17 @@ pub async fn start_server(
8385
.set_http_middleware(
8486
// `RequestLogLayer` is outermost so the latency it measures
8587
// covers every other layer. `HealthLayer` sits inside it so
86-
// probes complete before CORS/OHTTP.
88+
// probes complete before CORS/OHTTP. `RequestSpanLayer` sits
89+
// BELOW `OhttpLayer` so it spans the decapsulated inner
90+
// request with a fresh, envelope-unlinkable id (see
91+
// `request_span`).
8792
ServiceBuilder::new()
8893
.layer(RequestLogLayer)
8994
.layer(HealthLayer)
9095
.option_layer(cors_layer)
9196
.layer(MapRequestBodyLayer::new(HttpBody::new))
9297
.option_layer(ohttp_layer)
98+
.layer(RequestSpanLayer)
9399
.layer(MapResponseBodyLayer::new(HttpBody::new))
94100
.layer(CompressionLayer::new()),
95101
)

crates/starknet_transaction_prover/src/server/ohttp_integration_test.rs

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ use tower_ohttp::test_utils::{
3030
};
3131
use tower_ohttp::OhttpLayer;
3232

33+
use crate::server::request_log::{RequestLogLayer, REQUEST_ID_HEADER};
34+
use crate::server::request_span::RequestSpanLayer;
35+
3336
const DEFAULT_BODY_LIMIT: usize = 102_400;
3437
const KEY_CACHE_SECS: u64 = 3600;
3538

36-
/// Body builder for jsonrpsee's `HttpBody`. Returned as a `fn` pointer to
37-
/// give `OhttpLayer` a sized, `Copy` closure type without an `as` cast.
3839
fn body_builder() -> fn(Full<bytes::Bytes>) -> HttpBody {
3940
HttpBody::new
4041
}
@@ -95,12 +96,15 @@ async fn production_chain_compresses_inner_not_outer() {
9596
let ohttp_layer =
9697
OhttpLayer::new(gateway.clone(), DEFAULT_BODY_LIMIT, KEY_CACHE_SECS, body_builder());
9798

98-
// Replicates the production ServiceBuilder chain from `server.rs`/`tls.rs`.
99-
// Must be kept in sync with those files.
99+
// Replicates the OHTTP body-handling portion of the production chain in
100+
// `server.rs`/`tls.rs` (the outermost observability layers — request log,
101+
// health, metrics — don't affect body/compression handling and are
102+
// omitted). `RequestSpanLayer` is included since it sits inside OHTTP.
100103
let mut svc = tower::ServiceBuilder::new()
101104
.option_layer(None::<CorsLayer>)
102105
.layer(MapRequestBodyLayer::new(HttpBody::new))
103106
.option_layer(Some(ohttp_layer))
107+
.layer(RequestSpanLayer)
104108
.layer(MapResponseBodyLayer::new(HttpBody::new))
105109
.layer(CompressionLayer::new())
106110
.service(tower::service_fn(jsonrpsee_echo_service));
@@ -176,3 +180,75 @@ async fn non_ohttp_request_passes_through_jsonrpsee() {
176180
let body = response.into_body().collect().await.unwrap().to_bytes();
177181
assert_eq!(body.as_ref(), json_body);
178182
}
183+
184+
/// End-to-end OHTTP unlinkability: the request-id echoed on the OUTER
185+
/// (relay-visible) response must differ from the fresh id bound to the
186+
/// decapsulated inner dispatch, and the client-supplied inner id must be
187+
/// discarded — so no shared key links the relay's view to the gateway's.
188+
/// Exercises the real decapsulation path through
189+
/// `RequestLogLayer → OhttpLayer → RequestSpanLayer`.
190+
#[tokio::test]
191+
async fn ohttp_inner_request_id_unlinkable_from_envelope() {
192+
let gateway = test_gateway();
193+
let ohttp_layer =
194+
OhttpLayer::new(gateway.clone(), DEFAULT_BODY_LIMIT, KEY_CACHE_SECS, body_builder());
195+
196+
// Inner service echoes the request-id it observes into the response body.
197+
let echo_id = tower::service_fn(|req: http::Request<HttpBody>| async move {
198+
let id = req.headers().get(REQUEST_ID_HEADER).map(|v| v.to_str().unwrap()).unwrap_or("");
199+
Ok::<_, BoxError>(
200+
http::Response::builder()
201+
.status(http::StatusCode::OK)
202+
.body(HttpBody::from(id.as_bytes().to_vec()))
203+
.unwrap(),
204+
)
205+
});
206+
207+
let mut svc = tower::ServiceBuilder::new()
208+
.layer(RequestLogLayer)
209+
.layer(MapRequestBodyLayer::new(HttpBody::new))
210+
.option_layer(Some(ohttp_layer))
211+
.layer(RequestSpanLayer)
212+
.layer(MapResponseBodyLayer::new(HttpBody::new))
213+
.service(echo_id);
214+
215+
// The envelope carries a client-chosen inner id that must be discarded.
216+
let (encapsulated, client_response) = encapsulate_bhttp_request(
217+
&gateway,
218+
"POST",
219+
"/",
220+
b"",
221+
&[("x-request-id", b"inner-client-id")],
222+
);
223+
224+
// The outer envelope request carries the relay-visible id.
225+
let mut outer = ohttp_http_request(encapsulated);
226+
outer
227+
.headers_mut()
228+
.insert(REQUEST_ID_HEADER, http::HeaderValue::from_static("envelope-relay-id"));
229+
230+
let response = svc.call(outer).await.unwrap();
231+
232+
// The outer (relay-visible) response echoes the envelope id.
233+
let envelope_id =
234+
response.headers().get(REQUEST_ID_HEADER).unwrap().to_str().unwrap().to_owned();
235+
assert_eq!(envelope_id, "envelope-relay-id");
236+
237+
let encrypted_body = response.into_body().collect().await.unwrap().to_bytes();
238+
let decapsulated = decapsulate_bhttp_response(client_response, &encrypted_body);
239+
assert_eq!(decapsulated.status, 200);
240+
let inner_id = String::from_utf8(decapsulated.body).expect("utf8 inner id");
241+
242+
assert_ne!(inner_id, envelope_id, "inner id must not equal the relay-visible envelope id");
243+
assert_ne!(inner_id, "inner-client-id", "client-supplied inner id must be discarded");
244+
assert!(
245+
uuid::Uuid::parse_str(&inner_id).is_ok(),
246+
"inner id must be a fresh UUID, got {inner_id:?}"
247+
);
248+
// No id is set on the inner *response*, so nothing — neither the envelope
249+
// id nor the fresh content id — leaks into the encrypted reply's headers.
250+
assert!(
251+
decapsulated.bhttp_message.header().get(b"x-request-id").is_none(),
252+
"inner OHTTP response must not carry an x-request-id header"
253+
);
254+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
//! tower middleware that binds an `http_request` tracing span over the
2+
//! downstream dispatch. It sits BELOW the OHTTP layer, so it sees the
3+
//! decapsulated inner request (or a plaintext pass-through), and picks the id:
4+
//!
5+
//! - **plaintext** — reuse the `x-request-id` the outer layer already assigned;
6+
//! - **OHTTP-decapsulated** ([`tower_ohttp::Decapsulated`]) — mint a fresh id, overwriting any
7+
//! client-supplied inner id. The relay never observes it (it lives in content logs and, at most,
8+
//! the encrypted inner response), so the relay-visible envelope id and this content-log id can't
9+
//! be joined.
10+
//!
11+
//! See [`super::request_log`] for why the envelope and content ids are kept
12+
//! separate (OHTTP unlinkability).
13+
14+
use std::task::{Context, Poll};
15+
16+
use http::{HeaderValue, Request, Response};
17+
use tower::{Layer, Service};
18+
use tower_ohttp::Decapsulated;
19+
use tracing::instrument::Instrumented;
20+
use tracing::{info_span, Instrument};
21+
22+
use crate::server::request_log::{
23+
extract_or_generate_request_id,
24+
new_request_id,
25+
REQUEST_ID_HEADER,
26+
};
27+
28+
#[cfg(test)]
29+
#[path = "request_span_test.rs"]
30+
mod request_span_test;
31+
32+
/// tower [`Layer`] producing [`RequestSpanService`].
33+
#[derive(Clone, Copy, Default)]
34+
pub struct RequestSpanLayer;
35+
36+
impl<S> Layer<S> for RequestSpanLayer {
37+
type Service = RequestSpanService<S>;
38+
39+
fn layer(&self, inner: S) -> Self::Service {
40+
RequestSpanService { inner }
41+
}
42+
}
43+
44+
#[derive(Clone)]
45+
pub struct RequestSpanService<S> {
46+
inner: S,
47+
}
48+
49+
impl<S, ReqB, RespB> Service<Request<ReqB>> for RequestSpanService<S>
50+
where
51+
S: Service<Request<ReqB>, Response = Response<RespB>>,
52+
{
53+
type Response = Response<RespB>;
54+
type Error = S::Error;
55+
type Future = Instrumented<S::Future>;
56+
57+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
58+
self.inner.poll_ready(cx)
59+
}
60+
61+
fn call(&mut self, mut request: Request<ReqB>) -> Self::Future {
62+
let request_id = if request.extensions().get::<Decapsulated>().is_some() {
63+
// Fresh id, distinct from the relay-visible envelope id (OHTTP unlinkability).
64+
new_request_id()
65+
} else {
66+
// Reuse the id the outer layer assigned; re-validate here so the
67+
// layer is self-contained even without `RequestLogLayer` upstream.
68+
extract_or_generate_request_id(&request)
69+
};
70+
if let Ok(header_value) = HeaderValue::from_str(&request_id) {
71+
request.headers_mut().insert(REQUEST_ID_HEADER, header_value);
72+
}
73+
self.inner.call(request).instrument(info_span!("http_request", request_id = %request_id))
74+
}
75+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use bytes::Bytes;
2+
use http::{Method, Request, Response, StatusCode};
3+
use http_body_util::{BodyExt, Full};
4+
use jsonrpsee::server::HttpBody;
5+
use tower::{Layer, ServiceExt};
6+
use tower_ohttp::Decapsulated;
7+
8+
use crate::server::request_log::REQUEST_ID_HEADER;
9+
use crate::server::request_span::RequestSpanLayer;
10+
11+
/// Inner service that echoes the request's `x-request-id` into the response
12+
/// body, so tests can observe which id `RequestSpanLayer` bound.
13+
fn echo_id_service() -> impl tower::Service<
14+
Request<HttpBody>,
15+
Response = Response<HttpBody>,
16+
Error = std::convert::Infallible,
17+
Future = futures::future::Ready<Result<Response<HttpBody>, std::convert::Infallible>>,
18+
> + Clone {
19+
tower::service_fn(|request: Request<HttpBody>| {
20+
let id = request
21+
.headers()
22+
.get(REQUEST_ID_HEADER)
23+
.map(|value| value.to_str().unwrap().to_owned())
24+
.unwrap_or_default();
25+
futures::future::ready(Ok::<_, std::convert::Infallible>(
26+
Response::builder()
27+
.status(StatusCode::OK)
28+
.body(HttpBody::new(Full::new(Bytes::from(id))))
29+
.expect("static body is infallible"),
30+
))
31+
})
32+
}
33+
34+
async fn body_string(response: Response<HttpBody>) -> String {
35+
let bytes = response.into_body().collect().await.expect("body collect").to_bytes().to_vec();
36+
String::from_utf8(bytes).expect("utf8 body")
37+
}
38+
39+
#[tokio::test]
40+
async fn plaintext_reuses_inbound_request_id() {
41+
let request = Request::builder()
42+
.method(Method::POST)
43+
.uri("/")
44+
.header(REQUEST_ID_HEADER, "reused-xyz")
45+
.body(HttpBody::new(Full::new(Bytes::new())))
46+
.expect("static body is infallible");
47+
48+
let response = RequestSpanLayer.layer(echo_id_service()).oneshot(request).await.unwrap();
49+
50+
assert_eq!(body_string(response).await, "reused-xyz");
51+
}
52+
53+
#[tokio::test]
54+
async fn decapsulated_gets_fresh_id_discarding_inbound() {
55+
let mut request = Request::builder()
56+
.method(Method::POST)
57+
.uri("/")
58+
.header(REQUEST_ID_HEADER, "envelope-abc")
59+
.body(HttpBody::new(Full::new(Bytes::new())))
60+
.expect("static body is infallible");
61+
request.extensions_mut().insert(Decapsulated);
62+
63+
let response = RequestSpanLayer.layer(echo_id_service()).oneshot(request).await.unwrap();
64+
65+
let id = body_string(response).await;
66+
assert_ne!(id, "envelope-abc", "must discard the client-supplied inner id");
67+
assert!(uuid::Uuid::parse_str(&id).is_ok(), "must mint a fresh UUID, got {id:?}");
68+
}

crates/starknet_transaction_prover/src/server/tls.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use tower_http::map_request_body::MapRequestBodyLayer;
2727
use tower_http::map_response_body::MapResponseBodyLayer;
2828
use tracing::warn;
2929

30-
use crate::server::{HealthLayer, OhttpJsonrpseeLayer, RequestLogLayer};
30+
use crate::server::{HealthLayer, OhttpJsonrpseeLayer, RequestLogLayer, RequestSpanLayer};
3131

3232
/// Maximum time allowed for a TLS handshake before the connection is dropped.
3333
const TLS_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
@@ -68,6 +68,7 @@ pub async fn start_tls_server(
6868
.option_layer(cors_layer)
6969
.layer(MapRequestBodyLayer::new(HttpBody::new))
7070
.option_layer(ohttp_layer)
71+
.layer(RequestSpanLayer)
7172
.layer(MapResponseBodyLayer::new(HttpBody::new))
7273
.layer(CompressionLayer::new()),
7374
)

crates/tower_ohttp/src/layer.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,11 @@ where
194194
OhttpError::InvalidFormat("Invalid Binary HTTP message")
195195
})?;
196196

197-
let inner_request = rebuild_request(&bhttp_message)?.map(build_body);
197+
let mut inner_request = rebuild_request(&bhttp_message)?.map(build_body);
198+
// Mark the request so downstream layers can tell decapsulated
199+
// traffic apart from plaintext pass-through (e.g. to assign a
200+
// fresh, envelope-unlinkable request id).
201+
inner_request.extensions_mut().insert(crate::Decapsulated);
198202

199203
inner.oneshot(inner_request).await.map_err(|error| {
200204
debug!("Inner service error after successful OHTTP decapsulation: {error:?}");

crates/tower_ohttp/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ pub use errors::OhttpError;
3434
pub use gateway::OhttpGateway;
3535
pub use layer::{OhttpLayer, OhttpService};
3636

37+
/// Marker inserted into the extensions of a request rebuilt from a decapsulated
38+
/// OHTTP envelope, before it is forwarded to the inner service. Downstream
39+
/// layers can check for it (`request.extensions().get::<Decapsulated>()`) to
40+
/// distinguish envelope-decapsulated traffic from plaintext pass-through —
41+
/// the two are otherwise indistinguishable once the inner request is rebuilt.
42+
#[derive(Clone, Copy, Debug)]
43+
pub struct Decapsulated;
44+
3745
pub(crate) const OHTTP_REQUEST_CONTENT_TYPE: &str = "message/ohttp-req";
3846
pub(crate) const OHTTP_RESPONSE_CONTENT_TYPE: &str = "message/ohttp-res";
3947
pub(crate) const OHTTP_KEYS_PATH: &str = "/ohttp-keys";

0 commit comments

Comments
 (0)