Skip to content

Commit 1d5bb96

Browse files
authored
refactor(config): make rpc client option configurable (#20993) (#21002)
1 parent f599b09 commit 1d5bb96

File tree

14 files changed

+120
-48
lines changed

14 files changed

+120
-48
lines changed

src/batch/src/execution/local_exchange.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ mod tests {
7979
use std::sync::Arc;
8080
use std::time::Duration;
8181

82+
use risingwave_common::config::RpcClientConfig;
8283
use risingwave_pb::batch_plan::{TaskId, TaskOutputId};
8384
use risingwave_pb::data::DataChunk;
8485
use risingwave_pb::task_service::exchange_service_server::{
@@ -154,7 +155,9 @@ mod tests {
154155
sleep(Duration::from_secs(1)).await;
155156
assert!(server_run.load(Ordering::SeqCst));
156157

157-
let client = ComputeClient::new(addr.into()).await.unwrap();
158+
let client = ComputeClient::new(addr.into(), &RpcClientConfig::default())
159+
.await
160+
.unwrap();
158161
let task_output_id = TaskOutputId {
159162
task_id: Some(TaskId::default()),
160163
..Default::default()

src/common/src/config.rs

+24
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,21 @@ pub struct MetaDeveloperConfig {
543543
/// Max number of epoch-to-version inserted into meta store per INSERT, during time travel metadata writing.
544544
#[serde(default = "default::developer::hummock_time_travel_epoch_version_insert_batch_size")]
545545
pub hummock_time_travel_epoch_version_insert_batch_size: usize,
546+
547+
#[serde(default)]
548+
pub compute_client_config: RpcClientConfig,
549+
550+
#[serde(default)]
551+
pub stream_client_config: RpcClientConfig,
552+
553+
#[serde(default)]
554+
pub frontend_client_config: RpcClientConfig,
555+
}
556+
557+
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
558+
pub struct RpcClientConfig {
559+
#[serde(default = "default::developer::rpc_client_connect_timeout_secs")]
560+
pub connect_timeout_secs: u64,
546561
}
547562

548563
/// The section `[server]` in `risingwave.toml`.
@@ -1176,6 +1191,9 @@ pub struct StreamingDeveloperConfig {
11761191
/// When true, all jdbc sinks with connector='jdbc' and jdbc.url="jdbc:postgresql://..."
11771192
/// will be switched from jdbc postgresql sinks to rust native (connector='postgres') sinks.
11781193
pub switch_jdbc_pg_to_native: bool,
1194+
1195+
#[serde(default)]
1196+
pub compute_client_config: RpcClientConfig,
11791197
}
11801198

11811199
/// The subsections `[batch.developer]`.
@@ -1206,6 +1224,9 @@ pub struct BatchDeveloperConfig {
12061224
/// If not specified, the value of `server.connection_pool_size` will be used.
12071225
#[serde(default = "default::developer::batch_exchange_connection_pool_size")]
12081226
exchange_connection_pool_size: Option<u16>,
1227+
1228+
#[serde(default)]
1229+
pub compute_client_config: RpcClientConfig,
12091230
}
12101231

12111232
macro_rules! define_system_config {
@@ -2160,6 +2181,9 @@ pub mod default {
21602181
pub fn switch_jdbc_pg_to_native() -> bool {
21612182
false
21622183
}
2184+
pub fn rpc_client_connect_timeout_secs() -> u64 {
2185+
5
2186+
}
21632187
}
21642188

21652189
pub use crate::system_param::default as system;

src/compute/src/server.rs

+2
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ pub async fn compute_node_serve(
349349
// Initialize batch environment.
350350
let batch_client_pool = Arc::new(ComputeClientPool::new(
351351
config.batch_exchange_connection_pool_size(),
352+
config.batch.developer.compute_client_config.clone(),
352353
));
353354
let batch_env = BatchEnvironment::new(
354355
batch_mgr.clone(),
@@ -368,6 +369,7 @@ pub async fn compute_node_serve(
368369
// Initialize the streaming environment.
369370
let stream_client_pool = Arc::new(ComputeClientPool::new(
370371
config.streaming_exchange_connection_pool_size(),
372+
config.streaming.developer.compute_client_config.clone(),
371373
));
372374
let stream_env = StreamEnvironment::new(
373375
advertise_addr.clone(),

src/config/example.toml

+15
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,15 @@ meta_hummock_time_travel_sst_info_insert_batch_size = 100
105105
meta_time_travel_vacuum_interval_sec = 30
106106
meta_hummock_time_travel_epoch_version_insert_batch_size = 1000
107107

108+
[meta.developer.meta_compute_client_config]
109+
connect_timeout_secs = 5
110+
111+
[meta.developer.meta_stream_client_config]
112+
connect_timeout_secs = 5
113+
114+
[meta.developer.meta_frontend_client_config]
115+
connect_timeout_secs = 5
116+
108117
[meta.meta_store_config]
109118
max_connections = 10
110119
min_connections = 1
@@ -127,6 +136,9 @@ batch_receiver_channel_size = 1000
127136
batch_root_stage_channel_size = 100
128137
batch_chunk_size = 1024
129138

139+
[batch.developer.batch_compute_client_config]
140+
connect_timeout_secs = 5
141+
130142
[streaming]
131143
in_flight_barrier_nums = 10000
132144
async_stack_trace = "ReleaseVerbose"
@@ -161,6 +173,9 @@ stream_enable_auto_schema_change = true
161173
stream_enable_shared_source = true
162174
stream_switch_jdbc_pg_to_native = false
163175

176+
[streaming.developer.stream_compute_client_config]
177+
connect_timeout_secs = 5
178+
164179
[storage]
165180
share_buffers_sync_parallelism = 1
166181
share_buffer_compaction_worker_threads_number = 4

src/ctl/src/cmd_impl/compute.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use risingwave_common::config::{BatchConfig, StreamingConfig};
15+
use risingwave_common::config::{BatchConfig, RpcClientConfig, StreamingConfig};
1616
use risingwave_common::util::addr::HostAddr;
1717
use risingwave_rpc_client::ComputeClient;
1818

1919
pub async fn show_config(host: &str) -> anyhow::Result<()> {
2020
let listen_addr = HostAddr::try_from(host)?;
21-
let client = ComputeClient::new(listen_addr).await?;
21+
let client = ComputeClient::new(listen_addr, &RpcClientConfig::default()).await?;
2222
let config_response = client.show_config().await?;
2323
let batch_config: BatchConfig = serde_json::from_str(&config_response.batch_config)?;
2424
let stream_config: StreamingConfig = serde_json::from_str(&config_response.stream_config)?;

src/ctl/src/cmd_impl/hummock/resize_cache.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::process::exit;
1616

1717
use futures::future::try_join_all;
18+
use risingwave_common::config::RpcClientConfig;
1819
use risingwave_pb::compute::ResizeCacheRequest;
1920
use risingwave_pb::meta::GetClusterInfoResponse;
2021
use risingwave_rpc_client::ComputeClient;
@@ -45,7 +46,7 @@ pub async fn resize_cache(
4546

4647
let futures = worker_nodes.iter().map(|worker| async {
4748
let addr = worker.get_host().expect("worker host must be set");
48-
let client = ComputeClient::new(addr.into())
49+
let client = ComputeClient::new(addr.into(), &RpcClientConfig::default())
4950
.await
5051
.unwrap_or_else(|_| panic!("Cannot open client to compute node {addr:?}"));
5152
client

src/frontend/src/session.rs

+1
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ impl FrontendEnv {
320320

321321
let compute_client_pool = Arc::new(ComputeClientPool::new(
322322
config.batch_exchange_connection_pool_size(),
323+
config.batch.developer.compute_client_config.clone(),
323324
));
324325
let query_manager = QueryManager::new(
325326
worker_node_manager.clone(),

src/meta/node/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,9 @@ pub fn start(
476476
.developer
477477
.actor_cnt_per_worker_parallelism_soft_limit,
478478
license_key_path: opts.license_key_path,
479+
compute_client_config: config.meta.developer.compute_client_config.clone(),
480+
stream_client_config: config.meta.developer.stream_client_config.clone(),
481+
frontend_client_config: config.meta.developer.frontend_client_config.clone(),
479482
},
480483
config.system.into_init_system_params(),
481484
Default::default(),

src/meta/node/src/server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ pub async fn start_service_as_election_leader(
402402
prometheus_client,
403403
prometheus_selector,
404404
metadata_manager: metadata_manager.clone(),
405-
compute_clients: ComputeClientPool::new(1), // typically no need for plural clients
405+
compute_clients: ComputeClientPool::new(1, env.opts.compute_client_config.clone()), /* typically no need for plural clients */
406406
diagnose_command,
407407
trace_state,
408408
};

src/meta/src/manager/env.rs

+16-3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ use std::path::PathBuf;
1717
use std::sync::Arc;
1818

1919
use anyhow::Context;
20-
use risingwave_common::config::{CompactionConfig, DefaultParallelism, ObjectStoreConfig};
20+
use risingwave_common::config::{
21+
CompactionConfig, DefaultParallelism, ObjectStoreConfig, RpcClientConfig,
22+
};
2123
use risingwave_common::session_config::SessionConfig;
2224
use risingwave_common::system_param::reader::SystemParamsReader;
2325
use risingwave_common::{bail, system_param};
@@ -254,6 +256,10 @@ pub struct MetaOpts {
254256
pub actor_cnt_per_worker_parallelism_soft_limit: usize,
255257

256258
pub license_key_path: Option<PathBuf>,
259+
260+
pub compute_client_config: RpcClientConfig,
261+
pub stream_client_config: RpcClientConfig,
262+
pub frontend_client_config: RpcClientConfig,
257263
}
258264

259265
impl MetaOpts {
@@ -331,6 +337,9 @@ impl MetaOpts {
331337
table_stat_throuput_window_seconds_for_merge: 240,
332338
periodic_scheduling_compaction_group_merge_interval_sec: 60 * 10,
333339
license_key_path: None,
340+
compute_client_config: RpcClientConfig::default(),
341+
stream_client_config: RpcClientConfig::default(),
342+
frontend_client_config: RpcClientConfig::default(),
334343
}
335344
}
336345
}
@@ -343,8 +352,12 @@ impl MetaSrvEnv {
343352
meta_store_impl: SqlMetaStore,
344353
) -> MetaResult<Self> {
345354
let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
346-
let stream_client_pool = Arc::new(StreamClientPool::new(1)); // typically no need for plural clients
347-
let frontend_client_pool = Arc::new(FrontendClientPool::new(1));
355+
let stream_client_pool =
356+
Arc::new(StreamClientPool::new(1, opts.stream_client_config.clone())); // typically no need for plural clients
357+
let frontend_client_pool = Arc::new(FrontendClientPool::new(
358+
1,
359+
opts.frontend_client_config.clone(),
360+
));
348361
let event_log_manager = Arc::new(start_event_log_manager(
349362
opts.event_log_enabled,
350363
opts.event_log_channel_max_size,

src/rpc_client/src/compute_client.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::time::Duration;
1818
use async_trait::async_trait;
1919
use futures::StreamExt;
2020
use risingwave_common::catalog::DatabaseId;
21-
use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE};
21+
use risingwave_common::config::{RpcClientConfig, MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE};
2222
use risingwave_common::monitor::{EndpointExt, TcpConfig};
2323
use risingwave_common::util::addr::HostAddr;
2424
use risingwave_common::util::tracing::TracingContext;
@@ -67,11 +67,11 @@ pub struct ComputeClient {
6767
}
6868

6969
impl ComputeClient {
70-
pub async fn new(addr: HostAddr) -> Result<Self> {
70+
pub async fn new(addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
7171
let channel = Endpoint::from_shared(format!("http://{}", &addr))?
7272
.initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
7373
.initial_stream_window_size(STREAM_WINDOW_SIZE)
74-
.connect_timeout(Duration::from_secs(5))
74+
.connect_timeout(Duration::from_secs(opts.connect_timeout_secs))
7575
.monitored_connect(
7676
"grpc-compute-client",
7777
TcpConfig {
@@ -295,8 +295,8 @@ impl ComputeClient {
295295

296296
#[async_trait]
297297
impl RpcClient for ComputeClient {
298-
async fn new_client(host_addr: HostAddr) -> Result<Self> {
299-
Self::new(host_addr).await
298+
async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
299+
Self::new(host_addr, opts).await
300300
}
301301
}
302302

src/rpc_client/src/frontend_client.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::sync::Arc;
1616
use std::time::Duration;
1717

1818
use async_trait::async_trait;
19-
use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE;
19+
use risingwave_common::config::{RpcClientConfig, MAX_CONNECTION_WINDOW_SIZE};
2020
use risingwave_common::monitor::{EndpointExt, TcpConfig};
2121
use risingwave_common::util::addr::HostAddr;
2222
use risingwave_pb::frontend_service::frontend_service_client::FrontendServiceClient;
@@ -37,10 +37,10 @@ const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 10;
3737
struct FrontendClient(FrontendServiceClient<Channel>);
3838

3939
impl FrontendClient {
40-
async fn new(host_addr: HostAddr) -> Result<Self> {
40+
async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
4141
let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
4242
.initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
43-
.connect_timeout(Duration::from_secs(5))
43+
.connect_timeout(Duration::from_secs(opts.connect_timeout_secs))
4444
.monitored_connect(
4545
"grpc-frontend-client",
4646
TcpConfig {
@@ -63,8 +63,8 @@ pub type FrontendClientPoolRef = Arc<FrontendClientPool>;
6363

6464
#[async_trait]
6565
impl RpcClient for FrontendRetryClient {
66-
async fn new_client(host_addr: HostAddr) -> Result<Self> {
67-
Self::new(host_addr).await
66+
async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
67+
Self::new(host_addr, opts).await
6868
}
6969
}
7070

@@ -74,8 +74,8 @@ pub struct FrontendRetryClient {
7474
}
7575

7676
impl FrontendRetryClient {
77-
async fn new(host_addr: HostAddr) -> Result<Self> {
78-
let client = FrontendClient::new(host_addr).await?;
77+
async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
78+
let client = FrontendClient::new(host_addr, opts).await?;
7979
Ok(Self { client })
8080
}
8181

0 commit comments

Comments
 (0)