Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions foyer-storage/src/engine/block/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ impl BlobIndex {
pub fn seal(&mut self) -> IoSliceMut {
(&mut self.bytes[BlobIndex::COUNT_OFFSET..BlobIndex::COUNT_OFFSET + BlobIndex::COUNT_BYTES])
.put_u32(self.count as _);
let checksum = Checksummer::checksum64(&self.bytes[BlobIndex::CHECKSUM_BYTES..]);
let used_size = BlobIndex::INDEX_OFFSET + self.count * BlobEntryIndex::serialized_len();
let checksum = Checksummer::checksum64(&self.bytes[BlobIndex::CHECKSUM_BYTES..used_size]);
(&mut self.bytes[0..BlobIndex::CHECKSUM_BYTES]).put_u64(checksum);
self.bytes.clone()
}
Expand All @@ -135,15 +136,16 @@ pub struct BlobIndexReader;

impl BlobIndexReader {
pub fn read(buf: &[u8]) -> Option<Vec<BlobEntryIndex>> {
let checksum = Checksummer::checksum64(&buf[BlobIndex::CHECKSUM_BYTES..]);
let count =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to check if the loaded count is within the maximum count limiation. Or L142 will overflow read buf.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note I have two more miri reports of UB, and a few other oddities to investigate, but I am close to putting up a PR for miri CI. As these fixes will be delayed, I will ignore the relevant tests under miri, so we can get the miri testing in early.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think foyer also has needs to introduce miri test. Let me add it later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As previously promised, #1226

(&buf[BlobIndex::COUNT_OFFSET..BlobIndex::COUNT_OFFSET + BlobIndex::COUNT_BYTES]).get_u32() as usize;
let used_size = BlobIndex::INDEX_OFFSET + count * BlobEntryIndex::serialized_len();
let checksum = Checksummer::checksum64(&buf[BlobIndex::CHECKSUM_BYTES..used_size]);
let expected = (&buf[0..BlobIndex::CHECKSUM_BYTES]).get_u64();
if checksum != expected {
tracing::trace!(checksum, expected, "[blob index reader]: checksum mismatch");
return None;
}
let count =
(&buf[BlobIndex::COUNT_OFFSET..BlobIndex::COUNT_OFFSET + BlobIndex::COUNT_BYTES]).get_u32() as usize;
let indices = buf[BlobIndex::INDEX_OFFSET..BlobIndex::INDEX_OFFSET + count * BlobEntryIndex::serialized_len()]
let indices = buf[BlobIndex::INDEX_OFFSET..used_size]
.chunks_exact(BlobEntryIndex::serialized_len())
.map(BlobEntryIndex::read)
.collect();
Expand Down Expand Up @@ -464,7 +466,7 @@ impl Splitter {
}
}

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug)]
pub struct BlobPart {
/// Blob offset to the block.
pub blob_block_offset: usize,
Expand All @@ -476,6 +478,24 @@ pub struct BlobPart {
pub indices: Vec<BlobEntryIndex>,
}

impl PartialEq for BlobPart {
fn eq(&self, other: &Self) -> bool {
// Compare the initialized portion of the index buffer based on the number of entries.
// After seal(), bytes[0..used_size] contains the checksum, count, and index entries.
let used_size = BlobIndex::INDEX_OFFSET + self.indices.len() * BlobEntryIndex::serialized_len();
let other_used_size = BlobIndex::INDEX_OFFSET + other.indices.len() * BlobEntryIndex::serialized_len();

self.blob_block_offset == other.blob_block_offset
&& self.part_blob_offset == other.part_blob_offset
&& self.data == other.data
&& self.indices == other.indices
&& used_size == other_used_size
&& self.index[..used_size] == other.index[..other_used_size]
}
}

impl Eq for BlobPart {}

#[derive(Debug, PartialEq, Eq)]
pub struct Block {
pub blob_parts: Vec<BlobPart>,
Expand Down Expand Up @@ -689,7 +709,15 @@ mod tests {
let mut ctx = SplitCtx::new(BLOCK_SIZE, BLOB_INDEX_SIZE);
ctx.current_blob_block_offset = 40 * KB;
ctx.current_part_blob_offset = 16 * KB;
ctx.current_blob_index.count = ctx.current_blob_index.capacity() - 1;
// Fill the index with dummy entries to match the count we're setting
for _ in 0..ctx.current_blob_index.capacity() - 1 {
ctx.current_blob_index.write(&BlobEntryIndex {
hash: u64::MAX,
sequence: u64::MAX,
offset: 0,
len: 0,
});
}

let shared_io_slice = IoSliceMut::new(BATCH_SIZE).into_io_slice();
let batch = Splitter::split(
Expand Down
5 changes: 5 additions & 0 deletions foyer-storage/src/io/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ unsafe impl Sync for Raw {}

impl Raw {
/// Allocate an 4K-aligned [`Raw`] with **AT LEAST** `capacity` bytes.
///
/// # Safety
///
/// The returned buffer contains uninitialized memory. Callers must initialize
/// the memory before reading from it to avoid undefined behavior.
pub fn new(capacity: usize) -> Self {
let capacity = bits::align_up(PAGE, capacity);
let layout = unsafe { Layout::from_size_align_unchecked(capacity, PAGE) };
Expand Down
Loading