Skip to content

Commit 54aef2d

Browse files
authored
Admin add/remove peer (#7198)
N/A Adds endpoints to add and remove trusted peers from the http api. The added peers are trusted peers so they won't be disconnected for bad scores. We try to maintain a connection to the peer in case they disconnect from us by trying to dial it every heartbeat.
1 parent a5ea05c commit 54aef2d

File tree

10 files changed

+217
-7
lines changed

10 files changed

+217
-7
lines changed

beacon_node/http_api/src/lib.rs

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use eth2::types::{
5353
use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
5454
use health_metrics::observe::Observe;
5555
use lighthouse_network::rpc::methods::MetaData;
56-
use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
56+
use lighthouse_network::{types::SyncState, Enr, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
5757
use lighthouse_version::version_with_platform;
5858
use logging::SSELoggingComponents;
5959
use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage};
@@ -72,6 +72,7 @@ use std::future::Future;
7272
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
7373
use std::path::PathBuf;
7474
use std::pin::Pin;
75+
use std::str::FromStr;
7576
use std::sync::Arc;
7677
use sysinfo::{System, SystemExt};
7778
use system_health::{observe_nat, observe_system_health_bn};
@@ -3676,7 +3677,7 @@ pub fn serve<T: BeaconChainTypes>(
36763677
.and(task_spawner_filter.clone())
36773678
.and(chain_filter.clone())
36783679
.and(warp_utils::json::json())
3679-
.and(network_tx_filter)
3680+
.and(network_tx_filter.clone())
36803681
.and(log_filter.clone())
36813682
.then(
36823683
|not_synced_filter: Result<(), Rejection>,
@@ -4122,6 +4123,77 @@ pub fn serve<T: BeaconChainTypes>(
41224123
},
41234124
);
41244125

4126+
// POST lighthouse/add_peer
4127+
let post_lighthouse_add_peer = warp::path("lighthouse")
4128+
.and(warp::path("add_peer"))
4129+
.and(warp::path::end())
4130+
.and(warp_utils::json::json())
4131+
.and(task_spawner_filter.clone())
4132+
.and(network_globals.clone())
4133+
.and(network_tx_filter.clone())
4134+
.and(log_filter.clone())
4135+
.then(
4136+
|request_data: api_types::AdminPeer,
4137+
task_spawner: TaskSpawner<T::EthSpec>,
4138+
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
4139+
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
4140+
log: Logger| {
4141+
task_spawner.blocking_json_task(Priority::P0, move || {
4142+
let enr = Enr::from_str(&request_data.enr).map_err(|e| {
4143+
warp_utils::reject::custom_bad_request(format!("invalid enr error {}", e))
4144+
})?;
4145+
info!(
4146+
log,
4147+
"Adding trusted peer";
4148+
"peer_id" => %enr.peer_id(),
4149+
"multiaddr" => ?enr.multiaddr()
4150+
);
4151+
network_globals.add_trusted_peer(enr.clone());
4152+
4153+
publish_network_message(&network_tx, NetworkMessage::ConnectTrustedPeer(enr))?;
4154+
4155+
Ok(())
4156+
})
4157+
},
4158+
);
4159+
4160+
// POST lighthouse/remove_peer
4161+
let post_lighthouse_remove_peer = warp::path("lighthouse")
4162+
.and(warp::path("remove_peer"))
4163+
.and(warp::path::end())
4164+
.and(warp_utils::json::json())
4165+
.and(task_spawner_filter.clone())
4166+
.and(network_globals.clone())
4167+
.and(network_tx_filter.clone())
4168+
.and(log_filter.clone())
4169+
.then(
4170+
|request_data: api_types::AdminPeer,
4171+
task_spawner: TaskSpawner<T::EthSpec>,
4172+
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
4173+
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
4174+
log: Logger| {
4175+
task_spawner.blocking_json_task(Priority::P0, move || {
4176+
let enr = Enr::from_str(&request_data.enr).map_err(|e| {
4177+
warp_utils::reject::custom_bad_request(format!("invalid enr error {}", e))
4178+
})?;
4179+
info!(
4180+
log,
4181+
"Removing trusted peer";
4182+
"peer_id" => %enr.peer_id(),
4183+
"multiaddr" => ?enr.multiaddr()
4184+
);
4185+
network_globals.remove_trusted_peer(enr.clone());
4186+
4187+
publish_network_message(
4188+
&network_tx,
4189+
NetworkMessage::DisconnectTrustedPeer(enr),
4190+
)?;
4191+
4192+
Ok(())
4193+
})
4194+
},
4195+
);
4196+
41254197
// POST lighthouse/liveness
41264198
let post_lighthouse_liveness = warp::path("lighthouse")
41274199
.and(warp::path("liveness"))
@@ -4896,6 +4968,8 @@ pub fn serve<T: BeaconChainTypes>(
48964968
.uor(post_lighthouse_ui_validator_info)
48974969
.uor(post_lighthouse_finalize)
48984970
.uor(post_lighthouse_compaction)
4971+
.uor(post_lighthouse_add_peer)
4972+
.uor(post_lighthouse_remove_peer)
48994973
.recover(warp_utils::reject::handle_rejection),
49004974
),
49014975
)

beacon_node/http_api/tests/tests.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5768,6 +5768,27 @@ impl ApiTester {
57685768
self
57695769
}
57705770

5771+
pub async fn test_post_lighthouse_add_remove_peer(self) -> Self {
5772+
let trusted_peers = self.ctx.network_globals.as_ref().unwrap().trusted_peers();
5773+
// Check that there aren't any trusted peers on startup
5774+
assert!(trusted_peers.is_empty());
5775+
let enr = AdminPeer {enr: "enr:-QESuEDpVVjo8dmDuneRhLnXdIGY3e9NQiaG4sJR3GS-VMQCQDsmBYoQhJRaPeZzPlTsZj2F8v-iV4lKJEYIRIyztqexHodhdHRuZXRziAwAAAAAAAAAhmNsaWVudNiKTGlnaHRob3VzZYw3LjAuMC1iZXRhLjSEZXRoMpDS8Zl_YAAJEAAIAAAAAAAAgmlkgnY0gmlwhIe11XmDaXA2kCoBBPkAOitZAAAAAAAAAAKEcXVpY4IjKYVxdWljNoIjg4lzZWNwMjU2azGhA43ihEr9BUVVnIHIfFqBR3Izs4YRHHPsTqIbUgEb3Hc8iHN5bmNuZXRzD4N0Y3CCIyiEdGNwNoIjgoN1ZHCCIyiEdWRwNoIjgg".to_string()};
5776+
self.client
5777+
.post_lighthouse_add_peer(enr.clone())
5778+
.await
5779+
.unwrap();
5780+
let trusted_peers = self.ctx.network_globals.as_ref().unwrap().trusted_peers();
5781+
// Should have 1 trusted peer
5782+
assert_eq!(trusted_peers.len(), 1);
5783+
5784+
self.client.post_lighthouse_remove_peer(enr).await.unwrap();
5785+
let trusted_peers = self.ctx.network_globals.as_ref().unwrap().trusted_peers();
5786+
// Should be empty after removing
5787+
assert!(trusted_peers.is_empty());
5788+
5789+
self
5790+
}
5791+
57715792
pub async fn test_post_lighthouse_liveness(self) -> Self {
57725793
let epoch = self.chain.epoch().unwrap();
57735794
let head_state = self.chain.head_beacon_state_cloned();
@@ -7334,6 +7355,8 @@ async fn lighthouse_endpoints() {
73347355
.test_post_lighthouse_database_reconstruct()
73357356
.await
73367357
.test_post_lighthouse_liveness()
7358+
.await
7359+
.test_post_lighthouse_add_remove_peer()
73377360
.await;
73387361
}
73397362

beacon_node/lighthouse_network/src/peer_manager/mod.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ pub struct PeerManager<E: EthSpec> {
114114
metrics_enabled: bool,
115115
/// Keeps track of whether the QUIC protocol is enabled or not.
116116
quic_enabled: bool,
117+
trusted_peers: HashSet<Enr>,
117118
/// The logger associated with the `PeerManager`.
118119
log: slog::Logger,
119120
}
@@ -195,6 +196,7 @@ impl<E: EthSpec> PeerManager<E> {
195196
discovery_enabled,
196197
metrics_enabled,
197198
quic_enabled,
199+
trusted_peers: Default::default(),
198200
log: log.clone(),
199201
})
200202
}
@@ -894,7 +896,7 @@ impl<E: EthSpec> PeerManager<E> {
894896
}
895897

896898
// Gracefully disconnects a peer without banning them.
897-
fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
899+
pub fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
898900
self.events
899901
.push(PeerManagerEvent::DisconnectPeer(peer_id, reason));
900902
self.network_globals
@@ -943,6 +945,13 @@ impl<E: EthSpec> PeerManager<E> {
943945
}
944946
}
945947

