@@ -148,7 +148,11 @@ impl ReadDirectoryRequest {
148148
149149enum Action {
150150 Watch ( PathBuf , RecursiveMode , SeparatorStyle ) ,
151+ // Internal self-unwatch from the completion callback.
151152 Unwatch ( PathBuf ) ,
153+ // Public `Watcher::unwatch` path. This variant must ack only after `remove_watch` finishes so
154+ // the caller does not observe events after `unwatch()` returns.
155+ UnwatchAck ( PathBuf ) ,
152156 GetWatchedPaths ( Sender < Vec < ( PathBuf , RecursiveMode ) > > ) ,
153157 Stop ,
154158 Configure ( Config , BoundSender < Result < bool > > ) ,
@@ -223,6 +227,10 @@ impl ReadDirectoryChangesServer {
223227 let _ = self . cmd_tx . send ( res) ;
224228 }
225229 Action :: Unwatch ( path) => self . remove_watch ( path) ,
230+ Action :: UnwatchAck ( path) => {
231+ self . remove_watch ( path. clone ( ) ) ;
232+ let _ = self . cmd_tx . send ( Ok ( path) ) ;
233+ }
226234 Action :: GetWatchedPaths ( tx) => {
227235 let _ = tx. send (
228236 self . watches
@@ -710,12 +718,7 @@ impl ReadDirectoryChangesWatcher {
710718 let p = env:: current_dir ( ) . map_err ( Error :: io) ?;
711719 p. join ( path)
712720 } ;
713- let res = self
714- . tx
715- . send ( Action :: Unwatch ( pb) )
716- . map_err ( |_| Error :: generic ( "Error sending to internal channel" ) ) ;
717- self . wakeup_server ( ) ;
718- res
721+ self . send_action_require_ack ( Action :: UnwatchAck ( pb. clone ( ) ) , & pb)
719722 }
720723
721724 fn watched_paths_inner ( & self ) -> Result < Vec < ( PathBuf , RecursiveMode ) > > {
@@ -785,6 +788,9 @@ pub mod tests {
785788 use std:: ffi:: OsString ;
786789 use std:: os:: windows:: ffi:: OsStringExt ;
787790 use std:: path:: { Path , PathBuf } ;
791+ use std:: sync:: atomic:: { AtomicBool , Ordering } ;
792+ use std:: sync:: { mpsc, Arc } ;
793+ use std:: thread;
788794 use tempfile:: { tempdir, tempdir_in} ;
789795
790796 use super :: { normalize_path_separators, trim_leading_separators, SeparatorStyle } ;
@@ -1302,4 +1308,88 @@ pub mod tests {
13021308 ] )
13031309 . ensure_no_tail ( ) ;
13041310 }
1311+
1312+ #[ test]
1313+ fn unwatch_waits_for_pending_callback_before_returning ( ) {
1314+ let tmpdir = testdir ( ) ;
1315+ let watched_dir = tmpdir. path ( ) . join ( "watched" ) ;
1316+ std:: fs:: create_dir ( & watched_dir) . expect ( "create watched dir" ) ;
1317+
1318+ let first = watched_dir. join ( "new_dir" ) ;
1319+ let second = watched_dir. join ( "should_not_be_seen" ) ;
1320+ let first_for_handler = first. clone ( ) ;
1321+
1322+ let ( event_tx, event_rx) = mpsc:: channel ( ) ;
1323+ let ( started_tx, started_rx) = mpsc:: channel ( ) ;
1324+ let ( release_tx, release_rx) = mpsc:: channel ( ) ;
1325+ let ( unwatch_done_tx, unwatch_done_rx) = mpsc:: channel ( ) ;
1326+ let ( finish_tx, finish_rx) = mpsc:: channel ( ) ;
1327+ let blocked_once = Arc :: new ( AtomicBool :: new ( false ) ) ;
1328+ let blocked_once_for_handler = blocked_once. clone ( ) ;
1329+
1330+ let mut watcher = ReadDirectoryChangesWatcher :: new (
1331+ move |res : crate :: Result < crate :: Event > | {
1332+ if let Ok ( event) = & res {
1333+ if event. paths . iter ( ) . any ( |path| path == & first_for_handler)
1334+ && !blocked_once_for_handler. swap ( true , Ordering :: SeqCst )
1335+ {
1336+ started_tx. send ( ( ) ) . expect ( "signal callback start" ) ;
1337+ release_rx. recv ( ) . expect ( "release callback" ) ;
1338+ }
1339+ }
1340+
1341+ event_tx. send ( res) . expect ( "forward event" ) ;
1342+ } ,
1343+ crate :: Config :: default ( ) ,
1344+ )
1345+ . expect ( "create watcher" ) ;
1346+ watcher
1347+ . watch ( & watched_dir, RecursiveMode :: NonRecursive )
1348+ . expect ( "watch dir" ) ;
1349+
1350+ std:: fs:: create_dir ( & first) . expect ( "create first dir" ) ;
1351+ started_rx
1352+ . recv_timeout ( Duration :: from_secs ( 5 ) )
1353+ . expect ( "wait for callback to block" ) ;
1354+
1355+ let unwatch_path = watched_dir. clone ( ) ;
1356+ let join = thread:: spawn ( move || {
1357+ let mut watcher = watcher;
1358+ let result = watcher. unwatch ( & unwatch_path) ;
1359+ unwatch_done_tx. send ( result) . expect ( "send unwatch result" ) ;
1360+ finish_rx. recv ( ) . expect ( "finish watcher thread" ) ;
1361+ } ) ;
1362+
1363+ assert ! (
1364+ unwatch_done_rx
1365+ . recv_timeout( Duration :: from_millis( 100 ) )
1366+ . is_err( ) ,
1367+ "unwatch returned before the pending callback finished"
1368+ ) ;
1369+
1370+ release_tx. send ( ( ) ) . expect ( "release callback" ) ;
1371+ unwatch_done_rx
1372+ . recv_timeout ( Duration :: from_secs ( 5 ) )
1373+ . expect ( "wait for unwatch result" )
1374+ . expect ( "unwatch dir" ) ;
1375+
1376+ std:: fs:: create_dir ( & second) . expect ( "create second dir" ) ;
1377+
1378+ let first_event = event_rx
1379+ . recv_timeout ( Duration :: from_secs ( 5 ) )
1380+ . expect ( "receive first event" )
1381+ . expect ( "first event result" ) ;
1382+ assert_eq ! ( first_event, expected( & first) . create_any( ) ) ;
1383+
1384+ while let Ok ( res) = event_rx. recv_timeout ( Duration :: from_millis ( 200 ) ) {
1385+ let event = res. expect ( "event result" ) ;
1386+ assert ! (
1387+ !event. paths. iter( ) . any( |path| path == & second) ,
1388+ "unexpected event after unwatch: {event:#?}"
1389+ ) ;
1390+ }
1391+
1392+ finish_tx. send ( ( ) ) . expect ( "finish watcher thread" ) ;
1393+ join. join ( ) . expect ( "join watcher thread" ) ;
1394+ }
13051395}
0 commit comments