Skip to content
Merged
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ jobs:
- name: Run cgroup-aware memory detection test
env:
TEST_CGROUP_MEM_LIMIT_MB: "2048"
run: cargo test -p mountpoint-s3-fs --lib mem_limiter::tests::test_effective_total_memory_respects_cgroup
run: cargo test -p mountpoint-s3-fs --lib memory::limiter::tests::test_effective_total_memory_respects_cgroup

workflow-complete:
name: "Tests Workflow Complete"
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-client/examples/client_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ struct CliArgs {
}

fn create_s3_client_config(region: &str, args: &CliArgs, nics: Vec<String>) -> S3ClientConfig {
let pool = PagedPool::new_with_candidate_sizes([args.part_size]);
let pool = PagedPool::new_with_candidate_sizes_unlimited([args.part_size]);
let mut config = S3ClientConfig::new()
.endpoint_config(EndpointConfig::new(region))
.throughput_target_gbps(args.throughput_target_gbps)
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-client/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub fn set_up_client_config(config: S3ClientConfig) -> S3ClientConfig {

#[cfg(feature = "fs_pool_tests")]
let config = config.memory_pool_factory(|options: mountpoint_s3_client::config::MemoryPoolFactoryOptions| {
mountpoint_s3_fs::memory::PagedPool::new_with_candidate_sizes([options.part_size()])
mountpoint_s3_fs::memory::PagedPool::new_with_candidate_sizes_unlimited([options.part_size()])
});

config
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-fs/benches/cache_serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn cache_read_benchmark(group: &mut BenchmarkGroup<'_, WallTime>, dir_path: &Pat
block_size: BLOCK_SIZE,
limit: mountpoint_s3_fs::data_cache::CacheLimit::Unbounded,
};
let pool = PagedPool::new_with_candidate_sizes([BLOCK_SIZE as usize]);
let pool = PagedPool::new_with_candidate_sizes_unlimited([BLOCK_SIZE as usize]);
let cache = DiskDataCache::new(config, pool);
let cache_key = ObjectId::new("a".into(), ETag::for_tests());
let bytes = ChecksummedBytes::new(data.to_owned().into());
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-fs/examples/fs_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ fn mount_file_system(
throughput_target_gbps: Option<f64>,
) -> FuseSession {
let part_size = 8 * 1024 * 1024;
let pool = PagedPool::new_with_candidate_sizes([part_size]);
let pool = PagedPool::new_with_candidate_sizes_minimally_limited([part_size]);
let mut config = S3ClientConfig::new().endpoint_config(EndpointConfig::new(region));
config = config
.read_backpressure(true)
Expand Down
8 changes: 4 additions & 4 deletions mountpoint-s3-fs/examples/mount_from_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ impl ConfigOptions {
if let Some(gid) = self.gid {
fs_config.uid = gid;
}
if let Some(memory_limit_bytes) = self.memory_limit_bytes {
fs_config.mem_limit = memory_limit_bytes;
}
// For this binary we expect sequential read pattern. Thus, opt-out from the 1MB-initial request,
// trading-off latency for throughput and more accurate memory limiting.
fs_config.prefetcher_config.initial_request_size = 0;
Expand Down Expand Up @@ -310,7 +307,10 @@ fn mount_filesystem(

// Create the client and runtime
let client_config = config.build_client_config()?;
let pool = PagedPool::new_with_candidate_sizes([client_config.part_config.read_size_bytes]);
let mem_limit = config
.memory_limit_bytes
.unwrap_or(mountpoint_s3_fs::memory::MINIMUM_MEM_LIMIT);
let pool = PagedPool::new_with_candidate_sizes([client_config.part_config.read_size_bytes], mem_limit);
let client = client_config
.create_client(pool.clone(), None)
.context("Failed to create S3 client")?;
Expand Down
10 changes: 6 additions & 4 deletions mountpoint-s3-fs/examples/prefetch_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use mountpoint_s3_client::config::{EndpointConfig, RustLogAdapter, S3ClientConfi
use mountpoint_s3_client::types::HeadObjectParams;
use mountpoint_s3_client::{ObjectClient, S3CrtClient};
use mountpoint_s3_fs::Runtime;
use mountpoint_s3_fs::mem_limiter::{MemoryLimiter, effective_total_memory};
use mountpoint_s3_fs::memory::PagedPool;
use mountpoint_s3_fs::memory::effective_total_memory;
use mountpoint_s3_fs::object::ObjectId;
use mountpoint_s3_fs::prefetch::{PrefetchGetObject, Prefetcher, PrefetcherConfig};
use serde_json::{json, to_writer};
Expand Down Expand Up @@ -167,10 +167,12 @@ fn main() -> anyhow::Result<()> {
let args = CliArgs::parse();

let bucket = args.bucket.as_str();
let pool = PagedPool::new_with_candidate_sizes([args.part_size.unwrap_or(8 * 1024 * 1024) as usize]);
let pool = PagedPool::new_with_candidate_sizes(
[args.part_size.unwrap_or(8 * 1024 * 1024) as usize],
args.memory_target_in_bytes(),
);
let client_config = args.s3_client_config().memory_pool(pool.clone());
let client = S3CrtClient::new(client_config).context("failed to create S3 CRT client")?;
let mem_limiter = Arc::new(MemoryLimiter::new(pool, args.memory_target_in_bytes()));
let runtime = Runtime::new(client.event_loop_group());

// Verify if all objects exist and collect metadata
Expand All @@ -195,7 +197,7 @@ fn main() -> anyhow::Result<()> {
let start = Instant::now();
let manager = Prefetcher::default_builder(client.clone()).build(
runtime.clone(),
mem_limiter.clone(),
pool.clone(),
PrefetcherConfig::default(),
);

Expand Down
20 changes: 7 additions & 13 deletions mountpoint-s3-fs/examples/s3io_benchmark/executor.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::Instant;

use mountpoint_s3_client::config::{Allocator, EndpointConfig, S3ClientConfig, Uri};
use mountpoint_s3_client::types::HeadObjectParams;
use mountpoint_s3_client::{ObjectClient, S3CrtClient};
use mountpoint_s3_fs::mem_limiter::{MemoryLimiter, effective_total_memory};
use mountpoint_s3_fs::memory::PagedPool;
use mountpoint_s3_fs::memory::effective_total_memory;
Comment thread
passaro marked this conversation as resolved.
use mountpoint_s3_fs::object::ObjectId;
use mountpoint_s3_fs::prefetch::{Prefetcher, PrefetcherConfig};
use mountpoint_s3_fs::upload::{Uploader, UploaderConfig};
use mountpoint_s3_fs::{Runtime, ServerSideEncryption};
use rand::{RngExt, SeedableRng};
use rand_pcg::Pcg64;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::Instant;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert std on top.

use thiserror::Error;

use crate::config::{AccessPattern, ChecksumAlgorithm, GlobalConfig, ResolvedJobConfig, SseType, WorkloadType};
Expand Down Expand Up @@ -65,7 +63,8 @@ impl Executor {
ChecksumAlgorithm::Off => None,
};

let pool = PagedPool::new_with_candidate_sizes([read_part_size, write_part_size]);
let memory_target_bytes = (max_memory_target * 1024 * 1024) as u64;
let pool = PagedPool::new_with_candidate_sizes([read_part_size, write_part_size], memory_target_bytes);

let mut endpoint_config = EndpointConfig::new(region);
if let Some(url) = &global.endpoint_url {
Expand All @@ -92,9 +91,6 @@ impl Executor {
let client = S3CrtClient::new(client_config)
.map_err(|e| ExecutionError::ResourceInitError(format!("Failed to create S3 client: {}", e)))?;

let memory_target_bytes = (max_memory_target * 1024 * 1024) as u64;
let mem_limiter = Arc::new(MemoryLimiter::new(pool.clone(), memory_target_bytes));

let runtime = Runtime::new(client.event_loop_group());

let server_side_encryption = ServerSideEncryption::new(sse_type, global.sse_kms_key_id.clone());
Expand All @@ -103,14 +99,12 @@ impl Executor {
client.clone(),
runtime.clone(),
pool.clone(),
mem_limiter.clone(),
UploaderConfig::new(write_part_size)
.server_side_encryption(server_side_encryption)
.default_checksum_algorithm(checksum_algorithm),
);

let prefetcher =
Prefetcher::default_builder(client.clone()).build(runtime, mem_limiter, PrefetcherConfig::default());
let prefetcher = Prefetcher::default_builder(client.clone()).build(runtime, pool, PrefetcherConfig::default());

Ok(Self {
client,
Expand Down
19 changes: 8 additions & 11 deletions mountpoint-s3-fs/examples/upload_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use clap::Parser;
use mountpoint_s3_client::config::{Allocator, EndpointConfig, RustLogAdapter, S3ClientConfig, Uri};
use mountpoint_s3_client::types::ChecksumAlgorithm;
use mountpoint_s3_client::{ObjectClient, S3CrtClient};
use mountpoint_s3_fs::mem_limiter::{MemoryLimiter, effective_total_memory};
use mountpoint_s3_fs::memory::PagedPool;
use mountpoint_s3_fs::memory::effective_total_memory;
use mountpoint_s3_fs::upload::{Uploader, UploaderConfig};
use mountpoint_s3_fs::{Runtime, ServerSideEncryption};
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -96,7 +96,13 @@ fn main() {
let endpoint_uri = Uri::new_from_str(&Allocator::default(), url).expect("Failed to parse endpoint URL");
endpoint_config = endpoint_config.endpoint(endpoint_uri);
}
let pool = PagedPool::new_with_candidate_sizes([args.write_part_size]);
let max_memory_target = if let Some(target) = args.max_memory_target {
target * 1024 * 1024
} else {
// Default to 95% of total system memory (cgroup-aware)
(effective_total_memory() as f64 * 0.95) as u64
};
let pool = PagedPool::new_with_candidate_sizes([args.write_part_size], max_memory_target);
let config = S3ClientConfig::new()
.endpoint_config(endpoint_config)
.throughput_target_gbps(args.throughput_target_gbps as f64)
Expand All @@ -107,14 +113,6 @@ fn main() {
let runtime = Runtime::new(client.event_loop_group());

for i in 0..args.iterations {
let max_memory_target = if let Some(target) = args.max_memory_target {
target * 1024 * 1024
} else {
// Default to 95% of total system memory (cgroup-aware)
(effective_total_memory() as f64 * 0.95) as u64
};
let mem_limiter = Arc::new(MemoryLimiter::new(pool.clone(), max_memory_target));

let buffer_size = args.write_part_size;
let server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone());

Expand All @@ -130,7 +128,6 @@ fn main() {
client.clone(),
runtime.clone(),
pool.clone(),
mem_limiter,
UploaderConfig::new(buffer_size)
.server_side_encryption(server_side_encryption)
.default_checksum_algorithm(checksum_algorithm),
Expand Down
14 changes: 7 additions & 7 deletions mountpoint-s3-fs/src/data_cache/disk_data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ mod tests {
#[test]
fn get_path_for_block_key() {
let cache_dir = PathBuf::from("mountpoint-cache/");
let pool = PagedPool::new_with_candidate_sizes([1024]);
let pool = PagedPool::new_with_candidate_sizes_unlimited([1024]);
let data_cache = DiskDataCache::new(
DiskDataCacheConfig {
cache_directory: cache_dir,
Expand Down Expand Up @@ -719,7 +719,7 @@ mod tests {
#[test]
fn get_path_for_block_key_huge_block_index() {
let cache_dir = PathBuf::from("mountpoint-cache/");
let pool = PagedPool::new_with_candidate_sizes([1024]);
let pool = PagedPool::new_with_candidate_sizes_unlimited([1024]);
let data_cache = DiskDataCache::new(
DiskDataCacheConfig {
cache_directory: cache_dir,
Expand Down Expand Up @@ -761,7 +761,7 @@ mod tests {
let object_2_size = data_2.len();

let cache_directory = tempfile::tempdir().unwrap();
let pool = PagedPool::new_with_candidate_sizes([pool_buffer_size]);
let pool = PagedPool::new_with_candidate_sizes_unlimited([pool_buffer_size]);
let cache = DiskDataCache::new(
DiskDataCacheConfig {
cache_directory: cache_directory.path().to_path_buf(),
Expand Down Expand Up @@ -848,7 +848,7 @@ mod tests {
let slice = data.slice(1..5);

let cache_directory = tempfile::tempdir().unwrap();
let pool = PagedPool::new_with_candidate_sizes([8 * 1024 * 1024]);
let pool = PagedPool::new_with_candidate_sizes_unlimited([8 * 1024 * 1024]);
let cache = DiskDataCache::new(
DiskDataCacheConfig {
cache_directory: cache_directory.path().to_path_buf(),
Expand Down Expand Up @@ -930,7 +930,7 @@ mod tests {
let small_object_key = ObjectId::new("small".into(), ETag::for_tests());

let cache_directory = tempfile::tempdir().unwrap();
let pool = PagedPool::new_with_candidate_sizes([BLOCK_SIZE]);
let pool = PagedPool::new_with_candidate_sizes_unlimited([BLOCK_SIZE]);
let cache = DiskDataCache::new(
DiskDataCacheConfig {
cache_directory: cache_directory.path().to_path_buf(),
Expand Down Expand Up @@ -1113,7 +1113,7 @@ mod tests {
// "Corrupt" the serialized value with an invalid length.
replace_u64_at(&mut buf, offset, u64::MAX);

let pool = PagedPool::new_with_candidate_sizes([MAX_LENGTH as usize]);
let pool = PagedPool::new_with_candidate_sizes_unlimited([MAX_LENGTH as usize]);
let err =
DiskBlock::read(&mut Cursor::new(buf), MAX_LENGTH, &pool, None).expect_err("deserialization should fail");
match length_to_corrupt {
Expand All @@ -1130,7 +1130,7 @@ mod tests {
fn test_concurrent_access() {
let block_size = 1024 * 1024;
let cache_directory = tempfile::tempdir().unwrap();
let pool = PagedPool::new_with_candidate_sizes([block_size]);
let pool = PagedPool::new_with_candidate_sizes_unlimited([block_size]);
let data_cache = DiskDataCache::new(
DiskDataCacheConfig {
cache_directory: cache_directory.path().to_path_buf(),
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-fs/src/data_cache/multilevel_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod tests {

fn create_disk_cache() -> (TempDir, Arc<DiskDataCache>) {
let cache_directory = tempfile::tempdir().unwrap();
let pool = PagedPool::new_with_candidate_sizes([BLOCK_SIZE as usize, PART_SIZE]);
let pool = PagedPool::new_with_candidate_sizes_unlimited([BLOCK_SIZE as usize, PART_SIZE]);
let cache = DiskDataCache::new(
DiskDataCacheConfig {
cache_directory: cache_directory.path().to_path_buf(),
Expand Down
11 changes: 5 additions & 6 deletions mountpoint-s3-fs/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use tracing::{Level, debug, trace};

use crate::async_util::Runtime;
use crate::logging;
use crate::mem_limiter::MemoryLimiter;
use crate::memory::PagedPool;
use crate::metablock::{AddDirEntry, AddDirEntryResult, InodeInformation, Metablock, PendingUploadHook, ReadWriteMode};
pub use crate::metablock::{InodeError, InodeKind, InodeNo};
Expand Down Expand Up @@ -150,13 +149,12 @@ where
) -> Self {
trace!(?config, "new filesystem");

let mem_limiter = Arc::new(MemoryLimiter::new(pool.clone(), config.mem_limit));
let prefetcher = prefetch_builder.build(runtime.clone(), mem_limiter.clone(), config.prefetcher_config);
let pool = pool.clone();
let prefetcher = prefetch_builder.build(runtime.clone(), pool.clone(), config.prefetcher_config);
let uploader = Uploader::new(
client.clone(),
runtime,
pool,
mem_limiter,
UploaderConfig::new(client.write_part_size())
.storage_class(config.storage_class.to_owned())
.server_side_encryption(config.server_side_encryption.clone())
Expand Down Expand Up @@ -787,6 +785,7 @@ where
mod tests {
use super::*;

use crate::memory::MINIMUM_MEM_LIMIT;
use crate::prefetch::Prefetcher;
use crate::s3::{Bucket, S3Path};
use crate::{Superblock, SuperblockConfig};
Expand All @@ -810,7 +809,7 @@ mod tests {
client.add_object("dir1/file1.bin", MockObject::constant(0xa1, 15, ETag::for_tests()));

let runtime = Runtime::new(ThreadPool::builder().pool_size(1).create().unwrap());
let pool = PagedPool::new_with_candidate_sizes([32]);
let pool = PagedPool::new_with_candidate_sizes([32], MINIMUM_MEM_LIMIT);
let prefetcher_builder = Prefetcher::default_builder(client.clone());
let server_side_encryption =
ServerSideEncryption::new(Some("aws:kms".to_owned()), Some("some_key_alias".to_owned()));
Expand Down Expand Up @@ -1074,7 +1073,7 @@ mod tests {
);

let runtime = Runtime::new(ThreadPool::builder().pool_size(10).create().unwrap());
let pool = PagedPool::new_with_candidate_sizes([32]);
let pool = PagedPool::new_with_candidate_sizes([32], MINIMUM_MEM_LIMIT);
let prefetcher_builder = Prefetcher::default_builder(client.clone());
let fs_config = S3FilesystemConfig {
allow_overwrite,
Expand Down
4 changes: 0 additions & 4 deletions mountpoint-s3-fs/src/fs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::time::Duration;
use nix::unistd::{getgid, getuid};

use crate::content_type::ContentTypeDetection;
use crate::mem_limiter::MINIMUM_MEM_LIMIT;
use crate::metablock::WriteMode;
use crate::prefetch::PrefetcherConfig;
use crate::s3::S3Personality;
Expand Down Expand Up @@ -40,8 +39,6 @@ pub struct S3FilesystemConfig {
pub server_side_encryption: ServerSideEncryption,
/// Use additional checksums for uploads
pub use_upload_checksums: bool,
/// Memory limit
pub mem_limit: u64,
/// Prefetcher configuration
pub prefetcher_config: PrefetcherConfig,
/// Content type inference mode for uploaded objects
Expand Down Expand Up @@ -73,7 +70,6 @@ impl Default for S3FilesystemConfig {
server_side_encryption: Default::default(),
use_upload_checksums: true,
content_type_detection: ContentTypeDetection::Disabled,
mem_limit: MINIMUM_MEM_LIMIT,
prefetcher_config: Default::default(),
max_background_fuse_requests: None,
}
Expand Down
1 change: 0 additions & 1 deletion mountpoint-s3-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ pub mod fuse;
pub mod logging;
#[cfg(feature = "manifest")]
pub mod manifest;
pub mod mem_limiter;
pub mod memory;
pub mod metablock;
pub mod metrics;
Expand Down
2 changes: 2 additions & 0 deletions mountpoint-s3-fs/src/memory.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
mod buffers;
mod limiter;
mod pages;
mod pool;
mod stats;

pub use buffers::{PoolBuffer, PoolBufferMut};
pub use limiter::{ActiveRead, ActiveReadGuard, BufferArea, MINIMUM_MEM_LIMIT, effective_total_memory};
pub use pool::PagedPool;
pub use stats::BufferKind;
Loading
Loading