Skip to content

Commit adf3407

Browse files
avi-starkwareclaude
andcommitted
starknet_transaction_prover,tower_ohttp: OHTTP-unlinkable request-id for decapsulated content
Tags downstream content logs with a request-id via a new `RequestSpanLayer` placed below the OHTTP layer. For plaintext it reuses the envelope id from `RequestLogLayer`; for an OHTTP-decapsulated request (marked with a new `tower_ohttp::Decapsulated` extension) it mints a FRESH UUID and discards any client-supplied inner id. The fresh inner id is never echoed back, so the relay-visible envelope id and the gateway's content-log id cannot be joined — preserving OHTTP unlinkability while still giving every request's downstream logs a correlatable id. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent fd81285 commit adf3407

7 files changed

Lines changed: 277 additions & 7 deletions

File tree

crates/starknet_transaction_prover/src/server.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ pub const OHTTP_JSONRPSEE_BODY_BUILDER: fn(Full<Bytes>) -> HttpBody = HttpBody::
4747
/// than to the OHTTP ciphertext envelope. `MapRequestBodyLayer`/`MapResponseBodyLayer` keep
4848
/// `HttpBody` on both sides of OHTTP to satisfy its symmetric-body bound; `HttpBody::new` is a
4949
/// zero-cost wrapper, so non-OHTTP requests still stream through unbuffered.
50+
/// - `RequestSpanLayer` sits BELOW `OhttpLayer` so it spans the decapsulated inner request with a
51+
/// fresh, envelope-unlinkable id (see `request_span`).
5052
macro_rules! prover_http_middleware {
5153
($cors_layer:expr, $ohttp_layer:expr $(,)?) => {
5254
ServiceBuilder::new()
@@ -55,6 +57,7 @@ macro_rules! prover_http_middleware {
5557
.option_layer($cors_layer)
5658
.layer(MapRequestBodyLayer::new(HttpBody::new))
5759
.option_layer($ohttp_layer)
60+
.layer(RequestSpanLayer)
5861
.layer(MapResponseBodyLayer::new(HttpBody::new))
5962
.layer(CompressionLayer::new())
6063
};
@@ -68,12 +71,14 @@ pub mod log_redact;
6871
#[cfg(test)]
6972
pub mod mock_rpc;
7073
pub mod request_log;
74+
pub mod request_span;
7175
pub mod rpc_api;
7276
pub mod rpc_impl;
7377
pub mod tls;
7478

7579
pub use health::{HealthLayer, HEALTH_PATH};
7680
pub use request_log::{RequestLogLayer, REQUEST_ID_HEADER};
81+
pub use request_span::RequestSpanLayer;
7782

7883
#[cfg(test)]
7984
mod rpc_spec_test;

crates/starknet_transaction_prover/src/server/ohttp_integration_test.rs

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ use tower_ohttp::test_utils::{
3030
};
3131
use tower_ohttp::OhttpLayer;
3232

33+
use crate::server::request_log::{RequestLogLayer, REQUEST_ID_HEADER};
34+
use crate::server::request_span::RequestSpanLayer;
35+
3336
const DEFAULT_BODY_LIMIT: usize = 102_400;
3437
const KEY_CACHE_SECS: u64 = 3600;
3538

36-
/// Body builder for jsonrpsee's `HttpBody`. Returned as a `fn` pointer to
37-
/// give `OhttpLayer` a sized, `Copy` closure type without an `as` cast.
3839
fn body_builder() -> fn(Full<bytes::Bytes>) -> HttpBody {
3940
HttpBody::new
4041
}
@@ -95,12 +96,15 @@ async fn production_chain_compresses_inner_not_outer() {
9596
let ohttp_layer =
9697
OhttpLayer::new(gateway.clone(), DEFAULT_BODY_LIMIT, KEY_CACHE_SECS, body_builder());
9798

98-
// Replicates the production ServiceBuilder chain from `server.rs`/`tls.rs`.
99-
// Must be kept in sync with those files.
99+
// Replicates the OHTTP body-handling portion of the production chain in
100+
// `server.rs`/`tls.rs` (the outermost observability layers — request log,
101+
// health, metrics — don't affect body/compression handling and are
102+
// omitted). `RequestSpanLayer` is included since it sits inside OHTTP.
100103
let mut svc = tower::ServiceBuilder::new()
101104
.option_layer(None::<CorsLayer>)
102105
.layer(MapRequestBodyLayer::new(HttpBody::new))
103106
.option_layer(Some(ohttp_layer))
107+
.layer(RequestSpanLayer)
104108
.layer(MapResponseBodyLayer::new(HttpBody::new))
105109
.layer(CompressionLayer::new())
106110
.service(tower::service_fn(jsonrpsee_echo_service));
@@ -176,3 +180,75 @@ async fn non_ohttp_request_passes_through_jsonrpsee() {
176180
let body = response.into_body().collect().await.unwrap().to_bytes();
177181
assert_eq!(body.as_ref(), json_body);
178182
}
183+
184+
/// End-to-end OHTTP unlinkability: the request-id echoed on the OUTER
185+
/// (relay-visible) response must differ from the fresh id bound to the
186+
/// decapsulated inner dispatch, and the client-supplied inner id must be
187+
/// discarded — so no shared key links the relay's view to the gateway's.
188+
/// Exercises the real decapsulation path through
189+
/// `RequestLogLayer → OhttpLayer → RequestSpanLayer`.
190+
#[tokio::test]
191+
async fn ohttp_inner_request_id_unlinkable_from_envelope() {
192+
let gateway = test_gateway();
193+
let ohttp_layer =
194+
OhttpLayer::new(gateway.clone(), DEFAULT_BODY_LIMIT, KEY_CACHE_SECS, body_builder());
195+
196+
// Inner service echoes the request-id it observes into the response body.
197+
let echo_id = tower::service_fn(|req: http::Request<HttpBody>| async move {
198+
let id = req.headers().get(REQUEST_ID_HEADER).map(|v| v.to_str().unwrap()).unwrap_or("");
199+
Ok::<_, BoxError>(
200+
http::Response::builder()
201+
.status(http::StatusCode::OK)
202+
.body(HttpBody::from(id.as_bytes().to_vec()))
203+
.unwrap(),
204+
)
205+
});
206+
207+
let mut svc = tower::ServiceBuilder::new()
208+
.layer(RequestLogLayer)
209+
.layer(MapRequestBodyLayer::new(HttpBody::new))
210+
.option_layer(Some(ohttp_layer))
211+
.layer(RequestSpanLayer)
212+
.layer(MapResponseBodyLayer::new(HttpBody::new))
213+
.service(echo_id);
214+
215+
// The envelope carries a client-chosen inner id that must be discarded.
216+
let (encapsulated, client_response) = encapsulate_bhttp_request(
217+
&gateway,
218+
"POST",
219+
"/",
220+
b"",
221+
&[("x-request-id", b"inner-client-id")],
222+
);
223+
224+
// The outer envelope request carries the relay-visible id.
225+
let mut outer = ohttp_http_request(encapsulated);
226+
outer
227+
.headers_mut()
228+
.insert(REQUEST_ID_HEADER, http::HeaderValue::from_static("envelope-relay-id"));
229+
230+
let response = svc.call(outer).await.unwrap();
231+
232+
// The outer (relay-visible) response echoes the envelope id.
233+
let envelope_id =
234+
response.headers().get(REQUEST_ID_HEADER).unwrap().to_str().unwrap().to_owned();
235+
assert_eq!(envelope_id, "envelope-relay-id");
236+
237+
let encrypted_body = response.into_body().collect().await.unwrap().to_bytes();
238+
let decapsulated = decapsulate_bhttp_response(client_response, &encrypted_body);
239+
assert_eq!(decapsulated.status, 200);
240+
let inner_id = String::from_utf8(decapsulated.body).expect("utf8 inner id");
241+
242+
assert_ne!(inner_id, envelope_id, "inner id must not equal the relay-visible envelope id");
243+
assert_ne!(inner_id, "inner-client-id", "client-supplied inner id must be discarded");
244+
assert!(
245+
uuid::Uuid::parse_str(&inner_id).is_ok(),
246+
"inner id must be a fresh UUID, got {inner_id:?}"
247+
);
248+
// No id is set on the inner *response*, so nothing — neither the envelope
249+
// id nor the fresh content id — leaks into the encrypted reply's headers.
250+
assert!(
251+
decapsulated.bhttp_message.header().get(b"x-request-id").is_none(),
252+
"inner OHTTP response must not carry an x-request-id header"
253+
);
254+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
//! tower middleware that binds an `http_request` tracing span over the
2+
//! downstream dispatch. It sits BELOW the OHTTP layer, so it sees the
3+
//! decapsulated inner request (or a plaintext pass-through), and picks the id:
4+
//!
5+
//! - **plaintext** — reuse the `x-request-id` the outer layer already assigned;
6+
//! - **OHTTP-decapsulated** ([`tower_ohttp::Decapsulated`]) — mint a fresh id, overwriting any
7+
//! client-supplied inner id. The relay never observes it (it lives in content logs and, at most,
8+
//! the encrypted inner response), so the relay-visible envelope id and this content-log id can't
9+
//! be joined.
10+
//!
11+
//! See [`super::request_log`] for why the envelope and content ids are kept
12+
//! separate (OHTTP unlinkability).
13+
14+
use std::task::{Context, Poll};
15+
16+
use http::{HeaderValue, Request, Response};
17+
use tower::{Layer, Service};
18+
use tower_ohttp::Decapsulated;
19+
use tracing::instrument::Instrumented;
20+
use tracing::{info_span, Instrument};
21+
22+
use crate::server::request_log::{
23+
extract_or_generate_request_id,
24+
new_request_id,
25+
REQUEST_ID_HEADER,
26+
};
27+
28+
#[cfg(test)]
29+
#[path = "request_span_test.rs"]
30+
mod request_span_test;
31+
32+
/// tower [`Layer`] producing [`RequestSpanService`].
33+
#[derive(Clone, Copy, Default)]
34+
pub struct RequestSpanLayer;
35+
36+
impl<S> Layer<S> for RequestSpanLayer {
37+
type Service = RequestSpanService<S>;
38+
39+
fn layer(&self, inner: S) -> Self::Service {
40+
RequestSpanService { inner }
41+
}
42+
}
43+
44+
#[derive(Clone)]
45+
pub struct RequestSpanService<S> {
46+
inner: S,
47+
}
48+
49+
impl<S, ReqB, RespB> Service<Request<ReqB>> for RequestSpanService<S>
50+
where
51+
S: Service<Request<ReqB>, Response = Response<RespB>>,
52+
{
53+
type Response = Response<RespB>;
54+
type Error = S::Error;
55+
type Future = Instrumented<S::Future>;
56+
57+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
58+
self.inner.poll_ready(cx)
59+
}
60+
61+
fn call(&mut self, mut request: Request<ReqB>) -> Self::Future {
62+
let request_id = if request.extensions().get::<Decapsulated>().is_some() {
63+
// Fresh id, distinct from the relay-visible envelope id (OHTTP unlinkability).
64+
new_request_id()
65+
} else {
66+
// In production this re-derives the exact id `RequestLogLayer`
67+
// already assigned to the plaintext request, so logs and the echoed
68+
// response header share one id. Re-validating here (rather than
69+
// assuming the header) also keeps the layer correct when it runs
70+
// standalone, e.g. in unit tests without `RequestLogLayer` upstream.
71+
extract_or_generate_request_id(&request)
72+
};
73+
if let Ok(header_value) = HeaderValue::from_str(&request_id) {
74+
request.headers_mut().insert(REQUEST_ID_HEADER, header_value);
75+
}
76+
self.inner.call(request).instrument(info_span!("http_request", request_id = %request_id))
77+
}
78+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use bytes::Bytes;
2+
use http::{Method, Request, Response, StatusCode};
3+
use http_body_util::{BodyExt, Full};
4+
use jsonrpsee::server::HttpBody;
5+
use tower::{Layer, ServiceExt};
6+
use tower_ohttp::Decapsulated;
7+
8+
use crate::server::request_log::{RequestLogLayer, REQUEST_ID_HEADER};
9+
use crate::server::request_span::RequestSpanLayer;
10+
11+
/// Inner service that echoes the request's `x-request-id` into the response
12+
/// body, so tests can observe which id `RequestSpanLayer` bound.
13+
fn echo_id_service() -> impl tower::Service<
14+
Request<HttpBody>,
15+
Response = Response<HttpBody>,
16+
Error = std::convert::Infallible,
17+
Future = futures::future::Ready<Result<Response<HttpBody>, std::convert::Infallible>>,
18+
> + Clone {
19+
tower::service_fn(|request: Request<HttpBody>| {
20+
let id = request
21+
.headers()
22+
.get(REQUEST_ID_HEADER)
23+
.map(|value| value.to_str().unwrap().to_owned())
24+
.unwrap_or_default();
25+
futures::future::ready(Ok::<_, std::convert::Infallible>(
26+
Response::builder()
27+
.status(StatusCode::OK)
28+
.body(HttpBody::new(Full::new(Bytes::from(id))))
29+
.expect("static body is infallible"),
30+
))
31+
})
32+
}
33+
34+
async fn body_string(response: Response<HttpBody>) -> String {
35+
let bytes = response.into_body().collect().await.expect("body collect").to_bytes().to_vec();
36+
String::from_utf8(bytes).expect("utf8 body")
37+
}
38+
39+
#[tokio::test]
40+
async fn plaintext_reuses_inbound_request_id() {
41+
let request = Request::builder()
42+
.method(Method::POST)
43+
.uri("/")
44+
.header(REQUEST_ID_HEADER, "reused-xyz")
45+
.body(HttpBody::new(Full::new(Bytes::new())))
46+
.expect("static body is infallible");
47+
48+
let response = RequestSpanLayer.layer(echo_id_service()).oneshot(request).await.unwrap();
49+
50+
assert_eq!(body_string(response).await, "reused-xyz");
51+
}
52+
53+
#[tokio::test]
54+
async fn decapsulated_gets_fresh_id_discarding_inbound() {
55+
let mut request = Request::builder()
56+
.method(Method::POST)
57+
.uri("/")
58+
.header(REQUEST_ID_HEADER, "envelope-abc")
59+
.body(HttpBody::new(Full::new(Bytes::new())))
60+
.expect("static body is infallible");
61+
request.extensions_mut().insert(Decapsulated);
62+
63+
let response = RequestSpanLayer.layer(echo_id_service()).oneshot(request).await.unwrap();
64+
65+
let id = body_string(response).await;
66+
assert_ne!(id, "envelope-abc", "must discard the client-supplied inner id");
67+
assert!(uuid::Uuid::parse_str(&id).is_ok(), "must mint a fresh UUID, got {id:?}");
68+
}
69+
70+
/// The cross-layer plaintext contract: with `RequestLogLayer` (outer) stacked
71+
/// over `RequestSpanLayer` (inner) and no inbound id, the id the outer layer
72+
/// generates and echoes on the response must be the same id the inner layer
73+
/// binds for the handler — one shared id end-to-end.
74+
#[tokio::test]
75+
async fn plaintext_log_and_span_layers_share_generated_id() {
76+
let request = Request::builder()
77+
.method(Method::POST)
78+
.uri("/")
79+
.body(HttpBody::new(Full::new(Bytes::new())))
80+
.expect("static body is infallible");
81+
82+
let svc = RequestLogLayer.layer(RequestSpanLayer.layer(echo_id_service()));
83+
let response = svc.oneshot(request).await.unwrap();
84+
85+
let echoed_id = response
86+
.headers()
87+
.get(REQUEST_ID_HEADER)
88+
.expect("response carries the id")
89+
.to_str()
90+
.unwrap()
91+
.to_owned();
92+
let handler_id = body_string(response).await;
93+
94+
assert_eq!(echoed_id, handler_id, "echoed response id must equal the id the handler saw");
95+
assert!(
96+
uuid::Uuid::parse_str(&handler_id).is_ok(),
97+
"generated id must be a UUID, got {handler_id:?}"
98+
);
99+
}

crates/starknet_transaction_prover/src/server/tls.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use tower_http::map_request_body::MapRequestBodyLayer;
2727
use tower_http::map_response_body::MapResponseBodyLayer;
2828
use tracing::warn;
2929

30-
use crate::server::{HealthLayer, OhttpJsonrpseeLayer, RequestLogLayer};
30+
use crate::server::{HealthLayer, OhttpJsonrpseeLayer, RequestLogLayer, RequestSpanLayer};
3131

3232
/// Maximum time allowed for a TLS handshake before the connection is dropped.
3333
const TLS_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);

crates/tower_ohttp/src/layer.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use tracing::debug;
3434
use crate::bhttp_codec::{encapsulate_response, rebuild_request};
3535
use crate::errors::OhttpError;
3636
use crate::gateway::OhttpGateway;
37-
use crate::{OHTTP_KEYS_PATH, OHTTP_REQUEST_CONTENT_TYPE};
37+
use crate::{Decapsulated, OHTTP_KEYS_PATH, OHTTP_REQUEST_CONTENT_TYPE};
3838

3939
/// Shared runtime state for the OHTTP gateway.
4040
struct OhttpState<F> {
@@ -194,7 +194,11 @@ where
194194
OhttpError::InvalidFormat("Invalid Binary HTTP message")
195195
})?;
196196

197-
let inner_request = rebuild_request(&bhttp_message)?.map(build_body);
197+
let mut inner_request = rebuild_request(&bhttp_message)?.map(build_body);
198+
// Mark the request so downstream layers can tell decapsulated
199+
// traffic apart from plaintext pass-through (e.g. to assign a
200+
// fresh, envelope-unlinkable request id).
201+
inner_request.extensions_mut().insert(Decapsulated);
198202

199203
inner.oneshot(inner_request).await.map_err(|error| {
200204
debug!("Inner service error after successful OHTTP decapsulation: {error:?}");

crates/tower_ohttp/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ pub use errors::OhttpError;
3434
pub use gateway::OhttpGateway;
3535
pub use layer::{OhttpLayer, OhttpService};
3636

37+
/// Marker inserted into the extensions of a request rebuilt from a decapsulated
38+
/// OHTTP envelope, before it is forwarded to the inner service. Downstream
39+
/// layers can check for it (`request.extensions().get::<Decapsulated>()`) to
40+
/// distinguish envelope-decapsulated traffic from plaintext pass-through —
41+
/// the two are otherwise indistinguishable once the inner request is rebuilt.
42+
#[derive(Clone, Copy, Debug)]
43+
pub struct Decapsulated;
44+
3745
pub(crate) const OHTTP_REQUEST_CONTENT_TYPE: &str = "message/ohttp-req";
3846
pub(crate) const OHTTP_RESPONSE_CONTENT_TYPE: &str = "message/ohttp-res";
3947
pub(crate) const OHTTP_KEYS_PATH: &str = "/ohttp-keys";

0 commit comments

Comments
 (0)