feat: support the rate limit for preheat request (#881)
Signed-off-by: chlins <[email protected]>
Co-authored-by: suhan.zcy <[email protected]>
chlins and suhan.zcy authored Dec 6, 2024
1 parent d8f0931 commit 865c778
Showing 9 changed files with 100 additions and 30 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default.

29 changes: 19 additions & 10 deletions Cargo.toml
@@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.123"
version = "0.1.124"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
@@ -22,16 +22,25 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.123" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.123" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.123" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.123" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.123" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.123" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.123" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.124" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.124" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.124" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.124" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.124" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.124" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.124" }
thiserror = "1.0"
dragonfly-api = "=2.0.173"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls", "gzip", "brotli", "zstd", "deflate"] }
dragonfly-api = "=2.0.174"
reqwest = { version = "0.12.4", features = [
"stream",
"native-tls",
"default-tls",
"rustls-tls",
"gzip",
"brotli",
"zstd",
"deflate",
] }
reqwest-middleware = "0.4"
rcgen = { version = "0.12.1", features = ["x509-parser"] }
hyper = { version = "1.5", features = ["full"] }
13 changes: 13 additions & 0 deletions dragonfly-client-config/src/dfdaemon.rs
@@ -225,6 +225,14 @@ pub fn default_proxy_read_buffer_size() -> usize {
32 * 1024
}

/// default_prefetch_rate_limit is the default rate limit of the prefetch speed in GiB/MiB/KiB per second.
/// The prefetch request has lower priority, so its rate is limited to avoid consuming bandwidth that other download tasks need.
#[inline]
fn default_prefetch_rate_limit() -> ByteSize {
// Default rate limit is 2GiB/s.
ByteSize::gib(2)
}

/// default_s3_filtered_query_params is the default filtered query params with s3 protocol to generate the task id.
#[inline]
fn s3_filtered_query_params() -> Vec<String> {
@@ -1089,6 +1097,10 @@ pub struct Proxy {
/// prefetch pre-downloads the full task when downloading with a range request.
pub prefetch: bool,

/// prefetch_rate_limit is the rate limit of the prefetch speed in GiB/MiB/KiB per second.
#[serde(with = "bytesize_serde", default = "default_prefetch_rate_limit")]
pub prefetch_rate_limit: ByteSize,

/// cache_capacity is the capacity of the cache by LRU algorithm for HTTP proxy, default is 150.
/// The cache is used to store the hot piece content of the task, piece length is 4MB~16MB.
/// If the capacity is 150, the cache size is 600MB~2.4GB, need to adjust according to the
@@ -1110,6 +1122,7 @@ impl Default for Proxy {
registry_mirror: RegistryMirror::default(),
disable_back_to_source: false,
prefetch: false,
prefetch_rate_limit: default_prefetch_rate_limit(),
cache_capacity: default_proxy_cache_capacity(),
read_buffer_size: default_proxy_read_buffer_size(),
}
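Since the new field is (de)serialized via `bytesize_serde`, the limit can be written with human-readable units in the dfdaemon config. A minimal sketch of what the knob might look like in dfdaemon.yaml — the key name and camelCase casing are assumptions inferred from the struct, not shown in this diff:

```yaml
# Hypothetical dfdaemon.yaml excerpt; verify the exact key against your
# deployed config schema before use.
proxy:
  prefetch: true
  # Lower the prefetch cap from the 2GiB/s default, e.g. on shared links.
  prefetchRateLimit: 512MiB
```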
11 changes: 6 additions & 5 deletions dragonfly-client/src/bin/dfget/main.rs
@@ -61,19 +61,19 @@ Examples:
# Download a file from Amazon Simple Storage Service(S3).
$ dfget s3://<bucket>/<path> -O /tmp/file.txt --storage-access-key-id=<access_key_id> --storage-access-key-secret=<access_key_secret>
# Download a file from Google Cloud Storage Service(GCS).
$ dfget gs://<bucket>/<path> -O /tmp/file.txt --storage-credential-path=<credential_path>
# Download a file from Azure Blob Storage Service(ABS).
$ dfget abs://<container>/<path> -O /tmp/file.txt --storage-access-key-id=<account_name> --storage-access-key-secret=<account_key>
# Download a file from Aliyun Object Storage Service(OSS).
$ dfget oss://<bucket>/<path> -O /tmp/file.txt --storage-access-key-id=<access_key_id> --storage-access-key-secret=<access_key_secret> --storage-endpoint=<endpoint>
# Download a file from Huawei Cloud Object Storage Service(OBS).
$ dfget obs://<bucket>/<path> -O /tmp/file.txt --storage-access-key-id=<access_key_id> --storage-access-key-secret=<access_key_secret> --storage-endpoint=<endpoint>
# Download a file from Tencent Cloud Object Storage Service(COS).
$ dfget cos://<bucket>/<path> -O /tmp/file.txt --storage-access-key-id=<access_key_id> --storage-access-key-secret=<access_key_secret> --storage-endpoint=<endpoint>
"#;
@@ -738,6 +738,7 @@ async fn download(
disable_back_to_source: args.disable_back_to_source,
certificate_chain: Vec::new(),
prefetch: false,
is_prefetch: false,
object_storage,
hdfs,
}),
3 changes: 3 additions & 0 deletions dragonfly-client/src/grpc/mod.rs
@@ -77,6 +77,9 @@ pub async fn prefetch_task(
// Remove the prefetch flag to prevent an infinite prefetch loop.
download.prefetch = false;

// Mark the is_prefetch flag as true to indicate that this is a prefetch request.
download.is_prefetch = true;

// Remove the range header for download full task.
download
.request_header
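The transformation applied by `prefetch_task` can be summarized in isolation. A minimal sketch using a simplified stand-in struct — the real code operates on the dragonfly-api `Download` message, and the header key used here is an assumption:

```rust
use std::collections::HashMap;

/// Simplified stand-in for the dragonfly-api Download message.
struct Download {
    prefetch: bool,
    is_prefetch: bool,
    request_header: HashMap<String, String>,
}

/// Turn a range download into a full-task prefetch request.
fn to_prefetch_request(mut download: Download) -> Download {
    // Clear the prefetch flag so the prefetch download cannot
    // recursively schedule yet another prefetch.
    download.prefetch = false;

    // Mark the request as a prefetch so piece downloads go through
    // the lower-priority prefetch rate limiter.
    download.is_prefetch = true;

    // Drop the range header so the whole task is fetched.
    download.request_header.remove("range");
    download
}

fn main() {
    let req = Download {
        prefetch: true,
        is_prefetch: false,
        request_header: HashMap::from([("range".to_string(), "bytes=0-1023".to_string())]),
    };
    let prefetch_req = to_prefetch_request(req);
    assert!(!prefetch_req.prefetch && prefetch_req.is_prefetch);
    assert!(!prefetch_req.request_header.contains_key("range"));
}
```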
2 changes: 2 additions & 0 deletions dragonfly-client/src/proxy/mod.rs
@@ -831,6 +831,7 @@ async fn proxy_via_dfdaemon(
piece.length,
download_task_started_response.range,
true,
false,
)
.await
{
@@ -1089,6 +1090,7 @@ fn make_download_task_request(
prefetch: need_prefetch(config.clone(), &header),
object_storage: None,
hdfs: None,
is_prefetch: false,
}),
})
}
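Note that the proxy path always builds its requests with is_prefetch: false: the flag is flipped to true only inside prefetch_task, so regular proxied downloads stay on the normal download limiter while only the background prefetch traffic is throttled by the new limiter.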
1 change: 1 addition & 0 deletions dragonfly-client/src/resource/persistent_cache_task.rs
@@ -856,6 +856,7 @@ impl PersistentCacheTask {
number,
length,
parent.clone(),
false,
)
.await
.map_err(|err| {
40 changes: 35 additions & 5 deletions dragonfly-client/src/resource/piece.rs
@@ -74,6 +74,9 @@ pub struct Piece {

/// upload_rate_limiter is the rate limiter of the upload speed in bps(bytes per second).
upload_rate_limiter: Arc<RateLimiter>,

/// prefetch_rate_limiter is the rate limiter of the prefetch speed in bps(bytes per second).
prefetch_rate_limiter: Arc<RateLimiter>,
}

/// Piece implements the piece manager.
@@ -106,6 +109,13 @@ impl Piece {
.interval(Duration::from_secs(1))
.build(),
),
prefetch_rate_limiter: Arc::new(
RateLimiter::builder()
.initial(config.proxy.prefetch_rate_limit.as_u64() as usize)
.refill(config.proxy.prefetch_rate_limit.as_u64() as usize)
.interval(Duration::from_secs(1))
.build(),
),
}
}

@@ -343,13 +353,20 @@
length: u64,
range: Option<Range>,
disable_rate_limit: bool,
is_prefetch: bool,
) -> Result<impl AsyncRead> {
// Record the piece_id in the current span.
Span::current().record("piece_id", piece_id);

// Acquire the download rate limiter.
if !disable_rate_limit {
self.download_rate_limiter.acquire(length as usize).await;
if is_prefetch {
// Acquire the prefetch rate limiter.
self.prefetch_rate_limiter.acquire(length as usize).await;
} else {
// Acquire the download rate limiter.
self.download_rate_limiter.acquire(length as usize).await;
}
}

// Upload the piece content.
@@ -368,6 +385,7 @@
}

/// download_from_remote_peer downloads a single piece from a remote peer.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(piece_id))]
pub async fn download_from_remote_peer(
&self,
@@ -377,12 +395,18 @@
number: u32,
length: u64,
parent: piece_collector::CollectedParent,
is_prefetch: bool,
) -> Result<metadata::Piece> {
// Record the piece_id in the current span.
Span::current().record("piece_id", piece_id);

// Acquire the download rate limiter.
self.download_rate_limiter.acquire(length as usize).await;
if is_prefetch {
// Acquire the prefetch rate limiter.
self.prefetch_rate_limiter.acquire(length as usize).await;
} else {
// Acquire the download rate limiter.
self.download_rate_limiter.acquire(length as usize).await;
}

// Record the start of downloading piece.
let piece = self
@@ -506,14 +530,20 @@
offset: u64,
length: u64,
request_header: HeaderMap,
is_prefetch: bool,
object_storage: Option<ObjectStorage>,
hdfs: Option<Hdfs>,
) -> Result<metadata::Piece> {
// Record the piece_id in the current span.
Span::current().record("piece_id", piece_id);

// Acquire the download rate limiter.
self.download_rate_limiter.acquire(length as usize).await;
if is_prefetch {
// Acquire the prefetch rate limiter.
self.prefetch_rate_limiter.acquire(length as usize).await;
} else {
// Acquire the download rate limiter.
self.download_rate_limiter.acquire(length as usize).await;
}

// Record the start of downloading piece.
let piece = self
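All three limiters in `Piece` come from the `leaky_bucket` crate, already used for the download and upload limiters. A standalone sketch of the same two-limiter selection pattern — the 2 GiB/s prefetch default mirrors the diff, while the program around it is illustrative only and assumes the `tokio` and `leaky-bucket` crates:

```rust
use std::{sync::Arc, time::Duration};

use leaky_bucket::RateLimiter;

/// Build a limiter that refills `bytes_per_second` tokens every second,
/// matching the builder calls in Piece::new.
fn make_limiter(bytes_per_second: usize) -> Arc<RateLimiter> {
    Arc::new(
        RateLimiter::builder()
            .initial(bytes_per_second)
            .refill(bytes_per_second)
            .interval(Duration::from_secs(1))
            .build(),
    )
}

#[tokio::main]
async fn main() {
    let download_rate_limiter = make_limiter(16 * 1024 * 1024 * 1024); // illustrative 16 GiB/s
    let prefetch_rate_limiter = make_limiter(2 * 1024 * 1024 * 1024); // the 2 GiB/s default

    let length: usize = 4 * 1024 * 1024; // one 4 MiB piece
    let is_prefetch = true;

    // Select the limiter per request type, as in the three download
    // paths patched above.
    if is_prefetch {
        prefetch_rate_limiter.acquire(length).await;
    } else {
        download_rate_limiter.acquire(length).await;
    }
    println!("acquired {length} bytes of rate-limit budget");
}
```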