From bca1e04786861555ae5015ede18b6a5c88b02ba6 Mon Sep 17 00:00:00 2001 From: Avi Cohen Date: Sun, 24 May 2026 19:35:35 +0300 Subject: [PATCH] starknet_transaction_prover: Prometheus /metrics endpoint with build_info Adds `MetricsLayer` that short-circuits `GET /metrics` with a Prometheus text-format scrape, and an `install_exporter` helper that registers the global recorder and emits `prover_build_info` with version + git_sha labels. Wired alongside `HealthLayer` so the scrape path bypasses the JSON-RPC parser. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 2 + crates/starknet_transaction_prover/Cargo.toml | 2 + .../starknet_transaction_prover/src/main.rs | 10 ++ .../starknet_transaction_prover/src/server.rs | 13 ++- .../src/server/metrics.rs | 106 ++++++++++++++++++ .../src/server/metrics_test.rs | 57 ++++++++++ .../src/server/rpc_impl.rs | 2 + .../src/server/tls.rs | 11 +- 8 files changed, 198 insertions(+), 5 deletions(-) create mode 100644 crates/starknet_transaction_prover/src/server/metrics.rs create mode 100644 crates/starknet_transaction_prover/src/server/metrics_test.rs diff --git a/Cargo.lock b/Cargo.lock index 38eb4dd312d..056250d6b2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13153,6 +13153,8 @@ dependencies = [ "indexmap 2.14.0", "jsonrpsee", "jsonschema", + "metrics", + "metrics-exporter-prometheus", "mockito", "privacy-circuit-verify 1.2.2", "privacy-prove", diff --git a/crates/starknet_transaction_prover/Cargo.toml b/crates/starknet_transaction_prover/Cargo.toml index 34da85ce5b5..1dbe2aac6ec 100644 --- a/crates/starknet_transaction_prover/Cargo.toml +++ b/crates/starknet_transaction_prover/Cargo.toml @@ -28,6 +28,8 @@ http.workspace = true http-body-util.workspace = true indexmap.workspace = true jsonrpsee = { workspace = true, features = ["macros", "server"] } +metrics.workspace = true +metrics-exporter-prometheus.workspace = true privacy-prove = { workspace = true, optional = true } reqwest = { workspace = true, features = ["json"] } serde = { workspace = true, features = ["derive"] } diff --git a/crates/starknet_transaction_prover/src/main.rs b/crates/starknet_transaction_prover/src/main.rs index 6251edf4589..bf6155ba20d 100644 --- a/crates/starknet_transaction_prover/src/main.rs +++ b/crates/starknet_transaction_prover/src/main.rs @@ -22,11 +22,13 @@ async fn main() -> anyhow::Result<()> { }; use starknet_transaction_prover::server::cors::{build_cors_layer, cors_mode}; use starknet_transaction_prover::server::log_redact::redact_url_host; + use starknet_transaction_prover::server::metrics::install_exporter; use starknet_transaction_prover::server::panic::install_panic_hook; use starknet_transaction_prover::server::rpc_api::ProvingRpcServer; use starknet_transaction_prover::server::rpc_impl::ProvingRpcServerImpl; use starknet_transaction_prover::server::{ start_server, + MetricsLayer, OhttpJsonrpseeLayer, OHTTP_JSONRPSEE_BODY_BUILDER, }; @@ -55,6 +57,13 @@ async fn main() -> anyhow::Result<()> { let config = ServiceConfig::from_args(args)?; + // Install Prometheus exporter and emit `prover_build_info` before binding + // so a scrape during slow startup still returns the build identity. + let prometheus_handle = + install_exporter(env!("CARGO_PKG_VERSION"), option_env!("GIT_SHA").unwrap_or("unknown")) + .context("Failed to install Prometheus exporter")?; + let metrics_layer = Some(MetricsLayer::new(prometheus_handle)); + // Startup banner — version + chain id + redacted RPC host only. No URLs // with userinfo, no fee token address, no TLS paths, no tx data. info!( @@ -101,6 +110,7 @@ async fn main() -> anyhow::Result<()> { config.max_request_body_size, cors_layer, ohttp_layer, + metrics_layer, ) .await?; diff --git a/crates/starknet_transaction_prover/src/server.rs b/crates/starknet_transaction_prover/src/server.rs index f92c192b176..915317d4e87 100644 --- a/crates/starknet_transaction_prover/src/server.rs +++ b/crates/starknet_transaction_prover/src/server.rs @@ -41,7 +41,8 @@ pub const OHTTP_JSONRPSEE_BODY_BUILDER: fn(Full) -> HttpBody = HttpBody:: /// /// Layer order (tower makes the last-added layer innermost): /// - `RequestLogLayer` is outermost so the latency it measures covers every other layer. -/// - `HealthLayer` sits inside it so probes complete before CORS/OHTTP. +/// - `HealthLayer` (and `MetricsLayer` when configured) sit inside it so `/health` probes and +/// `/metrics` scrapes short-circuit before CORS/OHTTP. /// - `OhttpLayer` must sit OUTSIDE `CompressionLayer` so compression applies to the inner JSON-RPC /// response (the client's inner `Accept-Encoding` travels through BHTTP into jsonrpsee) rather /// than to the OHTTP ciphertext envelope. `MapRequestBodyLayer`/`MapResponseBodyLayer` keep @@ -50,10 +51,11 @@ pub const OHTTP_JSONRPSEE_BODY_BUILDER: fn(Full) -> HttpBody = HttpBody:: /// - `RequestSpanLayer` sits BELOW `OhttpLayer` so it spans the decapsulated inner request with a /// fresh, envelope-unlinkable id (see `request_span`). macro_rules! prover_http_middleware { - ($cors_layer:expr, $ohttp_layer:expr $(,)?) => { + ($metrics_layer:expr, $cors_layer:expr, $ohttp_layer:expr $(,)?) => { ServiceBuilder::new() .layer(RequestLogLayer) .layer(HealthLayer) + .option_layer($metrics_layer) .option_layer($cors_layer) .layer(MapRequestBodyLayer::new(HttpBody::new)) .option_layer($ohttp_layer) @@ -68,6 +70,7 @@ pub mod cors; pub mod errors; pub mod health; pub mod log_redact; +pub mod metrics; #[cfg(test)] pub mod mock_rpc; pub mod panic; @@ -78,6 +81,7 @@ pub mod rpc_impl; pub mod tls; pub use health::{HealthLayer, HEALTH_PATH}; +pub use metrics::{MetricsLayer, METRICS_PATH}; pub use request_log::{RequestLogLayer, REQUEST_ID_HEADER}; pub use request_span::RequestSpanLayer; @@ -89,6 +93,7 @@ mod rpc_spec_test; mod ohttp_integration_test; /// Starts the JSON-RPC server in either HTTP or HTTPS mode depending on the transport. +#[allow(clippy::too_many_arguments)] pub async fn start_server( addr: SocketAddr, transport: &TransportMode, @@ -97,6 +102,7 @@ pub async fn start_server( max_request_body_size: u32, cors_layer: Option, ohttp_layer: Option, + metrics_layer: Option, ) -> anyhow::Result<(SocketAddr, ServerHandle)> { match transport { TransportMode::Http => { @@ -107,7 +113,7 @@ pub async fn start_server( let server = ServerBuilder::default() .set_config(server_config) // See `prover_http_middleware!` for the full layer-order rationale. - .set_http_middleware(prover_http_middleware!(cors_layer, ohttp_layer)) + .set_http_middleware(prover_http_middleware!(metrics_layer, cors_layer, ohttp_layer)) .build(&addr) .await .context(format!("Failed to bind JSON-RPC server to {addr}"))?; @@ -125,6 +131,7 @@ pub async fn start_server( max_request_body_size, cors_layer, ohttp_layer, + metrics_layer, ) .await } diff --git a/crates/starknet_transaction_prover/src/server/metrics.rs b/crates/starknet_transaction_prover/src/server/metrics.rs new file mode 100644 index 00000000000..580530ae915 --- /dev/null +++ b/crates/starknet_transaction_prover/src/server/metrics.rs @@ -0,0 +1,106 @@ +//! Prometheus `/metrics` endpoint as a tower middleware layer. +//! +//! Short-circuits `GET /metrics` ahead of jsonrpsee so scrapes never run +//! through the JSON-RPC parser. Label cardinality is bounded by the +//! enumerations in [`names`] — no user-controlled values become labels. + +use std::task::{Context, Poll}; + +use bytes::Bytes; +use futures::future::{ready, Either, Ready}; +use http::{header, Method, Request, Response, StatusCode}; +use http_body_util::Full; +use jsonrpsee::server::HttpBody; +use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; +use tower::{Layer, Service}; + +#[cfg(test)] +#[path = "metrics_test.rs"] +mod metrics_test; + +/// Path served by [`MetricsLayer`]. +pub const METRICS_PATH: &str = "/metrics"; + +/// Metric name constants. Kept here so `metrics!` invocations elsewhere link +/// to a single definition instead of bare string literals. +pub mod names { + /// Build identity. Value is always 1; labels carry version + git_sha. + pub const BUILD_INFO: &str = "prover_build_info"; + /// Requests rejected because the concurrency semaphore was full. + pub const CONCURRENCY_REJECTED_TOTAL: &str = "prover_concurrency_rejected_total"; +} + +/// Initializes the global Prometheus exporter and emits the `build_info` +/// gauge. Returns the handle used by [`MetricsLayer`] to render the scrape +/// response. +/// +/// Should be called exactly once at startup. The handle is cheap to clone +/// (it wraps an `Arc`). +pub fn install_exporter(version: &str, git_sha: &str) -> anyhow::Result { + let handle = PrometheusBuilder::new() + .install_recorder() + .map_err(|err| anyhow::anyhow!("failed to install prometheus recorder: {err}"))?; + metrics::gauge!( + names::BUILD_INFO, + "version" => version.to_string(), + "git_sha" => git_sha.to_string(), + ) + .set(1.0); + // Pre-register the counter at 0 so it shows up in scrapes before the + // first rejection — dashboards relying on `rate(...) > 0` need the + // series to exist. + metrics::counter!(names::CONCURRENCY_REJECTED_TOTAL).increment(0); + Ok(handle) +} + +/// tower [`Layer`] that intercepts `GET /metrics`. +#[derive(Clone)] +pub struct MetricsLayer { + handle: PrometheusHandle, +} + +impl MetricsLayer { + pub fn new(handle: PrometheusHandle) -> Self { + Self { handle } + } +} + +impl Layer for MetricsLayer { + type Service = MetricsService; + + fn layer(&self, inner: S) -> Self::Service { + MetricsService { inner, handle: self.handle.clone() } + } +} + +#[derive(Clone)] +pub struct MetricsService { + inner: S, + handle: PrometheusHandle, +} + +impl Service> for MetricsService +where + S: Service, Response = Response>, +{ + type Response = Response; + type Error = S::Error; + type Future = Either>, S::Future>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: Request) -> Self::Future { + if request.method() == Method::GET && request.uri().path() == METRICS_PATH { + let body = Bytes::from(self.handle.render()); + let response = Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "text/plain; version=0.0.4") + .body(HttpBody::new(Full::new(body))) + .expect("response build with a string body is infallible"); + return Either::Left(ready(Ok(response))); + } + Either::Right(self.inner.call(request)) + } +} diff --git a/crates/starknet_transaction_prover/src/server/metrics_test.rs b/crates/starknet_transaction_prover/src/server/metrics_test.rs new file mode 100644 index 00000000000..29a092ce687 --- /dev/null +++ b/crates/starknet_transaction_prover/src/server/metrics_test.rs @@ -0,0 +1,57 @@ +use bytes::Bytes; +use http::{Method, Request, Response, StatusCode}; +use http_body_util::{BodyExt, Full}; +use jsonrpsee::server::HttpBody; +use tower::{Layer, ServiceExt}; + +use crate::server::metrics::{install_exporter, MetricsLayer, METRICS_PATH}; + +fn fallthrough_service() -> impl tower::Service< + Request, + Response = Response, + Error = std::convert::Infallible, + Future = futures::future::Ready, std::convert::Infallible>>, +> + Clone { + tower::service_fn(|_req: Request| { + let response = Response::builder() + .status(StatusCode::IM_A_TEAPOT) + .body(HttpBody::new(Full::new(Bytes::from_static(b"fallthrough")))) + .expect("static body is infallible"); + futures::future::ready(Ok::<_, std::convert::Infallible>(response)) + }) +} + +fn empty_request(method: Method, path: &str) -> Request { + Request::builder() + .method(method) + .uri(path) + .body(HttpBody::new(Full::new(Bytes::new()))) + .expect("static body is infallible") +} + +async fn read_body(response: Response) -> (StatusCode, Vec) { + let (parts, body) = response.into_parts(); + let bytes = body.collect().await.expect("body collect").to_bytes().to_vec(); + (parts.status, bytes) +} + +#[tokio::test] +async fn get_metrics_renders_prometheus_text() { + // Note: install_exporter installs the global recorder, so this test must + // be the only one in the crate that calls it. Other metric tests should + // share this fixture or call install_exporter via `try_install`. + let handle = install_exporter("0.0.1-test", "deadbeef").expect("install"); + let svc = MetricsLayer::new(handle).layer(fallthrough_service()); + + let response = svc.oneshot(empty_request(Method::GET, METRICS_PATH)).await.unwrap(); + + let (status, body) = read_body(response).await; + assert_eq!(status, StatusCode::OK); + let body_text = String::from_utf8(body).unwrap(); + assert!( + body_text.contains("prover_build_info"), + "scrape should include build_info, got:\n{body_text}" + ); + assert!(body_text.contains("version=\"0.0.1-test\"")); + assert!(body_text.contains("git_sha=\"deadbeef\"")); +} diff --git a/crates/starknet_transaction_prover/src/server/rpc_impl.rs b/crates/starknet_transaction_prover/src/server/rpc_impl.rs index 23ecc74966b..4fd5ed9a3f5 100644 --- a/crates/starknet_transaction_prover/src/server/rpc_impl.rs +++ b/crates/starknet_transaction_prover/src/server/rpc_impl.rs @@ -13,6 +13,7 @@ use tracing::warn; use crate::proving::virtual_snos_prover::{ProveTransactionResult, RpcVirtualSnosProver}; use crate::server::config::ServiceConfig; use crate::server::errors::service_busy; +use crate::server::metrics::names::CONCURRENCY_REJECTED_TOTAL; use crate::server::rpc_api::ProvingRpcServer; /// Starknet RPC specification version. @@ -57,6 +58,7 @@ impl ProvingRpcServer for ProvingRpcServerImpl { transaction: RpcTransaction, ) -> RpcResult { let _permit = self.concurrency_semaphore.try_acquire().map_err(|_| { + metrics::counter!(CONCURRENCY_REJECTED_TOTAL).increment(1); warn!( max_concurrent_requests = self.max_concurrent_requests, "Rejected proving request: service is at capacity" diff --git a/crates/starknet_transaction_prover/src/server/tls.rs b/crates/starknet_transaction_prover/src/server/tls.rs index c28f597c390..4001e36dbb8 100644 --- a/crates/starknet_transaction_prover/src/server/tls.rs +++ b/crates/starknet_transaction_prover/src/server/tls.rs @@ -27,7 +27,13 @@ use tower_http::map_request_body::MapRequestBodyLayer; use tower_http::map_response_body::MapResponseBodyLayer; use tracing::warn; -use crate::server::{HealthLayer, OhttpJsonrpseeLayer, RequestLogLayer, RequestSpanLayer}; +use crate::server::{ + HealthLayer, + MetricsLayer, + OhttpJsonrpseeLayer, + RequestLogLayer, + RequestSpanLayer, +}; /// Maximum time allowed for a TLS handshake before the connection is dropped. const TLS_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10); @@ -45,6 +51,7 @@ pub async fn start_tls_server( max_request_body_size: u32, cors_layer: Option, ohttp_layer: Option, + metrics_layer: Option, ) -> anyhow::Result<(SocketAddr, ServerHandle)> { let tls_acceptor = load_tls_acceptor(cert_path, key_path)?; @@ -55,7 +62,7 @@ pub async fn start_tls_server( // See `prover_http_middleware!` for the full layer-order rationale. let svc_builder = ServerBuilder::default() .set_config(server_config) - .set_http_middleware(prover_http_middleware!(cors_layer, ohttp_layer)) + .set_http_middleware(prover_http_middleware!(metrics_layer, cors_layer, ohttp_layer)) .to_service_builder(); let listener = TcpListener::bind(addr)