diff --git a/riffle-server/src/app_manager.rs b/riffle-server/src/app_manager.rs
index 8ddb0583..9c5d7652 100644
--- a/riffle-server/src/app_manager.rs
+++ b/riffle-server/src/app_manager.rs
@@ -562,6 +562,7 @@ pub(crate) mod test {
                 capacity: "20B".to_string(),
                 buffer_ticket_timeout_sec: 1,
                 buffer_ticket_check_interval_sec: 1,
+                buffer_type: Default::default(),
             }),
         );
         let _ = std::mem::replace(
diff --git a/riffle-server/src/config.rs b/riffle-server/src/config.rs
index aba3e72f..bf11951d 100644
--- a/riffle-server/src/config.rs
+++ b/riffle-server/src/config.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use crate::block_id_manager::BlockIdManagerType;
+use crate::store::mem::buffer::BufferType;
 use crate::store::ResponseDataIndex::Local;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
@@ -31,6 +32,13 @@ pub struct MemoryStoreConfig {
     pub buffer_ticket_timeout_sec: i64,
     #[serde(default = "as_default_buffer_ticket_timeout_check_interval_sec")]
     pub buffer_ticket_check_interval_sec: i64,
+
+    #[serde(default = "as_default_buffer_type")]
+    pub buffer_type: BufferType,
+}
+
+fn as_default_buffer_type() -> BufferType {
+    BufferType::DEFAULT
 }
 
 fn as_default_buffer_ticket_timeout_check_interval_sec() -> i64 {
@@ -47,6 +55,7 @@ impl MemoryStoreConfig {
             capacity,
             buffer_ticket_timeout_sec: as_default_buffer_ticket_timeout_sec(),
             buffer_ticket_check_interval_sec: as_default_buffer_ticket_timeout_check_interval_sec(),
+            buffer_type: BufferType::DEFAULT,
         }
     }
 
@@ -55,6 +64,7 @@ impl MemoryStoreConfig {
             capacity,
             buffer_ticket_timeout_sec,
             buffer_ticket_check_interval_sec: as_default_buffer_ticket_timeout_check_interval_sec(),
+            buffer_type: BufferType::DEFAULT,
         }
     }
 }
diff --git a/riffle-server/src/store/hybrid.rs b/riffle-server/src/store/hybrid.rs
index efc71977..209a1183 100644
--- a/riffle-server/src/store/hybrid.rs
+++ b/riffle-server/src/store/hybrid.rs
@@ -61,12 +61,15 @@ use crate::app_manager::AppManagerRef;
 use crate::config_reconfigure::ReconfigurableConfManager;
 use crate::runtime::manager::RuntimeManager;
 use crate::store::local::LocalfileStoreStat;
-use crate::store::mem::buffer::MemoryBuffer;
+use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer;
+use crate::store::mem::buffer::unified_buffer::UnifiedBuffer;
+use crate::store::mem::buffer::{BufferType, MemoryBuffer};
 use crate::store::mem::capacity::CapacitySnapshot;
 use crate::store::spill::hierarchy_event_bus::HierarchyEventBus;
 use crate::store::spill::storage_flush_handler::StorageFlushHandler;
 use crate::store::spill::storage_select_handler::StorageSelectHandler;
 use crate::store::spill::{SpillMessage, SpillWritingViewContext};
+use crate::store::ResponseData::Mem;
 use tokio::time::Instant;
 
 pub trait PersistentStore: Store + Persistent + Send + Sync + Any {
@@ -89,7 +92,7 @@ const DEFAULT_MEMORY_SPILL_MAX_CONCURRENCY: i32 = 20;
 
 pub struct HybridStore {
     // Box will build fail
-    pub(crate) hot_store: Arc<MemoryStore>,
+    pub(crate) hot_store: Arc<MemoryStore<UnifiedBuffer>>,
     pub(crate) warm_store: Option<Box<dyn PersistentStore>>,
     pub(crate) cold_store: Option<Box<dyn PersistentStore>>,
 
@@ -172,11 +175,12 @@ impl HybridStore {
         }
         let async_watermark_spill_enable = hybrid_conf.async_watermark_spill_trigger_enable;
 
+        // use the unified buffer to delegate the underlying concrete buffer
+        let mem_store: MemoryStore<UnifiedBuffer> =
+            MemoryStore::from(config.memory_store.unwrap(), runtime_manager.clone());
+
         let store = HybridStore {
-            hot_store: Arc::new(MemoryStore::from(
-                config.memory_store.unwrap(),
-                runtime_manager.clone(),
-            )),
+            hot_store: Arc::new(mem_store),
             warm_store: persistent_stores.pop_front(),
cold_store: persistent_stores.pop_front(), config: hybrid_conf, @@ -426,7 +430,7 @@ impl HybridStore { Ok(Default::default()) } - pub async fn get_memory_buffer(&self, uid: &PartitionUId) -> Result> { + pub async fn get_memory_buffer(&self, uid: &PartitionUId) -> Result> { self.hot_store.get_buffer(uid) } @@ -469,7 +473,7 @@ impl HybridStore { async fn buffer_spill_impl( &self, uid: &PartitionUId, - buffer: Arc, + buffer: Arc, ) -> Result { let spill_result = buffer.spill()?; if spill_result.is_none() { diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index 24bea6af..4cae1de5 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -1,31 +1,50 @@ -use crate::composed_bytes; +pub mod default_buffer; +pub mod opt_buffer; +pub mod unified_buffer; + use crate::composed_bytes::ComposedBytes; -use crate::constant::INVALID_BLOCK_ID; +use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; +use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; +use crate::store::mem::buffer::BufferType::DEFAULT; use crate::store::DataBytes; use crate::store::{Block, DataSegment, PartitionedMemoryData}; use anyhow::Result; use croaring::Treemap; -use fastrace::trace; use parking_lot::Mutex; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::hash::Hash; -use std::mem; use std::ops::{Deref, DerefMut}; use std::sync::Arc; -pub struct MemoryBuffer { - buffer: Mutex, +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Copy)] +pub enum BufferType { + // the default memory_buffer type + DEFAULT, + // the experimental memory_buffer type + EXPERIMENTAL, +} + +impl Default for BufferType { + fn default() -> Self { + DEFAULT + } +} + +#[derive(Default)] +pub struct BufferOptions { + pub buffer_type: BufferType, } #[derive(Default, Debug)] -pub struct BatchMemoryBlock(Vec>); -impl Deref for BatchMemoryBlock { +pub struct MemBlockBatch(pub Vec>); +impl Deref for MemBlockBatch { type Target = Vec>; fn deref(&self) -> &Self::Target { &self.0 } } -impl DerefMut for BatchMemoryBlock { +impl DerefMut for MemBlockBatch { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } @@ -33,9 +52,9 @@ impl DerefMut for BatchMemoryBlock { #[derive(Debug)] pub struct BufferSpillResult { - flight_id: u64, - flight_len: u64, - blocks: Arc, + pub flight_id: u64, + pub flight_len: u64, + pub blocks: Arc, } impl BufferSpillResult { @@ -45,228 +64,67 @@ impl BufferSpillResult { pub fn flight_len(&self) -> u64 { self.flight_len } - pub fn blocks(&self) -> Arc { + pub fn blocks(&self) -> Arc { self.blocks.clone() } } -#[derive(Debug)] -pub struct BufferInternal { - total_size: i64, - staging_size: i64, - flight_size: i64, - - staging: BatchMemoryBlock, - - flight: HashMap>, - flight_counter: u64, -} - -impl BufferInternal { - fn new() -> Self { - BufferInternal { - total_size: 0, - staging_size: 0, - flight_size: 0, - staging: Default::default(), - flight: Default::default(), - flight_counter: 0, - } - } -} - -impl MemoryBuffer { - pub fn new() -> MemoryBuffer { - MemoryBuffer { - buffer: Mutex::new(BufferInternal::new()), - } - } - - #[trace] - pub fn total_size(&self) -> Result { - return Ok(self.buffer.lock().total_size); - } - - #[trace] - pub fn flight_size(&self) -> Result { - return Ok(self.buffer.lock().flight_size); - } - - #[trace] - pub fn staging_size(&self) -> Result { - return Ok(self.buffer.lock().staging_size); - } - - #[trace] - pub fn clear(&self, flight_id: u64, flight_size: u64) -> 
Result<()> { - let mut buffer = self.buffer.lock(); - let flight = &mut buffer.flight; - let removed = flight.remove(&flight_id); - if let Some(block_ref) = removed { - buffer.total_size -= flight_size as i64; - buffer.flight_size -= flight_size as i64; - } - Ok(()) - } - - pub fn get( +pub trait MemoryBuffer { + /// Creates a new buffer instance + fn new(options: BufferOptions) -> Self + where + Self: Sized; + + /// Returns the total size of the buffer. + fn total_size(&self) -> Result + where + Self: Send + Sync; + /// Returns the size of data in flight (spilled but not cleared). + fn flight_size(&self) -> Result + where + Self: Send + Sync; + + /// Returns the size of data in staging (not yet spilled). + fn staging_size(&self) -> Result + where + Self: Send + Sync; + + /// Clears a specific flight by ID and size. + fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()> + where + Self: Send + Sync; + + /// Reads data starting after last_block_id, up to read_bytes_limit_len. + fn get( &self, last_block_id: i64, read_bytes_limit_len: i64, task_ids: Option, - ) -> Result { - /// read sequence - /// 1. from flight (expect: last_block_id not found or last_block_id == -1) - /// 2. from staging - let buffer = self.buffer.lock(); - - let mut read_result = vec![]; - let mut read_len = 0i64; - let mut flight_found = false; - - let mut exit = false; - while !exit { - exit = true; - { - if last_block_id == INVALID_BLOCK_ID { - flight_found = true; - } - for (_, batch_block) in buffer.flight.iter() { - for blocks in batch_block.iter() { - for block in blocks { - if !flight_found && block.block_id == last_block_id { - flight_found = true; - continue; - } - if !flight_found { - continue; - } - if read_len >= read_bytes_limit_len { - break; - } - if let Some(ref expected_task_id) = task_ids { - if !expected_task_id.contains(block.task_attempt_id as u64) { - continue; - } - } - read_len += block.length as i64; - read_result.push(block); - } - } - } - } - - { - for blocks in buffer.staging.iter() { - for block in blocks { - if !flight_found && block.block_id == last_block_id { - flight_found = true; - continue; - } - if !flight_found { - continue; - } - if read_len >= read_bytes_limit_len { - break; - } - if let Some(ref expected_task_id) = task_ids { - if !expected_task_id.contains(block.task_attempt_id as u64) { - continue; - } - } - read_len += block.length as i64; - read_result.push(block); - } - } - } - - if !flight_found { - flight_found = true; - exit = false; - } - } - - let mut block_bytes = Vec::with_capacity(read_result.len()); - let mut segments = Vec::with_capacity(read_result.len()); - let mut offset = 0; - for block in read_result { - let data = &block.data; - block_bytes.push(data.clone()); - segments.push(DataSegment { - block_id: block.block_id, - offset, - length: block.length, - uncompress_length: block.uncompress_length, - crc: block.crc, - task_attempt_id: block.task_attempt_id, - }); - offset += block.length as i64; - } - let total_bytes = offset as usize; - - // Note: is_end is computed as total_bytes < read_bytes_limit_len. This works in general, - // but it can incorrectly be false in the edge case where total_bytes == read_bytes_limit_len - // and the buffer has no more blocks left. In that situation, buffer is actually fully read, - // so the client code may need to perform an additional empty-check to handle this case. 
- let is_end = total_bytes < read_bytes_limit_len as usize; - - let composed_bytes = ComposedBytes::from(block_bytes, total_bytes); - Ok(PartitionedMemoryData { - shuffle_data_block_segments: segments, - data: DataBytes::Composed(composed_bytes), - is_end, - }) - } - - // when there is no any staging data, it will return the None - pub fn spill(&self) -> Result> { - let mut buffer = self.buffer.lock(); - if buffer.staging_size == 0 { - return Ok(None); - } - - let staging: BatchMemoryBlock = { mem::replace(&mut buffer.staging, Default::default()) }; - let staging_ref = Arc::new(staging); - let flight_id = buffer.flight_counter; - - let flight = &mut buffer.flight; - flight.insert(flight_id, staging_ref.clone()); - - let spill_size = buffer.staging_size; - buffer.flight_counter += 1; - buffer.flight_size += spill_size; - buffer.staging_size = 0; - - Ok(Some(BufferSpillResult { - flight_id, - flight_len: spill_size as u64, - blocks: staging_ref.clone(), - })) - } - - #[trace] - pub fn append(&self, blocks: Vec, size: u64) -> Result<()> { - let mut buffer = self.buffer.lock(); - let mut staging = &mut buffer.staging; - staging.push(blocks); - - buffer.staging_size += size as i64; - buffer.total_size += size as i64; - - Ok(()) - } -} - -/// for tests. -impl MemoryBuffer { - fn direct_push(&self, blocks: Vec) -> Result<()> { - let len: u64 = blocks.iter().map(|block| block.length).sum::() as u64; - self.append(blocks, len) - } + ) -> Result + where + Self: Send + Sync; + + /// Spills staging data to flight, returns None if no staging data. + fn spill(&self) -> Result> + where + Self: Send + Sync; + + /// Appends blocks to staging area. + fn append(&self, blocks: Vec, size: u64) -> Result<()> + where + Self: Send + Sync; + + /// push directly, just use only in test + #[cfg(test)] + fn direct_push(&self, blocks: Vec) -> anyhow::Result<()> + where + Self: Send + Sync; } #[cfg(test)] mod test { + use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; + use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; use crate::store::mem::buffer::MemoryBuffer; use crate::store::Block; use hashlink::LinkedHashMap; @@ -300,9 +158,9 @@ mod test { } } - #[test] - fn test_with_block_id_zero() -> anyhow::Result<()> { - let mut buffer = MemoryBuffer::new(); + fn run_test_with_block_id_zero() -> anyhow::Result<()> + { + let mut buffer = B::new(Default::default()); let block_1 = create_block(10, 100); let block_2 = create_block(10, 0); @@ -329,8 +187,14 @@ mod test { } #[test] - fn test_put_get() -> anyhow::Result<()> { - let mut buffer = MemoryBuffer::new(); + fn test_with_block_id_zero() -> anyhow::Result<()> { + run_test_with_block_id_zero::()?; + run_test_with_block_id_zero::()?; + Ok(()) + } + + fn run_test_put_get() -> anyhow::Result<()> { + let mut buffer = B::new(Default::default()); /// case1 buffer.direct_push(create_blocks(0, 10, 10))?; @@ -431,8 +295,15 @@ mod test { } #[test] - fn test_get_v2_is_end_with_only_staging() -> anyhow::Result<()> { - let buffer = MemoryBuffer::new(); + fn test_put_get() -> anyhow::Result<()> { + run_test_put_get::()?; + run_test_put_get::()?; + Ok(()) + } + + fn run_test_get_v2_is_end_with_only_staging( + ) -> anyhow::Result<()> { + let buffer = B::new(Default::default()); // 0 -> 10 blocks with total 100 bytes let cnt = 10; let block_len = 10; @@ -461,8 +332,15 @@ mod test { } #[test] - fn test_get_v2_is_end_across_flight_and_staging() -> anyhow::Result<()> { - let buffer = MemoryBuffer::new(); + fn test_get_v2_is_end_with_only_staging() -> 
anyhow::Result<()> { + run_test_get_v2_is_end_with_only_staging::()?; + run_test_get_v2_is_end_with_only_staging::()?; + Ok(()) + } + + fn run_test_get_v2_is_end_across_flight_and_staging( + ) -> anyhow::Result<()> { + let buffer = B::new(Default::default()); // staging: 0..2 buffer.direct_push(create_blocks(0, 3, 5))?; @@ -484,4 +362,11 @@ mod test { assert_eq!(result2.is_end, true); // no more data after staging Ok(()) } + + #[test] + fn test_get_v2_is_end_across_flight_and_staging() -> anyhow::Result<()> { + run_test_get_v2_is_end_across_flight_and_staging::()?; + run_test_get_v2_is_end_across_flight_and_staging::()?; + Ok(()) + } } diff --git a/riffle-server/src/store/mem/buffer/default_buffer.rs b/riffle-server/src/store/mem/buffer/default_buffer.rs new file mode 100644 index 00000000..db6b1509 --- /dev/null +++ b/riffle-server/src/store/mem/buffer/default_buffer.rs @@ -0,0 +1,251 @@ +use super::{BufferOptions, BufferSpillResult, MemBlockBatch, MemoryBuffer}; +use crate::composed_bytes; +use crate::composed_bytes::ComposedBytes; +use crate::constant::INVALID_BLOCK_ID; +use crate::store::DataBytes; +use crate::store::{Block, DataSegment, PartitionedMemoryData}; +use anyhow::Result; +use croaring::Treemap; +use fastrace::trace; +use parking_lot::Mutex; +use std::collections::HashMap; +use std::hash::Hash; +use std::mem; +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; + +pub struct DefaultMemoryBuffer { + buffer: Mutex, +} + +#[derive(Debug)] +struct Inner { + pub total_size: i64, + pub staging_size: i64, + pub flight_size: i64, + + pub staging: MemBlockBatch, + + pub flight: HashMap>, + pub flight_counter: u64, +} + +impl Inner { + pub fn new() -> Self { + Inner { + total_size: 0, + staging_size: 0, + flight_size: 0, + staging: Default::default(), + flight: Default::default(), + flight_counter: 0, + } + } +} + +impl MemoryBuffer for DefaultMemoryBuffer { + fn new(opt: BufferOptions) -> DefaultMemoryBuffer { + DefaultMemoryBuffer { + buffer: Mutex::new(Inner::new()), + } + } + + fn total_size(&self) -> Result + where + Self: Send + Sync, + { + return Ok(self.buffer.lock().total_size); + } + + fn flight_size(&self) -> Result + where + Self: Send + Sync, + { + return Ok(self.buffer.lock().flight_size); + } + + fn staging_size(&self) -> Result + where + Self: Send + Sync, + { + return Ok(self.buffer.lock().staging_size); + } + + fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()> + where + Self: Send + Sync, + { + let mut buffer = self.buffer.lock(); + let flight = &mut buffer.flight; + let removed = flight.remove(&flight_id); + if let Some(block_ref) = removed { + buffer.total_size -= flight_size as i64; + buffer.flight_size -= flight_size as i64; + } + Ok(()) + } + + fn get( + &self, + last_block_id: i64, + read_bytes_limit_len: i64, + task_ids: Option, + ) -> Result + where + Self: Send + Sync, + { + /// read sequence + /// 1. from flight (expect: last_block_id not found or last_block_id == -1) + /// 2. 
from staging + let buffer = self.buffer.lock(); + + let mut read_result = vec![]; + let mut read_len = 0i64; + let mut flight_found = false; + + let mut exit = false; + while !exit { + exit = true; + { + if last_block_id == INVALID_BLOCK_ID { + flight_found = true; + } + for (_, batch_block) in buffer.flight.iter() { + for blocks in batch_block.iter() { + for block in blocks { + if !flight_found && block.block_id == last_block_id { + flight_found = true; + continue; + } + if !flight_found { + continue; + } + if read_len >= read_bytes_limit_len { + break; + } + if let Some(ref expected_task_id) = task_ids { + if !expected_task_id.contains(block.task_attempt_id as u64) { + continue; + } + } + read_len += block.length as i64; + read_result.push(block); + } + } + } + } + + { + for blocks in buffer.staging.iter() { + for block in blocks { + if !flight_found && block.block_id == last_block_id { + flight_found = true; + continue; + } + if !flight_found { + continue; + } + if read_len >= read_bytes_limit_len { + break; + } + if let Some(ref expected_task_id) = task_ids { + if !expected_task_id.contains(block.task_attempt_id as u64) { + continue; + } + } + read_len += block.length as i64; + read_result.push(block); + } + } + } + + if !flight_found { + flight_found = true; + exit = false; + } + } + + let mut block_bytes = Vec::with_capacity(read_result.len()); + let mut segments = Vec::with_capacity(read_result.len()); + let mut offset = 0; + for block in read_result { + let data = &block.data; + block_bytes.push(data.clone()); + segments.push(DataSegment { + block_id: block.block_id, + offset, + length: block.length, + uncompress_length: block.uncompress_length, + crc: block.crc, + task_attempt_id: block.task_attempt_id, + }); + offset += block.length as i64; + } + let total_bytes = offset as usize; + + // Note: is_end is computed as total_bytes < read_bytes_limit_len. This works in general, + // but it can incorrectly be false in the edge case where total_bytes == read_bytes_limit_len + // and the buffer has no more blocks left. In that situation, buffer is actually fully read, + // so the client code may need to perform an additional empty-check to handle this case. 
+ let is_end = total_bytes < read_bytes_limit_len as usize; + + let composed_bytes = ComposedBytes::from(block_bytes, total_bytes); + Ok(PartitionedMemoryData { + shuffle_data_block_segments: segments, + data: DataBytes::Composed(composed_bytes), + is_end, + }) + } + + // when there is no any staging data, it will return the None + fn spill(&self) -> Result> + where + Self: Send + Sync, + { + let mut buffer = self.buffer.lock(); + if buffer.staging_size == 0 { + return Ok(None); + } + + let staging: MemBlockBatch = { mem::replace(&mut buffer.staging, Default::default()) }; + let staging_ref = Arc::new(staging); + let flight_id = buffer.flight_counter; + + let flight = &mut buffer.flight; + flight.insert(flight_id, staging_ref.clone()); + + let spill_size = buffer.staging_size; + buffer.flight_counter += 1; + buffer.flight_size += spill_size; + buffer.staging_size = 0; + + Ok(Some(BufferSpillResult { + flight_id, + flight_len: spill_size as u64, + blocks: staging_ref.clone(), + })) + } + + fn append(&self, blocks: Vec, size: u64) -> Result<()> + where + Self: Send + Sync, + { + let mut buffer = self.buffer.lock(); + let mut staging = &mut buffer.staging; + staging.push(blocks); + + buffer.staging_size += size as i64; + buffer.total_size += size as i64; + + Ok(()) + } + + #[cfg(test)] + fn direct_push(&self, blocks: Vec) -> Result<()> + where + Self: Send + Sync, + { + let len: u64 = blocks.iter().map(|block| block.length).sum::() as u64; + self.append(blocks, len) + } +} diff --git a/riffle-server/src/store/mem/buffer/opt_buffer.rs b/riffle-server/src/store/mem/buffer/opt_buffer.rs new file mode 100644 index 00000000..00c3f314 --- /dev/null +++ b/riffle-server/src/store/mem/buffer/opt_buffer.rs @@ -0,0 +1,296 @@ +use crate::composed_bytes::ComposedBytes; +use crate::constant::INVALID_BLOCK_ID; +use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; +use crate::store::mem::buffer::{BufferOptions, BufferSpillResult, MemBlockBatch, MemoryBuffer}; +use crate::store::{Block, DataBytes, DataSegment, PartitionedMemoryData}; +use croaring::Treemap; +use fastrace::trace; +use parking_lot::Mutex; +use std::collections::HashMap; +use std::sync::Arc; + +/// the optimized implementation is from https://github.com/zuston/riffle/pull/564 +#[derive(Debug)] +pub struct OptStagingBufferInternal { + pub total_size: i64, + pub staging_size: i64, + pub flight_size: i64, + + pub staging: Vec, + pub batch_boundaries: Vec, // Track where each batch starts + pub block_position_index: HashMap, // Maps block_id to Vec index + + pub flight: HashMap>, + pub flight_counter: u64, +} + +impl OptStagingBufferInternal { + pub fn new() -> Self { + OptStagingBufferInternal { + total_size: 0, + staging_size: 0, + flight_size: 0, + staging: Vec::new(), + batch_boundaries: Vec::new(), + block_position_index: HashMap::new(), + flight: Default::default(), + flight_counter: 0, + } + } +} + +#[derive(Debug)] +pub struct OptStagingMemoryBuffer { + buffer: Mutex, +} + +impl MemoryBuffer for OptStagingMemoryBuffer { + #[trace] + fn new(opt: BufferOptions) -> Self { + OptStagingMemoryBuffer { + buffer: Mutex::new(OptStagingBufferInternal::new()), + } + } + #[trace] + fn total_size(&self) -> anyhow::Result + where + Self: Send + Sync, + { + return Ok(self.buffer.lock().total_size); + } + + #[trace] + fn flight_size(&self) -> anyhow::Result + where + Self: Send + Sync, + { + return Ok(self.buffer.lock().flight_size); + } + + #[trace] + fn staging_size(&self) -> anyhow::Result + where + Self: Send + Sync, + { + return 
Ok(self.buffer.lock().staging_size); + } + + #[trace] + fn clear(&self, flight_id: u64, flight_size: u64) -> anyhow::Result<()> + where + Self: Send + Sync, + { + let mut buffer = self.buffer.lock(); + let flight = &mut buffer.flight; + let removed = flight.remove(&flight_id); + if let Some(block_ref) = removed { + buffer.total_size -= flight_size as i64; + buffer.flight_size -= flight_size as i64; + } + Ok(()) + } + + #[trace] + fn get( + &self, + last_block_id: i64, + read_bytes_limit_len: i64, + task_ids: Option, + ) -> anyhow::Result + where + Self: Send + Sync, + { + /// read sequence + /// 1. from flight (expect: last_block_id not found or last_block_id == -1) + /// 2. from staging + let buffer = self.buffer.lock(); + + let mut read_result = vec![]; + let mut read_len = 0i64; + let mut flight_found = false; + + const FIRST_ATTEMP: u8 = 0; + const FALLBACK: u8 = 1; + let strategies = [FIRST_ATTEMP, FALLBACK]; + + for loop_index in strategies { + if last_block_id == INVALID_BLOCK_ID { + flight_found = true; + } + for (_, batch_block) in buffer.flight.iter() { + for blocks in batch_block.iter() { + for block in blocks { + if !flight_found && block.block_id == last_block_id { + flight_found = true; + continue; + } + if !flight_found { + continue; + } + if read_len >= read_bytes_limit_len { + break; + } + if let Some(ref expected_task_id) = task_ids { + if !expected_task_id.contains(block.task_attempt_id as u64) { + continue; + } + } + read_len += block.length as i64; + read_result.push(block); + } + } + } + + // Handle staging with Vec + index optimization + let staging_start_idx = if loop_index == FIRST_ATTEMP && !flight_found { + // Try to find position after last_block_id + // Always set flight_found = true for the next searching + flight_found = true; + if let Some(&position) = buffer.block_position_index.get(&last_block_id) { + position + 1 + } else { + // Not found in staging, will handle in fallback + continue; + } + } else { + // Fallback: read from beginning + 0 + }; + + for block in &buffer.staging[staging_start_idx..] { + if read_len >= read_bytes_limit_len { + break; + } + if let Some(ref expected_task_id) = task_ids { + if !expected_task_id.contains(block.task_attempt_id as u64) { + continue; + } + } + read_len += block.length as i64; + read_result.push(block); + } + + // // If we found data in first attempt, no need for fallback + if flight_found && loop_index == FIRST_ATTEMP { + break; + } + } + + let mut block_bytes = Vec::with_capacity(read_result.len()); + let mut segments = Vec::with_capacity(read_result.len()); + let mut offset = 0; + for block in read_result { + let data = &block.data; + block_bytes.push(data.clone()); + segments.push(DataSegment { + block_id: block.block_id, + offset, + length: block.length, + uncompress_length: block.uncompress_length, + crc: block.crc, + task_attempt_id: block.task_attempt_id, + }); + offset += block.length as i64; + } + let total_bytes = offset as usize; + + // Note: is_end is computed as total_bytes < read_bytes_limit_len. This works in general, + // but it can incorrectly be false in the edge case where total_bytes == read_bytes_limit_len + // and the buffer has no more blocks left. In that situation, buffer is actually fully read, + // so the client code may need to perform an additional empty-check to handle this case. 
+ let is_end = total_bytes < read_bytes_limit_len as usize; + + let composed_bytes = ComposedBytes::from(block_bytes, total_bytes); + Ok(PartitionedMemoryData { + shuffle_data_block_segments: segments, + data: DataBytes::Composed(composed_bytes), + is_end, + }) + } + + // when there is no any staging data, it will return the None + #[trace] + fn spill(&self) -> anyhow::Result> { + let mut buffer = self.buffer.lock(); + if buffer.staging_size == 0 { + return Ok(None); + } + + // Reconstruct batches from boundaries + let mut batches = Vec::new(); + let mut start = 0; + for i in 0..buffer.batch_boundaries.len() { + let end = buffer.batch_boundaries[i]; + if end >= buffer.staging.len() { + break; + } + + // Find next boundary or use end of staging + let next_boundary = if i + 1 < buffer.batch_boundaries.len() { + buffer.batch_boundaries[i + 1] + } else { + buffer.staging.len() + }; + + batches.push(buffer.staging[start..next_boundary].to_vec()); + start = next_boundary; + } + + let staging: MemBlockBatch = MemBlockBatch(batches); + + // Clear everything + buffer.staging.clear(); + buffer.block_position_index.clear(); + buffer.batch_boundaries.clear(); + + let staging_ref = Arc::new(staging); + let flight_id = buffer.flight_counter; + + let flight = &mut buffer.flight; + flight.insert(flight_id, staging_ref.clone()); + + let spill_size = buffer.staging_size; + buffer.flight_counter += 1; + buffer.flight_size += spill_size; + buffer.staging_size = 0; + + Ok(Some(BufferSpillResult { + flight_id, + flight_len: spill_size as u64, + blocks: staging_ref.clone(), + })) + } + + #[trace] + fn append(&self, blocks: Vec, size: u64) -> anyhow::Result<()> { + let mut buffer = self.buffer.lock(); + let current_position = buffer.staging.len(); + let block_count = blocks.len(); + + // Pre-allocate capacities + buffer.staging.reserve(block_count); + buffer.block_position_index.reserve(block_count); + + // Record batch boundary + if !blocks.is_empty() { + buffer.batch_boundaries.push(current_position); + } + + for (idx, block) in blocks.into_iter().enumerate() { + buffer + .block_position_index + .insert(block.block_id, current_position + idx); + buffer.staging.push(block); + } + buffer.staging_size += size as i64; + buffer.total_size += size as i64; + Ok(()) + } + + #[cfg(test)] + #[trace] + fn direct_push(&self, blocks: Vec) -> anyhow::Result<()> { + let len: u64 = blocks.iter().map(|block| block.length).sum::() as u64; + self.append(blocks, len) + } +} diff --git a/riffle-server/src/store/mem/buffer/unified_buffer.rs b/riffle-server/src/store/mem/buffer/unified_buffer.rs new file mode 100644 index 00000000..e6e82ddf --- /dev/null +++ b/riffle-server/src/store/mem/buffer/unified_buffer.rs @@ -0,0 +1,110 @@ +use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; +use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; +use crate::store::mem::buffer::unified_buffer::UnifiedBuffer::{DEFAULT, EXPERIMENTAL}; +use crate::store::mem::buffer::{BufferOptions, BufferSpillResult, BufferType, MemoryBuffer}; +use crate::store::{Block, PartitionedMemoryData}; +use croaring::Treemap; + +/// this is the router to delegate to the underlying concrete implementation without any cost +pub enum UnifiedBuffer { + DEFAULT(DefaultMemoryBuffer), + EXPERIMENTAL(OptStagingMemoryBuffer), +} + +impl MemoryBuffer for UnifiedBuffer { + fn new(opts: BufferOptions) -> Self + where + Self: Sized, + { + match opts.buffer_type { + BufferType::DEFAULT => DEFAULT(DefaultMemoryBuffer::new(opts)), + BufferType::EXPERIMENTAL 
=> EXPERIMENTAL(OptStagingMemoryBuffer::new(opts)), + } + } + + fn total_size(&self) -> anyhow::Result + where + Self: Send + Sync, + { + match &self { + DEFAULT(x) => x.total_size(), + EXPERIMENTAL(x) => x.total_size(), + } + } + + fn flight_size(&self) -> anyhow::Result + where + Self: Send + Sync, + { + match &self { + DEFAULT(x) => x.flight_size(), + EXPERIMENTAL(x) => x.flight_size(), + } + } + + fn staging_size(&self) -> anyhow::Result + where + Self: Send + Sync, + { + match &self { + DEFAULT(x) => x.staging_size(), + EXPERIMENTAL(x) => x.staging_size(), + } + } + + fn clear(&self, flight_id: u64, flight_size: u64) -> anyhow::Result<()> + where + Self: Send + Sync, + { + match &self { + DEFAULT(x) => x.clear(flight_id, flight_size), + EXPERIMENTAL(x) => x.clear(flight_id, flight_size), + } + } + + fn get( + &self, + last_block_id: i64, + read_bytes_limit_len: i64, + task_ids: Option, + ) -> anyhow::Result + where + Self: Send + Sync, + { + match &self { + DEFAULT(x) => x.get(last_block_id, read_bytes_limit_len, task_ids), + EXPERIMENTAL(x) => x.get(last_block_id, read_bytes_limit_len, task_ids), + } + } + + fn spill(&self) -> anyhow::Result> + where + Self: Send + Sync, + { + match &self { + DEFAULT(x) => x.spill(), + EXPERIMENTAL(x) => x.spill(), + } + } + + fn append(&self, blocks: Vec, size: u64) -> anyhow::Result<()> + where + Self: Send + Sync, + { + match &self { + DEFAULT(x) => x.append(blocks, size), + EXPERIMENTAL(x) => x.append(blocks, size), + } + } + + #[cfg(test)] + fn direct_push(&self, blocks: Vec) -> anyhow::Result<()> + where + Self: Send + Sync, + { + match &self { + DEFAULT(x) => x.direct_push(blocks), + EXPERIMENTAL(x) => x.direct_push(blocks), + } + } +} diff --git a/riffle-server/src/store/mem/mod.rs b/riffle-server/src/store/mem/mod.rs index 8a23f07d..2b501a9b 100644 --- a/riffle-server/src/store/mem/mod.rs +++ b/riffle-server/src/store/mem/mod.rs @@ -19,5 +19,4 @@ pub mod budget; pub mod buffer; pub mod capacity; pub mod ticket; - pub use await_tree::InstrumentAwait; diff --git a/riffle-server/src/store/memory.rs b/riffle-server/src/store/memory.rs index d7dfb995..5fcf1284 100644 --- a/riffle-server/src/store/memory.rs +++ b/riffle-server/src/store/memory.rs @@ -37,7 +37,9 @@ use crate::app_manager::partition_identifier::PartitionUId; use crate::ddashmap::DDashMap; use crate::runtime::manager::RuntimeManager; use crate::store::mem::budget::MemoryBudget; -use crate::store::mem::buffer::MemoryBuffer; +use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; +use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; +use crate::store::mem::buffer::{BufferOptions, BufferType, MemoryBuffer}; use crate::store::mem::capacity::CapacitySnapshot; use crate::store::mem::ticket::TicketManager; use crate::store::spill::SpillWritingViewContext; @@ -50,18 +52,19 @@ use fxhash::{FxBuildHasher, FxHasher}; use log::{debug, info, warn}; use std::sync::Arc; -pub struct MemoryStore { +pub struct MemoryStore { memory_capacity: i64, - state: DDashMap>, + state: DDashMap>, budget: MemoryBudget, runtime_manager: RuntimeManager, ticket_manager: TicketManager, + cfg: Option, } -unsafe impl Send for MemoryStore {} -unsafe impl Sync for MemoryStore {} +unsafe impl Send for MemoryStore {} +unsafe impl Sync for MemoryStore {} -impl MemoryStore { +impl MemoryStore { // only for test cases pub fn new(max_memory_size: i64) -> Self { let budget = MemoryBudget::new(max_memory_size); @@ -79,6 +82,7 @@ impl MemoryStore { memory_capacity: max_memory_size, ticket_manager, 
runtime_manager, + cfg: None, } } @@ -103,6 +107,7 @@ impl MemoryStore { memory_capacity: capacity.as_u64() as i64, ticket_manager, runtime_manager, + cfg: Some(conf), } } @@ -134,7 +139,7 @@ impl MemoryStore { pub fn lookup_spill_buffers( &self, expected_spill_total_bytes: i64, - ) -> Result>, anyhow::Error> { + ) -> Result>, anyhow::Error> { // 1. sort by the staging size. // 2. get the spill buffers until reaching the single max batch size @@ -203,12 +208,19 @@ impl MemoryStore { } // only invoked when inserting - pub fn get_or_create_buffer(&self, uid: PartitionUId) -> Arc { + pub fn get_or_create_buffer(&self, uid: PartitionUId) -> Arc { + let buf_opts = BufferOptions { + buffer_type: self + .cfg + .as_ref() + .map(|x| x.buffer_type) + .unwrap_or(BufferType::DEFAULT), + }; self.state - .compute_if_absent(uid, || Arc::new(MemoryBuffer::new())) + .compute_if_absent(uid, || Arc::new(B::new(buf_opts))) } - pub fn get_buffer(&self, uid: &PartitionUId) -> Result> { + pub fn get_buffer(&self, uid: &PartitionUId) -> Result> { let buffer = self.state.get(uid); if buffer.is_none() { return Err(anyhow!(format!( @@ -246,11 +258,10 @@ impl MemoryStore { } #[async_trait] -impl Store for MemoryStore { +impl Store for MemoryStore { fn start(self: Arc) { // ignore } - #[trace] async fn insert(&self, ctx: WritingViewContext) -> Result<(), WorkerError> { let uid = ctx.uid; @@ -420,12 +431,14 @@ mod test { use crate::app_manager::application_identifier::ApplicationId; use crate::app_manager::partition_identifier::PartitionUId; use crate::app_manager::purge_event::PurgeReason; + use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; + use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; + use crate::store::mem::buffer::MemoryBuffer; use anyhow::Result; use croaring::Treemap; - #[test] - fn test_read_buffer_in_flight() { - let store = MemoryStore::new(1024); + fn run_test_read_buffer_in_flight() { + let store: MemoryStore = MemoryStore::new(1024); let runtime = store.runtime_manager.clone(); let uid = PartitionUId::new(&Default::default(), 0, 0); @@ -591,10 +604,16 @@ mod test { // assert_eq!(0, mem_data.shuffle_data_block_segments.len()); } - async fn get_data_with_last_block_id( + #[test] + fn test_read_buffer_in_flight() { + run_test_read_buffer_in_flight::(); + run_test_read_buffer_in_flight::(); + } + + async fn get_data_with_last_block_id( default_single_read_size: i64, last_block_id: i64, - store: &MemoryStore, + store: &MemoryStore, uid: PartitionUId, ) -> PartitionedMemoryData { let ctx = ReadingViewContext::new( @@ -634,9 +653,8 @@ mod test { WritingViewContext::create_for_test(uid, data_blocks) } - #[test] - fn test_allocated_and_purge_for_memory() { - let store = MemoryStore::new(1024 * 1024 * 1024); + fn run_test_allocated_and_purge_for_memory() { + let store: MemoryStore = MemoryStore::new(1024 * 1024 * 1024); let runtime = store.runtime_manager.clone(); let app_id = ApplicationId::from("application_1_1_2"); @@ -662,8 +680,13 @@ mod test { } #[test] - fn test_purge() -> Result<()> { - let store = MemoryStore::new(1024); + fn test_allocated_and_purge_for_memory() { + run_test_allocated_and_purge_for_memory::(); + run_test_allocated_and_purge_for_memory::(); + } + + fn run_test_purge() -> Result<()> { + let store: MemoryStore = MemoryStore::new(1024); let runtime = store.runtime_manager.clone(); let shuffle_id = 1; @@ -740,8 +763,13 @@ mod test { } #[test] - fn test_put_and_get_for_memory() { - let store = MemoryStore::new(1024 * 1024 * 1024); + fn test_purge() { + 
run_test_purge::(); + run_test_purge::(); + } + + fn run_test_put_and_get_for_memory() { + let store: MemoryStore = MemoryStore::new(1024 * 1024 * 1024); let runtime = store.runtime_manager.clone(); let writing_ctx = WritingViewContext::create_for_test( @@ -784,8 +812,13 @@ mod test { } #[test] - fn test_block_id_filter_for_memory() { - let store = MemoryStore::new(1024 * 1024 * 1024); + fn test_put_and_get_for_memory() { + run_test_put_and_get_for_memory::(); + run_test_put_and_get_for_memory::(); + } + + fn run_test_block_id_filter_for_memory() { + let store: MemoryStore = MemoryStore::new(1024 * 1024 * 1024); let runtime = store.runtime_manager.clone(); // 1. insert 2 block @@ -850,4 +883,9 @@ mod test { _ => panic!("should not"), } } + #[test] + fn test_block_id_filter_for_memory() { + run_test_block_id_filter_for_memory::(); + run_test_block_id_filter_for_memory::(); + } } diff --git a/riffle-server/src/store/spill/mod.rs b/riffle-server/src/store/spill/mod.rs index 0e099c4f..1b191dda 100644 --- a/riffle-server/src/store/spill/mod.rs +++ b/riffle-server/src/store/spill/mod.rs @@ -8,7 +8,7 @@ use crate::metric::{ TOTAL_SPILL_EVENTS_DROPPED_WITH_APP_NOT_FOUND, }; use crate::store::hybrid::{HybridStore, PersistentStore}; -use crate::store::mem::buffer::BatchMemoryBlock; +use crate::store::mem::buffer::MemBlockBatch; use log::{debug, error, warn}; use once_cell::sync::OnceCell; use parking_lot::Mutex; @@ -82,14 +82,14 @@ unsafe impl Sync for SpillMessage {} #[derive(Clone)] pub struct SpillWritingViewContext { pub uid: PartitionUId, - pub data_blocks: Arc, + pub data_blocks: Arc, app_is_exist_func: Arc bool + 'static>>, } unsafe impl Send for SpillWritingViewContext {} unsafe impl Sync for SpillWritingViewContext {} impl SpillWritingViewContext { - pub fn new(uid: PartitionUId, blocks: Arc, func: F) -> Self + pub fn new(uid: PartitionUId, blocks: Arc, func: F) -> Self where F: Fn(&ApplicationId) -> bool + 'static, { diff --git a/riffle-server/src/store/spill/spill_test.rs b/riffle-server/src/store/spill/spill_test.rs index d44326e2..8b5509e4 100644 --- a/riffle-server/src/store/spill/spill_test.rs +++ b/riffle-server/src/store/spill/spill_test.rs @@ -15,6 +15,7 @@ pub mod tests { }; use crate::runtime::manager::RuntimeManager; use crate::store::hybrid::{HybridStore, PersistentStore}; + use crate::store::mem::buffer::MemoryBuffer; use crate::store::spill::spill_test::mock::MockStore; use crate::store::spill::storage_flush_handler::StorageFlushHandler; use crate::store::spill::storage_select_handler::StorageSelectHandler;
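
The patch routes every partition buffer through the new `MemoryBuffer` trait: `MemoryStore` becomes generic over the buffer implementation, and `UnifiedBuffer` dispatches to `DefaultMemoryBuffer` or `OptStagingMemoryBuffer` based on the `buffer_type` field added to `MemoryStoreConfig`. Below is a minimal sketch of the staging-to-flight lifecycle against the new trait, written as if inside the crate. The function name, the 1024-byte read limit, and the assumption that `UnifiedBuffer` satisfies the `Send + Sync` bounds on the trait methods are illustrative only, not part of this diff.

use crate::store::mem::buffer::unified_buffer::UnifiedBuffer;
use crate::store::mem::buffer::{BufferOptions, BufferType, MemoryBuffer};
use crate::store::Block;
use anyhow::Result;

// Sketch only: Block construction is not shown in this diff, so blocks are taken as input.
fn staging_to_flight_lifecycle(blocks: Vec<Block>, size: u64) -> Result<()> {
    // EXPERIMENTAL routes to OptStagingMemoryBuffer; DEFAULT keeps the original behavior.
    let buffer = UnifiedBuffer::new(BufferOptions {
        buffer_type: BufferType::EXPERIMENTAL,
    });

    // Writes land in the staging area first.
    buffer.append(blocks, size)?;
    assert_eq!(buffer.staging_size()?, size as i64);

    // spill() moves the whole staging batch into flight and reports its id and length.
    if let Some(spill) = buffer.spill()? {
        assert_eq!(buffer.flight_size()?, spill.flight_len as i64);

        // Reads walk flight first, then staging; -1 (INVALID_BLOCK_ID) means "from the beginning".
        let _data = buffer.get(-1, 1024, None)?;

        // Once the flight batch has been flushed to persistent storage, release it.
        buffer.clear(spill.flight_id, spill.flight_len)?;
    }
    Ok(())
}

In the server itself this selection happens per partition in `MemoryStore::get_or_create_buffer`, which builds the same `BufferOptions` from the configured `buffer_type` and falls back to `BufferType::DEFAULT` when no memory-store config is present.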