diff --git a/Cargo.lock b/Cargo.lock index 957acfe3d..837fdc2c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8518,6 +8518,7 @@ dependencies = [ "ethereum_ssz", "ethereum_ssz_derive", "ethportal-api", + "futures", "lazy_static", "parking_lot 0.12.3", "quickcheck", diff --git a/crates/portalnet/src/overlay/protocol.rs b/crates/portalnet/src/overlay/protocol.rs index 6ff2dab68..77f32e6db 100644 --- a/crates/portalnet/src/overlay/protocol.rs +++ b/crates/portalnet/src/overlay/protocol.rs @@ -227,11 +227,7 @@ impl< content_key: TContentKey, content_value: RawContentValue, ) -> PutContentInfo { - let should_we_store = match self - .store - .lock() - .is_key_within_radius_and_unavailable(&content_key) - { + let should_we_store = match self.store.lock().should_we_store(&content_key) { Ok(should_we_store) => matches!(should_we_store, ShouldWeStoreContent::Store), Err(err) => { warn!( @@ -492,12 +488,14 @@ impl< Ok(Response::Content(found_content)) => { match found_content { Content::Content(content) => { - match self.validate_content(&content_key, &content).await { - Ok(_) => Ok((Content::Content(content), false)), - Err(msg) => Err(OverlayRequestError::FailedValidation(format!( - "Network: {:?}, Reason: {msg:?}", + let validation_result = self.validate_content(&content_key, &content).await; + if validation_result.is_valid() { + Ok((Content::Content(content), false)) + } else { + Err(OverlayRequestError::FailedValidation(format!( + "Network: {:?}, Reason: {validation_result:?}", self.protocol - ))), + ))) } } Content::Enrs(_) => Ok((found_content, false)), @@ -514,12 +512,14 @@ impl< }; let content = RawContentValue::from(bytes); - match self.validate_content(&content_key, &content).await { - Ok(_) => Ok((Content::Content(content), true)), - Err(msg) => Err(OverlayRequestError::FailedValidation(format!( - "Network: {:?}, Reason: {msg:?}", + let validation_result = self.validate_content(&content_key, &content).await; + if validation_result.is_valid() { + Ok((Content::Content(content), true)) + } else { + Err(OverlayRequestError::FailedValidation(format!( + "Network: {:?}, Reason: {validation_result:?}", self.protocol - ))), + ))) } } } @@ -533,13 +533,15 @@ impl< &self, content_key: &TContentKey, content: &[u8], - ) -> anyhow::Result> { + ) -> ValidationResult { let validation_result = self.validator.validate_content(content_key, content).await; - self.metrics.report_validation(validation_result.is_ok()); - - validation_result.map_err(|err| { - anyhow!("Content validation failed for content key {content_key:?} with error: {err:?}") - }) + self.metrics.report_validation(validation_result.is_valid()); + if !validation_result.is_valid() { + warn!( + "Content validation failed for content key {content_key:?}: {validation_result:?}" + ) + } + validation_result } /// Initialize FindContent uTP stream with remote node diff --git a/crates/portalnet/src/overlay/service/find_content.rs b/crates/portalnet/src/overlay/service/find_content.rs index c7cc72ae0..6311e71f0 100644 --- a/crates/portalnet/src/overlay/service/find_content.rs +++ b/crates/portalnet/src/overlay/service/find_content.rs @@ -525,37 +525,36 @@ impl< .await; utp_processing .metrics - .report_validation(validation_result.is_ok()); - - let validation_result = match validation_result { - Ok(validation_result) => validation_result, - Err(err) => { - warn!( - error = ?err, - content.id = %hex_encode_compact(content_id), - content.key = %content_key, - "Error validating content" - ); - // Indicate to the query that the content is invalid - let _ = valid_content_callback.send(None); - if let Some(query_trace_events_tx) = query_trace_events_tx { - let _ = query_trace_events_tx.send(QueryTraceEvent::Failure( - query_id, - sending_peer, - QueryFailureKind::InvalidContent, - )); - } - return; + .report_validation(validation_result.is_valid()); + + if !validation_result.is_valid() { + warn!( + content.id = %hex_encode_compact(content_id), + content.key = %content_key, + ?validation_result, + "Error validating content" + ); + // Indicate to the query that the content is invalid + let _ = valid_content_callback.send(None); + if let Some(query_trace_events_tx) = query_trace_events_tx { + let _ = query_trace_events_tx.send(QueryTraceEvent::Failure( + query_id, + sending_peer, + QueryFailureKind::InvalidContent, + )); } + return; }; - // skip storing if content is not valid for storing, the content - // is already stored or if there's an error reading the store - let should_store = validation_result.valid_for_storing + // store content that: + // - is canonically valid + // - is not already stored + // - is within radius (if applicable) + let should_store = validation_result.is_canonically_valid() && utp_processing .store .lock() - .is_key_within_radius_and_unavailable(&content_key) + .should_we_store(&content_key) .map_or_else( |err| { error!("Unable to read store: {err}"); @@ -571,17 +570,12 @@ impl< { Ok(dropped_content) => { let mut content_to_propagate = vec![(content_key.clone(), content.clone())]; - if let Some(additional_content_to_propagate) = - validation_result.additional_content_to_propagate - { - content_to_propagate.push(additional_content_to_propagate); - } if !dropped_content.is_empty() && utp_processing.gossip_dropped { debug!( "Dropped {:?} pieces of content after inserting new content, propagating them back into the network.", dropped_content.len(), ); - content_to_propagate.extend(dropped_content.clone()); + content_to_propagate.extend(dropped_content); } propagate_put_content_cross_thread::<_, TMetric>( content_to_propagate, diff --git a/crates/portalnet/src/overlay/service/offer.rs b/crates/portalnet/src/overlay/service/offer.rs index 662125516..8918d0208 100644 --- a/crates/portalnet/src/overlay/service/offer.rs +++ b/crates/portalnet/src/overlay/service/offer.rs @@ -17,6 +17,7 @@ use ethportal_api::{ OverlayContentKey, RawContentKey, RawContentValue, }; use futures::{channel::oneshot, future::join_all}; +use itertools::Itertools; use parking_lot::Mutex; use tokio::task::JoinHandle; use tracing::{debug, enabled, error, trace, warn, Level}; @@ -118,18 +119,19 @@ impl< let mut accepted_keys: Vec = Vec::default(); - for (i, key) in content_keys.iter().enumerate() { - // Accept content if within radius and not already present in the data store. - let accept = self - .store - .lock() - .is_key_within_radius_and_unavailable(key) - .map_err(|err| { - OverlayRequestError::AcceptError(format!( - "Unable to check content availability {err}" - )) - })?; - let accept_code = match accept { + let should_we_store_batch = self + .store + .lock() + .should_we_store_batch(content_keys.as_slice()) + .map_err(|err| { + OverlayRequestError::AcceptError(format!( + "Unable to check content availability {err}" + )) + })?; + for (i, should_we_store) in should_we_store_batch.iter().enumerate() { + let key = &content_keys[i]; + let accept_code = match should_we_store { + // Accept content if within radius and not already present in the data store. ShouldWeStoreContent::Store => { // accept all keys that are successfully added to the queue if self.accept_queue.write().add_key_to_queue(key, &enr) { @@ -162,10 +164,13 @@ impl< let cid: ConnectionId = self.utp_controller.cid(enr.node_id(), false); let cid_send = cid.send; - let content_keys_string: Vec = content_keys + let content_keys_string: Vec = request + .content_keys .iter() - .map(|content_key| content_key.to_hex()) + .map(ToString::to_string) .collect(); + let accepted_keys_string: Vec = + accepted_keys.iter().map(ToString::to_string).collect(); trace!( protocol = %self.protocol, @@ -174,6 +179,7 @@ impl< cid.recv = cid.recv, enr = enr_str, request.content_keys = ?content_keys_string, + accepted_keys = ?accepted_keys_string, "Content keys handled by offer", ); @@ -210,7 +216,11 @@ impl< protocol_version ) .await { - debug!(%err, ?content_key, "Fallback FINDCONTENT task failed, after uTP transfer failed"); + debug!( + %err, + ?content_key, + "Fallback FINDCONTENT task failed, after uTP transfer failed", + ); } }) }) @@ -221,13 +231,12 @@ impl< } }; - // Spawn fallback FINDCONTENT tasks for each content key - // in payloads that failed to be accepted. - let content_values = match decode_and_validate_content_payload(&accepted_keys, data) { + let content_items = match decode_payload(&accepted_keys, data) { Ok(content_values) => content_values, Err(err) => { - debug!(%err, ?content_keys_string, "Decoding and validating content payload failed"); - let handles: Vec> = content_keys + // Spawn fallback FINDCONTENT tasks for each content key that was accepted. + debug!(%err, ?accepted_keys_string, "Decoding and validating content payload failed"); + let handles: Vec> = accepted_keys .into_iter() .map(|content_key| { let utp_processing = utp_processing.clone(); @@ -238,7 +247,11 @@ impl< protocol_version ) .await { - debug!(%err, ?content_key, "Fallback FINDCONTENT task failed, decoding and validating content payload failed"); + debug!( + %err, + ?content_key, + "Fallback FINDCONTENT task failed, decoding and validating content payload failed", + ); } }) }) @@ -249,63 +262,39 @@ impl< } }; - let handles = accepted_keys + let validated_content = + Self::validate_content(content_items, utp_processing.clone()).await; + + let content_to_propagate = validated_content .into_iter() - .zip(content_values) - .map(|(key, value)| { - let utp_processing = utp_processing.clone(); - tokio::spawn(async move { - match Self::validate_and_store_content( - key.clone(), + .flat_map(|(key, value, valid)| { + if valid { + utp_processing.accept_queue.write().remove_key(&key); + Self::store_content( + &key, value, utp_processing.clone(), ) - .await - { - Some(validated_content) => { - utp_processing.accept_queue.write().remove_key(&key); - Some(validated_content) - } - None => { - // Spawn a fallback FINDCONTENT task for each content key - // that failed individual processing. - if let Err(err) = Self::fallback_find_content( - key.clone(), - utp_processing, - protocol_version - ) - .await { - debug!(%err, ?key, "Fallback FINDCONTENT task failed, after validating and storing content failed"); - } - None + } else { + // Spawn a fallback FINDCONTENT task for each content key that failed + // validation + let utp_processing = utp_processing.clone(); + tokio::spawn(async move { + if let Err(err) = Self::fallback_find_content( + key.clone(), + utp_processing, + protocol_version + ) + .await { + debug!(%err, ?key, "Fallback FINDCONTENT task failed, after validating and storing content failed"); } - } - }) + }); + vec![] + } }) .collect::>(); - let validated_content: Vec<(TContentKey, RawContentValue)> = join_all(handles) - .await - .into_iter() - .enumerate() - .filter_map(|(index, value)| { - value.unwrap_or_else(|err| { - let err = err.into_panic(); - let err = if let Some(err) = err.downcast_ref::<&'static str>() { - err.to_string() - } else if let Some(err) = err.downcast_ref::() { - err.clone() - } else { - format!("{err:?}") - }; - debug!(err, content_key = ?content_keys_string[index], "Process uTP payload tokio task failed:"); - // Do we want to fallback find content here? - None - }) - }) - .flatten() - .collect(); propagate_put_content_cross_thread::<_, TMetric>( - validated_content, + content_to_propagate, &utp_processing.kbuckets, utp_processing.command_tx.clone(), Some(utp_processing.utp_controller), @@ -485,54 +474,52 @@ impl< None, None, )))?; - let data: RawContentValue = match rx.await? { - Ok(Response::Content(found_content)) => { - match found_content { - Content::Content(content) => content, - Content::Enrs(_) => return Err(anyhow!("expected content, got ENRs")), - // Init uTP stream if `connection_id` is received - Content::ConnectionId(conn_id) => { - let conn_id = u16::from_be(conn_id); - let cid = utp_rs::cid::ConnectionId { - recv: conn_id, - send: conn_id.wrapping_add(1), - peer_id: fallback_peer.node_id(), - }; - let bytes = utp_processing - .utp_controller - .connect_inbound_stream(cid, UtpPeer(fallback_peer.clone())) - .await?; - - match protocol_version.is_v1_enabled() { - true => match decode_single_content_payload(bytes) { - Ok(bytes) => bytes, - Err(err) => bail!( - "Unable to decode content payload from FINDCONTENT v1 response {err:?}", - ), - }, - false => bytes, - } + let content_value: RawContentValue = match rx.await? { + Ok(Response::Content(Content::Content(content))) => content, + Ok(Response::Content(Content::Enrs(_))) => bail!("expected content, got ENRs"), + Ok(Response::Content(Content::ConnectionId(conn_id))) => { + let conn_id = u16::from_be(conn_id); + let cid = utp_rs::cid::ConnectionId { + recv: conn_id, + send: conn_id.wrapping_add(1), + peer_id: fallback_peer.node_id(), + }; + let bytes = utp_processing + .utp_controller + .connect_inbound_stream(cid, UtpPeer(fallback_peer.clone())) + .await?; + + if protocol_version.is_v1_enabled() { + match decode_single_content_payload(bytes) { + Ok(bytes) => bytes, + Err(err) => bail!( + "Unable to decode content payload from FINDCONTENT v1 response {err:?}", + ), } + } else { + bytes } } - _ => return Err(anyhow!("invalid response")), - }; - let validated_content = match Self::validate_and_store_content( - content_key, - data, - utp_processing.clone(), - ) - .await - { - Some(validated_content) => validated_content, - None => { - debug!("Fallback FINDCONTENT request to peer {fallback_peer} did not yield valid content"); - return Ok(()); - } + Ok(response) => bail!("invalid overlay response: {response:?}"), + Err(err) => bail!("overlay request error: {err}"), }; + let (content_key, content_value, valid) = + Self::validate_content(vec![(content_key, content_value)], utp_processing.clone()) + .await + .remove(0); + + if !valid { + debug!( + "Fallback FINDCONTENT request to peer {fallback_peer} did not yield valid content" + ); + return Ok(()); + } + + let content_to_propagate = + Self::store_content(&content_key, content_value, utp_processing.clone()); propagate_put_content_cross_thread::<_, TMetric>( - validated_content, + content_to_propagate, &utp_processing.kbuckets, utp_processing.command_tx.clone(), Some(utp_processing.utp_controller), @@ -540,71 +527,68 @@ impl< Ok(()) } - /// Validates & stores content value received from peer. - /// Checks if validated content should be stored, and stores it if true - /// Returns validated content/content dropped from storage to - /// propagate to other peers. - // (this step requires a dedicated task since it might require - // non-blocking requests to this/other overlay networks). - async fn validate_and_store_content( - key: TContentKey, - content_value: RawContentValue, + /// Validates content values received from a peer. + /// + /// Returns content items that were successfully validated. + async fn validate_content( + content_items: Vec<(TContentKey, RawContentValue)>, utp_processing: UtpProcessing, - ) -> Option> { - // Validate received content - let validation_result = utp_processing + ) -> Vec<(TContentKey, RawContentValue, bool)> { + let validation_results = utp_processing .validator - .validate_content(&key, &content_value) + .validate_content_batch(&content_items) .await; - utp_processing - .metrics - .report_validation(validation_result.is_ok()); - let validation_result = match validation_result { - Ok(validation_result) => validation_result, - Err(err) => { - // Skip storing & propagating content if it's not valid - warn!( - error = %err, - content.key = %key.to_hex(), - "Error validating accepted content" - ); - return None; - } - }; + content_items + .into_iter() + .zip_eq(validation_results) + .map(|((content_key, content_value), validation_result)| { + utp_processing + .metrics + .report_validation(validation_result.is_canonically_valid()); - if !validation_result.valid_for_storing { - // Content received via Offer/Accept should be valid for storing. - // If it isn't, don't store it and don't propagate it. - warn!( - content.key = %key.to_hex(), - "Error validating accepted content - not valid for storing" - ); - return None; - } + if validation_result.is_canonically_valid() { + (content_key, content_value, true) + } else { + warn!( + content.key = %content_key.to_hex(), + ?validation_result, + "Error validating accepted content" + ); + (content_key, content_value, false) + } + }) + .collect() + } + /// Stores content item received from a peer. + /// + /// It assumes that content is already validated. + /// + /// Returns stored content and content dropped from storage, that should be propagate to other + /// peers. + fn store_content( + key: &TContentKey, + content_value: RawContentValue, + utp_processing: UtpProcessing, + ) -> Vec<(TContentKey, RawContentValue)> { // Collect all content to propagate let mut content_to_propagate = vec![(key.clone(), content_value.clone())]; - if let Some(additional_content_to_propagate) = - validation_result.additional_content_to_propagate - { - content_to_propagate.push(additional_content_to_propagate); - } - // Check if data should be stored, and store if it is within our radius and not - // already stored. - let key_desired = utp_processing - .store - .lock() - .is_key_within_radius_and_unavailable(&key); + // Check if data should be stored, and store if it is within our radius and not already + // stored. + let key_desired = utp_processing.store.lock().should_we_store(key); match key_desired { Ok(ShouldWeStoreContent::Store) => { match utp_processing.store.lock().put(key.clone(), &content_value) { Ok(dropped_content) => { if !dropped_content.is_empty() && utp_processing.gossip_dropped { // add dropped content to validation result, so it will be propagated - debug!("Dropped {:?} pieces of content after inserting new content, propagating them back into the network.", dropped_content.len()); - content_to_propagate.extend(dropped_content.clone()); + debug!( + "Dropped {} pieces of content after inserting new content, propagating them back into the network.", + dropped_content.len(), + ); + content_to_propagate.extend(dropped_content); } } Err(err) => warn!( @@ -634,7 +618,7 @@ impl< ); } }; - Some(content_to_propagate) + content_to_propagate } /// Provide the requested content key and content value for the acceptor @@ -676,10 +660,10 @@ impl< } } -fn decode_and_validate_content_payload( +fn decode_payload( accepted_keys: &[TContentKey], payload: Bytes, -) -> anyhow::Result> { +) -> anyhow::Result> { let content_values = portal_wire::decode_content_payload(payload)?; // Accepted content keys len should match content value len let keys_len = accepted_keys.len(); @@ -691,5 +675,9 @@ fn decode_and_validate_content_payload( vals_len )); } - Ok(content_values) + Ok(accepted_keys + .iter() + .cloned() + .zip_eq(content_values) + .collect()) } diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 5e6b8aeef..416e957de 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -39,6 +39,7 @@ pub trait ContentStore { fn get(&self, key: &Self::Key) -> Result, ContentStoreError>; /// Puts a piece of content into the store. + /// /// Returns a list of keys that were evicted from the store, which should be gossiped into the /// network. In the future this might be updated to a separate table that stores a queue /// of content keys to be gossiped and gossips them in a background task. @@ -51,10 +52,17 @@ pub trait ContentStore { /// Returns whether the content denoted by `key` is within the radius of the data store and not /// already stored within the data store. - fn is_key_within_radius_and_unavailable( + fn should_we_store(&self, key: &Self::Key) -> Result; + + /// Performs [ContentStore::should_we_store] for multiple content keys. + /// + /// The default implementation calls `self.should_we_store` for each key. + fn should_we_store_batch( &self, - key: &Self::Key, - ) -> Result; + keys: &[Self::Key], + ) -> Result, ContentStoreError> { + keys.iter().map(|key| self.should_we_store(key)).collect() + } /// Returns the radius of the data store. fn radius(&self) -> Distance; @@ -122,10 +130,7 @@ impl ContentStore for MemoryContentStore { Ok(vec![]) } - fn is_key_within_radius_and_unavailable( - &self, - key: &Self::Key, - ) -> Result { + fn should_we_store(&self, key: &Self::Key) -> Result { if key.affected_by_radius() && self.distance_to_key(key) > self.radius { return Ok(ShouldWeStoreContent::NotWithinRadius); } @@ -251,18 +256,14 @@ pub mod test { // Arbitrary key within radius and unavailable. let arb_key = IdentityContentKey::new(node_id.raw()); assert_eq!( - store - .is_key_within_radius_and_unavailable(&arb_key) - .unwrap(), + store.should_we_store(&arb_key).unwrap(), ShouldWeStoreContent::Store ); // Arbitrary key available. let _ = store.put(arb_key.clone(), val); assert_eq!( - store - .is_key_within_radius_and_unavailable(&arb_key) - .unwrap(), + store.should_we_store(&arb_key).unwrap(), ShouldWeStoreContent::AlreadyStored ); } diff --git a/crates/subnetworks/beacon/src/storage.rs b/crates/subnetworks/beacon/src/storage.rs index fe98abcb5..c891e5896 100644 --- a/crates/subnetworks/beacon/src/storage.rs +++ b/crates/subnetworks/beacon/src/storage.rs @@ -196,7 +196,7 @@ impl ContentStore for BeaconStorage { } /// The "radius" concept is not applicable for Beacon network - fn is_key_within_radius_and_unavailable( + fn should_we_store( &self, key: &BeaconContentKey, ) -> Result { @@ -836,21 +836,21 @@ mod test { assert_eq!(result, value.as_ssz_bytes()); // Test is_key_within_radius_and_unavailable for the same finalized slot - let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap(); + let should_store_content = storage.should_we_store(&key).unwrap(); assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored); // Test is_key_within_radius_and_unavailable for older finalized slot let key = BeaconContentKey::LightClientFinalityUpdate(LightClientFinalityUpdateKey { finalized_slot: finalized_slot - 1, }); - let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap(); + let should_store_content = storage.should_we_store(&key).unwrap(); assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored); // Test is_key_within_radius_and_unavailable for newer finalized slot let key = BeaconContentKey::LightClientFinalityUpdate(LightClientFinalityUpdateKey { finalized_slot: finalized_slot + 1, }); - let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap(); + let should_store_content = storage.should_we_store(&key).unwrap(); assert_eq!(should_store_content, ShouldWeStoreContent::Store); // Test getting the latest finality update @@ -875,21 +875,21 @@ mod test { assert_eq!(result, value.as_ssz_bytes()); // Test is_key_within_radius_and_unavailable for the same signature slot - let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap(); + let should_store_content = storage.should_we_store(&key).unwrap(); assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored); // Test is_key_within_radius_and_unavailable for older signature slot let key = BeaconContentKey::LightClientOptimisticUpdate(LightClientOptimisticUpdateKey { signature_slot: signature_slot - 1, }); - let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap(); + let should_store_content = storage.should_we_store(&key).unwrap(); assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored); // Test is_key_within_radius_and_unavailable for newer signature slot let key = BeaconContentKey::LightClientOptimisticUpdate(LightClientOptimisticUpdateKey { signature_slot: signature_slot + 1, }); - let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap(); + let should_store_content = storage.should_we_store(&key).unwrap(); assert_eq!(should_store_content, ShouldWeStoreContent::Store); // Test getting unavailable optimistic update @@ -927,21 +927,21 @@ mod test { assert_eq!(result, value.encode()); // Test is_key_within_radius_and_unavailable for the same epoch - let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap(); + let should_store_content = storage.should_we_store(&key).unwrap(); assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored); // Test is_key_within_radius_and_unavailable for older epoch let key = BeaconContentKey::HistoricalSummariesWithProof(HistoricalSummariesWithProofKey { epoch: epoch - 1, }); - let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap(); + let should_store_content = storage.should_we_store(&key).unwrap(); assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored); // Test is_key_within_radius_and_unavailable for newer epoch let key = BeaconContentKey::HistoricalSummariesWithProof(HistoricalSummariesWithProofKey { epoch: epoch + 1, }); - let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap(); + let should_store_content = storage.should_we_store(&key).unwrap(); assert_eq!(should_store_content, ShouldWeStoreContent::Store); // Test getting unavailable historical summaries with proof diff --git a/crates/subnetworks/beacon/src/validation.rs b/crates/subnetworks/beacon/src/validation.rs index 33753accc..a960c5117 100644 --- a/crates/subnetworks/beacon/src/validation.rs +++ b/crates/subnetworks/beacon/src/validation.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use alloy::primitives::B256; use anyhow::anyhow; use chrono::Duration; use ethportal_api::{ @@ -59,17 +58,17 @@ impl Validator for BeaconValidator { async fn validate_content( &self, content_key: &BeaconContentKey, - content: &[u8], - ) -> anyhow::Result> { + content_value: &[u8], + ) -> ValidationResult { match content_key { BeaconContentKey::LightClientBootstrap(_) => { - let bootstrap = ForkVersionedLightClientBootstrap::from_ssz_bytes(content) - .map_err(|err| { - anyhow!( - "Fork versioned light client bootstrap has invalid SSZ bytes: {:?}", - err - ) - })?; + let Ok(bootstrap) = + ForkVersionedLightClientBootstrap::from_ssz_bytes(content_value) + else { + return ValidationResult::Invalid( + "Error decoding light client bootstrap content value".to_string(), + ); + }; // Check if the light client bootstrap slot is ole than 4 months let four_months = Duration::days(30 * 4); @@ -79,9 +78,8 @@ impl Validator for BeaconValidator { let bootstrap_slot = bootstrap.get_slot(); if bootstrap_slot < four_months_ago_slot { - return Err(anyhow!( - "Light client bootstrap slot is too old: {}", - bootstrap_slot + return ValidationResult::Invalid(format!( + "Light client bootstrap slot is too old: {bootstrap_slot}", )); } @@ -90,24 +88,23 @@ impl Validator for BeaconValidator { if let Ok(finalized_header) = finalized_header { if finalized_header != bootstrap_block_header { - return Err(anyhow!( + return ValidationResult::Invalid(format!( "Light client bootstrap header does not match the finalized header: {finalized_header:?} != {bootstrap_block_header:?}", )); } } } BeaconContentKey::LightClientUpdatesByRange(key) => { - let lc_updates = - LightClientUpdatesByRange::from_ssz_bytes(content).map_err(|err| { - anyhow!( - "Light client updates by range has invalid SSZ bytes: {:?}", - err - ) - })?; + let Ok(lc_updates) = LightClientUpdatesByRange::from_ssz_bytes(content_value) + else { + return ValidationResult::Invalid( + "Error decoding Light client updates content value".to_string(), + ); + }; // Check if lc updates count match the content key count if lc_updates.0.len() as u64 != key.count { - return Err(anyhow!( + return ValidationResult::Invalid(format!( "Light client updates count does not match the content key count: {} != {}", lc_updates.0.len(), key.count @@ -126,35 +123,39 @@ impl Validator for BeaconValidator { match &update.update { LightClientUpdate::Electra(update) => { let generic_update: GenericUpdate = update.into(); - verify_generic_update( + if let Err(err) = verify_generic_update( &light_client_store, &generic_update, expected_slot, self.light_client_config.chain.genesis_root, self.light_client_config.forks.electra.fork_version, - )?; + ) { + return ValidationResult::Invalid(format!( + "Light client update not valid: {err:?}", + )); + } } _ => { - return Err(anyhow!("Unsupported light client update fork version")) + return ValidationResult::Invalid( + "Unsupported light client update fork version".to_string(), + ); } } } } } BeaconContentKey::LightClientFinalityUpdate(key) => { - let lc_finality_update = ForkVersionedLightClientFinalityUpdate::from_ssz_bytes( - content, - ) - .map_err(|err| { - anyhow!( - "Fork versioned light client finality update has invalid SSZ bytes: {:?}", - err - ) - })?; + let Ok(lc_finality_update) = + ForkVersionedLightClientFinalityUpdate::from_ssz_bytes(content_value) + else { + return ValidationResult::Invalid( + "Error decoding Light client finality update content value".to_string(), + ); + }; // Check if the light client finality update is from the recent fork if lc_finality_update.fork_name != ForkName::Electra { - return Err(anyhow!( + return ValidationResult::Invalid(format!( "Light client finality update is not from the recent fork. Expected Electra, got {}", lc_finality_update.fork_name )); @@ -165,7 +166,7 @@ impl Validator for BeaconValidator { let finalized_slot = lc_finality_update.get_finalized_slot(); if key.finalized_slot > finalized_slot { - return Err(anyhow!( + return ValidationResult::Invalid(format!( "Light client finality update finalized slot should be equal or greater than content key finalized slot: {} < {}", finalized_slot, key.finalized_slot @@ -182,36 +183,38 @@ impl Validator for BeaconValidator { .await { let generic_update: GenericUpdate = update.into(); - verify_generic_update( + if let Err(err) = verify_generic_update( &light_client_store, &generic_update, expected_current_slot(), self.light_client_config.chain.genesis_root, self.light_client_config.forks.electra.fork_version, - )?; + ) { + return ValidationResult::Invalid(format!( + "Light client finality update not valid: {err:?}", + )); + } } } _ => { - return Err(anyhow!( - "Unsupported light client finality update fork version" - )) + return ValidationResult::Invalid( + "Unsupported light client finality update fork version".to_string(), + ); } } } BeaconContentKey::LightClientOptimisticUpdate(key) => { - let lc_optimistic_update = - ForkVersionedLightClientOptimisticUpdate::from_ssz_bytes(content).map_err( - |err| { - anyhow!( - "Fork versioned light client optimistic update has invalid SSZ bytes: {:?}", - err - ) - }, - )?; + let Ok(lc_optimistic_update) = + ForkVersionedLightClientOptimisticUpdate::from_ssz_bytes(content_value) + else { + return ValidationResult::Invalid( + "Error decoding Light client optimistic update content value".to_string(), + ); + }; // Check if the light client optimistic update is from the recent fork if lc_optimistic_update.fork_name != ForkName::Electra { - return Err(anyhow!( + return ValidationResult::Invalid(format!( "Light client optimistic update is not from the recent fork. Expected Electra, got {}", lc_optimistic_update.fork_name )); @@ -220,7 +223,7 @@ impl Validator for BeaconValidator { // Check if key signature slot matches the light client optimistic update signature // slot if &key.signature_slot != lc_optimistic_update.update.signature_slot() { - return Err(anyhow!( + return ValidationResult::Invalid(format!( "Light client optimistic update signature slot does not match the content key signature slot: {} != {}", lc_optimistic_update.update.signature_slot(), key.signature_slot @@ -238,25 +241,32 @@ impl Validator for BeaconValidator { { let generic_update: GenericUpdate = update.into(); - verify_generic_update( + if let Err(err) = verify_generic_update( &light_client_store, &generic_update, expected_current_slot(), self.light_client_config.chain.genesis_root, self.light_client_config.forks.electra.fork_version, - )?; + ) { + return ValidationResult::Invalid(format!( + "Light client optimistic update not valid: {err:?}", + )); + } } } _ => { - return Err(anyhow!( - "Unsupported light client optimistic update fork version" - )) + return ValidationResult::Invalid( + "Unsupported light client optimistic update fork version".to_string(), + ); } } } BeaconContentKey::HistoricalSummariesWithProof(key) => { let historical_summaries_with_proof = - Self::general_summaries_validation(content, key)?; + match Self::general_summaries_validation(content_value, key) { + Ok(historical_summaries_with_proof) => historical_summaries_with_proof, + Err(err) => return ValidationResult::Invalid(err.to_string()), + }; let latest_finalized_root = self .header_oracle @@ -266,17 +276,28 @@ impl Validator for BeaconValidator { .await; if let Ok(latest_finalized_root) = latest_finalized_root { - Self::state_summaries_validation( - historical_summaries_with_proof, + // Validate historical summaries against the latest finalized state root + if !verify_merkle_proof( + historical_summaries_with_proof + .historical_summaries + .tree_hash_root(), + &historical_summaries_with_proof.proof, + HistoricalSummariesProof::capacity(), + HISTORICAL_SUMMARIES_GINDEX, latest_finalized_root, - )? + ) { + return ValidationResult::Invalid( + "Merkle proof validation failed for HistoricalSummariesProof" + .to_string(), + ); + } } else { warn!("Failed to get latest finalized state root. Bypassing historical summaries with proof validation"); } } } - Ok(ValidationResult::new(true)) + ValidationResult::CanonicallyValid } } @@ -287,9 +308,8 @@ impl BeaconValidator { key: &HistoricalSummariesWithProofKey, ) -> anyhow::Result { let fork_versioned_historical_summaries = - ForkVersionedHistoricalSummariesWithProof::from_ssz_bytes(content).map_err(|err| { - anyhow!("Historical summaries with proof has invalid SSZ bytes: {err:?}") - })?; + ForkVersionedHistoricalSummariesWithProof::from_ssz_bytes(content) + .map_err(|_| anyhow!("Error decoding hisorical summaries content value"))?; let historical_summaries_with_proof = fork_versioned_historical_summaries.historical_summaries_with_proof; @@ -302,33 +322,12 @@ impl BeaconValidator { key.epoch )); } - Ok(historical_summaries_with_proof) - } - /// Validate historical summaries against the latest finalized state root - fn state_summaries_validation( - historical_summaries_with_proof: HistoricalSummariesWithProof, - latest_finalized_root: B256, - ) -> anyhow::Result<()> { - if verify_merkle_proof( - historical_summaries_with_proof - .historical_summaries - .tree_hash_root(), - &historical_summaries_with_proof.proof, - HistoricalSummariesProof::capacity(), - HISTORICAL_SUMMARIES_GINDEX, - latest_finalized_root, - ) { - Ok(()) - } else { - Err(anyhow!( - "Merkle proof validation failed for HistoricalSummariesProof" - )) - } + Ok(historical_summaries_with_proof) } } + #[cfg(test)] -#[allow(clippy::unwrap_used)] mod tests { use ethportal_api::{ types::{ @@ -359,12 +358,10 @@ mod tests { let content_key = BeaconContentKey::LightClientBootstrap(LightClientBootstrapKey { block_hash: [0; 32], }); - let result = validator + assert!(validator .validate_content(&content_key, &content) .await - .unwrap(); - - assert!(result.valid_for_storing); + .is_canonically_valid()); // Expect error because the light client bootstrap slot is too old bootstrap @@ -374,14 +371,9 @@ mod tests { .beacon .slot = 0; let content = bootstrap.as_ssz_bytes(); - let result = validator - .validate_content(&content_key, &content) - .await - .unwrap_err(); - assert_eq!( - result.to_string(), - "Light client bootstrap slot is too old: 0" + validator.validate_content(&content_key, &content).await, + ValidationResult::Invalid("Light client bootstrap slot is too old: 0".to_string()), ); } @@ -396,25 +388,21 @@ mod tests { start_period: 0, count: 1, }); - let result = validator + assert!(validator .validate_content(&content_key, &content) .await - .unwrap(); - - assert!(result.valid_for_storing); + .is_canonically_valid()); let lc_update_1 = test_utils::get_light_client_update(1); let updates = LightClientUpdatesByRange(VariableList::from(vec![lc_update_0, lc_update_1])); let content = updates.as_ssz_bytes(); // Expect error because the count does not match the content key count - let result = validator - .validate_content(&content_key, &content) - .await - .unwrap_err(); - assert_eq!( - result.to_string(), - "Light client updates count does not match the content key count: 2 != 1" + validator.validate_content(&content_key, &content).await, + ValidationResult::Invalid( + "Light client updates count does not match the content key count: 2 != 1" + .to_string() + ), ); } @@ -429,12 +417,10 @@ mod tests { finalized_slot: 10934316269310501102, }); - let result = validator + assert!(validator .validate_content(&content_key, &content) .await - .unwrap(); - - assert!(result.valid_for_storing); + .is_canonically_valid()); // Expect error because the content key finalized slot is greaten than the light client // update finalized slot @@ -442,14 +428,12 @@ mod tests { BeaconContentKey::LightClientFinalityUpdate(LightClientFinalityUpdateKey { finalized_slot: 10934316269310501102 + 1, }); - let result = validator - .validate_content(&invalid_content_key, &content) - .await - .unwrap_err(); - assert_eq!( - result.to_string(), - "Light client finality update finalized slot should be equal or greater than content key finalized slot: 10934316269310501102 < 10934316269310501103" + validator.validate_content(&invalid_content_key, &content).await, + ValidationResult::Invalid( + "Light client finality update finalized slot should be equal or greater than content key finalized slot: 10934316269310501102 < 10934316269310501103" + .to_string() + ), ); } @@ -463,49 +447,45 @@ mod tests { BeaconContentKey::LightClientOptimisticUpdate(LightClientOptimisticUpdateKey { signature_slot: 15067541596220156845, }); - let result = validator + assert!(validator .validate_content(&content_key, &content) .await - .unwrap(); - - assert!(result.valid_for_storing); + .is_canonically_valid()); // Expect error because the signature slot does not match the content key signature slot let invalid_content_key = BeaconContentKey::LightClientOptimisticUpdate(LightClientOptimisticUpdateKey { signature_slot: 0, }); - let result = validator - .validate_content(&invalid_content_key, &content) - .await - .unwrap_err(); - assert_eq!( - result.to_string(), - "Light client optimistic update signature slot does not match the content key signature slot: 15067541596220156845 != 0" + validator.validate_content(&invalid_content_key, &content).await, + ValidationResult::Invalid( + "Light client optimistic update signature slot does not match the content key signature slot: 15067541596220156845 != 0" + .to_string() + ), ); } mod historical_summaries { + use alloy::primitives::B256; + use super::*; #[tokio::test] async fn validate() { let test_data = read_test_data(); - let validation_result = test_data + assert!(test_data .create_validator() .validate_content( &test_data.content.content_key, &test_data.content.raw_content_value, ) .await - .expect("Should validate content"); - assert!(validation_result.valid_for_storing); + .is_canonically_valid()); } #[tokio::test] - #[should_panic = "Historical summaries with proof epoch does not match the content key epoch"] async fn validate_invalid_epoch() { let test_data = read_test_data(); @@ -515,15 +495,19 @@ mod tests { key.epoch += 1; } - test_data - .create_validator() - .validate_content(&content_key, &test_data.content.raw_content_value) - .await - .unwrap(); + assert_eq!( + test_data + .create_validator() + .validate_content(&content_key, &test_data.content.raw_content_value) + .await, + ValidationResult::Invalid( + "Historical summaries with proof epoch does not match the content key epoch: 450508969718611630 != 450508969718611631" + .to_string() + ), + ); } #[tokio::test] - #[should_panic = "Merkle proof validation failed for HistoricalSummariesProof"] async fn validate_invalid_proof() { let test_data = read_test_data(); @@ -539,11 +523,15 @@ mod tests { .swap(0, 1); } - test_data - .create_validator() - .validate_content(&test_data.content.content_key, &content_value.encode()) - .await - .unwrap(); + assert_eq!( + test_data + .create_validator() + .validate_content(&test_data.content.content_key, &content_value.encode()) + .await, + ValidationResult::Invalid( + "Merkle proof validation failed for HistoricalSummariesProof".to_string() + ), + ); } fn read_test_data() -> TestData { diff --git a/crates/subnetworks/history/src/storage.rs b/crates/subnetworks/history/src/storage.rs index 2684ef138..202b03b27 100644 --- a/crates/subnetworks/history/src/storage.rs +++ b/crates/subnetworks/history/src/storage.rs @@ -34,7 +34,7 @@ impl ContentStore for HistoryStorage { .insert(&key, RawContentValue::copy_from_slice(value.as_ref())) } - fn is_key_within_radius_and_unavailable( + fn should_we_store( &self, key: &HistoryContentKey, ) -> Result { diff --git a/crates/subnetworks/history/src/validation.rs b/crates/subnetworks/history/src/validation.rs index 8d75b1a8e..403174bb8 100644 --- a/crates/subnetworks/history/src/validation.rs +++ b/crates/subnetworks/history/src/validation.rs @@ -1,12 +1,10 @@ use std::sync::Arc; -use alloy::{consensus::Header, primitives::B256}; -use anyhow::{anyhow, ensure}; +use alloy::{hex::ToHexExt, primitives::B256}; use ethportal_api::{ types::execution::{ block_body::BlockBody, header_with_proof::HeaderWithProof, receipts::Receipts, }, - utils::bytes::hex_encode, HistoryContentKey, }; use ssz::Decode; @@ -36,98 +34,127 @@ impl Validator for ChainHistoryValidator { async fn validate_content( &self, content_key: &HistoryContentKey, - content: &[u8], - ) -> anyhow::Result> { + content_value: &[u8], + ) -> ValidationResult { match content_key { HistoryContentKey::BlockHeaderByHash(key) => { - let header_with_proof = - HeaderWithProof::from_ssz_bytes(content).map_err(|err| { - anyhow!("Header by hash content has invalid encoding: {err:?}") - })?; + let Ok(header_with_proof) = HeaderWithProof::from_ssz_bytes(content_value) else { + return ValidationResult::Invalid( + "Header by hash content has invalid encoding".to_string(), + ); + }; + let header_hash = header_with_proof.header.hash_slow(); - ensure!( - header_hash == B256::from(key.block_hash), - "Content validation failed: Invalid header hash. Found: {header_hash:?} - Expected: {:?}", - hex_encode(header_hash) - ); - self.header_validator + if header_hash != B256::from(key.block_hash) { + return ValidationResult::Invalid(format!( + "Content validation failed: Invalid header hash. Found: {header_hash} - Expected: {:?}", + key.block_hash.encode_hex_with_prefix(), + )); + } + + if let Err(err) = self + .header_validator .validate_header_with_proof(&header_with_proof) - .await?; + .await + { + return ValidationResult::Invalid(err.to_string()); + } - Ok(ValidationResult::new(true)) + ValidationResult::CanonicallyValid } HistoryContentKey::BlockHeaderByNumber(key) => { - let header_with_proof = - HeaderWithProof::from_ssz_bytes(content).map_err(|err| { - anyhow!("Header by number content has invalid encoding: {err:?}") - })?; + let Ok(header_with_proof) = HeaderWithProof::from_ssz_bytes(content_value) else { + return ValidationResult::Invalid( + "Header by number content has invalid encoding".to_string(), + ); + }; + let header_number = header_with_proof.header.number; - ensure!( - header_number == key.block_number, - "Content validation failed: Invalid header number. Found: {header_number} - Expected: {}", - key.block_number - ); - self.header_validator + if header_number != key.block_number { + return ValidationResult::Invalid(format!( + "Content validation failed: Invalid header number. Found: {header_number} - Expected: {}", + key.block_number + )); + } + + if let Err(err) = self + .header_validator .validate_header_with_proof(&header_with_proof) - .await?; + .await + { + return ValidationResult::Invalid(err.to_string()); + } - Ok(ValidationResult::new(true)) + ValidationResult::CanonicallyValid } HistoryContentKey::BlockBody(key) => { - let block_body = BlockBody::from_ssz_bytes(content) - .map_err(|msg| anyhow!("Block Body content has invalid encoding: {:?}", msg))?; - let trusted_header: Header = self + let Ok(block_body) = BlockBody::from_ssz_bytes(content_value) else { + return ValidationResult::Invalid( + "Block Body content has invalid encoding".to_string(), + ); + }; + + let trusted_header = match self .header_oracle .read() .await .recursive_find_header_by_hash_with_proof(B256::from(key.block_hash)) - .await? - .header; - let actual_uncles_root = block_body.calculate_ommers_root(); - if actual_uncles_root != trusted_header.ommers_hash { - return Err(anyhow!( - "Content validation failed: Invalid uncles root. Found: {:?} - Expected: {:?}", - actual_uncles_root, - trusted_header.ommers_hash - )); - } - let actual_txs_root = block_body.transactions_root(); - if actual_txs_root != trusted_header.transactions_root { - return Err(anyhow!( - "Content validation failed: Invalid transactions root. Found: {:?} - Expected: {:?}", - actual_txs_root, - trusted_header.transactions_root - )); + .await + { + Ok(header_with_proof) => header_with_proof.header, + Err(err) => { + return ValidationResult::Invalid(format!( + "Can't find header by hash. Error: {err:?}", + )) + } + }; + + match block_body.validate_against_header(&trusted_header) { + Ok(_) => ValidationResult::CanonicallyValid, + Err(err) => { + ValidationResult::Invalid(format!("Error validating BlockBody: {err:?}",)) + } } - Ok(ValidationResult::new(true)) } HistoryContentKey::BlockReceipts(key) => { - let receipts = Receipts::from_ssz_bytes(content).map_err(|msg| { - anyhow!("Block Receipts content has invalid encoding: {:?}", msg) - })?; - let trusted_header: Header = self + let Ok(receipts) = Receipts::from_ssz_bytes(content_value) else { + return ValidationResult::Invalid( + "Block Receipts content has invalid encoding".to_string(), + ); + }; + + let trusted_header = match self .header_oracle .read() .await .recursive_find_header_by_hash_with_proof(B256::from(key.block_hash)) - .await? - .header; + .await + { + Ok(header_with_proof) => header_with_proof.header, + Err(err) => { + return ValidationResult::Invalid(format!( + "Can't find header by hash. Error: {err:?}", + )) + } + }; + let actual_receipts_root = receipts.root(); if actual_receipts_root != trusted_header.receipts_root { - return Err(anyhow!( + return ValidationResult::Invalid(format!( "Content validation failed: Invalid receipts root. Found: {:?} - Expected: {:?}", actual_receipts_root, trusted_header.receipts_root )); } - Ok(ValidationResult::new(true)) + + ValidationResult::CanonicallyValid } - HistoryContentKey::EphemeralHeaderOffer(_) => Err(anyhow!( - "Validation is not implemented for EphemeralHeaderOffer yet" - )), - HistoryContentKey::EphemeralHeadersFindContent(_) => Err(anyhow!( - "Validation is not implemented for EphemeralHeadersFindContent yet" - )), + HistoryContentKey::EphemeralHeaderOffer(_) => ValidationResult::Invalid( + "Validation is not implemented for EphemeralHeaderOffer yet".to_string(), + ), + HistoryContentKey::EphemeralHeadersFindContent(_) => ValidationResult::Invalid( + "Validation is not implemented for EphemeralHeadersFindContent yet".to_string(), + ), } } } @@ -163,14 +190,13 @@ mod tests { let chain_history_validator = ChainHistoryValidator::new(header_oracle); let content_key = HistoryContentKey::new_block_header_by_hash(header_with_proof.header.hash_slow()); - chain_history_validator + assert!(chain_history_validator .validate_content(&content_key, &header_with_proof_ssz) .await - .unwrap(); + .is_canonically_valid()); } #[test_log::test(tokio::test)] - #[should_panic(expected = "Execution block proof verification failed for pre-Merge header")] async fn invalidate_header_by_hash_with_invalid_number() { let header_with_proof_ssz = get_header_with_proof_ssz(); let mut header = @@ -183,14 +209,17 @@ mod tests { let header_oracle = default_header_oracle(); let chain_history_validator = ChainHistoryValidator::new(header_oracle); let content_key = HistoryContentKey::new_block_header_by_hash(header.header.hash_slow()); - chain_history_validator - .validate_content(&content_key, &content_value) - .await - .unwrap(); + assert_eq!( + chain_history_validator + .validate_content(&content_key, &content_value) + .await, + ValidationResult::Invalid( + "Execution block proof verification failed for pre-Merge header".to_string() + ), + ); } #[test_log::test(tokio::test)] - #[should_panic(expected = "Execution block proof verification failed for pre-Merge header")] async fn invalidate_header_by_hash_with_invalid_gaslimit() { let header_with_proof_ssz = get_header_with_proof_ssz(); let mut header = @@ -204,10 +233,15 @@ mod tests { let header_oracle = default_header_oracle(); let chain_history_validator = ChainHistoryValidator::new(header_oracle); let content_key = HistoryContentKey::new_block_header_by_hash(header.header.hash_slow()); - chain_history_validator - .validate_content(&content_key, &content_value) - .await - .unwrap(); + + assert_eq!( + chain_history_validator + .validate_content(&content_key, &content_value) + .await, + ValidationResult::Invalid( + "Execution block proof verification failed for pre-Merge header".to_string() + ), + ); } #[test_log::test(tokio::test)] @@ -219,14 +253,13 @@ mod tests { let chain_history_validator = ChainHistoryValidator::new(header_oracle); let content_key = HistoryContentKey::new_block_header_by_number(header_with_proof.header.number); - chain_history_validator + assert!(chain_history_validator .validate_content(&content_key, &header_with_proof_ssz) .await - .unwrap(); + .is_canonically_valid()); } #[test_log::test(tokio::test)] - #[should_panic(expected = "Execution block proof verification failed for pre-Merge header")] async fn invalidate_header_by_number_with_invalid_number() { let header_with_proof_ssz = get_header_with_proof_ssz(); let mut header = @@ -239,14 +272,17 @@ mod tests { let header_oracle = default_header_oracle(); let chain_history_validator = ChainHistoryValidator::new(header_oracle); let content_key = HistoryContentKey::new_block_header_by_number(header.header.number); - chain_history_validator - .validate_content(&content_key, &content_value) - .await - .unwrap(); + assert_eq!( + chain_history_validator + .validate_content(&content_key, &content_value) + .await, + ValidationResult::Invalid( + "Execution block proof verification failed for pre-Merge header".to_string() + ), + ); } #[test_log::test(tokio::test)] - #[should_panic(expected = "Execution block proof verification failed for pre-Merge header")] async fn invalidate_header_by_number_with_invalid_gaslimit() { let header_with_proof_ssz: Vec = get_header_with_proof_ssz(); let mut header = @@ -260,10 +296,14 @@ mod tests { let header_oracle = default_header_oracle(); let chain_history_validator = ChainHistoryValidator::new(header_oracle); let content_key = HistoryContentKey::new_block_header_by_number(header.header.number); - chain_history_validator - .validate_content(&content_key, &content_value) - .await - .unwrap(); + assert_eq!( + chain_history_validator + .validate_content(&content_key, &content_value) + .await, + ValidationResult::Invalid( + "Execution block proof verification failed for pre-Merge header".to_string() + ), + ); } fn default_header_oracle() -> Arc> { diff --git a/crates/subnetworks/state/src/storage.rs b/crates/subnetworks/state/src/storage.rs index f36b9498c..1b45a8108 100644 --- a/crates/subnetworks/state/src/storage.rs +++ b/crates/subnetworks/state/src/storage.rs @@ -49,7 +49,7 @@ impl ContentStore for StateStorage { } } - fn is_key_within_radius_and_unavailable( + fn should_we_store( &self, key: &StateContentKey, ) -> Result { diff --git a/crates/subnetworks/state/src/validation/validator.rs b/crates/subnetworks/state/src/validation/validator.rs index e8216bdf5..d471b7c24 100644 --- a/crates/subnetworks/state/src/validation/validator.rs +++ b/crates/subnetworks/state/src/validation/validator.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use alloy::primitives::{keccak256, B256}; -use anyhow::anyhow; use ethportal_api::{ types::content_key::state::{ AccountTrieNodeKey, ContractBytecodeKey, ContractStorageTrieNodeKey, @@ -31,21 +30,31 @@ impl Validator for StateValidator { &self, content_key: &StateContentKey, content_value: &[u8], - ) -> anyhow::Result> { - let content_value = StateContentValue::decode(content_key, content_value) - .map_err(|err| anyhow!("Error decoding StateContentValue: {err}"))?; + ) -> ValidationResult { + let content_value = match StateContentValue::decode(content_key, content_value) { + Ok(content_value) => content_value, + Err(err) => { + return ValidationResult::Invalid(format!( + "Error decoding StateContentValue: {err}" + )); + } + }; - match content_key { + let validation_result = match content_key { StateContentKey::AccountTrieNode(key) => { - Ok(self.validate_account_trie_node(key, content_value).await?) + self.validate_account_trie_node(key, content_value).await + } + StateContentKey::ContractStorageTrieNode(key) => { + self.validate_contract_storage_trie_node(key, content_value) + .await } - StateContentKey::ContractStorageTrieNode(key) => Ok(self - .validate_contract_storage_trie_node(key, content_value) - .await?), StateContentKey::ContractBytecode(key) => { - Ok(self.validate_contract_bytecode(key, content_value).await?) + self.validate_contract_bytecode(key, content_value).await } - } + }; + validation_result.unwrap_or_else(|err| { + ValidationResult::Invalid(format!("Error validating StateContentValue: {err:?}")) + }) } } @@ -54,11 +63,11 @@ impl StateValidator { &self, key: &AccountTrieNodeKey, value: StateContentValue, - ) -> Result, StateValidationError> { + ) -> Result { match value { StateContentValue::TrieNode(value) => { check_node_hash(&value.node, &key.node_hash)?; - Ok(ValidationResult::new(/* valid_for_storing= */ false)) + Ok(ValidationResult::Valid) } StateContentValue::AccountTrieNodeWithProof(value) => { let state_root = match DISABLE_HISTORY_HEADER_CHECK { @@ -67,7 +76,7 @@ impl StateValidator { }; validate_node_trie_proof(state_root, key.node_hash, &key.path, &value.proof)?; - Ok(ValidationResult::new(/* valid_for_storing= */ true)) + Ok(ValidationResult::CanonicallyValid) } _ => Err(StateValidationError::InvalidContentValueType( "AccountTrieNodeKey", @@ -79,11 +88,11 @@ impl StateValidator { &self, key: &ContractStorageTrieNodeKey, value: StateContentValue, - ) -> Result, StateValidationError> { + ) -> Result { match value { StateContentValue::TrieNode(value) => { check_node_hash(&value.node, &key.node_hash)?; - Ok(ValidationResult::new(/* valid_for_storing= */ false)) + Ok(ValidationResult::Valid) } StateContentValue::ContractStorageTrieNodeWithProof(value) => { let state_root = match DISABLE_HISTORY_HEADER_CHECK { @@ -99,7 +108,7 @@ impl StateValidator { &value.storage_proof, )?; - Ok(ValidationResult::new(/* valid_for_storing= */ true)) + Ok(ValidationResult::CanonicallyValid) } _ => Err(StateValidationError::InvalidContentValueType( "ContractStorageTrieNodeKey", @@ -111,12 +120,12 @@ impl StateValidator { &self, key: &ContractBytecodeKey, value: StateContentValue, - ) -> Result, StateValidationError> { + ) -> Result { match value { StateContentValue::ContractBytecode(value) => { let bytecode_hash = keccak256(&value.code[..]); if bytecode_hash == key.code_hash { - Ok(ValidationResult::new(/* valid_for_storing= */ false)) + Ok(ValidationResult::Valid) } else { Err(StateValidationError::InvalidBytecodeHash { bytecode_hash, @@ -140,7 +149,7 @@ impl StateValidator { let account_state = validate_account_state(state_root, &key.address_hash, &value.account_proof)?; if account_state.code_hash == key.code_hash { - Ok(ValidationResult::new(/* valid_for_storing= */ true)) + Ok(ValidationResult::CanonicallyValid) } else { Err(StateValidationError::InvalidBytecodeHash { bytecode_hash, @@ -235,10 +244,10 @@ mod tests { let validation_result = create_validator() .validate_content(&content_key, content_value.as_ref()) - .await?; + .await; assert_eq!( validation_result, - ValidationResult::new(false), + ValidationResult::Valid, "testing content_key: {}", content_key.to_hex() ); @@ -258,11 +267,11 @@ mod tests { let validation_result = create_validator_with_header(Header::decode(&mut header.as_ref())?) .validate_content(&content_key, content_value.as_ref()) - .await?; + .await; assert_eq!( validation_result, - ValidationResult::new(true), + ValidationResult::CanonicallyValid, "testing content_key: {}", content_key.to_hex() ); @@ -280,10 +289,10 @@ mod tests { let validation_result = create_validator() .validate_content(&content_key, content_value.as_ref()) - .await?; + .await; assert_eq!( validation_result, - ValidationResult::new(false), + ValidationResult::Valid, "testing content_key: {}", content_key.to_hex() ); @@ -303,11 +312,11 @@ mod tests { let validation_result = create_validator_with_header(Header::decode(&mut header.as_ref())?) .validate_content(&content_key, content_value.as_ref()) - .await?; + .await; assert_eq!( validation_result, - ValidationResult::new(true), + ValidationResult::CanonicallyValid, "testing content_key: {}", content_key.to_hex() ); @@ -325,10 +334,10 @@ mod tests { let validation_result = create_validator() .validate_content(&content_key, content_value.as_ref()) - .await?; + .await; assert_eq!( validation_result, - ValidationResult::new(false), + ValidationResult::Valid, "testing content_key: {}", content_key.to_hex() ); @@ -348,10 +357,10 @@ mod tests { let validation_result = create_validator_with_header(Header::decode(&mut header.as_ref())?) .validate_content(&content_key, content_value.as_ref()) - .await?; + .await; assert_eq!( validation_result, - ValidationResult::new(true), + ValidationResult::CanonicallyValid, "testing content_key: {}", content_key.to_hex() ); diff --git a/crates/validation/Cargo.toml b/crates/validation/Cargo.toml index 2e6e53217..de8ca0ed6 100644 --- a/crates/validation/Cargo.toml +++ b/crates/validation/Cargo.toml @@ -20,6 +20,7 @@ ethereum_hashing = "0.7.0" ethereum_ssz.workspace = true ethereum_ssz_derive.workspace = true ethportal-api.workspace = true +futures.workspace = true lazy_static.workspace = true parking_lot.workspace = true rust-embed.workspace = true diff --git a/crates/validation/src/validator.rs b/crates/validation/src/validator.rs index a73bca260..6d47802c0 100644 --- a/crates/validation/src/validator.rs +++ b/crates/validation/src/validator.rs @@ -1,56 +1,58 @@ +use std::{fmt::Debug, future::Future}; + use ethportal_api::{types::content_key::overlay::IdentityContentKey, RawContentValue}; +use futures::future::JoinAll; /// The result of the content key/value validation. -#[derive(Debug, PartialEq, Eq)] -pub struct ValidationResult { - /// Whether validation proved that content is canonical, in which case it's safe to store it. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ValidationResult { + /// The content is proved to be canonical, and it's safe to store it. + CanonicallyValid, + /// The content value is proven to match content key, but it's not proven that it's canonical. /// - /// Content obtained via Offer/Accept should always be provable, but that's not always the case - /// for content obtained via Find/Found Content (e.g.for the state network, we can verify that - /// content-value corresponds to the content-key, but not that it's canonical). - pub valid_for_storing: bool, - - /// The optional content key/value pair to be propagated (together with original content - /// key/value). This is used for Recursive Gossip in the state network (see [specs]( - /// https://github.com/ethereum/portal-network-specs/blob/04cc360179aeda179e0b1cac6fea900a74e87f2b/state-network.md#gossip - /// ) for details.). - pub additional_content_to_propagate: Option<(TContentKey, RawContentValue)>, + /// This type of content is not safe for storing, but it is safe to return as a result of Find + /// Content request. + Valid, + /// Content is invalid or validation failed for some other reason. + Invalid(String), } -impl ValidationResult { - pub fn new(valid_for_storing: bool) -> Self { - Self { - valid_for_storing, - additional_content_to_propagate: None, - } +impl ValidationResult { + /// Returns `true` if content is [ValidationResult::CanonicallyValid]. + pub fn is_canonically_valid(&self) -> bool { + self == &Self::CanonicallyValid } - pub fn new_with_additional_content_to_propagate( - additional_content_key: TContentKey, - additional_content_value: RawContentValue, - ) -> Self { - Self { - valid_for_storing: true, - additional_content_to_propagate: Some(( - additional_content_key, - additional_content_value, - )), - } + /// Returns `true` if content is [ValidationResult::CanonicallyValid] or + /// [ValidationResult::Valid]. + /// + /// See [ValidationResult] for details. + pub fn is_valid(&self) -> bool { + matches!(self, Self::CanonicallyValid | Self::Valid) } } /// Used by all overlay-network Validators to validate content in the overlay service. -pub trait Validator { - /// The `Ok` indicates that `content` corresponds to the `content_key`, but not necessarily - /// that content is canonical. See `ValidationResult` for details. - /// - /// The `Err` indicates that either content is not valid or that validation failed for some - /// other reason. +pub trait Validator { + /// Validates the provided content key/value pair. fn validate_content( &self, content_key: &TContentKey, - content: &[u8], - ) -> impl std::future::Future>> + Send; + content_value: &[u8], + ) -> impl Future + Send; + + /// Validates multiple content key/value pairs. + /// + /// The default implementation calls `self.validate_content(key, value)` for each content pair. + fn validate_content_batch( + &self, + content: &[(TContentKey, RawContentValue)], + ) -> impl Future> + Send { + content + .iter() + .map(|(content_key, content_value)| self.validate_content(content_key, content_value)) + .collect::>() + } } /// For use in tests where no validation needs to be performed. @@ -60,8 +62,8 @@ impl Validator for MockValidator { async fn validate_content( &self, _content_key: &IdentityContentKey, - _content: &[u8], - ) -> anyhow::Result> { - Ok(ValidationResult::new(true)) + _content_value: &[u8], + ) -> ValidationResult { + ValidationResult::CanonicallyValid } }