948+
fn maintain_trusted_peers(&mut self) {
949+
let trusted_peers = self.trusted_peers.clone();
950+
for trusted_peer in trusted_peers {
951+
self.dial_peer(trusted_peer);
952+
}
953+
}
954+
946955
/// This function checks the status of our current peers and optionally requests a discovery
947956
/// query if we need to find more peers to maintain the current number of peers
948957
fn maintain_peer_count(&mut self, dialing_peers: usize) {
@@ -1234,6 +1243,7 @@ impl<E: EthSpec> PeerManager<E> {
12341243
fn heartbeat(&mut self) {
12351244
// Optionally run a discovery query if we need more peers.
12361245
self.maintain_peer_count(0);
1246+
self.maintain_trusted_peers();
12371247

12381248
// Cleans up the connection state of dialing peers.
12391249
// Libp2p dials peer-ids, but sometimes the response is from another peer-id or libp2p
@@ -1470,6 +1480,14 @@ impl<E: EthSpec> PeerManager<E> {
14701480
)
14711481
})
14721482
}
1483+
1484+
pub fn add_trusted_peer(&mut self, enr: Enr) {
1485+
self.trusted_peers.insert(enr);
1486+
}
1487+
1488+
pub fn remove_trusted_peer(&mut self, enr: Enr) {
1489+
self.trusted_peers.remove(&enr);
1490+
}
14731491
}
14741492

