Skip to content

Commit

Permalink
feat: add leave task and remove task finished only download from local (
Browse files Browse the repository at this point in the history
#480)

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored May 16, 2024
1 parent 5c8e9f5 commit 05d0f93
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 132 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.64"
version = "0.1.65"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,15 +22,15 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.64" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.64" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.64" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.64" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.64" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.64" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.64" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.65" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.65" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.65" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.65" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.65" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.65" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.65" }
thiserror = "1.0"
dragonfly-api = "2.0.112"
dragonfly-api = "2.0.113"
reqwest = { version = "0.11.27", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
rcgen = { version = "0.12.1", features = ["x509-parser"] }
hyper = { version = "1.2", features = ["full"] }
Expand Down
3 changes: 1 addition & 2 deletions dragonfly-client-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,11 @@ impl Storage {
pub fn download_task_started(
&self,
id: &str,
peer_id: &str,
piece_length: u64,
response_header: Option<HeaderMap>,
) -> Result<metadata::Task> {
self.metadata
.download_task_started(id, peer_id, piece_length, response_header)
.download_task_started(id, piece_length, response_header)
}

// download_task_finished updates the metadata of the task when the task downloads finished.
Expand Down
18 changes: 3 additions & 15 deletions dragonfly-client-storage/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use dragonfly_client_core::{Error, Result};
use dragonfly_client_util::http::reqwest_headermap_to_hashmap;
use reqwest::header::{self, HeaderMap};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::path::Path;
use std::time::Duration;
use tracing::error;
Expand All @@ -32,8 +32,6 @@ pub struct Task {
// id is the task id.
pub id: String,

pub peer_ids: HashSet<String>,

// piece_length is the length of the piece.
pub piece_length: u64,

Expand Down Expand Up @@ -226,7 +224,6 @@ impl<E: StorageEngineOwned> Metadata<E> {
pub fn download_task_started(
&self,
id: &str,
peer_id: &str,
piece_length: u64,
response_header: Option<HeaderMap>,
) -> Result<Task> {
Expand All @@ -241,7 +238,6 @@ impl<E: StorageEngineOwned> Metadata<E> {
// If the task exists, update the task metadata.
task.updated_at = Utc::now().naive_utc();
task.failed_at = None;
task.peer_ids.insert(peer_id.to_string());

// If the task has the response header, the response header
// will not be covered.
Expand All @@ -253,7 +249,6 @@ impl<E: StorageEngineOwned> Metadata<E> {
}
None => Task {
id: id.to_string(),
peer_ids: vec![peer_id.to_string()].into_iter().collect(),
piece_length,
response_header,
updated_at: Utc::now().naive_utc(),
Expand Down Expand Up @@ -556,18 +551,14 @@ mod tests {
let metadata = Metadata::new(dir.path()).unwrap();

let task_id = "task1";
let peer_id = "peer1";

// Test download_task_started.
metadata
.download_task_started(task_id, peer_id, 1024, None)
.unwrap();
metadata.download_task_started(task_id, 1024, None).unwrap();
let mut task = metadata
.get_task(task_id)
.unwrap()
.expect("task should exist after download_task_started");
assert_eq!(task.id, task_id);
assert_eq!(task.peer_ids.take(peer_id), Some(peer_id.to_string()));
assert_eq!(task.piece_length, 1024);
assert!(task.response_header.is_empty());
assert_eq!(task.uploading_count, 0);
Expand Down Expand Up @@ -616,11 +607,8 @@ mod tests {

// Test get_tasks.
let task_id = "task2";
let peer_id = "peer2";

metadata
.download_task_started(task_id, peer_id, 1024, None)
.unwrap();
metadata.download_task_started(task_id, 1024, None).unwrap();
let tasks = metadata.get_tasks().unwrap();
assert_eq!(tasks.len(), 2, "should get 2 tasks in total");

Expand Down
29 changes: 13 additions & 16 deletions dragonfly-client/src/gc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use crate::grpc::scheduler::SchedulerClient;
use crate::shutdown;
use dragonfly_api::scheduler::v2::LeavePeerRequest;
use dragonfly_api::scheduler::v2::LeaveTaskRequest;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::Result;
use dragonfly_client_storage::{metadata, Storage};
Expand Down Expand Up @@ -195,20 +195,17 @@ impl GC {

// leave_task_from_scheduler leaves the task from the scheduler.
async fn leave_task_from_scheduler(&self, task: metadata::Task) {
for peer_id in task.peer_ids {
self.scheduler_client
.leave_peer(
task.id.as_str(),
LeavePeerRequest {
host_id: self.host_id.clone(),
task_id: task.id.clone(),
peer_id: peer_id.clone(),
},
)
.await
.unwrap_or_else(|err| {
error!("failed to leave peer {}: {}", peer_id, err);
});
}
self.scheduler_client
.leave_task(
task.id.as_str(),
LeaveTaskRequest {
host_id: self.host_id.clone(),
task_id: task.id.clone(),
},
)
.await
.unwrap_or_else(|err| {
error!("failed to leave peer {}: {}", task.id, err);
});
}
}
2 changes: 1 addition & 1 deletion dragonfly-client/src/grpc/dfdaemon_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
info!("download task started: {:?}", download);
let task = match self
.task
.download_started(task_id.as_str(), peer_id.as_str(), download.clone())
.download_started(task_id.as_str(), download.clone())
.await
{
Err(ClientError::HTTP(err)) => {
Expand Down
2 changes: 1 addition & 1 deletion dragonfly-client/src/grpc/dfdaemon_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
info!("download task started: {:?}", download);
let task = match self
.task
.download_started(task_id.as_str(), peer_id.as_str(), download.clone())
.download_started(task_id.as_str(), download.clone())
.await
{
Err(ClientError::HTTP(err)) => {
Expand Down
13 changes: 12 additions & 1 deletion dragonfly-client/src/grpc/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use dragonfly_api::manager::v2::Scheduler;
use dragonfly_api::scheduler::v2::{
scheduler_client::SchedulerClient as SchedulerGRPCClient, AnnounceHostRequest,
AnnouncePeerRequest, AnnouncePeerResponse, ExchangePeerRequest, ExchangePeerResponse,
LeaveHostRequest, LeavePeerRequest, StatPeerRequest, StatTaskRequest,
LeaveHostRequest, LeavePeerRequest, LeaveTaskRequest, StatPeerRequest, StatTaskRequest,
};
use dragonfly_client_core::error::{ErrorType, ExternalError, OrErr};
use dragonfly_client_core::{Error, Result};
Expand Down Expand Up @@ -146,6 +146,17 @@ impl SchedulerClient {
Ok(response.into_inner())
}

// leave_task tells the scheduler that the task is leaving.
#[instrument(skip(self))]
pub async fn leave_task(&self, task_id: &str, request: LeaveTaskRequest) -> Result<()> {
let request = Self::make_request(request);
self.client(task_id.to_string())
.await?
.leave_task(request)
.await?;
Ok(())
}

// init_announce_host announces the host to the scheduler.
#[instrument(skip(self))]
pub async fn init_announce_host(&self, request: AnnounceHostRequest) -> Result<()> {
Expand Down
Loading

0 comments on commit 05d0f93

Please sign in to comment.