Skip to content

Commit

Permalink
feat: rename remote peer to parent (#901)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Dec 17, 2024
1 parent d6b613e commit c67f13e
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 61 deletions.
6 changes: 3 additions & 3 deletions dragonfly-client-core/src/error/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ pub struct BackendError {
pub header: Option<reqwest::header::HeaderMap>,
}

/// DownloadFromRemotePeerFailed is the error when the download from remote peer is failed.
/// DownloadFromParentFailed is the error when the download from parent is failed.
#[derive(Debug, thiserror::Error)]
#[error("download piece {piece_number} from remote peer {parent_id} failed")]
pub struct DownloadFromRemotePeerFailed {
#[error("download piece {piece_number} from parent {parent_id} failed")]
pub struct DownloadFromParentFailed {
/// piece_number is the number of the piece.
pub piece_number: u32,

Expand Down
6 changes: 3 additions & 3 deletions dragonfly-client-core/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub use errors::ErrorType;
pub use errors::ExternalError;

pub use errors::OrErr;
pub use errors::{BackendError, DownloadFromRemotePeerFailed};
pub use errors::{BackendError, DownloadFromParentFailed};

/// DFError is the error for dragonfly.
#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -70,9 +70,9 @@ pub enum DFError {
#[error{"available schedulers not found"}]
AvailableSchedulersNotFound,

/// DownloadFromRemotePeerFailed is the error when the download from remote peer is failed.
/// DownloadFromParentFailed is the error when the download from parent is failed.
#[error(transparent)]
DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed),
DownloadFromParentFailed(DownloadFromParentFailed),

/// ColumnFamilyNotFound is the error when the column family is not found.
#[error{"column family {0} not found"}]
Expand Down
4 changes: 2 additions & 2 deletions dragonfly-client-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,9 @@ impl Storage {
)
}

/// download_piece_from_remote_peer_finished is used for downloading piece from remote peer.
/// download_piece_from_parent_finished is used for downloading piece from parent.
#[instrument(skip_all)]
pub async fn download_piece_from_remote_peer_finished<R: AsyncRead + Unpin + ?Sized>(
pub async fn download_piece_from_parent_finished<R: AsyncRead + Unpin + ?Sized>(
&self,
piece_id: &str,
task_id: &str,
Expand Down
8 changes: 4 additions & 4 deletions dragonfly-client/src/grpc/dfdaemon_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
/// SyncPiecesStream is the stream of the sync pieces response.
type SyncPiecesStream = ReceiverStream<Result<SyncPiecesResponse, Status>>;

/// sync_pieces provides the piece metadata for remote peer.
/// sync_pieces provides the piece metadata for parent.
#[instrument(skip_all, fields(host_id, remote_host_id, task_id))]
async fn sync_pieces(
&self,
Expand Down Expand Up @@ -760,7 +760,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
Ok(Response::new(ReceiverStream::new(out_stream_rx)))
}

/// download_piece provides the piece content for remote peer.
/// download_piece provides the piece content for parent.
#[instrument(skip_all, fields(host_id, remote_host_id, task_id, piece_id))]
async fn download_piece(
&self,
Expand Down Expand Up @@ -1183,7 +1183,7 @@ impl DfdaemonUploadClient {
Ok(response)
}

/// sync_pieces provides the piece metadata for remote peer.
/// sync_pieces provides the piece metadata for parent.
#[instrument(skip_all)]
pub async fn sync_pieces(
&self,
Expand All @@ -1194,7 +1194,7 @@ impl DfdaemonUploadClient {
Ok(response)
}

/// download_piece provides the piece content for remote peer.
/// download_piece provides the piece content for parent.
#[instrument(skip_all)]
pub async fn download_piece(
&self,
Expand Down
40 changes: 20 additions & 20 deletions dragonfly-client/src/resource/persistent_cache_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use dragonfly_api::scheduler::v2::{
};
use dragonfly_client_backend::BackendFactory;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::{error::DownloadFromRemotePeerFailed, Error};
use dragonfly_client_core::{error::DownloadFromParentFailed, Error};
use dragonfly_client_core::{
error::{ErrorType, OrErr},
Result as ClientResult,
Expand Down Expand Up @@ -643,7 +643,7 @@ impl PersistentCacheTask {
announce_persistent_cache_peer_response::Response::NormalPersistentCacheTaskResponse(
response,
) => {
// If the persistent cache task is normal, download the pieces from the remote peer.
// If the persistent cache task is normal, download the pieces from the parent.
info!(
"normal persistent cache task response: {:?}",
response
Expand Down Expand Up @@ -683,9 +683,9 @@ impl PersistentCacheTask {
interested_pieces.clone(),
);

// Download the pieces from the remote peer.
// Download the pieces from the parent.
let partial_finished_pieces = match self
.download_partial_with_scheduler_from_remote_peer(
.download_partial_with_scheduler_from_parent(
task,
host_id,
peer_id,
Expand All @@ -698,15 +698,15 @@ impl PersistentCacheTask {
{
Ok(partial_finished_pieces) => {
info!(
"schedule {} finished {} pieces from remote peer",
"schedule {} finished {} pieces from parent",
schedule_count,
partial_finished_pieces.len()
);

partial_finished_pieces
}
Err(err) => {
error!("download from remote peer error: {:?}", err);
error!("download from parent error: {:?}", err);
return Ok(finished_pieces);
}
};
Expand Down Expand Up @@ -761,7 +761,7 @@ impl PersistentCacheTask {
ReschedulePersistentCachePeerRequest {
candidate_parents: response.candidate_cache_parents,
description: Some(
"not all pieces are downloaded from remote peer"
"not all pieces are downloaded from parent"
.to_string(),
),
},
Expand All @@ -787,10 +787,10 @@ impl PersistentCacheTask {
Ok(finished_pieces)
}

/// download_partial_with_scheduler_from_remote_peer downloads a partial persistent cache task with scheduler from a remote peer.
/// download_partial_with_scheduler_from_parent downloads a partial persistent cache task with scheduler from a parent.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn download_partial_with_scheduler_from_remote_peer(
async fn download_partial_with_scheduler_from_parent(
&self,
task: &metadata::PersistentCacheTask,
host_id: &str,
Expand Down Expand Up @@ -823,9 +823,9 @@ impl PersistentCacheTask {
self.config.download.concurrent_piece_count as usize,
));

// Download the pieces from the remote peers.
// Download the pieces from the parents.
while let Some(collect_piece) = piece_collector_rx.recv().await {
async fn download_from_remote_peer(
async fn download_from_parent(
task_id: String,
host_id: String,
peer_id: String,
Expand All @@ -843,13 +843,13 @@ impl PersistentCacheTask {

let piece_id = storage.piece_id(task_id.as_str(), number);
info!(
"start to download piece {} from remote peer {:?}",
"start to download piece {} from parent {:?}",
piece_id,
parent.id.clone()
);

let metadata = piece_manager
.download_from_remote_peer(
.download_from_parent(
peer_id.as_str(),
host_id.as_str(),
task_id.as_str(),
Expand All @@ -861,12 +861,12 @@ impl PersistentCacheTask {
.await
.map_err(|err| {
error!(
"download piece {} from remote peer {:?} error: {:?}",
"download piece {} from parent {:?} error: {:?}",
piece_id,
parent.id.clone(),
err
);
Error::DownloadFromRemotePeerFailed(DownloadFromRemotePeerFailed {
Error::DownloadFromParentFailed(DownloadFromParentFailed {
piece_number: number,
parent_id: parent.id.clone(),
})
Expand Down Expand Up @@ -932,15 +932,15 @@ impl PersistentCacheTask {
})?;

info!(
"finished piece {} from remote peer {:?}",
"finished piece {} from parent {:?}",
piece_id, metadata.parent_id
);

Ok(metadata)
}

join_set.spawn(
download_from_remote_peer(
download_from_parent(
task.id.clone(),
host_id.to_string(),
peer_id.to_string(),
Expand Down Expand Up @@ -972,9 +972,9 @@ impl PersistentCacheTask {
// Store the finished piece.
finished_pieces.push(metadata.clone());
}
Err(Error::DownloadFromRemotePeerFailed(err)) => {
Err(Error::DownloadFromParentFailed(err)) => {
error!(
"download piece {} from remote peer {} error: {:?}",
"download piece {} from parent {} error: {:?}",
self.storage.piece_id(task.id.as_str(), err.piece_number),
err.parent_id,
err
Expand Down Expand Up @@ -1007,7 +1007,7 @@ impl PersistentCacheTask {
continue;
}
Err(err) => {
error!("download from remote peer error: {:?}", err);
error!("download from parent error: {:?}", err);
continue;
}
}
Expand Down
6 changes: 3 additions & 3 deletions dragonfly-client/src/resource/piece.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,10 @@ impl Piece {
);
}

/// download_from_remote_peer downloads a single piece from a remote peer.
/// download_from_parent downloads a single piece from a parent.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(piece_id))]
pub async fn download_from_remote_peer(
pub async fn download_from_parent(
&self,
piece_id: &str,
host_id: &str,
Expand Down Expand Up @@ -455,7 +455,7 @@ impl Piece {
// Record the finish of downloading piece.
match self
.storage
.download_piece_from_remote_peer_finished(
.download_piece_from_parent_finished(
piece_id,
task_id,
offset,
Expand Down
8 changes: 4 additions & 4 deletions dragonfly-client/src/resource/piece_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tokio::task::JoinSet;
use tokio_stream::StreamExt;
use tracing::{error, info, instrument, Instrument};

/// CollectedParent is the parent peer collected from the remote peer.
/// CollectedParent is the parent peer collected from the parent.
#[derive(Clone, Debug)]
pub struct CollectedParent {
/// id is the id of the parent.
Expand Down Expand Up @@ -110,7 +110,7 @@ impl PieceCollector {
let (collected_piece_tx, collected_piece_rx) = mpsc::channel(10 * 1024);
tokio::spawn(
async move {
Self::collect_from_remote_peers(
Self::collect_from_parents(
config,
&host_id,
&task_id,
Expand All @@ -131,10 +131,10 @@ impl PieceCollector {
collected_piece_rx
}

/// collect_from_remote_peers collects pieces from remote peers.
/// collect_from_parents collects pieces from parents.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn collect_from_remote_peers(
async fn collect_from_parents(
config: Arc<Config>,
host_id: &str,
task_id: &str,
Expand Down
Loading

0 comments on commit c67f13e

Please sign in to comment.