From 58da38d9f9018fa7b93d3e4002a86983ac0fb1aa Mon Sep 17 00:00:00 2001 From: Gaius Date: Sun, 26 Jan 2025 22:25:05 +0800 Subject: [PATCH] feat: support persist cache task when scheduler replicates task (#953) Signed-off-by: Gaius --- Cargo.lock | 21 +- Cargo.toml | 19 +- dragonfly-client-config/src/dfdaemon.rs | 2 +- dragonfly-client-core/src/error/mod.rs | 4 + dragonfly-client-storage/Cargo.toml | 1 + dragonfly-client-storage/src/content.rs | 27 ++ dragonfly-client-storage/src/lib.rs | 23 +- dragonfly-client-storage/src/metadata.rs | 16 ++ .../src/storage_engine/rocksdb.rs | 25 +- dragonfly-client/Cargo.toml | 2 +- dragonfly-client/src/bin/dfcache/import.rs | 17 ++ dragonfly-client/src/bin/dfcache/main.rs | 11 - dragonfly-client/src/bin/dfcache/remove.rs | 251 ------------------ dragonfly-client/src/gc/mod.rs | 59 +--- .../src/grpc/dfdaemon_download.rs | 87 ++---- dragonfly-client/src/grpc/dfdaemon_upload.rs | 59 +++- dragonfly-client/src/metrics/mod.rs | 38 +++ dragonfly-client/src/proxy/cache.rs | 2 +- .../src/resource/persistent_cache_task.rs | 37 ++- .../src/resource/piece_collector.rs | 16 +- dragonfly-client/src/resource/task.rs | 20 +- 21 files changed, 296 insertions(+), 441 deletions(-) delete mode 100644 dragonfly-client/src/bin/dfcache/remove.rs diff --git a/Cargo.lock b/Cargo.lock index afcf9cf2..d02a40bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -923,9 +923,9 @@ dependencies = [ [[package]] name = "dragonfly-api" -version = "2.1.16" +version = "2.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a940c6a5682da214cadb78c76e2d7c4d26ad9ab5f110fb0d17fd1475a2c0f0c" +checksum = "ee86ce7919760ff9e043ea089d398e367acf18451ae607aa614b3e65c4cafcd7" dependencies = [ "prost 0.13.4", "prost-types", @@ -938,7 +938,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.2.5" +version = "0.2.6" dependencies = [ "anyhow", "blake3", @@ -1010,7 +1010,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.2.5" +version = "0.2.6" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1041,7 +1041,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.2.5" +version = "0.2.6" dependencies = [ "bytesize", "bytesize-serde", @@ -1067,7 +1067,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.2.5" +version = "0.2.6" dependencies = [ "headers 0.4.0", "hyper 1.5.2", @@ -1086,7 +1086,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.2.5" +version = "0.2.6" dependencies = [ "anyhow", "clap", @@ -1104,7 +1104,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.2.5" +version = "0.2.6" dependencies = [ "base16ct", "bincode", @@ -1114,6 +1114,7 @@ dependencies = [ "dragonfly-client-config", "dragonfly-client-core", "dragonfly-client-util", + "fs2", "num_cpus", "prost-wkt-types", "rayon", @@ -1129,7 +1130,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.2.5" +version = "0.2.6" dependencies = [ "base16ct", "base64 0.22.1", @@ -1519,7 +1520,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.2.5" +version = "0.2.6" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index e05c5a5f..e871e2a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.2.5" +version = "0.2.6" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = 
"https://github.com/dragonflyoss/client.git" @@ -22,15 +22,15 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.2.5" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.5" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.5" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.5" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.5" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.5" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.5" } +dragonfly-client = { path = "dragonfly-client", version = "0.2.6" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.6" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.6" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.6" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.6" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.6" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.6" } thiserror = "1.0" -dragonfly-api = "=2.1.16" +dragonfly-api = "=2.1.23" reqwest = { version = "0.12.4", features = [ "stream", "native-tls", @@ -102,6 +102,7 @@ tempfile = "3.14.0" tokio-rustls = "0.25.0-alpha.4" serde_json = "1.0.137" lru = "0.12.5" +fs2 = "0.4.3" [profile.release] opt-level = "z" diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index 5df53ed2..3ac6f357 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -155,7 +155,7 @@ fn default_scheduler_announce_interval() -> Duration { /// default_scheduler_schedule_timeout is the default timeout for scheduling. #[inline] fn default_scheduler_schedule_timeout() -> Duration { - Duration::from_secs(10) + Duration::from_secs(180) } /// default_dynconfig_refresh_interval is the default interval to refresh dynamic configuration from manager. diff --git a/dragonfly-client-core/src/error/mod.rs b/dragonfly-client-core/src/error/mod.rs index 55f031d3..518563f1 100644 --- a/dragonfly-client-core/src/error/mod.rs +++ b/dragonfly-client-core/src/error/mod.rs @@ -42,6 +42,10 @@ pub enum DFError { #[error{"hashring {0} is failed"}] HashRing(String), + /// NoSpace is the error when there is no space left on device. + #[error("no space left on device: {0}")] + NoSpace(String), + /// HostNotFound is the error when the host is not found. #[error{"host {0} not found"}] HostNotFound(String), diff --git a/dragonfly-client-storage/Cargo.toml b/dragonfly-client-storage/Cargo.toml index a9fb6e9f..29ad9f65 100644 --- a/dragonfly-client-storage/Cargo.toml +++ b/dragonfly-client-storage/Cargo.toml @@ -25,6 +25,7 @@ tokio-util.workspace = true sha2.workspace = true crc.workspace = true base16ct.workspace = true +fs2.workspace = true num_cpus = "1.0" bincode = "1.3.3" rayon = "1.10.0" diff --git a/dragonfly-client-storage/src/content.rs b/dragonfly-client-storage/src/content.rs index 9a826da3..9dd28ded 100644 --- a/dragonfly-client-storage/src/content.rs +++ b/dragonfly-client-storage/src/content.rs @@ -84,6 +84,33 @@ impl Content { Ok(Content { config, dir }) } + /// available_space returns the available space of the disk. 
+    pub fn available_space(&self) -> Result<u64> {
+        let stat = fs2::statvfs(&self.dir)?;
+        Ok(stat.available_space())
+    }
+
+    /// total_space returns the total space of the disk.
+    pub fn total_space(&self) -> Result<u64> {
+        let stat = fs2::statvfs(&self.dir)?;
+        Ok(stat.total_space())
+    }
+
+    /// has_enough_space checks if the storage has enough space to store the content.
+    pub fn has_enough_space(&self, content_length: u64) -> Result<bool> {
+        let available_space = self.available_space()?;
+        if available_space < content_length {
+            warn!(
+                "not enough space to store the content: available_space={}, content_length={}",
+                available_space, content_length
+            );
+
+            return Ok(false);
+        }
+
+        Ok(true)
+    }
+
     /// hard_link_or_copy_task hard links or copies the task content to the destination.
     #[instrument(skip_all)]
     pub async fn hard_link_or_copy_task(
diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs
index 61af65b8..3736c30e 100644
--- a/dragonfly-client-storage/src/lib.rs
+++ b/dragonfly-client-storage/src/lib.rs
@@ -24,7 +24,7 @@ use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::io::AsyncRead;
-use tracing::{debug, error, instrument};
+use tracing::{debug, error, instrument, warn};
 
 pub mod content;
 pub mod metadata;
@@ -59,6 +59,21 @@ impl Storage {
         })
     }
 
+    /// total_space returns the total space of the disk.
+    pub fn total_space(&self) -> Result<u64> {
+        self.content.total_space()
+    }
+
+    /// available_space returns the available space of the disk.
+    pub fn available_space(&self) -> Result<u64> {
+        self.content.available_space()
+    }
+
+    /// has_enough_space checks if the storage has enough space to store the content.
+    pub fn has_enough_space(&self, content_length: u64) -> Result<bool> {
+        self.content.has_enough_space(content_length)
+    }
+
     /// hard_link_or_copy_task hard links or copies the task content to the destination.
     #[instrument(skip_all)]
     pub async fn hard_link_or_copy_task(
@@ -245,6 +260,12 @@ impl Storage {
         self.metadata.get_persistent_cache_task(id)
     }
 
+    /// persist_persistent_cache_task persists the persistent cache task metadata.
+    #[instrument(skip_all)]
+    pub fn persist_persistent_cache_task(&self, id: &str) -> Result<metadata::PersistentCacheTask> {
+        self.metadata.persist_persistent_cache_task(id)
+    }
+
     /// is_persistent_cache_task_exists returns whether the persistent cache task exists.
     #[instrument(skip_all)]
     pub fn is_persistent_cache_task_exists(&self, id: &str) -> Result<bool> {
diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs
index 8eb091fc..b6e95f9d 100644
--- a/dragonfly-client-storage/src/metadata.rs
+++ b/dragonfly-client-storage/src/metadata.rs
@@ -700,6 +700,22 @@ impl Metadata {
         Ok(task)
     }
 
+    /// persist_persistent_cache_task persists the persistent cache task metadata.
+    #[instrument(skip_all)]
+    pub fn persist_persistent_cache_task(&self, id: &str) -> Result<PersistentCacheTask> {
+        let task = match self.db.get::<PersistentCacheTask>(id.as_bytes())? {
+            Some(mut task) => {
+                task.persistent = true;
+                task.updated_at = Utc::now().naive_utc();
+                task
+            }
+            None => return Err(Error::TaskNotFound(id.to_string())),
+        };
+
+        self.db.put(id.as_bytes(), &task)?;
+        Ok(task)
+    }
+
     /// get_persistent_cache_task gets the persistent cache task metadata.
     #[instrument(skip_all)]
     pub fn get_persistent_cache_task(&self, id: &str) -> Result<Option<PersistentCacheTask>> {
diff --git a/dragonfly-client-storage/src/storage_engine/rocksdb.rs b/dragonfly-client-storage/src/storage_engine/rocksdb.rs
index cdc88125..8636b8a2 100644
--- a/dragonfly-client-storage/src/storage_engine/rocksdb.rs
+++ b/dragonfly-client-storage/src/storage_engine/rocksdb.rs
@@ -117,9 +117,7 @@ impl RocksdbStorageEngine {
 
         // If the storage is kept, open the db and drop the unused column families.
         // Otherwise, destroy the db.
-        if keep {
-            drop_unused_cfs(&dir);
-        } else {
+        if !keep {
             rocksdb::DB::destroy(&options, &dir).unwrap_or_else(|err| {
                 warn!("destroy {:?} failed: {}", dir, err);
             });
@@ -259,24 +257,3 @@ where
     db.cf_handle(cf_name)
         .ok_or_else(|| Error::ColumnFamilyNotFound(cf_name.to_string()))
 }
-
-/// drop_unused_cfs drops the unused column families.
-fn drop_unused_cfs(dir: &Path) {
-    let old_cf_names = vec!["task", "piece", "cache_task"];
-    let unused_cf_names = vec!["cache_task"];
-
-    let mut db = match rocksdb::DB::open_cf(&rocksdb::Options::default(), dir, old_cf_names) {
-        Ok(db) => db,
-        Err(err) => {
-            warn!("open cf failed: {}", err);
-            return;
-        }
-    };
-
-    for cf_name in unused_cf_names {
-        match db.drop_cf(cf_name) {
-            Ok(_) => info!("drop cf [{}] success", cf_name),
-            Err(err) => warn!("drop cf [{}] failed: {}", cf_name, err),
-        }
-    }
-}
diff --git a/dragonfly-client/Cargo.toml b/dragonfly-client/Cargo.toml
index 9eeb154a..9a421ee8 100644
--- a/dragonfly-client/Cargo.toml
+++ b/dragonfly-client/Cargo.toml
@@ -64,6 +64,7 @@ percent-encoding.workspace = true
 tokio-rustls.workspace = true
 serde_json.workspace = true
 lru.workspace = true
+fs2.workspace = true
 lazy_static = "1.5"
 tracing-log = "0.2"
 tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "chrono"] }
@@ -81,7 +82,6 @@ sysinfo = "0.32.1"
 tower = "0.4.13"
 indicatif = "0.17.9"
 dashmap = "6.1.0"
-fs2 = "0.4.3"
 hashring = "0.3.6"
 fslock = "0.2.1"
 leaky-bucket = "1.1.2"
diff --git a/dragonfly-client/src/bin/dfcache/import.rs b/dragonfly-client/src/bin/dfcache/import.rs
index bff0c266..66fdaf33 100644
--- a/dragonfly-client/src/bin/dfcache/import.rs
+++ b/dragonfly-client/src/bin/dfcache/import.rs
@@ -37,6 +37,13 @@ pub struct ImportCommand {
     #[arg(help = "Specify the path of the file to import")]
     path: PathBuf,
 
+    #[arg(
+        long = "id",
+        required = false,
+        help = "Specify the id of the persistent cache task, which must be 64 bytes long. If the id is not set, dfdaemon will generate a new task id based on the file content, tag and application using the wyhash algorithm."
+    )]
+    id: Option<String>,
+
     #[arg(
         long = "persistent-replica-count",
         default_value_t = default_dfcache_persistent_replica_count(),
@@ -321,6 +328,7 @@ impl ImportCommand {
 
         let persistent_cache_task = dfdaemon_download_client
             .upload_persistent_cache_task(UploadPersistentCacheTaskRequest {
+                task_id: self.id.clone(),
                 path: self.path.clone().into_os_string().into_string().unwrap(),
                 persistent_replica_count: self.persistent_replica_count,
                 tag: self.tag.clone(),
@@ -341,6 +349,15 @@ impl ImportCommand {
 
     /// validate_args validates the command line arguments.
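+    ///
+    /// A valid id is exactly 64 bytes long, for example a SHA-256 hex digest
+    /// (an illustrative assumption; any 64-byte string passes the check below).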
fn validate_args(&self) -> Result<()> { + if let Some(id) = self.id.as_ref() { + if id.len() != 64 { + return Err(Error::ValidationError(format!( + "id length must be 64 bytes, but got {}", + id.len() + ))); + } + } + if self.path.is_dir() { return Err(Error::ValidationError(format!( "path {} is a directory", diff --git a/dragonfly-client/src/bin/dfcache/main.rs b/dragonfly-client/src/bin/dfcache/main.rs index 53e293d2..badabaab 100644 --- a/dragonfly-client/src/bin/dfcache/main.rs +++ b/dragonfly-client/src/bin/dfcache/main.rs @@ -26,7 +26,6 @@ use tracing::Level; pub mod export; pub mod import; -pub mod remove; pub mod stat; #[derive(Debug, Parser)] @@ -83,15 +82,6 @@ pub enum Command { long_about = "Stat a file in Dragonfly P2P network by task ID. If stat successfully, it will return the file information." )] Stat(stat::StatCommand), - - #[command( - name = "rm", - author, - version, - about = "Remove a file from Dragonfly P2P network", - long_about = "Remove the P2P cache in Dragonfly P2P network by task ID." - )] - Remove(remove::RemoveCommand), } /// Implement the execute for Command. @@ -102,7 +92,6 @@ impl Command { Self::Import(cmd) => cmd.execute().await, Self::Export(cmd) => cmd.execute().await, Self::Stat(cmd) => cmd.execute().await, - Self::Remove(cmd) => cmd.execute().await, } } } diff --git a/dragonfly-client/src/bin/dfcache/remove.rs b/dragonfly-client/src/bin/dfcache/remove.rs deleted file mode 100644 index 615e4d0a..00000000 --- a/dragonfly-client/src/bin/dfcache/remove.rs +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Copyright 2024 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use clap::Parser; -use dragonfly_api::dfdaemon::v2::DeletePersistentCacheTaskRequest; -use dragonfly_client_core::{Error, Result}; -use indicatif::{ProgressBar, ProgressStyle}; -use std::time::Duration; -use termion::{color, style}; - -use super::*; - -/// DEFAULT_PROGRESS_BAR_STEADY_TICK_INTERVAL is the default steady tick interval of progress bar. -const DEFAULT_PROGRESS_BAR_STEADY_TICK_INTERVAL: Duration = Duration::from_millis(80); - -/// RemoveCommand is the subcommand of remove. 
-#[derive(Debug, Clone, Parser)] -pub struct RemoveCommand { - #[arg(help = "Specify the persistent cache task ID to remove")] - id: String, - - #[arg( - short = 'e', - long = "endpoint", - default_value_os_t = dfdaemon::default_download_unix_socket_path(), - help = "Endpoint of dfdaemon's GRPC server" - )] - endpoint: PathBuf, - - #[arg( - short = 'l', - long, - default_value = "info", - help = "Specify the logging level [trace, debug, info, warn, error]" - )] - log_level: Level, - - #[arg( - long, - default_value_os_t = dfcache::default_dfcache_log_dir(), - help = "Specify the log directory" - )] - log_dir: PathBuf, - - #[arg( - long, - default_value_t = 6, - help = "Specify the max number of log files" - )] - log_max_files: usize, - - #[arg( - long = "verbose", - default_value_t = false, - help = "Specify whether to print log" - )] - verbose: bool, -} - -/// Implement the execute for RemoveCommand. -impl RemoveCommand { - /// execute executes the delete command. - pub async fn execute(&self) -> Result<()> { - // Parse command line arguments. - Args::parse(); - - // Initialize tracing. - let _guards = init_tracing( - dfcache::NAME, - self.log_dir.clone(), - self.log_level, - self.log_max_files, - None, - false, - self.verbose, - ); - - // Get dfdaemon download client. - let dfdaemon_download_client = - match get_dfdaemon_download_client(self.endpoint.to_path_buf()).await { - Ok(client) => client, - Err(err) => { - println!( - "{}{}{}Connect Dfdaemon Failed!{}", - color::Fg(color::Red), - style::Italic, - style::Bold, - style::Reset - ); - - println!( - "{}{}{}****************************************{}", - color::Fg(color::Black), - style::Italic, - style::Bold, - style::Reset - ); - - println!( - "{}{}{}Message:{}, can not connect {}, please check the unix socket {}", - color::Fg(color::Cyan), - style::Italic, - style::Bold, - style::Reset, - err, - self.endpoint.to_string_lossy(), - ); - - println!( - "{}{}{}****************************************{}", - color::Fg(color::Black), - style::Italic, - style::Bold, - style::Reset - ); - - std::process::exit(1); - } - }; - - // Run delete sub command. 
- if let Err(err) = self.run(dfdaemon_download_client).await { - match err { - Error::TonicStatus(status) => { - println!( - "{}{}{}Removing Failed!{}", - color::Fg(color::Red), - style::Italic, - style::Bold, - style::Reset, - ); - - println!( - "{}{}{}*********************************{}", - color::Fg(color::Black), - style::Italic, - style::Bold, - style::Reset - ); - - println!( - "{}{}{}Bad Code:{} {}", - color::Fg(color::Red), - style::Italic, - style::Bold, - style::Reset, - status.code() - ); - - println!( - "{}{}{}Message:{} {}", - color::Fg(color::Cyan), - style::Italic, - style::Bold, - style::Reset, - status.message() - ); - - println!( - "{}{}{}Details:{} {}", - color::Fg(color::Cyan), - style::Italic, - style::Bold, - style::Reset, - std::str::from_utf8(status.details()).unwrap() - ); - - println!( - "{}{}{}*********************************{}", - color::Fg(color::Black), - style::Italic, - style::Bold, - style::Reset - ); - } - err => { - println!( - "{}{}{}Removing Failed!{}", - color::Fg(color::Red), - style::Italic, - style::Bold, - style::Reset - ); - - println!( - "{}{}{}****************************************{}", - color::Fg(color::Black), - style::Italic, - style::Bold, - style::Reset - ); - - println!( - "{}{}{}Message:{} {}", - color::Fg(color::Red), - style::Italic, - style::Bold, - style::Reset, - err - ); - - println!( - "{}{}{}****************************************{}", - color::Fg(color::Black), - style::Italic, - style::Bold, - style::Reset - ); - } - } - - std::process::exit(1); - } - - Ok(()) - } - - /// run runs the delete command. - async fn run(&self, dfdaemon_download_client: DfdaemonDownloadClient) -> Result<()> { - let pb = ProgressBar::new_spinner(); - pb.enable_steady_tick(DEFAULT_PROGRESS_BAR_STEADY_TICK_INTERVAL); - pb.set_style( - ProgressStyle::with_template("{spinner:.blue} {msg}") - .unwrap() - .tick_strings(&["⣾", "⣽", "⣻", "⢿", "⡿", "⣟", "⣯", "⣷"]), - ); - pb.set_message("Removing..."); - - dfdaemon_download_client - .delete_persistent_cache_task(DeletePersistentCacheTaskRequest { - task_id: self.id.clone(), - }) - .await?; - - pb.finish_with_message("Done"); - Ok(()) - } -} diff --git a/dragonfly-client/src/gc/mod.rs b/dragonfly-client/src/gc/mod.rs index 85243d14..e9f41b84 100644 --- a/dragonfly-client/src/gc/mod.rs +++ b/dragonfly-client/src/gc/mod.rs @@ -17,13 +17,10 @@ use crate::grpc::scheduler::SchedulerClient; use crate::shutdown; use chrono::Utc; -use dragonfly_api::scheduler::v2::{DeletePersistentCacheTaskRequest, DeleteTaskRequest}; +use dragonfly_api::scheduler::v2::DeleteTaskRequest; use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::Result; -use dragonfly_client_storage::{ - content::{DEFAULT_CONTENT_DIR, DEFAULT_PERSISTENT_CACHE_TASK_DIR, DEFAULT_TASK_DIR}, - metadata, Storage, -}; +use dragonfly_client_storage::{metadata, Storage}; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; @@ -136,16 +133,8 @@ impl GC { /// evict_task_by_disk_usage evicts the task by disk usage. #[instrument(skip_all)] async fn evict_task_by_disk_usage(&self) -> Result<()> { - let stats = fs2::statvfs( - self.config - .storage - .dir - .join(DEFAULT_CONTENT_DIR) - .join(DEFAULT_TASK_DIR) - .as_path(), - )?; - let available_space = stats.available_space(); - let total_space = stats.total_space(); + let available_space = self.storage.available_space()?; + let total_space = self.storage.total_space()?; // Calculate the usage percent. 
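+        // A worked example of the integer math below: with 150 GiB free of
+        // 500 GiB total, usage_percent = 100 - 150 * 100 / 500 = 70. The
+        // division truncates the available percentage, so usage rounds up.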
        let usage_percent = (100 - available_space * 100 / total_space) as u8;
@@ -249,10 +238,6 @@ impl GC {
             if task.is_expired() {
                 self.storage.delete_persistent_cache_task(&task.id).await;
                 info!("evict persistent cache task {}", task.id);
-
-                self.delete_persistent_cache_task_from_scheduler(task.clone())
-                    .await;
-                info!("delete persistent cache task {} from scheduler", task.id);
             }
         }
 
@@ -262,16 +247,8 @@ impl GC {
     /// evict_persistent_cache_task_by_disk_usage evicts the persistent cache task by disk usage.
     #[instrument(skip_all)]
    async fn evict_persistent_cache_task_by_disk_usage(&self) -> Result<()> {
-        let stats = fs2::statvfs(
-            self.config
-                .storage
-                .dir
-                .join(DEFAULT_CONTENT_DIR)
-                .join(DEFAULT_PERSISTENT_CACHE_TASK_DIR)
-                .as_path(),
-        )?;
-        let available_space = stats.available_space();
-        let total_space = stats.total_space();
+        let available_space = self.storage.available_space()?;
+        let total_space = self.storage.total_space()?;
 
         // Calculate the usage percent.
         let usage_percent = (100 - available_space * 100 / total_space) as u8;
@@ -340,33 +317,9 @@ impl GC {
                 "evict persistent cache task {} size {}",
                 task.id, task_space
            );
-
-            self.delete_persistent_cache_task_from_scheduler(task.clone())
-                .await;
-            info!("delete persistent cache task {} from scheduler", task.id);
         }
 
         info!("evict total size {}", evicted_space);
         Ok(())
     }
-
-    /// delete_persistent_cache_task_from_scheduler deletes the persistent cache task from the scheduler.
-    #[instrument(skip_all)]
-    async fn delete_persistent_cache_task_from_scheduler(
-        &self,
-        task: metadata::PersistentCacheTask,
-    ) {
-        self.scheduler_client
-            .delete_persistent_cache_task(DeletePersistentCacheTaskRequest {
-                host_id: self.host_id.clone(),
-                task_id: task.id.clone(),
-            })
-            .await
-            .unwrap_or_else(|err| {
-                error!(
-                    "failed to delete persistent cache peer {}: {}",
-                    task.id, err
-                );
-            });
-    }
 }
diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs
index 4538d588..d3dd2b7b 100644
--- a/dragonfly-client/src/grpc/dfdaemon_download.rs
+++ b/dragonfly-client/src/grpc/dfdaemon_download.rs
@@ -30,10 +30,9 @@ use dragonfly_api::dfdaemon::v2::{
     dfdaemon_download_server::{
         DfdaemonDownload, DfdaemonDownloadServer as DfdaemonDownloadGRPCServer,
     },
-    DeletePersistentCacheTaskRequest, DeleteTaskRequest, DownloadPersistentCacheTaskRequest,
-    DownloadPersistentCacheTaskResponse, DownloadTaskRequest, DownloadTaskResponse,
-    StatPersistentCacheTaskRequest, StatTaskRequest as DfdaemonStatTaskRequest,
-    UploadPersistentCacheTaskRequest,
+    DeleteTaskRequest, DownloadPersistentCacheTaskRequest, DownloadPersistentCacheTaskResponse,
+    DownloadTaskRequest, DownloadTaskResponse, StatPersistentCacheTaskRequest,
+    StatTaskRequest as DfdaemonStatTaskRequest, UploadPersistentCacheTaskRequest,
 };
 use dragonfly_api::errordetails::v2::Backend;
 use dragonfly_api::scheduler::v2::DeleteHostRequest as SchedulerDeleteHostRequest;
@@ -57,7 +56,7 @@ use tonic::{
     Code, Request, Response, Status,
 };
 use tower::service_fn;
-use tracing::{debug, error, info, instrument, Instrument, Span};
+use tracing::{error, info, instrument, Instrument, Span};
 
 use super::interceptor::TracingInterceptor;
 
@@ -199,8 +198,6 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
         &self,
         request: Request<DownloadTaskRequest>,
     ) -> Result<Response<Self::DownloadTaskStream>, Status> {
-        debug!("download task in download server");
-
         // Record the start time.
        let start_time = Instant::now();
@@ -238,6 +235,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
         Span::current().record("host_id", host_id.as_str());
         Span::current().record("task_id", task_id.as_str());
         Span::current().record("peer_id", peer_id.as_str());
+        info!("download task in download server");
 
         // Download task started.
         info!("download task started: {:?}", download);
@@ -590,6 +588,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
         // Span record the host id and task id.
         Span::current().record("host_id", host_id.as_str());
         Span::current().record("task_id", task_id.as_str());
+        info!("stat task in download server");
 
         // Collect the stat task metrics.
         collect_stat_task_started_metrics(TaskType::Standard as i32);
@@ -628,6 +627,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
         // Span record the host id and task id.
         Span::current().record("host_id", host_id.as_str());
         Span::current().record("task_id", task_id.as_str());
+        info!("delete task in download server");
 
         // Collect the delete task started metrics.
         collect_delete_task_started_metrics(TaskType::Standard as i32);
@@ -655,6 +655,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
 
         // Span record the host id.
         Span::current().record("host_id", host_id.as_str());
+        info!("delete host in download server");
 
         // Collect the delete host started metrics.
         collect_delete_host_started_metrics();
@@ -684,8 +685,6 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
         &self,
         request: Request<DownloadPersistentCacheTaskRequest>,
     ) -> Result<Response<Self::DownloadPersistentCacheTaskStream>, Status> {
-        info!("download persistent cache task in download server");
-
         // Record the start time.
         let start_time = Instant::now();
@@ -709,6 +708,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
         Span::current().record("host_id", host_id.as_str());
         Span::current().record("task_id", task_id.as_str());
         Span::current().record("peer_id", peer_id.as_str());
+        info!("download persistent cache task in download server");
 
         // Download task started.
         info!("download persistent cache task started: {:?}", request);
@@ -876,18 +876,21 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
         info!("upload persistent cache task {:?}", request);
 
         // Generate the task id.
-        let task_id = self
-            .task
-            .id_generator
-            .persistent_cache_task_id(
-                &path.to_path_buf(),
-                request.tag.as_deref(),
-                request.application.as_deref(),
-            )
-            .map_err(|err| {
-                error!("generate persistent cache task id: {}", err);
-                Status::invalid_argument(err.to_string())
-            })?;
+        let task_id = match request.task_id.as_deref() {
+            Some(task_id) => task_id.to_string(),
+            None => self
+                .task
+                .id_generator
+                .persistent_cache_task_id(
+                    &path.to_path_buf(),
+                    request.tag.as_deref(),
+                    request.application.as_deref(),
+                )
+                .map_err(|err| {
+                    error!("generate persistent cache task id: {}", err);
+                    Status::invalid_argument(err.to_string())
+                })?,
+        };
         info!("generate persistent cache task id: {}", task_id);
 
         // Generate the host id.
@@ -900,6 +903,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
         Span::current().record("host_id", host_id.as_str());
         Span::current().record("task_id", task_id.as_str());
         Span::current().record("peer_id", peer_id.as_str());
+        info!("upload persistent cache task in download server");
 
         // Collect upload task started metrics.
         collect_upload_task_started_metrics(
@@ -966,6 +970,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
 
         // Span record the host id and task id.
         Span::current().record("host_id", host_id.as_str());
         Span::current().record("task_id", task_id.as_str());
+        info!("stat persistent cache task in download server");
 
         // Collect the stat persistent cache task started metrics.
         collect_stat_task_started_metrics(TaskType::PersistentCache as i32);
@@ -984,31 +989,6 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
 
         Ok(Response::new(task))
     }
-
-    /// delete_persistent_cache_task deletes the persistent cache task.
-    #[instrument(skip_all, fields(host_id, task_id))]
-    async fn delete_persistent_cache_task(
-        &self,
-        request: Request<DeletePersistentCacheTaskRequest>,
-    ) -> Result<Response<()>, Status> {
-        // Clone the request.
-        let request = request.into_inner();
-
-        // Generate the host id.
-        let host_id = self.task.id_generator.host_id();
-
-        // Get the task id from the request.
-        let task_id = request.task_id;
-
-        // Span record the host id and task id.
-        Span::current().record("host_id", host_id.as_str());
-        Span::current().record("task_id", task_id.as_str());
-
-        // Collect the delete persistent cache task started metrics.
-        collect_delete_task_started_metrics(TaskType::PersistentCache as i32);
-        self.persistent_cache_task.delete(task_id.as_str()).await;
-        Ok(Response::new(()))
-    }
 }
 
 /// DfdaemonDownloadClient is a wrapper of DfdaemonDownloadGRPCClient.
@@ -1166,21 +1146,6 @@ impl DfdaemonDownloadClient {
         Ok(response.into_inner())
     }
 
-    /// delete_persistent_cache_task deletes the persistent cache task.
-    #[instrument(skip_all)]
-    pub async fn delete_persistent_cache_task(
-        &self,
-        request: DeletePersistentCacheTaskRequest,
-    ) -> ClientResult<()> {
-        let request = Self::make_request(request);
-        let _response = self
-            .client
-            .clone()
-            .delete_persistent_cache_task(request)
-            .await?;
-        Ok(())
-    }
-
     /// make_request creates a new request with timeout.
     #[instrument(skip_all)]
     fn make_request<T>(request: T) -> tonic::Request<T> {
diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs
index 996605aa..394a5426 100644
--- a/dragonfly-client/src/grpc/dfdaemon_upload.rs
+++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs
@@ -18,7 +18,8 @@ use crate::metrics::{
     collect_delete_task_failure_metrics, collect_delete_task_started_metrics,
     collect_download_task_failure_metrics, collect_download_task_finished_metrics,
     collect_download_task_started_metrics, collect_stat_task_failure_metrics,
-    collect_stat_task_started_metrics, collect_upload_piece_failure_metrics,
+    collect_stat_task_started_metrics, collect_update_task_failure_metrics,
+    collect_update_task_started_metrics, collect_upload_piece_failure_metrics,
     collect_upload_piece_finished_metrics, collect_upload_piece_started_metrics,
 };
 use crate::resource::{persistent_cache_task, task};
@@ -32,7 +33,7 @@ use dragonfly_api::dfdaemon::v2::{
     DownloadPersistentCacheTaskResponse, DownloadPieceRequest, DownloadPieceResponse,
     DownloadTaskRequest, DownloadTaskResponse, StatPersistentCacheTaskRequest, StatTaskRequest,
     SyncHostRequest, SyncPersistentCachePiecesRequest, SyncPersistentCachePiecesResponse,
-    SyncPiecesRequest, SyncPiecesResponse,
+    SyncPiecesRequest, SyncPiecesResponse, UpdatePersistentCacheTaskRequest,
 };
 use dragonfly_api::errordetails::v2::Backend;
 use dragonfly_client_config::dfdaemon::Config;
@@ -54,7 +55,7 @@ use tonic::{
     transport::{Channel, Server},
     Code, Request, Response, Status,
 };
-use tracing::{debug, error, info, instrument, Instrument, Span};
+use tracing::{error, info, instrument, Instrument, Span};
 use url::Url;
 
 use super::interceptor::TracingInterceptor;
@@ -195,8 +196,6 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
         &self,
         request: Request<DownloadTaskRequest>,
     ) -> Result<Response<Self::DownloadTaskStream>, Status> {
-        debug!("download task in upload server");
-
         // Record the start time.
         let start_time = Instant::now();
@@ -234,6 +233,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
         Span::current().record("host_id", host_id.as_str());
         Span::current().record("task_id", task_id.as_str());
         Span::current().record("peer_id", peer_id.as_str());
+        info!("download task in upload server");
 
         // Download task started.
         info!("download task started: {:?}", download);
@@ -586,6 +586,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
         // Span record the host id and task id.
         Span::current().record("host_id", host_id.as_str());
         Span::current().record("task_id", task_id.as_str());
+        info!("stat task in upload server");
 
         // Collect the stat task metrics.
         collect_stat_task_started_metrics(TaskType::Standard as i32);
@@ -624,6 +625,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
         // Span record the host id and task id.
         Span::current().record("host_id", host_id.as_str());
         Span::current().record("task_id", task_id.as_str());
+        info!("delete task in upload server");
 
         // Collect the delete task started metrics.
         collect_delete_task_started_metrics(TaskType::Standard as i32);
@@ -668,6 +670,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
         Span::current().record("host_id", host_id.clone());
         Span::current().record("remote_host_id", remote_host_id.as_str());
         Span::current().record("task_id", task_id.clone());
+        info!("sync pieces in upload server");
 
         // Get the interested piece numbers from the request.
        let mut interested_piece_numbers = request.interested_piece_numbers.clone();
@@ -812,6 +815,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
         Span::current().record("remote_host_id", remote_host_id.as_str());
         Span::current().record("task_id", task_id.as_str());
         Span::current().record("piece_id", piece_id.as_str());
+        info!("download piece content in upload server");
 
         // Get the piece metadata from the local storage.
         let piece = self
@@ -903,8 +907,6 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
         &self,
         request: Request<DownloadPersistentCacheTaskRequest>,
     ) -> Result<Response<Self::DownloadPersistentCacheTaskStream>, Status> {
-        info!("download persistent cache task in download server");
-
         // Record the start time.
         let start_time = Instant::now();
@@ -928,6 +930,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
         Span::current().record("host_id", host_id.as_str());
         Span::current().record("task_id", task_id.as_str());
         Span::current().record("peer_id", peer_id.as_str());
+        info!("download persistent cache task in upload server");
 
         // Download task started.
         info!("download persistent cache task started: {:?}", request);
@@ -1070,6 +1073,44 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
         Ok(Response::new(ReceiverStream::new(out_stream_rx)))
     }
 
+    /// update_persistent_cache_task updates the metadata of the persistent cache task.
+    #[instrument(skip_all, fields(host_id, task_id))]
+    async fn update_persistent_cache_task(
+        &self,
+        request: Request<UpdatePersistentCacheTaskRequest>,
+    ) -> Result<Response<()>, Status> {
+        // Clone the request.
+        let request = request.into_inner();
+
+        // Generate the host id.
+        let host_id = self.task.id_generator.host_id();
+
+        // Get the task id from the request.
+        let task_id = request.task_id;
+
+        // Span record the host id and task id.
+        Span::current().record("host_id", host_id.as_str());
+        Span::current().record("task_id", task_id.as_str());
+        info!("update persistent cache task in upload server");
+
+        // Collect the update task started metrics.
+        collect_update_task_started_metrics(TaskType::PersistentCache as i32);
+
+        if request.persistent {
+            self.persistent_cache_task
+                .persist(task_id.as_str())
+                .map_err(|err| {
+                    // Collect the update task failure metrics.
+                    collect_update_task_failure_metrics(TaskType::PersistentCache as i32);
+
+                    error!("update persistent cache task: {}", err);
+                    Status::internal(err.to_string())
+                })?;
+        }
+
+        Ok(Response::new(()))
+    }
+
     /// stat_persistent_cache_task stats the persistent cache task.
     #[instrument(skip_all, fields(host_id, task_id))]
     async fn stat_persistent_cache_task(
@@ -1088,6 +1129,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
         // Span record the host id and task id.
         Span::current().record("host_id", host_id.as_str());
         Span::current().record("task_id", task_id.as_str());
+        info!("stat persistent cache task in upload server");
 
         // Collect the stat task started metrics.
         collect_stat_task_started_metrics(TaskType::PersistentCache as i32);
@@ -1125,6 +1167,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
         // Span record the host id and task id.
         Span::current().record("host_id", host_id.as_str());
         Span::current().record("task_id", task_id.as_str());
+        info!("delete persistent cache task in upload server");
 
         // Collect the delete task started metrics.
        collect_delete_task_started_metrics(TaskType::PersistentCache as i32);
@@ -1158,6 +1201,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
         Span::current().record("host_id", host_id.clone());
         Span::current().record("remote_host_id", remote_host_id.as_str());
         Span::current().record("task_id", task_id.clone());
+        info!("sync persistent cache pieces in upload server");
 
         // Get the interested piece numbers from the request.
         let mut interested_piece_numbers = request.interested_piece_numbers.clone();
@@ -1296,6 +1340,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
         Span::current().record("remote_host_id", remote_host_id.as_str());
         Span::current().record("task_id", task_id.as_str());
         Span::current().record("piece_id", piece_id.as_str());
+        info!("download persistent cache piece in upload server");
 
         // Get the piece metadata from the local storage.
         let piece = self
diff --git a/dragonfly-client/src/metrics/mod.rs b/dragonfly-client/src/metrics/mod.rs
index 5c89d241..c597e30d 100644
--- a/dragonfly-client/src/metrics/mod.rs
+++ b/dragonfly-client/src/metrics/mod.rs
@@ -193,6 +193,20 @@ lazy_static! {
         &[]
     ).expect("metric can be created");
 
+    /// UPDATE_TASK_COUNT is used to count the number of update tasks.
+    pub static ref UPDATE_TASK_COUNT: IntCounterVec =
+        IntCounterVec::new(
+            Opts::new("update_task_total", "Counter of the number of update tasks.").namespace(dragonfly_client_config::SERVICE_NAME).subsystem(dragonfly_client_config::NAME),
+            &["type"]
+        ).expect("metric can be created");
+
+    /// UPDATE_TASK_FAILURE_COUNT is used to count the number of failed update tasks.
+    pub static ref UPDATE_TASK_FAILURE_COUNT: IntCounterVec =
+        IntCounterVec::new(
+            Opts::new("update_task_failure_total", "Counter of the number of failed update tasks.").namespace(dragonfly_client_config::SERVICE_NAME).subsystem(dragonfly_client_config::NAME),
+            &["type"]
+        ).expect("metric can be created");
+
     /// STAT_TASK_COUNT is used to count the number of stat tasks.
     pub static ref STAT_TASK_COUNT: IntCounterVec =
         IntCounterVec::new(
@@ -337,6 +351,14 @@ fn register_custom_metrics() {
         ))
         .expect("metric can be registered");
 
+    REGISTRY
+        .register(Box::new(UPDATE_TASK_COUNT.clone()))
+        .expect("metric can be registered");
+
+    REGISTRY
+        .register(Box::new(UPDATE_TASK_FAILURE_COUNT.clone()))
+        .expect("metric can be registered");
+
     REGISTRY
         .register(Box::new(STAT_TASK_COUNT.clone()))
         .expect("metric can be registered");
@@ -398,6 +420,8 @@ fn reset_custom_metrics() {
     PROXY_REQUEST_FAILURE_COUNT.reset();
     PROXY_REQUEST_VIA_DFDAEMON_COUNT.reset();
     PROXY_REQUEST_VIA_DFDAEMON_AND_CACHE_HITS_COUNT.reset();
+    UPDATE_TASK_COUNT.reset();
+    UPDATE_TASK_FAILURE_COUNT.reset();
     STAT_TASK_COUNT.reset();
     STAT_TASK_FAILURE_COUNT.reset();
     DELETE_TASK_COUNT.reset();
@@ -746,6 +770,20 @@ pub fn collect_proxy_request_via_dfdaemon_and_cache_hits_metrics() {
         .inc();
 }
 
+/// collect_update_task_started_metrics collects the update task started metrics.
+pub fn collect_update_task_started_metrics(typ: i32) {
+    UPDATE_TASK_COUNT
+        .with_label_values(&[typ.to_string().as_str()])
+        .inc();
+}
+
+/// collect_update_task_failure_metrics collects the update task failure metrics.
+pub fn collect_update_task_failure_metrics(typ: i32) {
+    UPDATE_TASK_FAILURE_COUNT
+        .with_label_values(&[typ.to_string().as_str()])
+        .inc();
+}
+
 /// collect_stat_task_started_metrics collects the stat task started metrics.
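+/// Note: in these collectors the `typ` label is the integer representation of
+/// `TaskType` (callers pass e.g. `TaskType::Standard as i32`), stringified for
+/// the Prometheus label value.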
 pub fn collect_stat_task_started_metrics(typ: i32) {
     STAT_TASK_COUNT
diff --git a/dragonfly-client/src/proxy/cache.rs b/dragonfly-client/src/proxy/cache.rs
index 1d12588d..2e0037da 100644
--- a/dragonfly-client/src/proxy/cache.rs
+++ b/dragonfly-client/src/proxy/cache.rs
@@ -59,7 +59,7 @@ impl Cache {
             download.filtered_query_params.clone(),
         )?;
 
-        let Some(task) = self.task.get(&task_id).await? else {
+        let Some(task) = self.task.get(&task_id)? else {
             return Ok(None);
         };
diff --git a/dragonfly-client/src/resource/persistent_cache_task.rs b/dragonfly-client/src/resource/persistent_cache_task.rs
index e996d65c..5cc035d0 100644
--- a/dragonfly-client/src/resource/persistent_cache_task.rs
+++ b/dragonfly-client/src/resource/persistent_cache_task.rs
@@ -108,6 +108,12 @@ impl PersistentCacheTask {
         })
     }
 
+    /// get gets a persistent cache task from local.
+    #[instrument(skip_all)]
+    pub fn get(&self, task_id: &str) -> ClientResult<Option<metadata::PersistentCacheTask>> {
+        self.storage.get_persistent_cache_task(task_id)
+    }
+
     /// create_persistent creates a persistent cache task from local.
     #[instrument(skip_all)]
     pub async fn create_persistent(
@@ -161,6 +167,15 @@ impl PersistentCacheTask {
             }
         }
 
+        // Check if the storage has enough space to store the persistent cache task.
+        let has_enough_space = self.storage.has_enough_space(content_length)?;
+        if !has_enough_space {
+            return Err(Error::NoSpace(format!(
+                "not enough space to store the persistent cache task: content_length={}",
+                content_length
+            )));
+        }
+
         self.storage
             .create_persistent_cache_task_started(task_id, ttl, piece_length, content_length)
             .await?;
@@ -416,6 +431,18 @@ impl PersistentCacheTask {
         let ttl = Duration::try_from(response.ttl.ok_or(Error::InvalidParameter)?)
             .or_err(ErrorType::ParseError)?;
 
+        // If the persistent cache task is not found, check if the storage has enough space to
+        // store the persistent cache task.
+        if let Ok(None) = self.get(task_id) {
+            let has_enough_space = self.storage.has_enough_space(response.content_length)?;
+            if !has_enough_space {
+                return Err(Error::NoSpace(format!(
+                    "not enough space to store the persistent cache task: content_length={}",
+                    response.content_length
+                )));
+            }
+        }
+
         self.storage.download_persistent_cache_task_started(
             task_id,
             ttl,
@@ -646,6 +673,7 @@ impl PersistentCacheTask {
                     request: Some(
                         announce_persistent_cache_peer_request::Request::RegisterPersistentCachePeerRequest(
                             RegisterPersistentCachePeerRequest {
+                                persistent: request.persistent,
                                 tag: request.tag.clone(),
                                 application: request.application.clone(),
                                 piece_length: task.piece_length,
@@ -680,7 +708,9 @@ impl PersistentCacheTask {
             .timeout(self.config.scheduler.schedule_timeout);
         tokio::pin!(out_stream);
 
-        while let Some(message) = out_stream.try_next().await? {
+        while let Some(message) = out_stream.try_next().await.inspect_err(|err| {
+            error!("receive message from scheduler failed: {:?}", err);
+        })? {
             // Check if the schedule count is exceeded.
             schedule_count += 1;
             if schedule_count >= self.config.scheduler.max_schedule_count {
@@ -1319,6 +1349,11 @@ impl PersistentCacheTask {
         Ok(finished_pieces)
     }
 
+    /// persist persists the persistent cache task.
+    pub fn persist(&self, task_id: &str) -> ClientResult<metadata::PersistentCacheTask> {
+        self.storage.persist_persistent_cache_task(task_id)
+    }
+
     /// stat stats the persistent cache task from the scheduler.
    #[instrument(skip_all)]
    pub async fn stat(
diff --git a/dragonfly-client/src/resource/piece_collector.rs b/dragonfly-client/src/resource/piece_collector.rs
index d3d56938..ccc13643 100644
--- a/dragonfly-client/src/resource/piece_collector.rs
+++ b/dragonfly-client/src/resource/piece_collector.rs
@@ -19,7 +19,6 @@ use dashmap::DashMap;
 use dragonfly_api::common::v2::Host;
 use dragonfly_api::dfdaemon::v2::{SyncPersistentCachePiecesRequest, SyncPiecesRequest};
 use dragonfly_client_config::dfdaemon::Config;
-use dragonfly_client_core::error::{ErrorType, OrErr};
 use dragonfly_client_core::{Error, Result};
 use dragonfly_client_storage::metadata;
 use std::sync::Arc;
@@ -197,9 +196,9 @@ impl PieceCollector {
             let out_stream = response.into_inner().timeout(collected_piece_timeout);
             tokio::pin!(out_stream);
 
-            while let Some(message) =
-                out_stream.try_next().await.or_err(ErrorType::StreamError)?
-            {
+            while let Some(message) = out_stream.try_next().await.inspect_err(|err| {
+                error!("sync pieces from parent {} failed: {}", parent.id, err);
+            })? {
                 let message = message?;
 
                 let mut parent_id = match collected_pieces.try_get_mut(&message.number).try_unwrap() {
@@ -432,9 +431,12 @@ impl PersistentCachePieceCollector {
             let out_stream = response.into_inner().timeout(collected_piece_timeout);
             tokio::pin!(out_stream);
 
-            while let Some(message) =
-                out_stream.try_next().await.or_err(ErrorType::StreamError)?
-            {
+            while let Some(message) = out_stream.try_next().await.inspect_err(|err| {
+                error!(
+                    "sync persistent cache pieces from parent {} failed: {}",
+                    parent.id, err
+                );
+            })? {
                 let message = message?;
 
                 let mut parent_id = match collected_pieces.try_get_mut(&message.number).try_unwrap() {
diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs
index 5d0061a0..3dcc6ef6 100644
--- a/dragonfly-client/src/resource/task.rs
+++ b/dragonfly-client/src/resource/task.rs
@@ -117,7 +117,7 @@ impl Task {
     }
 
     /// get gets the metadata of the task.
-    pub async fn get(&self, id: &str) -> ClientResult<Option<metadata::Task>> {
+    pub fn get(&self, id: &str) -> ClientResult<Option<metadata::Task>> {
         self.storage.get_task(id)
     }
 
@@ -206,6 +206,18 @@ impl Task {
             content_length,
         );
 
+        // If the task is not found, check if the storage has enough space to
+        // store the task.
+        if let Ok(None) = self.get(id) {
+            let has_enough_space = self.storage.has_enough_space(content_length)?;
+            if !has_enough_space {
+                return Err(Error::NoSpace(format!(
+                    "not enough space to store the task: content_length={}",
+                    content_length
+                )));
+            }
+        }
+
         self.storage.download_task_started(
             id,
             Some(piece_length),
@@ -534,7 +546,9 @@ impl Task {
             .timeout(self.config.scheduler.schedule_timeout);
         tokio::pin!(out_stream);
 
-        while let Some(message) = out_stream.try_next().await? {
+        while let Some(message) = out_stream.try_next().await.inspect_err(|err| {
+            error!("receive message from scheduler failed: {:?}", err);
+        })? {
             // Check if the schedule count is exceeded.
             schedule_count += 1;
             if schedule_count >= self.config.scheduler.max_schedule_count {
@@ -840,7 +854,7 @@ impl Task {
             partial_finished_pieces.clone(),
         );
 
-        if finished_pieces.len() == remaining_interested_pieces.len() {
+        if partial_finished_pieces.len() == remaining_interested_pieces.len() {
             // Send the download peer finished request.
             match in_stream_tx
                 .send_timeout(