From 2eaa6116559ccd7f11d1622870962f7326d3029d Mon Sep 17 00:00:00 2001 From: Avi Cohen Date: Sun, 24 May 2026 19:36:51 +0300 Subject: [PATCH] starknet_transaction_prover: HTTP request count + latency + in-flight metrics Adds `HttpMetricsLayer` placed between the monitoring endpoints and the rest of the stack so probes/scrapes don't distort the request-latency distribution. Records request count by bounded method/status labels, end-to-end latency, and in-flight gauge. Also adds a shared test recorder helper that installs the global Prometheus recorder once across the test binary. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../starknet_transaction_prover/src/server.rs | 7 + .../src/server/http_metrics.rs | 143 ++++++++++++++++++ .../src/server/http_metrics_test.rs | 98 ++++++++++++ .../src/server/metrics.rs | 7 +- .../src/server/metrics_test.rs | 17 ++- .../src/server/test_recorder.rs | 24 +++ .../src/server/tls.rs | 1 + 7 files changed, 287 insertions(+), 10 deletions(-) create mode 100644 crates/starknet_transaction_prover/src/server/http_metrics.rs create mode 100644 crates/starknet_transaction_prover/src/server/http_metrics_test.rs create mode 100644 crates/starknet_transaction_prover/src/server/test_recorder.rs diff --git a/crates/starknet_transaction_prover/src/server.rs b/crates/starknet_transaction_prover/src/server.rs index 915317d4e87..6a801ca0932 100644 --- a/crates/starknet_transaction_prover/src/server.rs +++ b/crates/starknet_transaction_prover/src/server.rs @@ -43,6 +43,8 @@ pub const OHTTP_JSONRPSEE_BODY_BUILDER: fn(Full) -> HttpBody = HttpBody:: /// - `RequestLogLayer` is outermost so the latency it measures covers every other layer. /// - `HealthLayer` (and `MetricsLayer` when configured) sit inside it so `/health` probes and /// `/metrics` scrapes short-circuit before CORS/OHTTP. 
+/// - `HttpMetricsLayer` records per-request latency; it sits below `HealthLayer`/`MetricsLayer` so +/// the probe and scrape traffic they short-circuit is excluded from the distribution. /// - `OhttpLayer` must sit OUTSIDE `CompressionLayer` so compression applies to the inner JSON-RPC /// response (the client's inner `Accept-Encoding` travels through BHTTP into jsonrpsee) rather /// than to the OHTTP ciphertext envelope. `MapRequestBodyLayer`/`MapResponseBodyLayer` keep @@ -56,6 +58,7 @@ macro_rules! prover_http_middleware { .layer(RequestLogLayer) .layer(HealthLayer) .option_layer($metrics_layer) + .layer(HttpMetricsLayer) .option_layer($cors_layer) .layer(MapRequestBodyLayer::new(HttpBody::new)) .option_layer($ohttp_layer) @@ -69,6 +72,7 @@ pub mod config; pub mod cors; pub mod errors; pub mod health; +pub mod http_metrics; pub mod log_redact; pub mod metrics; #[cfg(test)] @@ -78,9 +82,12 @@ pub mod request_log; pub mod request_span; pub mod rpc_api; pub mod rpc_impl; +#[cfg(test)] +pub mod test_recorder; pub mod tls; pub use health::{HealthLayer, HEALTH_PATH}; +pub use http_metrics::HttpMetricsLayer; pub use metrics::{MetricsLayer, METRICS_PATH}; pub use request_log::{RequestLogLayer, REQUEST_ID_HEADER}; pub use request_span::RequestSpanLayer; diff --git a/crates/starknet_transaction_prover/src/server/http_metrics.rs b/crates/starknet_transaction_prover/src/server/http_metrics.rs new file mode 100644 index 00000000000..5432497e19a --- /dev/null +++ b/crates/starknet_transaction_prover/src/server/http_metrics.rs @@ -0,0 +1,143 @@ +//! tower middleware that records HTTP-level Prometheus metrics: +//! request count, latency histogram, and an RAII-guarded in-flight gauge. +//! Sits inside (below) `HealthLayer`/`MetricsLayer` so the `/health` and +//! `/metrics` probes they short-circuit never reach this layer and don't +//! distort the latency distribution. Label cardinality is bounded by +//! `method_label` and the HTTP status code enumeration. 
+ +use std::task::{Context, Poll}; +use std::time::Instant; + +use http::{Method, Request, Response, StatusCode}; +use jsonrpsee::server::HttpBody; +use tower::{Layer, Service}; + +#[cfg(test)] +#[path = "http_metrics_test.rs"] +mod http_metrics_test; + +/// Metric name constants. +pub mod names { + /// Counter of HTTP requests by method + status code. + pub const REQUESTS_TOTAL: &str = "prover_http_requests_total"; + /// Histogram of end-to-end HTTP request latency by method. + pub const REQUEST_DURATION_SECONDS: &str = "prover_http_request_duration_seconds"; + /// Gauge of in-flight HTTP requests. + pub const IN_FLIGHT_REQUESTS: &str = "prover_http_inflight_requests"; +} + +/// Pre-registers the HTTP request counter and in-flight gauge at zero so +/// they appear in /metrics even before the first request — dashboards +/// relying on `rate(...) > 0` need the series to exist. The histogram is +/// only described, deliberately *not* pre-`record`ed: that would inject a +/// phantom 0-second observation that distorts every quantile. 
+pub fn preregister_http_metrics() { + metrics::counter!(names::REQUESTS_TOTAL, "method" => "POST", "status" => "2xx").increment(0); + metrics::describe_histogram!( + names::REQUEST_DURATION_SECONDS, + "HTTP request latency in seconds, by method", + ); + metrics::gauge!(names::IN_FLIGHT_REQUESTS).set(0.0); +} + +#[derive(Clone, Copy)] +pub struct HttpMetricsLayer; + +impl Layer for HttpMetricsLayer { + type Service = HttpMetricsService; + + fn layer(&self, inner: S) -> Self::Service { + HttpMetricsService { inner } + } +} + +#[derive(Clone)] +pub struct HttpMetricsService { + inner: S, +} + +impl Service> for HttpMetricsService +where + S: Service, Response = Response>, + S::Future: Send + 'static, + S::Error: Send + 'static, +{ + type Response = Response; + type Error = S::Error; + type Future = std::pin::Pin< + Box> + Send>, + >; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + let method = method_label(request.method()); + let start = Instant::now(); + let future = self.inner.call(request); + + Box::pin(async move { + metrics::gauge!(names::IN_FLIGHT_REQUESTS).increment(1.0); + let _in_flight_guard = InFlightGuard; + let result = future.await; + let duration_seconds = start.elapsed().as_secs_f64(); + let status = match &result { + Ok(response) => status_label(response.status()), + // Sentinel for "tower stack failure, no HTTP response + // produced" so dashboards can filter it out from real codes. + Err(_) => "error", + }; + metrics::histogram!(names::REQUEST_DURATION_SECONDS, "method" => method) + .record(duration_seconds); + metrics::counter!( + names::REQUESTS_TOTAL, + "method" => method, + "status" => status, + ) + .increment(1); + result + }) + } +} + +/// Collapses HTTP statuses to a 6-value enum so a malformed response or +/// future framework version that emits exotic codes can't blow up +/// Prometheus series cardinality. 
+fn status_label(status: StatusCode) -> &'static str { + match status.as_u16() { + 100..=199 => "1xx", + 200..=299 => "2xx", + 300..=399 => "3xx", + 400..=499 => "4xx", + 500..=599 => "5xx", + _ => "other", + } +} + +/// Collapses HTTP methods into a small bounded set of label values so a +/// malformed request (or future hyper version that admits new tokens) +/// can't grow the Prometheus series cardinality unboundedly. +fn method_label(method: &Method) -> &'static str { + match *method { + Method::GET => "GET", + Method::POST => "POST", + Method::PUT => "PUT", + Method::DELETE => "DELETE", + Method::HEAD => "HEAD", + Method::OPTIONS => "OPTIONS", + Method::PATCH => "PATCH", + _ => "other", + } +} + +/// Decrements the in-flight gauge when dropped. Using a guard rather than +/// an explicit decrement after `future.await` covers panic + cancellation +/// paths so the gauge can't leak upward without coming back down. +struct InFlightGuard; + +impl Drop for InFlightGuard { + fn drop(&mut self) { + metrics::gauge!(names::IN_FLIGHT_REQUESTS).decrement(1.0); + } +} diff --git a/crates/starknet_transaction_prover/src/server/http_metrics_test.rs b/crates/starknet_transaction_prover/src/server/http_metrics_test.rs new file mode 100644 index 00000000000..ebb42ce40e3 --- /dev/null +++ b/crates/starknet_transaction_prover/src/server/http_metrics_test.rs @@ -0,0 +1,98 @@ +//! Unit tests for [`HttpMetricsLayer`]. +//! +//! All HTTP-metric tests live in this single `#[tokio::test]` because the +//! Prometheus recorder is process-global: parallel tests sharing the same +//! recorder would race on counter values. We run a sequence of requests +//! and assert deltas between them. 
+ +use bytes::Bytes; +use http::{Method, Request, Response, StatusCode}; +use http_body_util::Full; +use jsonrpsee::server::HttpBody; +use tower::{Layer, ServiceExt}; + +use crate::server::http_metrics::{names, HttpMetricsLayer}; +use crate::server::test_recorder::shared_handle; + +fn ok_service() -> impl tower::Service< + Request, + Response = Response, + Error = std::convert::Infallible, + Future = futures::future::Ready, std::convert::Infallible>>, +> + Clone { + tower::service_fn(|_req: Request| { + let response = Response::builder() + .status(StatusCode::OK) + .body(HttpBody::new(Full::new(Bytes::new()))) + .expect("static body is infallible"); + futures::future::ready(Ok::<_, std::convert::Infallible>(response)) + }) +} + +fn build_request(method: Method) -> Request { + Request::builder() + .method(method) + .uri("/") + .body(HttpBody::new(Full::new(Bytes::new()))) + .expect("static body is infallible") +} + +#[tokio::test] +async fn records_counter_histogram_and_returns_inflight_to_zero() { + let handle = shared_handle(); + let svc = HttpMetricsLayer.layer(ok_service()); + + // Capture counter / histogram baselines before this test runs so we + // can assert deltas — the recorder is shared across the test binary + // so other tests may have moved the absolute values. + let before = parse_counter_and_histogram(&handle.render()); + + // Issue three POSTs sequentially. After each await the in-flight gauge + // must drop back to zero (the guard runs on future completion). + for _ in 0..3 { + let response = svc.clone().oneshot(build_request(Method::POST)).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + } + + let scrape = handle.render(); + let after = parse_counter_and_histogram(&scrape); + assert_eq!(after.counter - before.counter, 3.0, "counter delta"); + assert_eq!(after.histogram_count - before.histogram_count, 3.0, "histogram delta"); + + // Gauge returned to zero — guard ran for every request. 
+ let gauge_line = scrape + .lines() + .find(|line| line.starts_with(names::IN_FLIGHT_REQUESTS) && !line.starts_with("# ")) + .unwrap_or_else(|| panic!("missing in-flight gauge in scrape:\n{scrape}")); + let gauge_value: f64 = + gauge_line.rsplit_once(' ').and_then(|(_, value)| value.parse().ok()).expect("gauge parse"); + assert_eq!(gauge_value, 0.0); +} + +struct Snapshot { + counter: f64, + histogram_count: f64, +} + +fn parse_counter_and_histogram(scrape: &str) -> Snapshot { + let counter = scrape + .lines() + .find(|line| { + line.starts_with(names::REQUESTS_TOTAL) + && line.contains("method=\"POST\"") + && line.contains("status=\"2xx\"") + && !line.starts_with("# ") + }) + .and_then(|line| line.rsplit_once(' ').and_then(|(_, value)| value.parse().ok())) + .unwrap_or(0.0); + let histogram_count = scrape + .lines() + .find(|line| { + line.starts_with(&format!("{}_count", names::REQUEST_DURATION_SECONDS)) + && line.contains("method=\"POST\"") + && !line.starts_with("# ") + }) + .and_then(|line| line.rsplit_once(' ').and_then(|(_, value)| value.parse().ok())) + .unwrap_or(0.0); + Snapshot { counter, histogram_count } +} diff --git a/crates/starknet_transaction_prover/src/server/metrics.rs b/crates/starknet_transaction_prover/src/server/metrics.rs index 992cd1122b3..f763f23000c 100644 --- a/crates/starknet_transaction_prover/src/server/metrics.rs +++ b/crates/starknet_transaction_prover/src/server/metrics.rs @@ -68,10 +68,11 @@ pub fn install_exporter(version: &str, git_sha: &str) -> anyhow::Result git_sha.to_string(), ) .set(1.0); - // Pre-register the counter at 0 so it shows up in scrapes before the - // first rejection — dashboards relying on `rate(...) > 0` need the - // series to exist. + // Pre-register counters/gauges at zero so they show up in scrapes + // before the first request — dashboards relying on `rate(...) > 0` + // need the series to exist. 
metrics::counter!(names::CONCURRENCY_REJECTED_TOTAL).increment(0); + super::http_metrics::preregister_http_metrics(); Ok(handle) } diff --git a/crates/starknet_transaction_prover/src/server/metrics_test.rs b/crates/starknet_transaction_prover/src/server/metrics_test.rs index 29a092ce687..5fd184f45db 100644 --- a/crates/starknet_transaction_prover/src/server/metrics_test.rs +++ b/crates/starknet_transaction_prover/src/server/metrics_test.rs @@ -4,7 +4,8 @@ use http_body_util::{BodyExt, Full}; use jsonrpsee::server::HttpBody; use tower::{Layer, ServiceExt}; -use crate::server::metrics::{install_exporter, MetricsLayer, METRICS_PATH}; +use crate::server::metrics::{MetricsLayer, METRICS_PATH}; +use crate::server::test_recorder::shared_handle; fn fallthrough_service() -> impl tower::Service< Request, @@ -37,10 +38,9 @@ async fn read_body(response: Response) -> (StatusCode, Vec) { #[tokio::test] async fn get_metrics_renders_prometheus_text() { - // Note: install_exporter installs the global recorder, so this test must - // be the only one in the crate that calls it. Other metric tests should - // share this fixture or call install_exporter via `try_install`. - let handle = install_exporter("0.0.1-test", "deadbeef").expect("install"); + // `shared_handle` installs the recorder exactly once across the test + // binary; see `test_recorder.rs`. + let handle = shared_handle().clone(); let svc = MetricsLayer::new(handle).layer(fallthrough_service()); let response = svc.oneshot(empty_request(Method::GET, METRICS_PATH)).await.unwrap(); @@ -52,6 +52,9 @@ async fn get_metrics_renders_prometheus_text() { body_text.contains("prover_build_info"), "scrape should include build_info, got:\n{body_text}" ); - assert!(body_text.contains("version=\"0.0.1-test\"")); - assert!(body_text.contains("git_sha=\"deadbeef\"")); + // Don't bind to specific label values — `shared_handle` uses generic + // test labels and is also called by other tests. 
Verifying the + // build_info series exists at all is sufficient. + assert!(body_text.contains("version=")); + assert!(body_text.contains("git_sha=")); } diff --git a/crates/starknet_transaction_prover/src/server/test_recorder.rs b/crates/starknet_transaction_prover/src/server/test_recorder.rs new file mode 100644 index 00000000000..6fc342118b9 --- /dev/null +++ b/crates/starknet_transaction_prover/src/server/test_recorder.rs @@ -0,0 +1,24 @@ +//! Test-only helper for sharing a Prometheus recorder across unit tests. +//! +//! `metrics-exporter-prometheus` installs into a single global recorder. +//! Two tests that each call `install_exporter` race on it: the second +//! install fails with "attempted to set a recorder after the metrics +//! system was already initialized". This module exposes a `OnceLock` that +//! installs exactly once and hands the same handle to every test, with +//! generic version/git_sha labels so no test depends on specific values. + +use std::sync::OnceLock; + +use metrics_exporter_prometheus::PrometheusHandle; + +use crate::server::metrics::install_exporter; + +static SHARED_HANDLE: OnceLock = OnceLock::new(); + +/// Returns the shared [`PrometheusHandle`], installing the recorder on the +/// first call. Safe to call from any test (including in parallel) — the +/// `OnceLock` synchronizes installation. +pub fn shared_handle() -> &'static PrometheusHandle { + SHARED_HANDLE + .get_or_init(|| install_exporter("0.0.0-test", "test-sha").expect("install test recorder")) +} diff --git a/crates/starknet_transaction_prover/src/server/tls.rs b/crates/starknet_transaction_prover/src/server/tls.rs index 4001e36dbb8..16534c829e7 100644 --- a/crates/starknet_transaction_prover/src/server/tls.rs +++ b/crates/starknet_transaction_prover/src/server/tls.rs @@ -29,6 +29,7 @@ use tracing::warn; use crate::server::{ HealthLayer, + HttpMetricsLayer, MetricsLayer, OhttpJsonrpseeLayer, RequestLogLayer,