Commit d147f48

fix: ensure current slot data goes into current slot file and isn't flushed until end of the slot (#1238)
Fixes #1240
1 parent e361ca0 commit d147f48
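
In short: disk writers are now keyed by the per-slot filename instead of the schema key, and a writer stays open until its time slot has passed (or a forced flush happens at shutdown), so events for the current slot keep landing in the current slot's file. The standalone sketch below illustrates the retain-based flush with simplified stand-in types (`Slot` in place of the real `DiskWriter`/`TimeRange`); it is an illustration of the idea, not the actual implementation:

    use std::collections::HashMap;

    /// Stand-in for `DiskWriter` + `TimeRange`: tracks only the
    /// minute-slot this writer covers.
    struct Slot {
        slot: u64,
    }

    impl Slot {
        /// Mirrors `DiskWriter::is_current`: the writer stays alive
        /// while "now" still falls inside its slot.
        fn is_current(&self, now: u64) -> bool {
            self.slot == now
        }
    }

    /// Mirrors the new `Stream::flush(forced)`: dropping a writer
    /// finalizes its file, so we simply retain the writers whose
    /// slot is still current (unless the flush is forced).
    fn flush(disk: &mut HashMap<String, Slot>, forced: bool, now: u64) {
        disk.retain(|_, w| !forced && w.is_current(now));
    }

    fn main() {
        let mut disk = HashMap::from([
            ("slot-41".to_string(), Slot { slot: 41 }),
            ("slot-42".to_string(), Slot { slot: 42 }),
        ]);

        // Periodic flush at minute 42: only the current slot's writer survives.
        flush(&mut disk, false, 42);
        assert!(disk.contains_key("slot-42") && !disk.contains_key("slot-41"));

        // Forced flush (shutdown): everything is dropped and finalized.
        flush(&mut disk, true, 42);
        assert!(disk.is_empty());
    }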

File tree

5 files changed: +221 -58

src/parseable/mod.rs
+3

@@ -67,6 +67,9 @@ mod streams;
 /// File extension for arrow files in staging
 const ARROW_FILE_EXTENSION: &str = "arrows";
 
+/// File extension for incomplete arrow files
+const PART_FILE_EXTENSION: &str = "part";
+
 /// Name of a Stream
 /// NOTE: this used to be a struct, flattened out for simplicity
 pub type LogStream = String;
src/parseable/staging/reader.rs
+13 -7

@@ -322,7 +322,6 @@ fn find_limit_and_type(
 #[cfg(test)]
 mod tests {
     use std::{
-        fs::File,
         io::{self, Cursor, Read},
         path::Path,
         sync::Arc,
@@ -336,9 +335,17 @@ mod tests {
         write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
     };
     use arrow_schema::{DataType, Field, Schema};
+    use chrono::Utc;
     use temp_dir::TempDir;
 
-    use crate::parseable::staging::reader::{MergedReverseRecordReader, OffsetReader};
+    use crate::{
+        parseable::staging::{
+            reader::{MergedReverseRecordReader, OffsetReader},
+            writer::DiskWriter,
+        },
+        utils::time::TimeRange,
+        OBJECT_STORE_DATA_GRANULARITY,
+    };
 
     use super::get_reverse_reader;
 
@@ -482,15 +489,14 @@ mod tests {
         schema: &Arc<Schema>,
         batches: &[RecordBatch],
     ) -> io::Result<()> {
-        let file = File::create(path)?;
+        let range = TimeRange::granularity_range(Utc::now(), OBJECT_STORE_DATA_GRANULARITY);
         let mut writer =
-            StreamWriter::try_new(file, schema).expect("Failed to create StreamWriter");
+            DiskWriter::try_new(path, schema, range).expect("Failed to create StreamWriter");
 
         for batch in batches {
             writer.write(batch).expect("Failed to write batch");
         }
 
-        writer.finish().expect("Failed to finalize writer");
         Ok(())
     }
 
@@ -524,7 +530,7 @@ mod tests {
     #[test]
     fn test_merged_reverse_record_reader() -> io::Result<()> {
         let dir = TempDir::new().unwrap();
-        let file_path = dir.path().join("test.arrow");
+        let file_path = dir.path().join("test.data.arrows");
 
         // Create a schema
         let schema = Arc::new(Schema::new(vec![
@@ -627,7 +633,7 @@ mod tests {
     #[test]
     fn test_get_reverse_reader_single_message() -> io::Result<()> {
         let dir = TempDir::new().unwrap();
-        let file_path = dir.path().join("test_single.arrow");
+        let file_path = dir.path().join("test_single.data.arrows");
 
         // Create a schema
         let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

src/parseable/staging/writer.rs
+68 -3

@@ -19,22 +19,87 @@
 
 use std::{
     collections::{HashMap, HashSet},
-    fs::File,
+    fs::{File, OpenOptions},
+    io::BufWriter,
+    path::PathBuf,
     sync::Arc,
 };
 
 use arrow_array::RecordBatch;
 use arrow_ipc::writer::StreamWriter;
 use arrow_schema::Schema;
 use arrow_select::concat::concat_batches;
+use chrono::Utc;
 use itertools::Itertools;
+use tracing::{error, warn};
 
-use crate::utils::arrow::adapt_batch;
+use crate::{
+    parseable::{ARROW_FILE_EXTENSION, PART_FILE_EXTENSION},
+    utils::{arrow::adapt_batch, time::TimeRange},
+};
+
+use super::StagingError;
 
 #[derive(Default)]
 pub struct Writer {
     pub mem: MemWriter<16384>,
-    pub disk: HashMap<String, StreamWriter<File>>,
+    pub disk: HashMap<String, DiskWriter>,
+}
+
+pub struct DiskWriter {
+    inner: StreamWriter<BufWriter<File>>,
+    path: PathBuf,
+    range: TimeRange,
+}
+
+impl DiskWriter {
+    /// Try to create a file to stream arrows into
+    pub fn try_new(
+        path: impl Into<PathBuf>,
+        schema: &Schema,
+        range: TimeRange,
+    ) -> Result<Self, StagingError> {
+        let mut path = path.into();
+        path.set_extension(PART_FILE_EXTENSION);
+        let file = OpenOptions::new()
+            .write(true)
+            .truncate(true)
+            .create(true)
+            .open(&path)?;
+        let inner = StreamWriter::try_new_buffered(file, schema)?;
+
+        Ok(Self { inner, path, range })
+    }
+
+    pub fn is_current(&self) -> bool {
+        self.range.contains(Utc::now())
+    }
+
+    /// Write a single recordbatch into file
+    pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> {
+        self.inner.write(rb).map_err(StagingError::Arrow)
+    }
+}
+
+impl Drop for DiskWriter {
+    /// Write the continuation bytes and mark the file as done, rename to `.data.arrows`
+    fn drop(&mut self) {
+        if let Err(err) = self.inner.finish() {
+            error!("Couldn't finish arrow file {:?}, error = {err}", self.path);
+            return;
+        }
+
+        let mut arrow_path = self.path.to_owned();
+        arrow_path.set_extension(ARROW_FILE_EXTENSION);
+
+        if arrow_path.exists() {
+            warn!("File {arrow_path:?} exists and will be overwritten");
+        }
+
+        if let Err(err) = std::fs::rename(&self.path, &arrow_path) {
+            error!("Couldn't rename file {:?}, error = {err}", self.path);
+        }
+    }
 }
 
 /// Structure to keep recordbatches in memory.
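
A note on the Drop-based finalization above: while a DiskWriter is alive, data streams into a `.part` file; only when the writer is dropped (its slot ended, or a forced flush at shutdown) are the Arrow stream's end-of-stream bytes written and the file renamed to `.data.arrows`, so downstream conversion only ever sees completed files. A minimal sketch of the same write-then-rename-on-drop pattern using only std types (hypothetical `PartFileWriter`, no Arrow involved):

    use std::{
        fs,
        io::{BufWriter, Write},
        path::{Path, PathBuf},
    };

    /// Hypothetical stand-in for `DiskWriter`: stream into `<name>.part`,
    /// promote to the final name only when finalized via `Drop`.
    struct PartFileWriter {
        inner: BufWriter<fs::File>,
        part_path: PathBuf,
        final_path: PathBuf,
    }

    impl PartFileWriter {
        fn new(final_path: impl Into<PathBuf>) -> std::io::Result<Self> {
            let final_path = final_path.into();
            let part_path = final_path.with_extension("part");
            let file = fs::File::create(&part_path)?;
            Ok(Self {
                inner: BufWriter::new(file),
                part_path,
                final_path,
            })
        }

        fn write(&mut self, bytes: &[u8]) -> std::io::Result<()> {
            self.inner.write_all(bytes)
        }
    }

    impl Drop for PartFileWriter {
        fn drop(&mut self) {
            // Flush buffered bytes, then rename the part file to its final
            // name; readers never observe a half-written file under it.
            if self.inner.flush().is_ok() {
                let _ = fs::rename(&self.part_path, &self.final_path);
            }
        }
    }

    fn main() -> std::io::Result<()> {
        let mut w = PartFileWriter::new("example.data.arrows")?;
        w.write(b"record batch bytes\n")?;
        drop(w); // finalize: example.data.part -> example.data.arrows
        assert!(Path::new("example.data.arrows").exists());
        Ok(())
    }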

src/parseable/streams.rs
+35 -47

@@ -28,9 +28,8 @@ use std::{
 };
 
 use arrow_array::RecordBatch;
-use arrow_ipc::writer::StreamWriter;
 use arrow_schema::{Field, Fields, Schema};
-use chrono::{NaiveDateTime, Timelike};
+use chrono::{NaiveDateTime, Timelike, Utc};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
 use parquet::{
@@ -55,14 +54,14 @@ use crate::{
     metrics,
     option::Mode,
     storage::{object_storage::to_bytes, retention::Retention, StreamType},
-    utils::time::Minute,
+    utils::time::{Minute, TimeRange},
     LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
 };
 
 use super::{
     staging::{
         reader::{MergedRecordReader, MergedReverseRecordReader},
-        writer::Writer,
+        writer::{DiskWriter, Writer},
         StagingError,
     },
     LogStream, ARROW_FILE_EXTENSION,
@@ -123,29 +122,26 @@ impl Stream {
     ) -> Result<(), StagingError> {
         let mut guard = self.writer.lock().unwrap();
         if self.options.mode != Mode::Query || stream_type == StreamType::Internal {
-            match guard.disk.get_mut(schema_key) {
+            let filename =
+                self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values);
+            match guard.disk.get_mut(&filename) {
                 Some(writer) => {
                     writer.write(record)?;
                 }
                 None => {
                     // entry is not present thus we create it
-                    let file_path = self.path_by_current_time(
-                        schema_key,
-                        parsed_timestamp,
-                        custom_partition_values,
-                    );
                     std::fs::create_dir_all(&self.data_path)?;
 
-                    let file = OpenOptions::new()
-                        .create(true)
-                        .append(true)
-                        .open(&file_path)?;
-
-                    let mut writer = StreamWriter::try_new(file, &record.schema())
+                    let range = TimeRange::granularity_range(
+                        parsed_timestamp.and_local_timezone(Utc).unwrap(),
+                        OBJECT_STORE_DATA_GRANULARITY,
+                    );
+                    let file_path = self.data_path.join(&filename);
+                    let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)
                         .expect("File and RecordBatch both are checked");
 
                     writer.write(record)?;
-                    guard.disk.insert(schema_key.to_owned(), writer);
+                    guard.disk.insert(filename, writer);
                 }
             };
         }
@@ -155,17 +151,17 @@ impl Stream {
         Ok(())
     }
 
-    pub fn path_by_current_time(
+    pub fn filename_by_partition(
         &self,
         stream_hash: &str,
         parsed_timestamp: NaiveDateTime,
         custom_partition_values: &HashMap<String, String>,
-    ) -> PathBuf {
+    ) -> String {
        let mut hostname = hostname::get().unwrap().into_string().unwrap();
        if let Some(id) = &self.ingestor_id {
            hostname.push_str(id);
        }
-        let filename = format!(
+        format!(
            "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}",
            parsed_timestamp.date(),
            parsed_timestamp.hour(),
@@ -175,8 +171,7 @@ impl Stream {
                .sorted_by_key(|v| v.0)
                .map(|(key, value)| format!("{key}={value}."))
                .join("")
-        );
-        self.data_path.join(filename)
+        )
     }
 
     pub fn arrow_files(&self) -> Vec<PathBuf> {
@@ -366,19 +361,12 @@ impl Stream {
         self.writer.lock().unwrap().mem.clear();
     }
 
-    pub fn flush(&self) {
-        let mut disk_writers = {
-            let mut writer = self.writer.lock().unwrap();
-            // Flush memory
-            writer.mem.clear();
-            // Take schema -> disk writer mapping
-            std::mem::take(&mut writer.disk)
-        };
-
-        // Flush disk
-        for writer in disk_writers.values_mut() {
-            _ = writer.finish();
-        }
+    pub fn flush(&self, forced: bool) {
+        let mut writer = self.writer.lock().unwrap();
+        // Flush memory
+        writer.mem.clear();
+        // Drop schema -> disk writer mapping, triggers flush to disk
+        writer.disk.retain(|_, w| !forced && w.is_current());
     }
 
     fn parquet_writer_props(
@@ -733,7 +721,7 @@ impl Stream {
 
     /// First flushes arrows onto disk and then converts the arrow into parquet
     pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> {
-        self.flush();
+        self.flush(shutdown_signal);
 
         self.prepare_parquet(shutdown_signal)
     }
@@ -944,18 +932,18 @@ mod tests {
             None,
         );
 
-        let expected_path = staging.data_path.join(format!(
+        let expected = format!(
             "{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
             hostname::get().unwrap().into_string().unwrap()
-        ));
+        );
 
-        let generated_path =
-            staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values);
+        let generated =
+            staging.filename_by_partition(stream_hash, parsed_timestamp, &custom_partition_values);
 
-        assert_eq!(generated_path, expected_path);
+        assert_eq!(generated, expected);
     }
 
     #[test]
@@ -978,18 +966,18 @@ mod tests {
             None,
         );
 
-        let expected_path = staging.data_path.join(format!(
+        let expected = format!(
            "{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}",
            parsed_timestamp.date(),
            parsed_timestamp.hour(),
            Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
            hostname::get().unwrap().into_string().unwrap()
-        ));
+        );
 
-        let generated_path =
-            staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values);
+        let generated =
+            staging.filename_by_partition(stream_hash, parsed_timestamp, &custom_partition_values);
 
-        assert_eq!(generated_path, expected_path);
+        assert_eq!(generated, expected);
     }
 
    #[test]
@@ -1045,7 +1033,7 @@ mod tests {
             StreamType::UserDefined,
         )
         .unwrap();
-        staging.flush();
+        staging.flush(true);
     }
 
     #[test]
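
For reference, the `filename_by_partition` scheme keys each disk writer by a name that encodes the stream hash, date, hour, minute slot, any custom partitions, and hostname. A tiny sketch of the shape of a generated name, with hypothetical values and the empty-custom-partition case (the real format string is in the diff above):

    fn main() {
        let stream_hash = "abc123";
        let hostname = "ingest-0";
        // date, hour, and minute-slot stand in for the parsed timestamp fields.
        let filename = format!(
            "{stream_hash}.date={}.hour={:02}.minute={}.{hostname}.data.arrows",
            "2025-01-01", 13, 45
        );
        assert_eq!(
            filename,
            "abc123.date=2025-01-01.hour=13.minute=45.ingest-0.data.arrows"
        );
    }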
