Skip to content

Commit c493cea

Browse files
authored
Make shredstream optional (#997)
1 parent f57c465 commit c493cea

File tree

1 file changed

+50
-59
lines changed

1 file changed

+50
-59
lines changed

core/src/proxy/block_engine_stage.rs

Lines changed: 50 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,6 @@ pub struct BlockEngineConfig {
9797
pub struct BlockEngineStage {
9898
t_hdls: Vec<JoinHandle<()>>,
9999
}
100-
101-
#[allow(dead_code)]
102100
#[derive(Error, Debug)]
103101
enum PingError<'a> {
104102
#[error("Failed to send ping: {0}")]
@@ -229,39 +227,42 @@ impl BlockEngineStage {
229227
banking_packet_sender: &BankingPacketSender,
230228
exit: &Arc<AtomicBool>,
231229
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
232-
_shredstream_receiver_address: &Arc<ArcSwap<Option<SocketAddr>>>,
230+
shredstream_receiver_address: &Arc<ArcSwap<Option<SocketAddr>>>,
233231
local_block_engine_config: &BlockEngineConfig,
234232
) -> crate::proxy::Result<()> {
235233
let endpoint = Self::get_endpoint(&local_block_engine_config.block_engine_url)?;
236-
// if !local_block_engine_config.disable_block_engine_autoconfig {
237-
// datapoint_info!(
238-
// "block_engine_stage-connect",
239-
// "type" => "autoconfig",
240-
// ("count", 1, i64),
241-
// );
242-
// return Self::connect_auth_and_stream_autoconfig(
243-
// endpoint,
244-
// local_block_engine_config,
245-
// block_engine_config,
246-
// cluster_info,
247-
// bundle_tx,
248-
// packet_tx,
249-
// banking_packet_sender,
250-
// exit,
251-
// block_builder_fee_info,
252-
// shredstream_receiver_address,
253-
// )
254-
// .await;
255-
// }
256-
257-
// if let Some((_best_url, (best_socket, _best_latency_us))) =
258-
// Self::get_ranked_endpoints(&endpoint)
259-
// .await?
260-
// .into_iter()
261-
// .min_by_key(|(_url, (_socket, latency_us))| *latency_us)
262-
// {
263-
// shredstream_receiver_address.store(Arc::new(Some(best_socket))); // no else branch needed since we'll still send to shred_receiver_address
264-
// }
234+
if !local_block_engine_config.disable_block_engine_autoconfig {
235+
datapoint_info!(
236+
"block_engine_stage-connect",
237+
"type" => "autoconfig",
238+
("count", 1, i64),
239+
);
240+
return Self::connect_auth_and_stream_autoconfig(
241+
endpoint,
242+
local_block_engine_config,
243+
block_engine_config,
244+
cluster_info,
245+
bundle_tx,
246+
packet_tx,
247+
banking_packet_sender,
248+
exit,
249+
block_builder_fee_info,
250+
shredstream_receiver_address,
251+
)
252+
.await;
253+
}
254+
255+
if let Some((_best_url, (best_socket, _best_latency_us))) =
256+
Self::get_ranked_endpoints(&endpoint)
257+
.await?
258+
.into_iter()
259+
.min_by_key(|(_url, (_socket, latency_us))| *latency_us)
260+
{
261+
if best_socket.is_some() {
262+
// no else branch needed since we'll still send to shred_receiver_address
263+
shredstream_receiver_address.store(Arc::new(best_socket));
264+
}
265+
}
265266

266267
datapoint_info!(
267268
"block_engine_stage-connect",
@@ -291,7 +292,6 @@ impl BlockEngineStage {
291292
})
292293
}
293294

294-
#[allow(dead_code)]
295295
#[allow(clippy::too_many_arguments)]
296296
async fn connect_auth_and_stream_autoconfig(
297297
endpoint: Endpoint,
@@ -311,17 +311,19 @@ impl BlockEngineStage {
311311
let mut attempted = false;
312312
let mut backend_endpoint = endpoint.clone();
313313
let endpoint_count = candidates.len();
314-
for (block_engine_url, (shredstream_socket, latency_us)) in candidates
314+
for (block_engine_url, (maybe_shredstream_socket, latency_us)) in candidates
315315
.into_iter()
316316
.sorted_unstable_by_key(|(_endpoint, (_shredstream_socket, latency_us))| *latency_us)
317317
{
318318
if block_engine_url != local_block_engine_config.block_engine_url {
319-
info!("Selected best Block Engine url: {block_engine_url}, Shredstream socket: {shredstream_socket}, ping: ({:?})",
319+
info!("Selected best Block Engine url: {block_engine_url}, Shredstream socket: {maybe_shredstream_socket:?}, ping: ({:?})",
320320
Duration::from_micros(latency_us)
321321
);
322322
backend_endpoint = Self::get_endpoint(block_engine_url.as_str())?;
323323
}
324-
shredstream_receiver_address.store(Arc::new(Some(shredstream_socket)));
324+
if let Some(shredstream_socket) = maybe_shredstream_socket {
325+
shredstream_receiver_address.store(Arc::new(Some(shredstream_socket)));
326+
}
325327
attempted = true;
326328
let connect_start = Instant::now();
327329
match Self::connect_auth_and_stream(
@@ -384,15 +386,14 @@ impl BlockEngineStage {
384386

385387
/// Discover candidate endpoints either ranked via ping or using global fallback.
386388
/// Use u64::MAX for latency value to indicate global fallback (no ping data).
387-
#[allow(dead_code)]
388389
async fn get_ranked_endpoints(
389390
backend_endpoint: &Endpoint,
390391
) -> crate::proxy::Result<
391392
ahash::HashMap<
392393
String, /* block engine url */
393394
(
394-
SocketAddr, /* shredstream receiver */
395-
u64, /* latency us */
395+
Option<SocketAddr>, /* shredstream receiver, fallable when DNS can't resolve */
396+
u64, /* latency us */
396397
),
397398
>,
398399
> {
@@ -417,7 +418,7 @@ impl BlockEngineStage {
417418
));
418419
};
419420

420-
let Some(ss) = global
421+
let ss_res = global
421422
.shredstream_receiver_address
422423
.to_socket_addrs()
423424
.inspect_err(|e| {
@@ -430,16 +431,11 @@ impl BlockEngineStage {
430431
);
431432
})
432433
.ok()
433-
.and_then(|mut shredstream_sockets| shredstream_sockets.next())
434-
else {
435-
return Err(ProxyError::BlockEngineEndpointError(
436-
"Failed to resolve global shredstream receiver address".to_owned(),
437-
));
438-
};
434+
.and_then(|mut shredstream_sockets| shredstream_sockets.next());
439435

440436
return Ok(ahash::HashMap::from_iter([(
441437
global.block_engine_url,
442-
(ss, u64::MAX),
438+
(ss_res, u64::MAX),
443439
)]));
444440
}
445441

@@ -545,7 +541,6 @@ impl BlockEngineStage {
545541
}
546542

547543
/// Runs a single `ping -c 1 <ip>` command and returns the RTT in microseconds, or an error.
548-
#[allow(dead_code)]
549544
async fn ping(host: &str) -> Result<u64, PingError> {
550545
let output = tokio::process::Command::new("ping")
551546
.arg("-c")
@@ -585,14 +580,13 @@ impl BlockEngineStage {
585580
}
586581

587582
/// Ping all candidate endpoints concurrently, aggregate best RTT per endpoint
588-
#[allow(dead_code)]
589583
async fn ping_and_rank_endpoints(
590584
endpoints: &[BlockEngineEndpoint],
591585
) -> ahash::HashMap<
592586
String, /* block engine url */
593587
(
594-
SocketAddr, /* shredstream receiver */
595-
u64, /* latency us */
588+
Option<SocketAddr>, /* shredstream receiver, fallable when DNS can't resolve */
589+
u64, /* latency us */
596590
),
597591
> {
598592
const PING_COUNT: usize = 3;
@@ -625,8 +619,8 @@ impl BlockEngineStage {
625619
let mut agg_endpoints: ahash::HashMap<
626620
String, /* block engine url */
627621
(
628-
SocketAddr, /* shredstream receiver */
629-
u64, /* latency us */
622+
Option<SocketAddr>, /* shredstream receiver, fallable when DNS can't resolve */
623+
u64, /* latency us */
630624
),
631625
> = ahash::HashMap::with_capacity(endpoints.len());
632626
let mut best_endpoint = (None, u64::MAX);
@@ -651,7 +645,7 @@ impl BlockEngineStage {
651645
}
652646
}
653647
Entry::Vacant(entry) => {
654-
let Some(shredstream_socket) = endpoint
648+
let maybe_shredstream_socket = endpoint
655649
.shredstream_receiver_address
656650
.to_socket_addrs()
657651
.inspect_err(|e| {
@@ -661,11 +655,8 @@ impl BlockEngineStage {
661655
)
662656
})
663657
.ok()
664-
.and_then(|mut shredstream_sockets| shredstream_sockets.next())
665-
else {
666-
return;
667-
};
668-
entry.insert((shredstream_socket, *latency_us));
658+
.and_then(|mut shredstream_sockets| shredstream_sockets.next());
659+
entry.insert((maybe_shredstream_socket, *latency_us));
669660
}
670661
};
671662
},

0 commit comments

Comments
 (0)