Skip to content

Commit 33e2163

Browse files
eserilev and dapplion authored
Custody backfill sync (#7907)
#7603 #### Custody backfill sync service Similar in many ways to the current backfill service. There may be ways to unify the two services. The difficulty there is that the current backfill service tightly couples blocks and their associated blobs/data columns. Any attempts to unify the two services should be left to a separate PR in my opinion. #### `SyncNetworkContext` `SyncNetworkContext` manages custody sync data-columns-by-range requests separately from other sync RPC requests. I think this is a nice separation considering that custody backfill is its own service. #### Data column import logic The import logic verifies KZG commitments and that the data column's block root matches the block root in the node's store before importing columns. #### New channel to send messages to `SyncManager` Now external services can communicate with the `SyncManager`. In this PR this channel is used to trigger a custody sync. Alternatively we may be able to use the existing `mpsc` channel that the `SyncNetworkContext` uses to communicate with the `SyncManager`. I will spend some time reviewing this. Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu> Co-Authored-By: Eitan Seri-Levi <eserilev@gmail.com> Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>
1 parent 46dde9a commit 33e2163

File tree

30 files changed

+2957
-199
lines changed

30 files changed

+2957
-199
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6991,6 +6991,95 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
69916991
}
69926992
}
69936993

6994+
/// Safely update data column custody info by ensuring that:
6995+
/// - cgc values at the updated epoch and the earliest custodied column epoch are equal
6996+
/// - we are only decrementing the earliest custodied data column epoch by one epoch
6997+
/// - the new earliest data column slot is set to the first slot in `effective_epoch`.
6998+
pub fn safely_backfill_data_column_custody_info(
6999+
&self,
7000+
effective_epoch: Epoch,
7001+
) -> Result<(), Error> {
7002+
let Some(earliest_data_column_epoch) = self.earliest_custodied_data_column_epoch() else {
7003+
return Ok(());
7004+
};
7005+
7006+
if effective_epoch >= earliest_data_column_epoch {
7007+
return Ok(());
7008+
}
7009+
7010+
let cgc_at_effective_epoch = self
7011+
.data_availability_checker
7012+
.custody_context()
7013+
.custody_group_count_at_epoch(effective_epoch, &self.spec);
7014+
7015+
let cgc_at_earliest_data_colum_epoch = self
7016+
.data_availability_checker
7017+
.custody_context()
7018+
.custody_group_count_at_epoch(earliest_data_column_epoch, &self.spec);
7019+
7020+
let can_update_data_column_custody_info = cgc_at_effective_epoch
7021+
== cgc_at_earliest_data_colum_epoch
7022+
&& effective_epoch == earliest_data_column_epoch - 1;
7023+
7024+
if can_update_data_column_custody_info {
7025+
self.store.put_data_column_custody_info(Some(
7026+
effective_epoch.start_slot(T::EthSpec::slots_per_epoch()),
7027+
))?;
7028+
} else {
7029+
error!(
7030+
?cgc_at_effective_epoch,
7031+
?cgc_at_earliest_data_colum_epoch,
7032+
?effective_epoch,
7033+
?earliest_data_column_epoch,
7034+
"Couldn't update data column custody info"
7035+
);
7036+
return Err(Error::FailedColumnCustodyInfoUpdate);
7037+
}
7038+
7039+
Ok(())
7040+
}
7041+
7042+
/// Compare columns custodied for `epoch` versus columns custodied for the head of the chain
7043+
/// and return any column indices that are missing.
7044+
pub fn get_missing_columns_for_epoch(&self, epoch: Epoch) -> HashSet<ColumnIndex> {
7045+
let custody_context = self.data_availability_checker.custody_context();
7046+
7047+
let columns_required = custody_context
7048+
.custody_columns_for_epoch(None, &self.spec)
7049+
.iter()
7050+
.cloned()
7051+
.collect::<HashSet<_>>();
7052+
7053+
let current_columns_at_epoch = custody_context
7054+
.custody_columns_for_epoch(Some(epoch), &self.spec)
7055+
.iter()
7056+
.cloned()
7057+
.collect::<HashSet<_>>();
7058+
7059+
columns_required
7060+
.difference(&current_columns_at_epoch)
7061+
.cloned()
7062+
.collect::<HashSet<_>>()
7063+
}
7064+
7065+
/// The da boundary for custodying columns. It will just be the DA boundary unless we are near the Fulu fork epoch.
7066+
pub fn get_column_da_boundary(&self) -> Option<Epoch> {
7067+
match self.data_availability_boundary() {
7068+
Some(da_boundary_epoch) => {
7069+
if let Some(fulu_fork_epoch) = self.spec.fulu_fork_epoch {
7070+
if da_boundary_epoch < fulu_fork_epoch {
7071+
Some(fulu_fork_epoch)
7072+
} else {
7073+
Some(da_boundary_epoch)
7074+
}
7075+
} else {
7076+
None
7077+
}
7078+
}
7079+
None => None, // If no DA boundary set, dont try to custody backfill
7080+
}
7081+
}
7082+
69947083
/// This method serves to get a sense of the current chain health. It is used in block proposal
69957084
/// to determine whether we should outsource payload production duties.
69967085
///

