|
| 1 | +//! tower middleware that logs one structured line per HTTP request and |
| 2 | +//! propagates a request id. |
| 3 | +//! |
| 4 | +//! Sits ahead of jsonrpsee so every JSON-RPC POST (and any pass-through like |
| 5 | +//! OHTTP key fetches) gets a single log line with `event="http_request"`, |
| 6 | +//! `request_id`, `method`, `path`, `status`, and `latency_ms`. The id is |
| 7 | +//! either accepted from the incoming `x-request-id` header or generated as a |
| 8 | +//! UUID v4. The id is also returned on the response so callers can quote it |
| 9 | +//! when reporting failures. |
| 10 | +//! |
| 11 | +//! The id is bound to an `http_request` tracing span that wraps the whole |
| 12 | +//! downstream call, so every `tracing::*` event emitted while handling the |
| 13 | +//! request — including the OHTTP-decapsulated inner JSON-RPC dispatch, which |
| 14 | +//! runs within this same future — inherits `request_id`. Unlike a framework |
| 15 | +//! that re-routes the decapsulated request through a separate service, this |
| 16 | +//! layer's linear position means one span covers both the envelope and its |
| 17 | +//! inner request, so no second per-request id is needed. |
| 18 | +//! |
| 19 | +//! Body bytes are never inspected — transaction calldata is private user data |
| 20 | +//! per the privacy-pool threat model. |
| 21 | +
|
| 22 | +use std::pin::Pin; |
| 23 | +use std::task::{Context, Poll}; |
| 24 | +use std::time::Instant; |
| 25 | + |
| 26 | +use http::{HeaderValue, Request, Response}; |
| 27 | +use jsonrpsee::server::HttpBody; |
| 28 | +use tower::{Layer, Service}; |
| 29 | +use tracing::{info, info_span, Instrument}; |
| 30 | + |
| 31 | +#[cfg(test)] |
| 32 | +#[path = "request_log_test.rs"] |
| 33 | +mod request_log_test; |
| 34 | + |
| 35 | +/// HTTP header carrying the request id. |
| 36 | +pub const REQUEST_ID_HEADER: &str = "x-request-id"; |
| 37 | + |
| 38 | +/// Cap on accepted incoming request-id length. Anything longer is dropped |
| 39 | +/// in favour of a freshly generated id so the value never balloons into |
| 40 | +/// AsyncLocalStorage / tracing fields and so log aggregators don't have |
| 41 | +/// to parse megabyte-scale ids. |
| 42 | +const MAX_REQUEST_ID_LEN: usize = 128; |
| 43 | + |
| 44 | +fn new_request_id() -> String { |
| 45 | + uuid::Uuid::new_v4().to_string() |
| 46 | +} |
| 47 | + |
| 48 | +/// Accepts the incoming `x-request-id` only when it's a short printable |
| 49 | +/// ASCII token. CR/LF would let a client smuggle headers into the |
| 50 | +/// response; arbitrary bytes (including unicode) make the value unsafe |
| 51 | +/// to round-trip through `HeaderValue::from_str`. Any reject falls back |
| 52 | +/// to a freshly generated UUID v4. |
| 53 | +fn extract_or_generate_request_id<B>(request: &Request<B>) -> String { |
| 54 | + request |
| 55 | + .headers() |
| 56 | + .get(REQUEST_ID_HEADER) |
| 57 | + .and_then(|value| value.to_str().ok()) |
| 58 | + .filter(|value| !value.is_empty() && value.len() <= MAX_REQUEST_ID_LEN) |
| 59 | + .filter(|value| value.bytes().all(is_safe_request_id_byte)) |
| 60 | + .map(|value| value.to_string()) |
| 61 | + .unwrap_or_else(new_request_id) |
| 62 | +} |
| 63 | + |
| 64 | +fn is_safe_request_id_byte(b: u8) -> bool { |
| 65 | + // Printable ASCII excluding whitespace. Rejects CR/LF/NUL/DEL etc. |
| 66 | + b.is_ascii_graphic() |
| 67 | +} |
| 68 | + |
| 69 | +/// tower [`Layer`] producing [`RequestLogService`]. |
| 70 | +#[derive(Clone, Copy, Default)] |
| 71 | +pub struct RequestLogLayer; |
| 72 | + |
| 73 | +impl<S> Layer<S> for RequestLogLayer { |
| 74 | + type Service = RequestLogService<S>; |
| 75 | + |
| 76 | + fn layer(&self, inner: S) -> Self::Service { |
| 77 | + RequestLogService { inner } |
| 78 | + } |
| 79 | +} |
| 80 | + |
| 81 | +#[derive(Clone)] |
| 82 | +pub struct RequestLogService<S> { |
| 83 | + inner: S, |
| 84 | +} |
| 85 | + |
| 86 | +impl<S, ReqB> Service<Request<ReqB>> for RequestLogService<S> |
| 87 | +where |
| 88 | + S: Service<Request<ReqB>, Response = Response<HttpBody>>, |
| 89 | + S::Future: Send + 'static, |
| 90 | + S::Error: Send + 'static, |
| 91 | +{ |
| 92 | + type Response = Response<HttpBody>; |
| 93 | + type Error = S::Error; |
| 94 | + type Future = |
| 95 | + Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>; |
| 96 | + |
| 97 | + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
| 98 | + self.inner.poll_ready(cx) |
| 99 | + } |
| 100 | + |
| 101 | + fn call(&mut self, mut request: Request<ReqB>) -> Self::Future { |
| 102 | + let request_id = extract_or_generate_request_id(&request); |
| 103 | + // Ensure the header reflects the (possibly generated) id so inner |
| 104 | + // services that re-read it see a consistent value. |
| 105 | + if let Ok(header_value) = HeaderValue::from_str(&request_id) { |
| 106 | + request.headers_mut().insert(REQUEST_ID_HEADER, header_value); |
| 107 | + } |
| 108 | + let method = request.method().clone(); |
| 109 | + let path = request.uri().path().to_string(); |
| 110 | + let start = Instant::now(); |
| 111 | + |
| 112 | + // Bind the id to a span covering the whole downstream call so every |
| 113 | + // event a handler emits (including the OHTTP-decapsulated inner |
| 114 | + // dispatch, which runs inside this future) carries `request_id`. |
| 115 | + let span = info_span!("http_request", request_id = %request_id); |
| 116 | + let future = self.inner.call(request).instrument(span); |
| 117 | + let request_id_for_response = request_id; |
| 118 | + |
| 119 | + Box::pin(async move { |
| 120 | + let result = future.await; |
| 121 | + // `as_millis` returns `u128`; saturate to `u64` so the tracing |
| 122 | + // field type stays simple. A request latency anywhere near 2^64 |
| 123 | + // ms (584 million years) is unreachable. |
| 124 | + let latency_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX); |
| 125 | + match result { |
| 126 | + Ok(mut response) => { |
| 127 | + let status = response.status().as_u16(); |
| 128 | + if let Ok(header_value) = HeaderValue::from_str(&request_id_for_response) { |
| 129 | + response.headers_mut().insert(REQUEST_ID_HEADER, header_value); |
| 130 | + } |
| 131 | + info!( |
| 132 | + event = "http_request", |
| 133 | + request_id = %request_id_for_response, |
| 134 | + method = %method, |
| 135 | + path = %path, |
| 136 | + status = status, |
| 137 | + latency_ms = latency_ms, |
| 138 | + "HTTP request handled." |
| 139 | + ); |
| 140 | + Ok(response) |
| 141 | + } |
| 142 | + Err(err) => { |
| 143 | + // The error path can't observe status, but still emit a |
| 144 | + // log line so request timing is visible even on inner |
| 145 | + // service failure. |
| 146 | + info!( |
| 147 | + event = "http_request", |
| 148 | + request_id = %request_id_for_response, |
| 149 | + method = %method, |
| 150 | + path = %path, |
| 151 | + latency_ms = latency_ms, |
| 152 | + outcome = "service_error", |
| 153 | + "HTTP request failed in tower stack." |
| 154 | + ); |
| 155 | + Err(err) |
| 156 | + } |
| 157 | + } |
| 158 | + }) |
| 159 | + } |
| 160 | +} |
0 commit comments