14751493
enum ConnectingType {

beacon_node/lighthouse_network/src/peer_manager/peerdb.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::net::IpAddr;
99
use std::time::Instant;
1010
use std::{cmp::Ordering, fmt::Display};
1111
use std::{
12-
collections::{HashMap, HashSet},
12+
collections::{hash_map::Entry, HashMap, HashSet},
1313
fmt::Formatter,
1414
};
1515
use sync_status::SyncStatus;
@@ -79,6 +79,33 @@ impl<E: EthSpec> PeerDB<E> {
7979
self.peers.iter()
8080
}
8181

82+
pub fn set_trusted_peer(&mut self, enr: Enr) {
83+
match self.peers.entry(enr.peer_id()) {
84+
Entry::Occupied(mut info) => {
85+
let entry = info.get_mut();
86+
entry.score = Score::max_score();
87+
entry.is_trusted = true;
88+
}
89+
Entry::Vacant(entry) => {
90+
entry.insert(PeerInfo::trusted_peer_info());
91+
}
92+
}
93+
}
94+
95+
pub fn unset_trusted_peer(&mut self, enr: Enr) {
96+
if let Some(info) = self.peers.get_mut(&enr.peer_id()) {
97+
info.is_trusted = false;
98+
info.score = Score::default();
99+
}
100+
}
101+
102+
pub fn trusted_peers(&self) -> Vec<PeerId> {
103+
self.peers
104+
.iter()
105+
.filter_map(|(id, info)| if info.is_trusted { Some(*id) } else { None })
106+
.collect()
107+
}
108+
82109
/// Gives the ids of all known peers.
83110
pub fn peer_ids(&self) -> impl Iterator<Item = &PeerId> {
84111
self.peers.keys()

beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use PeerConnectionStatus::*;
2121
#[serde(bound = "E: EthSpec")]
2222
pub struct PeerInfo<E: EthSpec> {
2323
/// The peers reputation
24-
score: Score,
24+
pub(crate) score: Score,
2525
/// Client managing this peer
2626
client: Client,
2727
/// Connection status of this peer
@@ -50,7 +50,7 @@ pub struct PeerInfo<E: EthSpec> {
5050
#[serde(skip)]
5151
min_ttl: Option<Instant>,
5252
/// Is the peer a trusted peer.
53-
is_trusted: bool,
53+
pub(crate) is_trusted: bool,
5454
/// Direction of the first connection of the last (or current) connected session with this peer.
5555
/// None if this peer was never connected.
5656
connection_direction: Option<ConnectionDirection>,

beacon_node/lighthouse_network/src/service/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1236,6 +1236,21 @@ impl<E: EthSpec> Network<E> {
12361236
}
12371237
}
12381238

1239+
/// Adds the given `enr` to the trusted peers mapping and tries to dial it
1240+
/// every heartbeat to maintain the connection.
1241+
pub fn dial_trusted_peer(&mut self, enr: Enr) {
1242+
self.peer_manager_mut().add_trusted_peer(enr.clone());
1243+
self.peer_manager_mut().dial_peer(enr);
1244+
}
1245+
1246+
/// Remove the given peer from the trusted peers mapping if it exists and disconnect
1247+
/// from it.
1248+
pub fn remove_trusted_peer(&mut self, enr: Enr) {
1249+
self.peer_manager_mut().remove_trusted_peer(enr.clone());
1250+
self.peer_manager_mut()
1251+
.disconnect_peer(enr.peer_id(), GoodbyeReason::TooManyPeers);
1252+
}
1253+
12391254
/* Sub-behaviour event handling functions */
12401255

12411256
/// Handle a gossipsub event.

beacon_node/lighthouse_network/src/types/globals.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,18 @@ impl<E: EthSpec> NetworkGlobals<E> {
162162
.unwrap_or_default()
163163
}
164164

165+
pub fn add_trusted_peer(&self, enr: Enr) {
166+
self.peers.write().set_trusted_peer(enr);
167+
}
168+
169+
pub fn remove_trusted_peer(&self, enr: Enr) {
170+
self.peers.write().unset_trusted_peer(enr);
171+
}
172+
173+
pub fn trusted_peers(&self) -> Vec<PeerId> {
174+
self.peers.read().trusted_peers()
175+
}
176+
165177
/// Updates the syncing state of the node.
166178
///
167179
/// The old state is returned

beacon_node/network/src/service.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use futures::StreamExt;
1414
use lighthouse_network::rpc::{RequestId, RequestType};
1515
use lighthouse_network::service::Network;
1616
use lighthouse_network::types::GossipKind;
17+
use lighthouse_network::Enr;
1718
use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance};
1819
use lighthouse_network::{
1920
rpc::{GoodbyeReason, RpcErrorResponse},
@@ -101,6 +102,10 @@ pub enum NetworkMessage<E: EthSpec> {
101102
reason: GoodbyeReason,
102103
source: ReportSource,
103104
},
105+
/// Connect to a trusted peer and try to maintain the connection.
106+
ConnectTrustedPeer(Enr),
107+
/// Disconnect from a trusted peer and remove it from the `trusted_peers` mapping.
108+
DisconnectTrustedPeer(Enr),
104109
}
105110

106111
/// Messages triggered by validators that may trigger a subscription to a subnet.
@@ -688,6 +693,12 @@ impl<T: BeaconChainTypes> NetworkService<T> {
688693
reason,
689694
source,
690695
} => self.libp2p.goodbye_peer(&peer_id, reason, source),
696+
NetworkMessage::ConnectTrustedPeer(enr) => {
697+
self.libp2p.dial_trusted_peer(enr);
698+
}
699+
NetworkMessage::DisconnectTrustedPeer(enr) => {
700+
self.libp2p.remove_trusted_peer(enr);
701+
}
691702
NetworkMessage::SubscribeCoreTopics => {
692703
if self.subscribed_core_topics() {
693704
return;

common/eth2/src/lighthouse.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ mod sync_committee_rewards;
99

1010
use crate::{
1111
types::{
12-
DepositTreeSnapshot, Epoch, EthSpec, FinalizedExecutionBlock, GenericResponse, ValidatorId,
12+
AdminPeer, DepositTreeSnapshot, Epoch, EthSpec, FinalizedExecutionBlock, GenericResponse,
13+
ValidatorId,
1314
},
1415
BeaconNodeHttpClient, DepositData, Error, Eth1Data, Hash256, Slot,
1516
};
@@ -406,6 +407,30 @@ impl BeaconNodeHttpClient {
406407
self.post_with_response(path, &()).await
407408
}
408409

410+
/// `POST lighthouse/add_peer`
411+
pub async fn post_lighthouse_add_peer(&self, req: AdminPeer) -> Result<(), Error> {
412+
let mut path = self.server.full.clone();
413+
414+
path.path_segments_mut()
415+
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
416+
.push("lighthouse")
417+
.push("add_peer");
418+
419+
self.post_with_response(path, &req).await
420+
}
421+
422+
/// `POST lighthouse/remove_peer`
423+
pub async fn post_lighthouse_remove_peer(&self, req: AdminPeer) -> Result<(), Error> {
424+
let mut path = self.server.full.clone();
425+
426+
path.path_segments_mut()
427+
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
428+
.push("lighthouse")
429+
.push("remove_peer");
430+
431+
self.post_with_response(path, &req).await
432+
}
433+
409434
/*
410435
Analysis endpoints.
411436
*/

common/eth2/src/types.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1431,6 +1431,11 @@ pub struct ManualFinalizationRequestData {
14311431
pub block_root: Hash256,
14321432
}
14331433

1434+
#[derive(Debug, Serialize, Deserialize, Clone)]
1435+
pub struct AdminPeer {
1436+
pub enr: String,
1437+
}
1438+
14341439
#[derive(Debug, Serialize, Deserialize)]
14351440
pub struct LivenessRequestData {
14361441
pub epoch: Epoch,

0 commit comments

Comments
 (0)