Skip to content

Commit 0142ceb

Browse files
authored
Added option to specify tpu addresses in vortexor (#6116)
* Added option to specify tpu addresses in vortexor * Added option to specify tpu addresses in vortexor * fmt log messages * use multi_bind_in_range_with_config
1 parent f7f2f5f commit 0142ceb

File tree

4 files changed

+83
-20
lines changed

4 files changed

+83
-20
lines changed

vortexor/src/cli.rs

+13
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,19 @@ pub struct Cli {
9292
#[arg(long, value_parser = parse_port_range, value_name = "MIN_PORT-MAX_PORT", default_value = get_default_port_range())]
9393
pub dynamic_port_range: (u16, u16),
9494

95+
/// Optional TPU address to bind to. If not specified, the vortexor will bind to
96+
/// the first available port in the dynamic port range. When this argument is
97+
/// specified, the --bind-address and --dynamic-port-range arguments are ignored.
98+
#[arg(long, value_name = "HOST:PORT")]
99+
pub tpu_address: Option<SocketAddr>,
100+
101+
/// Optional TPU-forward address to bind to. If not specified, the vortexor will bind to
102+
/// the first available port in the dynamic port range after binding the tpu_address.
103+
/// When this argument is specified, the --bind-address and --dynamic-port-range
104+
/// arguments are ignored.
105+
#[arg(long, value_name = "HOST:PORT")]
106+
pub tpu_forward_address: Option<SocketAddr>,
107+
95108
/// Controls the max concurrent connections per IpAddr.
96109
#[arg(long, default_value_t = DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER)]
97110
pub max_connections_per_peer: usize,

vortexor/src/main.rs

+33-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use {
55
solana_core::banking_trace::BankingTracer,
66
solana_logger::redirect_stderr_to_file,
77
solana_net_utils::{bind_in_range_with_config, SocketConfig},
8-
solana_sdk::{signature::read_keypair_file, signer::Signer},
8+
solana_sdk::{quic::QUIC_PORT_OFFSET, signature::read_keypair_file, signer::Signer},
99
solana_streamer::streamer::StakedNodes,
1010
solana_vortexor::{
1111
cli::Cli,
@@ -20,7 +20,7 @@ use {
2020
std::{
2121
collections::HashMap,
2222
env,
23-
net::IpAddr,
23+
net::{IpAddr, SocketAddr},
2424
sync::{atomic::AtomicBool, Arc, RwLock},
2525
time::Duration,
2626
},
@@ -77,14 +77,21 @@ pub fn main() {
7777
let tpu_coalesce = Duration::from_millis(args.tpu_coalesce_ms);
7878
let dynamic_port_range = args.dynamic_port_range;
7979

80+
let tpu_address = args.tpu_address;
81+
let tpu_forward_address = args.tpu_forward_address;
8082
let max_streams_per_ms = args.max_streams_per_ms;
8183
let exit = Arc::new(AtomicBool::new(false));
8284
// To be linked with the Tpu sigverify and forwarder service
8385
let (tpu_sender, tpu_receiver) = bounded(DEFAULT_CHANNEL_SIZE);
8486
let (tpu_fwd_sender, _tpu_fwd_receiver) = bounded(DEFAULT_CHANNEL_SIZE);
8587

86-
let tpu_sockets =
87-
Vortexor::create_tpu_sockets(*bind_address, dynamic_port_range, num_quic_endpoints);
88+
let tpu_sockets = Vortexor::create_tpu_sockets(
89+
*bind_address,
90+
dynamic_port_range,
91+
tpu_address,
92+
tpu_forward_address,
93+
num_quic_endpoints,
94+
);
8895

8996
let (banking_tracer, _) = BankingTracer::new(
9097
None, // Not interesed in banking tracing
@@ -125,7 +132,7 @@ pub fn main() {
125132
DEFAULT_SENDER_THREADS_COUNT,
126133
DEFAULT_BATCH_SIZE,
127134
DEFAULT_RECV_TIMEOUT,
128-
destinations,
135+
destinations.clone(),
129136
);
130137

131138
info!("Creating the SigVerifier");
@@ -156,6 +163,27 @@ pub fn main() {
156163
tpu_sockets.tpu_quic_fwd[0].local_addr()
157164
);
158165

166+
let tpu_address = tpu_sockets.tpu_quic[0].local_addr().unwrap();
167+
let tpu_public_address = SocketAddr::new(
168+
tpu_address.ip(),
169+
tpu_address.port().saturating_sub(QUIC_PORT_OFFSET),
170+
);
171+
let tpu_fwd_address = tpu_sockets.tpu_quic_fwd[0].local_addr().unwrap();
172+
let tpu_fwd_public_address = SocketAddr::new(
173+
tpu_fwd_address.ip(),
174+
tpu_fwd_address.port().saturating_sub(QUIC_PORT_OFFSET),
175+
);
176+
177+
for destination in destinations.read().unwrap().iter() {
178+
info!(
179+
"To pair the validator with receiver address {destination} with this \
180+
vortexor, add the following arguments in the validator's start command: \
181+
--tpu-vortexor-receiver-address {destination} \
182+
--public-tpu-address {tpu_public_address} \
183+
--public-tpu-forward-address {tpu_fwd_public_address}",
184+
);
185+
}
186+
159187
let vortexor = Vortexor::create_vortexor(
160188
tpu_sockets,
161189
staked_nodes,

vortexor/src/vortexor.rs

+35-15
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use {
44
banking_trace::TracedSender, sigverify::TransactionSigVerifier,
55
sigverify_stage::SigVerifyStage,
66
},
7-
solana_net_utils::{bind_in_range_with_config, bind_more_with_config, SocketConfig},
7+
solana_net_utils::{multi_bind_in_range_with_config, SocketConfig},
88
solana_perf::packet::PacketBatch,
99
solana_sdk::{quic::NotifyKeyUpdate, signature::Keypair},
1010
solana_streamer::{
@@ -13,7 +13,7 @@ use {
1313
streamer::StakedNodes,
1414
},
1515
std::{
16-
net::UdpSocket,
16+
net::{SocketAddr, UdpSocket},
1717
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
1818
thread::{self, JoinHandle},
1919
time::Duration,
@@ -56,26 +56,27 @@ impl Vortexor {
5656
pub fn create_tpu_sockets(
5757
bind_address: std::net::IpAddr,
5858
dynamic_port_range: (u16, u16),
59+
tpu_address: Option<SocketAddr>,
60+
tpu_forward_address: Option<SocketAddr>,
5961
num_quic_endpoints: usize,
6062
) -> TpuSockets {
6163
let quic_config = SocketConfig::default().reuseport(true);
6264

63-
let (_, tpu_quic) =
64-
bind_in_range_with_config(bind_address, dynamic_port_range, quic_config)
65-
.expect("expected bind to succeed");
66-
67-
let tpu_quic_port = tpu_quic.local_addr().unwrap().port();
68-
let tpu_quic = bind_more_with_config(tpu_quic, num_quic_endpoints, quic_config).unwrap();
69-
70-
let (_, tpu_quic_fwd) = bind_in_range_with_config(
65+
let tpu_quic = bind_sockets(
7166
bind_address,
72-
(tpu_quic_port.saturating_add(1), dynamic_port_range.1),
67+
dynamic_port_range,
68+
tpu_address,
69+
num_quic_endpoints,
7370
quic_config,
74-
)
75-
.expect("expected bind to succeed");
71+
);
7672

77-
let tpu_quic_fwd =
78-
bind_more_with_config(tpu_quic_fwd, num_quic_endpoints, quic_config).unwrap();
73+
let tpu_quic_fwd = bind_sockets(
74+
bind_address,
75+
dynamic_port_range,
76+
tpu_forward_address,
77+
num_quic_endpoints,
78+
quic_config,
79+
);
7980

8081
TpuSockets {
8182
tpu_quic,
@@ -177,3 +178,22 @@ impl Vortexor {
177178
Ok(())
178179
}
179180
}
181+
182+
/// Binds the sockets to the specified address and port range if address is Some.
183+
/// If the address is None, it binds to the specified bind_address and port range.
184+
fn bind_sockets(
185+
bind_address: std::net::IpAddr,
186+
port_range: (u16, u16),
187+
address: Option<SocketAddr>,
188+
num_quic_endpoints: usize,
189+
quic_config: SocketConfig,
190+
) -> Vec<UdpSocket> {
191+
let (bind_address, port_range) = address
192+
.map(|addr| (addr.ip(), (addr.port(), addr.port().saturating_add(1))))
193+
.unwrap_or((bind_address, port_range));
194+
195+
let (_, sockets) =
196+
multi_bind_in_range_with_config(bind_address, port_range, quic_config, num_quic_endpoints)
197+
.expect("expected bind to succeed");
198+
sockets
199+
}

vortexor/tests/vortexor.rs

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ async fn test_vortexor() {
4949
let tpu_sockets = Vortexor::create_tpu_sockets(
5050
bind_address,
5151
VALIDATOR_PORT_RANGE,
52+
None, // tpu_address
53+
None, // tpu_forward_address
5254
DEFAULT_NUM_QUIC_ENDPOINTS,
5355
);
5456

0 commit comments

Comments
 (0)