MTG-703 Adding peer to peer consistency checks #316
snorochevskiy wants to merge 6 commits into main from
Conversation
```rust
pub account_pubkey: Pubkey,
pub slot: u64,
pub write_version: u64,
pub data_hash: u64,
```
Since we are tracking not only slot+write_version for accounts but also data_hash, we need to discuss the consensus mechanism.
If I understood correctly, the current implementation will identify accounts with the same slot+write_version but different data_hashes as different updates and will try to synchronize them between nodes. However, the current account processing mechanism will not allow updating existing accounts with the same slot+write_version. So nodes will try to receive these "updates" from each other and send them to each other, but will not process them correctly.
Maybe this sounds like a separate task, but we need to discuss which approach we think is better.
This is purely for account NFTs: for accounts we don't have fork cleaning, so we cannot rely on the slot. That's why we ignore slots and take only pubkey+write_version+data_hash when we calculate the account changes checksum, and when we compare a peer's account change with the local one.
If the slot is different but the data_hash is the same, we assume it is the same account change.
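A minimal sketch of that comparison rule, with hypothetical struct and function names (the real record type is the one quoted above):

```rust
/// Hypothetical mirror of the change record quoted above.
struct AccChangeRecord {
    account_pubkey: [u8; 32], // Pubkey bytes
    slot: u64,
    write_version: u64,
    data_hash: u64,
}

/// Two account changes are considered the same when pubkey, write_version
/// and data_hash match; the slot is intentionally ignored because account
/// updates are not protected by the fork cleaner.
fn is_same_account_change(a: &AccChangeRecord, b: &AccChangeRecord) -> bool {
    a.account_pubkey == b.account_pubkey
        && a.write_version == b.write_version
        && a.data_hash == b.data_hash
}
```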
For the consensus part - that's for us to define. Essentially, we should expect the following (a rough decision sketch follows the list):
- same write_version, same slot, same hash - all good
- same write_version, same slot, different hash - we need the consensus to define what the correct version is. I'd suggest we ask the RPC for the current state (if it has not changed yet, of course)
- same write_version, different slot, same hash - all good, take the highest slot on merge, probably
- same write_version, different slot, different hashes - looks like a fork, take the highest slot on merge
- different write_version, same/different slot, same/different hash - take the highest write_version
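A rough decision-table sketch of those rules, with hypothetical names; the triples stand for (slot, write_version, data_hash):

```rust
enum MergeDecision {
    Keep,                    // states already agree
    AskRpc,                  // same slot + write_version but different hash
    TakeHighestSlot,         // same write_version, diverging slots (possible fork)
    TakeHighestWriteVersion, // write_versions differ
}

/// `local` and `peer` are (slot, write_version, data_hash) triples.
fn decide(local: (u64, u64, u64), peer: (u64, u64, u64)) -> MergeDecision {
    let ((l_slot, l_wv, l_hash), (p_slot, p_wv, p_hash)) = (local, peer);
    if l_wv != p_wv {
        MergeDecision::TakeHighestWriteVersion
    } else if l_slot == p_slot {
        if l_hash == p_hash {
            MergeDecision::Keep
        } else {
            MergeDecision::AskRpc
        }
    } else {
        // Different slot with the same write_version: take the highest slot,
        // regardless of whether the hashes match.
        MergeDecision::TakeHighestSlot
    }
}
```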
```rust
pub account_pubkey: Pubkey,
pub slot: u64,
pub write_version: u64,
pub data_hash: u64,
```
Also, we do not track data_hashes for the transaction-based protocol. Maybe we need them here as well? I think it makes sense, because it would give us additional resistance to Solana forks.
IMO, we won't gain anything by adding data_hash to bubblegum, since we already have the fork cleaner and sequence consistency checks for it.
```rust
slot_updated: account_update.slot as i64,
amount: ta.amount as i64,
write_version: account_update.write_version,
data_hash: calc_solana_account_data_hash(&account_update.data),
```
Could we approximately estimate the performance impact of calculating hashes "in place" for each account? We have many account updates, a much larger volume than we have for transactions, so it would be great to understand whether this increases account processing time.
According to my tests, it takes ~9 microseconds to calculate the hash of 1 KB of data, which looks acceptable.
(In any case, xxhash is one of the most performant hashing algorithms.)
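For reference, a back-of-the-envelope measurement of that kind could look like the sketch below. It assumes the xxhash-rust crate with the xxh3 feature; the actual calc_solana_account_data_hash implementation may differ.

```rust
use std::time::Instant;
use xxhash_rust::xxh3::xxh3_64;

fn main() {
    let data = vec![0u8; 1024]; // 1 KB of fake account data
    let iterations: u32 = 100_000;

    let start = Instant::now();
    let mut acc = 0u64;
    for _ in 0..iterations {
        // wrapping_add keeps the compiler from optimizing the hashing away
        acc = acc.wrapping_add(xxh3_64(&data));
    }
    let per_hash = start.elapsed() / iterations;
    println!("avg time per 1 KB hash: {per_hash:?} (checksum: {acc})");
}
```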
```rust
    metrics_state.checksum_calculation_metrics.clone(),
);

if let Some(peer_urls_file) = config.peer_urls_file.as_ref() {
```
Do we need to add a mount for this file in docker compose?
Probably, we do...
```rust
let mut missing_bbgm_changes: HashMap<BbgmChangeRecord, HashSet<usize>> = HashMap::new();
let trusted_peers = peers_provider.list_trusted_peers().await;

for (peer_ind, trusted_peer) in trusted_peers.iter().enumerate() {
```
Maybe we should try to make this process more async? There is a lot of I/O with other peers here, so maybe it makes sense to spawn a separate task for communicating with each peer?
This could be a good improvement in the future, but for "phase 1" I'd prefer to run it sequentially and collect CPU and disk metrics to understand whether we have spare bandwidth for parallelization.
Otherwise we could easily fall into a scenario where we stress the disk with a bunch of concurrent peer-to-peer consistency checks, and as a result the indexing part lacks the resources to fulfill its main duty.
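If the parallel variant does happen later, a sketch of the shape it could take (hypothetical function; the per-peer body is elided):

```rust
use tokio::task::JoinSet;

/// Spawn one task per trusted peer and wait for all of them,
/// so a slow or failing peer does not block the others.
async fn check_peers_concurrently(peer_urls: Vec<String>) {
    let mut tasks = JoinSet::new();
    for url in peer_urls {
        tasks.spawn(async move {
            // ... open a client to `url`, compare checksums, collect missing changes ...
            url
        });
    }
    while let Some(joined) = tasks.join_next().await {
        if let Err(e) = joined {
            tracing::warn!("peer consistency task failed: {e}");
        }
    }
}
```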
```rust
    .chain(ge_cmp_res.different.iter())
    .map(|&a| a.tree_pubkey)
    .collect::<Vec<_>>();
for tree_pk in ge_trees_to_check {
```
Maybe processing each grand epoch also deserves to be spawned in a separate task, because there is a lot of I/O here too.
Same here: a potentially good improvement for the future.
```rust
}

#[allow(clippy::while_let_on_iterator)]
async fn handle_missing_bbgm_changes(
```
Point to discuss: the consensus mechanism.
If we have many hosts in the network, we need to think about not just fetching updates from every host, but finding consensus between them and rejecting updates from hosts that contradict the majority of the network.
Right, this could be a good thing for interaction with non-trusted peers, but we've decided not to make it part of the initial implementation.
```rust
    clients.get_mut(peer_ind).unwrap()
};
if let Ok(block) = client
    .get_block(change.slot, Option::<Arc<grpc::client::Client>>::None)
```
We are fetching the whole block without indicating which trees we want to receive. In the future we need to add methods for syncing specific trees.
```rust
    .db_bubblegum_get_grand_epochs_latency
    .observe(start.elapsed().as_secs_f64());

let ge_cmp_res = cmp(&my_ge_chksms, &peer_ge_chksms);
```
Maybe we need to add a config option describing which trees we are indexing. Because for now, if I understood right, we may mark trees that we do not want to index as missing.
```rust
loop {
    let calc_msg = tokio::select! {
        msg = rcv.recv() => msg,
        _ = shutdown_signal.recv() => {
```
nit: since this runs inside a plain tokio task that is not tied to any JoinSet, the shutdown_signal is not really needed here, because we do not wait for this task to complete anywhere.
```rust
pub fn solana_change_info(&self) -> (Pubkey, u64, u64, u64) {
    let (slot, write_version, data_hash) = match &self.account {
        UnprocessedAccount::MetadataInfo(v) => (v.slot_updated, v.write_version, v.data_hash),
        UnprocessedAccount::Token(v) => (v.slot_updated as u64, v.write_version, v.data_hash),
```
Are we OK with converting i64 to u64? If it cannot be negative, why does TokenAccount store it as i64 in the first place? Some kind of restriction from the DB?
```proto
rpc GetAccsInBucket(GetAccReq) returns (AccList);

rpc ProposeMissingAccChanges(AccList) returns (google.protobuf.Empty);
}
```
```rust
pub async fn connect(peer_discovery: impl PeerDiscovery) -> Result<Self, GrpcError> {
    let url = Uri::from_str(peer_discovery.get_gapfiller_peer_addr().as_str())
        .map_err(|e| GrpcError::UriCreate(e.to_string()))?;
    Client::connect_to_url(peer_discovery.get_gapfiller_peer_addr().as_str()).await
```
```diff
- Client::connect_to_url(peer_discovery.get_gapfiller_peer_addr().as_str()).await
+ Client::connect_to_url(&peer_discovery.get_gapfiller_peer_addr()).await
```
Just a preference of style, feel free to ignore.
```rust
/// Interface for querying bubblegum checksums from peer
/// or local storage.
#[async_trait]
```
Isn't async trait stabilized?
Yes, but #[async_trait] is still required to build a trait object.
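For context: native async fn in traits is stable since Rust 1.75, but such traits are not dyn-compatible yet, so boxing still needs the macro. A tiny illustration with a hypothetical trait name:

```rust
use async_trait::async_trait;

#[async_trait]
trait ChecksumSource {
    async fn get_checksum(&self, key: u64) -> Option<[u8; 32]>;
}

// The macro desugars the method into one returning Pin<Box<dyn Future + Send>>,
// which is what makes a boxed trait object like this possible.
type DynChecksumSource = Box<dyn ChecksumSource + Send + Sync>;
```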
```rust
    pub found_missing_accounts: Gauge,
}

impl Default for Peer2PeerConsistencyMetricsConfig {
```
It seems to me this Default has no use, since new() can be freely called instead. It feels like only one of them should survive, IMO.
From my perspective, new() with no parameters = default (as long as the function doesn't provoke side effects).
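One common way to resolve this (a sketch, not necessarily what the PR ends up doing) is to keep a single construction path and derive the other from it:

```rust
pub struct Peer2PeerConsistencyMetricsConfig {
    // histograms, counters, gauges ...
}

impl Peer2PeerConsistencyMetricsConfig {
    pub fn new() -> Self {
        Self {
            // build the metrics here
        }
    }
}

// Default simply delegates to new(), so the two can never diverge
// (this is also what clippy's new_without_default lint nudges towards).
impl Default for Peer2PeerConsistencyMetricsConfig {
    fn default() -> Self {
        Self::new()
    }
}
```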
metrics_utils/src/lib.rs
```rust
impl Peer2PeerConsistencyMetricsConfig {
    pub fn new() -> Peer2PeerConsistencyMetricsConfig {
        let mk_histogram = || Histogram::new(exponential_buckets(20.0, 1.8, 10));
```
Those numbers are slightly magical. What do they mean?
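Assuming these are prometheus-client's exponential_buckets(start, factor, length) semantics, the three numbers expand to ten bucket upper bounds starting at 20 and growing 1.8x per step:

```rust
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};

fn mk_histogram() -> Histogram {
    // Approximate upper bounds: 20, 36, 65, 117, 210, 378, 680, 1224, 2204, 3967
    // (in whatever unit is passed to observe()), i.e. a bit over two orders of
    // magnitude of resolution.
    Histogram::new(exponential_buckets(20.0, 1.8, 10))
}
```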
```rust
/// Type of checksum for bubblegum epochs and account NFT buckets.
/// It is technically a SHA3 hash.
pub type Chksm = [u8; 32];
```
I thought the type above was for this:
```diff
- pub type Chksm = [u8; 32];
+ pub checksum: Option<Chksm>,
```
`pub type Chksm = [u8; 32];` is just an alias that should make a potential future change of the checksum type easier (which will never happen).
I guess it might be more convenient/idiomatic to use a newtype instead of the alias?
I believe it would just introduce a hell of wrap and unwrap calls 🥲
Deref to the rescue?
Just thinking aloud though, not a call to action.
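For illustration, the newtype variant being discussed might look like the sketch below; Deref gives back most of the ergonomics of the plain array:

```rust
use std::ops::Deref;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct Chksm([u8; 32]);

impl From<[u8; 32]> for Chksm {
    fn from(bytes: [u8; 32]) -> Self {
        Chksm(bytes)
    }
}

// Deref lets callers use &Chksm wherever &[u8; 32] is expected,
// which removes most of the explicit wrap/unwrap noise.
impl Deref for Chksm {
    type Target = [u8; 32];
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
```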
metrics_utils/src/lib.rs
```rust
pub db_bubblegum_get_grand_epochs_latency: Histogram,
pub db_bubblegum_get_epochs_latency: Histogram,
pub db_bubblegum_get_changes_latency: Histogram,
pub db_account_get_grand_buckets_latency: Histogram,
pub db_account_get_buckets_latency: Histogram,
pub db_account_get_latests_latency: Histogram,
```
Why not a single Family<MetricLabel, Histogram>?
I usually prefer to have a set of plain metrics instead of one labeled family, because it is much easier to query them from other monitoring systems. But sure, I can turn these into a family. Should I?
Yes, please. With our stack of Prometheus + Grafana, the primary flow is to put the metrics onto a dashboard and have some simple alerts. With every new metric added there is an increased chance it won't be added at all, because it requires a dedicated query for every chart and every alert rule. These metrics are super generic; reusing the existing RED approach is even more favorable. If it doesn't fit into RED, a dedicated family is the next best choice. Please don't leave us with the need to create multiple charts to monitor every request. Metrics should be kept as simple as possible.
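A sketch of the single-family shape being asked for, using the prometheus-client crate; the label set here is hypothetical:

```rust
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct ConsistencyCheckLabels {
    pub protocol: String, // "bubblegum" | "account"
    pub method: String,   // "get_grand_epochs" | "get_epochs" | "get_changes" | ...
}

pub fn db_consistency_latency() -> Family<ConsistencyCheckLabels, Histogram> {
    // Every label combination gets its own histogram with the same buckets,
    // but the dashboard only needs a single metric name to query.
    let constructor: fn() -> Histogram = || Histogram::new(exponential_buckets(20.0, 1.8, 10));
    Family::new_with_constructor(constructor)
}
```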
metrics_utils/src/lib.rs
```rust
pub peers_bubblegum_get_grand_epochs_for_tree_errors: Family<MetricLabel, Counter>,
pub peers_bubblegum_get_grand_epochs_errors: Family<MetricLabel, Counter>,
pub peers_bubblegum_get_epochs_errors: Family<MetricLabel, Counter>,
pub peers_bubblegum_get_changes_errors: Family<MetricLabel, Counter>,
```
This looks more like an added label to me: peers_sync_latency(protocol: "bubblegum/account", method/endpoint: "get_grand_epochs/get_epochs/get_changes").
```rust
// prepare
let tree1 = Pubkey::new_unique();

// This change is for epoch we won't calculate in the test,
```
The comment is not valid in this context.
```rust
    .put(k1_2.clone(), v1_2.clone())
    .unwrap();

// This will be also ignored
```
StanChe left a comment:
Great work, thank you. Several open questions need clarification and the metrics should be simplified/moved to more appropriate measuring places.
```rust
// Verify account last state updated
let latest_acc1_key = AccountNftKey::new(acc1_pubkey);
let latest_acc1_val = storage
    .acc_nft_last
```
acc_nft_last holds the last calculated epoch value?
It is the last seen change of the account with the given pubkey.
```rust
#[async_trait::async_trait]
impl AuraPeersProvides for FileSrcAuraPeersProvides {
    async fn list_trusted_peers(&self) -> Vec<String> {
```
Why not a list of URLs directly? Those are parsed every time anyway.
That's to make it possible to change the list of peers without restarting the application.
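A hedged sketch of that behavior, assuming a simple newline-separated peers file and a hypothetical file_path field (the real implementation may differ):

```rust
use async_trait::async_trait;

#[async_trait]
pub trait AuraPeersProvides {
    async fn list_trusted_peers(&self) -> Vec<String>;
}

pub struct FileSrcAuraPeersProvides {
    pub file_path: String,
}

#[async_trait]
impl AuraPeersProvides for FileSrcAuraPeersProvides {
    async fn list_trusted_peers(&self) -> Vec<String> {
        // Re-read the file on every call, so the peer list can be edited
        // without restarting the application.
        match tokio::fs::read_to_string(&self.file_path).await {
            Ok(body) => body
                .lines()
                .map(str::trim)
                .filter(|l| !l.is_empty())
                .map(str::to_owned)
                .collect(),
            Err(_) => Vec::new(),
        }
    }
}
```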
rocks-db/src/storage_consistency.rs
```rust
/// To prevent such inconsistency of a checksum, roght before the calulating,
/// we mark the epoch checksum to be calculated is "Calculating",
/// and after the checksum is calculated, we write this value only in case
/// if the previous value is still in "Calculated" state.
```
```diff
- /// if the previous value is still in "Calculated" state.
+ /// if the previous value is still in the same "Calculating" state.
```
rocks-db/src/storage_consistency.rs
```rust
/// if the previous value is still in "Calculated" state.
///
/// At the same time, when the Bubblegum updated processor receives
/// a new update with slot that epoch is from the previous epoch perioud,
```
```diff
- /// a new update with slot that epoch is from the previous epoch perioud,
+ /// a new update with slot that epoch is from the previous epoch period,
```
rocks-db/src/storage_consistency.rs
```rust
///
/// At the same time, when the Bubblegum updated processor receives
/// a new update with slot that epoch is from the previous epoch perioud,
/// it not only writed the bubblegum change, but also updated
```
```diff
- /// it not only writed the bubblegum change, but also updated
+ /// it not only writes the bubblegum change, but also updates
```
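A minimal sketch of the invalidation protocol described in the doc comments above, with hypothetical types: the calculator marks the epoch as Calculating and only commits its result if that marker is still intact, so a late bubblegum update touching the same epoch can invalidate the in-flight calculation.

```rust
#[derive(Clone, Copy, PartialEq, Eq)]
enum EpochChecksumState {
    Calculating,
    Invalidated,
    Calculated([u8; 32]),
}

/// Called by the checksum calculator after hashing the epoch's changes.
/// The result is kept only if nothing invalidated the epoch in the meantime.
fn commit_checksum(stored: &mut EpochChecksumState, computed: [u8; 32]) {
    if *stored == EpochChecksumState::Calculating {
        *stored = EpochChecksumState::Calculated(computed);
    }
}

/// Called by the bubblegum update processor when it writes a change whose
/// slot belongs to an epoch that is already being (or has been) calculated.
fn invalidate_epoch(stored: &mut EpochChecksumState) {
    *stored = EpochChecksumState::Invalidated;
}
```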
```rust
/// This flag is set to true before bubblegum epoch calculation is started,
/// and set to false after the calculation is finished.
static IS_CALCULATING_BBGM_EPOCH: AtomicI32 = AtomicI32::new(-1);
```
```rust
) {
    tracing::info!("Starting bubblegum changes peer-to-peer exchange for epoch={epoch}");
    while get_calculating_bbgm_epoch()
        .map(|e| e == epoch)
```
Can we end up calculating some other epoch - a previous one, or the next one?
Theoretically that should not happen, but I've changed it to compare the current epoch with the last calculated one.
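For context, a minimal sketch of how such an atomic marker can be read and written; the meaning of the -1 sentinel ("nothing is being calculated") is an assumption based on the initializer shown in the diff:

```rust
use std::sync::atomic::{AtomicI32, Ordering};

/// -1 means no bubblegum epoch checksum calculation is currently running.
static IS_CALCULATING_BBGM_EPOCH: AtomicI32 = AtomicI32::new(-1);

fn set_calculating_bbgm_epoch(epoch: u32) {
    IS_CALCULATING_BBGM_EPOCH.store(epoch as i32, Ordering::Relaxed);
}

fn get_calculating_bbgm_epoch() -> Option<u32> {
    let v = IS_CALCULATING_BBGM_EPOCH.load(Ordering::Relaxed);
    (v >= 0).then_some(v as u32)
}
```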
```rust
metrics
    .found_missing_bubblegums
    .set(changes_we_miss.len() as i64);
```
I'd suggest incrementing this rather than setting it. A set value will be way more flickery given the periodic nature of metrics collectors.
```rust
        return result;
    }
};
metrics
```
This metric collection should be tied closely to the actual I/O - the gRPC client in our case - not to the business-logic level.
This one measures how much time it takes on our side to fetch the data.
On the gRPC client side (BbgmConsistencyApiClientImpl and AccConsistencyApiClientImpl) there are separate metrics that measure how long the call to the peer takes.
This pull request contains the full implementation of peer-to-peer consistency checking and fetching of missing blocks for bubblegum and account NFTs.
It includes:
Design document:
https://github.com/metaplex-foundation/aura/wiki/Data-consistency-for-peer%E2%80%90to%E2%80%90peer-indexers