Skip to content

Commit c9fbce8

Browse files
hangvane authored and jiangliu committed
nydusd: add the config support of amplify_io
Add the support of `amplify_io` in the config file of nydusd to configure read amplification. Signed-off-by: Wenhao Ren <[email protected]>
1 parent 468eeaa commit c9fbce8

File tree

9 files changed

+51
-29
lines changed

9 files changed

+51
-29
lines changed

api/src/config.rs

+11-10
Original file line numberDiff line numberDiff line change
@@ -819,8 +819,8 @@ pub struct RafsConfigV2 {
819819
/// Filesystem metadata cache mode.
820820
#[serde(default = "default_rafs_mode")]
821821
pub mode: String,
822-
/// Batch size to read data from storage cache layer.
823-
#[serde(rename = "batch_size", default = "default_batch_size")]
822+
/// Amplified user IO request batch size to read data from remote storage backend / local cache.
823+
#[serde(rename = "batch_size", default = "default_user_io_batch_size")]
824824
pub user_io_batch_size: usize,
825825
/// Whether to validate data digest.
826826
#[serde(default)]
@@ -874,7 +874,7 @@ pub struct PrefetchConfigV2 {
874874
/// Number of data prefetching working threads.
875875
#[serde(rename = "threads", default = "default_prefetch_threads_count")]
876876
pub threads_count: usize,
877-
/// The batch size to prefetch data from backend.
877+
/// The amplify batch size to prefetch data from backend.
878878
#[serde(default = "default_prefetch_batch_size")]
879879
pub batch_size: usize,
880880
/// Network bandwidth rate limit in unit of Bytes and Zero means no limit.
@@ -1194,11 +1194,11 @@ fn default_work_dir() -> String {
11941194
".".to_string()
11951195
}
11961196

1197-
pub fn default_batch_size() -> usize {
1198-
128 * 1024
1197+
pub fn default_user_io_batch_size() -> usize {
1198+
1024 * 1024
11991199
}
12001200

1201-
fn default_prefetch_batch_size() -> usize {
1201+
pub fn default_prefetch_batch_size() -> usize {
12021202
1024 * 1024
12031203
}
12041204

@@ -1363,8 +1363,9 @@ struct RafsConfig {
13631363
/// Record file name if file access trace log.
13641364
#[serde(default)]
13651365
pub latest_read_files: bool,
1366+
// Amplified user IO request batch size to read data from remote storage backend / local cache.
13661367
// ZERO value means, amplifying user io is not enabled.
1367-
#[serde(rename = "amplify_io", default = "default_batch_size")]
1368+
#[serde(rename = "amplify_io", default = "default_user_io_batch_size")]
13681369
pub user_io_batch_size: usize,
13691370
}
13701371

@@ -1410,8 +1411,8 @@ struct FsPrefetchControl {
14101411
#[serde(default = "default_prefetch_threads_count")]
14111412
pub threads_count: usize,
14121413

1413-
/// Window size in unit of bytes to merge request to backend.
1414-
#[serde(rename = "merging_size", default = "default_batch_size")]
1414+
/// The amplify batch size to prefetch data from backend.
1415+
#[serde(rename = "merging_size", default = "default_prefetch_batch_size")]
14151416
pub batch_size: usize,
14161417

14171418
/// Network bandwidth limitation for prefetching.
@@ -1449,7 +1450,7 @@ struct BlobPrefetchConfig {
14491450
pub enable: bool,
14501451
/// Number of data prefetching working threads.
14511452
pub threads_count: usize,
1452-
/// The maximum size of a merged IO request.
1453+
/// The amplify batch size to prefetch data from backend.
14531454
#[serde(rename = "merging_size")]
14541455
pub batch_size: usize,
14551456
/// Network bandwidth rate limit in unit of Bytes and Zero means no limit.

docs/nydusd.md

+3
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,9 @@ We are working on enabling cloud-hypervisor support for nydus.
130130
"iostats_files": true,
131131
// Enable support of fs extended attributes
132132
"enable_xattr": false,
133+
// Amplified user IO request batch size to read data from remote storage backend / local cache
134+
// in unit of Bytes, valid values: 0-268435456, default: 1048576
135+
"amplify_io": 1048576,
133136
"fs_prefetch": {
134137
// Enable blob prefetch
135138
"enable": false,

misc/configs/nydusd-config-v2.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ bandwidth_limit = 10000000
142142
[rafs]
143143
# Filesystem metadata cache mode, "direct" or "cached". "direct" is almost what you want.
144144
mode = "direct"
145-
# Batch size to read data from storage cache layer, valid values: 0-0x10000000
145+
# Amplified user IO request batch size to read data from remote storage backend / local cache,
146+
# valid values: 0-0x10000000
146147
batch_size = 1000000
147148
# Whether to validate data digest.
148149
validate = true

service/src/fs_cache.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -518,8 +518,8 @@ impl FsCacheHandler {
518518
.map_err(|e| eother!(format!("failed to start prefetch worker, {}", e)))?;
519519

520520
let size = match cache_cfg.prefetch.batch_size.checked_next_power_of_two() {
521-
None => nydus_api::default_batch_size() as u64,
522-
Some(1) => nydus_api::default_batch_size() as u64,
521+
None => nydus_api::default_prefetch_batch_size() as u64,
522+
Some(1) => nydus_api::default_prefetch_batch_size() as u64,
523523
Some(s) => s as u64,
524524
};
525525
let size = std::cmp::max(0x4_0000u64, size);

storage/src/cache/cachedfile.rs

+9-8
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ pub(crate) struct FileCacheEntry {
164164
pub(crate) dio_enabled: bool,
165165
// Data from the file cache should be validated before use.
166166
pub(crate) need_validation: bool,
167-
pub(crate) batch_size: u64,
167+
// Amplified user IO request batch size to read data from remote storage backend / local cache.
168+
pub(crate) user_io_batch_size: u32,
168169
pub(crate) prefetch_config: Arc<AsyncPrefetchConfig>,
169170
}
170171

@@ -308,11 +309,11 @@ impl FileCacheEntry {
308309
}
309310
}
310311

311-
fn ondemand_batch_size(&self) -> u64 {
312-
if self.batch_size < 0x2_0000 {
312+
fn user_io_batch_size(&self) -> u64 {
313+
if self.user_io_batch_size < 0x2_0000 {
313314
0x2_0000
314315
} else {
315-
self.batch_size
316+
self.user_io_batch_size as u64
316317
}
317318
}
318319

@@ -745,7 +746,7 @@ impl BlobObject for FileCacheEntry {
745746

746747
let meta = self.meta.as_ref().ok_or_else(|| einval!())?;
747748
let meta = meta.get_blob_meta().ok_or_else(|| einval!())?;
748-
let mut chunks = meta.get_chunks_uncompressed(offset, size, self.ondemand_batch_size())?;
749+
let mut chunks = meta.get_chunks_uncompressed(offset, size, self.user_io_batch_size())?;
749750
if let Some(meta) = self.get_blob_meta_info()? {
750751
chunks = self.strip_ready_chunks(meta, None, chunks);
751752
}
@@ -934,7 +935,7 @@ impl FileCacheEntry {
934935
fn read_iter(&self, bios: &mut [BlobIoDesc], buffers: &[FileVolatileSlice]) -> Result<usize> {
935936
// Merge requests with continuous blob addresses.
936937
let requests = self
937-
.merge_requests_for_user(bios, self.ondemand_batch_size())
938+
.merge_requests_for_user(bios, self.user_io_batch_size())
938939
.ok_or_else(|| {
939940
for bio in bios.iter() {
940941
self.update_chunk_pending_status(&bio.chunkinfo, false);
@@ -1100,14 +1101,14 @@ impl FileCacheEntry {
11001101
+ region.chunks[idx].compressed_size() as u64;
11011102
let start = region.chunks[idx + 1].compressed_offset();
11021103
assert!(end <= start);
1103-
assert!(start - end <= self.ondemand_batch_size() >> RAFS_BATCH_SIZE_TO_GAP_SHIFT);
1104+
assert!(start - end <= self.user_io_batch_size() >> RAFS_BATCH_SIZE_TO_GAP_SHIFT);
11041105
assert!(region.chunks[idx].id() < region.chunks[idx + 1].id());
11051106
}
11061107
}
11071108

11081109
// Try to extend requests.
11091110
let mut region_hold;
1110-
if let Some(v) = self.extend_pending_chunks(&region.chunks, self.ondemand_batch_size())? {
1111+
if let Some(v) = self.extend_pending_chunks(&region.chunks, self.user_io_batch_size())? {
11111112
if v.len() > r.chunks.len() {
11121113
let mut tag_set = HashSet::new();
11131114
for (idx, chunk) in region.chunks.iter().enumerate() {

storage/src/cache/filecache/mod.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use crate::cache::state::{
2323
use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
2424
use crate::cache::{BlobCache, BlobCacheMgr};
2525
use crate::device::{BlobFeatures, BlobInfo};
26-
use crate::RAFS_DEFAULT_CHUNK_SIZE;
2726

2827
pub const BLOB_RAW_FILE_SUFFIX: &str = ".blob.raw";
2928
pub const BLOB_DATA_FILE_SUFFIX: &str = ".blob.data";
@@ -46,6 +45,7 @@ pub struct FileCacheMgr {
4645
cache_convergent_encryption: bool,
4746
cache_encryption_key: String,
4847
closed: Arc<AtomicBool>,
48+
user_io_batch_size: u32,
4949
}
5050

5151
impl FileCacheMgr {
@@ -55,6 +55,7 @@ impl FileCacheMgr {
5555
backend: Arc<dyn BlobBackend>,
5656
runtime: Arc<Runtime>,
5757
id: &str,
58+
user_io_batch_size: u32,
5859
) -> Result<FileCacheMgr> {
5960
let blob_cfg = config.get_filecache_config()?;
6061
let work_dir = blob_cfg.get_work_dir()?;
@@ -77,6 +78,7 @@ impl FileCacheMgr {
7778
cache_convergent_encryption: blob_cfg.enable_convergent_encryption,
7879
cache_encryption_key: blob_cfg.encryption_key.clone(),
7980
closed: Arc::new(AtomicBool::new(false)),
81+
user_io_batch_size,
8082
})
8183
}
8284

@@ -339,7 +341,7 @@ impl FileCacheEntry {
339341
is_zran,
340342
dio_enabled: false,
341343
need_validation,
342-
batch_size: RAFS_DEFAULT_CHUNK_SIZE,
344+
user_io_batch_size: mgr.user_io_batch_size,
343345
prefetch_config,
344346
})
345347
}

storage/src/cache/fscache/mod.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
2020
use crate::cache::{BlobCache, BlobCacheMgr};
2121
use crate::device::{BlobFeatures, BlobInfo, BlobObject};
2222
use crate::factory::BLOB_FACTORY;
23-
use crate::RAFS_DEFAULT_CHUNK_SIZE;
2423

2524
use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX;
2625

@@ -40,6 +39,7 @@ pub struct FsCacheMgr {
4039
need_validation: bool,
4140
blobs_check_count: Arc<AtomicU8>,
4241
closed: Arc<AtomicBool>,
42+
user_io_batch_size: u32,
4343
}
4444

4545
impl FsCacheMgr {
@@ -49,6 +49,7 @@ impl FsCacheMgr {
4949
backend: Arc<dyn BlobBackend>,
5050
runtime: Arc<Runtime>,
5151
id: &str,
52+
user_io_batch_size: u32,
5253
) -> Result<FsCacheMgr> {
5354
if config.cache_compressed {
5455
return Err(enosys!("fscache doesn't support compressed cache mode"));
@@ -73,6 +74,7 @@ impl FsCacheMgr {
7374
need_validation: config.cache_validate,
7475
blobs_check_count: Arc::new(AtomicU8::new(0)),
7576
closed: Arc::new(AtomicBool::new(false)),
77+
user_io_batch_size,
7678
})
7779
}
7880

@@ -290,7 +292,7 @@ impl FileCacheEntry {
290292
is_zran,
291293
dio_enabled: true,
292294
need_validation,
293-
batch_size: RAFS_DEFAULT_CHUNK_SIZE,
295+
user_io_batch_size: mgr.user_io_batch_size,
294296
prefetch_config,
295297
})
296298
}
@@ -374,7 +376,7 @@ mod tests {
374376
use nydus_api::ConfigV2;
375377
use nydus_utils::{compress, metrics::BackendMetrics};
376378

377-
use crate::{factory::ASYNC_RUNTIME, test::MockBackend};
379+
use crate::{factory::ASYNC_RUNTIME, test::MockBackend, RAFS_DEFAULT_CHUNK_SIZE};
378380

379381
use super::*;
380382

@@ -407,6 +409,7 @@ mod tests {
407409
Arc::new(backend),
408410
ASYNC_RUNTIME.clone(),
409411
&cfg.id,
412+
0,
410413
)
411414
.unwrap();
412415
assert!(mgr.init().is_ok());

storage/src/cache/worker.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub(crate) struct AsyncPrefetchConfig {
2525
pub enable: bool,
2626
/// Number of working threads.
2727
pub threads_count: usize,
28-
/// Window size to merge/amplify requests.
28+
/// The amplify batch size to prefetch data from backend.
2929
pub batch_size: usize,
3030
/// Network bandwidth for prefetch, in unit of Bytes and Zero means no rate limit is set.
3131
#[allow(unused)]

storage/src/factory.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::sync::{Arc, Mutex};
1717
use std::time::Duration;
1818

1919
use lazy_static::lazy_static;
20-
use nydus_api::{BackendConfigV2, ConfigV2};
20+
use nydus_api::{default_user_io_batch_size, BackendConfigV2, ConfigV2};
2121
use tokio::runtime::{Builder, Runtime};
2222
use tokio::time;
2323

@@ -117,6 +117,10 @@ impl BlobFactory {
117117
) -> IOResult<Arc<dyn BlobCache>> {
118118
let backend_cfg = config.get_backend_config()?;
119119
let cache_cfg = config.get_cache_config()?;
120+
let user_io_batch_size = config
121+
.get_rafs_config()
122+
.map_or_else(|_| default_user_io_batch_size(), |v| v.user_io_batch_size)
123+
as u32;
120124
let key = BlobCacheMgrKey {
121125
config: config.clone(),
122126
};
@@ -128,7 +132,13 @@ impl BlobFactory {
128132
let backend = Self::new_backend(backend_cfg, &blob_info.blob_id())?;
129133
let mgr = match cache_cfg.cache_type.as_str() {
130134
"blobcache" | "filecache" => {
131-
let mgr = FileCacheMgr::new(cache_cfg, backend, ASYNC_RUNTIME.clone(), &config.id)?;
135+
let mgr = FileCacheMgr::new(
136+
cache_cfg,
137+
backend,
138+
ASYNC_RUNTIME.clone(),
139+
&config.id,
140+
user_io_batch_size,
141+
)?;
132142
mgr.init()?;
133143
Arc::new(mgr) as Arc<dyn BlobCacheMgr>
134144
}
@@ -139,6 +149,7 @@ impl BlobFactory {
139149
backend,
140150
ASYNC_RUNTIME.clone(),
141151
&config.id,
152+
user_io_batch_size,
142153
)?;
143154
mgr.init()?;
144155
Arc::new(mgr) as Arc<dyn BlobCacheMgr>

0 commit comments

Comments (0)