Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/starknet_transaction_prover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub const OHTTP_JSONRPSEE_BODY_BUILDER: fn(Full<Bytes>) -> HttpBody = HttpBody::
/// - `RequestLogLayer` is outermost so the latency it measures covers every other layer.
/// - `HealthLayer` (and `MetricsLayer` when configured) sit inside it so `/health` probes and
/// `/metrics` scrapes short-circuit before CORS/OHTTP.
/// - `HttpMetricsLayer` records per-request latency; it sits below `HealthLayer`/`MetricsLayer` so
/// the probe and scrape traffic they short-circuit is excluded from the distribution.
/// - `OhttpLayer` must sit OUTSIDE `CompressionLayer` so compression applies to the inner JSON-RPC
/// response (the client's inner `Accept-Encoding` travels through BHTTP into jsonrpsee) rather
/// than to the OHTTP ciphertext envelope. `MapRequestBodyLayer`/`MapResponseBodyLayer` keep
Expand All @@ -56,6 +58,7 @@ macro_rules! prover_http_middleware {
.layer(RequestLogLayer)
.layer(HealthLayer)
.option_layer($metrics_layer)
.layer(HttpMetricsLayer)
.option_layer($cors_layer)
.layer(MapRequestBodyLayer::new(HttpBody::new))
.option_layer($ohttp_layer)
Expand All @@ -69,6 +72,7 @@ pub mod config;
pub mod cors;
pub mod errors;
pub mod health;
pub mod http_metrics;
pub mod log_redact;
pub mod metrics;
#[cfg(test)]
Expand All @@ -78,9 +82,12 @@ pub mod request_log;
pub mod request_span;
pub mod rpc_api;
pub mod rpc_impl;
#[cfg(test)]
pub mod test_recorder;
pub mod tls;

pub use health::{HealthLayer, HEALTH_PATH};
pub use http_metrics::HttpMetricsLayer;
pub use metrics::{MetricsLayer, METRICS_PATH};
pub use request_log::{RequestLogLayer, REQUEST_ID_HEADER};
pub use request_span::RequestSpanLayer;
Expand Down
143 changes: 143 additions & 0 deletions crates/starknet_transaction_prover/src/server/http_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
//! tower middleware that records HTTP-level Prometheus metrics:
//! request count, latency histogram, and an RAII-guarded in-flight gauge.
//! Sits inside (below) `HealthLayer`/`MetricsLayer` so the `/health` and
//! `/metrics` probes they short-circuit never reach this layer and don't
//! distort the latency distribution. Label cardinality is bounded by
//! `method_label` and the HTTP status code enumeration.

use std::task::{Context, Poll};
use std::time::Instant;

use http::{Method, Request, Response, StatusCode};
use jsonrpsee::server::HttpBody;
use tower::{Layer, Service};

#[cfg(test)]
#[path = "http_metrics_test.rs"]
mod http_metrics_test;

/// Names of the Prometheus series emitted by the HTTP metrics middleware.
pub mod names {
    /// Gauge tracking how many HTTP requests are currently in flight.
    pub const IN_FLIGHT_REQUESTS: &str = "prover_http_inflight_requests";
    /// Histogram of HTTP request latency in seconds, labelled by method.
    pub const REQUEST_DURATION_SECONDS: &str = "prover_http_request_duration_seconds";
    /// Counter of HTTP requests, labelled by method and status.
    pub const REQUESTS_TOTAL: &str = "prover_http_requests_total";
}

/// Makes the HTTP request counter and the in-flight gauge visible in
/// `/metrics` before the first request — dashboards relying on
/// `rate(...) > 0` need the series to exist — and attaches a description to
/// the latency histogram.
///
/// The histogram is deliberately *not* pre-`record`ed: that would inject a
/// phantom 0-second observation that distorts every quantile.
///
/// Both series are touched with a zero-sized increment rather than an
/// absolute `set`, so calling this function again never overwrites a live
/// value (for example the in-flight gauge while requests are being served).
pub fn preregister_http_metrics() {
    metrics::counter!(names::REQUESTS_TOTAL, "method" => "POST", "status" => "2xx").increment(0);
    metrics::describe_histogram!(
        names::REQUEST_DURATION_SECONDS,
        "HTTP request latency in seconds, by method",
    );
    // `increment(0.0)` registers the gauge without resetting its current value.
    metrics::gauge!(names::IN_FLIGHT_REQUESTS).increment(0.0);
}

/// tower [`Layer`] that wraps a service in [`HttpMetricsService`].
///
/// The layer carries no state, so it is `Copy` and can be dropped straight
/// into a `ServiceBuilder` chain.
#[derive(Clone, Copy, Debug)]
pub struct HttpMetricsLayer;

impl<S> Layer<S> for HttpMetricsLayer {
    type Service = HttpMetricsService<S>;

    /// Wraps `inner` so every call through it is measured.
    fn layer(&self, inner: S) -> Self::Service {
        HttpMetricsService { inner }
    }
}

/// Service produced by [`HttpMetricsLayer`]: forwards each request to `inner`
/// and records HTTP metrics around the call.
#[derive(Clone, Debug)]
pub struct HttpMetricsService<S> {
    /// The wrapped downstream service.
    inner: S,
}

/// Measures every request that passes through to the inner service.
///
/// For each call it:
/// - raises the in-flight gauge for as long as the response future is alive,
/// - records the elapsed time between `call` and the inner future resolving
///   in the latency histogram (labelled by method),
/// - increments the request counter (labelled by method and status).
///
/// The histogram and counter are only written when the inner future resolves;
/// a future that is dropped early only moves the gauge.
impl<S, ReqB> Service<Request<ReqB>> for HttpMetricsService<S>
where
    S: Service<Request<ReqB>, Response = Response<HttpBody>>,
    S::Future: Send + 'static,
    S::Error: Send + 'static,
{
    type Response = Response<HttpBody>;
    type Error = S::Error;
    type Future = std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
    >;

    /// Readiness is delegated untouched to the inner service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, request: Request<ReqB>) -> Self::Future {
        let method = method_label(request.method());

        // Count the request as in flight from the moment it is accepted, not
        // from the first poll of the returned future, so the gauge and the
        // latency clock below cover the same window. The guard is created
        // right after the increment so every exit path (completion, early
        // drop of the future, panic in the inner service) decrements exactly
        // once.
        metrics::gauge!(names::IN_FLIGHT_REQUESTS).increment(1.0);
        let in_flight_guard = InFlightGuard;

        let start = Instant::now();
        let future = self.inner.call(request);

        Box::pin(async move {
            // Moving the guard into the future ties the decrement to the
            // future's lifetime.
            let _in_flight_guard = in_flight_guard;
            let result = future.await;
            let duration_seconds = start.elapsed().as_secs_f64();
            let status = match &result {
                Ok(response) => status_label(response.status()),
                // Sentinel for "tower stack failure, no HTTP response
                // produced" so dashboards can filter it out from real codes.
                Err(_) => "error",
            };
            metrics::histogram!(names::REQUEST_DURATION_SECONDS, "method" => method)
                .record(duration_seconds);
            metrics::counter!(
                names::REQUESTS_TOTAL,
                "method" => method,
                "status" => status,
            )
            .increment(1);
            result
        })
    }
}

