Skip to content

Commit a2c15c6

Browse files
committed
fix: 🚑 Avoid iterating through all buckets at every event in block import notification
1 parent c7b5a8d commit a2c15c6

File tree

1 file changed

+99
-76
lines changed

1 file changed

+99
-76
lines changed

‎client/blockchain-service/src/handler_msp.rs‎

Lines changed: 99 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use anyhow::Result;
12
use log::{debug, error, info, trace, warn};
23
use std::{str, sync::Arc};
34
use tokio::sync::{oneshot::error::TryRecvError, Mutex};
@@ -267,26 +268,20 @@ where
267268
event_info,
268269
}) => {
269270
// The mutations are applied to a Bucket's Forest root.
270-
// Check that this MSP is managing at least one bucket.
271-
let buckets_managed_by_msp =
272-
self.client
273-
.runtime_api()
274-
.query_buckets_for_msp(*block_hash, managed_msp_id)
275-
.inspect_err(|e| error!(target: LOG_TARGET, "Runtime API call failed while querying buckets for MSP [{:?}]: {:?}", managed_msp_id, e))
276-
.ok()
277-
.and_then(|api_result| {
278-
api_result
279-
.inspect_err(|e| error!(target: LOG_TARGET, "Runtime API error while querying buckets for MSP [{:?}]: {:?}", managed_msp_id, e))
280-
.ok()
281-
});
271+
let bucket_id = match self
272+
.get_bucket_id_from_mutations_applied_event_info(block_hash, event_info)
273+
{
274+
Ok(bucket_id) => bucket_id,
275+
Err(e) => {
276+
error!(target: LOG_TARGET, "Failed to get bucket ID from MutationsApplied event info: {:?}", e);
277+
return;
278+
}
279+
};
282280

283-
let Some(bucket_id) = self.validate_bucket_mutations_for_msp(
284-
block_hash,
285-
buckets_managed_by_msp,
286-
event_info,
287-
) else {
281+
if !self.validate_bucket_mutations_for_msp(block_hash, managed_msp_id, &bucket_id) {
282+
trace!(target: LOG_TARGET, "Bucket [0x{:x}] is not managed by this MSP [0x{:x}]. Skipping mutations applied event.", bucket_id, managed_msp_id);
288283
return;
289-
};
284+
}
290285

291286
self.emit(FinalisedBucketMutationsApplied {
292287
bucket_id,
@@ -457,35 +452,28 @@ where
457452
}
458453
};
459454

460-
// Preemptively getting the Buckets managed by this MSP, so that we do the query just once,
461-
// instead of doing it for every event.
462-
let buckets_managed_by_msp =
463-
self.client
464-
.runtime_api()
465-
.query_buckets_for_msp(*block_hash, managed_msp_id)
466-
.inspect_err(|e| error!(target: LOG_TARGET, "Runtime API call failed while querying buckets for MSP [{:?}]: {:?}", managed_msp_id, e))
467-
.ok()
468-
.and_then(|api_result| {
469-
api_result
470-
.inspect_err(|e| error!(target: LOG_TARGET, "Runtime API error while querying buckets for MSP [{:?}]: {:?}", managed_msp_id, e))
471-
.ok()
472-
});
473-
474455
match event {
475456
StorageEnableEvents::ProofsDealer(pallet_proofs_dealer::Event::MutationsApplied {
476457
mutations,
477458
old_root,
478459
new_root,
479460
event_info,
480461
}) => {
481-
let Some(bucket_id) = self.validate_bucket_mutations_for_msp(
482-
block_hash,
483-
buckets_managed_by_msp,
484-
event_info,
485-
) else {
486-
return;
462+
let bucket_id = match self
463+
.get_bucket_id_from_mutations_applied_event_info(block_hash, event_info)
464+
{
465+
Ok(bucket_id) => bucket_id,
466+
Err(e) => {
467+
error!(target: LOG_TARGET, "Failed to get bucket ID from MutationsApplied event info: {:?}", e);
468+
return;
469+
}
487470
};
488471

472+
if !self.validate_bucket_mutations_for_msp(block_hash, managed_msp_id, &bucket_id) {
473+
trace!(target: LOG_TARGET, "Bucket [0x{:x}] is not managed by this MSP [0x{:x}]. Skipping mutations applied event.", bucket_id, managed_msp_id);
474+
return;
475+
}
476+
489477
info!(target: LOG_TARGET, "🪾 Applying mutations to bucket [0x{:x}]", bucket_id);
490478

491479
// Apply forest root changes to the Bucket's Forest Storage.
@@ -555,43 +543,29 @@ where
555543
}
556544
}
557545

558-
/// Validates that a MutationsApplied event's bucket is managed by this MSP.
546+
/// Extracts the bucket ID encoded in a `MutationsApplied` event's `event_info`.
559547
///
560-
/// This helper performs the following validation steps:
561-
/// 1. Checks if this MSP is managing at least one bucket
562-
/// 2. Validates that the event_info contains the BucketId of the bucket that was mutated
563-
/// 3. Decodes the BucketId from the event_info
564-
/// 4. Verifies that the BucketId is in the list of buckets managed by this MSP
548+
/// The `ProofsDealer::MutationsApplied` event includes an opaque `event_info` payload which,
549+
/// for generic apply-delta mutations, is expected to contain the SCALE-encoded `BucketId` of
550+
/// the bucket whose Forest root was mutated.
565551
///
566-
/// Returns Some(bucket_id) if all validations pass, None otherwise.
567-
fn validate_bucket_mutations_for_msp(
552+
/// Behaviour:
553+
/// - Logs an error and returns an error if `event_info` is `None`.
554+
/// - Calls the runtime API (`decode_generic_apply_delta_event_info`) at `block_hash` to decode
555+
/// the bucket ID.
556+
/// - Logs an error and returns an error if the runtime API call fails or if decoding fails.
557+
fn get_bucket_id_from_mutations_applied_event_info(
568558
&self,
569559
block_hash: &Runtime::Hash,
570-
buckets_managed_by_msp: Option<Vec<BucketId<Runtime>>>,
571560
event_info: Option<Vec<u8>>,
572-
) -> Option<BucketId<Runtime>> {
573-
// Check that this MSP is managing at least one bucket.
574-
if buckets_managed_by_msp.is_none() {
575-
debug!(target: LOG_TARGET, "MSP is not managing any buckets. Skipping mutations applied event.");
576-
return None;
577-
}
578-
let buckets_managed_by_msp = buckets_managed_by_msp
579-
.as_ref()
580-
.expect("Just checked that this is not None; qed");
581-
if buckets_managed_by_msp.is_empty() {
582-
debug!(target: LOG_TARGET, "Buckets managed by MSP is an empty vector. Skipping mutations applied event.");
583-
return None;
584-
}
585-
561+
) -> Result<BucketId<Runtime>> {
586562
// In StorageHub, we assume that all `MutationsApplied` events are emitted by bucket
587563
// root changes, and they should contain the encoded `BucketId` of the bucket that was mutated
588564
// in the `event_info` field.
589565
let Some(event_info) = event_info else {
590-
error!(
591-
target: LOG_TARGET,
592-
"MutationsApplied event with `None` event info, when it is expected to contain the BucketId of the bucket that was mutated."
593-
);
594-
return None;
566+
let msg = "MutationsApplied event with `None` event info, when it is expected to contain the BucketId of the bucket that was mutated.";
567+
error!(target: LOG_TARGET, "{}", msg);
568+
return Err(anyhow::anyhow!(msg));
595569
};
596570
let bucket_id = match self
597571
.client
@@ -601,23 +575,72 @@ where
601575
Ok(runtime_api_result) => match runtime_api_result {
602576
Ok(bucket_id) => bucket_id,
603577
Err(e) => {
604-
error!(target: LOG_TARGET, "Failed to decode BucketId from event info: {:?}", e);
605-
return None;
578+
let msg = format!("Failed to decode BucketId from event info: {:?}", e);
579+
error!(target: LOG_TARGET, "{}", msg);
580+
return Err(anyhow::anyhow!(msg));
606581
}
607582
},
608583
Err(e) => {
609-
error!(target: LOG_TARGET, "Error while calling runtime API to decode BucketId from event info: {:?}", e);
610-
return None;
584+
let msg = format!(
585+
"Error while calling runtime API to decode BucketId from event info: {:?}",
586+
e
587+
);
588+
error!(target: LOG_TARGET, "{}", msg);
589+
return Err(anyhow::anyhow!(msg));
611590
}
612591
};
613592

614-
// Check if the bucket is managed by this MSP.
615-
if !buckets_managed_by_msp.contains(&bucket_id) {
616-
debug!(target: LOG_TARGET, "Bucket [{:?}] is not managed by this MSP. Skipping mutations applied event.", bucket_id);
617-
return None;
618-
}
593+
Ok(bucket_id)
594+
}
619595

620-
Some(bucket_id)
596+
/// Checks whether a bucket is managed by the MSP this node is handling.
597+
///
598+
/// Queries the runtime for the MSP ID associated with `bucket_id` and compares it with
599+
/// `managed_msp_id`.
600+
///
601+
/// Returns `true` iff the runtime reports the bucket is managed by `managed_msp_id`.
602+
/// Returns `false` when:
603+
/// - The bucket is managed by a different MSP
604+
/// - The bucket is not managed by any MSP
605+
/// - The runtime API call fails (an error is logged)
606+
fn validate_bucket_mutations_for_msp(
607+
&self,
608+
block_hash: &Runtime::Hash,
609+
managed_msp_id: &ProviderId<Runtime>,
610+
bucket_id: &BucketId<Runtime>,
611+
) -> bool {
612+
match self
613+
.client
614+
.runtime_api()
615+
.query_msp_id_of_bucket_id(*block_hash, bucket_id)
616+
{
617+
Ok(runtime_api_result) => match runtime_api_result {
618+
Ok(Some(msp_id)) => {
619+
if msp_id != *managed_msp_id {
620+
// This is a valid scenario. It would be the case where the mutation is being applied to a bucket that is managed by another MSP.
621+
trace!(target: LOG_TARGET, "Bucket [0x{:x}] is not managed by this MSP [0x{:x}]. It is managed by MSP [0x{:x}].", bucket_id, managed_msp_id, msp_id);
622+
false
623+
} else {
624+
// This is a valid scenario. It would be the case where the bucket is managed by this MSP.
625+
trace!(target: LOG_TARGET, "Bucket [0x{:x}] is managed by this MSP [0x{:x}].", bucket_id, managed_msp_id);
626+
true
627+
}
628+
}
629+
Ok(None) => {
630+
// This is a valid scenario. It would be the case where the bucket is not managed by any MSP.
631+
trace!(target: LOG_TARGET, "Bucket [0x{:x}] is not managed by any MSP.", bucket_id);
632+
false
633+
}
634+
Err(e) => {
635+
error!(target: LOG_TARGET, "Error querying MSP ID of bucket [0x{:x}]: {:?}", bucket_id, e);
636+
false
637+
}
638+
},
639+
Err(e) => {
640+
error!(target: LOG_TARGET, "Error while calling runtime API to query MSP ID of bucket [0x{:x}]: {:?}", bucket_id, e);
641+
false
642+
}
643+
}
621644
}
622645

623646
/// Scans pending storage requests for this MSP and triggers distribution tasks.

0 commit comments

Comments
 (0)