From 3339d5ca2feac41a63f0472811cb38205e5fbaf2 Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 12 Feb 2025 17:23:49 +0800 Subject: [PATCH] feat: bump api from v2.1.23 to v2.1.25 and optimize gc for persistent cache task (#978) Signed-off-by: Gaius --- Cargo.lock | 20 +-- Cargo.toml | 18 +-- dragonfly-client-storage/src/content.rs | 138 ++++++++++++++++-- dragonfly-client-storage/src/lib.rs | 9 +- dragonfly-client-storage/src/metadata.rs | 3 +- dragonfly-client/src/bin/dfcache/import.rs | 9 ++ dragonfly-client/src/gc/mod.rs | 4 +- .../src/grpc/dfdaemon_download.rs | 5 + dragonfly-client/src/grpc/dfdaemon_upload.rs | 6 + .../src/resource/persistent_cache_task.rs | 7 + 10 files changed, 179 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5843db7a..0d08a8f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -923,9 +923,9 @@ dependencies = [ [[package]] name = "dragonfly-api" -version = "2.1.23" +version = "2.1.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee86ce7919760ff9e043ea089d398e367acf18451ae607aa614b3e65c4cafcd7" +checksum = "303e4462081b03a42316ec8569d9ce5fc6a220a4489d2029bf6120311a9dd2a8" dependencies = [ "prost 0.13.4", "prost-types", @@ -938,7 +938,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.2.10" +version = "0.2.11" dependencies = [ "anyhow", "blake3", @@ -1010,7 +1010,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.2.10" +version = "0.2.11" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1041,7 +1041,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.2.10" +version = "0.2.11" dependencies = [ "bytesize", "bytesize-serde", @@ -1067,7 +1067,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.2.10" +version = "0.2.11" dependencies = [ "headers 0.4.0", "hyper 1.5.2", @@ -1086,7 +1086,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.2.10" +version = "0.2.11" dependencies = [ "anyhow", "clap", @@ -1104,7 +1104,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.2.10" +version = "0.2.11" dependencies = [ "base16ct", "bincode", @@ -1130,7 +1130,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.2.10" +version = "0.2.11" dependencies = [ "base16ct", "base64 0.22.1", @@ -1532,7 +1532,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.2.10" +version = "0.2.11" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index 01193616..b97ed874 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.2.10" +version = "0.2.11" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,15 +22,15 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.2.10" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.10" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.10" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.10" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.10" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.10" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.10" } +dragonfly-client = { path = "dragonfly-client", version = "0.2.11" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.11" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.11" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.11" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.11" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.11" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.11" } thiserror = "1.0" -dragonfly-api = "=2.1.23" +dragonfly-api = "=2.1.25" reqwest = { version = "0.12.4", features = [ "stream", "native-tls", diff --git a/dragonfly-client-storage/src/content.rs b/dragonfly-client-storage/src/content.rs index 9dd28ded..84265b75 100644 --- a/dragonfly-client-storage/src/content.rs +++ b/dragonfly-client-storage/src/content.rs @@ -413,7 +413,10 @@ impl Content { info!("remove {:?} failed: {}", to, err); }); - if let Err(err) = self.hard_link_task(task.id.as_str(), to).await { + if let Err(err) = self + .hard_link_persistent_cache_task(task.id.as_str(), to) + .await + { warn!("hard link {:?} to {:?} failed: {}", task_path, to, err); // If the persistent cache task is empty, no need to copy. Need to open the file to @@ -427,7 +430,7 @@ impl Content { return Ok(()); } - self.copy_task(task.id.as_str(), to) + self.copy_persistent_cache_task(task.id.as_str(), to) .await .inspect_err(|err| { error!("copy {:?} to {:?} failed: {}", task_path, to, err); @@ -441,21 +444,98 @@ impl Content { Ok(()) } - /// copy_persistent_cache_task copies the persistent cache task content to the destination. + /// read_persistent_cache_piece reads the persistent cache piece from the content. #[instrument(skip_all)] - pub async fn write_persistent_cache_task( + pub async fn read_persistent_cache_piece( &self, task_id: &str, - from: &Path, - ) -> Result { - // Open the file to copy the content. - let from_f = File::open(from).await?; + offset: u64, + length: u64, + range: Option, + ) -> Result { + let task_path = self.get_persistent_cache_task_path(task_id); + + // Calculate the target offset and length based on the range. + let (target_offset, target_length) = calculate_piece_range(offset, length, range); + + let f = File::open(task_path.as_path()).await.inspect_err(|err| { + error!("open {:?} failed: {}", task_path, err); + })?; + let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f); + + f_reader + .seek(SeekFrom::Start(target_offset)) + .await + .inspect_err(|err| { + error!("seek {:?} failed: {}", task_path, err); + })?; + + Ok(f_reader.take(target_length)) + } + + /// read_persistent_cache_piece_with_dual_read return two readers, one is the range reader, and the other is the + /// full reader of the persistent cache piece. It is used for cache the piece content to the proxy cache. + #[instrument(skip_all)] + pub async fn read_persistent_cache_piece_with_dual_read( + &self, + task_id: &str, + offset: u64, + length: u64, + range: Option, + ) -> Result<(impl AsyncRead, impl AsyncRead)> { + let task_path = self.get_persistent_cache_task_path(task_id); + + // Calculate the target offset and length based on the range. + let (target_offset, target_length) = calculate_piece_range(offset, length, range); + + let f = File::open(task_path.as_path()).await.inspect_err(|err| { + error!("open {:?} failed: {}", task_path, err); + })?; + let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f); + + f_range_reader + .seek(SeekFrom::Start(target_offset)) + .await + .inspect_err(|err| { + error!("seek {:?} failed: {}", task_path, err); + })?; + let range_reader = f_range_reader.take(target_length); + + // Create full reader of the piece. + let f = File::open(task_path.as_path()).await.inspect_err(|err| { + error!("open {:?} failed: {}", task_path, err); + })?; + let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f); + + f_reader + .seek(SeekFrom::Start(offset)) + .await + .inspect_err(|err| { + error!("seek {:?} failed: {}", task_path, err); + })?; + let reader = f_reader.take(length); + + Ok((range_reader, reader)) + } + /// write_persistent_cache_piece_with_crc32_castagnoli writes the persistent cache piece to the content with crc32 castagnoli. + /// Calculate the hash of the piece by crc32 castagnoli with hardware acceleration. + #[instrument(skip_all)] + pub async fn write_persistent_cache_piece_with_crc32_castagnoli< + R: AsyncRead + Unpin + ?Sized, + >( + &self, + task_id: &str, + offset: u64, + reader: &mut R, + ) -> Result { + // Open the file and seek to the offset. let task_path = self .create_or_get_persistent_cache_task_path(task_id) .await?; - let to_f = OpenOptions::new() - .create_new(true) + let mut f = OpenOptions::new() + .create(true) + .truncate(false) .write(true) .open(task_path.as_path()) .await @@ -463,16 +543,20 @@ impl Content { error!("open {:?} failed: {}", task_path, err); })?; - // Copy the content to the file while updating the CRC32 value. - let mut reader = BufReader::with_capacity(self.config.storage.write_buffer_size, from_f); + f.seek(SeekFrom::Start(offset)).await.inspect_err(|err| { + error!("seek {:?} failed: {}", task_path, err); + })?; + + // Copy the piece to the file while updating the CRC32 value. + let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader); let crc = Crc::>::new(&CRC_32_ISCSI); let mut digest = crc.digest(); - let mut tee = InspectReader::new(&mut reader, |bytes| { + let mut tee = InspectReader::new(reader, |bytes| { digest.update(bytes); }); - let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, to_f); + let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f); let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| { error!("copy {:?} failed: {}", task_path, err); })?; @@ -481,12 +565,36 @@ impl Content { error!("flush {:?} failed: {}", task_path, err); })?; - Ok(WritePersistentCacheTaskResponse { + // Calculate the hash of the piece. + Ok(WritePieceResponse { length, hash: digest.finalize().to_string(), }) } + /// hard_link_persistent_cache_task hard links the persistent cache task content. + #[instrument(skip_all)] + async fn hard_link_persistent_cache_task(&self, task_id: &str, link: &Path) -> Result<()> { + fs::hard_link(self.get_persistent_cache_task_path(task_id), link).await?; + Ok(()) + } + + /// copy_persistent_cache_task copies the persistent cache task content to the destination. + #[instrument(skip_all)] + async fn copy_persistent_cache_task(&self, task_id: &str, to: &Path) -> Result<()> { + // Ensure the parent directory of the destination exists. + if let Some(parent) = to.parent() { + if !parent.exists() { + fs::create_dir_all(parent).await.inspect_err(|err| { + error!("failed to create directory {:?}: {}", parent, err); + })?; + } + } + + fs::copy(self.get_persistent_cache_task_path(task_id), to).await?; + Ok(()) + } + /// delete_task deletes the persistent cache task content. #[instrument(skip_all)] pub async fn delete_persistent_cache_task(&self, task_id: &str) -> Result<()> { diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 3736c30e..b0e935d6 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -14,6 +14,7 @@ * limitations under the License. */ +use chrono::NaiveDateTime; use dragonfly_api::common::v2::Range; use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::{Error, Result}; @@ -214,6 +215,7 @@ impl Storage { persistent: bool, piece_length: u64, content_length: u64, + created_at: NaiveDateTime, ) -> Result { self.metadata.download_persistent_cache_task_started( id, @@ -221,6 +223,7 @@ impl Storage { persistent, piece_length, content_length, + created_at, ) } @@ -312,7 +315,7 @@ impl Storage { ) -> Result { let response = self .content - .write_piece_with_crc32_castagnoli(task_id, offset, reader) + .write_persistent_cache_piece_with_crc32_castagnoli(task_id, offset, reader) .await?; let digest = Digest::new(Algorithm::Crc32, response.hash); @@ -562,7 +565,7 @@ impl Storage { ) -> Result { let response = self .content - .write_piece_with_crc32_castagnoli(task_id, offset, reader) + .write_persistent_cache_piece_with_crc32_castagnoli(task_id, offset, reader) .await?; let length = response.length; @@ -613,7 +616,7 @@ impl Storage { Ok(Some(piece)) => { match self .content - .read_piece(task_id, piece.offset, piece.length, range) + .read_persistent_cache_piece(task_id, piece.offset, piece.length, range) .await { Ok(reader) => { diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs index d9463bbb..f09744cb 100644 --- a/dragonfly-client-storage/src/metadata.rs +++ b/dragonfly-client-storage/src/metadata.rs @@ -583,6 +583,7 @@ impl Metadata { persistent: bool, piece_length: u64, content_length: u64, + created_at: NaiveDateTime, ) -> Result { let task = match self.db.get::(id.as_bytes())? { Some(mut task) => { @@ -601,7 +602,7 @@ impl Metadata { piece_length, content_length, updated_at: Utc::now().naive_utc(), - created_at: Utc::now().naive_utc(), + created_at, ..Default::default() }, }; diff --git a/dragonfly-client/src/bin/dfcache/import.rs b/dragonfly-client/src/bin/dfcache/import.rs index 75aa2dcd..1966f053 100644 --- a/dragonfly-client/src/bin/dfcache/import.rs +++ b/dragonfly-client/src/bin/dfcache/import.rs @@ -354,6 +354,15 @@ impl ImportCommand { /// validate_args validates the command line arguments. fn validate_args(&self) -> Result<()> { + if self.ttl < Duration::from_secs(5 * 60) + || self.ttl > Duration::from_secs(7 * 24 * 60 * 60) + { + return Err(Error::ValidationError(format!( + "ttl must be between 5 minutes and 7 days, but got {}", + self.ttl.as_secs() + ))); + } + if let Some(id) = self.id.as_ref() { if id.len() != 64 { return Err(Error::ValidationError(format!( diff --git a/dragonfly-client/src/gc/mod.rs b/dragonfly-client/src/gc/mod.rs index e9f41b84..93c08bc4 100644 --- a/dragonfly-client/src/gc/mod.rs +++ b/dragonfly-client/src/gc/mod.rs @@ -232,7 +232,7 @@ impl GC { /// evict_persistent_cache_task_by_ttl evicts the persistent cache task by ttl. #[instrument(skip_all)] async fn evict_persistent_cache_task_by_ttl(&self) -> Result<()> { - info!("start to evict by persistent cache task ttl * 2"); + info!("start to evict by persistent cache task ttl"); for task in self.storage.get_persistent_cache_tasks()? { // If the persistent cache task is expired and not uploading, evict the persistent cache task. if task.is_expired() { @@ -308,7 +308,7 @@ impl GC { } // Evict the task. - self.storage.delete_task(&task.id).await; + self.storage.delete_persistent_cache_task(&task.id).await; // Update the evicted space. let task_space = task.content_length(); diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index d3dd2b7b..50b634d7 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -802,6 +802,11 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { // Download persistent cache task succeeded. info!("download persistent cache task succeeded"); + if let Err(err) = + task_manager_clone.download_finished(task_clone.id.as_str()) + { + error!("download persistent cache task finished: {}", err); + } // Hard link or copy the persistent cache task content to the destination. if let Some(output_path) = request_clone.output_path { diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs index 394a5426..3f16eed8 100644 --- a/dragonfly-client/src/grpc/dfdaemon_upload.rs +++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs @@ -882,6 +882,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { cost: None, created_at: None, }), + digest: None, })) } @@ -1024,6 +1025,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { // Download persistent cache task succeeded. info!("download persistent cache task succeeded"); + if let Err(err) = + task_manager_clone.download_finished(task_clone.id.as_str()) + { + error!("download persistent cache task finished: {}", err); + } // Hard link or copy the persistent cache task content to the destination. if let Some(output_path) = request_clone.output_path { diff --git a/dragonfly-client/src/resource/persistent_cache_task.rs b/dragonfly-client/src/resource/persistent_cache_task.rs index 5cc035d0..ce84fc2b 100644 --- a/dragonfly-client/src/resource/persistent_cache_task.rs +++ b/dragonfly-client/src/resource/persistent_cache_task.rs @@ -15,6 +15,7 @@ */ use crate::grpc::{scheduler::SchedulerClient, REQUEST_TIMEOUT}; +use chrono::DateTime; use dragonfly_api::common::v2::{ PersistentCachePeer, PersistentCacheTask as CommonPersistentCacheTask, Piece, TrafficType, }; @@ -431,6 +432,11 @@ impl PersistentCacheTask { let ttl = Duration::try_from(response.ttl.ok_or(Error::InvalidParameter)?) .or_err(ErrorType::ParseError)?; + // Convert prost_wkt_types::Timestamp to chrono::DateTime. + let created_at = response.created_at.ok_or(Error::InvalidParameter)?; + let created_at = DateTime::from_timestamp(created_at.seconds, created_at.nanos as u32) + .ok_or(Error::InvalidParameter)?; + // If the persistent cache task is not found, check if the storage has enough space to // store the persistent cache task. if let Ok(None) = self.get(task_id) { @@ -449,6 +455,7 @@ impl PersistentCacheTask { request.persistent, response.piece_length, response.content_length, + created_at.naive_utc(), ) }