-
Notifications
You must be signed in to change notification settings - Fork 2.7k
[Fix] Coupling block sync to DAG state #3386
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ff23fd6
15a566b
c19aff3
bae2f49
37a260c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -521,11 +521,16 @@ impl<N: Network> BFT<N> { | |
return Ok(()); | ||
} | ||
|
||
/* Proceeding to commit the leader. */ | ||
info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader)); | ||
// Commit the leader certificate if the primary is not syncing. | ||
if !IS_SYNCING { | ||
/* Proceeding to commit the leader. */ | ||
info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader)); | ||
// Commit the leader certificate, and all previous leader certificates since the last committed round. | ||
self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await?; | ||
} | ||
|
||
Ok(()) | ||
|
||
// Commit the leader certificate, and all previous leader certificates since the last committed round. | ||
self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await | ||
} | ||
|
||
/// Commits the leader certificate, and all previous leader certificates since the last committed round. | ||
|
@@ -814,6 +819,8 @@ impl<N: Network> BFT<N> { | |
mut rx_primary_certificate, | ||
mut rx_sync_bft_dag_at_bootup, | ||
mut rx_sync_bft, | ||
mut rx_commit_bft, | ||
mut rx_is_recently_committed, | ||
} = bft_receiver; | ||
|
||
// Process the current round from the primary. | ||
|
@@ -855,6 +862,30 @@ impl<N: Network> BFT<N> { | |
callback.send(result).ok(); | ||
} | ||
}); | ||
|
||
// Process the request to commit the leader certificate. | ||
let self_ = self.clone(); | ||
self.spawn(async move { | ||
while let Some((certificate, callback)) = rx_commit_bft.recv().await { | ||
// Update the DAG with the certificate. | ||
let result = self_.commit_leader_certificate::<true, true>(certificate).await; | ||
// Send the callback **after** updating the DAG. | ||
// Note: We must await the DAG update before proceeding. | ||
callback.send(result).ok(); | ||
} | ||
}); | ||
|
||
// Process the request to check if the batch certificate was recently committed. | ||
let self_ = self.clone(); | ||
self.spawn(async move { | ||
while let Some(((round, certificate_id), callback)) = rx_is_recently_committed.recv().await { | ||
// Check if the certificate was recently committed. | ||
let is_committed = self_.dag.read().is_recently_committed(round, certificate_id); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems like a lot of hassle just to perform this one check (in its own task with a dedicated channel), especially with both the |
||
// Send the callback **after** updating the DAG. | ||
// Note: We must await the DAG update before proceeding. | ||
callback.send(is_committed).ok(); | ||
} | ||
}); | ||
} | ||
|
||
/// Syncs the BFT DAG with the given batch certificates. These batch certificates **must** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -484,13 +484,46 @@ impl<N: Network> Sync<N> { | |
continue; | ||
} | ||
|
||
if let Authority::Quorum(subdag) = block.authority() { | ||
// Retrieve the leader certificate of the subdag. | ||
let leader_certificate = subdag.leader_certificate(); | ||
let leader_round = leader_certificate.round(); | ||
let leader_author = leader_certificate.author(); | ||
let leader_id = leader_certificate.id(); | ||
|
||
// If a BFT sender was provided, commit the leader certificate. | ||
if let Some(bft_sender) = self.bft_sender.get() { | ||
// Send the leader certificate to the BFT. | ||
if let Err(e) = bft_sender.send_commit_bft(leader_certificate.clone()).await { | ||
bail!("Sync - {e}"); | ||
}; | ||
|
||
// Ensure that leader certificate was recently committed in the DAG. | ||
match bft_sender.send_sync_is_recently_committed(leader_round, leader_id).await { | ||
Ok(is_recently_committed) => { | ||
if !is_recently_committed { | ||
bail!( | ||
"Sync - Failed to advance blocks - leader certificate with author {leader_author} from round {leader_round} was not recently committed.", | ||
); | ||
} | ||
debug!( | ||
"Sync - Leader certificate with author {leader_author} from round {leader_round} was recently committed.", | ||
); | ||
Comment on lines
+503
to
+511
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe a nit but the boolean can be moved into the match statement you get a compile warning if missing a branch and this has better readability IMO. i.e: Ok(true) => {
debug!("...");
}
Ok(false) => {
bail!("...");
}
// rest of cases |
||
} | ||
Err(e) => { | ||
bail!("Sync - Failed to check if leader certificate was recently committed - {e}"); | ||
} | ||
}; | ||
} | ||
} | ||
|
||
// Advance the ledger state. | ||
let self_ = self.clone(); | ||
tokio::task::spawn_blocking(move || { | ||
// Check the next block. | ||
self_.ledger.check_next_block(&block)?; | ||
// Attempt to advance to the next block. | ||
self_.ledger.advance_to_next_block(&block)?; | ||
|
||
// Sync the height with the block. | ||
self_.storage.sync_height_with_block(block.height()); | ||
// Sync the round with the block. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment needs to be extended and explain why we don't commit during sync.