/// Buckets an HTTP status into one of six label values (`1xx`..`5xx` or
/// `other`), keyed on the hundreds digit, so unusual codes from a malformed
/// response or a future framework version cannot inflate Prometheus series
/// cardinality.
fn status_label(status: StatusCode) -> &'static str {
    let hundreds = status.as_u16() / 100;
    match hundreds {
        1 => "1xx",
        2 => "2xx",
        3 => "3xx",
        4 => "4xx",
        5 => "5xx",
        // Anything below 100 or at/above 600.
        _ => "other",
    }
}

/// Maps an HTTP method onto a fixed set of label values. Seven common
/// methods keep their own name; every other method — including any token a
/// malformed request or future hyper version might introduce — is reported
/// as `other`, which keeps Prometheus series cardinality bounded.
fn method_label(method: &Method) -> &'static str {
    match *method {
        Method::POST => "POST",
        Method::GET => "GET",
        Method::HEAD => "HEAD",
        Method::OPTIONS => "OPTIONS",
        Method::PUT => "PUT",
        Method::PATCH => "PATCH",
        Method::DELETE => "DELETE",
        _ => "other",
    }
}

/// Drop guard for the in-flight gauge: lowers the gauge by one when it goes
/// out of scope. Relying on `Drop` instead of an explicit decrement after
/// `future.await` means panic and cancellation paths also bring the gauge
/// back down, so it cannot drift upward.
struct InFlightGuard;

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        let in_flight = metrics::gauge!(names::IN_FLIGHT_REQUESTS);
        in_flight.decrement(1.0);
    }
}
98 changes: 98 additions & 0 deletions crates/starknet_transaction_prover/src/server/http_metrics_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//! Unit tests for [`HttpMetricsLayer`].
//!
//! All HTTP-metric tests live in this single `#[tokio::test]` because the
//! Prometheus recorder is process-global: parallel tests sharing the same
//! recorder would race on counter values. We run a sequence of requests
//! and assert deltas between them.

