Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion mountpoint-s3-client/examples/client_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,10 @@ struct CliArgs {
}

fn create_s3_client_config(region: &str, args: &CliArgs, nics: Vec<String>) -> S3ClientConfig {
let pool = PagedPool::new_with_candidate_sizes_unlimited([args.part_size]);
let pool = PagedPool::config()
.with_candidate_sizes([args.part_size])
.with_no_memory_limit()
.build();
let mut config = S3ClientConfig::new()
.endpoint_config(EndpointConfig::new(region))
.throughput_target_gbps(args.throughput_target_gbps)
Expand Down
5 changes: 4 additions & 1 deletion mountpoint-s3-client/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ pub fn set_up_client_config(config: S3ClientConfig) -> S3ClientConfig {

#[cfg(feature = "fs_pool_tests")]
let config = config.memory_pool_factory(|options: mountpoint_s3_client::config::MemoryPoolFactoryOptions| {
mountpoint_s3_fs::memory::PagedPool::new_with_candidate_sizes_unlimited([options.part_size()])
mountpoint_s3_fs::memory::PagedPool::config()
.with_candidate_sizes([options.part_size()])
.with_no_memory_limit()
.build()
});

config
Expand Down
5 changes: 4 additions & 1 deletion mountpoint-s3-fs/benches/cache_serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ fn cache_read_benchmark(group: &mut BenchmarkGroup<'_, WallTime>, dir_path: &Pat
block_size: BLOCK_SIZE,
limit: mountpoint_s3_fs::data_cache::CacheLimit::Unbounded,
};
let pool = PagedPool::new_with_candidate_sizes_unlimited([BLOCK_SIZE as usize]);
let pool = PagedPool::config()
.with_candidate_sizes([BLOCK_SIZE as usize])
.with_no_memory_limit()
.build();
let cache = DiskDataCache::new(config, pool);
let cache_key = ObjectId::new("a".into(), ETag::for_tests());
let bytes = ChecksummedBytes::new(data.to_owned().into());
Expand Down
5 changes: 4 additions & 1 deletion mountpoint-s3-fs/examples/fs_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ fn mount_file_system(
throughput_target_gbps: Option<f64>,
) -> FuseSession {
let part_size = 8 * 1024 * 1024;
let pool = PagedPool::new_with_candidate_sizes_minimally_limited([part_size]);
let pool = PagedPool::config()
.with_candidate_sizes([part_size])
.with_minimum_memory_limit()
.build();
let mut config = S3ClientConfig::new().endpoint_config(EndpointConfig::new(region));
config = config
.read_backpressure(true)
Expand Down
5 changes: 4 additions & 1 deletion mountpoint-s3-fs/examples/mount_from_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,10 @@ fn mount_filesystem(
let mem_limit = config
.memory_limit_bytes
.unwrap_or(mountpoint_s3_fs::memory::MINIMUM_MEM_LIMIT);
let pool = PagedPool::new_with_candidate_sizes([client_config.part_config.read_size_bytes], mem_limit);
let pool = PagedPool::config()
.with_candidate_sizes([client_config.part_config.read_size_bytes])
.with_memory_limit(mem_limit)
.build();
let client = client_config
.create_client(pool.clone(), None)
.context("Failed to create S3 client")?;
Expand Down
8 changes: 4 additions & 4 deletions mountpoint-s3-fs/examples/prefetch_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,10 @@ fn main() -> anyhow::Result<()> {
let args = CliArgs::parse();

let bucket = args.bucket.as_str();
let pool = PagedPool::new_with_candidate_sizes(
[args.part_size.unwrap_or(8 * 1024 * 1024) as usize],
args.memory_target_in_bytes(),
);
let pool = PagedPool::config()
.with_candidate_sizes([args.part_size.unwrap_or(8 * 1024 * 1024) as usize])
.with_memory_limit(args.memory_target_in_bytes())
.build();
let client_config = args.s3_client_config().memory_pool(pool.clone());
let client = S3CrtClient::new(client_config).context("failed to create S3 CRT client")?;
let runtime = Runtime::new(client.event_loop_group());
Expand Down
5 changes: 4 additions & 1 deletion mountpoint-s3-fs/examples/s3io_benchmark/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ impl Executor {
};

let memory_target_bytes = (max_memory_target * 1024 * 1024) as u64;
let pool = PagedPool::new_with_candidate_sizes([read_part_size, write_part_size], memory_target_bytes);
let pool = PagedPool::config()
.with_candidate_sizes([read_part_size, write_part_size])
.with_memory_limit(memory_target_bytes)
.build();

let mut endpoint_config = EndpointConfig::new(region);
if let Some(url) = &global.endpoint_url {
Expand Down
5 changes: 4 additions & 1 deletion mountpoint-s3-fs/examples/upload_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ fn main() {
// Default to 95% of total system memory (cgroup-aware)
(effective_total_memory() as f64 * 0.95) as u64
};
let pool = PagedPool::new_with_candidate_sizes([args.write_part_size], max_memory_target);
let pool = PagedPool::config()
.with_candidate_sizes([args.write_part_size])
.with_memory_limit(max_memory_target)
.build();
let config = S3ClientConfig::new()
.endpoint_config(endpoint_config)
.throughput_target_gbps(args.throughput_target_gbps as f64)
Expand Down
35 changes: 28 additions & 7 deletions mountpoint-s3-fs/src/data_cache/disk_data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,10 @@ mod tests {
#[test]
fn get_path_for_block_key() {
let cache_dir = PathBuf::from("mountpoint-cache/");
let pool = PagedPool::new_with_candidate_sizes_unlimited([1024]);
let pool = PagedPool::config()
.with_candidate_sizes([1024])
.with_no_memory_limit()
.build();
let data_cache = DiskDataCache::new(
DiskDataCacheConfig {
cache_directory: cache_dir,
Expand Down Expand Up @@ -719,7 +722,10 @@ mod tests {
#[test]
fn get_path_for_block_key_huge_block_index() {
let cache_dir = PathBuf::from("mountpoint-cache/");
let pool = PagedPool::new_with_candidate_sizes_unlimited([1024]);
let pool = PagedPool::config()
.with_candidate_sizes([1024])
.with_no_memory_limit()
.build();
let data_cache = DiskDataCache::new(
DiskDataCacheConfig {
cache_directory: cache_dir,
Expand Down Expand Up @@ -761,7 +767,10 @@ mod tests {
let object_2_size = data_2.len();

let cache_directory = tempfile::tempdir().unwrap();
let pool = PagedPool::new_with_candidate_sizes_unlimited([pool_buffer_size]);
let pool = PagedPool::config()
.with_candidate_sizes([pool_buffer_size])
.with_no_memory_limit()
.build();
let cache = DiskDataCache::new(
DiskDataCacheConfig {
cache_directory: cache_directory.path().to_path_buf(),
Expand Down Expand Up @@ -848,7 +857,10 @@ mod tests {
let slice = data.slice(1..5);

let cache_directory = tempfile::tempdir().unwrap();
let pool = PagedPool::new_with_candidate_sizes_unlimited([8 * 1024 * 1024]);
let pool = PagedPool::config()
.with_candidate_sizes([8 * 1024 * 1024])
.with_no_memory_limit()
.build();
let cache = DiskDataCache::new(
DiskDataCacheConfig {
cache_directory: cache_directory.path().to_path_buf(),
Expand Down Expand Up @@ -930,7 +942,10 @@ mod tests {
let small_object_key = ObjectId::new("small".into(), ETag::for_tests());

let cache_directory = tempfile::tempdir().unwrap();
let pool = PagedPool::new_with_candidate_sizes_unlimited([BLOCK_SIZE]);
let pool = PagedPool::config()
.with_candidate_sizes([BLOCK_SIZE])
.with_no_memory_limit()
.build();
let cache = DiskDataCache::new(
DiskDataCacheConfig {
cache_directory: cache_directory.path().to_path_buf(),
Expand Down Expand Up @@ -1113,7 +1128,10 @@ mod tests {
// "Corrupt" the serialized value with an invalid length.
replace_u64_at(&mut buf, offset, u64::MAX);

let pool = PagedPool::new_with_candidate_sizes_unlimited([MAX_LENGTH as usize]);
let pool = PagedPool::config()
.with_candidate_sizes([MAX_LENGTH as usize])
.with_no_memory_limit()
.build();
let err =
DiskBlock::read(&mut Cursor::new(buf), MAX_LENGTH, &pool, None).expect_err("deserialization should fail");
match length_to_corrupt {
Expand All @@ -1130,7 +1148,10 @@ mod tests {
fn test_concurrent_access() {
let block_size = 1024 * 1024;
let cache_directory = tempfile::tempdir().unwrap();
let pool = PagedPool::new_with_candidate_sizes_unlimited([block_size]);
let pool = PagedPool::config()
.with_candidate_sizes([block_size])
.with_no_memory_limit()
.build();
let data_cache = DiskDataCache::new(
DiskDataCacheConfig {
cache_directory: cache_directory.path().to_path_buf(),
Expand Down
5 changes: 4 additions & 1 deletion mountpoint-s3-fs/src/data_cache/multilevel_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ mod tests {

fn create_disk_cache() -> (TempDir, Arc<DiskDataCache>) {
let cache_directory = tempfile::tempdir().unwrap();
let pool = PagedPool::new_with_candidate_sizes_unlimited([BLOCK_SIZE as usize, PART_SIZE]);
let pool = PagedPool::config()
.with_candidate_sizes([BLOCK_SIZE as usize, PART_SIZE])
.with_no_memory_limit()
.build();
let cache = DiskDataCache::new(
DiskDataCacheConfig {
cache_directory: cache_directory.path().to_path_buf(),
Expand Down
11 changes: 8 additions & 3 deletions mountpoint-s3-fs/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,6 @@ where
mod tests {
use super::*;

use crate::memory::MINIMUM_MEM_LIMIT;
use crate::prefetch::Prefetcher;
use crate::s3::{Bucket, S3Path};
use crate::{Superblock, SuperblockConfig};
Expand All @@ -809,7 +808,10 @@ mod tests {
client.add_object("dir1/file1.bin", MockObject::constant(0xa1, 15, ETag::for_tests()));

let runtime = Runtime::new(ThreadPool::builder().pool_size(1).create().unwrap());
let pool = PagedPool::new_with_candidate_sizes([32], MINIMUM_MEM_LIMIT);
let pool = PagedPool::config()
.with_candidate_sizes([32])
.with_minimum_memory_limit()
.build();
let prefetcher_builder = Prefetcher::default_builder(client.clone());
let server_side_encryption =
ServerSideEncryption::new(Some("aws:kms".to_owned()), Some("some_key_alias".to_owned()));
Expand Down Expand Up @@ -1073,7 +1075,10 @@ mod tests {
);

let runtime = Runtime::new(ThreadPool::builder().pool_size(10).create().unwrap());
let pool = PagedPool::new_with_candidate_sizes([32], MINIMUM_MEM_LIMIT);
let pool = PagedPool::config()
.with_candidate_sizes([32])
.with_minimum_memory_limit()
.build();
let prefetcher_builder = Prefetcher::default_builder(client.clone());
let fs_config = S3FilesystemConfig {
allow_overwrite,
Expand Down
25 changes: 20 additions & 5 deletions mountpoint-s3-fs/src/memory/limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,10 @@ mod tests {
use crate::sync::atomic::Ordering;

fn new_pool() -> PagedPool {
PagedPool::new_with_candidate_sizes_minimally_limited([1024])
PagedPool::config()
.with_candidate_sizes([1024])
.with_minimum_memory_limit()
.build()
}

#[test]
Expand Down Expand Up @@ -430,7 +433,10 @@ mod tests {
fn test_on_pool_reserve_noop_after_release_cursor() {
// Simulates the cancellation race: on_reserve fires after release_cursor
// removed the entry. The callback should be a no-op.
let pool = PagedPool::new_with_candidate_sizes_minimally_limited([1024]);
let pool = PagedPool::config()
.with_candidate_sizes([1024])
.with_minimum_memory_limit()
.build();
let limiter = pool.limiter();
let cursor = pool.create_cursor().state();

Expand All @@ -450,7 +456,10 @@ mod tests {

#[test]
fn test_on_pool_reserve_saturates_on_over_decrement() {
let pool = PagedPool::new_with_candidate_sizes_minimally_limited([1024]);
let pool = PagedPool::config()
.with_candidate_sizes([1024])
.with_minimum_memory_limit()
.build();
let limiter = pool.limiter();
let cursor = pool.create_cursor().state();

Expand All @@ -471,7 +480,10 @@ mod tests {

#[test]
fn test_available_mem_accounts_for_pool_allocations() {
let pool = PagedPool::new_with_candidate_sizes_minimally_limited([1024]);
let pool = PagedPool::config()
.with_candidate_sizes([1024])
.with_minimum_memory_limit()
.build();
let limiter = pool.limiter();
let cursor = pool.create_cursor().state();
let stats = pool.stats();
Expand All @@ -495,7 +507,10 @@ mod tests {

#[test]
fn test_upload_allocation_does_not_affect_mem_reserved() {
let pool = PagedPool::new_with_candidate_sizes_minimally_limited([1024]);
let pool = PagedPool::config()
.with_candidate_sizes([1024])
.with_minimum_memory_limit()
.build();
let limiter = pool.limiter();
let stats = pool.stats();

Expand Down
Loading
Loading