Skip to content

Commit d9c346a

Browse files
avi-starkware and claude committed
starknet_transaction_prover: HTTP request count + latency + in-flight metrics
Adds `HttpMetricsLayer` placed between the monitoring endpoints and the rest of the stack so probes/scrapes don't distort the request-latency distribution. Records request count by bounded method/status labels, end-to-end latency, and in-flight gauge. Also adds a shared test recorder helper that installs the global Prometheus recorder once across the test binary. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d84bd92 commit d9c346a

7 files changed

Lines changed: 291 additions & 16 deletions

File tree

crates/starknet_transaction_prover/src/server.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub mod config;
3030
pub mod cors;
3131
pub mod errors;
3232
pub mod health;
33+
pub mod http_metrics;
3334
pub mod log_redact;
3435
pub mod metrics;
3536
#[cfg(test)]
@@ -39,9 +40,12 @@ pub mod request_log;
3940
pub mod request_span;
4041
pub mod rpc_api;
4142
pub mod rpc_impl;
43+
#[cfg(test)]
44+
pub mod test_recorder;
4245
pub mod tls;
4346

4447
pub use health::{HealthLayer, HEALTH_PATH};
48+
pub use http_metrics::HttpMetricsLayer;
4549
pub use metrics::{MetricsLayer, METRICS_PATH};
4650
pub use request_log::{RequestLogLayer, REQUEST_ID_HEADER};
4751
pub use request_span::RequestSpanLayer;
@@ -88,16 +92,17 @@ pub async fn start_server(
8892
// type it expects. `HttpBody::new` is a zero-cost wrapper, so
8993
// non-OHTTP requests still stream through unbuffered.
9094
.set_http_middleware(
91-
// `RequestLogLayer` is outermost so the latency it measures
92-
// covers every other layer. `HealthLayer` and `MetricsLayer`
93-
// sit inside it so probes/scrapes complete before CORS/OHTTP.
94-
// `RequestSpanLayer` sits BELOW `OhttpLayer` so it spans the
95-
// decapsulated inner request with a fresh, envelope-unlinkable
96-
// id (see `request_span`).
95+
// `RequestLogLayer` outermost so its latency covers the
96+
// whole stack. `HealthLayer` and `MetricsLayer` sit inside
97+
// it but above `HttpMetricsLayer`, keeping probes/scrapes out of that layer's
98+
// request-latency distribution. `RequestSpanLayer` sits BELOW
99+
// `OhttpLayer` so it spans the decapsulated inner request with
100+
// a fresh, envelope-unlinkable id (see `request_span`).
97101
ServiceBuilder::new()
98102
.layer(RequestLogLayer)
99103
.layer(HealthLayer)
100104
.option_layer(metrics_layer)
105+
.layer(HttpMetricsLayer)
101106
.option_layer(cors_layer)
102107
.layer(MapRequestBodyLayer::new(HttpBody::new))
103108
.option_layer(ohttp_layer)
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
//! tower middleware that records HTTP-level Prometheus metrics:
2+
//! request count, latency histogram, and an RAII-guarded in-flight gauge.
3+
//! Sits inside (below) `HealthLayer`/`MetricsLayer` so monitoring probes don't
4+
//! distort the latency distribution. Label cardinality is bounded by
5+
//! `method_label` and `status_label` (plus the `error` status sentinel).
6+
7+
use std::task::{Context, Poll};
8+
use std::time::Instant;
9+
10+
use http::{Method, Request, Response, StatusCode};
11+
use jsonrpsee::server::HttpBody;
12+
use tower::{Layer, Service};
13+
14+
#[cfg(test)]
15+
#[path = "http_metrics_test.rs"]
16+
mod http_metrics_test;
17+
18+
/// Metric name constants.
19+
pub mod names {
20+
/// Counter of HTTP requests by method + status code.
21+
pub const REQUESTS_TOTAL: &str = "prover_http_requests_total";
22+
/// Histogram of end-to-end HTTP request latency by method.
23+
pub const REQUEST_DURATION_SECONDS: &str = "prover_http_request_duration_seconds";
24+
/// Gauge of in-flight HTTP requests.
25+
pub const IN_FLIGHT_REQUESTS: &str = "prover_http_inflight_requests";
26+
}
27+
28+
/// Pre-registers the HTTP counter and in-flight gauge so they appear in /metrics even
29+
/// before the first request — dashboards relying on `rate(...) > 0` need
30+
/// the series to exist. Note we deliberately do *not* pre-`record` the
31+
/// histogram: that would inject a phantom 0-second observation that
32+
/// distorts every quantile.
33+
pub fn preregister_http_metrics() {
34+
metrics::counter!(names::REQUESTS_TOTAL, "method" => "POST", "status" => "2xx").increment(0);
35+
metrics::describe_histogram!(
36+
names::REQUEST_DURATION_SECONDS,
37+
"HTTP request latency in seconds, by method",
38+
);
39+
metrics::gauge!(names::IN_FLIGHT_REQUESTS).set(0.0);
40+
}
41+
42+
#[derive(Clone, Copy)]
43+
pub struct HttpMetricsLayer;
44+
45+
impl<S> Layer<S> for HttpMetricsLayer {
46+
type Service = HttpMetricsService<S>;
47+
48+
fn layer(&self, inner: S) -> Self::Service {
49+
HttpMetricsService { inner }
50+
}
51+
}
52+
53+
#[derive(Clone)]
54+
pub struct HttpMetricsService<S> {
55+
inner: S,
56+
}
57+
58+
impl<S, ReqB> Service<Request<ReqB>> for HttpMetricsService<S>
59+
where
60+
S: Service<Request<ReqB>, Response = Response<HttpBody>>,
61+
S::Future: Send + 'static,
62+
S::Error: Send + 'static,
63+
{
64+
type Response = Response<HttpBody>;
65+
type Error = S::Error;
66+
type Future = std::pin::Pin<
67+
Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
68+
>;
69+
70+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
71+
self.inner.poll_ready(cx)
72+
}
73+
74+
fn call(&mut self, request: Request<ReqB>) -> Self::Future {
75+
let method = method_label(request.method());
76+
let start = Instant::now();
77+
let future = self.inner.call(request);
78+
79+
Box::pin(async move {
80+
metrics::gauge!(names::IN_FLIGHT_REQUESTS).increment(1.0);
81+
let _in_flight_guard = InFlightGuard;
82+
let result = future.await;
83+
let duration_seconds = start.elapsed().as_secs_f64();
84+
let status = match &result {
85+
Ok(response) => status_label(response.status()),
86+
// Sentinel for "tower stack failure, no HTTP response
87+
// produced" so dashboards can filter it out from real codes.
88+
Err(_) => "error",
89+
};
90+
metrics::histogram!(names::REQUEST_DURATION_SECONDS, "method" => method)
91+
.record(duration_seconds);
92+
metrics::counter!(
93+
names::REQUESTS_TOTAL,
94+
"method" => method,
95+
"status" => status,
96+
)
97+
.increment(1);
98+
result
99+
})
100+
}
101+
}
102+
103+
/// Collapses HTTP statuses to a 6-value enum so a malformed response or
104+
/// future framework version that emits exotic codes can't blow up
105+
/// Prometheus series cardinality.
106+
fn status_label(status: StatusCode) -> &'static str {
107+
match status.as_u16() {
108+
100..=199 => "1xx",
109+
200..=299 => "2xx",
110+
300..=399 => "3xx",
111+
400..=499 => "4xx",
112+
500..=599 => "5xx",
113+
_ => "other",
114+
}
115+
}
116+
117+
/// Collapses HTTP methods into a small bounded set of label values so a
118+
/// malformed request (or future hyper version that admits new tokens)
119+
/// can't grow the Prometheus series cardinality unboundedly.
120+
fn method_label(method: &Method) -> &'static str {
121+
match *method {
122+
Method::GET => "GET",
123+
Method::POST => "POST",
124+
Method::PUT => "PUT",
125+
Method::DELETE => "DELETE",
126+
Method::HEAD => "HEAD",
127+
Method::OPTIONS => "OPTIONS",
128+
Method::PATCH => "PATCH",
129+
_ => "other",
130+
}
131+
}
132+
133+
/// Decrements the in-flight gauge when dropped. Using a guard rather than
134+
/// an explicit decrement after `future.await` covers panic + cancellation
135+
/// paths so the gauge can't leak upward without coming back down.
136+
struct InFlightGuard;
137+
138+
impl Drop for InFlightGuard {
139+
fn drop(&mut self) {
140+
metrics::gauge!(names::IN_FLIGHT_REQUESTS).decrement(1.0);
141+
}
142+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
//! Unit tests for [`HttpMetricsLayer`].
2+
//!
3+
//! All HTTP-metric tests live in this single `#[tokio::test]` because the
4+
//! Prometheus recorder is process-global: parallel tests sharing the same
5+
//! recorder would race on counter values. We run a sequence of requests
6+
//! and assert deltas between them.
7+
8+
use bytes::Bytes;
9+
use http::{Method, Request, Response, StatusCode};
10+
use http_body_util::Full;
11+
use jsonrpsee::server::HttpBody;
12+
use tower::{Layer, ServiceExt};
13+
14+
use crate::server::http_metrics::{names, HttpMetricsLayer};
15+
use crate::server::test_recorder::shared_handle;
16+
17+
fn ok_service() -> impl tower::Service<
18+
Request<HttpBody>,
19+
Response = Response<HttpBody>,
20+
Error = std::convert::Infallible,
21+
Future = futures::future::Ready<Result<Response<HttpBody>, std::convert::Infallible>>,
22+
> + Clone {
23+
tower::service_fn(|_req: Request<HttpBody>| {
24+
let response = Response::builder()
25+
.status(StatusCode::OK)
26+
.body(HttpBody::new(Full::new(Bytes::new())))
27+
.expect("static body is infallible");
28+
futures::future::ready(Ok::<_, std::convert::Infallible>(response))
29+
})
30+
}
31+
32+
fn build_request(method: Method) -> Request<HttpBody> {
33+
Request::builder()
34+
.method(method)
35+
.uri("/")
36+
.body(HttpBody::new(Full::new(Bytes::new())))
37+
.expect("static body is infallible")
38+
}
39+
40+
#[tokio::test]
41+
async fn records_counter_histogram_and_returns_inflight_to_zero() {
42+
let handle = shared_handle();
43+
let svc = HttpMetricsLayer.layer(ok_service());
44+
45+
// Capture counter / histogram baselines before this test runs so we
46+
// can assert deltas — the recorder is shared across the test binary
47+
// so other tests may have moved the absolute values.
48+
let before = parse_counter_and_histogram(&handle.render());
49+
50+
// Issue three POSTs sequentially. After each await the in-flight gauge
51+
// must drop back to zero (the guard runs on future completion).
52+
for _ in 0..3 {
53+
let response = svc.clone().oneshot(build_request(Method::POST)).await.unwrap();
54+
assert_eq!(response.status(), StatusCode::OK);
55+
}
56+
57+
let scrape = handle.render();
58+
let after = parse_counter_and_histogram(&scrape);
59+
assert_eq!(after.counter - before.counter, 3.0, "counter delta");
60+
assert_eq!(after.histogram_count - before.histogram_count, 3.0, "histogram delta");
61+
62+
// Gauge returned to zero — guard ran for every request.
63+
let gauge_line = scrape
64+
.lines()
65+
.find(|line| line.starts_with(names::IN_FLIGHT_REQUESTS) && !line.starts_with("# "))
66+
.unwrap_or_else(|| panic!("missing in-flight gauge in scrape:\n{scrape}"));
67+
let gauge_value: f64 =
68+
gauge_line.rsplit_once(' ').and_then(|(_, value)| value.parse().ok()).expect("gauge parse");
69+
assert_eq!(gauge_value, 0.0);
70+
}
71+
72+
struct Snapshot {
73+
counter: f64,
74+
histogram_count: f64,
75+
}
76+
77+
fn parse_counter_and_histogram(scrape: &str) -> Snapshot {
78+
let counter = scrape
79+
.lines()
80+
.find(|line| {
81+
line.starts_with(names::REQUESTS_TOTAL)
82+
&& line.contains("method=\"POST\"")
83+
&& line.contains("status=\"2xx\"")
84+
&& !line.starts_with("# ")
85+
})
86+
.and_then(|line| line.rsplit_once(' ').and_then(|(_, value)| value.parse().ok()))
87+
.unwrap_or(0.0);
88+
let histogram_count = scrape
89+
.lines()
90+
.find(|line| {
91+
line.starts_with(&format!("{}_count", names::REQUEST_DURATION_SECONDS))
92+
&& line.contains("method=\"POST\"")
93+
&& !line.starts_with("# ")
94+
})
95+
.and_then(|line| line.rsplit_once(' ').and_then(|(_, value)| value.parse().ok()))
96+
.unwrap_or(0.0);
97+
Snapshot { counter, histogram_count }
98+
}

crates/starknet_transaction_prover/src/server/metrics.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,11 @@ pub fn install_exporter(version: &str, git_sha: &str) -> anyhow::Result<Promethe
6868
"git_sha" => git_sha.to_string(),
6969
)
7070
.set(1.0);
71-
// Pre-register the counter at 0 so it shows up in scrapes before the
72-
// first rejection — dashboards relying on `rate(...) > 0` need the
73-
// series to exist.
71+
// Pre-register counters/gauges at zero so they show up in scrapes
72+
// before the first request — dashboards relying on `rate(...) > 0`
73+
// need the series to exist.
7474
metrics::counter!(names::CONCURRENCY_REJECTED_TOTAL).increment(0);
75+
super::http_metrics::preregister_http_metrics();
7576
Ok(handle)
7677
}
7778

