15 changes: 14 additions & 1 deletion bin/debug-trace-server/src/main.rs
@@ -202,6 +202,13 @@ struct Args {
#[clap(long, env = "DEBUG_TRACE_SERVER_WITNESS_MAX_CONCURRENT_REQUESTS")]
witness_max_concurrent_requests: Option<usize>,

/// Per-attempt RPC timeout (milliseconds). Bounds every individual provider attempt
/// even when the caller passes no overall deadline (e.g. background chain sync), so
/// a provider that accepts the TCP connection but never replies is detected and
/// rotated past instead of wedging the retry loop.
#[clap(long, env = "DEBUG_TRACE_SERVER_RPC_PER_ATTEMPT_TIMEOUT_MS")]
rpc_per_attempt_timeout_ms: Option<u64>,

/// Logging configuration.
#[command(flatten)]
log: LogArgs,
@@ -284,10 +291,16 @@ async fn main() -> Result<()> {
// Initialize components
let data_apis: Vec<&str> = args.rpc_endpoint.iter().map(String::as_str).collect();
let witness_apis: Vec<&str> = args.witness_endpoint.iter().map(String::as_str).collect();
let rpc_defaults = RpcClientConfig::trace_server();
let per_attempt_timeout = args
.rpc_per_attempt_timeout_ms
.map(std::time::Duration::from_millis)
.unwrap_or(rpc_defaults.per_attempt_timeout);
let rpc_config = RpcClientConfig {
data_max_concurrent_requests: args.data_max_concurrent_requests,
witness_max_concurrent_requests: args.witness_max_concurrent_requests,
-    ..RpcClientConfig::trace_server()
+    per_attempt_timeout,
+    ..rpc_defaults
}
.with_metrics(Arc::new(metrics::TraceRpcMetrics));
let rpc_client =
9 changes: 9 additions & 0 deletions bin/stateless-validator/src/app.rs
@@ -125,6 +125,12 @@ pub struct CommandLineArgs {
#[clap(long, env = "STATELESS_VALIDATOR_RPC_MAX_BACKOFF_MS")]
pub rpc_max_backoff_ms: Option<u64>,

/// Per-attempt RPC timeout (milliseconds). Bounds every individual provider attempt
/// even when the chain-sync caller passes no overall deadline, so a provider that
/// accepts the TCP connection but never replies is detected and rotated past.
#[clap(long, env = "STATELESS_VALIDATOR_RPC_PER_ATTEMPT_TIMEOUT_MS")]
pub rpc_per_attempt_timeout_ms: Option<u64>,

/// Soft cap on rows retained in the canonical-chain table. Old rows are pruned inline
/// when `advance_chain` exceeds this. Larger values bound the reorg-lookup window;
/// smaller values reduce redb file growth. Defaults to `DEFAULT_MAX_CHAIN_LENGTH`
@@ -178,10 +184,13 @@ pub async fn run() -> Result<()> {
initial: override_ms(args.rpc_initial_backoff_ms, rpc_defaults.rpc_retry.initial),
max: override_ms(args.rpc_max_backoff_ms, rpc_defaults.rpc_retry.max),
};
let per_attempt_timeout =
override_ms(args.rpc_per_attempt_timeout_ms, rpc_defaults.per_attempt_timeout);
let rpc_config = RpcClientConfig {
data_max_concurrent_requests: args.data_max_concurrent_requests,
witness_max_concurrent_requests: args.witness_max_concurrent_requests,
rpc_retry,
per_attempt_timeout,
..rpc_defaults
}
.with_metrics(Arc::new(metrics::ValidatorMetrics));
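Note: `override_ms` itself is not part of this diff. From its call sites above it presumably maps an optional millisecond override onto a `Duration` default, along the lines of this hypothetical reconstruction (not the crate's actual definition):

    use std::time::Duration;

    // Hypothetical stand-in for the real `override_ms`, which this diff does not show:
    // an explicit CLI/env override in milliseconds wins; otherwise keep the config default.
    fn override_ms(ms: Option<u64>, default: Duration) -> Duration {
        ms.map(Duration::from_millis).unwrap_or(default)
    }

Under that reading, `--rpc-per-attempt-timeout-ms 10000` yields a 10s per-attempt cap, and omitting the flag keeps `rpc_defaults.per_attempt_timeout`.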
227 changes: 190 additions & 37 deletions crates/stateless-common/src/rpc_client.rs
@@ -120,6 +120,11 @@ pub struct RpcClientConfig {
/// jitter) before the next round and doubles the sleep each round up to `max`. Retry is
/// unbounded — caller-visible failures don't occur.
pub rpc_retry: BackoffPolicy,
/// Hard cap on a single provider attempt. Applied even when no overall deadline is set,
/// so a provider that accepts TCP but never replies cannot wedge the retry loop —
/// timing out the attempt rotates `round_robin_with_backoff` to the next provider.
/// With `deadline = Some(d)`, each attempt uses `min(per_attempt_timeout, d - now)`.
pub per_attempt_timeout: Duration,
}

impl Default for RpcClientConfig {
@@ -133,6 +138,10 @@ impl Default for RpcClientConfig {
// 30s. That's gentle enough to ride out near-tip witness generation latency (a
// few seconds) without hammering upstream when something is genuinely broken.
rpc_retry: BackoffPolicy::new(Duration::from_millis(500), Duration::from_secs(30)),
// 30s is well above any healthy single-attempt latency (witness near-tip can take
// several seconds; everything else is sub-second), but bounded enough that a
// stalled (TCP-accept-no-reply) provider is detected within reasonable time.
per_attempt_timeout: Duration::from_secs(30),
}
}
}
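For reference, the cadence the `rpc_retry` doc comment above describes (sleep `initial` after a failed round, double each round, clamp at `max`) works out as in this simplified model. Jitter is elided and this is not `BackoffPolicy`'s actual implementation:

    use std::time::Duration;

    // Simplified model of the documented schedule; the real BackoffPolicy adds jitter.
    fn backoff_for_round(initial: Duration, max: Duration, round: u32) -> Duration {
        initial.saturating_mul(2u32.saturating_pow(round)).min(max)
    }

    fn main() {
        let (initial, max) = (Duration::from_millis(500), Duration::from_secs(30));
        // Rounds 0..: 0.5s, 1s, 2s, 4s, 8s, 16s, then capped at 30s thereafter.
        assert_eq!(backoff_for_round(initial, max, 0), Duration::from_millis(500));
        assert_eq!(backoff_for_round(initial, max, 5), Duration::from_secs(16));
        assert_eq!(backoff_for_round(initial, max, 6), Duration::from_secs(30));
    }

Combined with the 30s `per_attempt_timeout` default, a fully stalled provider therefore costs at most one 30s attempt before the loop rotates to the next provider.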
@@ -145,6 +154,7 @@ impl std::fmt::Debug for RpcClientConfig {
.field("data_max_concurrent_requests", &self.data_max_concurrent_requests)
.field("witness_max_concurrent_requests", &self.witness_max_concurrent_requests)
.field("rpc_retry", &self.rpc_retry)
.field("per_attempt_timeout", &self.per_attempt_timeout)
.finish()
}
}
@@ -363,6 +373,7 @@ impl RpcClient {
&self.data_providers,
&self.data_concurrency,
&self.config.rpc_retry,
self.config.per_attempt_timeout,
rr_start,
method,
self.config.metrics.as_ref(),
@@ -378,17 +389,34 @@ impl RpcClient {
}

/// Deadline-aware counterpart of [`Self::get_code`].
///
/// An empty response (`"0x"`) for a non-empty codehash is treated as a transient error and
/// rotates round-robin to the next provider. MegaETH RPC nodes return `"0x"` for codes they
/// have not yet synced (rather than a JSON-RPC error); without this guard, an unsynced
/// upstream would silently return zero bytes that then fail keccak verification, halting the
/// validator. Retrying lets a sync-ready provider serve the request.
pub async fn get_code_with_deadline(
&self,
hash: B256,
deadline: Option<Instant>,
) -> std::result::Result<Bytes, RpcDeadlineExceeded> {
self.call_with_deadline(RpcMethod::EthGetCodeByHash, deadline, move |provider| {
Box::pin(async move {
provider.client().request("eth_getCodeByHash", (hash,)).await.map_err(|e| {
trace!(%hash, error = %e, "eth_getCodeByHash failed");
eyre!("eth_getCodeByHash for hash {hash:?} failed: {e}")
})
let bytes: Bytes =
provider.client().request("eth_getCodeByHash", (hash,)).await.map_err(|e| {
trace!(%hash, error = %e, "eth_getCodeByHash failed");
eyre!("eth_getCodeByHash for hash {hash:?} failed: {e}")
})?;
// Guard against the legitimate case where the requested hash *is* the
// empty-code hash — then `"0x"` is the correct answer and retrying would loop
// forever. Every other empty response is treated as upstream-not-sync-ready.
if bytes.is_empty() && hash != revm::primitives::KECCAK_EMPTY {
trace!(%hash, "eth_getCodeByHash returned empty bytecode (provider not sync-ready?)");
return Err(eyre!(
"eth_getCodeByHash for hash {hash:?} returned empty bytecode (provider may not be sync-ready)"
));
}
Ok(bytes)
})
})
.await
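The `KECCAK_EMPTY` escape hatch above rests on one identity: the empty-code hash is keccak256 of zero bytes, so `"0x"` is the one response that legitimately verifies against that hash. A quick check, assuming `keccak256` is re-exported from `revm::primitives` alongside the `KECCAK_EMPTY` constant used in the hunk:

    use revm::primitives::{keccak256, KECCAK_EMPTY};

    fn main() {
        // keccak256 of empty input is the canonical empty-code hash (0xc5d2…a470),
        // so an empty body hashes correctly for exactly this one request.
        assert_eq!(keccak256(b""), KECCAK_EMPTY);
    }

For every other hash, empty bytes can never pass keccak verification, which is why the guard may safely treat them as a transient provider error and retry.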
@@ -544,6 +572,7 @@ impl RpcClient {
&self.witness_providers,
&self.witness_concurrency,
&self.config.rpc_retry,
self.config.per_attempt_timeout,
// Primary-failover: always start from provider 0 so the primary takes all traffic
// while healthy. Backup endpoints are touched only while the primary is failing.
0,
@@ -719,15 +748,17 @@ macro_rules! log_at {
///
/// Used by both the data-method `call()` (rotates `rr_start` per call for load balancing) and
/// `get_witness()` (pins `rr_start=0` for primary-failover).
-// 8-argument retry primitive. Each field plays a distinct role (providers, concurrency,
-// backoff policy, starting provider, method label, metrics sink, deadline, per-attempt closure)
-// and bundling them into a struct would be ceremony without encapsulation — there are exactly
-// two call sites in this crate. Prefer clarity at the definition over fewer commas at the call.
+// 9-argument retry primitive. Each field plays a distinct role (providers, concurrency,
+// backoff policy, per-attempt timeout, starting provider, method label, metrics sink, deadline,
+// per-attempt closure) and bundling them into a struct would be ceremony without encapsulation —
+// there are exactly two call sites in this crate. Prefer clarity at the definition over fewer
+// commas at the call.
#[allow(clippy::too_many_arguments)]
async fn round_robin_with_backoff<N, T>(
providers: &[RootProvider<N>],
semaphore: &Semaphore,
policy: &BackoffPolicy,
per_attempt_timeout: Duration,
rr_start: usize,
method: RpcMethod,
metrics: Option<&Arc<dyn RpcMetrics>>,
@@ -774,40 +805,53 @@ where
// and success/error counters reflect what actually happened in the retry loop
// rather than always showing "success" with the cumulative logical-call time.
let attempt_start = Instant::now();
- // Bound the attempt by the remaining deadline when set — without this, a
- // provider that accepts the TCP connection but never replies would block the
- // retry loop forever even though the deadline has already passed. With no
- // deadline the attempt runs unbounded, as before.
- let result = match deadline {
-     Some(d) => {
-         let remaining = d.saturating_duration_since(Instant::now());
-         match tokio::time::timeout(remaining, f(providers[provider_idx].clone())).await
-         {
-             Ok(r) => r,
-             Err(_) => {
-                 // Per-attempt timeout == call deadline hit; bail with the
-                 // typed error so the caller sees one consistent failure mode.
-                 drop(permit);
-                 if let Some(m) = metrics {
-                     // Only record `on_rpc_complete(false)` here — NOT `on_rpc_retry`.
-                     // The non-timeout failure arm fires `on_rpc_retry` because
-                     // it's about to loop and try another provider; this arm
-                     // gives up, so counting it as a retry would inflate the
-                     // retry metric above the actual number of attempts made.
-                     m.on_rpc_complete(
-                         method,
-                         false,
-                         Some(attempt_start.elapsed().as_secs_f64()),
-                     );
-                 }
-                 return Err(RpcDeadlineExceeded {
-                     method,
-                     elapsed: call_start.elapsed(),
-                 });
-             }
-         }
-     }
-     None => f(providers[provider_idx].clone()).await,
- };
+ // Bound every attempt by `per_attempt_timeout` — applied even with `deadline =
+ // None` so a provider that accepts the TCP connection but never replies cannot
+ // wedge the retry loop. With `deadline = Some(d)` the attempt is further capped
+ // by `d - now` so we never sleep past the caller's budget.
+ let attempt_timeout = match deadline {
+     Some(d) => d.saturating_duration_since(Instant::now()).min(per_attempt_timeout),
+     None => per_attempt_timeout,
+ };
+ let result = match tokio::time::timeout(
+     attempt_timeout,
+     f(providers[provider_idx].clone()),
+ )
+ .await
+ {
+     Ok(r) => r,
+     Err(_) => {
+         // Attempt-level timeout fired. Distinguish two cases:
+         // - deadline set and now past it ⇒ overall call budget exhausted; bail with
+         //   the typed error so the caller sees one consistent failure mode.
+         // - otherwise ⇒ the provider stalled but the call still has budget;
+         //   synthesize a normal error and rotate to the next provider in this round,
+         //   mirroring the path a returned `Err` from the closure takes.
+         if let Some(d) = deadline &&
+             Instant::now() >= d
+         {
+             drop(permit);
+             if let Some(m) = metrics {
+                 // Only record `on_rpc_complete(false)` here — NOT `on_rpc_retry`.
+                 // The non-timeout failure arm fires `on_rpc_retry` because
+                 // it's about to loop and try another provider; this arm
+                 // gives up, so counting it as a retry would inflate the
+                 // retry metric above the actual number of attempts made.
+                 m.on_rpc_complete(
+                     method,
+                     false,
+                     Some(attempt_start.elapsed().as_secs_f64()),
+                 );
+             }
+             return Err(RpcDeadlineExceeded { method, elapsed: call_start.elapsed() });
+         }
+         Err(eyre!(
+             "{} attempt against provider {} timed out after {:?} (per_attempt_timeout)",
+             method.as_str(),
+             provider_idx,
+             attempt_timeout,
+         ))
+     }
+ };
let attempt_duration = attempt_start.elapsed().as_secs_f64();
drop(permit);
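Concretely, the capping rule above: with the 30s default and 10s of deadline budget remaining, an attempt gets at most 10s; with no deadline it gets the full 30s. A self-contained restatement of the same `match`:

    use std::time::{Duration, Instant};

    // Mirrors the `attempt_timeout` computation from the hunk above.
    fn attempt_timeout(deadline: Option<Instant>, per_attempt: Duration) -> Duration {
        match deadline {
            Some(d) => d.saturating_duration_since(Instant::now()).min(per_attempt),
            None => per_attempt,
        }
    }

    fn main() {
        let per_attempt = Duration::from_secs(30);
        // No deadline: the per-attempt cap is the only bound on the attempt.
        assert_eq!(attempt_timeout(None, per_attempt), per_attempt);
        // 10s of budget left: the deadline, not the 30s cap, bounds the attempt.
        let deadline = Instant::now() + Duration::from_secs(10);
        assert!(attempt_timeout(Some(deadline), per_attempt) <= Duration::from_secs(10));
    }

A deadline already in the past yields a zero-length attempt, which times out immediately and flows into the `RpcDeadlineExceeded` arm.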
@@ -1602,4 +1646,113 @@ mod tests {

handle.stop().unwrap();
}

/// `eth_getCodeByHash` returning `"0x"` for a non-empty codehash (the symptom of an
/// upstream that isn't sync-ready) must be retried, not propagated as success. Without
/// the retry guard this would land in `get_codes(verify=true)` as a bytecode whose
/// keccak doesn't match the requested hash → fatal `VerificationFailure` → halt.
#[tokio::test]
async fn test_get_code_retries_empty_response_until_deadline() {
let arbitrary_hash = B256::from([0xCC; 32]);
// Server always returns "0x" for any hash and bumps `hits` so the test can prove
// the call was actually retried (not accepted on first try).
let hits = Arc::new(AtomicUsize::new(0));
let (handle, url) = serve(hits.clone(), |m| {
m.register_method("eth_getCodeByHash", |_p, hits, _| {
hits.fetch_add(1, Ordering::Relaxed);
Ok::<Bytes, ErrorObjectOwned>(Bytes::from_static(&[]))
})
.unwrap();
})
.await;

// Tight retry config so the test finishes in well under a second.
let config = RpcClientConfig {
rpc_retry: BackoffPolicy::new(Duration::from_millis(5), Duration::from_millis(20)),
per_attempt_timeout: Duration::from_millis(500),
..Default::default()
};
let client = RpcClient::new_with_config(&[&url], &[&url], config, None).unwrap();

let deadline = Instant::now() + Duration::from_millis(300);
let err = client
.get_code_with_deadline(arbitrary_hash, Some(deadline))
.await
.expect_err("empty response with non-empty-code hash must surface as deadline");
assert_eq!(err.method, RpcMethod::EthGetCodeByHash);
assert!(
hits.load(Ordering::Relaxed) > 1,
"expected > 1 attempt before deadline (was {})",
hits.load(Ordering::Relaxed)
);

handle.stop().unwrap();
}

/// The `KECCAK_EMPTY` codehash is the legitimate "empty bytecode" case: `"0x"` is the
/// correct answer and must be returned, not retried (otherwise the call would loop forever).
///
/// `client.get_code` uses `deadline = None` (unbounded retry); the outer
/// `tokio::time::timeout` is a regression guard — if the `!= KECCAK_EMPTY` check is ever
/// flipped or removed, the call would hang instead of returning, and CI would hang with
/// it. The timeout converts that failure mode into an explicit test failure.
#[tokio::test]
async fn test_get_code_accepts_empty_response_for_keccak_empty() {
let (handle, url) = start_code_rpc(HashMap::new()).await; // server returns "0x" for everything
let client = client_at(&url);

let bytes = tokio::time::timeout(
Duration::from_millis(500),
client.get_code(revm::primitives::KECCAK_EMPTY),
)
.await
.expect("KECCAK_EMPTY guard must short-circuit without retrying");
assert!(bytes.is_empty(), "KECCAK_EMPTY hash must accept empty bytecode");

handle.stop().unwrap();
}

/// A provider that accepts the TCP connection but never replies must be detected by the
/// per-attempt timeout — even when the call has no deadline (the chain-sync contract).
/// Without per-attempt timing, the retry loop wedges on the stalled provider forever.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_per_attempt_timeout_rotates_past_stalled_provider() {
// Stalled "provider": bind a TCP listener that accepts connections but never reads
// or writes. A client request to it will hang on the response.
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let stalled_addr = listener.local_addr().unwrap();
let stalled_url = format!("http://{stalled_addr}/");
// Keep the listener alive (in scope) so the OS doesn't refuse the connection — we
// want accept + never-reply, not refused. Drops cleanly at end of test.
let _listener = listener;

// Healthy provider as the second endpoint. Round-robin starts at index 0 so the
// call hits the stalled provider first, has to time out the attempt, then rotates.
let healthy_latest = 4242u64;
let (handle, healthy_url) = start_block_number_rpc(healthy_latest).await;

let config = RpcClientConfig {
rpc_retry: BackoffPolicy::new(Duration::from_millis(5), Duration::from_millis(20)),
// Short per-attempt so a stalled provider rotates quickly.
per_attempt_timeout: Duration::from_millis(150),
..Default::default()
};
let client = RpcClient::new_with_config(
&[&stalled_url, &healthy_url],
&[&healthy_url],
config,
None,
)
.unwrap();

// Bound the test wall clock with `tokio::time::timeout` (NOT a deadline parameter):
// this tests the `deadline = None` path. If the bug regressed, the call would hang
// and this outer timeout would fire instead of the call returning Ok.
let result = tokio::time::timeout(Duration::from_secs(3), client.get_latest_block_number())
.await
.expect("call must return — per-attempt timeout should rotate past stalled provider");
assert_eq!(result, healthy_latest);

handle.stop().unwrap();
}
}