Skip to content

Commit

Permalink
feat: support persist cache task when scheduler replicates task (#953)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jan 26, 2025
1 parent e787afe commit 58da38d
Show file tree
Hide file tree
Showing 21 changed files with 296 additions and 441 deletions.
21 changes: 11 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 10 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.2.5"
version = "0.2.6"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,15 +22,15 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.2.5" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.5" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.5" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.5" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.5" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.5" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.5" }
dragonfly-client = { path = "dragonfly-client", version = "0.2.6" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.6" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.6" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.6" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.6" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.6" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.6" }
thiserror = "1.0"
dragonfly-api = "=2.1.16"
dragonfly-api = "=2.1.23"
reqwest = { version = "0.12.4", features = [
"stream",
"native-tls",
Expand Down Expand Up @@ -102,6 +102,7 @@ tempfile = "3.14.0"
tokio-rustls = "0.25.0-alpha.4"
serde_json = "1.0.137"
lru = "0.12.5"
fs2 = "0.4.3"

[profile.release]
opt-level = "z"
Expand Down
2 changes: 1 addition & 1 deletion dragonfly-client-config/src/dfdaemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ fn default_scheduler_announce_interval() -> Duration {
/// default_scheduler_schedule_timeout is the default timeout for scheduling.
#[inline]
fn default_scheduler_schedule_timeout() -> Duration {
Duration::from_secs(10)
Duration::from_secs(180)
}

/// default_dynconfig_refresh_interval is the default interval to refresh dynamic configuration from manager.
Expand Down
4 changes: 4 additions & 0 deletions dragonfly-client-core/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ pub enum DFError {
#[error{"hashring {0} is failed"}]
HashRing(String),

/// NoSpace is the error when there is no space left on device.
#[error("no space left on device: {0}")]
NoSpace(String),

/// HostNotFound is the error when the host is not found.
#[error{"host {0} not found"}]
HostNotFound(String),
Expand Down
1 change: 1 addition & 0 deletions dragonfly-client-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ tokio-util.workspace = true
sha2.workspace = true
crc.workspace = true
base16ct.workspace = true
fs2.workspace = true
num_cpus = "1.0"
bincode = "1.3.3"
rayon = "1.10.0"
Expand Down
27 changes: 27 additions & 0 deletions dragonfly-client-storage/src/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,33 @@ impl Content {
Ok(Content { config, dir })
}

/// available_space reports how much space is currently available on the
/// filesystem that holds `self.dir`, as reported by `fs2::statvfs`.
///
/// An error from the underlying `statvfs` call is propagated to the caller.
pub fn available_space(&self) -> Result<u64> {
    let available = fs2::statvfs(&self.dir)?.available_space();
    Ok(available)
}

/// total_space reports the total size of the filesystem that holds
/// `self.dir`, as reported by `fs2::statvfs`.
///
/// An error from the underlying `statvfs` call is propagated to the caller.
pub fn total_space(&self) -> Result<u64> {
    let total = fs2::statvfs(&self.dir)?.total_space();
    Ok(total)
}

/// has_enough_space tells whether the disk can hold `content_length` more bytes.
///
/// Returns `Ok(true)` when the available space is at least `content_length`.
/// Otherwise a warning is logged and `Ok(false)` is returned. Failing to
/// query the available space is reported as an error.
pub fn has_enough_space(&self, content_length: u64) -> Result<bool> {
    let available_space = self.available_space()?;

    // Happy path first: the content fits into the remaining space.
    if available_space >= content_length {
        return Ok(true);
    }

    warn!(
        "not enough space to store the persistent cache task: available_space={}, content_length={}",
        available_space, content_length
    );

    Ok(false)
}

/// hard_link_or_copy_task hard links or copies the task content to the destination.
#[instrument(skip_all)]
pub async fn hard_link_or_copy_task(
Expand Down
23 changes: 22 additions & 1 deletion dragonfly-client-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncRead;
use tracing::{debug, error, instrument};
use tracing::{debug, error, instrument, warn};

pub mod content;
pub mod metadata;
Expand Down Expand Up @@ -59,6 +59,21 @@ impl Storage {
})
}

/// total_space reports the total disk space, as measured by the content store.
///
/// Any error raised by the content store is returned unchanged.
pub fn total_space(&self) -> Result<u64> {
    let content = &self.content;
    content.total_space()
}

/// available_space reports the disk space that is still available, as
/// measured by the content store.
///
/// Any error raised by the content store is returned unchanged.
pub fn available_space(&self) -> Result<u64> {
    let content = &self.content;
    content.available_space()
}

/// has_enough_space asks the content store whether `content_length` more
/// bytes fit into the space that is still available.
///
/// Any error raised by the content store is returned unchanged.
pub fn has_enough_space(&self, content_length: u64) -> Result<bool> {
    let content = &self.content;
    content.has_enough_space(content_length)
}

/// hard_link_or_copy_task hard links or copies the task content to the destination.
#[instrument(skip_all)]
pub async fn hard_link_or_copy_task(
Expand Down Expand Up @@ -245,6 +260,12 @@ impl Storage {
self.metadata.get_persistent_cache_task(id)
}

/// persist_persistent_cache_task asks the metadata store to persist the
/// persistent cache task identified by `id` and returns the resulting
/// task metadata.
///
/// Any error raised by the metadata store is returned unchanged.
#[instrument(skip_all)]
pub fn persist_persistent_cache_task(&self, id: &str) -> Result<metadata::PersistentCacheTask> {
    let metadata = &self.metadata;
    metadata.persist_persistent_cache_task(id)
}

/// is_persistent_cache_task_exists returns whether the persistent cache task exists.
#[instrument(skip_all)]
pub fn is_persistent_cache_task_exists(&self, id: &str) -> Result<bool> {
Expand Down
16 changes: 16 additions & 0 deletions dragonfly-client-storage/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,22 @@ impl<E: StorageEngineOwned> Metadata<E> {
Ok(task)
}

/// persist_persistent_cache_task persists the persistent cache task metadata.
#[instrument(skip_all)]
pub fn persist_persistent_cache_task(&self, id: &str) -> Result<PersistentCacheTask> {
let task = match self.db.get::<PersistentCacheTask>(id.as_bytes())? {
Some(mut task) => {
task.persistent = true;
task.updated_at = Utc::now().naive_utc();
task
}
None => return Err(Error::TaskNotFound(id.to_string())),
};

self.db.put(id.as_bytes(), &task)?;
Ok(task)
}

/// get_persistent_cache_task gets the persistent cache task metadata.
#[instrument(skip_all)]
pub fn get_persistent_cache_task(&self, id: &str) -> Result<Option<PersistentCacheTask>> {
Expand Down
25 changes: 1 addition & 24 deletions dragonfly-client-storage/src/storage_engine/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ impl RocksdbStorageEngine {

// Destroy the db unless the storage is kept.
if keep {
drop_unused_cfs(&dir);
} else {
if !keep {
rocksdb::DB::destroy(&options, &dir).unwrap_or_else(|err| {
warn!("destroy {:?} failed: {}", dir, err);
});
Expand Down Expand Up @@ -259,24 +257,3 @@ where
db.cf_handle(cf_name)
.ok_or_else(|| Error::ColumnFamilyNotFound(cf_name.to_string()))
}

/// drop_unused_cfs opens the database in `dir` with the legacy set of column
/// families and drops the ones that are no longer used.
///
/// This is best effort: a failure to open the database or to drop a column
/// family is only logged as a warning and nothing is returned to the caller.
fn drop_unused_cfs(dir: &Path) {
    let legacy_cfs = vec!["task", "piece", "cache_task"];
    let stale_cfs = vec!["cache_task"];

    // Without an open handle there is nothing that can be dropped.
    let mut db = match rocksdb::DB::open_cf(&rocksdb::Options::default(), dir, legacy_cfs) {
        Err(err) => {
            warn!("open cf failed: {}", err);
            return;
        }
        Ok(handle) => handle,
    };

    for cf_name in stale_cfs {
        if let Err(err) = db.drop_cf(cf_name) {
            warn!("drop cf [{}] failed: {}", cf_name, err);
        } else {
            info!("drop cf [{}] success", cf_name);
        }
    }
}
2 changes: 1 addition & 1 deletion dragonfly-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ percent-encoding.workspace = true
tokio-rustls.workspace = true
serde_json.workspace = true
lru.workspace = true
fs2.workspace = true
lazy_static = "1.5"
tracing-log = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "chrono"] }
Expand All @@ -81,7 +82,6 @@ sysinfo = "0.32.1"
tower = "0.4.13"
indicatif = "0.17.9"
dashmap = "6.1.0"
fs2 = "0.4.3"
hashring = "0.3.6"
fslock = "0.2.1"
leaky-bucket = "1.1.2"
Expand Down
17 changes: 17 additions & 0 deletions dragonfly-client/src/bin/dfcache/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ pub struct ImportCommand {
#[arg(help = "Specify the path of the file to import")]
path: PathBuf,

#[arg(
long = "id",
required = false,
help = "Specify the id of the persistent cache task, its length must be 64 bytes. If id is none, dfdaemon will generate the new task id based on the file content, tag and application by wyhash algorithm."
)]
id: Option<String>,

#[arg(
long = "persistent-replica-count",
default_value_t = default_dfcache_persistent_replica_count(),
Expand Down Expand Up @@ -321,6 +328,7 @@ impl ImportCommand {

let persistent_cache_task = dfdaemon_download_client
.upload_persistent_cache_task(UploadPersistentCacheTaskRequest {
task_id: self.id.clone(),
path: self.path.clone().into_os_string().into_string().unwrap(),
persistent_replica_count: self.persistent_replica_count,
tag: self.tag.clone(),
Expand All @@ -341,6 +349,15 @@ impl ImportCommand {

/// validate_args validates the command line arguments.
fn validate_args(&self) -> Result<()> {
if let Some(id) = self.id.as_ref() {
if id.len() != 64 {
return Err(Error::ValidationError(format!(
"id length must be 64 bytes, but got {}",
id.len()
)));
}
}

if self.path.is_dir() {
return Err(Error::ValidationError(format!(
"path {} is a directory",
Expand Down
11 changes: 0 additions & 11 deletions dragonfly-client/src/bin/dfcache/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use tracing::Level;

pub mod export;
pub mod import;
pub mod remove;
pub mod stat;

#[derive(Debug, Parser)]
Expand Down Expand Up @@ -83,15 +82,6 @@ pub enum Command {
long_about = "Stat a file in Dragonfly P2P network by task ID. If stat successfully, it will return the file information."
)]
Stat(stat::StatCommand),

#[command(
name = "rm",
author,
version,
about = "Remove a file from Dragonfly P2P network",
long_about = "Remove the P2P cache in Dragonfly P2P network by task ID."
)]
Remove(remove::RemoveCommand),
}

/// Implement the execute for Command.
Expand All @@ -102,7 +92,6 @@ impl Command {
Self::Import(cmd) => cmd.execute().await,
Self::Export(cmd) => cmd.execute().await,
Self::Stat(cmd) => cmd.execute().await,
Self::Remove(cmd) => cmd.execute().await,
}
}
}
Expand Down
Loading

0 comments on commit 58da38d

Please sign in to comment.