|
2 | 2 |
|
3 | 3 | use std::time::Duration;
|
4 | 4 |
|
5 |
| -use strata_p2p::swarm::{ |
6 |
| - self, handle::P2PHandle, P2PConfig, DEFAULT_CONNECTION_CHECK_INTERVAL, DEFAULT_DIAL_TIMEOUT, |
7 |
| - DEFAULT_GENERAL_TIMEOUT, P2P, |
| 5 | +use strata_p2p::{ |
| 6 | + commands::{Command, ConnectToPeerCommand}, |
| 7 | + swarm::{ |
| 8 | + self, handle::P2PHandle, P2PConfig, DEFAULT_CONNECTION_CHECK_INTERVAL, |
| 9 | + DEFAULT_DIAL_TIMEOUT, DEFAULT_GENERAL_TIMEOUT, P2P, |
| 10 | + }, |
8 | 11 | };
|
9 | 12 | use tokio::task::JoinHandle;
|
10 | 13 | use tokio_util::sync::CancellationToken;
|
11 |
| -use tracing::info; |
| 14 | +use tracing::{debug, info, warn}; |
12 | 15 |
|
13 | 16 | use crate::{config::Configuration, constants::DEFAULT_IDLE_CONNECTION_TIMEOUT};
|
14 | 17 |
|
15 | 18 | /// Bootstrap the p2p node by hooking up all the required services.
|
16 | 19 | pub async fn bootstrap(
|
17 | 20 | config: &Configuration,
|
18 | 21 | ) -> anyhow::Result<(P2PHandle, CancellationToken, JoinHandle<()>)> {
|
| 22 | + let allowlist_len = config.allowlist.len(); |
| 23 | + |
19 | 24 | let p2p_config = P2PConfig {
|
20 | 25 | keypair: config.keypair.clone(),
|
21 | 26 | idle_connection_timeout: config
|
@@ -48,5 +53,35 @@ pub async fn bootstrap(
|
48 | 53 | info!("listening for network events and commands from handles");
|
49 | 54 | let listen_task = tokio::spawn(p2p.listen());
|
50 | 55 |
|
| 56 | + let connect_handle = handle.clone(); |
| 57 | + let connect_to = config.connect_to.clone(); |
| 58 | + let allowlist = config.allowlist.clone(); |
| 59 | + let _connect_task = tokio::spawn(async move { |
| 60 | + loop { |
| 61 | + let connected_peers = connect_handle.get_connected_peers().await; |
| 62 | + if connected_peers.len() < allowlist_len { |
| 63 | + debug!( |
| 64 | + connected_peers=%connected_peers.len(), |
| 65 | + allowlist=%allowlist_len, |
| 66 | + "initializing period re-establishing connections" |
| 67 | + ); |
| 68 | + for (addr, peer_id) in connect_to.iter().zip(allowlist.iter()) { |
| 69 | + warn!( |
| 70 | + %peer_id, |
| 71 | + "re-connecting to peer" |
| 72 | + ); |
| 73 | + let command = Command::ConnectToPeer(ConnectToPeerCommand { |
| 74 | + peer_id: *peer_id, |
| 75 | + peer_addr: addr.clone(), |
| 76 | + }); |
| 77 | + let _ = connect_handle.send_command(command).await; |
| 78 | + debug!(%peer_id, "command sent"); |
| 79 | + } |
| 80 | + } |
| 81 | + // TODO(@storopoli): make this configurable |
| 82 | + tokio::time::sleep(Duration::from_secs(10)).await; |
| 83 | + } |
| 84 | + }); |
| 85 | + |
51 | 86 | Ok((handle, cancel, listen_task))
|
52 | 87 | }
|
0 commit comments