-
Notifications
You must be signed in to change notification settings - Fork 961
Use events API to eager send attestations #7892
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: unstable
Are you sure you want to change the base?
Changes from 38 commits
ea45814
6f20083
46fe220
de7226f
5f2a101
ae16705
c13d0c3
8405f01
65e04fb
a1d36e6
a489d32
2b974db
5600aef
b65fc30
9024931
b25703f
e68548b
c4d851c
daf5b2e
c49b8eb
0e35ee5
29867d2
fd43876
b054a10
eb057d0
91e5980
2e0c78c
32eed9a
dac9f00
bf1471c
9794737
b20ded5
39b9a58
2e6a8f9
07d9a12
a82960b
6bddeeb
409d937
0d8160f
4ea159b
70e6d18
84c8291
f174615
468fe8a
bdf62d9
94af503
8b45d57
ce91eca
eabbdd4
dfe2935
7b7ab09
435a93c
00962ea
c2092eb
b388efe
4bfafc1
face04f
b30d710
28dbe33
8cfa532
516ccd7
3d4dfe0
4b4d0d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| use crate::BeaconNodeFallback; | ||
| use std::collections::HashMap; | ||
| use tokio::sync::RwLock; | ||
|
|
||
| use types::EthSpec; | ||
|
|
||
michaelsproul marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| use eth2::types::{EventKind, EventTopic, SseHead}; | ||
| use tracing::{info, warn}; | ||
|
|
||
| use futures::StreamExt; | ||
| use slot_clock::SlotClock; | ||
| use std::sync::Arc; | ||
|
|
||
| type CacheHashMap = HashMap<usize, SseHead>; | ||
|
|
||
| // This is used send the index derived from `CandidateBeaconNode` to the | ||
| // `AttestationService` for further processing | ||
| #[derive(Debug)] | ||
| pub struct HeadEvent { | ||
| pub beacon_node_index: usize, | ||
michaelsproul marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| /// Cache to maintain the latest head received from each of the beacon nodes | ||
| /// in the `BeaconNodeFallback`. | ||
| #[derive(Debug)] | ||
| pub struct BeaconHeadCache { | ||
| cache: RwLock<CacheHashMap>, | ||
| } | ||
|
Comment on lines
+23
to
+26
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this cache really needed? Can't we just send the slot of the new head to the attestation service via the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's useful to have the head monitor handle this, as it might eventually be useful for things other than just notifying the attestation service. E.g. when proposing a block we might want to query the head monitor, or the attestation consensus system might want to query the cached head of each BN to work out whether consensus exists (this is being worked on by Eitan in #8445) |
||
|
|
||
| impl BeaconHeadCache { | ||
| pub fn new() -> Self { | ||
| Self { | ||
| cache: RwLock::new(HashMap::new()), | ||
| } | ||
| } | ||
|
|
||
| pub async fn get(&self, beacon_node_index: usize) -> Option<SseHead> { | ||
| self.cache.read().await.get(&beacon_node_index).cloned() | ||
| } | ||
|
|
||
| pub async fn insert(&self, beacon_node_index: usize, head: SseHead) { | ||
| self.cache.write().await.insert(beacon_node_index, head); | ||
| } | ||
|
|
||
| pub async fn is_latest(&self, head: &SseHead) -> bool { | ||
michaelsproul marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| let cache = self.cache.read().await; | ||
| cache | ||
| .values() | ||
| .all(|cache_head| head.slot >= cache_head.slot) | ||
michaelsproul marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| pub async fn purge_cache(&self) { | ||
| self.cache.write().await.clear(); | ||
| } | ||
| } | ||
|
|
||
| impl Default for BeaconHeadCache { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| // Runs a non-terminating loop to update the `BeaconHeadCache` with the latest head received | ||
| // from the candidate beacon_nodes. This is an attempt to stream events to beacon nodes and | ||
| // potential start attestion duties earlier as soon as latest head is receive from any of the | ||
| // beacon node in contrast to attest at the 1/3rd mark in the slot. | ||
| // | ||
| // | ||
| // The cache and the candidate BNs list are refresh/purged to avoid dangling reference conditions | ||
| // that arise due to `update_candidates_list`. | ||
| // | ||
| // Starts the service to perpetually stream head events from connected beacon_nodes | ||
| pub async fn poll_head_event_from_beacon_nodes<E: EthSpec, T: SlotClock + 'static>( | ||
hopinheimer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| beacon_nodes: Arc<BeaconNodeFallback<T>>, | ||
| ) -> Result<(), String> { | ||
| let head_cache = beacon_nodes | ||
| .beacon_head_cache | ||
| .clone() | ||
| .expect("Unable to start head monitor without beacon_head_cache"); | ||
michaelsproul marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| let head_monitor_send = beacon_nodes | ||
| .head_monitor_send | ||
| .clone() | ||
| .expect("Unable to start head monitor without head_monitor_send"); | ||
michaelsproul marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| info!("Starting head monitoring service"); | ||
| let candidates = { | ||
| let candidates_guard = beacon_nodes.candidates.read().await; | ||
| candidates_guard.clone() | ||
| }; | ||
| let mut tasks = vec![]; | ||
|
|
||
| for candidate in candidates.iter() { | ||
| let head_event_stream = candidate | ||
| .beacon_node | ||
| .get_events::<E>(&[EventTopic::Head]) | ||
michaelsproul marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| .await; | ||
|
|
||
| let mut head_event_stream = match head_event_stream { | ||
| Ok(stream) => stream, | ||
| Err(e) => { | ||
| warn!("failed to get head event stream: {:?}", e); | ||
| continue; | ||
| } | ||
| }; | ||
|
|
||
| let sender_tx = head_monitor_send.clone(); | ||
| let head_cache_ref = head_cache.clone(); | ||
|
|
||
| let stream_fut = async move { | ||
| while let Some(event_result) = head_event_stream.next().await { | ||
| if let Ok(EventKind::Head(head)) = event_result { | ||
| head_cache_ref.insert(candidate.index, head.clone()).await; | ||
|
|
||
| if !head_cache_ref.is_latest(&head).await { | ||
| continue; | ||
| } | ||
|
|
||
| if sender_tx | ||
| .send(HeadEvent { | ||
| beacon_node_index: candidate.index, | ||
| }) | ||
| .await | ||
| .is_err() | ||
| { | ||
| warn!("Head monitoring service channel closed"); | ||
| } | ||
| } | ||
| } | ||
| }; | ||
|
|
||
| tasks.push(stream_fut); | ||
| } | ||
|
|
||
| if tasks.is_empty() { | ||
| head_cache.purge_cache().await; | ||
| return Err( | ||
| "No beacon nodes available for head event streaming, retry in sometime".to_string(), | ||
| ); | ||
| } | ||
|
|
||
| futures::future::join_all(tasks).await; | ||
|
|
||
| drop(candidates); | ||
| head_cache.purge_cache().await; | ||
|
|
||
| Ok(()) | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -466,6 +466,17 @@ pub struct ValidatorClient { | |||||
| )] | ||||||
| pub beacon_nodes_sync_tolerances: Vec<u64>, | ||||||
|
|
||||||
| #[clap( | ||||||
| long, | ||||||
| help = "Enable the beacon head monitor so fallback head updates trigger duties when a lagging primary is detected. \ | ||||||
|
||||||
| help = "Enable the beacon head monitor so fallback head updates trigger duties when a lagging primary is detected. \ | |
| help = "Enable the beacon head monitor so fallback head updates trigger duties when a lagging primary is detected. \ |
Uh oh!
There was an error while loading. Please reload this page.