@@ -24,12 +24,13 @@ use std::{
     path::{Path, PathBuf},
     process,
     sync::{Arc, Mutex, RwLock},
+    time::{SystemTime, UNIX_EPOCH},
 };
 
 use arrow_array::RecordBatch;
 use arrow_ipc::writer::StreamWriter;
 use arrow_schema::{Field, Fields, Schema};
-use chrono::{NaiveDateTime, Timelike, Utc};
+use chrono::{NaiveDateTime, Timelike};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
 use parquet::{
@@ -72,6 +73,14 @@ const ARROW_FILE_EXTENSION: &str = "data.arrows";
 
 pub type StreamRef = Arc<Stream>;
 
+/// Gets the unix timestamp for the minute as described by the `SystemTime`
+fn minute_from_system_time(time: SystemTime) -> u128 {
+    time.duration_since(UNIX_EPOCH)
+        .expect("Legitimate time")
+        .as_millis()
+        / 60000
+}
+
 /// All state associated with a single logstream in Parseable.
 pub struct Stream {
     pub stream_name: String,
@@ -156,8 +165,7 @@ impl Stream {
             hostname.push_str(id);
         }
         let filename = format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
-            Utc::now().format("%Y%m%dT%H%M"),
+            "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -192,7 +200,7 @@ impl Stream {
     /// Only includes ones starting from the previous minute
     pub fn arrow_files_grouped_exclude_time(
         &self,
-        exclude: NaiveDateTime,
+        exclude: SystemTime,
         shutdown_signal: bool,
     ) -> HashMap<PathBuf, Vec<PathBuf>> {
         let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
@@ -202,12 +210,13 @@ impl Stream {
         // don't keep the ones for the current minute
         if !shutdown_signal {
             arrow_files.retain(|path| {
-                !path
-                    .file_name()
-                    .unwrap()
-                    .to_str()
-                    .unwrap()
-                    .starts_with(&exclude.format("%Y%m%dT%H%M").to_string())
+                let creation = path
+                    .metadata()
+                    .expect("Arrow file should exist on disk")
+                    .created()
+                    .expect("Creation time should be accessible");
+                // Compare if creation time is actually from previous minute
+                minute_from_system_time(creation) < minute_from_system_time(exclude)
             });
         }
 
@@ -429,8 +438,8 @@ impl Stream {
     ) -> Result<Option<Schema>, StagingError> {
         let mut schemas = Vec::new();
 
-        let time = chrono::Utc::now().naive_utc();
-        let staging_files = self.arrow_files_grouped_exclude_time(time, shutdown_signal);
+        let now = SystemTime::now();
+        let staging_files = self.arrow_files_grouped_exclude_time(now, shutdown_signal);
         if staging_files.is_empty() {
             metrics::STAGING_FILES
                 .with_label_values(&[&self.stream_name])
@@ -757,7 +766,7 @@ mod tests {
 
     use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray};
     use arrow_schema::{DataType, Field, TimeUnit};
-    use chrono::{NaiveDate, TimeDelta};
+    use chrono::{NaiveDate, TimeDelta, Utc};
     use temp_dir::TempDir;
    use tokio::time::sleep;
 
@@ -874,8 +883,7 @@ mod tests {
         );
 
         let expected_path = staging.data_path.join(format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
-            Utc::now().format("%Y%m%dT%H%M"),
+            "{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -909,8 +917,7 @@ mod tests {
         );
 
         let expected_path = staging.data_path.join(format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
-            Utc::now().format("%Y%m%dT%H%M"),
+            "{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
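
Not part of the diff: a minimal standalone sketch of the minute-bucketing check the new `retain` closure relies on, assuming only the `minute_from_system_time` helper introduced above. It shows that a file created in an earlier minute sorts into a smaller bucket than "now" (and is therefore kept for conversion), while one created within the current minute lands in the same bucket. The 90-second offset and the `main` harness are illustrative only.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Copied from the diff: bucket a `SystemTime` into whole minutes since the Unix epoch.
fn minute_from_system_time(time: SystemTime) -> u128 {
    time.duration_since(UNIX_EPOCH)
        .expect("Legitimate time")
        .as_millis()
        / 60000
}

fn main() {
    let now = SystemTime::now();

    // A file created 90 seconds ago always falls into an earlier minute bucket,
    // so the retain predicate (`creation < exclude`) keeps it for conversion.
    let created_earlier = now - Duration::from_secs(90);
    assert!(minute_from_system_time(created_earlier) < minute_from_system_time(now));

    // A file created at the same instant shares the current minute bucket
    // and is skipped until the next flush cycle.
    assert_eq!(minute_from_system_time(now), minute_from_system_time(now));
}
```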