Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 65 additions & 10 deletions notify/src/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ struct ReadDirectoryRequest {

impl ReadDirectoryRequest {
fn unwatch_raw(&self) {
let _ = self
let result = self
.action_tx
.send(Action::UnwatchRaw(self.data.dir.clone()));
if let Err(e) = result {
tracing::error!(?e, "failed to send UnwatchRaw action");
}
}
}

Expand Down Expand Up @@ -103,7 +106,7 @@ impl ReadDirectoryChangesServer {
let (action_tx, action_rx) = unbounded();
// it is, in fact, ok to send the semaphore across threads
let sem_temp = wakeup_sem as u64;
let _ = thread::Builder::new()
let result = thread::Builder::new()
.name("notify-rs windows loop".to_string())
.spawn({
let tx = action_tx.clone();
Expand All @@ -121,6 +124,9 @@ impl ReadDirectoryChangesServer {
server.run();
}
});
if let Err(e) = result {
tracing::error!(?e, "failed to spawn ReadDirectoryChangesWatcher thread");
}
action_tx
}

Expand All @@ -133,7 +139,10 @@ impl ReadDirectoryChangesServer {
match action {
Action::Watch(path, watch_mode) => {
let res = self.add_watch(path, watch_mode);
let _ = self.cmd_tx.send(res);
let result = self.cmd_tx.send(res);
if let Err(e) = result {
tracing::error!(?e, "failed to send Watch result");
}
}
Action::Unwatch(path) => self.remove_watch(path),
Action::UnwatchRaw(path) => self.remove_watch_raw(path),
Expand Down Expand Up @@ -171,6 +180,7 @@ impl ReadDirectoryChangesServer {
}
}

#[tracing::instrument(level = "trace", skip(self))]
fn add_watch(&mut self, path: PathBuf, watch_mode: WatchMode) -> Result<PathBuf> {
let existing_watch_mode = self.watches.borrow().get(&path).cloned();
if let Some(existing) = existing_watch_mode {
Expand All @@ -184,6 +194,12 @@ impl ReadDirectoryChangesServer {
TargetMode::TrackPath => false,
TargetMode::NoTrack => watch_mode.target_mode == TargetMode::TrackPath,
};
tracing::trace!(
?need_upgrade_to_recursive,
?need_to_watch_parent_newly,
"upgrading existing watch for path: {}",
path.display()
);
if need_to_watch_parent_newly && let Some(parent) = path.parent() {
self.add_watch_raw(parent.to_path_buf(), false, false)?;
}
Expand Down Expand Up @@ -246,6 +262,7 @@ impl ReadDirectoryChangesServer {
Ok(path)
}

#[tracing::instrument(level = "trace", skip(self))]
fn add_watch_raw(
&mut self,
path: PathBuf,
Expand All @@ -255,8 +272,13 @@ impl ReadDirectoryChangesServer {
if let Some((ws, was_recursive)) = self.watch_handles.get(&path) {
let need_upgrade_to_recursive = !*was_recursive && is_recursive;
if !need_upgrade_to_recursive {
tracing::trace!(
"watch handle already exists and no need to upgrade: {}",
path.display()
);
return Ok(());
}
tracing::trace!("upgrading watch handle to recursive: {}", path.display());
stop_watch(ws);
}

Expand Down Expand Up @@ -309,12 +331,14 @@ impl ReadDirectoryChangesServer {
Ok(())
}

#[tracing::instrument(level = "trace", skip(self))]
fn remove_watch(&mut self, path: PathBuf) {
if self.watches.borrow_mut().remove(&path).is_some() {
self.remove_watch_raw(path);
}
}

#[tracing::instrument(level = "trace", skip(self))]
fn remove_watch_raw(&mut self, path: PathBuf) {
if let Some((ws, _)) = self.watch_handles.remove(&path) {
stop_watch(&ws);
Expand Down Expand Up @@ -342,6 +366,7 @@ impl ReadDirectoryChangesServer {
}

fn stop_watch(ws: &WatchState) {
tracing::trace!("removing ReadDirectoryChangesW watch");
unsafe {
let cio = CancelIo(ws.dir_handle);
let ch = CloseHandle(ws.dir_handle);
Expand All @@ -361,6 +386,8 @@ fn start_read(
handle: HANDLE,
action_tx: Sender<Action>,
) {
tracing::trace!("starting ReadDirectoryChangesW watch: {}", rd.dir.display());

let request = Box::new(ReadDirectoryRequest {
event_handler,
handle,
Expand Down Expand Up @@ -431,6 +458,14 @@ unsafe extern "system" fn handle_event(
}
let event_handler = |res| emit_event(&request.event_handler, res);

if error_code != ERROR_SUCCESS {
tracing::trace!(
path = ?request.data.dir,
is_recursive = request.data.is_recursive,
"ReadDirectoryChangesW handle_event called with error code {error_code}",
);
}

match error_code {
ERROR_OPERATION_ABORTED => {
// received when dir is unwatched or watcher is shutdown; return and let overlapped/request get drop-cleaned
Expand All @@ -442,6 +477,11 @@ unsafe extern "system" fn handle_event(
// This could happen when the watched directory is deleted or trashed, first check if it's the case.
// If so, unwatch the directory and return, otherwise, continue to handle the event.
if !dir.exists() {
tracing::debug!(
path = ?request.data.dir,
is_recursive = request.data.is_recursive,
"ReadDirectoryChangesW handle_event: ERROR_ACCESS_DENIED event and directory no longer exists",
);
if request
.data
.watches
Expand Down Expand Up @@ -516,13 +556,16 @@ unsafe extern "system" fn handle_event(
.contains_key(&request.data.dir)
|| request.data.watches.borrow().contains_key(&path));

if !skip {
tracing::trace!(
"Event: path = `{}`, action = {:?}",
path.display(),
cur_entry.Action
);
tracing::trace!(
handle_path = ?request.data.dir,
is_recursive = request.data.is_recursive,
?path,
skip,
action = cur_entry.Action,
"ReadDirectoryChangesW handle_event called",
);

if !skip {
let newe = Event::new(EventKind::Any).add_path(path.clone());

match cur_entry.Action {
Expand Down Expand Up @@ -564,6 +607,11 @@ unsafe extern "system" fn handle_event(
cur_entry = unsafe { ptr::read_unaligned(cur_offset as *const FILE_NOTIFY_INFORMATION) };
}

tracing::trace!(
?remove_paths,
"processing ReadDirectoryChangesW watch changes",
);

for path in remove_paths {
let is_no_track = {
request
Expand Down Expand Up @@ -667,19 +715,23 @@ impl ReadDirectoryChangesWatcher {
}

impl Watcher for ReadDirectoryChangesWatcher {
#[tracing::instrument(level = "debug", skip(event_handler))]
fn new<F: EventHandler>(event_handler: F, _config: Config) -> Result<Self> {
let event_handler = Arc::new(Mutex::new(event_handler));
Self::create(event_handler)
}

#[tracing::instrument(level = "debug", skip(self))]
fn watch(&mut self, path: &Path, watch_mode: WatchMode) -> Result<()> {
self.watch_inner(path, watch_mode)
}

#[tracing::instrument(level = "debug", skip(self))]
fn unwatch(&mut self, path: &Path) -> Result<()> {
self.unwatch_inner(path)
}

#[tracing::instrument(level = "debug", skip(self))]
fn configure(&mut self, config: Config) -> Result<bool> {
let (tx, rx) = bounded(1);
self.tx.send(Action::Configure(config, tx))?;
Expand All @@ -700,7 +752,10 @@ impl Watcher for ReadDirectoryChangesWatcher {

impl Drop for ReadDirectoryChangesWatcher {
fn drop(&mut self) {
let _ = self.tx.send(Action::Stop);
let result = self.tx.send(Action::Stop);
if let Err(e) = result {
tracing::error!(?e, "failed to send Stop action");
}
// better wake it up
self.wakeup_server();
}
Expand Down