@@ -11,6 +11,10 @@ use std::path::Path;
1111use std:: path:: PathBuf ;
1212use std:: time:: Duration ;
1313
14+ use edenfs_asserted_states_client:: AssertedStatesClient ;
15+ use edenfs_asserted_states_client:: ContentLockGuard ;
16+ use edenfs_asserted_states_client:: StateChange ;
17+ use edenfs_asserted_states_client:: StateError ;
1418use edenfs_client:: changes_since:: ChangeNotification ;
1519use edenfs_client:: changes_since:: ChangesSinceV2Result ;
1620use edenfs_client:: changes_since:: StateChangeNotification ;
@@ -22,55 +26,23 @@ use edenfs_telemetry::EdenSample;
2226use edenfs_telemetry:: QueueingScubaLogger ;
2327use edenfs_telemetry:: SampleLogger ;
2428use edenfs_telemetry:: create_logger;
25- use fs_err as fs;
2629use futures:: StreamExt ;
2730use futures:: stream;
2831use futures:: stream:: BoxStream ;
2932use itertools:: Itertools ;
3033use lazy_static:: lazy_static;
3134use serde:: Serialize ;
32- use util:: file:: get_umask;
33- use util:: lock:: ContentLock ;
34- use util:: lock:: ContentLockError ;
35- use util:: lock:: PathLock ;
3635use util:: lock:: unsanitize_lock_name;
37- use util:: path:: create_dir_all_with_mode;
38- use util:: path:: dir_mode;
39- use util:: path:: remove_file;
4036
4137lazy_static ! {
4238 pub ( crate ) static ref SCUBA_CLIENT : QueueingScubaLogger =
4339 QueueingScubaLogger :: new( create_logger( "edenfs_client" . to_string( ) ) , 1000 ) ;
4440}
4541
46- const ASSERTED_STATE_DIR : & str = ".edenfs-notifications-state" ;
47-
48- fn ensure_directory ( path : & Path ) -> Result < ( ) > {
49- // Create the directory, if it doesn't exist.
50- match path. try_exists ( ) {
51- Ok ( true ) => { }
52- Ok ( false ) => {
53- create_dir_all_with_mode ( path, dir_mode ( get_umask ( ) ) ) ?;
54- }
55- Err ( err) => return Err ( err. into ( ) ) ,
56- }
57- Ok ( ( ) )
58- }
59-
6042#[ derive( Clone , Debug ) ]
6143pub struct StreamingChangesClient {
62- states_root : PathBuf ,
6344 pub session_id : String ,
64- }
65-
66- #[ derive( thiserror:: Error , Debug ) ]
67- pub enum StateError {
68- #[ error( transparent) ]
69- EdenFsError ( #[ from] EdenFsError ) ,
70- #[ error( "State is already asserted {0}" ) ]
71- StateAlreadyAsserted ( String ) ,
72- #[ error( "{0}" ) ]
73- OtherError ( #[ from] anyhow:: Error ) ,
45+ states_client : AssertedStatesClient ,
7446}
7547
7648pub fn get_streaming_changes_client (
@@ -82,12 +54,10 @@ pub fn get_streaming_changes_client(
8254
8355impl StreamingChangesClient {
8456 pub fn new ( mount_point : & Path , session_id : String ) -> Result < Self > {
85- let states_root = mount_point. join ( ASSERTED_STATE_DIR ) ;
86- ensure_directory ( & states_root) ?;
87-
8857 Ok ( StreamingChangesClient {
89- states_root,
9058 session_id,
59+ states_client : AssertedStatesClient :: new ( mount_point)
60+ . map_err ( |e| EdenFsError :: from ( anyhow:: Error :: from ( e) ) ) ?,
9161 } )
9262 }
9363
@@ -103,53 +73,25 @@ impl StreamingChangesClient {
10373
10474 #[ allow( dead_code) ]
10575 pub fn get_state_path ( & self , state : & str ) -> Result < PathBuf > {
106- let state_path = self . states_root . join ( state ) ;
107- ensure_directory ( & state_path ) ? ;
108- Ok ( state_path )
76+ self . states_client
77+ . get_state_path ( state )
78+ . map_err ( |e| EdenFsError :: from ( anyhow :: Error :: from ( e ) ) )
10979 }
11080
11181 pub fn enter_state ( & self , state : & str ) -> Result < ContentLockGuard , StateError > {
112- // Asserts the named state, in the current mount.
113- // Returns () if the state was successfully asserted, or an StateAlreadyAsserted StateError if the state was already asserted.
114- // Returns other errors if an error occurred while asserting the state.
115- // To exit the state, drop the ContentLockGuard returned by this function either explicitly
116- // or implicitly by letting it go out of scope.
117- let state_path: PathBuf = self
118- . get_state_path ( state)
119- . map_err ( StateError :: EdenFsError ) ?;
120- match try_lock_state ( & state_path, state) {
121- Ok ( lock) => Ok ( lock) ,
122- Err ( ContentLockError :: Contended ( _) ) => {
123- Err ( StateError :: StateAlreadyAsserted ( state. to_string ( ) ) )
124- }
125- Err ( ContentLockError :: Io ( err) ) => Err ( StateError :: EdenFsError ( EdenFsError :: from ( err) ) ) ,
126- }
82+ self . states_client . enter_state ( state)
12783 }
12884
12985 pub fn get_asserted_states ( & self ) -> Result < HashSet < String > > {
130- // Gets a set of all asserted states.
131- // For use in debug CLI. Not intended for end user consumption,
132- // use is_state_asserted() with your list of states instead.
133- let mut asserted_states = HashSet :: new ( ) ;
134- for dir_entry in fs:: read_dir ( & self . states_root ) ? {
135- let entry = dir_entry?;
136- if entry. path ( ) . is_dir ( ) {
137- let state = entry. file_name ( ) . to_string_lossy ( ) . to_string ( ) ;
138- if self . is_state_asserted ( & state) ? {
139- asserted_states. insert ( state) ;
140- }
141- }
142- }
143- Ok ( asserted_states)
86+ self . states_client
87+ . get_asserted_states ( )
88+ . map_err ( |e| EdenFsError :: from ( anyhow:: Error :: from ( e) ) )
14489 }
14590
14691 pub fn is_state_asserted ( & self , state : & str ) -> Result < bool > {
147- let state_path = self . get_state_path ( state) ?;
148- match is_state_locked ( & state_path, state) {
149- Ok ( true ) => Ok ( true ) ,
150- Ok ( false ) => Ok ( false ) ,
151- Err ( err) => Err ( err) ,
152- }
92+ self . states_client
93+ . is_state_asserted ( state)
94+ . map_err ( |e| EdenFsError :: from ( anyhow:: Error :: from ( e) ) )
15395 }
15496
15597 // Takes a list of known states and filters them based on the currently desired states
@@ -377,13 +319,9 @@ impl StreamingChangesClient {
377319 }
378320
379321 fn which_states_asserted ( & self , states : & [ String ] ) -> Result < HashSet < String > > {
380- let mut output = HashSet :: new ( ) ;
381- for state in states {
382- if self . is_state_asserted ( state) ? {
383- output. insert ( state. clone ( ) ) ;
384- }
385- }
386- Ok ( output)
322+ self . states_client
323+ . which_states_asserted ( states)
324+ . map_err ( |e| EdenFsError :: from ( anyhow:: Error :: from ( e) ) )
387325 }
388326
389327 fn to_change_event (
@@ -480,60 +418,6 @@ impl StreamingChangesClient {
480418 }
481419}
482420
483- // As PathLock, but creates an additional file with the .notify extension
484- // to log exit to the journal
485- #[ derive( Debug ) ]
486- pub struct ContentLockGuard ( PathLock ) ;
487-
488- impl Drop for ContentLockGuard {
489- fn drop ( & mut self ) {
490- // Done purely to signal the edenfs journal that the lock is no longer held.
491- let file_path = self . 0 . as_file ( ) . path ( ) . with_extension ( "notify" ) ;
492- match remove_file ( & file_path) {
493- Ok ( _) => { }
494- Err ( e) => tracing:: error!( "Notify file {:?} missing: {:?}" , file_path, e) ,
495- } ;
496- // Release the lock when the internal PathLock is dropped on exit
497- }
498- }
499-
500- pub fn try_guarded_lock (
501- content_lock : & ContentLock ,
502- contents : & [ u8 ] ,
503- ) -> Result < ContentLockGuard , ContentLockError > {
504- let inner_lock = content_lock. try_lock ( contents) ?;
505- // Done purely to signal the edenfs journal that the lock has been acquired.
506- let notify_file_path = inner_lock. as_file ( ) . path ( ) . with_extension ( "notify" ) ;
507- if notify_file_path. exists ( ) {
508- remove_file ( & notify_file_path) ?;
509- }
510- fs:: OpenOptions :: new ( )
511- . write ( true )
512- . create ( true )
513- . open ( inner_lock. as_file ( ) . path ( ) . with_extension ( "notify" ) ) ?;
514- Ok ( ContentLockGuard ( inner_lock) )
515- }
516-
517- #[ allow( dead_code) ]
518- fn try_lock_state ( dir : & Path , name : & str ) -> Result < ContentLockGuard , ContentLockError > {
519- let content_lock = ContentLock :: new_with_name ( dir, name) ;
520- let state_lock = try_guarded_lock ( & content_lock, & [ ] ) ?;
521-
522- Ok ( state_lock)
523- }
524-
525- #[ allow( dead_code) ]
526- fn is_state_locked ( dir : & Path , name : & str ) -> Result < bool > {
527- // Check the lock state, without creating the lock file
528- // If the lock doesn't exist, return false
529- let content_lock = ContentLock :: new_with_name ( dir, name) ;
530- match content_lock. check_lock ( ) {
531- Ok ( ( ) ) => Ok ( false ) ,
532- Err ( ContentLockError :: Contended ( _) ) => Ok ( true ) ,
533- Err ( ContentLockError :: Io ( err) ) => Err ( err. into ( ) ) ,
534- }
535- }
536-
537421pub enum IsStateCurrentlyAsserted {
538422 NotAsserted ,
539423 StateAsserted ,
@@ -546,22 +430,6 @@ struct StreamChangesSinceWithStatesData<'a> {
546430 position : JournalPosition ,
547431}
548432
549- #[ derive( Debug , PartialEq , Serialize ) ]
550- pub enum StateChange {
551- Entered ,
552- Left ,
553- }
554-
555- impl fmt:: Display for StateChange {
556- fn fmt ( & self , f : & mut fmt:: Formatter ) -> fmt:: Result {
557- if self == & StateChange :: Entered {
558- write ! ( f, "Entered" )
559- } else {
560- write ! ( f, "Left" )
561- }
562- }
563- }
564-
565433#[ derive( Debug , Serialize , PartialEq ) ]
566434pub struct ChangeEvent {
567435 pub event_type : StateChange ,
@@ -612,8 +480,10 @@ impl ChangeEvents {
612480
613481#[ cfg( test) ]
614482mod tests {
483+ use edenfs_asserted_states_client:: * ;
615484 use edenfs_client:: changes_since:: * ;
616485 use edenfs_client:: types:: Dtype ;
486+ use util:: lock:: ContentLock ;
617487
618488 use crate :: * ;
619489
0 commit comments