Skip to content

Commit 4690a34

Browse files
authored
cli: Fix FUSE kernel cache serving stale directory listings (#317)
When readdirplus populates the kernel dcache with TTL=MAX, the kernel caches entries indefinitely. After unlink/rmdir/rename the parent directory's cached readdirplus result can still contain stale entries on subsequent ls calls. Fix by sending FUSE_NOTIFY_INVAL_ENTRY after unlink, rmdir, and rename. Notifications are dispatched from a background thread to avoid deadlocking the single-threaded session loop. Add integration test covering unlink, rmdir, rename, create, and mkdir visibility in subsequent readdir.
2 parents 1c8d804 + 7578fa8 commit 4690a34

7 files changed

Lines changed: 319 additions & 14 deletions

File tree

cli/src/fuse.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ fn maximize_fd_limit() {
6868
}
6969
}
7070

71-
/// Cache entries never expire - we explicitly invalidate on mutations.
71+
/// Cache entries never expire — we use deferred kernel cache invalidation
72+
/// (via Notifier::inval_entry) after mutations to keep the dcache consistent.
7273
/// This is safe because we are the only writer to the filesystem.
7374
const TTL: Duration = Duration::MAX;
7475

@@ -602,7 +603,7 @@ impl Filesystem for AgentFSFuse {
602603
///
603604
/// Verifies the target is a directory and is empty before removal.
604605
/// Returns `ENOTDIR` if not a directory, `ENOTEMPTY` if not empty.
605-
fn rmdir(&mut self, _req: &Request, parent: u64, name: &OsStr, reply: ReplyEmpty) {
606+
fn rmdir(&mut self, req: &Request, parent: u64, name: &OsStr, reply: ReplyEmpty) {
606607
tracing::debug!("FUSE::rmdir: parent={}, name={:?}", parent, name);
607608

608609
let Some(name_str) = name.to_str() else {
@@ -619,6 +620,7 @@ impl Filesystem for AgentFSFuse {
619620
match result {
620621
Ok(()) => {
621622
reply.ok();
623+
req.deferred_notifier().inval_entry(parent, name);
622624
}
623625
Err(e) => reply.error(error_to_errno(&e)),
624626
}
@@ -772,7 +774,7 @@ impl Filesystem for AgentFSFuse {
772774
/// Removes a file (unlinks it from the directory).
773775
///
774776
/// Gets the file's inode before removal to clean up the path cache.
775-
fn unlink(&mut self, _req: &Request, parent: u64, name: &OsStr, reply: ReplyEmpty) {
777+
fn unlink(&mut self, req: &Request, parent: u64, name: &OsStr, reply: ReplyEmpty) {
776778
tracing::debug!("FUSE::unlink: parent={}, name={:?}", parent, name);
777779

778780
let Some(name_str) = name.to_str() else {
@@ -789,6 +791,7 @@ impl Filesystem for AgentFSFuse {
789791
match result {
790792
Ok(()) => {
791793
reply.ok();
794+
req.deferred_notifier().inval_entry(parent, name);
792795
}
793796
Err(e) => reply.error(error_to_errno(&e)),
794797
}
@@ -799,7 +802,7 @@ impl Filesystem for AgentFSFuse {
799802
/// Moves `name` from `parent` to `newname` under `newparent`.
800803
fn rename(
801804
&mut self,
802-
_req: &Request,
805+
req: &Request,
803806
parent: u64,
804807
name: &OsStr,
805808
newparent: u64,
@@ -841,6 +844,9 @@ impl Filesystem for AgentFSFuse {
841844
match result {
842845
Ok(()) => {
843846
reply.ok();
847+
let dn = req.deferred_notifier();
848+
dn.inval_entry(parent, name);
849+
dn.inval_entry(newparent, newname);
844850
}
845851
Err(e) => reply.error(error_to_errno(&e)),
846852
}

cli/src/fuser/deferred_notify.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use log::debug;
2+
use std::{
3+
ffi::{OsStr, OsString},
4+
sync::mpsc,
5+
};
6+
7+
/// A queued invalidation operation to be flushed by the notify thread.
8+
#[derive(Debug)]
9+
pub enum NotifyOp {
10+
InvalEntry { parent: u64, name: OsString },
11+
}
12+
13+
/// Queues kernel cache invalidation requests for deferred execution.
14+
///
15+
/// FUSE notification writes to /dev/fuse cannot be issued from the session
16+
/// loop thread — even outside callbacks — because the kernel processes
17+
/// FUSE_NOTIFY_INVAL_ENTRY synchronously within the writev() call. That
18+
/// processing can trigger d_invalidate → iput → FUSE_FORGET, which needs
19+
/// the daemon to be reading /dev/fuse. Since the session loop thread is
20+
/// blocked in writev(), it can't read, causing a deadlock.
21+
///
22+
/// DeferredNotifier solves this by sending operations over an mpsc channel
23+
/// to a dedicated background thread that writes to /dev/fuse independently
24+
/// of the session loop.
25+
#[derive(Debug, Clone)]
26+
pub struct DeferredNotifier {
27+
tx: mpsc::Sender<NotifyOp>,
28+
}
29+
30+
impl DeferredNotifier {
31+
pub(crate) fn new(tx: mpsc::Sender<NotifyOp>) -> Self {
32+
Self { tx }
33+
}
34+
35+
pub fn inval_entry(&self, parent: u64, name: &OsStr) {
36+
if let Err(e) = self.tx.send(NotifyOp::InvalEntry {
37+
parent,
38+
name: name.to_os_string(),
39+
}) {
40+
debug!("deferred inval_entry send failed (notify thread gone?): {e}");
41+
}
42+
}
43+
}

cli/src/fuser/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ mod channel;
5151
unexpected_cfgs,
5252
clippy::manual_is_multiple_of
5353
)]
54+
pub(crate) mod deferred_notify;
5455
mod ll;
5556
#[allow(clippy::io_other_error)]
5657
mod mnt;

cli/src/fuser/request.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::convert::TryInto;
1212
use std::path::Path;
1313

1414
use super::channel::ChannelSender;
15+
use super::deferred_notify::DeferredNotifier;
1516
use super::ll::Request as _;
1617
use super::reply::ReplyDirectoryPlus;
1718
use super::reply::{Reply, ReplyDirectory, ReplySender};
@@ -25,6 +26,8 @@ use super::{ll, KernelConfig};
2526
pub struct Request<'a> {
2627
/// Channel sender for sending the reply
2728
ch: ChannelSender,
29+
/// Deferred notifier for enqueueing cache invalidations
30+
deferred: &'a DeferredNotifier,
2831
/// Request raw data
2932
#[allow(unused)]
3033
data: &'a [u8],
@@ -34,7 +37,11 @@ pub struct Request<'a> {
3437

3538
impl<'a> Request<'a> {
3639
/// Create a new request from the given data
37-
pub(crate) fn new(ch: ChannelSender, data: &'a [u8]) -> Option<Request<'a>> {
40+
pub(crate) fn new(
41+
ch: ChannelSender,
42+
deferred: &'a DeferredNotifier,
43+
data: &'a [u8],
44+
) -> Option<Request<'a>> {
3845
let request = match ll::AnyRequest::try_from(data) {
3946
Ok(request) => request,
4047
Err(err) => {
@@ -43,7 +50,16 @@ impl<'a> Request<'a> {
4350
}
4451
};
4552

46-
Some(Self { ch, data, request })
53+
Some(Self {
54+
ch,
55+
deferred,
56+
data,
57+
request,
58+
})
59+
}
60+
61+
pub fn deferred_notifier(&self) -> &DeferredNotifier {
62+
self.deferred
4763
}
4864

4965
/// Dispatch request to the given filesystem.

cli/src/fuser/session.rs

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ use std::path::{Path, PathBuf};
1414
use std::sync::{Arc, Mutex};
1515
use std::thread::{self, JoinHandle};
1616

17+
use std::sync::mpsc;
18+
19+
use super::deferred_notify::{DeferredNotifier, NotifyOp};
1720
use super::ll::fuse_abi as abi;
1821
use super::request::Request;
1922
use super::Filesystem;
@@ -64,6 +67,10 @@ pub struct Session<FS: Filesystem> {
6467
pub(crate) initialized: bool,
6568
/// True if the filesystem was destroyed (destroy operation done)
6669
pub(crate) destroyed: bool,
70+
/// Sender half of the deferred notification queue
71+
notify_tx: Option<mpsc::Sender<NotifyOp>>,
72+
/// Receiver half — moved to the notify thread in run()
73+
notify_rx: Option<mpsc::Receiver<NotifyOp>>,
6774
}
6875

6976
impl<FS: Filesystem> AsFd for Session<FS> {
@@ -109,6 +116,8 @@ impl<FS: Filesystem> Session<FS> {
109116
SessionACL::Owner
110117
};
111118

119+
let (notify_tx, notify_rx) = mpsc::channel();
120+
112121
Ok(Session {
113122
filesystem,
114123
ch,
@@ -119,13 +128,16 @@ impl<FS: Filesystem> Session<FS> {
119128
proto_minor: 0,
120129
initialized: false,
121130
destroyed: false,
131+
notify_tx: Some(notify_tx),
132+
notify_rx: Some(notify_rx),
122133
})
123134
}
124135

125136
/// Wrap an existing /dev/fuse file descriptor. This doesn't mount the
126137
/// filesystem anywhere; that must be done separately.
127138
pub fn from_fd(filesystem: FS, fd: OwnedFd, acl: SessionACL) -> Self {
128139
let ch = Channel::new(Arc::new(fd.into()));
140+
let (notify_tx, notify_rx) = mpsc::channel();
129141
Session {
130142
filesystem,
131143
ch,
@@ -136,6 +148,8 @@ impl<FS: Filesystem> Session<FS> {
136148
proto_minor: 0,
137149
initialized: false,
138150
destroyed: false,
151+
notify_tx: Some(notify_tx),
152+
notify_rx: Some(notify_rx),
139153
}
140154
}
141155

@@ -146,20 +160,43 @@ impl<FS: Filesystem> Session<FS> {
146160
/// # Errors
147161
/// Returns any final error when the session comes to an end.
148162
pub fn run(&mut self) -> io::Result<()> {
163+
let notify_rx = self.notify_rx.take().expect("run() called more than once");
164+
let notifier = self.notifier();
165+
let notify_handle = thread::spawn(move || {
166+
for op in notify_rx {
167+
let res = match op {
168+
NotifyOp::InvalEntry { parent, ref name } => {
169+
notifier.inval_entry(parent, name.as_os_str())
170+
}
171+
};
172+
if let Err(e) = res {
173+
debug!("FUSE notify failed: {e}");
174+
}
175+
}
176+
});
177+
178+
// A single DeferredNotifier shared by all requests in this session,
179+
// avoiding a Sender clone on every FUSE request dispatch.
180+
let deferred =
181+
DeferredNotifier::new(self.notify_tx.as_ref().expect("notify_tx missing").clone());
182+
149183
// Buffer for receiving requests from the kernel. Only one is allocated and
150184
// it is reused immediately after dispatching to conserve memory and allocations.
151185
let mut buffer = vec![0; BUFFER_SIZE];
152186
let buf = aligned_sub_buf(&mut buffer, std::mem::align_of::<abi::fuse_in_header>());
187+
let mut result = Ok(());
153188
loop {
154189
// Read the next request from the given channel to kernel driver
155190
// The kernel driver makes sure that we get exactly one request per read
156191
match self.ch.receive(buf) {
157-
Ok(size) => match Request::new(self.ch.sender(), &buf[..size]) {
158-
// Dispatch request
159-
Some(req) => req.dispatch(self),
160-
// Quit loop on illegal request
161-
None => break,
162-
},
192+
Ok(size) => {
193+
match Request::new(self.ch.sender(), &deferred, &buf[..size]) {
194+
// Dispatch request
195+
Some(req) => req.dispatch(self),
196+
// Quit loop on illegal request
197+
None => break,
198+
}
199+
}
163200
Err(err) => match err.raw_os_error() {
164201
Some(
165202
ENOENT // Operation interrupted. Accordingly to FUSE, this is safe to retry
@@ -168,11 +205,23 @@ impl<FS: Filesystem> Session<FS> {
168205
) => continue,
169206
Some(ENODEV) => break,
170207
// Unhandled error
171-
_ => return Err(err),
208+
_ => {
209+
result = Err(err);
210+
break;
211+
}
172212
},
173213
}
174214
}
175-
Ok(())
215+
216+
// Drop all senders to close the channel, then join the notify thread
217+
// to ensure in-flight invalidations are flushed before returning.
218+
drop(deferred);
219+
self.notify_tx.take();
220+
if let Err(e) = notify_handle.join() {
221+
warn!("notify thread panicked: {e:?}");
222+
}
223+
224+
result
176225
}
177226

178227
/// Unmount the filesystem

cli/tests/all.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,5 @@ DIR="$(dirname "$0")"
2222
"$DIR/test-mount.sh"
2323
"$DIR/test-overlay-whiteout.sh"
2424
"$DIR/test-overlay-delta-in-base-dir.sh"
25+
"$DIR/test-fuse-cache-invalidation.sh"
2526
"$DIR/test-symlinks.sh" || true # Requires user namespaces (may fail in CI)

0 commit comments

Comments
 (0)