Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 23 additions & 21 deletions crates/portalnet/src/overlay/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,7 @@ impl<
content_key: TContentKey,
content_value: RawContentValue,
) -> PutContentInfo {
let should_we_store = match self
.store
.lock()
.is_key_within_radius_and_unavailable(&content_key)
{
let should_we_store = match self.store.lock().should_we_store(&content_key) {
Ok(should_we_store) => matches!(should_we_store, ShouldWeStoreContent::Store),
Err(err) => {
warn!(
Expand Down Expand Up @@ -492,12 +488,14 @@ impl<
Ok(Response::Content(found_content)) => {
match found_content {
Content::Content(content) => {
match self.validate_content(&content_key, &content).await {
Ok(_) => Ok((Content::Content(content), false)),
Err(msg) => Err(OverlayRequestError::FailedValidation(format!(
"Network: {:?}, Reason: {msg:?}",
let validation_result = self.validate_content(&content_key, &content).await;
if validation_result.is_valid() {
Ok((Content::Content(content), false))
} else {
Err(OverlayRequestError::FailedValidation(format!(
"Network: {:?}, Reason: {validation_result:?}",
self.protocol
))),
)))
}
}
Content::Enrs(_) => Ok((found_content, false)),
Expand All @@ -514,12 +512,14 @@ impl<
};
let content = RawContentValue::from(bytes);

match self.validate_content(&content_key, &content).await {
Ok(_) => Ok((Content::Content(content), true)),
Err(msg) => Err(OverlayRequestError::FailedValidation(format!(
"Network: {:?}, Reason: {msg:?}",
let validation_result = self.validate_content(&content_key, &content).await;
if validation_result.is_valid() {
Ok((Content::Content(content), true))
} else {
Err(OverlayRequestError::FailedValidation(format!(
"Network: {:?}, Reason: {validation_result:?}",
self.protocol
))),
)))
}
}
}
Expand All @@ -533,13 +533,15 @@ impl<
&self,
content_key: &TContentKey,
content: &[u8],
) -> anyhow::Result<ValidationResult<TContentKey>> {
) -> ValidationResult {
let validation_result = self.validator.validate_content(content_key, content).await;
self.metrics.report_validation(validation_result.is_ok());

validation_result.map_err(|err| {
anyhow!("Content validation failed for content key {content_key:?} with error: {err:?}")
})
self.metrics.report_validation(validation_result.is_valid());
if !validation_result.is_valid() {
warn!(
"Content validation failed for content key {content_key:?}: {validation_result:?}"
)
}
validation_result
}

/// Initialize FindContent uTP stream with remote node
Expand Down
56 changes: 25 additions & 31 deletions crates/portalnet/src/overlay/service/find_content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,37 +525,36 @@ impl<
.await;
utp_processing
.metrics
.report_validation(validation_result.is_ok());

let validation_result = match validation_result {
Ok(validation_result) => validation_result,
Err(err) => {
warn!(
error = ?err,
content.id = %hex_encode_compact(content_id),
content.key = %content_key,
"Error validating content"
);
// Indicate to the query that the content is invalid
let _ = valid_content_callback.send(None);
if let Some(query_trace_events_tx) = query_trace_events_tx {
let _ = query_trace_events_tx.send(QueryTraceEvent::Failure(
query_id,
sending_peer,
QueryFailureKind::InvalidContent,
));
}
return;
.report_validation(validation_result.is_valid());

if !validation_result.is_valid() {
warn!(
content.id = %hex_encode_compact(content_id),
content.key = %content_key,
?validation_result,
"Error validating content"
);
// Indicate to the query that the content is invalid
let _ = valid_content_callback.send(None);
if let Some(query_trace_events_tx) = query_trace_events_tx {
let _ = query_trace_events_tx.send(QueryTraceEvent::Failure(
query_id,
sending_peer,
QueryFailureKind::InvalidContent,
));
}
return;
};

// skip storing if content is not valid for storing, the content
// is already stored or if there's an error reading the store
let should_store = validation_result.valid_for_storing
// store content that:
// - is canonically valid
// - is not already stored
// - is within radius (if applicable)
let should_store = validation_result.is_canonically_valid()
&& utp_processing
.store
.lock()
.is_key_within_radius_and_unavailable(&content_key)
.should_we_store(&content_key)
.map_or_else(
|err| {
error!("Unable to read store: {err}");
Expand All @@ -571,17 +570,12 @@ impl<
{
Ok(dropped_content) => {
let mut content_to_propagate = vec![(content_key.clone(), content.clone())];
if let Some(additional_content_to_propagate) =
validation_result.additional_content_to_propagate
{
content_to_propagate.push(additional_content_to_propagate);
}
if !dropped_content.is_empty() && utp_processing.gossip_dropped {
debug!(
"Dropped {:?} pieces of content after inserting new content, propagating them back into the network.",
dropped_content.len(),
);
content_to_propagate.extend(dropped_content.clone());
content_to_propagate.extend(dropped_content);
}
propagate_put_content_cross_thread::<_, TMetric>(
content_to_propagate,
Expand Down
Loading
Loading