beacon_node/beacon_chain/src/errors.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ pub enum BeaconChainError {
247247
cache_epoch: Epoch,
248248
},
249249
SkipProposerPreparation,
250+
FailedColumnCustodyInfoUpdate,
250251
}
251252

252253
easy_from_to!(SlotProcessingError, BeaconChainError);
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
use std::collections::{HashMap, HashSet};
2+
3+
use crate::{
4+
BeaconChain, BeaconChainError, BeaconChainTypes,
5+
data_column_verification::verify_kzg_for_data_column_list,
6+
};
7+
use store::{Error as StoreError, KeyValueStore};
8+
use tracing::{Span, debug, instrument};
9+
use types::{ColumnIndex, DataColumnSidecarList, Epoch, EthSpec, Hash256, Slot};
10+
11+
#[derive(Debug)]
12+
pub enum HistoricalDataColumnError {
13+
// The provided data column sidecar pertains to a block that doesn't exist in the database.
14+
NoBlockFound {
15+
data_column_block_root: Hash256,
16+
expected_block_root: Hash256,
17+
},
18+
19+
/// Logic error: should never occur.
20+
IndexOutOfBounds,
21+
22+
/// The provided data column sidecar list doesn't contain columns for the full range of slots for the given epoch.
23+
MissingDataColumns {
24+
missing_slots_and_data_columns: Vec<(Slot, ColumnIndex)>,
25+
},
26+
27+
/// The provided data column sidecar list contains at least one column with an invalid kzg commitment.
28+
InvalidKzg,
29+
30+
/// Internal store error
31+
StoreError(StoreError),
32+
33+
/// Internal beacon chain error
34+
BeaconChainError(Box<BeaconChainError>),
35+
}
36+
37+
impl From<StoreError> for HistoricalDataColumnError {
38+
fn from(e: StoreError) -> Self {
39+
Self::StoreError(e)
40+
}
41+
}
42+
43+
impl<T: BeaconChainTypes> BeaconChain<T> {
44+
/// Store a batch of historical data columns in the database.
45+
///
46+
/// The data columns block roots and proposer signatures are verified with the existing
47+
/// block stored in the DB. This function also verifies the columns' KZG commitments.
48+
///
49+
/// This function requires that the data column sidecar list contains columns for a full epoch.
50+
///
51+
/// Return the number of `data_columns` successfully imported.
52+
#[instrument(skip_all, fields(columns_imported_count = tracing::field::Empty ))]
53+
pub fn import_historical_data_column_batch(
54+
&self,
55+
epoch: Epoch,
56+
historical_data_column_sidecar_list: DataColumnSidecarList<T::EthSpec>,
57+
) -> Result<usize, HistoricalDataColumnError> {
58+
let mut total_imported = 0;
59+
let mut ops = vec![];
60+
61+
let unique_column_indices = historical_data_column_sidecar_list
62+
.iter()
63+
.map(|item| item.index)
64+
.collect::<HashSet<_>>();
65+
66+
let mut slot_and_column_index_to_data_columns = historical_data_column_sidecar_list
67+
.iter()
68+
.map(|data_column| ((data_column.slot(), data_column.index), data_column))
69+
.collect::<HashMap<_, _>>();
70+
71+
let forward_blocks_iter = self
72+
.forwards_iter_block_roots_until(
73+
epoch.start_slot(T::EthSpec::slots_per_epoch()),
74+
epoch.end_slot(T::EthSpec::slots_per_epoch()),
75+
)
76+
.map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?;
77+
78+
for block_iter_result in forward_blocks_iter {
79+
let (block_root, slot) = block_iter_result
80+
.map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?;
81+
82+
for column_index in unique_column_indices.clone() {
83+
if let Some(data_column) =
84+
slot_and_column_index_to_data_columns.remove(&(slot, column_index))
85+
{
86+
if self
87+
.store
88+
.get_data_column(&block_root, &data_column.index)?
89+
.is_some()
90+
{
91+
debug!(
92+
block_root = ?block_root,
93+
column_index = data_column.index,
94+
"Skipping data column import as identical data column exists"
95+
);
96+
continue;
97+
}
98+
if block_root != data_column.block_root() {
99+
return Err(HistoricalDataColumnError::NoBlockFound {
100+
data_column_block_root: data_column.block_root(),
101+
expected_block_root: block_root,
102+
});
103+
}
104+
self.store.data_column_as_kv_store_ops(
105+
&block_root,
106+
data_column.clone(),
107+
&mut ops,
108+
);
109+
total_imported += 1;
110+
}
111+
}
112+
}
113+
114+
// If we've made it to here with no columns to import, this means there are no blobs for this epoch.
115+
// `RangeDataColumnBatchRequest` logic should have caught any bad peers withholding columns
116+
if historical_data_column_sidecar_list.is_empty() {
117+
if !ops.is_empty() {
118+
// This shouldn't be a valid case. If there are no columns to import,
119+
// there should be no generated db operations.
120+
return Err(HistoricalDataColumnError::IndexOutOfBounds);
121+
}
122+
} else {
123+
verify_kzg_for_data_column_list(historical_data_column_sidecar_list.iter(), &self.kzg)
124+
.map_err(|_| HistoricalDataColumnError::InvalidKzg)?;
125+
126+
self.store.blobs_db.do_atomically(ops)?;
127+
}
128+
129+
if !slot_and_column_index_to_data_columns.is_empty() {
130+
debug!(
131+
?epoch,
132+
extra_data = ?slot_and_column_index_to_data_columns.keys().map(|(slot, _)| slot),
133+
"We've received unexpected extra data columns, these will not be imported"
134+
);
135+
}
136+
137+
self.data_availability_checker
138+
.custody_context()
139+
.update_and_backfill_custody_count_at_epoch(epoch);
140+
141+
self.safely_backfill_data_column_custody_info(epoch)
142+
.map_err(|e| HistoricalDataColumnError::BeaconChainError(Box::new(e)))?;
143+
144+
debug!(?epoch, total_imported, "Imported historical data columns");
145+
146+
let current_span = Span::current();
147+
current_span.record("columns_imported_count", total_imported);
148+
149+
Ok(total_imported)
150+
}
151+
}

