Skip to content

Commit 26e89c2

Browse files
committed
cli: Fix FUSE kernel cache serving stale directory listings
When readdirplus populates the kernel dcache with TTL=MAX, the kernel caches those entries indefinitely. After unlink/rmdir/ rename the kernel drops the specific entry it operated on, but the parent directory's cached readdirplus result can still contain stale entries on subsequent ls calls. This was most visible in overlay mode: after whiteout-deleting a base file, ls still showed the file because the kernel served the cached readdirplus result without re-querying userspace. Fix by sending Notifier::inval_entry to the kernel after unlink, rmdir, and rename so the dcache drops the affected entry. These notifications cannot be issued from inside FUSE callbacks because the kernel may synchronously respond with FUSE_FORGET, which the single-threaded session loop cannot process until the current callback returns -- deadlocking the mount. Instead, callbacks enqueue invalidation requests into a DeferredNotifier (mpsc channel) that a background thread drains and writes to /dev/fuse. The session loop remains free to read FUSE_FORGET while the notification thread writes. Add test-fuse-cache-invalidation.sh covering unlink, rmdir, rename, create, and mkdir visibility in subsequent readdir.
1 parent fef7b6a commit 26e89c2

7 files changed

Lines changed: 315 additions & 14 deletions

File tree

cli/src/fuse.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ fn maximize_fd_limit() {
6868
}
6969
}
7070

71-
/// Cache entries never expire - we explicitly invalidate on mutations.
71+
/// Cache entries never expire — we use deferred kernel cache invalidation
72+
/// (via Notifier::inval_entry) after mutations to keep the dcache consistent.
7273
/// This is safe because we are the only writer to the filesystem.
7374
const TTL: Duration = Duration::MAX;
7475

