Skip to content

Commit 971f814

Browse files
authored
fix(network): Persist links between archival nodes (#3376)
Each archival node will allocate some of its connections slots for other archival nodes. When it is rebalancing, some connections are dropped, but it won't drop any connection to other archival node, if the number of direct archival nodes is lower than some threshold. Fixes: #3364 Test plan ========= archival_node @ chain/network/tests/routing.rs should pass This test connects to archival nodes to each other, and perform several rounds of connection from other nodes forcing a rebalancing, and check that connection between archival node is not dropped.
1 parent 523eb4f commit 971f814

7 files changed

Lines changed: 142 additions & 6 deletions

File tree

chain/network/src/peer_manager.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,13 @@ impl PeerManagerActor {
458458
.count()
459459
}
460460

461+
fn num_archival_peers(&self) -> usize {
462+
self.active_peers
463+
.values()
464+
.filter(|active_peer| active_peer.full_peer_info.chain_info.archival)
465+
.count()
466+
}
467+
461468
/// Check if it is needed to create a new outbound connection.
462469
/// If the number of active connections is less than `ideal_connections_lo` or
463470
/// (the number of outgoing connections is less than `minimum_outbound_peers`
@@ -663,6 +670,8 @@ impl PeerManagerActor {
663670
/// find the one we connected earlier and add it to the safe set.
664671
/// else break
665672
fn try_stop_active_connection(&self) {
673+
debug!(target: "network", "Trying to stop an active connection. Number of active connections: {}", self.active_peers.len());
674+
666675
// Build safe set
667676
let mut safe_set = HashSet::new();
668677

@@ -676,6 +685,17 @@ impl PeerManagerActor {
676685
}
677686
}
678687

688+
if self.config.archive
689+
&& self.num_archival_peers()
690+
<= self.config.archival_peer_connections_lower_bound as usize
691+
{
692+
for (peer, active) in self.active_peers.iter() {
693+
if active.full_peer_info.chain_info.archival {
694+
safe_set.insert(peer.clone());
695+
}
696+
}
697+
}
698+
679699
// Find all recent connections
680700
let mut recent_connections = self
681701
.active_peers

chain/network/src/test_utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ impl NetworkConfig {
6767
ideal_connections_hi: 35,
6868
peer_recent_time_window: Duration::from_secs(600),
6969
safe_set_size: 20,
70+
archival_peer_connections_lower_bound: 10,
7071
ban_window: Duration::from_secs(1),
7172
peer_expiration_duration: Duration::from_secs(60 * 60),
7273
max_send_peers: 512,
@@ -78,6 +79,7 @@ impl NetworkConfig {
7879
push_info_period: Duration::from_millis(100),
7980
blacklist: HashMap::new(),
8081
outbound_disabled: false,
82+
archive: false,
8183
}
8284
}
8385
}

chain/network/src/types.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,9 @@ pub struct NetworkConfig {
803803
/// Number of peers to keep while removing a connection.
804804
/// Used to avoid disconnecting from peers we have been connected since long time.
805805
pub safe_set_size: u32,
806+
/// Lower bound of the number of connections to archival peers to keep
807+
/// if we are an archival node.
808+
pub archival_peer_connections_lower_bound: u32,
806809
/// Duration of the ban for misbehaving peers.
807810
pub ban_window: Duration,
808811
/// Remove expired peers.
@@ -833,6 +836,8 @@ pub struct NetworkConfig {
833836
/// are satisfied.
834837
/// This flag should be ALWAYS FALSE. Only set to true for testing purposes.
835838
pub outbound_disabled: bool,
839+
/// Not clear old data, set `true` for archive nodes.
840+
pub archive: bool,
836841
}
837842

838843
impl NetworkConfig {

chain/network/tests/full_network.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub fn connect_at_max_capacity(
3939

4040
// Wait until all running nodes have expected connections
4141
for node_id in 0..num_node {
42-
runner.push_action(check_expected_connections(node_id, expected_connections));
42+
runner.push_action(check_expected_connections(node_id, Some(expected_connections), None));
4343
}
4444

4545
// Restart stopped nodes
@@ -49,7 +49,7 @@ pub fn connect_at_max_capacity(
4949

5050
// Wait until all nodes have expected connections
5151
for node_id in 0..total_nodes {
52-
runner.push_action(check_expected_connections(node_id, expected_connections));
52+
runner.push_action(check_expected_connections(node_id, Some(expected_connections), None));
5353
}
5454

5555
start_test(runner);

chain/network/tests/routing.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,3 +242,37 @@ fn max_num_peers_limit() {
242242

243243
start_test(runner);
244244
}
245+
246+
/// Check that two archival nodes keep connected after network rebalance. Nodes 0 and 1 are archival nodes, others aren't.
247+
/// Initially connect 2, 3, 4 to 0. Then connect 1 to 0, this connection should persist, even after other nodes tries
248+
/// to connect to node 0 again.
249+
///
250+
/// Do four rounds where 2, 3, 4 tries to connect to 0 and check that connection between 0 and 1 was never dropped.
251+
#[test]
252+
fn archival_node() {
253+
let mut runner = Runner::new(5, 5)
254+
.max_num_peers(3)
255+
.ideal_connections(2, 2)
256+
.safe_set_size(0)
257+
.set_as_archival(0)
258+
.set_as_archival(1);
259+
260+
runner.push(Action::AddEdge(2, 0));
261+
runner.push(Action::AddEdge(3, 0));
262+
runner.push(Action::AddEdge(4, 0));
263+
runner.push_action(check_expected_connections(0, Some(2), Some(2)));
264+
265+
runner.push(Action::AddEdge(1, 0));
266+
runner.push_action(check_expected_connections(0, Some(2), Some(2)));
267+
runner.push_action(check_direct_connection(0, 1));
268+
269+
for _step in 0..4 {
270+
runner.push(Action::AddEdge(2, 0));
271+
runner.push(Action::AddEdge(3, 0));
272+
runner.push(Action::AddEdge(4, 0));
273+
runner.push_action(check_expected_connections(0, Some(2), Some(2)));
274+
runner.push_action(check_direct_connection(0, 1));
275+
}
276+
277+
start_test(runner);
278+
}

chain/network/tests/runner/mod.rs

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ pub fn setup_network_node(
6262

6363
let peer_manager = PeerManagerActor::create(move |ctx| {
6464
let mut client_config = ClientConfig::test(false, 100, 200, num_validators, false);
65+
client_config.archive = config.archive;
6566
client_config.ttl_account_id_router = config.ttl_account_id_router;
6667
let network_adapter = NetworkRecipient::new();
6768
network_adapter.set_recipient(ctx.address().recipient());
@@ -363,6 +364,7 @@ struct TestConfig {
363364
ideal_connections: Option<(u32, u32)>,
364365
minimum_outbound_peers: Option<u32>,
365366
safe_set_size: Option<u32>,
367+
archive: bool,
366368
}
367369

368370
impl TestConfig {
@@ -377,6 +379,7 @@ impl TestConfig {
377379
ideal_connections: None,
378380
minimum_outbound_peers: None,
379381
safe_set_size: None,
382+
archive: false,
380383
}
381384
}
382385
}
@@ -425,6 +428,12 @@ impl Runner {
425428
self
426429
}
427430

431+
/// Set node `u` as archival node.
432+
pub fn set_as_archival(mut self, u: usize) -> Self {
433+
self.test_config[u].archive = true;
434+
self
435+
}
436+
428437
/// Specify boot nodes. By default there are no boot nodes.
429438
pub fn use_boot_nodes(mut self, boot_nodes: Vec<usize>) -> Self {
430439
self.apply_all(move |test_config| {
@@ -453,7 +462,7 @@ impl Runner {
453462

454463
pub fn safe_set_size(mut self, safe_set_size: u32) -> Self {
455464
self.apply_all(move |test_config| {
456-
test_config.minimum_outbound_peers = Some(safe_set_size);
465+
test_config.safe_set_size = Some(safe_set_size);
457466
});
458467
self
459468
}
@@ -539,6 +548,7 @@ impl Runner {
539548
network_config.blacklist = blacklist;
540549
network_config.outbound_disabled = test_config.outbound_disabled;
541550
network_config.boot_nodes = boot_nodes;
551+
network_config.archive = test_config.archive;
542552

543553
network_config.ideal_connections_lo =
544554
test_config.ideal_connections.map_or(network_config.ideal_connections_lo, |(lo, _)| lo);
@@ -665,8 +675,14 @@ impl Handler<RunnerMessage> for Runner {
665675
}
666676
}
667677

668-
/// Check that `node_id` has at least `expected_connections` as active peers.
669-
pub fn check_expected_connections(node_id: usize, expected_connections: usize) -> ActionFn {
678+
/// Check that the number of connections of `node_id` is in the range:
679+
/// [expected_connections_lo, expected_connections_hi]
680+
/// Use None to denote semi-open interval
681+
pub fn check_expected_connections(
682+
node_id: usize,
683+
expected_connections_lo: Option<usize>,
684+
expected_connections_hi: Option<usize>,
685+
) -> ActionFn {
670686
Box::new(
671687
move |info: SharedRunningInfo,
672688
flag: Arc<AtomicBool>,
@@ -681,7 +697,19 @@ pub fn check_expected_connections(node_id: usize, expected_connections: usize) -
681697
.send(GetInfo {})
682698
.map_err(|_| ())
683699
.and_then(move |res| {
684-
if res.num_active_peers >= expected_connections {
700+
let left = if let Some(expected_connections_lo) = expected_connections_lo {
701+
expected_connections_lo <= res.num_active_peers
702+
} else {
703+
true
704+
};
705+
706+
let right = if let Some(expected_connections_hi) = expected_connections_hi {
707+
res.num_active_peers <= expected_connections_hi
708+
} else {
709+
true
710+
};
711+
712+
if left && right {
685713
flag.store(true, Ordering::Relaxed);
686714
}
687715
future::ok(())
@@ -692,6 +720,39 @@ pub fn check_expected_connections(node_id: usize, expected_connections: usize) -
692720
)
693721
}
694722

723+
/// Check that `node_id` has a direct connection to `target_id`.
724+
pub fn check_direct_connection(node_id: usize, target_id: usize) -> ActionFn {
725+
Box::new(
726+
move |info: SharedRunningInfo,
727+
flag: Arc<AtomicBool>,
728+
_ctx: &mut Context<WaitOrTimeout>,
729+
_runner| {
730+
let info = info.read().unwrap();
731+
let target_peer_id = info.peers_info[target_id].id.clone();
732+
733+
actix::spawn(
734+
info.pm_addr
735+
.get(node_id)
736+
.unwrap()
737+
.send(NetworkRequests::FetchRoutingTable)
738+
.map_err(|_| ())
739+
.and_then(move |res| {
740+
if let NetworkResponses::RoutingTableInfo(routing_table) = res {
741+
if let Some(routes) = routing_table.peer_forwarding.get(&target_peer_id)
742+
{
743+
if routes.contains(&target_peer_id) {
744+
flag.store(true, Ordering::Relaxed);
745+
}
746+
}
747+
}
748+
future::ok(())
749+
})
750+
.map(drop),
751+
);
752+
},
753+
)
754+
}
755+
695756
/// Restart a node that was already stopped.
696757
pub fn restart(node_id: usize) -> ActionFn {
697758
Box::new(

neard/src/config.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,11 @@ fn default_peer_recent_time_window() -> Duration {
178178
fn default_safe_set_size() -> u32 {
179179
20
180180
}
181+
/// Lower bound of the number of connections to archival peers to keep
182+
/// if we are an archival node.
183+
fn default_archival_peer_connections_lower_bound() -> u32 {
184+
10
185+
}
181186
/// Time to persist Accounts Id in the router without removing them in seconds.
182187
fn default_ttl_account_id_router() -> Duration {
183188
Duration::from_secs(TTL_ACCOUNT_ID_ROUTER)
@@ -215,6 +220,10 @@ pub struct Network {
215220
/// Used to avoid disconnecting from peers we have been connected since long time.
216221
#[serde(default = "default_safe_set_size")]
217222
pub safe_set_size: u32,
223+
/// Lower bound of the number of connections to archival peers to keep
224+
/// if we are an archival node.
225+
#[serde(default = "default_archival_peer_connections_lower_bound")]
226+
pub archival_peer_connections_lower_bound: u32,
218227
/// Handshake timeout.
219228
pub handshake_timeout: Duration,
220229
/// Duration before trying to reconnect to a peer.
@@ -247,6 +256,7 @@ impl Default for Network {
247256
ideal_connections_hi: default_ideal_connections_hi(),
248257
peer_recent_time_window: default_peer_recent_time_window(),
249258
safe_set_size: default_safe_set_size(),
259+
archival_peer_connections_lower_bound: default_archival_peer_connections_lower_bound(),
250260
handshake_timeout: Duration::from_secs(20),
251261
reconnect_delay: Duration::from_secs(60),
252262
skip_sync_wait: false,
@@ -613,6 +623,9 @@ impl NearConfig {
613623
ideal_connections_hi: config.network.ideal_connections_hi,
614624
peer_recent_time_window: config.network.peer_recent_time_window,
615625
safe_set_size: config.network.safe_set_size,
626+
archival_peer_connections_lower_bound: config
627+
.network
628+
.archival_peer_connections_lower_bound,
616629
ban_window: config.network.ban_window,
617630
max_send_peers: 512,
618631
peer_expiration_duration: Duration::from_secs(7 * 24 * 60 * 60),
@@ -624,6 +637,7 @@ impl NearConfig {
624637
push_info_period: Duration::from_millis(100),
625638
blacklist: blacklist_from_iter(config.network.blacklist),
626639
outbound_disabled: false,
640+
archive: config.archive,
627641
},
628642
telemetry_config: config.telemetry,
629643
rpc_config: config.rpc,

0 commit comments

Comments
 (0)