-
Notifications
You must be signed in to change notification settings - Fork 241
Add concurrent write-handle limit returning ENOMEM on open() #1831
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/memory-limit
Are you sure you want to change the base?
Changes from 5 commits
d9a2542
2b51753
fb1f1f5
bb93276
2a28c84
9b64dc2
cae2f1c
a1bff6d
7595310
6a1ca1b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -86,6 +86,7 @@ Mountpoint emits the following metrics: | |
|
|
||
| | Metric | Type | Dimensions | Description | | ||
| |--------|------|------------|-------------| | ||
| | `fs.write_handle_limit_exceeded` | Counter | | Number of `open()` calls for write rejected because the [concurrent-writers cap](CONFIGURATION.md#maximum-number-of-files-open-for-write) was reached | | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before documenting the new metric, we should add it to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Didn't know about the Removed from docs for now - we can review all new metrics later as you've suggested. |
||
| | `fuse.io_size` | Histogram | `fuse_request` (read, write) | Bytes transferred per FUSE request | | ||
| | `fuse.request_errors` | Counter | `fuse_request` (read, write, etc.) | Number of FUSE request errors | | ||
| | `fuse.request_latency` | Histogram | `fuse_request` (read, write, etc.) | Time to process a FUSE request | | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ use crate::prefetch::{Prefetcher, PrefetcherBuilder}; | |
| use crate::sync::atomic::{AtomicU64, Ordering}; | ||
| use crate::sync::{Arc, AsyncMutex, AsyncRwLock}; | ||
| use crate::upload::{Uploader, UploaderConfig}; | ||
| use crate::write_handle_limiter::WriteHandleLimiter; | ||
|
|
||
| mod config; | ||
| pub use config::{CacheConfig, S3FilesystemConfig}; | ||
|
|
@@ -55,6 +56,7 @@ where | |
| metablock: Arc<dyn Metablock>, | ||
| prefetcher: Prefetcher<Client>, | ||
| uploader: Uploader<Client>, | ||
| write_handle_limiter: Arc<WriteHandleLimiter>, | ||
| next_handle: AtomicU64, | ||
| file_handles: AsyncRwLock<HashMap<u64, Arc<FileHandle<Client>>>>, | ||
| } | ||
|
|
@@ -150,6 +152,11 @@ where | |
| trace!(?config, "new filesystem"); | ||
|
|
||
| let pool = pool.clone(); | ||
| let write_handle_limiter = Arc::new(WriteHandleLimiter::new( | ||
| pool.mem_limit(), | ||
| pool.data_buffer_budget(), | ||
| client.write_part_size(), | ||
| )); | ||
| let prefetcher = prefetch_builder.build(runtime.clone(), pool.clone(), config.prefetcher_config); | ||
| let uploader = Uploader::new( | ||
| client.clone(), | ||
|
|
@@ -167,6 +174,7 @@ where | |
| metablock: Arc::new(metablock), | ||
| prefetcher, | ||
| uploader, | ||
| write_handle_limiter, | ||
| next_handle: AtomicU64::new(1), | ||
| file_handles: AsyncRwLock::new(HashMap::new()), | ||
| } | ||
|
|
@@ -349,13 +357,18 @@ where | |
|
|
||
| let fh = self.next_handle(); // TODO: can we delay obtaining the next handle until we know we are creating a new file handle? | ||
| let write_mode = self.config.write_mode(); | ||
| let new_handle = self.metablock.open_handle(ino, fh, &write_mode, flags).await?; | ||
| let mut new_handle = self | ||
| .metablock | ||
| .open_handle(ino, fh, &write_mode, flags, Some(&self.write_handle_limiter)) | ||
| .await?; | ||
| let write_slot = new_handle.write_slot.take(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why If this was just to "fix" lifetimes, consider instead unpacking or copying the data you will need later.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make sense, done. |
||
| let state = FileHandleState::new(&new_handle, flags, self).await?; | ||
| let handle = FileHandle { | ||
| ino, | ||
| location: new_handle.lookup.try_into_s3_location()?, | ||
| open_pid: pid, | ||
| state: AsyncMutex::new(state), | ||
| write_slot, | ||
| }; | ||
| debug!(fh, ino, "new {:?} file handle created", new_handle.mode); | ||
| self.file_handles.write().await.insert(fh, Arc::new(handle)); | ||
|
|
@@ -803,6 +816,7 @@ mod tests { | |
| .bucket(bucket.to_string()) | ||
| .enable_backpressure(true) | ||
| .initial_read_window_size(1024 * 1024) | ||
| .part_size(1024 * 1024) | ||
| .build(), | ||
| ); | ||
| // Create "dir1" in the client to avoid creating it locally | ||
|
|
@@ -1090,4 +1104,134 @@ mod tests { | |
| ); | ||
| S3Filesystem::new(client, prefetcher_builder, pool, runtime, superblock, fs_config) | ||
| } | ||
|
|
||
| /// Verifies that the limiter rejects opens for write past the configured cap with `ENOMEM`, | ||
| /// and that releasing a handle re-opens a slot. Uses a deliberately tight `mem_limit` so the | ||
| /// derived cap is small enough to exhaust quickly. | ||
| /// | ||
| /// The MockClient `part_size` is also the value `client.write_part_size()` returns. With | ||
| /// `mem_limit = 256 MiB`, `part_size = 32 MiB`, `additional_mem_reserved = max(128, 32) = 128 MiB`, | ||
| /// the formula gives `(256 - 128) / 32 = 4` concurrent writers. | ||
| #[tokio::test] | ||
| async fn test_open_for_write_returns_enomem_when_cap_exhausted() { | ||
| let test_name = "test_open_for_write_returns_enomem_when_cap_exhausted"; | ||
| let bucket = Bucket::new("bucket").unwrap(); | ||
| let client = MockClient::config() | ||
| .bucket(bucket.to_string()) | ||
| .enable_backpressure(true) | ||
| .initial_read_window_size(1024 * 1024) | ||
| .part_size(32 * 1024 * 1024) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we define this and other values as constants at the top of this function? And move the calculation described in the rustdoc there.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
| .build(); | ||
| client.add_object( | ||
| &format!("dir1/{}1.txt", test_name), | ||
| MockObject::constant(0xa1, 15, ETag::for_tests()), | ||
| ); | ||
|
|
||
| let runtime = Runtime::new(ThreadPool::builder().pool_size(2).create().unwrap()); | ||
| let pool = PagedPool::new_with_candidate_sizes([32 * 1024 * 1024], 256 * 1024 * 1024); | ||
| let prefetcher_builder = Prefetcher::default_builder(client.clone()); | ||
| let fs_config = S3FilesystemConfig { | ||
| allow_overwrite: true, | ||
| ..Default::default() | ||
| }; | ||
| let superblock = Superblock::new( | ||
| client.clone(), | ||
| S3Path::new(bucket, Default::default()), | ||
| SuperblockConfig { | ||
| cache_config: fs_config.cache_config.clone(), | ||
| s3_personality: fs_config.s3_personality, | ||
| }, | ||
| ); | ||
| let fs = S3Filesystem::new(client, prefetcher_builder, pool, runtime, superblock, fs_config); | ||
|
|
||
| // Sanity-check that we computed exactly 4 writer slots given the test's tuning. | ||
| let cap = fs.write_handle_limiter.max_concurrent_writes(); | ||
| assert_eq!(cap, 4); | ||
|
|
||
| // Resolve the directory inode for mknod calls below. | ||
| let dir_entry = fs.lookup(FUSE_ROOT_INODE, "dir1".as_ref()).await.unwrap(); | ||
| let read_dir_ino = dir_entry.attr.ino; | ||
|
|
||
| // Create more files than the write cap, then prove the cap holds. | ||
| let mut files = Vec::new(); | ||
| for i in 0..(cap + 1) { | ||
| let dentry = fs | ||
| .mknod( | ||
| read_dir_ino, | ||
| format!("file{i}.bin").as_ref(), | ||
| libc::S_IFREG | libc::S_IRWXU, | ||
| 0, | ||
| 0, | ||
| ) | ||
| .await | ||
| .unwrap(); | ||
| files.push(dentry); | ||
| } | ||
|
|
||
| // Open up to the cap: all should succeed. | ||
| let mut open_handles = Vec::new(); | ||
| for dentry in files.iter().take(cap) { | ||
| let opened = fs | ||
| .open(dentry.attr.ino, OpenFlags::O_WRONLY, 0) | ||
| .await | ||
| .expect("open within cap should succeed"); | ||
| open_handles.push(opened); | ||
| } | ||
|
|
||
| // The next open exceeds the cap → ENOMEM with the expected message. | ||
| let err = fs | ||
| .open(files[cap].attr.ino, OpenFlags::O_WRONLY, 0) | ||
| .await | ||
| .expect_err("opening past the cap should return ENOMEM"); | ||
| assert_eq!(err.errno, libc::ENOMEM); | ||
| let msg = format!("{err}"); | ||
| assert!( | ||
| msg.contains("cannot open file for write"), | ||
| "unexpected error message: {msg}" | ||
| ); | ||
| assert!( | ||
| msg.contains(&cap.to_string()), | ||
| "error message should reference cap of {cap}: {msg}" | ||
| ); | ||
|
|
||
| // Re-opening the rejected file *before* freeing a slot still returns ENOMEM (no inode | ||
| // state was mutated by the rejected open, and the cap is still full). | ||
| let err = fs | ||
| .open(files[cap].attr.ino, OpenFlags::O_WRONLY, 0) | ||
| .await | ||
| .expect_err("re-opening the rejected file while cap is full should still return ENOMEM"); | ||
| assert_eq!(err.errno, libc::ENOMEM); | ||
|
|
||
| // Locks in the fail-fast check order: when the cap is exhausted AND the target file | ||
| // already has an active writer (open_handles[0] is still live for files[0]), the user | ||
| // sees ENOMEM rather than EPERM. The cheap lock-free limiter check runs before the | ||
| // inode-locked conflict check, so cap exhaustion wins. See the commit message for the | ||
| // ordering rationale; flipping this order is a deliberate design change. | ||
| let err = fs | ||
| .open(files[0].attr.ino, OpenFlags::O_WRONLY, 0) | ||
| .await | ||
| .expect_err("opening an already-writing file at cap should return an error"); | ||
| assert_eq!( | ||
| err.errno, | ||
| libc::ENOMEM, | ||
| "limiter check should fire before inode-conflict check (got errno {})", | ||
| err.errno | ||
| ); | ||
|
|
||
| // Closing one of the open handles releases a slot. | ||
| fs.flush(files[0].attr.ino, open_handles[0].fh, 0, 0) | ||
| .await | ||
| .expect("flush should succeed"); | ||
| fs.release(files[0].attr.ino, open_handles[0].fh, 0, None, true) | ||
| .await | ||
| .expect("release should succeed"); | ||
|
|
||
| // The rejected file can now be opened cleanly. This validates that the ENOMEM rejection | ||
| // didn't leave the inode in `LocalOpenForWriting` — the metablock acquires the slot | ||
| // before mutating any state, so a rejection is fully reversible. | ||
| let _opened_retry = fs | ||
| .open(files[cap].attr.ino, OpenFlags::O_WRONLY, 0) | ||
| .await | ||
| .expect("retrying the previously-rejected file should succeed after a slot is freed"); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ use crate::object::ObjectId; | |
| use crate::prefetch::PrefetchGetObject; | ||
| use crate::sync::{Arc, AsyncMutex}; | ||
| use crate::upload::{AppendUploadRequest, UploadRequest}; | ||
| use crate::write_handle_limiter::WriteHandleSlot; | ||
|
|
||
| use super::{Error, InodeNo, OpenFlags, S3Filesystem, ToErrno}; | ||
|
|
||
|
|
@@ -23,6 +24,11 @@ where | |
| pub state: AsyncMutex<FileHandleState<Client>>, | ||
| /// Process that created the handle | ||
| pub open_pid: u32, | ||
| /// Slot reserved on the [`MemoryLimiter`] for this handle. `Some` for write handles, `None` | ||
| /// for read handles. Released automatically when the `FileHandle` is dropped — held purely | ||
| /// for that `Drop` side effect, so the field is never read directly. | ||
| #[expect(dead_code, reason = "held for its Drop side effect")] | ||
| pub(super) write_slot: Option<WriteHandleSlot>, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this be in
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, prefer
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good suggestion - done. |
||
| } | ||
|
|
||
| impl<Client> FileHandle<Client> | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ pub mod s3; | |
| mod superblock; | ||
| mod sync; | ||
| pub mod upload; | ||
| pub mod write_handle_limiter; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider moving under
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
|
||
| pub use async_util::Runtime; | ||
| pub use config::MountpointConfig; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we say "to control memory usage" instead of oom crashes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should start mentioning what's the default for memory target (and thus max writes).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Added mentioning of the default value.