From 05fb1a0242ee3e7f2b9de4bdad89164f9f550ebb Mon Sep 17 00:00:00 2001 From: Hubert Gruszecki Date: Sun, 27 Oct 2024 14:50:14 +0100 Subject: [PATCH] Make Indexes a memory-mapped file --- Cargo.lock | 10 ++ server/Cargo.toml | 1 + server/src/compat/storage_conversion/mod.rs | 6 +- server/src/streaming/segments/index.rs | 100 ++++++++++++++++---- server/src/streaming/segments/messages.rs | 10 +- server/src/streaming/segments/segment.rs | 6 +- server/src/streaming/segments/storage.rs | 13 ++- server/src/streaming/storage.rs | 7 +- 8 files changed, 119 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 09f857b12..669a6b08d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2406,6 +2406,15 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "memmap2" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd3f7eed9d3848f8b98834af67102b720745c4ec028fcd0aa0239277e7de374f" +dependencies = [ + "libc", +] + [[package]] name = "mime" version = "0.3.17" @@ -4021,6 +4030,7 @@ dependencies = [ "iggy", "jsonwebtoken", "log", + "memmap2", "moka", "openssl", "opentelemetry", diff --git a/server/Cargo.toml b/server/Cargo.toml index 3ec5beaf8..d6a83cf49 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -31,6 +31,7 @@ futures = "0.3.30" iggy = { path = "../sdk" } jsonwebtoken = "9.3.0" log = "0.4.20" +memmap2 = "0.9.5" moka = { version = "0.12.5", features = ["future"] } openssl = { version = "0.10.66", features = ["vendored"] } opentelemetry = { version = "0.26.0", features = ["trace", "logs"] } diff --git a/server/src/compat/storage_conversion/mod.rs b/server/src/compat/storage_conversion/mod.rs index 2a4e8d9dc..6136a2bba 100644 --- a/server/src/compat/storage_conversion/mod.rs +++ b/server/src/compat/storage_conversion/mod.rs @@ -8,7 +8,7 @@ use crate::state::State; use crate::streaming::batching::message_batch::RetainedMessageBatch; use crate::streaming::partitions::partition::{ConsumerOffset, Partition}; use crate::streaming::persistence::persister::Persister; -use crate::streaming::segments::index::{Index, IndexRange}; +use crate::streaming::segments::index::{Index, IndexRange, Indexes}; use crate::streaming::segments::segment::Segment; use crate::streaming::storage::{ PartitionStorage, SegmentStorage, StreamStorage, SystemInfoStorage, SystemStorage, TopicStorage, @@ -246,8 +246,8 @@ impl SegmentStorage for NoopSegmentStorage { Ok(()) } - async fn load_all_indexes(&self, _segment: &Segment) -> Result, IggyError> { - Ok(vec![]) + async fn load_all_indexes(&self, _segment: &Segment) -> Result { + Ok(Indexes::default()) } async fn load_index_range( diff --git a/server/src/streaming/segments/index.rs b/server/src/streaming/segments/index.rs index 242ec6e49..01c137799 100644 --- a/server/src/streaming/segments/index.rs +++ b/server/src/streaming/segments/index.rs @@ -1,7 +1,75 @@ use crate::streaming::segments::segment::Segment; use iggy::error::IggyError; use iggy::error::IggyError::InvalidOffset; +use memmap2::Mmap; +use std::ops::Deref; +use std::ops::Index as IndexOp; +#[derive(Debug)] +pub enum Indexes { + InMemory(Vec), + MemoryMapped { mmap: Mmap }, +} + +impl From> for Indexes { + fn from(indexes: Vec) -> Self { + Indexes::InMemory(indexes) + } +} + +impl Indexes { + pub fn push(&mut self, index: Index) { + match self { + Indexes::InMemory(vec) => { + vec.push(index); + } + Indexes::MemoryMapped { .. } => { + panic!("Cannot push to memory-mapped indexes"); + } + } + } +} + +impl Deref for Indexes { + type Target = [Index]; + + fn deref(&self) -> &Self::Target { + match self { + Indexes::InMemory(vec) => vec.as_slice(), + Indexes::MemoryMapped { mmap, .. } => { + let bytes = &mmap[..]; + let len = bytes.len() / std::mem::size_of::(); + let ptr = bytes.as_ptr() as *const Index; + unsafe { std::slice::from_raw_parts(ptr, len) } + } + } + } +} + +impl IndexOp for Indexes { + type Output = Index; + + fn index(&self, idx: usize) -> &Self::Output { + &self.deref()[idx] + } +} + +impl Default for Indexes { + fn default() -> Self { + Indexes::InMemory(Vec::new()) + } +} + +impl<'a> IntoIterator for &'a Indexes { + type Item = &'a Index; + type IntoIter = std::slice::Iter<'a, Index>; + + fn into_iter(self) -> Self::IntoIter { + self.deref().iter() + } +} + +#[repr(C)] #[derive(Debug, Eq, Clone, Copy, Default)] pub struct Index { pub offset: u32, @@ -22,12 +90,19 @@ pub struct IndexRange { } impl Segment { + pub fn get_indexes_slice(&self) -> &[Index] { + match &self.indexes { + Some(indexes) => indexes, + None => &[], + } + } + pub fn load_highest_lower_bound_index( &self, - indices: &[Index], start_offset: u32, end_offset: u32, ) -> Result { + let indices = self.get_indexes_slice(); let starting_offset_idx = binary_search_index(indices, start_offset); let ending_offset_idx = binary_search_index(indices, end_offset); @@ -143,16 +218,16 @@ mod tests { timestamp: 5000, }, ]; - segment.indexes.as_mut().unwrap().extend(indexes); + if let Some(Indexes::InMemory(vec)) = segment.indexes.as_mut() { + vec.extend(indexes); + } } #[test] fn should_find_both_indices() { let mut segment = create_segment(); create_test_indices(&mut segment); - let result = segment - .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 15, 45) - .unwrap(); + let result = segment.load_highest_lower_bound_index(15, 45).unwrap(); assert_eq!(result.start.offset, 20); assert_eq!(result.end.offset, 50); @@ -162,16 +237,12 @@ mod tests { fn start_and_end_index_should_be_equal() { let mut segment = create_segment(); create_test_indices(&mut segment); - let result_end_range = segment - .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 65, 100) - .unwrap(); + let result_end_range = segment.load_highest_lower_bound_index(65, 100).unwrap(); assert_eq!(result_end_range.start.offset, 65); assert_eq!(result_end_range.end.offset, 65); - let result_start_range = segment - .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 0, 5) - .unwrap(); + let result_start_range = segment.load_highest_lower_bound_index(0, 5).unwrap(); assert_eq!(result_start_range.start.offset, 5); assert_eq!(result_start_range.end.offset, 5); } @@ -180,9 +251,7 @@ mod tests { fn should_clamp_last_index_when_out_of_range() { let mut segment = create_segment(); create_test_indices(&mut segment); - let result = segment - .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 5, 100) - .unwrap(); + let result = segment.load_highest_lower_bound_index(5, 100).unwrap(); assert_eq!(result.start.offset, 5); assert_eq!(result.end.offset, 65); @@ -193,8 +262,7 @@ mod tests { let mut segment = create_segment(); create_test_indices(&mut segment); - let result = - segment.load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 100, 200); + let result = segment.load_highest_lower_bound_index(100, 200); assert!(result.is_err()); } } diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index e519b77c8..b8bd6b90f 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -117,14 +117,12 @@ impl Segment { return Ok(EMPTY_MESSAGES.into_iter().map(Arc::new).collect()); } - if let Some(indices) = &self.indexes { + if self.indexes.is_some() { let relative_start_offset = (start_offset - self.start_offset) as u32; let relative_end_offset = (end_offset - self.start_offset) as u32; - let index_range = match self.load_highest_lower_bound_index( - indices, - relative_start_offset, - relative_end_offset, - ) { + let index_range = match self + .load_highest_lower_bound_index(relative_start_offset, relative_end_offset) + { Ok(range) => range, Err(_) => { trace!( diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index bcdddd456..c4805f58e 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -1,6 +1,6 @@ +use super::index::Indexes; use crate::configs::system::SystemConfig; use crate::streaming::batching::batch_accumulator::BatchAccumulator; -use crate::streaming::segments::index::Index; use crate::streaming::storage::SystemStorage; use iggy::utils::expiry::IggyExpiry; use iggy::utils::timestamp::IggyTimestamp; @@ -34,7 +34,7 @@ pub struct Segment { pub(crate) message_expiry: IggyExpiry, pub(crate) unsaved_messages: Option, pub(crate) config: Arc, - pub(crate) indexes: Option>, + pub(crate) indexes: Option, pub(crate) storage: Arc, } @@ -74,7 +74,7 @@ impl Segment { _ => message_expiry, }, indexes: match config.segment.cache_indexes { - true => Some(Vec::new()), + true => Some(Indexes::default()), false => None, }, unsaved_messages: None, diff --git a/server/src/streaming/segments/storage.rs b/server/src/streaming/segments/storage.rs index 3cfd44e1b..27f0901ac 100644 --- a/server/src/streaming/segments/storage.rs +++ b/server/src/streaming/segments/storage.rs @@ -14,14 +14,18 @@ use bytes::{BufMut, BytesMut}; use iggy::error::IggyError; use iggy::utils::byte_size::IggyByteSize; use iggy::utils::checksum; +use memmap2::Mmap; use std::io::SeekFrom; use std::path::Path; use std::sync::atomic::Ordering; use std::sync::Arc; +use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncSeekExt, BufReader}; use tracing::{error, info, trace, warn}; -const EMPTY_INDEXES: Vec = vec![]; +use super::index::Indexes; + +const EMPTY_INDEXES: Indexes = Indexes::InMemory(Vec::new()); pub const INDEX_SIZE: u32 = 16; // offset: 4 bytes, position: 4 bytes, timestamp: 8 bytes const BUF_READER_CAPACITY_BYTES: usize = 512 * 1000; @@ -72,6 +76,9 @@ impl SegmentStorage for FileSegmentStorage { if segment.is_full().await { segment.is_closed = true; + let file = Arc::new(File::open(&segment.index_path).await?); + let mmap = unsafe { Mmap::map(&file) }?; + segment.indexes = Some(Indexes::MemoryMapped { mmap }); } let messages_count = segment.get_messages_count(); @@ -267,7 +274,7 @@ impl SegmentStorage for FileSegmentStorage { Ok(()) } - async fn load_all_indexes(&self, segment: &Segment) -> Result, IggyError> { + async fn load_all_indexes(&self, segment: &Segment) -> Result { trace!("Loading indexes from file..."); let file = file::open(&segment.index_path).await?; let file_size = file.metadata().await?.len() as usize; @@ -315,7 +322,7 @@ impl SegmentStorage for FileSegmentStorage { trace!("Loaded {} indexes from file.", indexes_count); - Ok(indexes) + Ok(Indexes::InMemory(indexes)) } async fn load_index_range( diff --git a/server/src/streaming/storage.rs b/server/src/streaming/storage.rs index b8a39330e..e7cd08042 100644 --- a/server/src/streaming/storage.rs +++ b/server/src/streaming/storage.rs @@ -1,4 +1,5 @@ use super::batching::message_batch::RetainedMessageBatch; +use super::segments::index::Indexes; use crate::configs::system::SystemConfig; use crate::state::system::{PartitionState, StreamState, TopicState}; use crate::streaming::partitions::partition::{ConsumerOffset, Partition}; @@ -77,7 +78,7 @@ pub trait SegmentStorage: Send + Sync { ) -> Result; async fn load_message_ids(&self, segment: &Segment) -> Result, IggyError>; async fn load_checksums(&self, segment: &Segment) -> Result<(), IggyError>; - async fn load_all_indexes(&self, segment: &Segment) -> Result, IggyError>; + async fn load_all_indexes(&self, segment: &Segment) -> Result; async fn load_index_range( &self, segment: &Segment, @@ -307,8 +308,8 @@ pub(crate) mod tests { Ok(()) } - async fn load_all_indexes(&self, _segment: &Segment) -> Result, IggyError> { - Ok(vec![]) + async fn load_all_indexes(&self, _segment: &Segment) -> Result { + Ok(Indexes::default()) } async fn load_index_range(