From 2559da1c2000bf0f854bbc18195ca1db09d9aeeb Mon Sep 17 00:00:00 2001 From: Gaius Date: Fri, 14 Feb 2025 17:48:10 +0800 Subject: [PATCH] feat: add concurrency limit for grpc server Signed-off-by: Gaius --- dragonfly-client-config/src/dfdaemon.rs | 24 +++++++++++++++++++ dragonfly-client/Cargo.toml | 2 +- dragonfly-client/src/bin/dfdaemon/main.rs | 1 + .../src/grpc/dfdaemon_download.rs | 17 ++++++++++++- dragonfly-client/src/grpc/dfdaemon_upload.rs | 9 +++++++ 5 files changed, 51 insertions(+), 2 deletions(-) diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index 3ac6f357..c799aa58 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -60,6 +60,12 @@ pub fn default_download_unix_socket_path() -> PathBuf { crate::default_root_dir().join("dfdaemon.sock") } +/// default_download_request_rate_limit is the default rate limit of the download request in the +/// download grpc server, default is 4000 req/s. +pub fn default_download_request_rate_limit() -> u64 { + 4000 +} + /// default_parent_selector_sync_interval is the default interval to sync host information. #[inline] fn default_parent_selector_sync_interval() -> Duration { @@ -96,6 +102,12 @@ fn default_upload_grpc_server_port() -> u16 { 4000 } +/// default_upload_request_rate_limit is the default rate limit of the upload request in the +/// upload grpc server, default is 4000 req/s. +pub fn default_upload_request_rate_limit() -> u64 { + 4000 +} + /// default_upload_rate_limit is the default rate limit of the upload speed in GiB/Mib/Kib per second. #[inline] fn default_upload_rate_limit() -> ByteSize { @@ -426,6 +438,11 @@ pub struct DownloadServer { /// socket_path is the unix socket path for dfdaemon gRPC service. #[serde(default = "default_download_unix_socket_path")] pub socket_path: PathBuf, + + /// request_rate_limit is the rate limit of the download request in the download grpc server, + /// default is 4000 req/s. 
+ #[serde(default = "default_download_request_rate_limit")] + pub request_rate_limit: u64, } /// DownloadServer implements Default. @@ -433,6 +450,7 @@ impl Default for DownloadServer { fn default() -> Self { DownloadServer { socket_path: default_download_unix_socket_path(), + request_rate_limit: default_download_request_rate_limit(), } } } @@ -496,6 +514,11 @@ pub struct UploadServer { /// key is the server key path with PEM format for the upload server and it is used for /// mutual TLS. pub key: Option, + + /// request_rate_limit is the rate limit of the upload request in the upload grpc server, + /// default is 4000 req/s. + #[serde(default = "default_upload_request_rate_limit")] + pub request_rate_limit: u64, } /// UploadServer implements Default. @@ -507,6 +530,7 @@ impl Default for UploadServer { ca_cert: None, cert: None, key: None, + request_rate_limit: default_upload_request_rate_limit(), } } } diff --git a/dragonfly-client/Cargo.toml b/dragonfly-client/Cargo.toml index 5caac8c5..af221fe9 100644 --- a/dragonfly-client/Cargo.toml +++ b/dragonfly-client/Cargo.toml @@ -79,7 +79,7 @@ prometheus = { version = "0.13", features = ["process"] } tonic-health = "0.12.3" bytes = "1.10" sysinfo = "0.32.1" -tower = "0.4.13" +tower = { version = "0.4.13", features = ["limit", "load-shed", "buffer"] } indicatif = "0.17.11" dashmap = "6.1.0" hashring = "0.3.6" diff --git a/dragonfly-client/src/bin/dfdaemon/main.rs b/dragonfly-client/src/bin/dfdaemon/main.rs index c76226e5..77eb8fb9 100644 --- a/dragonfly-client/src/bin/dfdaemon/main.rs +++ b/dragonfly-client/src/bin/dfdaemon/main.rs @@ -292,6 +292,7 @@ async fn main() -> Result<(), anyhow::Error> { // Initialize download grpc server. 
let mut dfdaemon_download_grpc = DfdaemonDownloadServer::new( + config.clone(), config.download.server.socket_path.clone(), task.clone(), persistent_cache_task.clone(), diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index bf073dc3..0d444589 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -36,6 +36,7 @@ use dragonfly_api::dfdaemon::v2::{ }; use dragonfly_api::errordetails::v2::Backend; use dragonfly_api::scheduler::v2::DeleteHostRequest as SchedulerDeleteHostRequest; +use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::{ error::{ErrorType, OrErr}, Error as ClientError, Result as ClientResult, @@ -56,13 +57,16 @@ use tonic::{ transport::{Channel, Endpoint, Server, Uri}, Code, Request, Response, Status, }; -use tower::service_fn; +use tower::{service_fn, ServiceBuilder}; use tracing::{error, info, instrument, Instrument, Span}; use super::interceptor::TracingInterceptor; /// DfdaemonDownloadServer is the grpc unix server of the download. pub struct DfdaemonDownloadServer { + /// config is the configuration of the dfdaemon. + config: Arc<Config>, + /// socket_path is the path of the unix domain socket. socket_path: PathBuf, @@ -81,6 +85,7 @@ impl DfdaemonDownloadServer { /// new creates a new DfdaemonServer. #[instrument(skip_all)] pub fn new( + config: Arc<Config>, socket_path: PathBuf, task: Arc, persistent_cache_task: Arc, @@ -97,6 +102,7 @@ impl DfdaemonDownloadServer { .max_encoding_message_size(usize::MAX); Self { + config, socket_path, service, shutdown, @@ -130,16 +136,25 @@ impl DfdaemonDownloadServer { fs::remove_file(&self.socket_path).await?; } + // Bind the unix domain socket and set the permissions for the socket. 
let uds = UnixListener::bind(&self.socket_path)?; let perms = std::fs::Permissions::from_mode(0o660); fs::set_permissions(&self.socket_path, perms).await?; + // TODO(Gaius): RateLimitLayer does not implement Clone, so we can't use it here. + // Only use the LoadShed layer and the ConcurrencyLimit layer. + let layer = ServiceBuilder::new() + .concurrency_limit(self.config.download.server.request_rate_limit as usize) + .load_shed() + .into_inner(); + let uds_stream = UnixListenerStream::new(uds); let server = Server::builder() .max_frame_size(super::MAX_FRAME_SIZE) .tcp_keepalive(Some(super::TCP_KEEPALIVE)) .http2_keepalive_interval(Some(super::HTTP2_KEEP_ALIVE_INTERVAL)) .http2_keepalive_timeout(Some(super::HTTP2_KEEP_ALIVE_TIMEOUT)) + .layer(layer) .add_service(reflection.clone()) .add_service(health_service) .add_service(self.service.clone()) diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs index f370e2ee..97d30ddc 100644 --- a/dragonfly-client/src/grpc/dfdaemon_upload.rs +++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs @@ -55,6 +55,7 @@ use tonic::{ transport::{Channel, Server}, Code, Request, Response, Status, }; +use tower::ServiceBuilder; use tracing::{error, info, instrument, Instrument, Span}; use url::Url; @@ -127,6 +128,13 @@ impl DfdaemonUploadServer { .set_serving::>() .await; + // TODO(Gaius): RateLimitLayer does not implement Clone, so we can't use it here. + // Only use the LoadShed layer and the ConcurrencyLimit layer. + let layer = ServiceBuilder::new() + .concurrency_limit(self.config.upload.server.request_rate_limit as usize) + .load_shed() + .into_inner(); + // Start upload grpc server. 
let mut server_builder = Server::builder(); if let Ok(Some(server_tls_config)) = @@ -143,6 +151,7 @@ impl DfdaemonUploadServer { .tcp_keepalive(Some(super::TCP_KEEPALIVE)) .http2_keepalive_interval(Some(super::HTTP2_KEEP_ALIVE_INTERVAL)) .http2_keepalive_timeout(Some(super::HTTP2_KEEP_ALIVE_TIMEOUT)) + .layer(layer) .add_service(reflection.clone()) .add_service(health_service) .add_service(self.service.clone())