Skip to content

Commit b321f22

Browse files
avi-starkwareclaude
andcommitted
starknet_transaction_prover: /health returns 503 when service is saturated
Adds `SaturationMonitor` (shared by `ProvingRpcServerImpl` and `HealthLayer`) that tracks whether the concurrency semaphore has been continuously rejecting proving requests. Once that has held for the configured window (`health_max_saturated_ms`, default 10s), `/health` returns 503 with an opaque body so load balancers can drain the pod before in-flight requests start failing. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 23ed570 commit b321f22

11 files changed

Lines changed: 297 additions & 23 deletions

File tree

crates/starknet_transaction_prover/src/main.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ fn main() {
1111
async fn main() -> anyhow::Result<()> {
1212
use std::net::SocketAddr;
1313
use std::sync::Arc;
14+
use std::time::Duration;
1415

1516
use anyhow::Context;
1617
use clap::Parser;
@@ -21,11 +22,13 @@ async fn main() -> anyhow::Result<()> {
2122
TransportMode,
2223
};
2324
use starknet_transaction_prover::server::cors::{build_cors_layer, cors_mode};
25+
use starknet_transaction_prover::server::health::HealthLayer;
2426
use starknet_transaction_prover::server::log_redact::redact_url_host;
2527
use starknet_transaction_prover::server::metrics::install_exporter;
2628
use starknet_transaction_prover::server::panic::install_panic_hook;
2729
use starknet_transaction_prover::server::rpc_api::ProvingRpcServer;
2830
use starknet_transaction_prover::server::rpc_impl::ProvingRpcServerImpl;
31+
use starknet_transaction_prover::server::saturation::SaturationMonitor;
2932
use starknet_transaction_prover::server::{
3033
start_server,
3134
MetricsLayer,
@@ -78,8 +81,16 @@ async fn main() -> anyhow::Result<()> {
7881
"Starting Starknet transaction prover."
7982
);
8083

84+
// Shared saturation tracker — writer is `ProvingRpcServerImpl` on every
85+
// permit acquire/reject; reader is `HealthLayer`. See `saturation.rs`.
86+
let saturation_monitor = SaturationMonitor::default();
87+
let health_layer = HealthLayer::with_saturation(
88+
saturation_monitor.clone(),
89+
Duration::from_millis(config.health_max_saturated_ms),
90+
);
91+
8192
// Build and start the JSON-RPC server.
82-
let rpc_impl = ProvingRpcServerImpl::from_config(&config);
93+
let rpc_impl = ProvingRpcServerImpl::from_config(&config, saturation_monitor);
8394
let addr = SocketAddr::new(config.ip, config.port);
8495
let cors_layer = build_cors_layer(&config.cors_allow_origin)?;
8596

@@ -111,6 +122,7 @@ async fn main() -> anyhow::Result<()> {
111122
cors_layer,
112123
ohttp_layer,
113124
metrics_layer,
125+
health_layer,
114126
)
115127
.await?;
116128

crates/starknet_transaction_prover/src/server.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ pub const OHTTP_JSONRPSEE_BODY_BUILDER: fn(Full<Bytes>) -> HttpBody = HttpBody::
5353
/// - `RequestSpanLayer` sits BELOW `OhttpLayer` so it spans the decapsulated inner request with a
5454
/// fresh, envelope-unlinkable id (see `request_span`).
5555
macro_rules! prover_http_middleware {
56-
($metrics_layer:expr, $cors_layer:expr, $ohttp_layer:expr $(,)?) => {
56+
($health_layer:expr, $metrics_layer:expr, $cors_layer:expr, $ohttp_layer:expr $(,)?) => {
5757
ServiceBuilder::new()
5858
.layer(RequestLogLayer)
59-
.layer(HealthLayer)
59+
.layer($health_layer)
6060
.option_layer($metrics_layer)
6161
.layer(HttpMetricsLayer)
6262
.option_layer($cors_layer)
@@ -82,6 +82,7 @@ pub mod request_log;
8282
pub mod request_span;
8383
pub mod rpc_api;
8484
pub mod rpc_impl;
85+
pub mod saturation;
8586
#[cfg(test)]
8687
pub mod test_recorder;
8788
pub mod tls;
@@ -91,6 +92,7 @@ pub use http_metrics::HttpMetricsLayer;
9192
pub use metrics::{MetricsLayer, METRICS_PATH};
9293
pub use request_log::{RequestLogLayer, REQUEST_ID_HEADER};
9394
pub use request_span::RequestSpanLayer;
95+
pub use saturation::SaturationMonitor;
9496

9597
#[cfg(test)]
9698
mod rpc_spec_test;
@@ -110,6 +112,7 @@ pub async fn start_server(
110112
cors_layer: Option<CorsLayer>,
111113
ohttp_layer: Option<OhttpJsonrpseeLayer>,
112114
metrics_layer: Option<MetricsLayer>,
115+
health_layer: HealthLayer,
113116
) -> anyhow::Result<(SocketAddr, ServerHandle)> {
114117
match transport {
115118
TransportMode::Http => {
@@ -120,7 +123,12 @@ pub async fn start_server(
120123
let server = ServerBuilder::default()
121124
.set_config(server_config)
122125
// See `prover_http_middleware!` for the full layer-order rationale.
123-
.set_http_middleware(prover_http_middleware!(metrics_layer, cors_layer, ohttp_layer))
126+
.set_http_middleware(prover_http_middleware!(
127+
health_layer,
128+
metrics_layer,
129+
cors_layer,
130+
ohttp_layer
131+
))
124132
.build(&addr)
125133
.await
126134
.context(format!("Failed to bind JSON-RPC server to {addr}"))?;
@@ -139,6 +147,7 @@ pub async fn start_server(
139147
cors_layer,
140148
ohttp_layer,
141149
metrics_layer,
150+
health_layer,
142151
)
143152
.await
144153
}

crates/starknet_transaction_prover/src/server/config.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ const DEFAULT_COMPILED_CLASS_CACHE_SIZE: usize = 600;
3131
/// 5 MiB — matches the convention used elsewhere in the sequencer.
3232
const DEFAULT_MAX_REQUEST_BODY_SIZE: u32 = 5 * 1024 * 1024;
3333
const DEFAULT_OHTTP_KEY_CACHE_MAX_AGE_SECS: u64 = 3600;
34+
/// Default saturation window before `/health` returns 503. 10 seconds
35+
/// matches "service is rejecting requests for a sustained period" without
36+
/// flipping on a single in-flight burst.
37+
const DEFAULT_HEALTH_MAX_SATURATED_MS: u64 = 10_000;
3438

3539
/// Transport mode for the JSON-RPC server.
3640
#[derive(Clone, Debug)]
@@ -96,6 +100,7 @@ struct RawServiceConfig {
96100
max_request_body_size: u32,
97101
ohttp_enabled: bool,
98102
ohttp_key_cache_max_age_secs: u64,
103+
health_max_saturated_ms: u64,
99104
}
100105

101106
impl Default for RawServiceConfig {
@@ -121,6 +126,7 @@ impl Default for RawServiceConfig {
121126
max_request_body_size: DEFAULT_MAX_REQUEST_BODY_SIZE,
122127
ohttp_enabled: false,
123128
ohttp_key_cache_max_age_secs: DEFAULT_OHTTP_KEY_CACHE_MAX_AGE_SECS,
129+
health_max_saturated_ms: DEFAULT_HEALTH_MAX_SATURATED_MS,
124130
}
125131
}
126132
}
@@ -152,6 +158,11 @@ pub struct ServiceConfig {
152158
pub ohttp_enabled: bool,
153159
/// Cache-Control max-age for the `GET /ohttp-keys` response (seconds).
154160
pub ohttp_key_cache_max_age_secs: u64,
161+
/// Saturation window (milliseconds) before `/health` flips to 503. The
162+
/// service is "saturated" when it has been continuously rejecting
163+
/// proving requests due to the concurrency limit; once that has held
164+
/// for this many milliseconds, load balancers should drain the pod.
165+
pub health_max_saturated_ms: u64,
155166
}
156167

157168
impl ServiceConfig {
@@ -380,6 +391,15 @@ impl ServiceConfig {
380391
config.ohttp_key_cache_max_age_secs = secs;
381392
}
382393
}
394+
if let Some(ms) = args.health_max_saturated_ms {
395+
if ms != config.health_max_saturated_ms {
396+
info!(
397+
"CLI override: health_max_saturated_ms: {} -> {}",
398+
config.health_max_saturated_ms, ms
399+
);
400+
config.health_max_saturated_ms = ms;
401+
}
402+
}
383403

384404
// Validate required fields.
385405
if config.rpc_node_url.is_empty() {
@@ -464,6 +484,7 @@ impl ServiceConfig {
464484
max_request_body_size: config.max_request_body_size,
465485
ohttp_enabled: config.ohttp_enabled,
466486
ohttp_key_cache_max_age_secs: config.ohttp_key_cache_max_age_secs,
487+
health_max_saturated_ms: config.health_max_saturated_ms,
467488
})
468489
}
469490
}
@@ -573,6 +594,14 @@ pub struct CliArgs {
573594
#[arg(long, value_enum, value_name = "FORMAT", env = "LOG_FORMAT", default_value_t = LogFormat::Text)]
574595
pub log_format: LogFormat,
575596

597+
/// Saturation window (milliseconds) before `/health` returns 503
598+
/// (default: 10000). The service is "saturated" when it has been
599+
/// continuously rejecting proving requests due to the concurrency
600+
/// limit; once that has held for this many milliseconds, load
601+
/// balancers should drain the pod.
602+
#[arg(long, value_name = "MILLIS", env = "HEALTH_MAX_SATURATED_MS")]
603+
pub health_max_saturated_ms: Option<u64>,
604+
576605
/// Hidden escape hatch: override the embedded bouncer config (block capacity limits) with a
577606
/// custom JSON file. Not advertised because the embedded defaults are tuned for this prover
578607
/// (including high `l1_gas` / `message_segment_length`: virtual OS output is not L1-bound; it

crates/starknet_transaction_prover/src/server/config_test.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ fn base_args() -> CliArgs {
6060
ohttp_enabled: false,
6161
ohttp_key_cache_max_age_secs: None,
6262
log_format: LogFormat::Text,
63+
health_max_saturated_ms: None,
6364
}
6465
}
6566

@@ -149,6 +150,7 @@ fn cors_allow_origin_rejects_non_array_in_config_file() {
149150
ohttp_enabled: false,
150151
ohttp_key_cache_max_age_secs: None,
151152
log_format: LogFormat::Text,
153+
health_max_saturated_ms: None,
152154
};
153155

154156
let error = ServiceConfig::from_args(args).unwrap_err();

crates/starknet_transaction_prover/src/server/health.rs

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
//! HTTP `/health` endpoint as a tower middleware layer.
22
//!
33
//! Short-circuits `GET /health` before the jsonrpsee service sees the request
4-
//! (which would 405 a GET). Any other request passes through unchanged.
4+
//! (which would 405 a GET). With a `SaturationMonitor` attached, returns 503
5+
//! once the service has been continuously rejecting requests for the
6+
//! configured threshold so load balancers can drain the pod. Body is opaque
7+
//! (no timestamps, counters, or upstream URLs).
58
69
use std::task::{Context, Poll};
10+
use std::time::Duration;
711

812
use bytes::Bytes;
913
use futures::future::{ready, Either, Ready};
@@ -12,28 +16,73 @@ use http_body_util::Full;
1216
use jsonrpsee::server::HttpBody;
1317
use tower::{Layer, Service};
1418

19+
use crate::server::saturation::SaturationMonitor;
20+
1521
#[cfg(test)]
1622
#[path = "health_test.rs"]
1723
mod health_test;
1824

1925
pub const HEALTH_PATH: &str = "/health";
2026

2127
const HEALTHY_BODY: &[u8] = br#"{"status":"ok"}"#;
28+
/// Body returned by `GET /health` when saturated. Reason is an opaque code,
29+
/// no internal state included.
30+
const SATURATED_BODY: &[u8] = br#"{"status":"unhealthy","reason":"saturated"}"#;
31+
32+
/// `saturation: None` keeps the original always-200 behaviour;
33+
/// `Some(monitor)` flips to `503` once `monitor.saturated_for_at_least`
34+
/// crosses `saturation_threshold`.
35+
#[derive(Clone, Default)]
36+
pub struct HealthLayer {
37+
saturation: Option<SaturationMonitor>,
38+
saturation_threshold: Duration,
39+
}
2240

23-
#[derive(Clone, Copy, Default)]
24-
pub struct HealthLayer;
41+
impl HealthLayer {
42+
pub fn with_saturation(monitor: SaturationMonitor, threshold: Duration) -> Self {
43+
Self { saturation: Some(monitor), saturation_threshold: threshold }
44+
}
45+
}
2546

2647
impl<S> Layer<S> for HealthLayer {
2748
type Service = HealthService<S>;
2849

2950
fn layer(&self, inner: S) -> Self::Service {
30-
HealthService { inner }
51+
HealthService {
52+
inner,
53+
saturation: self.saturation.clone(),
54+
saturation_threshold: self.saturation_threshold,
55+
}
3156
}
3257
}
3358

3459
#[derive(Clone)]
3560
pub struct HealthService<S> {
3661
inner: S,
62+
saturation: Option<SaturationMonitor>,
63+
saturation_threshold: Duration,
64+
}
65+
66+
impl<S> HealthService<S> {
67+
fn health_response(&self) -> Response<HttpBody> {
68+
let saturated = self
69+
.saturation
70+
.as_ref()
71+
.is_some_and(|monitor| monitor.saturated_for_at_least(self.saturation_threshold));
72+
if saturated {
73+
Response::builder()
74+
.status(StatusCode::SERVICE_UNAVAILABLE)
75+
.header(header::CONTENT_TYPE, "application/json")
76+
.body(HttpBody::new(Full::new(Bytes::from_static(SATURATED_BODY))))
77+
.expect("response build with a static body is infallible")
78+
} else {
79+
Response::builder()
80+
.status(StatusCode::OK)
81+
.header(header::CONTENT_TYPE, "application/json")
82+
.body(HttpBody::new(Full::new(Bytes::from_static(HEALTHY_BODY))))
83+
.expect("response build with a static body is infallible")
84+
}
85+
}
3786
}
3887

3988
impl<S, ReqB> Service<Request<ReqB>> for HealthService<S>
@@ -52,12 +101,7 @@ where
52101

53102
fn call(&mut self, request: Request<ReqB>) -> Self::Future {
54103
if request.method() == Method::GET && request.uri().path() == HEALTH_PATH {
55-
let response = Response::builder()
56-
.status(StatusCode::OK)
57-
.header(header::CONTENT_TYPE, "application/json")
58-
.body(HttpBody::new(Full::new(Bytes::from_static(HEALTHY_BODY))))
59-
.expect("response build with a static body is infallible");
60-
return Either::Left(ready(Ok(response)));
104+
return Either::Left(ready(Ok(self.health_response())));
61105
}
62106
Either::Right(self.inner.call(request))
63107
}

crates/starknet_transaction_prover/src/server/health_test.rs

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use jsonrpsee::server::HttpBody;
55
use tower::{Layer, ServiceExt};
66

77
use crate::server::health::{HealthLayer, HEALTH_PATH};
8+
use crate::server::saturation::SaturationMonitor;
89

910
/// Inner stub returning 418 so we can tell whether `HealthLayer` short-circuited.
1011
fn fallthrough_service() -> impl tower::Service<
@@ -38,7 +39,7 @@ async fn read_body(response: Response<HttpBody>) -> (StatusCode, Vec<u8>, http::
3839

3940
#[tokio::test]
4041
async fn get_health_returns_200_with_json_body() {
41-
let svc = HealthLayer.layer(fallthrough_service());
42+
let svc = HealthLayer::default().layer(fallthrough_service());
4243

4344
let response = svc.oneshot(empty_request(Method::GET, HEALTH_PATH)).await.unwrap();
4445

@@ -50,7 +51,7 @@ async fn get_health_returns_200_with_json_body() {
5051

5152
#[tokio::test]
5253
async fn non_get_health_falls_through() {
53-
let svc = HealthLayer.layer(fallthrough_service());
54+
let svc = HealthLayer::default().layer(fallthrough_service());
5455

5556
let response = svc.oneshot(empty_request(Method::POST, HEALTH_PATH)).await.unwrap();
5657

@@ -60,10 +61,57 @@ async fn non_get_health_falls_through() {
6061

6162
#[tokio::test]
6263
async fn get_other_path_falls_through() {
63-
let svc = HealthLayer.layer(fallthrough_service());
64+
let svc = HealthLayer::default().layer(fallthrough_service());
6465

6566
let response = svc.oneshot(empty_request(Method::GET, "/")).await.unwrap();
6667

6768
let (status, _body, _) = read_body(response).await;
6869
assert_eq!(status, StatusCode::IM_A_TEAPOT);
6970
}
71+
72+
#[tokio::test]
73+
async fn unsaturated_health_returns_200_when_monitor_is_supplied() {
74+
let monitor = SaturationMonitor::default();
75+
let svc = HealthLayer::with_saturation(monitor, std::time::Duration::from_millis(0))
76+
.layer(fallthrough_service());
77+
78+
let response = svc.oneshot(empty_request(Method::GET, HEALTH_PATH)).await.unwrap();
79+
80+
let (status, body, _) = read_body(response).await;
81+
assert_eq!(status, StatusCode::OK);
82+
assert_eq!(body, br#"{"status":"ok"}"#);
83+
}
84+
85+
#[tokio::test]
86+
async fn saturated_for_at_least_threshold_returns_503_with_opaque_body() {
87+
let monitor = SaturationMonitor::default();
88+
monitor.mark_rejected();
89+
// Zero threshold so the saturation is immediately past it. Avoids
90+
// sleeping in the test.
91+
let svc = HealthLayer::with_saturation(monitor, std::time::Duration::from_millis(0))
92+
.layer(fallthrough_service());
93+
94+
let response = svc.oneshot(empty_request(Method::GET, HEALTH_PATH)).await.unwrap();
95+
96+
let (status, body, _) = read_body(response).await;
97+
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
98+
let body_text = std::str::from_utf8(&body).unwrap();
99+
assert!(body_text.contains("saturated"));
100+
// No internal state — no timestamps, no permits, no upstream URLs.
101+
assert!(!body_text.contains("Instant"));
102+
assert!(!body_text.chars().any(|c| c.is_ascii_digit()), "body had digits: {body_text}");
103+
}
104+
105+
#[tokio::test]
106+
async fn recovery_clears_saturation_and_health_returns_to_200() {
107+
let monitor = SaturationMonitor::default();
108+
monitor.mark_rejected();
109+
monitor.mark_accepted();
110+
let svc = HealthLayer::with_saturation(monitor, std::time::Duration::from_millis(0))
111+
.layer(fallthrough_service());
112+
113+
let response = svc.oneshot(empty_request(Method::GET, HEALTH_PATH)).await.unwrap();
114+
let (status, body, _) = read_body(response).await;
115+
assert_eq!(status, StatusCode::OK);
116+
assert_eq!(body, br#"{"status":"ok"}"#);
117+
}

0 commit comments

Comments
 (0)