Skip to content

Commit

Permalink
feat: support download persistent cache task from parents (#950)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jan 21, 2025
1 parent 1093318 commit d4096db
Show file tree
Hide file tree
Showing 12 changed files with 1,249 additions and 217 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.5
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.5" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.5" }
thiserror = "1.0"
dragonfly-api = "=2.1.9"
dragonfly-api = "=2.1.12"
reqwest = { version = "0.12.4", features = [
"stream",
"native-tls",
Expand Down
177 changes: 177 additions & 0 deletions dragonfly-client-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,144 @@ impl Storage {
self.metadata.piece_id(task_id, number)
}

/// download_persistent_cache_piece_started updates the metadata of the persistent cache piece and writes
/// the data of piece to file when the persistent cache piece downloads started.
#[instrument(skip_all)]
pub async fn download_persistent_cache_piece_started(
&self,
piece_id: &str,
number: u32,
) -> Result<metadata::Piece> {
// Wait for the piece to be finished.
match self
.wait_for_persistent_cache_piece_finished(piece_id)
.await
{
Ok(piece) => Ok(piece),
// If piece is not found or wait timeout, create piece metadata.
Err(_) => self.metadata.download_piece_started(piece_id, number),
}
}

/// download_persistent_cache_piece_from_parent_finished is used for downloading persistent cache piece from parent.
#[instrument(skip_all)]
pub async fn download_persistent_cache_piece_from_parent_finished<
R: AsyncRead + Unpin + ?Sized,
>(
&self,
piece_id: &str,
task_id: &str,
offset: u64,
expected_digest: &str,
parent_id: &str,
reader: &mut R,
) -> Result<metadata::Piece> {
let response = self
.content
.write_piece_with_crc32_castagnoli(task_id, offset, reader)
.await?;

let length = response.length;
let digest = Digest::new(Algorithm::Crc32, response.hash);

// Check the digest of the piece.
if expected_digest != digest.to_string() {
return Err(Error::DigestMismatch(
expected_digest.to_string(),
digest.to_string(),
));
}

self.metadata.download_piece_finished(
piece_id,
offset,
length,
digest.to_string().as_str(),
Some(parent_id.to_string()),
)
}

/// download_persistent_cache_piece_failed updates the metadata of the persistent cache piece when the persistent cache piece downloads failed.
#[instrument(skip_all)]
pub fn download_persistent_cache_piece_failed(&self, piece_id: &str) -> Result<()> {
self.metadata.download_piece_failed(piece_id)
}

/// upload_persistent_cache_piece updates the metadata of the piece and_then
/// returns the data of the piece.
#[instrument(skip_all)]
pub async fn upload_persistent_cache_piece(
&self,
piece_id: &str,
task_id: &str,
range: Option<Range>,
) -> Result<impl AsyncRead> {
// Wait for the persistent cache piece to be finished.
self.wait_for_persistent_cache_piece_finished(piece_id)
.await?;

// Start uploading the persistent cache task.
self.metadata
.upload_persistent_cache_task_started(task_id)?;

// Get the persistent cache piece metadata and return the content of the persistent cache piece.
match self.metadata.get_piece(piece_id) {
Ok(Some(piece)) => {
match self
.content
.read_piece(task_id, piece.offset, piece.length, range)
.await
{
Ok(reader) => {
// Finish uploading the persistent cache task.
self.metadata
.upload_persistent_cache_task_finished(task_id)?;
Ok(reader)
}
Err(err) => {
// Failed uploading the persistent cache task.
self.metadata.upload_persistent_cache_task_failed(task_id)?;
Err(err)
}
}
}
Ok(None) => {
// Failed uploading the persistent cache task.
self.metadata.upload_persistent_cache_task_failed(task_id)?;
Err(Error::PieceNotFound(piece_id.to_string()))
}
Err(err) => {
// Failed uploading the persistent cache task.
self.metadata.upload_persistent_cache_task_failed(task_id)?;
Err(err)
}
}
}

/// get_persistent_cache_piece returns the persistent cache piece metadata.
#[instrument(skip_all)]
pub fn get_persistent_cache_piece(&self, piece_id: &str) -> Result<Option<metadata::Piece>> {
self.metadata.get_piece(piece_id)
}

/// is_persistent_cache_piece_exists returns whether the persistent cache piece exists.
#[instrument(skip_all)]
pub fn is_persistent_cache_piece_exists(&self, piece_id: &str) -> Result<bool> {
self.metadata.is_piece_exists(piece_id)
}

/// get_persistent_cache_pieces returns the persistent cache piece metadatas.
pub fn get_persistent_cache_pieces(&self, task_id: &str) -> Result<Vec<metadata::Piece>> {
self.metadata.get_pieces(task_id)
}

/// persistent_cache_piece_id returns the persistent cache piece id.
#[inline]
#[instrument(skip_all)]
pub fn persistent_cache_piece_id(&self, task_id: &str, number: u32) -> String {
self.metadata.piece_id(task_id, number)
}

/// wait_for_piece_finished waits for the piece to be finished.
#[instrument(skip_all)]
async fn wait_for_piece_finished(&self, piece_id: &str) -> Result<metadata::Piece> {
Expand Down Expand Up @@ -552,4 +690,43 @@ impl Storage {
}
}
}

/// wait_for_persistent_cache_piece_finished waits for the persistent cache piece to be finished.
#[instrument(skip_all)]
async fn wait_for_persistent_cache_piece_finished(
&self,
piece_id: &str,
) -> Result<metadata::Piece> {
// Initialize the timeout of piece.
let piece_timeout = tokio::time::sleep(self.config.download.piece_timeout);
tokio::pin!(piece_timeout);

// Initialize the interval of piece.
let mut wait_for_piece_count = 0;
let mut interval = tokio::time::interval(DEFAULT_WAIT_FOR_PIECE_FINISHED_INTERVAL);
loop {
tokio::select! {
_ = interval.tick() => {
let piece = self
.get_persistent_cache_piece(piece_id)?
.ok_or_else(|| Error::PieceNotFound(piece_id.to_string()))?;

// If the piece is finished, return.
if piece.is_finished() {
debug!("wait piece finished success");
return Ok(piece);
}

if wait_for_piece_count > 0 {
debug!("wait piece finished");
}
wait_for_piece_count += 1;
}
_ = &mut piece_timeout => {
self.metadata.wait_for_piece_finished_failed(piece_id).unwrap_or_else(|err| error!("delete piece metadata failed: {}", err));
return Err(Error::WaitForPieceFinishedTimeout(piece_id.to_string()));
}
}
}
}
}
17 changes: 12 additions & 5 deletions dragonfly-client-util/src/id_generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,15 @@ impl IDGenerator {
hasher.write(application.as_bytes());
}

