diff --git a/Cargo.lock b/Cargo.lock index c297c48a..d2e36e78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -861,7 +861,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.1.106" +version = "0.1.107" dependencies = [ "anyhow", "blake3", @@ -932,7 +932,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.1.106" +version = "0.1.107" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -955,7 +955,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.1.106" +version = "0.1.107" dependencies = [ "bytesize", "bytesize-serde", @@ -976,7 +976,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.1.106" +version = "0.1.107" dependencies = [ "hyper 1.4.1", "hyper-util", @@ -991,7 +991,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.1.106" +version = "0.1.107" dependencies = [ "anyhow", "clap", @@ -1007,7 +1007,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.1.106" +version = "0.1.107" dependencies = [ "base16ct", "bincode", @@ -1032,7 +1032,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.1.106" +version = "0.1.107" dependencies = [ "base16ct", "blake3", @@ -1386,7 +1386,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.1.106" +version = "0.1.107" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index c6a5d7b4..a95e89ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.1.106" +version = "0.1.107" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,13 +22,13 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.1.106" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.106" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.106" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.106" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.106" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.106" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.106" } +dragonfly-client = { path = "dragonfly-client", version = "0.1.107" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.107" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.107" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.107" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.107" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.107" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.107" } thiserror = "1.0" dragonfly-api = "=2.0.154" reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } diff --git a/dragonfly-client/src/bin/dfdaemon/main.rs b/dragonfly-client/src/bin/dfdaemon/main.rs index ce7404f9..23da40b0 100644 --- a/dragonfly-client/src/bin/dfdaemon/main.rs +++ b/dragonfly-client/src/bin/dfdaemon/main.rs @@ -298,10 +298,6 @@ async fn main() -> Result<(), anyhow::Error> { info!("stats server exited"); }, - _ = tokio::spawn(async move { proxy.run().await }) => { - info!("proxy server exited"); - }, - _ = tokio::spawn(async move { manager_announcer.run().await.unwrap_or_else(|err| error!("announcer manager failed: {}", err))}) => { info!("announcer manager exited"); }, @@ -318,6 +314,10 @@ async fn main() -> Result<(), anyhow::Error> { info!("dfdaemon download grpc unix server exited"); }, + _ = tokio::spawn(async move { proxy.run().await }) => { + info!("proxy server exited"); + }, + _ = tokio::spawn(async move { gc.run().await }) => { info!("garbage collector exited"); }, diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index e2f9881c..b0d17bfe 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -139,7 +139,6 @@ impl DfdaemonDownloadServer { .max_frame_size(super::MAX_FRAME_SIZE) .initial_connection_window_size(super::INITIAL_WINDOW_SIZE) .initial_stream_window_size(super::INITIAL_WINDOW_SIZE) - .concurrency_limit_per_connection(super::CONCURRENCY_LIMIT_PER_CONNECTION) .add_service(reflection.clone()) .add_service(health_service) .add_service(self.service.clone()) diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs index eb0645d4..a1e28d1c 100644 --- a/dragonfly-client/src/grpc/dfdaemon_upload.rs +++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs @@ -126,7 +126,6 @@ impl DfdaemonUploadServer { .max_frame_size(super::MAX_FRAME_SIZE) .initial_connection_window_size(super::INITIAL_WINDOW_SIZE) .initial_stream_window_size(super::INITIAL_WINDOW_SIZE) - .concurrency_limit_per_connection(super::CONCURRENCY_LIMIT_PER_CONNECTION) .add_service(reflection.clone()) .add_service(health_service) .add_service(self.service.clone()) diff --git a/dragonfly-client/src/grpc/mod.rs b/dragonfly-client/src/grpc/mod.rs index 8ed828b5..8330957d 100644 --- a/dragonfly-client/src/grpc/mod.rs +++ b/dragonfly-client/src/grpc/mod.rs @@ -37,9 +37,6 @@ pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(2); // REQUEST_TIMEOUT is the timeout for GRPC requests. pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); -// CONCURRENCY_LIMIT_PER_CONNECTION is the limit of concurrency for each connection. -pub const CONCURRENCY_LIMIT_PER_CONNECTION: usize = 8192; - // TCP_KEEPALIVE is the keepalive duration for TCP connection. pub const TCP_KEEPALIVE: Duration = Duration::from_secs(3600); diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index 413921fe..5538f004 100644 --- a/dragonfly-client/src/proxy/mod.rs +++ b/dragonfly-client/src/proxy/mod.rs @@ -174,6 +174,10 @@ impl Proxy { let config = self.config.clone(); let task = self.task.clone(); + let dfdaemon_download_client = DfdaemonDownloadClient::new_unix( + config.download.server.socket_path.clone(), + ).await?; + let registry_certs = self.registry_certs.clone(); let server_ca_cert = self.server_ca_cert.clone(); tokio::task::spawn(async move { @@ -183,7 +187,7 @@ impl Proxy { .title_case_headers(true) .serve_connection( io, - service_fn(move |request| handler(config.clone(), task.clone(), request, registry_certs.clone(), server_ca_cert.clone())), + service_fn(move |request| handler(config.clone(), task.clone(), request, dfdaemon_download_client.clone(), registry_certs.clone(), server_ca_cert.clone())), ) .with_upgrades() .await @@ -209,6 +213,7 @@ pub async fn handler( config: Arc, task: Arc, request: Request, + dfdaemon_download_client: DfdaemonDownloadClient, registry_certs: Arc>>>, server_ca_cert: Arc>, ) -> ClientResult { @@ -224,13 +229,21 @@ pub async fn handler( config, task, request, + dfdaemon_download_client, registry_certs, server_ca_cert, ) .await; } - return registry_mirror_http_handler(config, task, request, registry_certs).await; + return registry_mirror_http_handler( + config, + task, + request, + dfdaemon_download_client, + registry_certs, + ) + .await; } // Span record the uri and method. @@ -239,10 +252,25 @@ pub async fn handler( // Handle CONNECT request. if Method::CONNECT == request.method() { - return https_handler(config, task, request, registry_certs, server_ca_cert).await; + return https_handler( + config, + task, + request, + dfdaemon_download_client, + registry_certs, + server_ca_cert, + ) + .await; } - http_handler(config, task, request, registry_certs).await + http_handler( + config, + task, + request, + dfdaemon_download_client, + registry_certs, + ) + .await } // registry_mirror_http_handler handles the http request for the registry mirror by client. @@ -251,10 +279,18 @@ pub async fn registry_mirror_http_handler( config: Arc, task: Arc, request: Request, + dfdaemon_download_client: DfdaemonDownloadClient, registry_certs: Arc>>>, ) -> ClientResult { let request = make_registry_mirror_request(config.clone(), request)?; - return http_handler(config, task, request, registry_certs).await; + return http_handler( + config, + task, + request, + dfdaemon_download_client, + registry_certs, + ) + .await; } // registry_mirror_https_handler handles the https request for the registry mirror by client. @@ -263,11 +299,20 @@ pub async fn registry_mirror_https_handler( config: Arc, task: Arc, request: Request, + dfdaemon_download_client: DfdaemonDownloadClient, registry_certs: Arc>>>, server_ca_cert: Arc>, ) -> ClientResult { let request = make_registry_mirror_request(config.clone(), request)?; - return https_handler(config, task, request, registry_certs, server_ca_cert).await; + return https_handler( + config, + task, + request, + dfdaemon_download_client, + registry_certs, + server_ca_cert, + ) + .await; } // http_handler handles the http request by client. @@ -276,6 +321,7 @@ pub async fn http_handler( config: Arc, task: Arc, request: Request, + dfdaemon_download_client: DfdaemonDownloadClient, registry_certs: Arc>>>, ) -> ClientResult { info!("handle HTTP request: {:?}", request); @@ -290,7 +336,14 @@ pub async fn http_handler( request.method(), request_uri ); - return proxy_by_dfdaemon(config, task, rule.clone(), request).await; + return proxy_by_dfdaemon( + config, + task, + rule.clone(), + request, + dfdaemon_download_client, + ) + .await; } if request.uri().scheme().cloned() == Some(http::uri::Scheme::HTTPS) { @@ -316,6 +369,7 @@ pub async fn https_handler( config: Arc, task: Arc, request: Request, + dfdaemon_download_client: DfdaemonDownloadClient, registry_certs: Arc>>>, server_ca_cert: Arc>, ) -> ClientResult { @@ -332,6 +386,7 @@ pub async fn https_handler( task, upgraded, host, + dfdaemon_download_client, registry_certs, server_ca_cert, ) @@ -359,6 +414,7 @@ async fn upgraded_tunnel( task: Arc, upgraded: Upgraded, host: String, + dfdaemon_download_client: DfdaemonDownloadClient, registry_certs: Arc>>>, server_ca_cert: Arc>, ) -> ClientResult<()> { @@ -399,6 +455,7 @@ async fn upgraded_tunnel( task.clone(), host.clone(), request, + dfdaemon_download_client.clone(), registry_certs.clone(), ) }), @@ -419,6 +476,7 @@ pub async fn upgraded_handler( task: Arc, host: String, mut request: Request, + dfdaemon_download_client: DfdaemonDownloadClient, registry_certs: Arc>>>, ) -> ClientResult { // Span record the uri and method. @@ -442,7 +500,14 @@ pub async fn upgraded_handler( request.method(), request_uri ); - return proxy_by_dfdaemon(config, task, rule.clone(), request).await; + return proxy_by_dfdaemon( + config, + task, + rule.clone(), + request, + dfdaemon_download_client, + ) + .await; } if request.uri().scheme().cloned() == Some(http::uri::Scheme::HTTPS) { @@ -469,20 +534,8 @@ async fn proxy_by_dfdaemon( task: Arc, rule: Rule, request: Request, + dfdaemon_download_client: DfdaemonDownloadClient, ) -> ClientResult { - // Initialize the dfdaemon download client. - let dfdaemon_download_client = - match DfdaemonDownloadClient::new_unix(config.download.server.socket_path.clone()).await { - Ok(client) => client, - Err(err) => { - error!("create dfdaemon download client failed: {}", err); - return Ok(make_error_response( - http::StatusCode::INTERNAL_SERVER_ERROR, - None, - )); - } - }; - // Make the download task request. let download_task_request = match make_download_task_request(config.clone(), rule, request) { Ok(download_task_request) => download_task_request,