Skip to content

Commit b165c18

Browse files
committed
reorg of clusterinfo port init logic
1 parent 4c80dc0 commit b165c18

File tree

2 files changed

+77
-189
lines changed

2 files changed

+77
-189
lines changed

gossip/src/cluster_info.rs

+28-189
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,10 @@ use {
5151
solana_keypair::{signable::Signable, Keypair},
5252
solana_ledger::shred::Shred,
5353
solana_net_utils::{
54-
bind_common_in_range_with_config, bind_common_with_config, bind_in_range,
55-
bind_in_range_with_config, bind_more_with_config, bind_to_unspecified,
56-
bind_two_in_range_with_offset_and_config, find_available_port_in_range,
57-
multi_bind_in_range_with_config, PortRange, SocketConfig, VALIDATOR_PORT_RANGE,
54+
bind_in_range, bind_in_range_with_config, bind_more_with_config, bind_to_unspecified,
55+
bind_to_with_config, find_available_port_in_range, multi_bind_in_range_with_config,
56+
sockets::{get_gossip_port, localhost_port_range_for_tests},
57+
PortRange, SocketConfig, VALIDATOR_PORT_RANGE,
5858
},
5959
solana_perf::{
6060
data_budget::DataBudget,
@@ -90,7 +90,7 @@ use {
9090
rc::Rc,
9191
result::Result,
9292
sync::{
93-
atomic::{AtomicBool, AtomicU16, Ordering},
93+
atomic::{AtomicBool, Ordering},
9494
Arc, Mutex, RwLock, RwLockReadGuard,
9595
},
9696
thread::{sleep, Builder, JoinHandle},
@@ -2331,7 +2331,7 @@ impl ClusterInfo {
23312331
) -> (ContactInfo, UdpSocket, Option<TcpListener>) {
23322332
let bind_ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
23332333
let (port, (gossip_socket, ip_echo)) =
2334-
Node::get_gossip_port(gossip_addr, VALIDATOR_PORT_RANGE, bind_ip_addr);
2334+
get_gossip_port(gossip_addr, VALIDATOR_PORT_RANGE, bind_ip_addr);
23352335
let contact_info =
23362336
Self::gossip_contact_info(id, SocketAddr::new(gossip_addr.ip(), port), shred_version);
23372337

@@ -2405,70 +2405,23 @@ pub struct Node {
24052405
pub sockets: Sockets,
24062406
}
24072407

2408-
const NODE_BASE_PORT: u16 = 40000;
24092408
impl Node {
24102409
/// create localhost node for tests
24112410
pub fn new_localhost() -> Self {
24122411
let pubkey = solana_pubkey::new_rand();
24132412
Self::new_localhost_with_pubkey(&pubkey)
24142413
}
24152414

2416-
fn localhost_port_range_for_tests() -> (u16, u16) {
2417-
static SLICE: AtomicU16 = AtomicU16::new(NODE_BASE_PORT);
2418-
let start = match std::env::var("NEXTEST_TEST_GLOBAL_SLOT") {
2419-
Ok(slot) => {
2420-
let slot: u16 = slot.parse().unwrap();
2421-
NODE_BASE_PORT + slot * 25
2422-
}
2423-
Err(_) => SLICE.fetch_add(100, Ordering::Relaxed),
2424-
};
2425-
assert!(start < 65500, "ran out of port numbers!");
2426-
(start, start + 25)
2427-
}
2428-
24292415
/// create localhost node for tests with provided pubkey
24302416
/// unlike the public IP version, this will also bind RPC sockets.
24312417
pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self {
24322418
let addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
2433-
let port_range = Self::localhost_port_range_for_tests();
2419+
let port_range = localhost_port_range_for_tests();
24342420
let gossip_port = find_available_port_in_range(addr, port_range)
24352421
.expect("At least one open port should be available");
2436-
let config = NodeConfig {
2437-
gossip_addr: SocketAddr::new(addr, gossip_port),
2438-
port_range,
2439-
bind_ip_addr: addr,
2440-
public_tpu_addr: None,
2441-
public_tpu_forwards_addr: None,
2442-
num_tvu_receive_sockets: NonZero::new(1).unwrap(),
2443-
num_tvu_retransmit_sockets: NonZero::new(1).unwrap(),
2444-
num_quic_endpoints: NonZero::new(DEFAULT_QUIC_ENDPOINTS)
2445-
.expect("Number of QUIC endpoints can not be zero"),
2446-
};
2447-
let mut node = Self::new_with_external_ip(pubkey, config);
2448-
let rpc_port = find_available_port_in_range(addr, port_range).unwrap();
2449-
let rpc_pubsub_port = find_available_port_in_range(addr, port_range).unwrap();
2450-
node.info.set_rpc((addr, rpc_port)).unwrap();
2451-
node.info.set_rpc_pubsub((addr, rpc_pubsub_port)).unwrap();
2452-
node
2453-
}
2454-
2455-
fn get_gossip_port(
2456-
gossip_addr: &SocketAddr,
2457-
port_range: PortRange,
2458-
bind_ip_addr: IpAddr,
2459-
) -> (u16, (UdpSocket, TcpListener)) {
2460-
let config = SocketConfig::default();
2461-
if gossip_addr.port() != 0 {
2462-
(
2463-
gossip_addr.port(),
2464-
bind_common_with_config(bind_ip_addr, gossip_addr.port(), config).unwrap_or_else(
2465-
|e| panic!("gossip_addr bind_to port {}: {}", gossip_addr.port(), e),
2466-
),
2467-
)
2468-
} else {
2469-
bind_common_in_range_with_config(bind_ip_addr, port_range, config)
2470-
.expect("Failed to bind")
2471-
}
2422+
let gossip_addr = SocketAddr::new(addr, gossip_port);
2423+
#[allow(deprecated)] // new_single_bind will be merged in this function
2424+
Self::new_single_bind(pubkey, &gossip_addr, port_range, addr)
24722425
}
24732426

24742427
fn bind_with_config(
@@ -2486,139 +2439,25 @@ impl Node {
24862439
port_range: PortRange,
24872440
bind_ip_addr: IpAddr,
24882441
) -> Self {
2489-
let (gossip_port, (gossip, ip_echo)) =
2490-
Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr);
2491-
2492-
let socket_config = SocketConfig::default();
2493-
let socket_config_reuseport = SocketConfig::default().reuseport(true);
2494-
let (tvu_port, tvu) = Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2495-
let (tvu_quic_port, tvu_quic) =
2496-
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2497-
let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) =
2498-
bind_two_in_range_with_offset_and_config(
2499-
bind_ip_addr,
2500-
port_range,
2501-
QUIC_PORT_OFFSET,
2502-
socket_config,
2503-
socket_config_reuseport,
2504-
)
2505-
.unwrap();
2506-
let tpu_quic: Vec<UdpSocket> =
2507-
bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, socket_config_reuseport)
2508-
.unwrap();
2509-
2510-
let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
2511-
bind_two_in_range_with_offset_and_config(
2512-
bind_ip_addr,
2513-
port_range,
2514-
QUIC_PORT_OFFSET,
2515-
socket_config,
2516-
socket_config_reuseport,
2517-
)
2518-
.unwrap();
2519-
let tpu_forwards_quic = bind_more_with_config(
2520-
tpu_forwards_quic,
2521-
DEFAULT_QUIC_ENDPOINTS,
2522-
socket_config_reuseport,
2523-
)
2524-
.unwrap();
2525-
2526-
let (tpu_vote_port, tpu_vote) =
2527-
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2528-
let (tpu_vote_quic_port, tpu_vote_quic) =
2529-
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2530-
let tpu_vote_quic: Vec<UdpSocket> = bind_more_with_config(
2531-
tpu_vote_quic,
2532-
DEFAULT_QUIC_ENDPOINTS,
2533-
socket_config_reuseport,
2534-
)
2535-
.unwrap();
2536-
2537-
let (_, retransmit_socket) =
2538-
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2539-
let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2540-
let (_, repair_quic) = Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2541-
let (serve_repair_port, serve_repair) =
2542-
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2543-
let (serve_repair_quic_port, serve_repair_quic) =
2544-
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2545-
let (_, broadcast) = Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2546-
let (_, ancestor_hashes_requests) =
2547-
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2548-
let (_, ancestor_hashes_requests_quic) =
2549-
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2550-
2442+
let config = NodeConfig {
2443+
gossip_addr: *gossip_addr,
2444+
port_range,
2445+
bind_ip_addr,
2446+
public_tpu_addr: None,
2447+
public_tpu_forwards_addr: None,
2448+
num_tvu_receive_sockets: NonZero::new(1).unwrap(),
2449+
num_tvu_retransmit_sockets: NonZero::new(1).unwrap(),
2450+
num_quic_endpoints: NonZero::new(DEFAULT_QUIC_ENDPOINTS)
2451+
.expect("Number of QUIC endpoints can not be zero"),
2452+
};
2453+
let mut node = Self::new_with_external_ip(pubkey, config);
25512454
let rpc_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap();
25522455
let rpc_pubsub_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap();
2553-
2554-
// These are client sockets, so the port is set to be 0 because it must be ephimeral.
2555-
let tpu_vote_forwards_client =
2556-
Self::bind_with_config(bind_ip_addr, 0, socket_config).unwrap();
2557-
2558-
let addr = gossip_addr.ip();
2559-
let mut info = ContactInfo::new(
2560-
*pubkey,
2561-
timestamp(), // wallclock
2562-
0u16, // shred_version
2563-
);
2564-
macro_rules! set_socket {
2565-
($method:ident, $port:ident, $name:literal) => {
2566-
info.$method((addr, $port)).expect(&format!(
2567-
"Operator must spin up node with valid {} address",
2568-
$name
2569-
))
2570-
};
2571-
($method:ident, $protocol:ident, $port:ident, $name:literal) => {{
2572-
info.$method(contact_info::Protocol::$protocol, (addr, $port))
2573-
.expect(&format!(
2574-
"Operator must spin up node with valid {} address",
2575-
$name
2576-
))
2577-
}};
2578-
}
2579-
set_socket!(set_gossip, gossip_port, "gossip");
2580-
set_socket!(set_tvu, UDP, tvu_port, "TVU");
2581-
set_socket!(set_tvu, QUIC, tvu_quic_port, "TVU QUIC");
2582-
set_socket!(set_tpu, tpu_port, "TPU");
2583-
set_socket!(set_tpu_forwards, tpu_forwards_port, "TPU-forwards");
2584-
set_socket!(set_tpu_vote, UDP, tpu_vote_port, "TPU-vote");
2585-
set_socket!(set_tpu_vote, QUIC, tpu_vote_quic_port, "TPU-vote QUIC");
2586-
set_socket!(set_rpc, rpc_port, "RPC");
2587-
set_socket!(set_rpc_pubsub, rpc_pubsub_port, "RPC-pubsub");
2588-
set_socket!(set_serve_repair, UDP, serve_repair_port, "serve-repair");
2589-
set_socket!(
2590-
set_serve_repair,
2591-
QUIC,
2592-
serve_repair_quic_port,
2593-
"serve-repair QUIC"
2594-
);
2595-
2596-
trace!("new ContactInfo: {:?}", info);
2597-
2598-
Node {
2599-
info,
2600-
sockets: Sockets {
2601-
gossip,
2602-
ip_echo: Some(ip_echo),
2603-
tvu: vec![tvu],
2604-
tvu_quic,
2605-
tpu: vec![tpu],
2606-
tpu_forwards: vec![tpu_forwards],
2607-
tpu_vote: vec![tpu_vote],
2608-
broadcast: vec![broadcast],
2609-
repair,
2610-
repair_quic,
2611-
retransmit_sockets: vec![retransmit_socket],
2612-
serve_repair,
2613-
serve_repair_quic,
2614-
ancestor_hashes_requests,
2615-
ancestor_hashes_requests_quic,
2616-
tpu_quic,
2617-
tpu_forwards_quic,
2618-
tpu_vote_quic,
2619-
tpu_vote_forwards_client,
2620-
},
2621-
}
2456+
node.info.set_rpc((bind_ip_addr, rpc_port)).unwrap();
2457+
node.info
2458+
.set_rpc_pubsub((bind_ip_addr, rpc_pubsub_port))
2459+
.unwrap();
2460+
node
26222461
}
26232462

26242463
pub fn new_with_external_ip(pubkey: &Pubkey, config: NodeConfig) -> Node {
@@ -2634,7 +2473,7 @@ impl Node {
26342473
} = config;
26352474

26362475
let (gossip_port, (gossip, ip_echo)) =
2637-
Self::get_gossip_port(&gossip_addr, port_range, bind_ip_addr);
2476+
get_gossip_port(&gossip_addr, port_range, bind_ip_addr);
26382477

26392478
let socket_config = SocketConfig::default();
26402479
let socket_config_reuseport = SocketConfig::default().reuseport(true);

net-utils/src/sockets.rs

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use {
2+
crate::{bind_common_in_range_with_config, bind_common_with_config, PortRange, SocketConfig},
3+
std::{
4+
net::{IpAddr, SocketAddr, TcpListener, UdpSocket},
5+
sync::atomic::{AtomicU16, Ordering},
6+
},
7+
};
8+
9+
const BASE_PORT: u16 = 20000;
10+
11+
/// Retrieve a free 100-port slice for unit tests
12+
///
13+
/// When running under nextest, this is guaranteed to provide
14+
/// a unique slice of port numbers (assuming no other nextest processes
15+
/// are running on the same host) based on NEXTEST_TEST_GLOBAL_SLOT variable
16+
/// The port ranges will be reused following nextest logic.
17+
///
18+
/// When running without nextest, this will bump an atomic and eventually
19+
/// panic when it runs out of port numbers to assign.
20+
pub fn localhost_port_range_for_tests() -> (u16, u16) {
21+
static SLICE: AtomicU16 = AtomicU16::new(BASE_PORT);
22+
let start = match std::env::var("NEXTEST_TEST_GLOBAL_SLOT") {
23+
Ok(slot) => {
24+
let slot: u16 = slot.parse().unwrap();
25+
BASE_PORT + slot * 100
26+
}
27+
Err(_) => SLICE.fetch_add(100, Ordering::Relaxed),
28+
};
29+
assert!(start < 65500, "ran out of port numbers!");
30+
(start, start + 100)
31+
}
32+
33+
pub fn get_gossip_port(
34+
gossip_addr: &SocketAddr,
35+
port_range: PortRange,
36+
bind_ip_addr: IpAddr,
37+
) -> (u16, (UdpSocket, TcpListener)) {
38+
let config = SocketConfig::default();
39+
if gossip_addr.port() != 0 {
40+
(
41+
gossip_addr.port(),
42+
bind_common_with_config(bind_ip_addr, gossip_addr.port(), config).unwrap_or_else(|e| {
43+
panic!("gossip_addr bind_to port {}: {}", gossip_addr.port(), e)
44+
}),
45+
)
46+
} else {
47+
bind_common_in_range_with_config(bind_ip_addr, port_range, config).expect("Failed to bind")
48+
}
49+
}

0 commit comments

Comments
 (0)