beacon_node/beacon_chain/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub mod fork_choice_signal;
2828
pub mod fork_revert;
2929
pub mod graffiti_calculator;
3030
pub mod historical_blocks;
31+
pub mod historical_data_columns;
3132
pub mod kzg_utils;
3233
pub mod light_client_finality_update_verification;
3334
pub mod light_client_optimistic_update_verification;

beacon_node/beacon_chain/src/validator_custody.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use types::data_column_custody_group::{CustodyIndex, compute_columns_for_custody
1010
use types::{ChainSpec, ColumnIndex, Epoch, EthSpec, Slot};
1111

1212
/// A delay before making the CGC change effective to the data availability checker.
13-
const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30;
13+
pub const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30;
1414

1515
/// Number of slots after which a validator's registration is removed if it has not re-registered.
1616
const VALIDATOR_REGISTRATION_EXPIRY_SLOTS: Slot = Slot::new(256);
@@ -30,8 +30,10 @@ struct ValidatorRegistrations {
3030
///
3131
/// Note: Only stores the epoch value when there's a change in custody requirement.
3232
/// So if epoch 10 and 11 has the same custody requirement, only 10 is stored.
33-
/// This map is never pruned, because currently we never decrease custody requirement, so this
34-
/// map size is contained at 128.
33+
/// This map is only pruned during custody backfill. If epoch 11 has custody requirements
34+
/// that are then backfilled to epoch 10, the value at epoch 11 will be removed and epoch 10
35+
/// will be added to the map instead. This should keep map size constrained to a maximum
36+
/// value of 128.
3537
epoch_validator_custody_requirements: BTreeMap<Epoch, u64>,
3638
}
3739

@@ -99,6 +101,25 @@ impl ValidatorRegistrations {
99101
None
100102
}
101103
}
104+
105+
/// Updates the `epoch_validator_custody_requirements` map by pruning all values on/after `effective_epoch`
106+
/// and updating the map to store the latest validator custody requirements for the `effective_epoch`.
107+
pub fn backfill_validator_custody_requirements(&mut self, effective_epoch: Epoch) {
108+
if let Some(latest_validator_custody) = self.latest_validator_custody_requirement() {
109+
// Delete records if
110+
// 1. The epoch is greater than or equal to `effective_epoch`
111+
// 2. the cgc requirements match the latest validator custody requirements
112+
self.epoch_validator_custody_requirements
113+
.retain(|&epoch, custody_requirement| {
114+
!(epoch >= effective_epoch && *custody_requirement == latest_validator_custody)
115+
});
116+
117+
self.epoch_validator_custody_requirements
118+
.entry(effective_epoch)
119+
.and_modify(|old_custody| *old_custody = latest_validator_custody)
120+
.or_insert(latest_validator_custody);
121+
}
122+
}
102123
}
103124