crates/starknet_transaction_prover/src/server/metrics_test.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use http_body_util::{BodyExt, Full};
44
use jsonrpsee::server::HttpBody;
55
use tower::{Layer, ServiceExt};
66

7-
use crate::server::metrics::{install_exporter, MetricsLayer, METRICS_PATH};
7+
use crate::server::metrics::{MetricsLayer, METRICS_PATH};
8+
use crate::server::test_recorder::shared_handle;
89

910
fn fallthrough_service() -> impl tower::Service<
1011
Request<HttpBody>,
@@ -37,10 +38,9 @@ async fn read_body(response: Response<HttpBody>) -> (StatusCode, Vec<u8>) {
3738

3839
#[tokio::test]
3940
async fn get_metrics_renders_prometheus_text() {
40-
// Note: install_exporter installs the global recorder, so this test must
41-
// be the only one in the crate that calls it. Other metric tests should
42-
// share this fixture or call install_exporter via `try_install`.
43-
let handle = install_exporter("0.0.1-test", "deadbeef").expect("install");
41+
// `shared_handle` installs the recorder exactly once across the test
42+
// binary; see `test_recorder.rs`.
43+
let handle = shared_handle().clone();
4444
let svc = MetricsLayer::new(handle).layer(fallthrough_service());
4545

4646
let response = svc.oneshot(empty_request(Method::GET, METRICS_PATH)).await.unwrap();
@@ -52,6 +52,9 @@ async fn get_metrics_renders_prometheus_text() {
5252
body_text.contains("prover_build_info"),
5353
"scrape should include build_info, got:\n{body_text}"
5454
);
55-
assert!(body_text.contains("version=\"0.0.1-test\""));
56-
assert!(body_text.contains("git_sha=\"deadbeef\""));
55+
// Don't bind to specific label values — `shared_handle` uses generic
56+
// test labels and is also called by other tests. Verifying the
57+
// build_info series exists at all is sufficient.
58+
assert!(body_text.contains("version="));
59+
assert!(body_text.contains("git_sha="));
5760
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
//! Test-only helper for sharing a Prometheus recorder across unit tests.
2+
//!
3+
//! `metrics-exporter-prometheus` installs into a single global recorder.
4+
//! Two tests that each call `install_exporter` race on it: the second
5+
//! install fails with "attempted to set a recorder after the metrics
6+
//! system was already initialized". This module exposes a `OnceLock` that
7+
//! installs exactly once and hands the same handle to every test, with
8+
//! generic version/git_sha labels so no test depends on specific values.
9+
10+
use std::sync::OnceLock;
11+
12+
use metrics_exporter_prometheus::PrometheusHandle;
13+
14+
use crate::server::metrics::install_exporter;
15+
16+
static SHARED_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new();
17+
18+
/// Returns the shared [`PrometheusHandle`], installing the recorder on the
19+
/// first call. Safe to call from any test (including in parallel) — the
20+
/// `OnceLock` synchronizes installation.
21+
pub fn shared_handle() -> &'static PrometheusHandle {
22+
SHARED_HANDLE
23+
.get_or_init(|| install_exporter("0.0.0-test", "test-sha").expect("install test recorder"))
24+
}

crates/starknet_transaction_prover/src/server/tls.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use tracing::warn;
2929

3030
use crate::server::{
3131
HealthLayer,
32+
HttpMetricsLayer,
3233
MetricsLayer,
3334
OhttpJsonrpseeLayer,
3435
RequestLogLayer,
@@ -71,6 +72,7 @@ pub async fn start_tls_server(
7172
.layer(RequestLogLayer)
7273
.layer(HealthLayer)
7374
.option_layer(metrics_layer)
75+
.layer(HttpMetricsLayer)
7476
.option_layer(cors_layer)
7577
.layer(MapRequestBodyLayer::new(HttpBody::new))
7678
.option_layer(ohttp_layer)

0 commit comments

Comments
 (0)