diff --git a/crates/starknet_transaction_prover/src/main.rs b/crates/starknet_transaction_prover/src/main.rs index bf6155ba20d..ecdc38b95d1 100644 --- a/crates/starknet_transaction_prover/src/main.rs +++ b/crates/starknet_transaction_prover/src/main.rs @@ -11,6 +11,7 @@ fn main() { async fn main() -> anyhow::Result<()> { use std::net::SocketAddr; use std::sync::Arc; + use std::time::Duration; use anyhow::Context; use clap::Parser; @@ -21,11 +22,13 @@ async fn main() -> anyhow::Result<()> { TransportMode, }; use starknet_transaction_prover::server::cors::{build_cors_layer, cors_mode}; + use starknet_transaction_prover::server::health::HealthLayer; use starknet_transaction_prover::server::log_redact::redact_url_host; use starknet_transaction_prover::server::metrics::install_exporter; use starknet_transaction_prover::server::panic::install_panic_hook; use starknet_transaction_prover::server::rpc_api::ProvingRpcServer; use starknet_transaction_prover::server::rpc_impl::ProvingRpcServerImpl; + use starknet_transaction_prover::server::saturation::SaturationMonitor; use starknet_transaction_prover::server::{ start_server, MetricsLayer, @@ -78,8 +81,16 @@ async fn main() -> anyhow::Result<()> { "Starting Starknet transaction prover." ); + // Shared saturation tracker — writer is `ProvingRpcServerImpl` on every + // permit acquire/reject; reader is `HealthLayer`. See `saturation.rs`. + let saturation_monitor = SaturationMonitor::default(); + let health_layer = HealthLayer::with_saturation( + saturation_monitor.clone(), + Duration::from_millis(config.health_max_saturated_ms), + ); + // Build and start the JSON-RPC server. - let rpc_impl = ProvingRpcServerImpl::from_config(&config); + let rpc_impl = ProvingRpcServerImpl::from_config(&config, saturation_monitor); let addr = SocketAddr::new(config.ip, config.port); let cors_layer = build_cors_layer(&config.cors_allow_origin)?; @@ -111,6 +122,7 @@ async fn main() -> anyhow::Result<()> { cors_layer, ohttp_layer, metrics_layer, + health_layer, ) .await?; diff --git a/crates/starknet_transaction_prover/src/server.rs b/crates/starknet_transaction_prover/src/server.rs index 6a801ca0932..3759f23e8ad 100644 --- a/crates/starknet_transaction_prover/src/server.rs +++ b/crates/starknet_transaction_prover/src/server.rs @@ -53,10 +53,10 @@ pub const OHTTP_JSONRPSEE_BODY_BUILDER: fn(Full) -> HttpBody = HttpBody:: /// - `RequestSpanLayer` sits BELOW `OhttpLayer` so it spans the decapsulated inner request with a /// fresh, envelope-unlinkable id (see `request_span`). macro_rules! prover_http_middleware { - ($metrics_layer:expr, $cors_layer:expr, $ohttp_layer:expr $(,)?) => { + ($health_layer:expr, $metrics_layer:expr, $cors_layer:expr, $ohttp_layer:expr $(,)?) => { ServiceBuilder::new() .layer(RequestLogLayer) - .layer(HealthLayer) + .layer($health_layer) .option_layer($metrics_layer) .layer(HttpMetricsLayer) .option_layer($cors_layer) @@ -82,6 +82,7 @@ pub mod request_log; pub mod request_span; pub mod rpc_api; pub mod rpc_impl; +pub mod saturation; #[cfg(test)] pub mod test_recorder; pub mod tls; @@ -91,6 +92,7 @@ pub use http_metrics::HttpMetricsLayer; pub use metrics::{MetricsLayer, METRICS_PATH}; pub use request_log::{RequestLogLayer, REQUEST_ID_HEADER}; pub use request_span::RequestSpanLayer; +pub use saturation::SaturationMonitor; #[cfg(test)] mod rpc_spec_test; @@ -110,6 +112,7 @@ pub async fn start_server( cors_layer: Option, ohttp_layer: Option, metrics_layer: Option, + health_layer: HealthLayer, ) -> anyhow::Result<(SocketAddr, ServerHandle)> { match transport { TransportMode::Http => { @@ -120,7 +123,12 @@ pub async fn start_server( let server = ServerBuilder::default() .set_config(server_config) // See `prover_http_middleware!` for the full layer-order rationale. - .set_http_middleware(prover_http_middleware!(metrics_layer, cors_layer, ohttp_layer)) + .set_http_middleware(prover_http_middleware!( + health_layer, + metrics_layer, + cors_layer, + ohttp_layer + )) .build(&addr) .await .context(format!("Failed to bind JSON-RPC server to {addr}"))?; @@ -139,6 +147,7 @@ pub async fn start_server( cors_layer, ohttp_layer, metrics_layer, + health_layer, ) .await } diff --git a/crates/starknet_transaction_prover/src/server/config.rs b/crates/starknet_transaction_prover/src/server/config.rs index 403c97a750d..73bd1fe729f 100644 --- a/crates/starknet_transaction_prover/src/server/config.rs +++ b/crates/starknet_transaction_prover/src/server/config.rs @@ -31,6 +31,10 @@ const DEFAULT_COMPILED_CLASS_CACHE_SIZE: usize = 600; /// 5 MiB — matches the convention used elsewhere in the sequencer. const DEFAULT_MAX_REQUEST_BODY_SIZE: u32 = 5 * 1024 * 1024; const DEFAULT_OHTTP_KEY_CACHE_MAX_AGE_SECS: u64 = 3600; +/// Default saturation window before `/health` returns 503. 10 seconds +/// matches "service is rejecting requests for a sustained period" without +/// flipping on a single in-flight burst. +const DEFAULT_HEALTH_MAX_SATURATED_MS: u64 = 10_000; /// Transport mode for the JSON-RPC server. #[derive(Clone, Debug)] @@ -96,6 +100,7 @@ struct RawServiceConfig { max_request_body_size: u32, ohttp_enabled: bool, ohttp_key_cache_max_age_secs: u64, + health_max_saturated_ms: u64, } impl Default for RawServiceConfig { @@ -121,6 +126,7 @@ impl Default for RawServiceConfig { max_request_body_size: DEFAULT_MAX_REQUEST_BODY_SIZE, ohttp_enabled: false, ohttp_key_cache_max_age_secs: DEFAULT_OHTTP_KEY_CACHE_MAX_AGE_SECS, + health_max_saturated_ms: DEFAULT_HEALTH_MAX_SATURATED_MS, } } } @@ -152,6 +158,11 @@ pub struct ServiceConfig { pub ohttp_enabled: bool, /// Cache-Control max-age for the `GET /ohttp-keys` response (seconds). pub ohttp_key_cache_max_age_secs: u64, + /// Saturation window (milliseconds) before `/health` flips to 503. The + /// service is "saturated" when it has been continuously rejecting + /// proving requests due to the concurrency limit; once that has held + /// for this many milliseconds, load balancers should drain the pod. + pub health_max_saturated_ms: u64, } impl ServiceConfig { @@ -380,6 +391,15 @@ impl ServiceConfig { config.ohttp_key_cache_max_age_secs = secs; } } + if let Some(ms) = args.health_max_saturated_ms { + if ms != config.health_max_saturated_ms { + info!( + "CLI override: health_max_saturated_ms: {} -> {}", + config.health_max_saturated_ms, ms + ); + config.health_max_saturated_ms = ms; + } + } // Validate required fields. if config.rpc_node_url.is_empty() { @@ -464,6 +484,7 @@ impl ServiceConfig { max_request_body_size: config.max_request_body_size, ohttp_enabled: config.ohttp_enabled, ohttp_key_cache_max_age_secs: config.ohttp_key_cache_max_age_secs, + health_max_saturated_ms: config.health_max_saturated_ms, }) } } @@ -573,6 +594,14 @@ pub struct CliArgs { #[arg(long, value_enum, value_name = "FORMAT", env = "LOG_FORMAT", default_value_t = LogFormat::Text)] pub log_format: LogFormat, + /// Saturation window (milliseconds) before `/health` returns 503 + /// (default: 10000). The service is "saturated" when it has been + /// continuously rejecting proving requests due to the concurrency + /// limit; once that has held for this many milliseconds, load + /// balancers should drain the pod. + #[arg(long, value_name = "MILLIS", env = "HEALTH_MAX_SATURATED_MS")] + pub health_max_saturated_ms: Option, + /// Hidden escape hatch: override the embedded bouncer config (block capacity limits) with a /// custom JSON file. Not advertised because the embedded defaults are tuned for this prover /// (including high `l1_gas` / `message_segment_length`: virtual OS output is not L1-bound; it diff --git a/crates/starknet_transaction_prover/src/server/config_test.rs b/crates/starknet_transaction_prover/src/server/config_test.rs index c8ccad8a0a0..eed7513ad28 100644 --- a/crates/starknet_transaction_prover/src/server/config_test.rs +++ b/crates/starknet_transaction_prover/src/server/config_test.rs @@ -60,6 +60,7 @@ fn base_args() -> CliArgs { ohttp_enabled: false, ohttp_key_cache_max_age_secs: None, log_format: LogFormat::Text, + health_max_saturated_ms: None, } } @@ -149,6 +150,7 @@ fn cors_allow_origin_rejects_non_array_in_config_file() { ohttp_enabled: false, ohttp_key_cache_max_age_secs: None, log_format: LogFormat::Text, + health_max_saturated_ms: None, }; let error = ServiceConfig::from_args(args).unwrap_err(); diff --git a/crates/starknet_transaction_prover/src/server/health.rs b/crates/starknet_transaction_prover/src/server/health.rs index ec49cf8a59c..031b4ec7dc8 100644 --- a/crates/starknet_transaction_prover/src/server/health.rs +++ b/crates/starknet_transaction_prover/src/server/health.rs @@ -1,9 +1,13 @@ //! HTTP `/health` endpoint as a tower middleware layer. //! //! Short-circuits `GET /health` before the jsonrpsee service sees the request -//! (which would 405 a GET). Any other request passes through unchanged. +//! (which would 405 a GET). With a `SaturationMonitor` attached, returns 503 +//! once the service has been continuously rejecting requests for the +//! configured threshold so load balancers can drain the pod. Body is opaque +//! (no timestamps, counters, or upstream URLs). use std::task::{Context, Poll}; +use std::time::Duration; use bytes::Bytes; use futures::future::{ready, Either, Ready}; @@ -12,6 +16,8 @@ use http_body_util::Full; use jsonrpsee::server::HttpBody; use tower::{Layer, Service}; +use crate::server::saturation::SaturationMonitor; + #[cfg(test)] #[path = "health_test.rs"] mod health_test; @@ -19,21 +25,64 @@ mod health_test; pub const HEALTH_PATH: &str = "/health"; const HEALTHY_BODY: &[u8] = br#"{"status":"ok"}"#; +/// Body returned by `GET /health` when saturated. Reason is an opaque code, +/// no internal state included. +const SATURATED_BODY: &[u8] = br#"{"status":"unhealthy","reason":"saturated"}"#; + +/// `saturation: None` keeps the original always-200 behaviour; +/// `Some(monitor)` flips to `503` once `monitor.saturated_for_at_least` +/// crosses `saturation_threshold`. +#[derive(Clone, Default)] +pub struct HealthLayer { + saturation: Option, + saturation_threshold: Duration, +} -#[derive(Clone, Copy, Default)] -pub struct HealthLayer; +impl HealthLayer { + pub fn with_saturation(monitor: SaturationMonitor, threshold: Duration) -> Self { + Self { saturation: Some(monitor), saturation_threshold: threshold } + } +} impl Layer for HealthLayer { type Service = HealthService; fn layer(&self, inner: S) -> Self::Service { - HealthService { inner } + HealthService { + inner, + saturation: self.saturation.clone(), + saturation_threshold: self.saturation_threshold, + } } } #[derive(Clone)] pub struct HealthService { inner: S, + saturation: Option, + saturation_threshold: Duration, +} + +impl HealthService { + fn health_response(&self) -> Response { + let saturated = self + .saturation + .as_ref() + .is_some_and(|monitor| monitor.saturated_for_at_least(self.saturation_threshold)); + if saturated { + Response::builder() + .status(StatusCode::SERVICE_UNAVAILABLE) + .header(header::CONTENT_TYPE, "application/json") + .body(HttpBody::new(Full::new(Bytes::from_static(SATURATED_BODY)))) + .expect("response build with a static body is infallible") + } else { + Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "application/json") + .body(HttpBody::new(Full::new(Bytes::from_static(HEALTHY_BODY)))) + .expect("response build with a static body is infallible") + } + } } impl Service> for HealthService @@ -52,12 +101,7 @@ where fn call(&mut self, request: Request) -> Self::Future { if request.method() == Method::GET && request.uri().path() == HEALTH_PATH { - let response = Response::builder() - .status(StatusCode::OK) - .header(header::CONTENT_TYPE, "application/json") - .body(HttpBody::new(Full::new(Bytes::from_static(HEALTHY_BODY)))) - .expect("response build with a static body is infallible"); - return Either::Left(ready(Ok(response))); + return Either::Left(ready(Ok(self.health_response()))); } Either::Right(self.inner.call(request)) } diff --git a/crates/starknet_transaction_prover/src/server/health_test.rs b/crates/starknet_transaction_prover/src/server/health_test.rs index 288855aa25e..3e028907976 100644 --- a/crates/starknet_transaction_prover/src/server/health_test.rs +++ b/crates/starknet_transaction_prover/src/server/health_test.rs @@ -5,6 +5,7 @@ use jsonrpsee::server::HttpBody; use tower::{Layer, ServiceExt}; use crate::server::health::{HealthLayer, HEALTH_PATH}; +use crate::server::saturation::SaturationMonitor; /// Inner stub returning 418 so we can tell whether `HealthLayer` short-circuited. fn fallthrough_service() -> impl tower::Service< @@ -38,7 +39,7 @@ async fn read_body(response: Response) -> (StatusCode, Vec, http:: #[tokio::test] async fn get_health_returns_200_with_json_body() { - let svc = HealthLayer.layer(fallthrough_service()); + let svc = HealthLayer::default().layer(fallthrough_service()); let response = svc.oneshot(empty_request(Method::GET, HEALTH_PATH)).await.unwrap(); @@ -50,7 +51,7 @@ async fn get_health_returns_200_with_json_body() { #[tokio::test] async fn non_get_health_falls_through() { - let svc = HealthLayer.layer(fallthrough_service()); + let svc = HealthLayer::default().layer(fallthrough_service()); let response = svc.oneshot(empty_request(Method::POST, HEALTH_PATH)).await.unwrap(); @@ -60,10 +61,57 @@ async fn non_get_health_falls_through() { #[tokio::test] async fn get_other_path_falls_through() { - let svc = HealthLayer.layer(fallthrough_service()); + let svc = HealthLayer::default().layer(fallthrough_service()); let response = svc.oneshot(empty_request(Method::GET, "/")).await.unwrap(); let (status, _body, _) = read_body(response).await; assert_eq!(status, StatusCode::IM_A_TEAPOT); } + +#[tokio::test] +async fn unsaturated_health_returns_200_when_monitor_is_supplied() { + let monitor = SaturationMonitor::default(); + let svc = HealthLayer::with_saturation(monitor, std::time::Duration::from_millis(0)) + .layer(fallthrough_service()); + + let response = svc.oneshot(empty_request(Method::GET, HEALTH_PATH)).await.unwrap(); + + let (status, body, _) = read_body(response).await; + assert_eq!(status, StatusCode::OK); + assert_eq!(body, br#"{"status":"ok"}"#); +} + +#[tokio::test] +async fn saturated_for_at_least_threshold_returns_503_with_opaque_body() { + let monitor = SaturationMonitor::default(); + monitor.mark_rejected(); + // Zero threshold so the saturation is immediately past it. Avoids + // sleeping in the test. + let svc = HealthLayer::with_saturation(monitor, std::time::Duration::from_millis(0)) + .layer(fallthrough_service()); + + let response = svc.oneshot(empty_request(Method::GET, HEALTH_PATH)).await.unwrap(); + + let (status, body, _) = read_body(response).await; + assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE); + let body_text = std::str::from_utf8(&body).unwrap(); + assert!(body_text.contains("saturated")); + // No internal state — no timestamps, no permits, no upstream URLs. + assert!(!body_text.contains("Instant")); + assert!(!body_text.chars().any(|c| c.is_ascii_digit()), "body had digits: {body_text}"); +} + +#[tokio::test] +async fn recovery_clears_saturation_and_health_returns_to_200() { + let monitor = SaturationMonitor::default(); + monitor.mark_rejected(); + monitor.mark_accepted(); + let svc = HealthLayer::with_saturation(monitor, std::time::Duration::from_millis(0)) + .layer(fallthrough_service()); + + let response = svc.oneshot(empty_request(Method::GET, HEALTH_PATH)).await.unwrap(); + let (status, body, _) = read_body(response).await; + assert_eq!(status, StatusCode::OK); + assert_eq!(body, br#"{"status":"ok"}"#); +} diff --git a/crates/starknet_transaction_prover/src/server/rpc_impl.rs b/crates/starknet_transaction_prover/src/server/rpc_impl.rs index 4fd5ed9a3f5..166ed045950 100644 --- a/crates/starknet_transaction_prover/src/server/rpc_impl.rs +++ b/crates/starknet_transaction_prover/src/server/rpc_impl.rs @@ -15,6 +15,7 @@ use crate::server::config::ServiceConfig; use crate::server::errors::service_busy; use crate::server::metrics::names::CONCURRENCY_REJECTED_TOTAL; use crate::server::rpc_api::ProvingRpcServer; +use crate::server::saturation::SaturationMonitor; /// Starknet RPC specification version. pub(crate) const SPEC_VERSION: &str = "0.10.2"; @@ -27,22 +28,30 @@ pub struct ProvingRpcServerImpl { concurrency_semaphore: Arc, /// Configured max concurrent requests (used in error messages). max_concurrent_requests: usize, + /// Tracks how long the service has been rejecting requests so + /// `/health` can flip to 503. Cloned cheaply (Arc internally). + saturation_monitor: SaturationMonitor, } impl ProvingRpcServerImpl { /// Creates a new ProvingRpcServerImpl from a prover. - pub(crate) fn new(prover: RpcVirtualSnosProver, max_concurrent_requests: usize) -> Self { + pub(crate) fn new( + prover: RpcVirtualSnosProver, + max_concurrent_requests: usize, + saturation_monitor: SaturationMonitor, + ) -> Self { Self { prover, concurrency_semaphore: Arc::new(Semaphore::new(max_concurrent_requests)), max_concurrent_requests, + saturation_monitor, } } /// Creates a new ProvingRpcServerImpl from configuration. - pub fn from_config(config: &ServiceConfig) -> Self { + pub fn from_config(config: &ServiceConfig, saturation_monitor: SaturationMonitor) -> Self { let prover = RpcVirtualSnosProver::new(&config.prover_config); - Self::new(prover, config.max_concurrent_requests) + Self::new(prover, config.max_concurrent_requests, saturation_monitor) } } @@ -59,12 +68,16 @@ impl ProvingRpcServer for ProvingRpcServerImpl { ) -> RpcResult { let _permit = self.concurrency_semaphore.try_acquire().map_err(|_| { metrics::counter!(CONCURRENCY_REJECTED_TOTAL).increment(1); + self.saturation_monitor.mark_rejected(); warn!( max_concurrent_requests = self.max_concurrent_requests, "Rejected proving request: service is at capacity" ); service_busy(self.max_concurrent_requests) })?; + // We hold the permit. The service successfully accepted this + // request — clear any saturation window so /health can recover. + self.saturation_monitor.mark_accepted(); self.prover.prove_transaction(block_id, transaction).await.map_err(|err| { warn!("prove_transaction failed: {:?}", err); diff --git a/crates/starknet_transaction_prover/src/server/rpc_spec_test.rs b/crates/starknet_transaction_prover/src/server/rpc_spec_test.rs index 5cf05811670..c6672949824 100644 --- a/crates/starknet_transaction_prover/src/server/rpc_spec_test.rs +++ b/crates/starknet_transaction_prover/src/server/rpc_spec_test.rs @@ -122,8 +122,11 @@ static SPEC_ERRORS: LazyLock> = LazyLock::new(|| { fn rpc_module() -> RpcModule { let config = ProverConfig { rpc_node_url: DUMMY_RPC_NODE_URL.to_string(), ..Default::default() }; - let rpc_impl = - ProvingRpcServerImpl::new(RpcVirtualSnosProver::new(&config), TEST_MAX_CONCURRENT_REQUESTS); + let rpc_impl = ProvingRpcServerImpl::new( + RpcVirtualSnosProver::new(&config), + TEST_MAX_CONCURRENT_REQUESTS, + crate::server::SaturationMonitor::default(), + ); rpc_impl.into_rpc() } diff --git a/crates/starknet_transaction_prover/src/server/saturation.rs b/crates/starknet_transaction_prover/src/server/saturation.rs new file mode 100644 index 00000000000..56210956d35 --- /dev/null +++ b/crates/starknet_transaction_prover/src/server/saturation.rs @@ -0,0 +1,63 @@ +//! Saturation tracking for the prover's concurrency-limited request path. +//! +//! Used by both `ProvingRpcServerImpl` (writer) and `HealthLayer` (reader) +//! so `/health` can flip to 503 once the service has been rejecting +//! requests for a sustained period. +//! +//! Mechanism: +//! - On every rejection from the concurrency semaphore: if the monitor is currently "clear", set a +//! timestamp marking when saturation started. +//! - On every successful acquire: clear the timestamp. +//! - `/health` consults [`SaturationMonitor::saturated_for_at_least`] with the configured threshold +//! and returns 503 if it passed. +//! +//! Saturation is therefore measured by *traffic that the service has tried +//! to serve*. With no traffic at all, the monitor reports healthy — there +//! is no saturation event to time. This matches the proof-interceptor's +//! upstream-reachability tracking ([[health-reachability]]) so the two +//! services behave the same way from a load-balancer's perspective. + +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +#[cfg(test)] +#[path = "saturation_test.rs"] +mod saturation_test; + +/// Cheap-to-clone handle to the shared saturation state. The interior +/// `Arc>` makes both `mark_rejected`/`mark_accepted` and the +/// read path lock-free at the API surface. +#[derive(Clone, Default)] +pub struct SaturationMonitor { + state: Arc>>, +} + +impl SaturationMonitor { + /// Record a rejection. Starts the saturation window if this is the + /// first rejection since the last `mark_accepted` (or since startup). + pub fn mark_rejected(&self) { + let mut state = self.state.lock().expect("saturation lock poisoned"); + if state.is_none() { + *state = Some(Instant::now()); + } + } + + /// Record a successful acquire. Clears the saturation window so a + /// transient burst of rejections doesn't keep `/health` red forever + /// once the service recovers. + pub fn mark_accepted(&self) { + let mut state = self.state.lock().expect("saturation lock poisoned"); + *state = None; + } + + /// Returns true when the service has been continuously rejecting + /// requests for at least `threshold`. Returns false when the service + /// has handled at least one request successfully within the window or + /// has not seen any traffic at all. + pub fn saturated_for_at_least(&self, threshold: Duration) -> bool { + match *self.state.lock().expect("saturation lock poisoned") { + Some(started_at) => started_at.elapsed() >= threshold, + None => false, + } + } +} diff --git a/crates/starknet_transaction_prover/src/server/saturation_test.rs b/crates/starknet_transaction_prover/src/server/saturation_test.rs new file mode 100644 index 00000000000..3f60b2876f1 --- /dev/null +++ b/crates/starknet_transaction_prover/src/server/saturation_test.rs @@ -0,0 +1,45 @@ +use std::thread::sleep; +use std::time::Duration; + +use crate::server::saturation::SaturationMonitor; + +#[test] +fn starts_healthy_before_any_traffic() { + let monitor = SaturationMonitor::default(); + assert!(!monitor.saturated_for_at_least(Duration::from_millis(0))); + assert!(!monitor.saturated_for_at_least(Duration::from_secs(10))); +} + +#[test] +fn rejection_starts_window_and_threshold_eventually_passes() { + let monitor = SaturationMonitor::default(); + monitor.mark_rejected(); + // Window has just opened — zero-elapsed comparison must still be true + // (we are at or past the 0ms threshold). + assert!(monitor.saturated_for_at_least(Duration::from_millis(0))); + // Not yet at the 50ms threshold — the rejection happened just now. + assert!(!monitor.saturated_for_at_least(Duration::from_millis(50))); + sleep(Duration::from_millis(60)); + assert!(monitor.saturated_for_at_least(Duration::from_millis(50))); +} + +#[test] +fn repeated_rejections_do_not_reset_the_window() { + let monitor = SaturationMonitor::default(); + monitor.mark_rejected(); + sleep(Duration::from_millis(30)); + // A second rejection should extend, not restart, the saturation + // window — operators care about "how long has this been bad", which + // is the time since the first rejection. + monitor.mark_rejected(); + assert!(monitor.saturated_for_at_least(Duration::from_millis(25))); +} + +#[test] +fn one_successful_acquire_clears_the_window() { + let monitor = SaturationMonitor::default(); + monitor.mark_rejected(); + sleep(Duration::from_millis(10)); + monitor.mark_accepted(); + assert!(!monitor.saturated_for_at_least(Duration::from_millis(0))); +} diff --git a/crates/starknet_transaction_prover/src/server/tls.rs b/crates/starknet_transaction_prover/src/server/tls.rs index 16534c829e7..05480cde08e 100644 --- a/crates/starknet_transaction_prover/src/server/tls.rs +++ b/crates/starknet_transaction_prover/src/server/tls.rs @@ -53,6 +53,7 @@ pub async fn start_tls_server( cors_layer: Option, ohttp_layer: Option, metrics_layer: Option, + health_layer: HealthLayer, ) -> anyhow::Result<(SocketAddr, ServerHandle)> { let tls_acceptor = load_tls_acceptor(cert_path, key_path)?; @@ -63,7 +64,12 @@ pub async fn start_tls_server( // See `prover_http_middleware!` for the full layer-order rationale. let svc_builder = ServerBuilder::default() .set_config(server_config) - .set_http_middleware(prover_http_middleware!(metrics_layer, cors_layer, ohttp_layer)) + .set_http_middleware(prover_http_middleware!( + health_layer, + metrics_layer, + cors_layer, + ohttp_layer + )) .to_service_builder(); let listener = TcpListener::bind(addr)