Skip to content

Commit

Permalink
feat: wait for piece finished before update piece metadata (#535)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jun 13, 2024
1 parent 3fa0e87 commit 832af14
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 31 deletions.
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.79"
version = "0.1.80"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.79" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.79" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.79" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.79" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.79" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.79" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.79" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.80" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.80" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.80" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.80" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.80" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.80" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.80" }
thiserror = "1.0"
dragonfly-api = "2.0.115"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
Expand Down
17 changes: 5 additions & 12 deletions dragonfly-client-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncRead;
use tracing::info;
use tracing::{error, info};

pub mod content;
pub mod metadata;
Expand Down Expand Up @@ -210,6 +210,9 @@ impl Storage {
number: u32,
range: Option<Range>,
) -> Result<impl AsyncRead> {
// Wait for the piece to be finished.
self.wait_for_piece_finished(task_id, number).await?;

// Start uploading the task.
self.metadata.upload_task_started(task_id)?;

Expand All @@ -220,16 +223,6 @@ impl Storage {
return Err(err);
}

// Wait for the piece to be finished.
if let Err(err) = self.wait_for_piece_finished(task_id, number).await {
// Failed uploading the task.
self.metadata.upload_task_failed(task_id)?;

// Failed uploading the piece.
self.metadata.upload_piece_failed(task_id, number)?;
return Err(err);
}

// Get the piece metadata and return the content of the piece.
match self.metadata.get_piece(task_id, number)? {
Some(piece) => {
Expand Down Expand Up @@ -294,7 +287,6 @@ impl Storage {
loop {
tokio::select! {
_ = interval.tick() => {

let piece = self
.get_piece(task_id, number)?
.ok_or_else(|| Error::PieceNotFound(self.piece_id(task_id, number)))?;
Expand All @@ -311,6 +303,7 @@ impl Storage {
wait_for_piece_count += 1;
}
_ = &mut piece_timeout => {
self.metadata.wait_for_piece_finished_failed(task_id, number).unwrap_or_else(|err| error!("delete piece metadata failed: {}", err));
return Err(Error::WaitForPieceFinishedTimeout(self.piece_id(task_id, number)));
}
}
Expand Down
18 changes: 15 additions & 3 deletions dragonfly-client-storage/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tracing::error;
use tracing::{error, info};

use crate::storage_engine::{rocksdb::RocksdbStorageEngine, DatabaseObject, StorageEngineOwned};

Expand Down Expand Up @@ -391,6 +391,7 @@ impl<E: StorageEngineOwned> Metadata<E> {

/// delete_task deletes the task metadata.
pub fn delete_task(&self, task_id: &str) -> Result<()> {
info!("delete task metadata {}", task_id);
self.db.delete::<Task>(task_id.as_bytes())
}

Expand Down Expand Up @@ -441,8 +442,12 @@ impl<E: StorageEngineOwned> Metadata<E> {

/// download_piece_failed updates the metadata of the piece when the piece downloads failed.
pub fn download_piece_failed(&self, task_id: &str, number: u32) -> Result<()> {
self.db
.delete::<Piece>(self.piece_id(task_id, number).as_bytes())
self.delete_piece(task_id, number)
}

// wait_for_piece_finished_failed waits for the piece to be finished or failed.
pub fn wait_for_piece_finished_failed(&self, task_id: &str, number: u32) -> Result<()> {
self.delete_piece(task_id, number)
}

/// upload_piece_started updates the metadata of the piece when piece uploads started.
Expand Down Expand Up @@ -508,6 +513,13 @@ impl<E: StorageEngineOwned> Metadata<E> {
iter.map(|ele| ele.map(|(_, piece)| piece)).collect()
}

/// delete_piece deletes the piece metadata.
pub fn delete_piece(&self, task_id: &str, number: u32) -> Result<()> {
info!("delete piece metadata {}", self.piece_id(task_id, number));
self.db
.delete::<Piece>(self.piece_id(task_id, number).as_bytes())
}

/// delete_pieces deletes the piece metadatas.
pub fn delete_pieces(&self, task_id: &str) -> Result<()> {
let iter = self.db.prefix_iter::<Piece>(task_id.as_bytes())?;
Expand Down

0 comments on commit 832af14

Please sign in to comment.