diff --git a/charts/coprocessor/Chart.yaml b/charts/coprocessor/Chart.yaml index 700f61e64d..1f6710f793 100644 --- a/charts/coprocessor/Chart.yaml +++ b/charts/coprocessor/Chart.yaml @@ -1,6 +1,6 @@ name: coprocessor description: A helm chart to distribute and deploy Zama fhevm Co-Processor services -version: 0.7.11 +version: 0.7.12 apiVersion: v2 keywords: - fhevm diff --git a/charts/coprocessor/values.yaml b/charts/coprocessor/values.yaml index 64b4f104bd..b95e2bf774 100644 --- a/charts/coprocessor/values.yaml +++ b/charts/coprocessor/values.yaml @@ -170,6 +170,9 @@ hostListener: - --service-name="host-listener" - --catchup-finalization-in-blocks=20 # Continuous catchup will wait for block "finalization" + ### New in v0.11 + # - --timeout-request-websocket=15 # OPTIONAL, Default timeout in seconds for websocket interactions + # Service ports configuration ports: metrics: 9100 diff --git a/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs b/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs index 529913c37c..82f29eddd2 100644 --- a/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs +++ b/coprocessor/fhevm-engine/host-listener/src/cmd/mod.rs @@ -40,6 +40,8 @@ pub const DEFAULT_DEPENDENCE_CACHE_SIZE: u16 = 10_000; pub const DEFAULT_DEPENDENCE_BY_CONNEXITY: bool = false; pub const DEFAULT_DEPENDENCE_CROSS_BLOCK: bool = true; +const TIMEOUT_REQUEST_ON_WEBSOCKET: u64 = 15; + #[derive(Parser, Debug, Clone)] #[command(version, about, long_about = None)] pub struct Args { @@ -156,6 +158,13 @@ pub struct Args { help = "Sleep duration in seconds between catchup loop iterations" )] pub catchup_loop_sleep_secs: u64, + + #[arg( + long, + default_value_t = TIMEOUT_REQUEST_ON_WEBSOCKET, + help = "Timeout in seconds for RPC calls over websocket" + )] + pub timeout_request_websocket: u64, } // TODO: to merge with Levent works @@ -180,6 +189,7 @@ struct InfiniteLogIter { reorg_maximum_duration_in_blocks: u64, // in blocks block_history: BlockHistory, // to detect reorgs catchup_finalization_in_blocks: u64, + timeout_request_websocket: u64, } enum BlockOrTimeoutOrNone { @@ -243,6 +253,7 @@ impl InfiniteLogIter { args.reorg_maximum_duration_in_blocks as usize, ), catchup_finalization_in_blocks: args.catchup_finalization_in_blocks, + timeout_request_websocket: args.timeout_request_websocket, } } @@ -313,13 +324,27 @@ impl InfiniteLogIter { Ok(provider) => provider, Err(_) => anyhow::bail!("Cannot get a provider"), }; - provider.get_logs(&filter).await.map_err(|err| { - if eth_rpc_err::too_much_blocks_or_events(&err) { - anyhow::anyhow!("Too much blocks or events: {err}") - } else { - anyhow::anyhow!("Cannot get logs for {filter:?} due to {err}") + // Timeout to prevent hanging indefinitely on buggy node + match tokio::time::timeout( + Duration::from_secs(self.timeout_request_websocket), + provider.get_logs(&filter), + ) + .await + { + Err(_) => { + anyhow::bail!("Timeout getting range logs for {filter:?}") } - }) + Ok(Err(err)) => { + if eth_rpc_err::too_much_blocks_or_events(&err) { + anyhow::bail!("Too much blocks or events: {err}") + } else { + anyhow::bail!( + "Cannot get range logs for {filter:?} due to {err}" + ) + } + } + Ok(Ok(logs)) => Ok(logs), + } } async fn deduce_block_summary( @@ -501,17 +526,24 @@ impl InfiniteLogIter { error!("No provider, inconsistent state"); return Err(anyhow::anyhow!("No provider, inconsistent state")); }; - let block = provider.get_block(block_id).await; - match block { - Ok(Some(block)) => return Ok(block), - Ok(None) => error!( + let block = tokio::time::timeout( + Duration::from_secs(self.timeout_request_websocket), + provider.get_block(block_id), + ); + match block.await { + Ok(Ok(Some(block))) => return Ok(block), + Ok(Ok(None)) => error!( block_id = ?block_id, - "Cannot get current block {block_id}, retrying", + "Cannot get block {block_id}, retrying", ), - Err(err) => error!( + Ok(Err(err)) => error!( block_id = ?block_id, error = %err, - "Cannot get current block {block_id}, retrying", + "Cannot get block {block_id}, retrying", + ), + Err(_) => error!( + block_id = ?block_id, + "Timeout getting block {block_id}, retrying", ), } if i != REORG_RETRY_GET_BLOCK { @@ -530,17 +562,24 @@ impl InfiniteLogIter { error!("No provider, inconsistent state"); return Err(anyhow::anyhow!("No provider, inconsistent state")); }; - let block = provider.get_block_by_hash(block_hash).await; - match block { - Ok(Some(block)) => return Ok(block), - Ok(None) => error!( + let block = tokio::time::timeout( + Duration::from_secs(self.timeout_request_websocket), + provider.get_block_by_hash(block_hash), + ); + match block.await { + Ok(Ok(Some(block))) => return Ok(block), + Ok(Ok(None)) => error!( block_hash = ?block_hash, - "Cannot get block, retrying", + "Cannot get block by hash, retrying", ), - Err(err) => error!( + Ok(Err(err)) => error!( block_hash = ?block_hash, error = %err, - "Cannot get block, retrying", + "Cannot get block by hash, retrying", + ), + Err(_) => error!( + block_hash = ?block_hash, + "Timeout getting block by hash, retrying", ), } if i != REORG_RETRY_GET_BLOCK { @@ -551,7 +590,7 @@ impl InfiniteLogIter { } } Err(anyhow::anyhow!( - "Cannot get block {block_hash} after retries" + "Cannot get block by hash {block_hash} after retries" )) } @@ -568,10 +607,27 @@ impl InfiniteLogIter { error!("No provider, inconsistent state"); return Err(anyhow::anyhow!("No provider, inconsistent state")); }; - let logs = provider.get_logs(&filter).await; - match logs { - Ok(logs) => return Ok(logs), - Err(err) => { + match tokio::time::timeout( + Duration::from_secs(self.timeout_request_websocket), + provider.get_logs(&filter), + ) + .await + { + Err(_) => { + error!( + block_hash = ?block_hash, + "Timeout getting logs for block {block_hash}, retrying", + ); + tokio::time::sleep(Duration::from_millis( + RETRY_GET_LOGS_DELAY_IN_MS, + )) + .await; + continue; + } + Ok(Ok(logs)) => { + return Ok(logs); + } + Ok(Err(err)) => { error!( block_hash = ?block_hash, error = %err, @@ -853,6 +909,7 @@ impl InfiniteLogIter { let Ok(block_logs) = self.find_last_block_and_logs().await else { error!("Cannot get last block and logs"); + self.stream = None; // to restart continue; }; warn!( diff --git a/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs b/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs index a9b0f06c06..dc254de057 100644 --- a/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs +++ b/coprocessor/fhevm-engine/host-listener/tests/host_listener_integration_tests.rs @@ -234,6 +234,7 @@ async fn setup(node_chain_id: Option) -> Result { catchup_finalization_in_blocks: 2, dependence_by_connexity: false, dependence_cross_block: true, + timeout_request_websocket: 30, }; let health_check_url = format!("http://127.0.0.1:{}", args.health_port); @@ -288,6 +289,7 @@ async fn test_only_catchup_loop_requires_negative_start_at_block( catchup_loop_sleep_secs: 60, dependence_by_connexity: false, dependence_cross_block: true, + timeout_request_websocket: 30, }; let result = main(args).await;