// Generate the persistent cache task id.
Ok(hasher.finish().to_string())
// Generate the task id by wyhash.
let id = hasher.finish().to_string();

// Generate the persistent cache task ID. The original ID is too short, so we calculate the SHA-256
// hash to ensure it can be prefix-searched by the storage engine.
let mut hasher = Sha256::new();
hasher.update(id);

Ok(hex::encode(hasher.finalize()))
}

/// peer_id generates the peer id.
Expand Down Expand Up @@ -263,21 +270,21 @@ mod tests {
"This is a test file",
Some("tag1"),
Some("app1"),
"15288906590529426257",
"ed401a8aa6b9a47b426d2aa01245127d9ac2d1b7974ca866719da59b5456ac4d",
),
(
IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false),
"This is a test file",
None,
Some("app1"),
"16142504978820333826",
"4cbb2c5142f609e98a7d9a887c6404c7432475a52d6c64c52d543b5614a99c63",
),
(
IDGenerator::new("127.0.0.1".to_string(), "localhost".to_string(), false),
"This is a test file",
Some("tag1"),
None,
"3790865882910844849",
"65094f31f9997904f779a27ed0d1ce460c9c4082f214e7626a179f2ea491d34e",
),
];

Expand Down
Loading

0 comments on commit d4096db

Please sign in to comment.