use bytes::Bytes;
use http::{Method, Request, Response, StatusCode};
use http_body_util::Full;
use jsonrpsee::server::HttpBody;
use tower::{Layer, ServiceExt};

use crate::server::http_metrics::{names, HttpMetricsLayer};
use crate::server::test_recorder::shared_handle;

/// Test double for the inner service: answers every request with an empty
/// `200 OK` and never fails.
fn ok_service() -> impl tower::Service<
    Request<HttpBody>,
    Response = Response<HttpBody>,
    Error = std::convert::Infallible,
    Future = futures::future::Ready<Result<Response<HttpBody>, std::convert::Infallible>>,
> + Clone {
    tower::service_fn(|_request: Request<HttpBody>| {
        let empty_body = HttpBody::new(Full::new(Bytes::new()));
        let response = Response::builder()
            .status(StatusCode::OK)
            .body(empty_body)
            .expect("static body is infallible");
        let outcome: Result<_, std::convert::Infallible> = Ok(response);
        futures::future::ready(outcome)
    })
}

/// Builds a request for `/` with the given method and an empty body.
fn build_request(method: Method) -> Request<HttpBody> {
    let empty_body = HttpBody::new(Full::new(Bytes::new()));
    let builder = Request::builder().method(method).uri("/");
    builder.body(empty_body).expect("static body is infallible")
}

/// Sends three POSTs through the layer and checks that the request counter
/// and histogram sample count each grow by three, and that the in-flight
/// gauge is back at its starting value after every single request.
///
/// Every assertion is a delta against a baseline scrape: the recorder is
/// shared across the test binary, so absolute values (the gauge included) may
/// have been moved by other tests.
#[tokio::test]
async fn records_counter_histogram_and_returns_inflight_to_zero() {
    let handle = shared_handle();
    let svc = HttpMetricsLayer.layer(ok_service());

    // Reads the in-flight gauge sample out of a rendered scrape.
    let in_flight = |scrape: &str| -> f64 {
        let gauge_line = scrape
            .lines()
            .find(|line| line.starts_with(names::IN_FLIGHT_REQUESTS))
            .unwrap_or_else(|| panic!("missing in-flight gauge in scrape:\n{scrape}"));
        gauge_line.rsplit_once(' ').and_then(|(_, value)| value.parse().ok()).expect("gauge parse")
    };

    // Capture baselines before this test issues any request.
    let baseline_scrape = handle.render();
    let before = parse_counter_and_histogram(&baseline_scrape);
    let in_flight_before = in_flight(&baseline_scrape);

    // Issue three POSTs sequentially. Once each await returns, the guard has
    // run and the gauge must be back at the baseline.
    for _ in 0..3 {
        let response = svc.clone().oneshot(build_request(Method::POST)).await.unwrap();
        assert_eq!(response.status(), StatusCode::OK);
        assert_eq!(in_flight(&handle.render()), in_flight_before, "in-flight gauge delta");
    }

    let after = parse_counter_and_histogram(&handle.render());
    assert_eq!(after.counter - before.counter, 3.0, "counter delta");
    assert_eq!(after.histogram_count - before.histogram_count, 3.0, "histogram delta");
}

/// Values pulled out of one rendered Prometheus scrape.
struct Snapshot {
    /// Sample of the request counter for `method="POST"`, `status="2xx"`;
    /// 0.0 when the series is absent or its value does not parse.
    counter: f64,
    /// `_count` sample of the latency histogram for `method="POST"`; 0.0 when
    /// the series is absent or its value does not parse.
    histogram_count: f64,
}

/// Reads the POST/2xx request counter and the POST latency sample count from
/// a text-format scrape.
///
/// Only the first matching line of each series is considered. A missing or
/// unparsable series yields 0.0, so a scrape taken before any traffic works
/// as a baseline.
fn parse_counter_and_histogram(scrape: &str) -> Snapshot {
    // Built once instead of once per scanned line.
    let histogram_count_series = format!("{}_count", names::REQUEST_DURATION_SECONDS);

    // Lines that start with a metric name can never be `# ` comment lines, so
    // no separate comment filter is needed.
    let counter = trailing_value_of_first_match(scrape, |line| {
        line.starts_with(names::REQUESTS_TOTAL)
            && line.contains("method=\"POST\"")
            && line.contains("status=\"2xx\"")
    });
    let histogram_count = trailing_value_of_first_match(scrape, |line| {
        line.starts_with(&histogram_count_series) && line.contains("method=\"POST\"")
    });

    Snapshot {
        counter: counter.unwrap_or(0.0),
        histogram_count: histogram_count.unwrap_or(0.0),
    }
}