@@ -602,7 +603,7 @@ impl Filesystem for AgentFSFuse {
602603
///
603604
/// Verifies the target is a directory and is empty before removal.
604605
/// Returns `ENOTDIR` if not a directory, `ENOTEMPTY` if not empty.
605-
fn rmdir(&mut self, _req: &Request, parent: u64, name: &OsStr, reply: ReplyEmpty) {
606+
fn rmdir(&mut self, req: &Request, parent: u64, name: &OsStr, reply: ReplyEmpty) {
606607
tracing::debug!("FUSE::rmdir: parent={}, name={:?}", parent, name);
607608

608609
let Some(name_str) = name.to_str() else {
@@ -619,6 +620,7 @@ impl Filesystem for AgentFSFuse {
619620
match result {
620621
Ok(()) => {
621622
reply.ok();
623+
req.deferred_notifier().inval_entry(parent, name);
622624
}
623625
Err(e) => reply.error(error_to_errno(&e)),
624626
}
@@ -772,7 +774,7 @@ impl Filesystem for AgentFSFuse {
772774
/// Removes a file (unlinks it from the directory).
773775
///
774776
/// Gets the file's inode before removal to clean up the path cache.
775-
fn unlink(&mut self, _req: &Request, parent: u64, name: &OsStr, reply: ReplyEmpty) {
777+
fn unlink(&mut self, req: &Request, parent: u64, name: &OsStr, reply: ReplyEmpty) {
776778
tracing::debug!("FUSE::unlink: parent={}, name={:?}", parent, name);
777779

778780
let Some(name_str) = name.to_str() else {
@@ -789,6 +791,7 @@ impl Filesystem for AgentFSFuse {
789791
match result {
790792
Ok(()) => {
791793
reply.ok();
794+
req.deferred_notifier().inval_entry(parent, name);
792795
}
793796
Err(e) => reply.error(error_to_errno(&e)),
794797
}
@@ -799,7 +802,7 @@ impl Filesystem for AgentFSFuse {
799802
/// Moves `name` from `parent` to `newname` under `newparent`.
800803
fn rename(
801804
&mut self,
802-
_req: &Request,
805+
req: &Request,
803806
parent: u64,
804807
name: &OsStr,
805808
newparent: u64,
@@ -841,6 +844,9 @@ impl Filesystem for AgentFSFuse {
841844
match result {
842845
Ok(()) => {
843846
reply.ok();
847+
let dn = req.deferred_notifier();
848+
dn.inval_entry(parent, name);
849+
dn.inval_entry(newparent, newname);
844850
}
845851
Err(e) => reply.error(error_to_errno(&e)),
846852
}

cli/src/fuser/deferred_notify.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use log::debug;
2+
use std::{
3+
ffi::{OsStr, OsString},
4+
sync::mpsc,
5+
};
6+
7+
/// A queued invalidation operation to be flushed between FUSE callbacks.
8+
#[derive(Debug)]
9+
pub enum NotifyOp {
10+
InvalEntry { parent: u64, name: OsString },
11+
}
12+
13+
/// Queues kernel cache invalidation requests for deferred execution.
14+
///
15+
/// FUSE notification writes to /dev/fuse cannot be issued from within
16+
/// filesystem callbacks because the kernel may respond with FUSE_FORGET,
17+
/// which the single-threaded session loop cannot process until the current
18+
/// callback returns — causing a deadlock. DeferredNotifier solves this by
19+
/// enqueueing operations that a dedicated background thread flushes
20+
/// asynchronously.
21+
#[derive(Debug, Clone)]
22+
pub struct DeferredNotifier {
23+
tx: mpsc::Sender<NotifyOp>,
24+
}
25+
26+
impl DeferredNotifier {
27+
pub(crate) fn new(tx: mpsc::Sender<NotifyOp>) -> Self {
28+
Self { tx }
29+
}
30+
31+
pub fn inval_entry(&self, parent: u64, name: &OsStr) {
32+
if let Err(e) = self.tx.send(NotifyOp::InvalEntry {
33+
parent,
34+
name: name.to_os_string(),
35+
}) {
36+
debug!("deferred inval_entry send failed (notify thread gone?): {e}");
37+
}
38+
}
39+
}

cli/src/fuser/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ mod channel;
5151
unexpected_cfgs,
5252
clippy::manual_is_multiple_of
5353
)]
54+
pub(crate) mod deferred_notify;
5455
mod ll;
5556
#[allow(clippy::io_other_error)]
5657
mod mnt;

cli/src/fuser/request.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::convert::TryInto;
1212
use std::path::Path;
1313

1414
use super::channel::ChannelSender;
15+
use super::deferred_notify::DeferredNotifier;
1516
use super::ll::Request as _;
1617
use super::reply::ReplyDirectoryPlus;
1718
use super::reply::{Reply, ReplyDirectory, ReplySender};
@@ -25,6 +26,8 @@ use super::{ll, KernelConfig};
2526
pub struct Request<'a> {
2627
/// Channel sender for sending the reply
2728
ch: ChannelSender,
29+
/// Deferred notifier for enqueueing cache invalidations
30+
deferred: &'a DeferredNotifier,
2831
/// Request raw data
2932
#[allow(unused)]
3033
data: &'a [u8],
@@ -34,7 +37,11 @@ pub struct Request<'a> {
3437

3538
impl<'a> Request<'a> {
3639
/// Create a new request from the given data
37-
pub(crate) fn new(ch: ChannelSender, data: &'a [u8]) -> Option<Request<'a>> {
40+
pub(crate) fn new(
41+
ch: ChannelSender,
42+
deferred: &'a DeferredNotifier,
43+
data: &'a [u8],
44+
) -> Option<Request<'a>> {
3845
let request = match ll::AnyRequest::try_from(data) {
3946
Ok(request) => request,
4047
Err(err) => {
@@ -43,7 +50,16 @@ impl<'a> Request<'a> {
4350
}
4451
};
4552

46-
Some(Self { ch, data, request })
53+
Some(Self {
54+
ch,
55+
deferred,
56+
data,
57+
request,
58+
})
59+
}
60+
61+
pub fn deferred_notifier(&self) -> &DeferredNotifier {
62+
self.deferred
4763
}
4864

4965
/// Dispatch request to the given filesystem.

cli/src/fuser/session.rs

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ use std::path::{Path, PathBuf};
1414
use std::sync::{Arc, Mutex};
1515
use std::thread::{self, JoinHandle};
1616

17+
use std::sync::mpsc;
18+
19+
use super::deferred_notify::{DeferredNotifier, NotifyOp};
1720
use super::ll::fuse_abi as abi;
1821
use super::request::Request;
1922
use super::Filesystem;
@@ -64,6 +67,10 @@ pub struct Session<FS: Filesystem> {
6467
pub(crate) initialized: bool,
6568
/// True if the filesystem was destroyed (destroy operation done)
6669
pub(crate) destroyed: bool,
70+
/// Sender half of the deferred notification queue
71+
notify_tx: Option<mpsc::Sender<NotifyOp>>,
72+
/// Receiver half — moved to the notify thread in run()
73+
notify_rx: Option<mpsc::Receiver<NotifyOp>>,
6774
}
6875

6976
impl<FS: Filesystem> AsFd for Session<FS> {
@@ -109,6 +116,8 @@ impl<FS: Filesystem> Session<FS> {
109116
SessionACL::Owner
110117
};
111118

119+
let (notify_tx, notify_rx) = mpsc::channel();
120+
112121
Ok(Session {
113122
filesystem,
114123
ch,
@@ -119,13 +128,16 @@ impl<FS: Filesystem> Session<FS> {
119128
proto_minor: 0,
120129
initialized: false,
121130
destroyed: false,
131+
notify_tx: Some(notify_tx),
132+
notify_rx: Some(notify_rx),
122133
})
123134
}
124135

125136
/// Wrap an existing /dev/fuse file descriptor. This doesn't mount the
126137
/// filesystem anywhere; that must be done separately.
127138
pub fn from_fd(filesystem: FS, fd: OwnedFd, acl: SessionACL) -> Self {
128139
let ch = Channel::new(Arc::new(fd.into()));
140+
let (notify_tx, notify_rx) = mpsc::channel();
129141
Session {
130142
filesystem,
131143
ch,
@@ -136,6 +148,8 @@ impl<FS: Filesystem> Session<FS> {
136148
proto_minor: 0,
137149
initialized: false,
138150
destroyed: false,
151+
notify_tx: Some(notify_tx),
152+
notify_rx: Some(notify_rx),
139153
}
140154
}
141155

@@ -146,20 +160,43 @@ impl<FS: Filesystem> Session<FS> {
146160
/// # Errors
147161
/// Returns any final error when the session comes to an end.
148162
pub fn run(&mut self) -> io::Result<()> {
163+
let notify_rx = self.notify_rx.take().expect("run() called more than once");
164+
let notifier = self.notifier();
165+
let notify_handle = thread::spawn(move || {
166+
for op in notify_rx {
167+
let res = match op {
168+
NotifyOp::InvalEntry { parent, ref name } => {
169+
notifier.inval_entry(parent, name.as_os_str())
170+
}
171+
};
172+
if let Err(e) = res {
173+
debug!("FUSE notify failed: {e}");
174+
}
175+
}
176+
});
177+
178+
// A single DeferredNotifier shared by all requests in this session,
179+
// avoiding a Sender clone on every FUSE request dispatch.
180+
let deferred =
181+
DeferredNotifier::new(self.notify_tx.as_ref().expect("notify_tx missing").clone());
182+
149183
// Buffer for receiving requests from the kernel. Only one is allocated and
150184
// it is reused immediately after dispatching to conserve memory and allocations.
151185
let mut buffer = vec![0; BUFFER_SIZE];
152186
let buf = aligned_sub_buf(&mut buffer, std::mem::align_of::<abi::fuse_in_header>());
187+
let mut result = Ok(());
153188
loop {
154189
// Read the next request from the given channel to kernel driver
155190
// The kernel driver makes sure that we get exactly one request per read
156191
match self.ch.receive(buf) {
157-
Ok(size) => match Request::new(self.ch.sender(), &buf[..size]) {
158-
// Dispatch request
159-
Some(req) => req.dispatch(self),
160-
// Quit loop on illegal request
161-
None => break,
162-
},
192+
Ok(size) => {
193+
match Request::new(self.ch.sender(), &deferred, &buf[..size]) {
194+
// Dispatch request
195+
Some(req) => req.dispatch(self),
196+
// Quit loop on illegal request
197+
None => break,
198+
}
199+
}
163200
Err(err) => match err.raw_os_error() {
164201
Some(
165202
ENOENT // Operation interrupted. Accordingly to FUSE, this is safe to retry
@@ -168,11 +205,23 @@ impl<FS: Filesystem> Session<FS> {
168205
) => continue,
169206
Some(ENODEV) => break,
170207
// Unhandled error
171-
_ => return Err(err),
208+
_ => {
209+
result = Err(err);
210+
break;
211+
}
172212
},
173213
}
174214
}
175-
Ok(())
215+
216+
// Drop all senders to close the channel, then join the notify thread
217+
// to ensure in-flight invalidations are flushed before returning.
218+
drop(deferred);
219+
self.notify_tx.take();
220+
if let Err(e) = notify_handle.join() {
221+
warn!("notify thread panicked: {e:?}");
222+
}
223+
224+
result
176225
}
177226

178227
/// Unmount the filesystem

cli/tests/all.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,5 @@ DIR="$(dirname "$0")"
2222
"$DIR/test-mount.sh"
2323
"$DIR/test-overlay-whiteout.sh"
2424
"$DIR/test-overlay-delta-in-base-dir.sh"
25+
"$DIR/test-fuse-cache-invalidation.sh"
2526
"$DIR/test-symlinks.sh" || true # Requires user namespaces (may fail in CI)

0 commit comments

Comments
 (0)