Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/starknet_transaction_prover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ http.workspace = true
http-body-util.workspace = true
indexmap.workspace = true
jsonrpsee = { workspace = true, features = ["macros", "server"] }
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
privacy-prove = { workspace = true, optional = true }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true, features = ["derive"] }
Expand Down
10 changes: 10 additions & 0 deletions crates/starknet_transaction_prover/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ async fn main() -> anyhow::Result<()> {
};
use starknet_transaction_prover::server::cors::{build_cors_layer, cors_mode};
use starknet_transaction_prover::server::log_redact::redact_url_host;
use starknet_transaction_prover::server::metrics::install_exporter;
use starknet_transaction_prover::server::panic::install_panic_hook;
use starknet_transaction_prover::server::rpc_api::ProvingRpcServer;
use starknet_transaction_prover::server::rpc_impl::ProvingRpcServerImpl;
use starknet_transaction_prover::server::{
start_server,
MetricsLayer,
OhttpJsonrpseeLayer,
OHTTP_JSONRPSEE_BODY_BUILDER,
};
Expand Down Expand Up @@ -55,6 +57,13 @@ async fn main() -> anyhow::Result<()> {

let config = ServiceConfig::from_args(args)?;

// Install Prometheus exporter and emit `prover_build_info` before binding
// so a scrape during slow startup still returns the build identity.
let prometheus_handle =
install_exporter(env!("CARGO_PKG_VERSION"), option_env!("GIT_SHA").unwrap_or("unknown"))
.context("Failed to install Prometheus exporter")?;
let metrics_layer = Some(MetricsLayer::new(prometheus_handle));

// Startup banner — version + chain id + redacted RPC host only. No URLs
// with userinfo, no fee token address, no TLS paths, no tx data.
info!(
Expand Down Expand Up @@ -101,6 +110,7 @@ async fn main() -> anyhow::Result<()> {
config.max_request_body_size,
cors_layer,
ohttp_layer,
metrics_layer,
)
.await?;

Expand Down
13 changes: 10 additions & 3 deletions crates/starknet_transaction_prover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ pub const OHTTP_JSONRPSEE_BODY_BUILDER: fn(Full<Bytes>) -> HttpBody = HttpBody::
///
/// Layer order (tower makes the last-added layer innermost):
/// - `RequestLogLayer` is outermost so the latency it measures covers every other layer.
/// - `HealthLayer` sits inside it so probes complete before CORS/OHTTP.
/// - `HealthLayer` (and `MetricsLayer` when configured) sit inside it so `/health` probes and
/// `/metrics` scrapes short-circuit before CORS/OHTTP.
/// - `OhttpLayer` must sit OUTSIDE `CompressionLayer` so compression applies to the inner JSON-RPC
/// response (the client's inner `Accept-Encoding` travels through BHTTP into jsonrpsee) rather
/// than to the OHTTP ciphertext envelope. `MapRequestBodyLayer`/`MapResponseBodyLayer` keep
Expand All @@ -50,10 +51,11 @@ pub const OHTTP_JSONRPSEE_BODY_BUILDER: fn(Full<Bytes>) -> HttpBody = HttpBody::
/// - `RequestSpanLayer` sits BELOW `OhttpLayer` so it spans the decapsulated inner request with a
/// fresh, envelope-unlinkable id (see `request_span`).
macro_rules! prover_http_middleware {
($cors_layer:expr, $ohttp_layer:expr $(,)?) => {
($metrics_layer:expr, $cors_layer:expr, $ohttp_layer:expr $(,)?) => {
ServiceBuilder::new()
.layer(RequestLogLayer)
.layer(HealthLayer)
.option_layer($metrics_layer)
.option_layer($cors_layer)
.layer(MapRequestBodyLayer::new(HttpBody::new))
.option_layer($ohttp_layer)
Expand All @@ -68,6 +70,7 @@ pub mod cors;
pub mod errors;
pub mod health;
pub mod log_redact;
pub mod metrics;
#[cfg(test)]
pub mod mock_rpc;
pub mod panic;
Expand All @@ -78,6 +81,7 @@ pub mod rpc_impl;
pub mod tls;

pub use health::{HealthLayer, HEALTH_PATH};
pub use metrics::{MetricsLayer, METRICS_PATH};
pub use request_log::{RequestLogLayer, REQUEST_ID_HEADER};
pub use request_span::RequestSpanLayer;

Expand All @@ -89,6 +93,7 @@ mod rpc_spec_test;
mod ohttp_integration_test;

/// Starts the JSON-RPC server in either HTTP or HTTPS mode depending on the transport.
#[allow(clippy::too_many_arguments)]
pub async fn start_server(
addr: SocketAddr,
transport: &TransportMode,
Expand All @@ -97,6 +102,7 @@ pub async fn start_server(
max_request_body_size: u32,
cors_layer: Option<CorsLayer>,
ohttp_layer: Option<OhttpJsonrpseeLayer>,
metrics_layer: Option<MetricsLayer>,
) -> anyhow::Result<(SocketAddr, ServerHandle)> {
match transport {
TransportMode::Http => {
Expand All @@ -107,7 +113,7 @@ pub async fn start_server(
let server = ServerBuilder::default()
.set_config(server_config)
// See `prover_http_middleware!` for the full layer-order rationale.
.set_http_middleware(prover_http_middleware!(cors_layer, ohttp_layer))
.set_http_middleware(prover_http_middleware!(metrics_layer, cors_layer, ohttp_layer))
.build(&addr)
.await
.context(format!("Failed to bind JSON-RPC server to {addr}"))?;
Expand All @@ -125,6 +131,7 @@ pub async fn start_server(
max_request_body_size,
cors_layer,
ohttp_layer,
metrics_layer,
)
.await
}
Expand Down
106 changes: 106 additions & 0 deletions crates/starknet_transaction_prover/src/server/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
//! Prometheus `/metrics` endpoint as a tower middleware layer.
//!
//! Short-circuits `GET /metrics` ahead of jsonrpsee so scrapes never run
//! through the JSON-RPC parser. Label cardinality is bounded by the
//! enumerations in [`names`] — no user-controlled values become labels.

use std::task::{Context, Poll};

use bytes::Bytes;
use futures::future::{ready, Either, Ready};
use http::{header, Method, Request, Response, StatusCode};
use http_body_util::Full;
use jsonrpsee::server::HttpBody;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use tower::{Layer, Service};

#[cfg(test)]
#[path = "metrics_test.rs"]
mod metrics_test;

/// Path served by [`MetricsLayer`].
pub const METRICS_PATH: &str = "/metrics";

/// Metric name constants. Kept here so `metrics!` invocations elsewhere link
/// to a single definition instead of bare string literals.
pub mod names {
/// Build identity. Value is always 1; labels carry version + git_sha.
pub const BUILD_INFO: &str = "prover_build_info";
/// Requests rejected because the concurrency semaphore was full.
pub const CONCURRENCY_REJECTED_TOTAL: &str = "prover_concurrency_rejected_total";
}

/// Initializes the global Prometheus exporter and emits the `build_info`
/// gauge. Returns the handle used by [`MetricsLayer`] to render the scrape
/// response.
///
/// Should be called exactly once at startup. The handle is cheap to clone
/// (it wraps an `Arc`).
pub fn install_exporter(version: &str, git_sha: &str) -> anyhow::Result<PrometheusHandle> {
let handle = PrometheusBuilder::new()
.install_recorder()
.map_err(|err| anyhow::anyhow!("failed to install prometheus recorder: {err}"))?;
metrics::gauge!(
names::BUILD_INFO,
"version" => version.to_string(),
"git_sha" => git_sha.to_string(),
)
.set(1.0);
// Pre-register the counter at 0 so it shows up in scrapes before the
// first rejection — dashboards relying on `rate(...) > 0` need the
// series to exist.
metrics::counter!(names::CONCURRENCY_REJECTED_TOTAL).increment(0);
Ok(handle)
}

/// tower [`Layer`] that intercepts `GET /metrics`.
#[derive(Clone)]
pub struct MetricsLayer {
handle: PrometheusHandle,
}

impl MetricsLayer {
pub fn new(handle: PrometheusHandle) -> Self {
Self { handle }
}
}

impl<S> Layer<S> for MetricsLayer {
type Service = MetricsService<S>;

fn layer(&self, inner: S) -> Self::Service {
MetricsService { inner, handle: self.handle.clone() }
}
}

#[derive(Clone)]
pub struct MetricsService<S> {
inner: S,
handle: PrometheusHandle,
}

impl<S, ReqB> Service<Request<ReqB>> for MetricsService<S>
where
S: Service<Request<ReqB>, Response = Response<HttpBody>>,
{
type Response = Response<HttpBody>;
type Error = S::Error;
type Future = Either<Ready<Result<Self::Response, Self::Error>>, S::Future>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
Comment thread
cursor[bot] marked this conversation as resolved.

fn call(&mut self, request: Request<ReqB>) -> Self::Future {
if request.method() == Method::GET && request.uri().path() == METRICS_PATH {
let body = Bytes::from(self.handle.render());
let response = Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "text/plain; version=0.0.4")
.body(HttpBody::new(Full::new(body)))
.expect("response build with a string body is infallible");
return Either::Left(ready(Ok(response)));
}
Either::Right(self.inner.call(request))
}
}
57 changes: 57 additions & 0 deletions crates/starknet_transaction_prover/src/server/metrics_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use bytes::Bytes;
use http::{Method, Request, Response, StatusCode};
use http_body_util::{BodyExt, Full};
use jsonrpsee::server::HttpBody;
use tower::{Layer, ServiceExt};

use crate::server::metrics::{install_exporter, MetricsLayer, METRICS_PATH};

fn fallthrough_service() -> impl tower::Service<
Request<HttpBody>,
Response = Response<HttpBody>,
Error = std::convert::Infallible,
Future = futures::future::Ready<Result<Response<HttpBody>, std::convert::Infallible>>,
> + Clone {
tower::service_fn(|_req: Request<HttpBody>| {
let response = Response::builder()
.status(StatusCode::IM_A_TEAPOT)
.body(HttpBody::new(Full::new(Bytes::from_static(b"fallthrough"))))
.expect("static body is infallible");
futures::future::ready(Ok::<_, std::convert::Infallible>(response))
})
}

fn empty_request(method: Method, path: &str) -> Request<HttpBody> {
Request::builder()
.method(method)
.uri(path)
.body(HttpBody::new(Full::new(Bytes::new())))
.expect("static body is infallible")
}

async fn read_body(response: Response<HttpBody>) -> (StatusCode, Vec<u8>) {
let (parts, body) = response.into_parts();
let bytes = body.collect().await.expect("body collect").to_bytes().to_vec();
(parts.status, bytes)
}

#[tokio::test]
async fn get_metrics_renders_prometheus_text() {
// Note: install_exporter installs the global recorder, so this test must
// be the only one in the crate that calls it. Other metric tests should
// share this fixture or call install_exporter via `try_install`.
let handle = install_exporter("0.0.1-test", "deadbeef").expect("install");
let svc = MetricsLayer::new(handle).layer(fallthrough_service());

let response = svc.oneshot(empty_request(Method::GET, METRICS_PATH)).await.unwrap();

let (status, body) = read_body(response).await;
assert_eq!(status, StatusCode::OK);
let body_text = String::from_utf8(body).unwrap();
assert!(
body_text.contains("prover_build_info"),
"scrape should include build_info, got:\n{body_text}"
);
assert!(body_text.contains("version=\"0.0.1-test\""));
assert!(body_text.contains("git_sha=\"deadbeef\""));
}
2 changes: 2 additions & 0 deletions crates/starknet_transaction_prover/src/server/rpc_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tracing::warn;
use crate::proving::virtual_snos_prover::{ProveTransactionResult, RpcVirtualSnosProver};
use crate::server::config::ServiceConfig;
use crate::server::errors::service_busy;
use crate::server::metrics::names::CONCURRENCY_REJECTED_TOTAL;
use crate::server::rpc_api::ProvingRpcServer;

/// Starknet RPC specification version.
Expand Down Expand Up @@ -57,6 +58,7 @@ impl ProvingRpcServer for ProvingRpcServerImpl {
transaction: RpcTransaction,
) -> RpcResult<ProveTransactionResult> {
let _permit = self.concurrency_semaphore.try_acquire().map_err(|_| {
metrics::counter!(CONCURRENCY_REJECTED_TOTAL).increment(1);
warn!(
max_concurrent_requests = self.max_concurrent_requests,
"Rejected proving request: service is at capacity"
Expand Down
11 changes: 9 additions & 2 deletions crates/starknet_transaction_prover/src/server/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ use tower_http::map_request_body::MapRequestBodyLayer;
use tower_http::map_response_body::MapResponseBodyLayer;
use tracing::warn;

use crate::server::{HealthLayer, OhttpJsonrpseeLayer, RequestLogLayer, RequestSpanLayer};
use crate::server::{
HealthLayer,
MetricsLayer,
OhttpJsonrpseeLayer,
RequestLogLayer,
RequestSpanLayer,
};

/// Maximum time allowed for a TLS handshake before the connection is dropped.
const TLS_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
Expand All @@ -45,6 +51,7 @@ pub async fn start_tls_server(
max_request_body_size: u32,
cors_layer: Option<CorsLayer>,
ohttp_layer: Option<OhttpJsonrpseeLayer>,
metrics_layer: Option<MetricsLayer>,
) -> anyhow::Result<(SocketAddr, ServerHandle)> {
let tls_acceptor = load_tls_acceptor(cert_path, key_path)?;

Expand All @@ -55,7 +62,7 @@ pub async fn start_tls_server(
// See `prover_http_middleware!` for the full layer-order rationale.
let svc_builder = ServerBuilder::default()
.set_config(server_config)
.set_http_middleware(prover_http_middleware!(cors_layer, ohttp_layer))
.set_http_middleware(prover_http_middleware!(metrics_layer, cors_layer, ohttp_layer))
.to_service_builder();

let listener = TcpListener::bind(addr)
Expand Down
Loading