/// Returns the number after the last space on the first line accepted by
/// `is_match`, or `None` when no line matches or the value does not parse.
fn trailing_value_of_first_match(scrape: &str, is_match: impl Fn(&str) -> bool) -> Option<f64> {
    let line = scrape.lines().find(|line| is_match(line))?;
    let (_, value) = line.rsplit_once(' ')?;
    value.parse().ok()
}
7 changes: 4 additions & 3 deletions crates/starknet_transaction_prover/src/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ pub fn install_exporter(version: &str, git_sha: &str) -> anyhow::Result<Promethe
"git_sha" => git_sha.to_string(),
)
.set(1.0);
// Pre-register the counter at 0 so it shows up in scrapes before the
// first rejection — dashboards relying on `rate(...) > 0` need the
// series to exist.
// Pre-register counters/gauges at zero so they show up in scrapes
// before the first request — dashboards relying on `rate(...) > 0`
// need the series to exist.
metrics::counter!(names::CONCURRENCY_REJECTED_TOTAL).increment(0);
super::http_metrics::preregister_http_metrics();
Ok(handle)
}

Expand Down
17 changes: 10 additions & 7 deletions crates/starknet_transaction_prover/src/server/metrics_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use http_body_util::{BodyExt, Full};
use jsonrpsee::server::HttpBody;
use tower::{Layer, ServiceExt};

use crate::server::metrics::{install_exporter, MetricsLayer, METRICS_PATH};
use crate::server::metrics::{MetricsLayer, METRICS_PATH};
use crate::server::test_recorder::shared_handle;

fn fallthrough_service() -> impl tower::Service<
Request<HttpBody>,
Expand Down Expand Up @@ -37,10 +38,9 @@ async fn read_body(response: Response<HttpBody>) -> (StatusCode, Vec<u8>) {

#[tokio::test]
async fn get_metrics_renders_prometheus_text() {
// Note: install_exporter installs the global recorder, so this test must
// be the only one in the crate that calls it. Other metric tests should
// share this fixture or call install_exporter via `try_install`.
let handle = install_exporter("0.0.1-test", "deadbeef").expect("install");
// `shared_handle` installs the recorder exactly once across the test
// binary; see `test_recorder.rs`.
let handle = shared_handle().clone();
let svc = MetricsLayer::new(handle).layer(fallthrough_service());

let response = svc.oneshot(empty_request(Method::GET, METRICS_PATH)).await.unwrap();
Expand All @@ -52,6 +52,9 @@ async fn get_metrics_renders_prometheus_text() {
body_text.contains("prover_build_info"),
"scrape should include build_info, got:\n{body_text}"
);
assert!(body_text.contains("version=\"0.0.1-test\""));
assert!(body_text.contains("git_sha=\"deadbeef\""));
// Don't bind to specific label values — `shared_handle` uses generic
// test labels and is also called by other tests. Verifying the
// build_info series exists at all is sufficient.
assert!(body_text.contains("version="));
assert!(body_text.contains("git_sha="));
}
24 changes: 24 additions & 0 deletions crates/starknet_transaction_prover/src/server/test_recorder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//! Test-only helper for sharing a Prometheus recorder across unit tests.
//!
//! `metrics-exporter-prometheus` installs into a single global recorder.
//! Two tests that each call `install_exporter` race on it: the second
//! install fails with "attempted to set a recorder after the metrics
//! system was already initialized". This module exposes a `OnceLock` that
//! installs exactly once and hands the same handle to every test, with
//! generic version/git_sha labels so no test depends on specific values.

use std::sync::OnceLock;

use metrics_exporter_prometheus::PrometheusHandle;

use crate::server::metrics::install_exporter;

/// Hands out the single [`PrometheusHandle`] shared by all unit tests.
///
/// The first caller installs the recorder through `install_exporter`; every
/// later caller receives that same handle. Installation goes through a
/// `OnceLock`, so this can be called from any test, including tests running
/// in parallel.
pub fn shared_handle() -> &'static PrometheusHandle {
    // Function-local so the cell is only reachable through this accessor.
    static SHARED_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new();

    let install = || install_exporter("0.0.0-test", "test-sha").expect("install test recorder");
    SHARED_HANDLE.get_or_init(install)
}
1 change: 1 addition & 0 deletions crates/starknet_transaction_prover/src/server/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tracing::warn;

use crate::server::{
HealthLayer,
HttpMetricsLayer,
MetricsLayer,
OhttpJsonrpseeLayer,
RequestLogLayer,
Expand Down
Loading