Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion crates/starknet_transaction_prover/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ fn main() {
async fn main() -> anyhow::Result<()> {
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use clap::Parser;
Expand All @@ -21,11 +22,13 @@ async fn main() -> anyhow::Result<()> {
TransportMode,
};
use starknet_transaction_prover::server::cors::{build_cors_layer, cors_mode};
use starknet_transaction_prover::server::health::HealthLayer;
use starknet_transaction_prover::server::log_redact::redact_url_host;
use starknet_transaction_prover::server::metrics::install_exporter;
use starknet_transaction_prover::server::panic::install_panic_hook;
use starknet_transaction_prover::server::rpc_api::ProvingRpcServer;
use starknet_transaction_prover::server::rpc_impl::ProvingRpcServerImpl;
use starknet_transaction_prover::server::saturation::SaturationMonitor;
use starknet_transaction_prover::server::{
start_server,
MetricsLayer,
Expand Down Expand Up @@ -78,8 +81,16 @@ async fn main() -> anyhow::Result<()> {
"Starting Starknet transaction prover."
);

// Shared saturation tracker — writer is `ProvingRpcServerImpl` on every
// permit acquire/reject; reader is `HealthLayer`. See `saturation.rs`.
let saturation_monitor = SaturationMonitor::default();
let health_layer = HealthLayer::with_saturation(
saturation_monitor.clone(),
Duration::from_millis(config.health_max_saturated_ms),
);

// Build and start the JSON-RPC server.
let rpc_impl = ProvingRpcServerImpl::from_config(&config);
let rpc_impl = ProvingRpcServerImpl::from_config(&config, saturation_monitor);
let addr = SocketAddr::new(config.ip, config.port);
let cors_layer = build_cors_layer(&config.cors_allow_origin)?;

Expand Down Expand Up @@ -111,6 +122,7 @@ async fn main() -> anyhow::Result<()> {
cors_layer,
ohttp_layer,
metrics_layer,
health_layer,
)
.await?;

Expand Down
15 changes: 12 additions & 3 deletions crates/starknet_transaction_prover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ pub const OHTTP_JSONRPSEE_BODY_BUILDER: fn(Full<Bytes>) -> HttpBody = HttpBody::
/// - `RequestSpanLayer` sits BELOW `OhttpLayer` so it spans the decapsulated inner request with a
/// fresh, envelope-unlinkable id (see `request_span`).
macro_rules! prover_http_middleware {
($metrics_layer:expr, $cors_layer:expr, $ohttp_layer:expr $(,)?) => {
($health_layer:expr, $metrics_layer:expr, $cors_layer:expr, $ohttp_layer:expr $(,)?) => {
ServiceBuilder::new()
.layer(RequestLogLayer)
.layer(HealthLayer)
.layer($health_layer)
.option_layer($metrics_layer)
.layer(HttpMetricsLayer)
.option_layer($cors_layer)
Expand All @@ -82,6 +82,7 @@ pub mod request_log;
pub mod request_span;
pub mod rpc_api;
pub mod rpc_impl;
pub mod saturation;
#[cfg(test)]
pub mod test_recorder;
pub mod tls;
Expand All @@ -91,6 +92,7 @@ pub use http_metrics::HttpMetricsLayer;
pub use metrics::{MetricsLayer, METRICS_PATH};
pub use request_log::{RequestLogLayer, REQUEST_ID_HEADER};
pub use request_span::RequestSpanLayer;
pub use saturation::SaturationMonitor;

#[cfg(test)]
mod rpc_spec_test;
Expand All @@ -110,6 +112,7 @@ pub async fn start_server(
cors_layer: Option<CorsLayer>,
ohttp_layer: Option<OhttpJsonrpseeLayer>,
metrics_layer: Option<MetricsLayer>,
health_layer: HealthLayer,
) -> anyhow::Result<(SocketAddr, ServerHandle)> {
match transport {
TransportMode::Http => {
Expand All @@ -120,7 +123,12 @@ pub async fn start_server(
let server = ServerBuilder::default()
.set_config(server_config)
// See `prover_http_middleware!` for the full layer-order rationale.
.set_http_middleware(prover_http_middleware!(metrics_layer, cors_layer, ohttp_layer))
.set_http_middleware(prover_http_middleware!(
health_layer,
metrics_layer,
cors_layer,
ohttp_layer
))
.build(&addr)
.await
.context(format!("Failed to bind JSON-RPC server to {addr}"))?;
Expand All @@ -139,6 +147,7 @@ pub async fn start_server(
cors_layer,
ohttp_layer,
metrics_layer,
health_layer,
)
.await
}
Expand Down
29 changes: 29 additions & 0 deletions crates/starknet_transaction_prover/src/server/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ const DEFAULT_COMPILED_CLASS_CACHE_SIZE: usize = 600;
/// 5 MiB — matches the convention used elsewhere in the sequencer.
const DEFAULT_MAX_REQUEST_BODY_SIZE: u32 = 5 * 1024 * 1024;
const DEFAULT_OHTTP_KEY_CACHE_MAX_AGE_SECS: u64 = 3600;
/// Default saturation window before `/health` returns 503. 10 seconds
/// matches "service is rejecting requests for a sustained period" without
/// flipping on a single in-flight burst.
const DEFAULT_HEALTH_MAX_SATURATED_MS: u64 = 10_000;

/// Transport mode for the JSON-RPC server.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -96,6 +100,7 @@ struct RawServiceConfig {
max_request_body_size: u32,
ohttp_enabled: bool,
ohttp_key_cache_max_age_secs: u64,
health_max_saturated_ms: u64,
}

impl Default for RawServiceConfig {
Expand All @@ -121,6 +126,7 @@ impl Default for RawServiceConfig {
max_request_body_size: DEFAULT_MAX_REQUEST_BODY_SIZE,
ohttp_enabled: false,
ohttp_key_cache_max_age_secs: DEFAULT_OHTTP_KEY_CACHE_MAX_AGE_SECS,
health_max_saturated_ms: DEFAULT_HEALTH_MAX_SATURATED_MS,
}
}
}
Expand Down Expand Up @@ -152,6 +158,11 @@ pub struct ServiceConfig {
pub ohttp_enabled: bool,
/// Cache-Control max-age for the `GET /ohttp-keys` response (seconds).
pub ohttp_key_cache_max_age_secs: u64,
/// Saturation window (milliseconds) before `/health` flips to 503. The
/// service is "saturated" when it has been continuously rejecting
/// proving requests due to the concurrency limit; once that has held
/// for this many milliseconds, load balancers should drain the pod.
pub health_max_saturated_ms: u64,
}

impl ServiceConfig {
Expand Down Expand Up @@ -380,6 +391,15 @@ impl ServiceConfig {
config.ohttp_key_cache_max_age_secs = secs;
}
}
if let Some(ms) = args.health_max_saturated_ms {
if ms != config.health_max_saturated_ms {
info!(
"CLI override: health_max_saturated_ms: {} -> {}",
config.health_max_saturated_ms, ms
);
config.health_max_saturated_ms = ms;
}
}

// Validate required fields.
if config.rpc_node_url.is_empty() {
Expand Down Expand Up @@ -464,6 +484,7 @@ impl ServiceConfig {
max_request_body_size: config.max_request_body_size,
ohttp_enabled: config.ohttp_enabled,
ohttp_key_cache_max_age_secs: config.ohttp_key_cache_max_age_secs,
health_max_saturated_ms: config.health_max_saturated_ms,
})
}
}
Expand Down Expand Up @@ -573,6 +594,14 @@ pub struct CliArgs {
#[arg(long, value_enum, value_name = "FORMAT", env = "LOG_FORMAT", default_value_t = LogFormat::Text)]
pub log_format: LogFormat,

/// Saturation window (milliseconds) before `/health` returns 503
/// (default: 10000). The service is "saturated" when it has been
/// continuously rejecting proving requests due to the concurrency
/// limit; once that has held for this many milliseconds, load
/// balancers should drain the pod.
#[arg(long, value_name = "MILLIS", env = "HEALTH_MAX_SATURATED_MS")]
pub health_max_saturated_ms: Option<u64>,

/// Hidden escape hatch: override the embedded bouncer config (block capacity limits) with a
/// custom JSON file. Not advertised because the embedded defaults are tuned for this prover
/// (including high `l1_gas` / `message_segment_length`: virtual OS output is not L1-bound; it
Expand Down
2 changes: 2 additions & 0 deletions crates/starknet_transaction_prover/src/server/config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ fn base_args() -> CliArgs {
ohttp_enabled: false,
ohttp_key_cache_max_age_secs: None,
log_format: LogFormat::Text,
health_max_saturated_ms: None,
}
}

Expand Down Expand Up @@ -149,6 +150,7 @@ fn cors_allow_origin_rejects_non_array_in_config_file() {
ohttp_enabled: false,
ohttp_key_cache_max_age_secs: None,
log_format: LogFormat::Text,
health_max_saturated_ms: None,
};

let error = ServiceConfig::from_args(args).unwrap_err();
Expand Down
64 changes: 54 additions & 10 deletions crates/starknet_transaction_prover/src/server/health.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
//! HTTP `/health` endpoint as a tower middleware layer.
//!
//! Short-circuits `GET /health` before the jsonrpsee service sees the request
//! (which would 405 a GET). Any other request passes through unchanged.
//! (which would 405 a GET). With a `SaturationMonitor` attached, returns 503
//! once the service has been continuously rejecting requests for the
//! configured threshold so load balancers can drain the pod. Body is opaque
//! (no timestamps, counters, or upstream URLs).

use std::task::{Context, Poll};
use std::time::Duration;

use bytes::Bytes;
use futures::future::{ready, Either, Ready};
Expand All @@ -12,28 +16,73 @@ use http_body_util::Full;
use jsonrpsee::server::HttpBody;
use tower::{Layer, Service};

use crate::server::saturation::SaturationMonitor;

#[cfg(test)]
#[path = "health_test.rs"]
mod health_test;

pub const HEALTH_PATH: &str = "/health";

const HEALTHY_BODY: &[u8] = br#"{"status":"ok"}"#;
/// Body returned by `GET /health` when saturated. Reason is an opaque code,
/// no internal state included.
const SATURATED_BODY: &[u8] = br#"{"status":"unhealthy","reason":"saturated"}"#;

/// Tower layer whose service answers `GET /health` itself instead of
/// forwarding it to the wrapped service.
///
/// `saturation: None` keeps the original always-200 behaviour;
/// `Some(monitor)` flips to `503` once `monitor.saturated_for_at_least`
/// crosses `saturation_threshold`.
#[derive(Clone, Default)]
pub struct HealthLayer {
/// Saturation tracker consulted on every health probe; `None` disables the
/// `503` path entirely.
saturation: Option<SaturationMonitor>,
/// Duration handed to `saturated_for_at_least` when choosing between `200`
/// and `503`.
saturation_threshold: Duration,
}

#[derive(Clone, Copy, Default)]
pub struct HealthLayer;
impl HealthLayer {
    /// Creates a layer whose `/health` answer depends on `monitor`.
    ///
    /// `threshold` is the duration later passed to
    /// `SaturationMonitor::saturated_for_at_least` on each health probe.
    pub fn with_saturation(monitor: SaturationMonitor, threshold: Duration) -> Self {
        let saturation = Some(monitor);
        Self { saturation, saturation_threshold: threshold }
    }
}

impl<S> Layer<S> for HealthLayer {
type Service = HealthService<S>;

fn layer(&self, inner: S) -> Self::Service {
HealthService { inner }
HealthService {
inner,
saturation: self.saturation.clone(),
saturation_threshold: self.saturation_threshold,
}
}
}

/// Service produced by `HealthLayer`: it serves `GET /health` directly and
/// forwards every other request to `inner`.
#[derive(Clone)]
pub struct HealthService<S> {
/// Wrapped service that handles everything except `GET /health`.
inner: S,
/// Saturation tracker cloned from the layer; with `None` the health
/// response is always `200`.
saturation: Option<SaturationMonitor>,
/// Duration passed to `saturated_for_at_least` on each health check.
saturation_threshold: Duration,
}

impl<S> HealthService<S> {
    /// Builds the reply for `GET /health`.
    ///
    /// Returns `503` with the saturated body when a monitor is attached and
    /// reports saturation for at least `saturation_threshold`; otherwise
    /// returns `200` with the healthy body. Without a monitor the answer is
    /// always `200`.
    fn health_response(&self) -> Response<HttpBody> {
        let is_saturated = match &self.saturation {
            Some(monitor) => monitor.saturated_for_at_least(self.saturation_threshold),
            None => false,
        };

        // Only the status line and the static payload differ between the two
        // outcomes; everything else about the response is shared.
        let (status, payload) = if is_saturated {
            (StatusCode::SERVICE_UNAVAILABLE, SATURATED_BODY)
        } else {
            (StatusCode::OK, HEALTHY_BODY)
        };

        Response::builder()
            .status(status)
            .header(header::CONTENT_TYPE, "application/json")
            .body(HttpBody::new(Full::new(Bytes::from_static(payload))))
            .expect("response build with a static body is infallible")
    }
}

impl<S, ReqB> Service<Request<ReqB>> for HealthService<S>
Expand All @@ -52,12 +101,7 @@ where

fn call(&mut self, request: Request<ReqB>) -> Self::Future {
if request.method() == Method::GET && request.uri().path() == HEALTH_PATH {
let response = Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/json")
.body(HttpBody::new(Full::new(Bytes::from_static(HEALTHY_BODY))))
.expect("response build with a static body is infallible");
return Either::Left(ready(Ok(response)));
return Either::Left(ready(Ok(self.health_response())));
}
Either::Right(self.inner.call(request))
}
Expand Down
54 changes: 51 additions & 3 deletions crates/starknet_transaction_prover/src/server/health_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use jsonrpsee::server::HttpBody;
use tower::{Layer, ServiceExt};

use crate::server::health::{HealthLayer, HEALTH_PATH};
use crate::server::saturation::SaturationMonitor;

/// Inner stub returning 418 so we can tell whether `HealthLayer` short-circuited.
fn fallthrough_service() -> impl tower::Service<
Expand Down Expand Up @@ -38,7 +39,7 @@ async fn read_body(response: Response<HttpBody>) -> (StatusCode, Vec<u8>, http::

#[tokio::test]
async fn get_health_returns_200_with_json_body() {
let svc = HealthLayer.layer(fallthrough_service());
let svc = HealthLayer::default().layer(fallthrough_service());

let response = svc.oneshot(empty_request(Method::GET, HEALTH_PATH)).await.unwrap();

Expand All @@ -50,7 +51,7 @@ async fn get_health_returns_200_with_json_body() {

#[tokio::test]
async fn non_get_health_falls_through() {
let svc = HealthLayer.layer(fallthrough_service());
let svc = HealthLayer::default().layer(fallthrough_service());

let response = svc.oneshot(empty_request(Method::POST, HEALTH_PATH)).await.unwrap();

Expand All @@ -60,10 +61,57 @@ async fn non_get_health_falls_through() {

#[tokio::test]
async fn get_other_path_falls_through() {
let svc = HealthLayer.layer(fallthrough_service());
let svc = HealthLayer::default().layer(fallthrough_service());

let response = svc.oneshot(empty_request(Method::GET, "/")).await.unwrap();

let (status, _body, _) = read_body(response).await;
assert_eq!(status, StatusCode::IM_A_TEAPOT);
}

/// A monitor that has never recorded a rejection must not affect `/health`,
/// even with a zero threshold.
#[tokio::test]
async fn unsaturated_health_returns_200_when_monitor_is_supplied() {
    use std::time::Duration;

    let layer = HealthLayer::with_saturation(SaturationMonitor::default(), Duration::from_millis(0));
    let service = layer.layer(fallthrough_service());

    let probe = empty_request(Method::GET, HEALTH_PATH);
    let response = service.oneshot(probe).await.unwrap();

    let (status, body, _headers) = read_body(response).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body, br#"{"status":"ok"}"#);
}

/// After a recorded rejection and with a zero threshold, `/health` must
/// answer `503` with a body that leaks no internal state.
#[tokio::test]
async fn saturated_for_at_least_threshold_returns_503_with_opaque_body() {
    use std::time::Duration;

    let monitor = SaturationMonitor::default();
    monitor.mark_rejected();

    // A zero threshold means the saturation is already past it, so the test
    // never has to sleep.
    let layer = HealthLayer::with_saturation(monitor, Duration::from_millis(0));
    let service = layer.layer(fallthrough_service());

    let probe = empty_request(Method::GET, HEALTH_PATH);
    let response = service.oneshot(probe).await.unwrap();

    let (status, body, _headers) = read_body(response).await;
    assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);

    let body_text = std::str::from_utf8(&body).unwrap();
    assert!(body_text.contains("saturated"));
    // Opaque body: no timestamps, no permit counts, no upstream URLs.
    assert!(!body_text.contains("Instant"));
    assert!(body_text.chars().all(|c| !c.is_ascii_digit()), "body had digits: {body_text}");
}

/// A rejection followed by an acceptance must leave `/health` reporting
/// healthy again, even with a zero threshold.
#[tokio::test]
async fn recovery_clears_saturation_and_health_returns_to_200() {
    use std::time::Duration;

    let monitor = SaturationMonitor::default();
    monitor.mark_rejected();
    monitor.mark_accepted();

    let layer = HealthLayer::with_saturation(monitor, Duration::from_millis(0));
    let service = layer.layer(fallthrough_service());

    let probe = empty_request(Method::GET, HEALTH_PATH);
    let response = service.oneshot(probe).await.unwrap();

    let (status, body, _headers) = read_body(response).await;
    assert_eq!(status, StatusCode::OK);
    assert_eq!(body, br#"{"status":"ok"}"#);
}
Loading
Loading