104125
/// Given the `validator_custody_units`, return the custody requirement based on
@@ -250,6 +271,7 @@ impl<E: EthSpec> CustodyContext<E> {
250271
);
251272
return Some(CustodyCountChanged {
252273
new_custody_group_count: updated_cgc,
274+
old_custody_group_count: current_cgc,
253275
sampling_count: self.num_of_custody_groups_to_sample(effective_epoch, spec),
254276
effective_epoch,
255277
});
@@ -282,7 +304,7 @@ impl<E: EthSpec> CustodyContext<E> {
282304
/// minimum sampling size which may exceed the custody group count (CGC).
283305
///
284306
/// See also: [`Self::num_of_custody_groups_to_sample`].
285-
fn custody_group_count_at_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> u64 {
307+
pub fn custody_group_count_at_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> u64 {
286308
if self.current_is_supernode {
287309
spec.number_of_custody_groups
288310
} else {
@@ -360,14 +382,22 @@ impl<E: EthSpec> CustodyContext<E> {
360382
.all_custody_columns_ordered
361383
.get()
362384
.expect("all_custody_columns_ordered should be initialized");
385+
363386
&all_columns_ordered[..custody_group_count]
364387
}
388+
389+
pub fn update_and_backfill_custody_count_at_epoch(&self, effective_epoch: Epoch) {
390+
self.validator_registrations
391+
.write()
392+
.backfill_validator_custody_requirements(effective_epoch);
393+
}
365394
}
366395

367396
/// The custody count changed because of a change in the
368397
/// number of validators being managed.
369398
pub struct CustodyCountChanged {
370399
pub new_custody_group_count: u64,
400+
pub old_custody_group_count: u64,
371401
pub sampling_count: u64,
372402
pub effective_epoch: Epoch,
373403
}

0 commit comments

Comments
 (0)