diff --git a/bin/debug-trace-server/src/main.rs b/bin/debug-trace-server/src/main.rs
index 756c842f..6b3ae068 100644
--- a/bin/debug-trace-server/src/main.rs
+++ b/bin/debug-trace-server/src/main.rs
@@ -202,6 +202,13 @@ struct Args {
     #[clap(long, env = "DEBUG_TRACE_SERVER_WITNESS_MAX_CONCURRENT_REQUESTS")]
     witness_max_concurrent_requests: Option<usize>,
 
+    /// Per-attempt RPC timeout (milliseconds). Bounds every individual provider attempt
+    /// even when the caller passes no overall deadline (e.g. background chain sync), so
+    /// a provider that accepts the TCP connection but never replies is detected and
+    /// rotated past instead of wedging the retry loop.
+    #[clap(long, env = "DEBUG_TRACE_SERVER_RPC_PER_ATTEMPT_TIMEOUT_MS")]
+    rpc_per_attempt_timeout_ms: Option<u64>,
+
     /// Logging configuration.
     #[command(flatten)]
     log: LogArgs,
@@ -284,10 +291,16 @@ async fn main() -> Result<()> {
     // Initialize components
     let data_apis: Vec<&str> = args.rpc_endpoint.iter().map(String::as_str).collect();
     let witness_apis: Vec<&str> = args.witness_endpoint.iter().map(String::as_str).collect();
+    let rpc_defaults = RpcClientConfig::trace_server();
+    let per_attempt_timeout = args
+        .rpc_per_attempt_timeout_ms
+        .map(std::time::Duration::from_millis)
+        .unwrap_or(rpc_defaults.per_attempt_timeout);
     let rpc_config = RpcClientConfig {
         data_max_concurrent_requests: args.data_max_concurrent_requests,
         witness_max_concurrent_requests: args.witness_max_concurrent_requests,
-        ..RpcClientConfig::trace_server()
+        per_attempt_timeout,
+        ..rpc_defaults
     }
     .with_metrics(Arc::new(metrics::TraceRpcMetrics));
     let rpc_client =
diff --git a/bin/stateless-validator/src/app.rs b/bin/stateless-validator/src/app.rs
index dad2d575..7fa30e20 100644
--- a/bin/stateless-validator/src/app.rs
+++ b/bin/stateless-validator/src/app.rs
@@ -125,6 +125,12 @@ pub struct CommandLineArgs {
     #[clap(long, env = "STATELESS_VALIDATOR_RPC_MAX_BACKOFF_MS")]
     pub rpc_max_backoff_ms: Option<u64>,
 
+    /// Per-attempt RPC timeout (milliseconds). Bounds every individual provider attempt
+    /// even when the chain-sync caller passes no overall deadline, so a provider that
+    /// accepts the TCP connection but never replies is detected and rotated past.
+    #[clap(long, env = "STATELESS_VALIDATOR_RPC_PER_ATTEMPT_TIMEOUT_MS")]
+    pub rpc_per_attempt_timeout_ms: Option<u64>,
+
     /// Soft cap on rows retained in the canonical-chain table. Old rows are pruned inline
     /// when `advance_chain` exceeds this. Larger values bound the reorg-lookup window;
     /// smaller values reduce redb file growth. Defaults to `DEFAULT_MAX_CHAIN_LENGTH`
@@ -178,10 +184,13 @@ pub async fn run() -> Result<()> {
         initial: override_ms(args.rpc_initial_backoff_ms, rpc_defaults.rpc_retry.initial),
         max: override_ms(args.rpc_max_backoff_ms, rpc_defaults.rpc_retry.max),
     };
+    let per_attempt_timeout =
+        override_ms(args.rpc_per_attempt_timeout_ms, rpc_defaults.per_attempt_timeout);
     let rpc_config = RpcClientConfig {
         data_max_concurrent_requests: args.data_max_concurrent_requests,
         witness_max_concurrent_requests: args.witness_max_concurrent_requests,
         rpc_retry,
+        per_attempt_timeout,
         ..rpc_defaults
     }
     .with_metrics(Arc::new(metrics::ValidatorMetrics));
diff --git a/crates/stateless-common/src/rpc_client.rs b/crates/stateless-common/src/rpc_client.rs
index 36db2a8c..3322523e 100644
--- a/crates/stateless-common/src/rpc_client.rs
+++ b/crates/stateless-common/src/rpc_client.rs
@@ -120,6 +120,11 @@ pub struct RpcClientConfig {
     /// jitter) before the next round and doubles the sleep each round up to `max`. Retry is
     /// unbounded — caller-visible failures don't occur.
     pub rpc_retry: BackoffPolicy,
+    /// Hard cap on a single provider attempt. Applied even when no overall deadline is set,
+    /// so a provider that accepts TCP but never replies cannot wedge the retry loop —
+    /// timing out the attempt rotates `round_robin_with_backoff` to the next provider.
+    /// With `deadline = Some(d)`, each attempt uses `min(per_attempt_timeout, d - now)`.
+    pub per_attempt_timeout: Duration,
 }
 
 impl Default for RpcClientConfig {
@@ -133,6 +138,10 @@ impl Default for RpcClientConfig {
             // 30s. That's gentle enough to ride out near-tip witness generation latency (a
             // few seconds) without hammering upstream when something is genuinely broken.
             rpc_retry: BackoffPolicy::new(Duration::from_millis(500), Duration::from_secs(30)),
+            // 30s is well above any healthy single-attempt latency (witness near-tip can take
+            // several seconds; everything else is sub-second), but bounded enough that a
+            // stalled (TCP-accept-no-reply) provider is detected within reasonable time.
+            per_attempt_timeout: Duration::from_secs(30),
         }
     }
 }
@@ -145,6 +154,7 @@ impl std::fmt::Debug for RpcClientConfig {
             .field("data_max_concurrent_requests", &self.data_max_concurrent_requests)
             .field("witness_max_concurrent_requests", &self.witness_max_concurrent_requests)
             .field("rpc_retry", &self.rpc_retry)
+            .field("per_attempt_timeout", &self.per_attempt_timeout)
             .finish()
     }
 }
@@ -363,6 +373,7 @@ impl RpcClient {
             &self.data_providers,
             &self.data_concurrency,
             &self.config.rpc_retry,
+            self.config.per_attempt_timeout,
             rr_start,
             method,
             self.config.metrics.as_ref(),
@@ -378,6 +389,12 @@ impl RpcClient {
     }
 
     /// Deadline-aware counterpart of [`Self::get_code`].
+    ///
+    /// An empty response (`"0x"`) for a non-empty codehash is treated as a transient error and
+    /// rotates round-robin to the next provider. MegaETH RPC nodes return `"0x"` for codes they
+    /// have not yet synced (rather than a JSON-RPC error); without this guard, an unsynced
+    /// upstream would silently return zero bytes that then fail keccak verification, halting the
+    /// validator. Retrying lets a sync-ready provider serve the request.
     pub async fn get_code_with_deadline(
         &self,
         hash: B256,
@@ -385,10 +402,21 @@ impl RpcClient {
     ) -> std::result::Result<Bytes, RpcDeadlineExceeded> {
         self.call_with_deadline(RpcMethod::EthGetCodeByHash, deadline, move |provider| {
             Box::pin(async move {
-                provider.client().request("eth_getCodeByHash", (hash,)).await.map_err(|e| {
-                    trace!(%hash, error = %e, "eth_getCodeByHash failed");
-                    eyre!("eth_getCodeByHash for hash {hash:?} failed: {e}")
-                })
+                let bytes: Bytes =
+                    provider.client().request("eth_getCodeByHash", (hash,)).await.map_err(|e| {
+                        trace!(%hash, error = %e, "eth_getCodeByHash failed");
+                        eyre!("eth_getCodeByHash for hash {hash:?} failed: {e}")
+                    })?;
+                // Guard against the legitimate case where the requested hash *is* the
+                // empty-code hash — then `"0x"` is the correct answer and retrying would loop
+                // forever. Every other empty response is treated as upstream-not-sync-ready.
+                if bytes.is_empty() && hash != revm::primitives::KECCAK_EMPTY {
+                    trace!(%hash, "eth_getCodeByHash returned empty bytecode (provider not sync-ready?)");
+                    return Err(eyre!(
+                        "eth_getCodeByHash for hash {hash:?} returned empty bytecode (provider may not be sync-ready)"
+                    ));
+                }
+                Ok(bytes)
             })
         })
         .await
@@ -544,6 +572,7 @@ impl RpcClient {
             &self.witness_providers,
             &self.witness_concurrency,
             &self.config.rpc_retry,
+            self.config.per_attempt_timeout,
             // Primary-failover: always start from provider 0 so the primary takes all traffic
             // while healthy. Backup endpoints are touched only while the primary is failing.
             0,
@@ -719,15 +748,17 @@ macro_rules! log_at {
 ///
 /// Used by both the data-method `call()` (rotates `rr_start` per call for load balancing) and
 /// `get_witness()` (pins `rr_start=0` for primary-failover).
-// 8-argument retry primitive. Each field plays a distinct role (providers, concurrency,
-// backoff policy, starting provider, method label, metrics sink, deadline, per-attempt closure)
-// and bundling them into a struct would be ceremony without encapsulation — there are exactly
-// two call sites in this crate. Prefer clarity at the definition over fewer commas at the call.
+// 9-argument retry primitive. Each field plays a distinct role (providers, concurrency,
+// backoff policy, per-attempt timeout, starting provider, method label, metrics sink, deadline,
+// per-attempt closure) and bundling them into a struct would be ceremony without encapsulation —
+// there are exactly two call sites in this crate. Prefer clarity at the definition over fewer
+// commas at the call.
 #[allow(clippy::too_many_arguments)]
 async fn round_robin_with_backoff<T, F>(
     providers: &[RootProvider],
     semaphore: &Semaphore,
     policy: &BackoffPolicy,
+    per_attempt_timeout: Duration,
     rr_start: usize,
     method: RpcMethod,
     metrics: Option<&Arc<dyn RpcMetrics>>,
@@ -774,40 +805,53 @@ where
             // and success/error counters reflect what actually happened in the retry loop
             // rather than always showing "success" with the cumulative logical-call time.
             let attempt_start = Instant::now();
-            // Bound the attempt by the remaining deadline when set — without this, a
-            // provider that accepts the TCP connection but never replies would block the
-            // retry loop forever even though the deadline has already passed. With no
-            // deadline the attempt runs unbounded, as before.
-            let result = match deadline {
-                Some(d) => {
-                    let remaining = d.saturating_duration_since(Instant::now());
-                    match tokio::time::timeout(remaining, f(providers[provider_idx].clone())).await
-                    {
-                        Ok(r) => r,
-                        Err(_) => {
-                            // Per-attempt timeout == call deadline hit; bail with the
-                            // typed error so the caller sees one consistent failure mode.
-                            drop(permit);
-                            if let Some(m) = metrics {
-                                // Only record `on_rpc_complete(false)` here — NOT `on_rpc_retry`.
-                                // The non-timeout failure arm fires `on_rpc_retry` because
-                                // it's about to loop and try another provider; this arm
-                                // gives up, so counting it as a retry would inflate the
-                                // retry metric above the actual number of attempts made.
-                                m.on_rpc_complete(
-                                    method,
-                                    false,
-                                    Some(attempt_start.elapsed().as_secs_f64()),
-                                );
-                            }
-                            return Err(RpcDeadlineExceeded {
-                                method,
-                                elapsed: call_start.elapsed(),
-                            });
-                        }
-                    }
-                }
-                None => f(providers[provider_idx].clone()).await,
+            // Bound every attempt by `per_attempt_timeout` — applied even with `deadline =
+            // None` so a provider that accepts the TCP connection but never replies cannot
+            // wedge the retry loop. With `deadline = Some(d)` the attempt is further capped
+            // by `d - now` so we never sleep past the caller's budget.
+            let attempt_timeout = match deadline {
+                Some(d) => d.saturating_duration_since(Instant::now()).min(per_attempt_timeout),
+                None => per_attempt_timeout,
+            };
+            let result = match tokio::time::timeout(
+                attempt_timeout,
+                f(providers[provider_idx].clone()),
+            )
+            .await
+            {
+                Ok(r) => r,
+                Err(_) => {
+                    // Attempt-level timeout fired. Distinguish two cases:
+                    // - deadline set and now past it ⇒ overall call budget exhausted; bail with
+                    //   the typed error so the caller sees one consistent failure mode.
+                    // - otherwise ⇒ the provider stalled but the call still has budget;
+                    //   synthesize a normal error and rotate to the next provider in this round,
+                    //   mirroring the path a returned `Err` from the closure takes.
+                    if let Some(d) = deadline &&
+                        Instant::now() >= d
+                    {
+                        drop(permit);
+                        if let Some(m) = metrics {
+                            // Only record `on_rpc_complete(false)` here — NOT `on_rpc_retry`.
+                            // The non-timeout failure arm fires `on_rpc_retry` because
+                            // it's about to loop and try another provider; this arm
+                            // gives up, so counting it as a retry would inflate the
+                            // retry metric above the actual number of attempts made.
+                            m.on_rpc_complete(
+                                method,
+                                false,
+                                Some(attempt_start.elapsed().as_secs_f64()),
+                            );
+                        }
+                        return Err(RpcDeadlineExceeded { method, elapsed: call_start.elapsed() });
+                    }
+                    Err(eyre!(
+                        "{} attempt against provider {} timed out after {:?} (per_attempt_timeout)",
+                        method.as_str(),
+                        provider_idx,
+                        attempt_timeout,
+                    ))
+                }
             };
             let attempt_duration = attempt_start.elapsed().as_secs_f64();
             drop(permit);
@@ -1602,4 +1646,113 @@ mod tests {
 
         handle.stop().unwrap();
     }
+
+    /// `eth_getCodeByHash` returning `"0x"` for a non-empty codehash (the symptom of an
+    /// upstream that isn't sync-ready) must be retried, not propagated as success. Without
+    /// the retry guard this would land in `get_codes(verify=true)` as a bytecode whose
+    /// keccak doesn't match the requested hash → fatal `VerificationFailure` → halt.
+    #[tokio::test]
+    async fn test_get_code_retries_empty_response_until_deadline() {
+        let arbitrary_hash = B256::from([0xCC; 32]);
+        // Server always returns "0x" for any hash and bumps `hits` so the test can prove
+        // the call was actually retried (not accepted on first try).
+        let hits = Arc::new(AtomicUsize::new(0));
+        let (handle, url) = serve(hits.clone(), |m| {
+            m.register_method("eth_getCodeByHash", |_p, hits, _| {
+                hits.fetch_add(1, Ordering::Relaxed);
+                Ok::<_, ErrorObjectOwned>(Bytes::from_static(&[]))
+            })
+            .unwrap();
+        })
+        .await;
+
+        // Tight retry config so the test wraps up in well under a second.
+        let config = RpcClientConfig {
+            rpc_retry: BackoffPolicy::new(Duration::from_millis(5), Duration::from_millis(20)),
+            per_attempt_timeout: Duration::from_millis(500),
+            ..Default::default()
+        };
+        let client = RpcClient::new_with_config(&[&url], &[&url], config, None).unwrap();
+
+        let deadline = Instant::now() + Duration::from_millis(300);
+        let err = client
+            .get_code_with_deadline(arbitrary_hash, Some(deadline))
+            .await
+            .expect_err("empty response with non-empty-code hash must surface as deadline");
+        assert_eq!(err.method, RpcMethod::EthGetCodeByHash);
+        assert!(
+            hits.load(Ordering::Relaxed) > 1,
+            "expected > 1 attempt before deadline (was {})",
+            hits.load(Ordering::Relaxed)
+        );
+
+        handle.stop().unwrap();
+    }
+
+    /// The `KECCAK_EMPTY` codehash is the legitimate "empty bytecode" case: `"0x"` is the
+    /// correct answer and must be returned, not retried (otherwise the call would loop forever).
+    ///
+    /// `client.get_code` uses `deadline = None` (unbounded retry); the outer
+    /// `tokio::time::timeout` is a regression guard — if the `!= KECCAK_EMPTY` check is ever
+    /// flipped or removed, the call would hang instead of returning, and CI would hang with
+    /// it. The timeout converts that failure mode into an explicit test failure.
+    #[tokio::test]
+    async fn test_get_code_accepts_empty_response_for_keccak_empty() {
+        let (handle, url) = start_code_rpc(HashMap::new()).await; // server returns "0x" for everything
+        let client = client_at(&url);
+
+        let bytes = tokio::time::timeout(
+            Duration::from_millis(500),
+            client.get_code(revm::primitives::KECCAK_EMPTY),
+        )
+        .await
+        .expect("KECCAK_EMPTY guard must short-circuit without retrying");
+        assert!(bytes.is_empty(), "KECCAK_EMPTY hash must accept empty bytecode");
+
+        handle.stop().unwrap();
+    }
+
+    /// A provider that accepts the TCP connection but never replies must be detected by the
+    /// per-attempt timeout — even when the call has no deadline (the chain-sync contract).
+    /// Without per-attempt timing, the retry loop wedges on the stalled provider forever.
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn test_per_attempt_timeout_rotates_past_stalled_provider() {
+        // Stalled "provider": bind a TCP listener that accepts connections but never reads
+        // or writes. A client request to it will hang on the response.
+        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
+        let stalled_addr = listener.local_addr().unwrap();
+        let stalled_url = format!("http://{stalled_addr}/");
+        // Keep the listener alive (in scope) so the OS doesn't refuse the connection — we
+        // want accept + never-reply, not refused. Drops cleanly at end of test.
+        let _listener = listener;
+
+        // Healthy provider as the second endpoint. Round-robin starts at index 0 so the
+        // call hits the stalled provider first, has to time out the attempt, then rotates.
+        let healthy_latest = 4242u64;
+        let (handle, healthy_url) = start_block_number_rpc(healthy_latest).await;
+
+        let config = RpcClientConfig {
+            rpc_retry: BackoffPolicy::new(Duration::from_millis(5), Duration::from_millis(20)),
+            // Short per-attempt so a stalled provider rotates quickly.
+            per_attempt_timeout: Duration::from_millis(150),
+            ..Default::default()
+        };
+        let client = RpcClient::new_with_config(
+            &[&stalled_url, &healthy_url],
+            &[&healthy_url],
+            config,
+            None,
+        )
+        .unwrap();
+
+        // Bound the test wall clock with `tokio::time::timeout` (NOT a deadline parameter):
+        // this tests the `deadline = None` path. If the bug regressed, the call would hang
+        // and this outer timeout would fire instead of the call returning Ok.
+        let result = tokio::time::timeout(Duration::from_secs(3), client.get_latest_block_number())
+            .await
+            .expect("call must return — per-attempt timeout should rotate past stalled provider");
+        assert_eq!(result, healthy_latest);
+
+        handle.stop().unwrap();
+    }
 }
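The core mechanic, in one place: every attempt is capped by `per_attempt_timeout`, and a caller deadline can only tighten that cap, never loosen it. A minimal standalone sketch of the composition (`bounded_attempt` is an illustrative name, not this crate's API):

    use std::time::{Duration, Instant};

    /// Illustrative helper mirroring the attempt-bound logic in
    /// `round_robin_with_backoff`: cap `attempt` by `per_attempt`, tightened
    /// further by an optional overall deadline.
    async fn bounded_attempt<T>(
        per_attempt: Duration,
        deadline: Option<Instant>,
        attempt: impl std::future::Future<Output = T>,
    ) -> Result<T, tokio::time::error::Elapsed> {
        let cap = match deadline {
            // `saturating_duration_since` yields zero once the deadline has
            // passed, so an exhausted budget times the attempt out immediately.
            Some(d) => d.saturating_duration_since(Instant::now()).min(per_attempt),
            // No deadline: the per-attempt cap still applies, so a stalled
            // provider cannot hang the call forever.
            None => per_attempt,
        };
        tokio::time::timeout(cap, attempt).await
    }

    #[tokio::main]
    async fn main() {
        // A future that never resolves stands in for a provider that accepts
        // the TCP connection but never replies.
        let stalled = std::future::pending::<()>();
        let res = bounded_attempt(Duration::from_millis(100), None, stalled).await;
        assert!(res.is_err(), "per-attempt cap fires even with no deadline");
    }

On timeout, the retry loop then branches exactly as the hunk above shows: past-deadline becomes the typed `RpcDeadlineExceeded`, while within-budget becomes an ordinary `Err` that rotates to the next provider.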
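Both binaries resolve the new flag with the same precedence: an explicit millisecond value wins, otherwise the config default stands (the validator via its `override_ms` helper, the trace server inline via `map`/`unwrap_or`). A sketch of that precedence; the helper's signature is inferred from the call sites above, not copied from the source:

    use std::time::Duration;

    // Inferred signature; the real helper lives in bin/stateless-validator/src/app.rs.
    fn override_ms(flag_ms: Option<u64>, default: Duration) -> Duration {
        flag_ms.map(Duration::from_millis).unwrap_or(default)
    }

    fn main() {
        let default = Duration::from_secs(30); // the RpcClientConfig::default() value
        assert_eq!(override_ms(None, default), Duration::from_secs(30));
        assert_eq!(override_ms(Some(10_000), default), Duration::from_secs(10));
    }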
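The `KECCAK_EMPTY` carve-out in the `get_code_with_deadline` guard rests on a single fact: the keccak-256 of zero bytes is a fixed constant, so `"0x"` is the correct answer for exactly that one hash and a retryable symptom for every other. A quick self-check (written against `alloy_primitives` here; the production guard compares against `revm::primitives::KECCAK_EMPTY`):

    use alloy_primitives::{b256, keccak256, B256};

    // keccak256(""), the well-known empty-code hash.
    const KECCAK_EMPTY: B256 =
        b256!("c5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470");

    fn main() {
        assert_eq!(keccak256(b""), KECCAK_EMPTY);
        // Any other hash paired with empty bytecode can never pass keccak
        // verification, which is why the client retries instead of returning it.
    }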