Skip to content

Commit 3452b82

Browse files
authored
use dedicated runtime for tpu_client_next (#6021)
Use dedicated runtime for tpu_client_next instead of relying on RPC runtime
1 parent 07eba27 commit 3452b82

File tree

2 files changed

+39
-18
lines changed

2 files changed

+39
-18
lines changed

core/src/validator.rs

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,10 @@ pub struct Validator {
589589
repair_quic_endpoints: Option<[Endpoint; 3]>,
590590
repair_quic_endpoints_runtime: Option<TokioRuntime>,
591591
repair_quic_endpoints_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
592+
// This runtime is used to run the client owned by SendTransactionService.
593+
// We don't wait for its JoinHandle here because ownership and shutdown
594+
// are managed elsewhere. This variable is intentionally unused.
595+
_tpu_client_next_runtime: Option<TokioRuntime>,
592596
}
593597

594598
impl Validator {
@@ -1139,6 +1143,22 @@ impl Validator {
11391143
))
11401144
};
11411145

1146+
// test-validator crate may start the validator in a tokio runtime
1147+
// context which forces us to use the same runtime because a nested
1148+
// runtime will cause panic at drop. Outside test-validator crate, we
1149+
// always need a tokio runtime (and the respective handle) to initialize
1150+
// the turbine QUIC endpoint.
1151+
let current_runtime_handle = tokio::runtime::Handle::try_current();
1152+
let tpu_client_next_runtime =
1153+
(current_runtime_handle.is_err() && config.use_tpu_client_next).then(|| {
1154+
tokio::runtime::Builder::new_multi_thread()
1155+
.enable_all()
1156+
.worker_threads(2)
1157+
.thread_name("solTpuClientRt")
1158+
.build()
1159+
.unwrap()
1160+
});
1161+
11421162
let rpc_override_health_check =
11431163
Arc::new(AtomicBool::new(config.rpc_config.disable_health_check));
11441164
let (
@@ -1163,6 +1183,19 @@ impl Validator {
11631183
None
11641184
};
11651185

1186+
let client_option = if config.use_tpu_client_next {
1187+
let runtime_handle = tpu_client_next_runtime
1188+
.as_ref()
1189+
.map(TokioRuntime::handle)
1190+
.unwrap_or_else(|| current_runtime_handle.as_ref().unwrap());
1191+
ClientOption::TpuClientNext(
1192+
Arc::as_ref(&identity_keypair),
1193+
node.sockets.rpc_sts_client,
1194+
runtime_handle.clone(),
1195+
)
1196+
} else {
1197+
ClientOption::ConnectionCache(connection_cache.clone())
1198+
};
11661199
let rpc_svc_config = JsonRpcServiceConfig {
11671200
rpc_addr,
11681201
rpc_config: config.rpc_config.clone(),
@@ -1185,14 +1218,7 @@ impl Validator {
11851218
max_complete_transaction_status_slot,
11861219
max_complete_rewards_slot,
11871220
prioritization_fee_cache: prioritization_fee_cache.clone(),
1188-
client_option: if config.use_tpu_client_next {
1189-
ClientOption::TpuClientNext(
1190-
Arc::as_ref(&identity_keypair),
1191-
node.sockets.rpc_sts_client,
1192-
)
1193-
} else {
1194-
ClientOption::ConnectionCache(connection_cache.clone())
1195-
},
1221+
client_option,
11961222
};
11971223
let json_rpc_service =
11981224
JsonRpcService::new_with_config(rpc_svc_config).map_err(ValidatorError::Other)?;
@@ -1363,12 +1389,6 @@ impl Validator {
13631389
.as_ref()
13641390
.map(|service| service.sender_cloned());
13651391

1366-
// test-validator crate may start the validator in a tokio runtime
1367-
// context which forces us to use the same runtime because a nested
1368-
// runtime will cause panic at drop.
1369-
// Outside test-validator crate, we always need a tokio runtime (and
1370-
// the respective handle) to initialize the turbine QUIC endpoint.
1371-
let current_runtime_handle = tokio::runtime::Handle::try_current();
13721392
let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err()
13731393
&& genesis_config.cluster_type != ClusterType::MainnetBeta)
13741394
.then(|| {
@@ -1686,6 +1706,7 @@ impl Validator {
16861706
repair_quic_endpoints,
16871707
repair_quic_endpoints_runtime,
16881708
repair_quic_endpoints_join_handle,
1709+
_tpu_client_next_runtime: tpu_client_next_runtime,
16891710
})
16901711
}
16911712

rpc/src/rpc_service.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use {
5252
},
5353
thread::{self, Builder, JoinHandle},
5454
},
55-
tokio::runtime::{Builder as TokioBuilder, Runtime as TokioRuntime},
55+
tokio::runtime::{Builder as TokioBuilder, Handle as RuntimeHandle, Runtime as TokioRuntime},
5656
tokio_util::codec::{BytesCodec, FramedRead},
5757
};
5858

@@ -409,7 +409,7 @@ pub struct JsonRpcServiceConfig<'a> {
409409
/// requires a reference to a [`Keypair`].
410410
pub enum ClientOption<'a> {
411411
ConnectionCache(Arc<ConnectionCache>),
412-
TpuClientNext(&'a Keypair, UdpSocket),
412+
TpuClientNext(&'a Keypair, UdpSocket, RuntimeHandle),
413413
}
414414

415415
impl JsonRpcService {
@@ -466,7 +466,7 @@ impl JsonRpcService {
466466
)?;
467467
Ok(json_rpc_service)
468468
}
469-
ClientOption::TpuClientNext(identity_keypair, tpu_client_socket) => {
469+
ClientOption::TpuClientNext(identity_keypair, tpu_client_socket, client_runtime) => {
470470
let my_tpu_address = config
471471
.cluster_info
472472
.my_contact_info()
@@ -476,7 +476,7 @@ impl JsonRpcService {
476476
Protocol::QUIC
477477
))?;
478478
let client = TpuClientNextClient::new(
479-
runtime.handle().clone(),
479+
client_runtime,
480480
my_tpu_address,
481481
config.send_transaction_service_config.tpu_peers.clone(),
482482
leader_info,

0 commit comments

Comments
 (0)