@@ -18,7 +18,7 @@ use std::fmt::{Debug, Formatter};
18
18
use std:: pin:: Pin ;
19
19
use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
20
20
use std:: sync:: { Arc , Weak } ;
21
- use std:: time:: { Duration , SystemTime } ;
21
+ use std:: time:: SystemTime ;
22
22
23
23
use async_lock:: RwLock ;
24
24
use async_trait:: async_trait;
@@ -39,8 +39,7 @@ use nativelink_util::store_trait::{
39
39
StoreDriver , StoreKey , StoreKeyBorrow , StoreOptimizations , UploadSizeInfo ,
40
40
} ;
41
41
use nativelink_util:: { background_spawn, spawn_blocking} ;
42
- use tokio:: io:: { AsyncReadExt , AsyncSeekExt , AsyncWriteExt , SeekFrom } ;
43
- use tokio:: time:: { sleep, timeout, Sleep } ;
42
+ use tokio:: io:: { AsyncReadExt , AsyncWriteExt , Take } ;
44
43
use tokio_stream:: wrappers:: ReadDirStream ;
45
44
use tracing:: { event, Level } ;
46
45
@@ -168,7 +167,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
168
167
fn make_and_open_file (
169
168
block_size : u64 ,
170
169
encoded_file_path : EncodedFilePath ,
171
- ) -> impl Future < Output = Result < ( Self , fs:: ResumeableFileSlot , OsString ) , Error > > + Send
170
+ ) -> impl Future < Output = Result < ( Self , fs:: FileSlot , OsString ) , Error > > + Send
172
171
where
173
172
Self : Sized ;
174
173
@@ -186,7 +185,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
186
185
& self ,
187
186
offset : u64 ,
188
187
length : u64 ,
189
- ) -> impl Future < Output = Result < fs:: ResumeableFileSlot , Error > > + Send ;
188
+ ) -> impl Future < Output = Result < Take < fs:: FileSlot > , Error > > + Send ;
190
189
191
190
/// This function is a safe way to extract the file name of the underlying file. To protect users from
192
191
/// accidentally creating undefined behavior we encourage users to do the logic they need to do with
@@ -231,7 +230,7 @@ impl FileEntry for FileEntryImpl {
231
230
async fn make_and_open_file (
232
231
block_size : u64 ,
233
232
encoded_file_path : EncodedFilePath ,
234
- ) -> Result < ( FileEntryImpl , fs:: ResumeableFileSlot , OsString ) , Error > {
233
+ ) -> Result < ( FileEntryImpl , fs:: FileSlot , OsString ) , Error > {
235
234
let temp_full_path = encoded_file_path. get_file_path ( ) . to_os_string ( ) ;
236
235
let temp_file_result = fs:: create_file ( temp_full_path. clone ( ) )
237
236
. or_else ( |mut err| async {
@@ -276,30 +275,19 @@ impl FileEntry for FileEntryImpl {
276
275
& self . encoded_file_path
277
276
}
278
277
279
- async fn read_file_part (
278
+ fn read_file_part (
280
279
& self ,
281
280
offset : u64 ,
282
281
length : u64 ,
283
- ) -> Result < fs:: ResumeableFileSlot , Error > {
284
- let ( mut file, full_content_path_for_debug_only) = self
285
- . get_file_path_locked ( |full_content_path| async move {
286
- let file = fs:: open_file ( full_content_path. clone ( ) , length)
287
- . await
288
- . err_tip ( || {
289
- format ! ( "Failed to open file in filesystem store {full_content_path:?}" )
290
- } ) ?;
291
- Ok ( ( file, full_content_path) )
292
- } )
293
- . await ?;
294
-
295
- file. as_reader ( )
296
- . await
297
- . err_tip ( || "Could not seek file in read_file_part()" ) ?
298
- . get_mut ( )
299
- . seek ( SeekFrom :: Start ( offset) )
300
- . await
301
- . err_tip ( || format ! ( "Failed to seek file: {full_content_path_for_debug_only:?}" ) ) ?;
302
- Ok ( file)
282
+ ) -> impl Future < Output = Result < Take < fs:: FileSlot > , Error > > + Send {
283
+ self . get_file_path_locked ( move |full_content_path| async move {
284
+ let file = fs:: open_file ( & full_content_path, offset, length)
285
+ . await
286
+ . err_tip ( || {
287
+ format ! ( "Failed to open file in filesystem store {full_content_path:?}" )
288
+ } ) ?;
289
+ Ok ( file)
290
+ } )
303
291
}
304
292
305
293
async fn get_file_path_locked <
@@ -524,6 +512,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
524
512
) ;
525
513
}
526
514
} ;
515
+
527
516
Result :: < ( String , SystemTime , u64 , bool ) , Error > :: Ok ( (
528
517
file_name,
529
518
atime,
@@ -668,19 +657,16 @@ pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
668
657
#[ metric( help = "Size of the configured read buffer size" ) ]
669
658
read_buffer_size : usize ,
670
659
weak_self : Weak < Self > ,
671
- sleep_fn : fn ( Duration ) -> Sleep ,
672
660
rename_fn : fn ( & OsStr , & OsStr ) -> Result < ( ) , std:: io:: Error > ,
673
661
}
674
662
675
663
impl < Fe : FileEntry > FilesystemStore < Fe > {
676
664
pub async fn new ( spec : & FilesystemSpec ) -> Result < Arc < Self > , Error > {
677
- Self :: new_with_timeout_and_rename_fn ( spec, sleep, |from, to| std:: fs:: rename ( from, to) )
678
- . await
665
+ Self :: new_with_timeout_and_rename_fn ( spec, |from, to| std:: fs:: rename ( from, to) ) . await
679
666
}
680
667
681
668
pub async fn new_with_timeout_and_rename_fn (
682
669
spec : & FilesystemSpec ,
683
- sleep_fn : fn ( Duration ) -> Sleep ,
684
670
rename_fn : fn ( & OsStr , & OsStr ) -> Result < ( ) , std:: io:: Error > ,
685
671
) -> Result < Arc < Self > , Error > {
686
672
async fn create_subdirs ( path : & str ) -> Result < ( ) , Error > {
@@ -735,7 +721,6 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
735
721
block_size,
736
722
read_buffer_size,
737
723
weak_self : weak_self. clone ( ) ,
738
- sleep_fn,
739
724
rename_fn,
740
725
} ) )
741
726
}
@@ -754,50 +739,34 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
754
739
async fn update_file < ' a > (
755
740
self : Pin < & ' a Self > ,
756
741
mut entry : Fe ,
757
- mut resumeable_temp_file : fs:: ResumeableFileSlot ,
742
+ mut temp_file : fs:: FileSlot ,
758
743
final_key : StoreKey < ' static > ,
759
744
mut reader : DropCloserReadHalf ,
760
745
) -> Result < ( ) , Error > {
761
746
let mut data_size = 0 ;
762
747
loop {
763
- let Ok ( data_result) = timeout ( fs:: idle_file_descriptor_timeout ( ) , reader. recv ( ) ) . await
764
- else {
765
- // In the event we timeout, we want to close the writing file, to prevent
766
- // the file descriptor left open for long periods of time.
767
- // This is needed because we wrap `fs` so only a fixed number of file
768
- // descriptors may be open at any given time. If we are streaming from
769
- // File -> File, it can cause a deadlock if the Write file is not sending
770
- // data because it is waiting for a file descriotor to open before sending data.
771
- resumeable_temp_file. close_file ( ) . await . err_tip ( || {
772
- "Could not close file due to timeout in FileSystemStore::update_file"
773
- } ) ?;
774
- continue ;
775
- } ;
776
- let mut data = data_result. err_tip ( || "Failed to receive data in filesystem store" ) ?;
748
+ let mut data = reader
749
+ . recv ( )
750
+ . await
751
+ . err_tip ( || "Failed to receive data in filesystem store" ) ?;
777
752
let data_len = data. len ( ) ;
778
753
if data_len == 0 {
779
754
break ; // EOF.
780
755
}
781
- resumeable_temp_file
782
- . as_writer ( )
783
- . await
784
- . err_tip ( || "in filesystem_store::update_file" ) ?
756
+ temp_file
785
757
. write_all_buf ( & mut data)
786
758
. await
787
759
. err_tip ( || "Failed to write data into filesystem store" ) ?;
788
760
data_size += data_len as u64 ;
789
761
}
790
762
791
- resumeable_temp_file
792
- . as_writer ( )
793
- . await
794
- . err_tip ( || "in filesystem_store::update_file" ) ?
763
+ temp_file
795
764
. as_ref ( )
796
765
. sync_all ( )
797
766
. await
798
767
. err_tip ( || "Failed to sync_data in filesystem store" ) ?;
799
768
800
- drop ( resumeable_temp_file ) ;
769
+ drop ( temp_file ) ;
801
770
802
771
* entry. data_size_mut ( ) = data_size;
803
772
self . emplace_file ( final_key, Arc :: new ( entry) ) . await
@@ -942,19 +911,13 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
942
911
async fn update_with_whole_file (
943
912
self : Pin < & Self > ,
944
913
key : StoreKey < ' _ > ,
945
- mut file : fs:: ResumeableFileSlot ,
914
+ path : OsString ,
915
+ file : fs:: FileSlot ,
946
916
upload_size : UploadSizeInfo ,
947
- ) -> Result < Option < fs:: ResumeableFileSlot > , Error > {
948
- let path = file. get_path ( ) . as_os_str ( ) . to_os_string ( ) ;
917
+ ) -> Result < Option < fs:: FileSlot > , Error > {
949
918
let file_size = match upload_size {
950
919
UploadSizeInfo :: ExactSize ( size) => size,
951
920
UploadSizeInfo :: MaxSize ( _) => file
952
- . as_reader ( )
953
- . await
954
- . err_tip ( || {
955
- format ! ( "While getting metadata for {path:?} in update_with_whole_file" )
956
- } ) ?
957
- . get_ref ( )
958
921
. as_ref ( )
959
922
. metadata ( )
960
923
. await
@@ -995,7 +958,6 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
995
958
. err_tip ( || "Failed to send zero EOF in filesystem store get_part" ) ?;
996
959
return Ok ( ( ) ) ;
997
960
}
998
-
999
961
let entry = self . evicting_map . get ( & key) . await . ok_or_else ( || {
1000
962
make_err ! (
1001
963
Code :: NotFound ,
@@ -1004,47 +966,21 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
1004
966
)
1005
967
} ) ?;
1006
968
let read_limit = length. unwrap_or ( u64:: MAX ) ;
1007
- let mut resumeable_temp_file = entry. read_file_part ( offset, read_limit) . await ?;
969
+ let mut temp_file = entry. read_file_part ( offset, read_limit) . await ?;
1008
970
1009
971
loop {
1010
972
let mut buf = BytesMut :: with_capacity ( self . read_buffer_size ) ;
1011
- resumeable_temp_file
1012
- . as_reader ( )
1013
- . await
1014
- . err_tip ( || "In FileSystemStore::get_part()" ) ?
973
+ temp_file
1015
974
. read_buf ( & mut buf)
1016
975
. await
1017
976
. err_tip ( || "Failed to read data in filesystem store" ) ?;
1018
977
if buf. is_empty ( ) {
1019
978
break ; // EOF.
1020
979
}
1021
- // In the event it takes a while to send the data to the client, we want to close the
1022
- // reading file, to prevent the file descriptor left open for long periods of time.
1023
- // Failing to do so might cause deadlocks if the receiver is unable to receive data
1024
- // because it is waiting for a file descriptor to open before receiving data.
1025
- // Using `ResumeableFileSlot` will re-open the file in the event it gets closed on the
1026
- // next iteration.
1027
- let buf_content = buf. freeze ( ) ;
1028
- loop {
1029
- let sleep_fn = ( self . sleep_fn ) ( fs:: idle_file_descriptor_timeout ( ) ) ;
1030
- tokio:: pin!( sleep_fn) ;
1031
- tokio:: select! {
1032
- ( ) = & mut ( sleep_fn) => {
1033
- resumeable_temp_file
1034
- . close_file( )
1035
- . await
1036
- . err_tip( || "Could not close file due to timeout in FileSystemStore::get_part" ) ?;
1037
- }
1038
- res = writer. send( buf_content. clone( ) ) => {
1039
- match res {
1040
- Ok ( ( ) ) => break ,
1041
- Err ( err) => {
1042
- return Err ( err) . err_tip( || "Failed to send chunk in filesystem store get_part" ) ;
1043
- }
1044
- }
1045
- }
1046
- }
1047
- }
980
+ writer
981
+ . send ( buf. freeze ( ) )
982
+ . await
983
+ . err_tip ( || "Failed to send chunk in filesystem store get_part" ) ?;
1048
984
}
1049
985
writer
1050
986
. send_eof ( )
0 commit comments