Skip to content

Commit 1b4ea73

Browse files
authored
fix: ensure panic safety (#1212)
1 parent f209de6 commit 1b4ea73

File tree

4 files changed

+44
-30
lines changed

4 files changed

+44
-30
lines changed

src/parseable/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ use crate::{
5858
mod staging;
5959
mod streams;
6060

61+
/// File extension for arrow files in staging
62+
const ARROW_FILE_EXTENSION: &str = "arrows";
63+
6164
/// Name of a Stream
6265
/// NOTE: this used to be a struct, flattened out for simplicity
6366
pub type LogStream = String;

src/parseable/staging/reader.rs

+15-7
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader};
3030
use arrow_schema::Schema;
3131
use byteorder::{LittleEndian, ReadBytesExt};
3232
use itertools::kmerge_by;
33-
use tracing::error;
33+
use tracing::{error, warn};
3434

3535
use crate::{
3636
event::DEFAULT_TIMESTAMP_KEY,
@@ -82,14 +82,22 @@ pub struct MergedReverseRecordReader {
8282
}
8383

8484
impl MergedReverseRecordReader {
85-
pub fn try_new(files: &[PathBuf]) -> Self {
86-
let mut readers = Vec::with_capacity(files.len());
87-
for file in files {
88-
let Ok(reader) = get_reverse_reader(File::open(file).unwrap()) else {
89-
error!("Invalid file detected, ignoring it: {:?}", file);
85+
pub fn try_new(file_paths: &[PathBuf]) -> Self {
86+
let mut readers = Vec::with_capacity(file_paths.len());
87+
for path in file_paths {
88+
let Ok(file) = File::open(path) else {
89+
warn!("Error when trying to read file: {path:?}");
9090
continue;
9191
};
9292

93+
let reader = match get_reverse_reader(file) {
94+
Ok(r) => r,
95+
Err(err) => {
96+
error!("Invalid file detected, ignoring it: {path:?}; error = {err}");
97+
continue;
98+
}
99+
};
100+
93101
readers.push(reader);
94102
}
95103

@@ -255,7 +263,7 @@ pub fn get_reverse_reader<T: Read + Seek>(
255263
messages.push((header, offset, size));
256264
offset += size;
257265
}
258-
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
266+
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof && !messages.is_empty() => break,
259267
Err(err) => return Err(err),
260268
}
261269
}

src/parseable/streams.rs

+21-14
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,13 @@ use super::{
6262
writer::Writer,
6363
StagingError,
6464
},
65-
LogStream,
65+
LogStream, ARROW_FILE_EXTENSION,
6666
};
6767

6868
#[derive(Debug, thiserror::Error)]
6969
#[error("Stream not found: {0}")]
7070
pub struct StreamNotFound(pub String);
7171

72-
const ARROW_FILE_EXTENSION: &str = "data.arrows";
73-
7472
pub type StreamRef = Arc<Stream>;
7573

7674
/// Gets the unix timestamp for the minute as described by the `SystemTime`
@@ -165,7 +163,7 @@ impl Stream {
165163
hostname.push_str(id);
166164
}
167165
let filename = format!(
168-
"{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
166+
"{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}",
169167
parsed_timestamp.date(),
170168
parsed_timestamp.hour(),
171169
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -495,10 +493,12 @@ impl Stream {
495493
}
496494
writer.close()?;
497495

498-
if part_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 {
496+
if part_file.metadata().expect("File was just created").len()
497+
< parquet::file::FOOTER_SIZE as u64
498+
{
499499
error!(
500-
"Invalid parquet file {:?} detected for stream {}, removing it",
501-
&part_path, &self.stream_name
500+
"Invalid parquet file {part_path:?} detected for stream {}, removing it",
501+
&self.stream_name
502502
);
503503
remove_file(part_path).unwrap();
504504
} else {
@@ -510,15 +510,22 @@ impl Stream {
510510
}
511511

512512
for file in arrow_files {
513-
// warn!("file-\n{file:?}\n");
514-
let file_size = file.metadata().unwrap().len();
515-
let file_type = file.extension().unwrap().to_str().unwrap();
516-
if remove_file(file.clone()).is_err() {
513+
let file_size = match file.metadata() {
514+
Ok(meta) => meta.len(),
515+
Err(err) => {
516+
warn!(
517+
"File ({}) not found; Error = {err}",
518+
file.display()
519+
);
520+
continue;
521+
}
522+
};
523+
if remove_file(&file).is_err() {
517524
error!("Failed to delete file. Unstable state");
518525
process::abort()
519526
}
520527
metrics::STORAGE_SIZE
521-
.with_label_values(&["staging", &self.stream_name, file_type])
528+
.with_label_values(&["staging", &self.stream_name, ARROW_FILE_EXTENSION])
522529
.sub(file_size as i64);
523530
}
524531
}
@@ -883,7 +890,7 @@ mod tests {
883890
);
884891

885892
let expected_path = staging.data_path.join(format!(
886-
"{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
893+
"{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}",
887894
parsed_timestamp.date(),
888895
parsed_timestamp.hour(),
889896
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -917,7 +924,7 @@ mod tests {
917924
);
918925

919926
let expected_path = staging.data_path.join(format!(
920-
"{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
927+
"{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}",
921928
parsed_timestamp.date(),
922929
parsed_timestamp.hour(),
923930
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),

src/storage/object_storage.rs

+5-9
Original file line numberDiff line numberDiff line change
@@ -461,16 +461,12 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
461461
) -> Result<Option<Manifest>, ObjectStorageError> {
462462
let path = manifest_path(path.as_str());
463463
match self.get_object(&path).await {
464-
Ok(bytes) => Ok(Some(
465-
serde_json::from_slice(&bytes).expect("manifest is valid json"),
466-
)),
467-
Err(err) => {
468-
if matches!(err, ObjectStorageError::NoSuchKey(_)) {
469-
Ok(None)
470-
} else {
471-
Err(err)
472-
}
464+
Ok(bytes) => {
465+
let manifest = serde_json::from_slice(&bytes)?;
466+
Ok(Some(manifest))
473467
}
468+
Err(ObjectStorageError::NoSuchKey(_)) => Ok(None),
469+
Err(err) => Err(err),
474470
}
475471
}
476472

0 commit comments

Comments
 (0)