fix: bound per-attempt RPC timeout and retry empty get_code responses #129
```diff
@@ -120,6 +120,11 @@ pub struct RpcClientConfig {
     /// jitter) before the next round and doubles the sleep each round up to `max`. Retry is
     /// unbounded — caller-visible failures don't occur.
     pub rpc_retry: BackoffPolicy,
+    /// Hard cap on a single provider attempt. Applied even when no overall deadline is set,
+    /// so a provider that accepts TCP but never replies cannot wedge the retry loop —
+    /// timing out the attempt rotates `round_robin_with_backoff` to the next provider.
+    /// With `deadline = Some(d)`, each attempt uses `min(per_attempt_timeout, d - now)`.
+    pub per_attempt_timeout: Duration,
 }
 
 impl Default for RpcClientConfig {
```
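For intuition, the documented capping rule can be exercised in isolation. A minimal sketch — the `effective_timeout` helper is hypothetical, written to match the doc comment above, not code from this diff:

```rust
use std::time::{Duration, Instant};

// Hypothetical helper mirroring the documented rule: with a deadline, an
// attempt may use at most min(per_attempt_timeout, time left until the
// deadline); with no deadline it always gets the full per-attempt cap.
fn effective_timeout(
    per_attempt_timeout: Duration,
    deadline: Option<Instant>,
    now: Instant,
) -> Duration {
    match deadline {
        Some(d) => d.saturating_duration_since(now).min(per_attempt_timeout),
        None => per_attempt_timeout,
    }
}

fn main() {
    let now = Instant::now();
    let cap = Duration::from_secs(30);

    // Plenty of budget left: the per-attempt cap wins.
    assert_eq!(effective_timeout(cap, Some(now + Duration::from_secs(120)), now), cap);
    // Only 5s of budget left: the deadline wins.
    assert_eq!(
        effective_timeout(cap, Some(now + Duration::from_secs(5)), now),
        Duration::from_secs(5)
    );
    // Deadline already passed: saturating_duration_since yields zero.
    assert_eq!(effective_timeout(cap, Some(now), now), Duration::ZERO);
    // No deadline: previously-unbounded calls still get the hard cap.
    assert_eq!(effective_timeout(cap, None, now), cap);
}
```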
```diff
@@ -133,6 +138,10 @@ impl Default for RpcClientConfig {
             // 30s. That's gentle enough to ride out near-tip witness generation latency (a
             // few seconds) without hammering upstream when something is genuinely broken.
             rpc_retry: BackoffPolicy::new(Duration::from_millis(500), Duration::from_secs(30)),
+            // 30s is well above any healthy single-attempt latency (witness near-tip can take
+            // several seconds; everything else is sub-second), but bounded enough that a
+            // stalled (TCP-accept-no-reply) provider is detected within reasonable time.
+            per_attempt_timeout: Duration::from_secs(30),
         }
     }
 }
```
```diff
@@ -145,6 +154,7 @@ impl std::fmt::Debug for RpcClientConfig {
             .field("data_max_concurrent_requests", &self.data_max_concurrent_requests)
             .field("witness_max_concurrent_requests", &self.witness_max_concurrent_requests)
             .field("rpc_retry", &self.rpc_retry)
+            .field("per_attempt_timeout", &self.per_attempt_timeout)
             .finish()
     }
 }
```
```diff
@@ -363,6 +373,7 @@ impl RpcClient {
             &self.data_providers,
             &self.data_concurrency,
             &self.config.rpc_retry,
+            self.config.per_attempt_timeout,
             rr_start,
             method,
             self.config.metrics.as_ref(),
```
```diff
@@ -378,17 +389,34 @@ impl RpcClient {
     }
 
     /// Deadline-aware counterpart of [`Self::get_code`].
+    ///
+    /// An empty response (`"0x"`) for a non-empty codehash is treated as a transient error and
+    /// rotates round-robin to the next provider. MegaETH RPC nodes return `"0x"` for codes they
+    /// have not yet synced (rather than a JSON-RPC error); without this guard, an unsynced
+    /// upstream would silently return zero bytes that then fail keccak verification, halting the
+    /// validator. Retrying lets a sync-ready provider serve the request.
     pub async fn get_code_with_deadline(
         &self,
         hash: B256,
         deadline: Option<Instant>,
     ) -> std::result::Result<Bytes, RpcDeadlineExceeded> {
         self.call_with_deadline(RpcMethod::EthGetCodeByHash, deadline, move |provider| {
             Box::pin(async move {
-                provider.client().request("eth_getCodeByHash", (hash,)).await.map_err(|e| {
-                    trace!(%hash, error = %e, "eth_getCodeByHash failed");
-                    eyre!("eth_getCodeByHash for hash {hash:?} failed: {e}")
-                })
+                let bytes: Bytes =
+                    provider.client().request("eth_getCodeByHash", (hash,)).await.map_err(|e| {
+                        trace!(%hash, error = %e, "eth_getCodeByHash failed");
+                        eyre!("eth_getCodeByHash for hash {hash:?} failed: {e}")
+                    })?;
+                // Guard against the legitimate case where the requested hash *is* the
+                // empty-code hash — then `"0x"` is the correct answer and retrying would loop
+                // forever. Every other empty response is treated as upstream-not-sync-ready.
+                if bytes.is_empty() && hash != revm::primitives::KECCAK_EMPTY {
+                    trace!(%hash, "eth_getCodeByHash returned empty bytecode (provider not sync-ready?)");
+                    return Err(eyre!(
+                        "eth_getCodeByHash for hash {hash:?} returned empty bytecode (provider may not be sync-ready)"
+                    ));
+                }
+                Ok(bytes)
             })
         })
         .await
```
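Why keying the carve-out on `KECCAK_EMPTY` is sound: that constant is defined as the keccak-256 of zero bytes, so `"0x"` is the one response that verifies for it. A minimal check — assuming `alloy-primitives` for `keccak256` and `revm` for the constant, as this crate already uses; the sample bytecode is illustrative:

```rust
use alloy_primitives::keccak256;
use revm::primitives::KECCAK_EMPTY;

fn main() {
    // The empty-code hash is keccak256("") — the codehash carried by every
    // account with no deployed bytecode. For this hash alone, an empty
    // eth_getCodeByHash response is the correct answer.
    assert_eq!(keccak256(b""), KECCAK_EMPTY);

    // For any other requested hash, empty bytes can never verify: the keccak
    // of the returned bytes would be KECCAK_EMPTY, not the hash the caller
    // asked for — exactly the fatal VerificationFailure the retry guard
    // prevents from being reached.
    let some_code_hash = keccak256(b"\x60\x00"); // hash of non-empty bytecode
    assert_ne!(keccak256(b""), some_code_hash);
}
```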
```diff
@@ -544,6 +572,7 @@ impl RpcClient {
             &self.witness_providers,
             &self.witness_concurrency,
             &self.config.rpc_retry,
+            self.config.per_attempt_timeout,
             // Primary-failover: always start from provider 0 so the primary takes all traffic
             // while healthy. Backup endpoints are touched only while the primary is failing.
             0,
```
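The two `rr_start` conventions differ only in the starting index of the same rotation. A standalone sketch — it assumes the conventional `(start + i) % len` visit order implied by the function's name, which is not shown in this hunk:

```rust
// Order in which provider indices are visited within one retry round.
fn rotation(start: usize, len: usize) -> Vec<usize> {
    (0..len).map(|i| (start + i) % len).collect()
}

fn main() {
    // Data methods rotate rr_start per call, spreading load: successive
    // calls begin at different providers.
    assert_eq!(rotation(0, 3), vec![0, 1, 2]);
    assert_eq!(rotation(1, 3), vec![1, 2, 0]);
    // get_witness pins rr_start = 0: the primary (index 0) is tried first
    // every round, so backups only see traffic after the primary fails.
    assert_eq!(rotation(0, 2), vec![0, 1]);
}
```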
```diff
@@ -719,15 +748,17 @@ macro_rules! log_at {
 ///
 /// Used by both the data-method `call()` (rotates `rr_start` per call for load balancing) and
 /// `get_witness()` (pins `rr_start=0` for primary-failover).
-// 8-argument retry primitive. Each field plays a distinct role (providers, concurrency,
-// backoff policy, starting provider, method label, metrics sink, deadline, per-attempt closure)
-// and bundling them into a struct would be ceremony without encapsulation — there are exactly
-// two call sites in this crate. Prefer clarity at the definition over fewer commas at the call.
+// 9-argument retry primitive. Each field plays a distinct role (providers, concurrency,
+// backoff policy, per-attempt timeout, starting provider, method label, metrics sink, deadline,
+// per-attempt closure) and bundling them into a struct would be ceremony without encapsulation —
+// there are exactly two call sites in this crate. Prefer clarity at the definition over fewer
+// commas at the call.
 #[allow(clippy::too_many_arguments)]
 async fn round_robin_with_backoff<N, T>(
     providers: &[RootProvider<N>],
     semaphore: &Semaphore,
     policy: &BackoffPolicy,
+    per_attempt_timeout: Duration,
     rr_start: usize,
     method: RpcMethod,
     metrics: Option<&Arc<dyn RpcMetrics>>,
```
```diff
@@ -774,40 +805,53 @@
             // and success/error counters reflect what actually happened in the retry loop
             // rather than always showing "success" with the cumulative logical-call time.
             let attempt_start = Instant::now();
-            // Bound the attempt by the remaining deadline when set — without this, a
-            // provider that accepts the TCP connection but never replies would block the
-            // retry loop forever even though the deadline has already passed. With no
-            // deadline the attempt runs unbounded, as before.
-            let result = match deadline {
-                Some(d) => {
-                    let remaining = d.saturating_duration_since(Instant::now());
-                    match tokio::time::timeout(remaining, f(providers[provider_idx].clone())).await
-                    {
-                        Ok(r) => r,
-                        Err(_) => {
-                            // Per-attempt timeout == call deadline hit; bail with the
-                            // typed error so the caller sees one consistent failure mode.
-                            drop(permit);
-                            if let Some(m) = metrics {
-                                // Only record `on_rpc_complete(false)` here — NOT `on_rpc_retry`.
-                                // The non-timeout failure arm fires `on_rpc_retry` because
-                                // it's about to loop and try another provider; this arm
-                                // gives up, so counting it as a retry would inflate the
-                                // retry metric above the actual number of attempts made.
-                                m.on_rpc_complete(
-                                    method,
-                                    false,
-                                    Some(attempt_start.elapsed().as_secs_f64()),
-                                );
-                            }
-                            return Err(RpcDeadlineExceeded {
-                                method,
-                                elapsed: call_start.elapsed(),
-                            });
-                        }
-                    }
-                }
-                None => f(providers[provider_idx].clone()).await,
-            };
+            // Bound every attempt by `per_attempt_timeout` — applied even with `deadline =
+            // None` so a provider that accepts the TCP connection but never replies cannot
+            // wedge the retry loop. With `deadline = Some(d)` the attempt is further capped
+            // by `d - now` so we never sleep past the caller's budget.
+            let attempt_timeout = match deadline {
+                Some(d) => d.saturating_duration_since(Instant::now()).min(per_attempt_timeout),
+                None => per_attempt_timeout,
+            };
```
**Collaborator** commented on lines +812 to +815:

> If `rpc_per_attempt_timeout_ms` is set to 0 via the new CLI/env knob, `attempt_timeout` becomes `Duration::ZERO` and `tokio::time::timeout` will fail every provider attempt immediately. On the validator's `deadline = None` paths this means the retry loop never makes a real RPC request and can spin forever with backoff; on deadline-bound paths it guarantees `RpcDeadlineExceeded` even against healthy providers. Since this option is …

**Member (Author)** replied:

> Set it to a minimum of 100 ms.

**Follow-up:**

> Already addressed: both CLIs now use …
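A minimal sketch of the failure mode under discussion — standalone, assuming only tokio (with timer and macro features); the clamp mirrors the 100 ms floor described in the replies, and the variable names are illustrative:

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    // tokio::time::timeout polls the inner future once and then checks the
    // timer; with a zero budget, any future that is not already ready (every
    // real network request) returns Elapsed immediately.
    let attempt = tokio::time::timeout(Duration::ZERO, async {
        tokio::time::sleep(Duration::from_millis(1)).await; // stands in for an RPC round-trip
        "response"
    })
    .await;
    assert!(attempt.is_err(), "a zero per-attempt timeout fails every attempt");

    // Illustrative fix: floor the configured knob so a misconfigured 0 ms
    // value can never zero out the attempt budget.
    let configured_ms = 0u64;
    let per_attempt_timeout = Duration::from_millis(configured_ms.max(100));
    assert_eq!(per_attempt_timeout, Duration::from_millis(100));
}
```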
```diff
+            let result = match tokio::time::timeout(
+                attempt_timeout,
+                f(providers[provider_idx].clone()),
+            )
+            .await
+            {
+                Ok(r) => r,
+                Err(_) => {
+                    // Attempt-level timeout fired. Distinguish two cases:
+                    // - deadline set and now past it ⇒ overall call budget exhausted; bail with
+                    //   the typed error so the caller sees one consistent failure mode.
+                    // - otherwise ⇒ the provider stalled but the call still has budget;
+                    //   synthesize a normal error and rotate to the next provider in this round,
+                    //   mirroring the path a returned `Err` from the closure takes.
+                    if let Some(d) = deadline &&
+                        Instant::now() >= d
+                    {
+                        drop(permit);
+                        if let Some(m) = metrics {
+                            // Only record `on_rpc_complete(false)` here — NOT `on_rpc_retry`.
+                            // The non-timeout failure arm fires `on_rpc_retry` because
+                            // it's about to loop and try another provider; this arm
+                            // gives up, so counting it as a retry would inflate the
+                            // retry metric above the actual number of attempts made.
+                            m.on_rpc_complete(
+                                method,
+                                false,
+                                Some(attempt_start.elapsed().as_secs_f64()),
+                            );
+                        }
+                        return Err(RpcDeadlineExceeded { method, elapsed: call_start.elapsed() });
+                    }
+                    Err(eyre!(
+                        "{} attempt against provider {} timed out after {:?} (per_attempt_timeout)",
+                        method.as_str(),
+                        provider_idx,
+                        attempt_timeout,
+                    ))
+                }
+            };
             let attempt_duration = attempt_start.elapsed().as_secs_f64();
             drop(permit);
```
```diff
@@ -1602,4 +1646,103 @@ mod tests {
 
         handle.stop().unwrap();
     }
+
+    /// `eth_getCodeByHash` returning `"0x"` for a non-empty codehash (the symptom of an
+    /// upstream that isn't sync-ready) must be retried, not propagated as success. Without
+    /// the retry guard this would land in `get_codes(verify=true)` as a bytecode whose
+    /// keccak doesn't match the requested hash → fatal `VerificationFailure` → halt.
+    #[tokio::test]
+    async fn test_get_code_retries_empty_response_until_deadline() {
+        let arbitrary_hash = B256::from([0xCC; 32]);
+        // Server always returns "0x" for any hash and bumps `hits` so the test can prove
+        // the call was actually retried (not accepted on first try).
+        let hits = Arc::new(AtomicUsize::new(0));
+        let (handle, url) = serve(hits.clone(), |m| {
+            m.register_method("eth_getCodeByHash", |_p, hits, _| {
+                hits.fetch_add(1, Ordering::Relaxed);
+                Ok::<Bytes, ErrorObjectOwned>(Bytes::from_static(&[]))
+            })
+            .unwrap();
+        })
+        .await;
+
+        // Tight retry config so the test wraps up in well under a second.
+        let config = RpcClientConfig {
+            rpc_retry: BackoffPolicy::new(Duration::from_millis(5), Duration::from_millis(20)),
+            per_attempt_timeout: Duration::from_millis(500),
+            ..Default::default()
+        };
+        let client = RpcClient::new_with_config(&[&url], &[&url], config, None).unwrap();
+
+        let deadline = Instant::now() + Duration::from_millis(300);
+        let err = client
+            .get_code_with_deadline(arbitrary_hash, Some(deadline))
+            .await
+            .expect_err("empty response with non-empty-code hash must surface as deadline");
+        assert_eq!(err.method, RpcMethod::EthGetCodeByHash);
+        assert!(
+            hits.load(Ordering::Relaxed) > 1,
+            "expected > 1 attempt before deadline (was {})",
+            hits.load(Ordering::Relaxed)
+        );
+
+        handle.stop().unwrap();
+    }
+
+    /// The `KECCAK_EMPTY` codehash is the legitimate "empty bytecode" case: `"0x"` is the
+    /// correct answer and must be returned, not retried (otherwise the call would loop forever).
+    #[tokio::test]
+    async fn test_get_code_accepts_empty_response_for_keccak_empty() {
+        let (handle, url) = start_code_rpc(HashMap::new()).await; // server returns "0x" for everything
+        let client = client_at(&url);
+
+        let bytes = client.get_code(revm::primitives::KECCAK_EMPTY).await;
+        assert!(bytes.is_empty(), "KECCAK_EMPTY hash must accept empty bytecode");
+
+        handle.stop().unwrap();
+    }
+
+    /// A provider that accepts the TCP connection but never replies must be detected by the
+    /// per-attempt timeout — even when the call has no deadline (the chain-sync contract).
+    /// Without per-attempt timing, the retry loop wedges on the stalled provider forever.
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn test_per_attempt_timeout_rotates_past_stalled_provider() {
+        // Stalled "provider": bind a TCP listener that accepts connections but never reads
+        // or writes. A client request to it will hang on the response.
+        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
+        let stalled_addr = listener.local_addr().unwrap();
+        let stalled_url = format!("http://{stalled_addr}/");
+        // Keep the listener alive (in scope) so the OS doesn't refuse the connection — we
+        // want accept + never-reply, not refused. Drops cleanly at end of test.
+        let _listener = listener;
+
+        // Healthy provider as the second endpoint. Round-robin starts at index 0 so the
+        // call hits the stalled provider first, has to time out the attempt, then rotates.
+        let healthy_latest = 4242u64;
+        let (handle, healthy_url) = start_block_number_rpc(healthy_latest).await;
+
+        let config = RpcClientConfig {
+            rpc_retry: BackoffPolicy::new(Duration::from_millis(5), Duration::from_millis(20)),
+            // Short per-attempt so a stalled provider rotates quickly.
+            per_attempt_timeout: Duration::from_millis(150),
+            ..Default::default()
+        };
+        let client = RpcClient::new_with_config(
+            &[&stalled_url, &healthy_url],
+            &[&healthy_url],
+            config,
+            None,
+        )
+        .unwrap();
+
+        // Bound the test wall clock with `tokio::time::timeout` (NOT a deadline parameter):
+        // this tests the `deadline = None` path. If the bug regressed, the call would hang
+        // and this outer timeout would fire instead of the call returning Ok.
+        let result = tokio::time::timeout(Duration::from_secs(3), client.get_latest_block_number())
+            .await
+            .expect("call must return — per-attempt timeout should rotate past stalled provider");
+        assert_eq!(result, healthy_latest);
+
+        handle.stop().unwrap();
+    }
 }
```