Skip to content

Commit

Permalink
feat: add buffer size config for dfdaemon (#434)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Apr 29, 2024
1 parent 9d94519 commit 4849535
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 30 deletions.
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.43"
version = "0.1.44"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.43" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.43" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.43" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.43" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.43" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.43" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.43" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.44" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.44" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.44" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.44" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.44" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.44" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.44" }
thiserror = "1.0"
dragonfly-api = "2.0.110"
reqwest = { version = "0.11.27", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
Expand Down
48 changes: 47 additions & 1 deletion dragonfly-client-config/src/dfdaemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,18 @@ fn default_dynconfig_refresh_interval() -> Duration {
Duration::from_secs(300)
}

// default_storage_write_buffer_size is the default buffer size for writing piece to disk, default is 16KB.
#[inline]
fn default_storage_write_buffer_size() -> usize {
16 * 1024
}

// default_storage_read_buffer_size is the default buffer size for reading piece from disk, default is 16KB.
#[inline]
fn default_storage_read_buffer_size() -> usize {
16 * 1024
}

// default_seed_peer_cluster_id is the default cluster id of seed peer.
#[inline]
fn default_seed_peer_cluster_id() -> u64 {
Expand Down Expand Up @@ -187,6 +199,12 @@ pub fn default_proxy_server_port() -> u16 {
4001
}

// default_proxy_read_buffer_size is the default buffer size for reading piece, default is 16KB.
#[inline]
pub fn default_proxy_read_buffer_size() -> usize {
16 * 1024
}

// default_s3_filtered_query_params is the default filtered query params with s3 protocol to generate the task id.
#[inline]
fn s3_filtered_query_params() -> Vec<String> {
Expand Down Expand Up @@ -578,13 +596,23 @@ pub struct Storage {
// dir is the directory to store task's metadata and content.
#[serde(default = "crate::default_storage_dir")]
pub dir: PathBuf,

// write_buffer_size is the buffer size for writing piece to disk, default is 16KB.
#[serde(default = "default_storage_write_buffer_size")]
pub write_buffer_size: usize,

// read_buffer_size is the buffer size for reading piece from disk, default is 16KB.
#[serde(default = "default_storage_read_buffer_size")]
pub read_buffer_size: usize,
}

// Storage implements Default.
impl Default for Storage {
fn default() -> Self {
Storage {
dir: crate::default_storage_dir(),
write_buffer_size: default_storage_write_buffer_size(),
read_buffer_size: default_storage_read_buffer_size(),
}
}
}
Expand Down Expand Up @@ -761,7 +789,7 @@ impl Default for RegistryMirror {
}

// Proxy is the proxy configuration for dfdaemon.
#[derive(Debug, Clone, Default, Validate, Deserialize)]
#[derive(Debug, Clone, Validate, Deserialize)]
#[serde(default, rename_all = "camelCase")]
pub struct Proxy {
// server is the proxy server configuration for dfdaemon.
Expand All @@ -779,6 +807,24 @@ pub struct Proxy {

// prefetch pre-downloads full of the task when download with range request.
pub prefetch: bool,

// read_buffer_size is the buffer size for reading piece from disk, default is 16KB.
#[serde(default = "default_proxy_read_buffer_size")]
pub read_buffer_size: usize,
}

// Proxy implements Default.
impl Default for Proxy {
fn default() -> Self {
Self {
server: ProxyServer::default(),
rules: None,
registry_mirror: RegistryMirror::default(),
disable_back_to_source: false,
prefetch: false,
read_buffer_size: default_proxy_read_buffer_size(),
}
}
}

// Security is the security configuration for dfdaemon.
Expand Down
17 changes: 10 additions & 7 deletions dragonfly-client-storage/src/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
*/

use dragonfly_api::common::v2::Range;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::Result;
use sha2::{Digest, Sha256};
use std::cmp::{max, min};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, SeekFrom};
use tokio_util::io::InspectReader;
Expand All @@ -27,11 +29,11 @@ use tracing::{error, info, warn};
// DEFAULT_DIR_NAME is the default directory name to store content.
const DEFAULT_DIR_NAME: &str = "content";

// DEFAULT_BUFFER_SIZE is the buffer size to read and write, default is 32KB.
const DEFAULT_BUFFER_SIZE: usize = 32 * 1024;

// Content is the content of a piece.
pub struct Content {
// config is the configuration of the dfdaemon.
config: Arc<Config>,

// dir is the directory to store content.
dir: PathBuf,
}
Expand All @@ -48,12 +50,12 @@ pub struct WritePieceResponse {
// Content implements the content storage.
impl Content {
// new returns a new content.
pub async fn new(dir: &Path) -> Result<Content> {
pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Content> {
let dir = dir.join(DEFAULT_DIR_NAME);
fs::create_dir_all(&dir).await?;
info!("content initialized directory: {:?}", dir);

Ok(Content { dir })
Ok(Content { config, dir })
}

// hard_link_or_copy_task hard links or copies the task content to the destination.
Expand Down Expand Up @@ -144,7 +146,8 @@ impl Content {
let range_reader = from_f.take(range.length);

// Use a buffer to read the range.
let mut range_reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, range_reader);
let mut range_reader =
BufReader::with_capacity(self.config.storage.read_buffer_size, range_reader);

let mut to_f = OpenOptions::new()
.create(true)
Expand Down Expand Up @@ -240,7 +243,7 @@ impl Content {
let task_path = self.dir.join(task_id);

// Use a buffer to read the piece.
let reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, reader);
let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);

// Sha256 is used to calculate the hash of the piece.
let mut hasher = Sha256::new();
Expand Down
2 changes: 1 addition & 1 deletion dragonfly-client-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Storage {
// new returns a new storage.
pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Self> {
let metadata = metadata::Metadata::new(dir)?;
let content = content::Content::new(dir).await?;
let content = content::Content::new(config.clone(), dir).await?;
Ok(Storage {
config,
metadata,
Expand Down
11 changes: 6 additions & 5 deletions dragonfly-client/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ use tracing::{error, info, instrument, Span};

pub mod header;

// DEFAULT_BUFFER_SIZE is the buffer size to read and write, default is 32KB.
const DEFAULT_BUFFER_SIZE: usize = 32 * 1024;

// Response is the response of the proxy server.
pub type Response = hyper::Response<BoxBody<Bytes, ClientError>>;

Expand Down Expand Up @@ -474,7 +471,7 @@ async fn proxy_by_dfdaemon(
};

// Make the download task request.
let download_task_request = match make_download_task_request(config, rule, request) {
let download_task_request = match make_download_task_request(config.clone(), rule, request) {
Ok(download_task_request) => download_task_request,
Err(err) => {
error!("make download task request failed: {}", err);
Expand Down Expand Up @@ -562,6 +559,9 @@ async fn proxy_by_dfdaemon(
*response.headers_mut() = make_response_headers(download_task_started_response.clone())?;
*response.status_mut() = http::StatusCode::OK;

// Get the read buffer size from the config.
let read_buffer_size = config.proxy.read_buffer_size;

// Write task data to pipe. If grpc received error message,
// shutdown the writer.
tokio::spawn(async move {
Expand Down Expand Up @@ -616,8 +616,9 @@ async fn proxy_by_dfdaemon(
return;
}
};

// Use a buffer to read the piece.
let piece_reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, piece_reader);
let piece_reader = BufReader::with_capacity(read_buffer_size, piece_reader);

// Write the piece data to the pipe in order.
finished_piece_readers.insert(piece.number, piece_reader);
Expand Down

0 comments on commit 4849535

Please sign in to comment.