From a7ccc8dbd509ddd99aa0b55f1fb33b24c9afcd21 Mon Sep 17 00:00:00 2001 From: thuong Date: Thu, 25 Dec 2025 08:05:50 +0700 Subject: [PATCH 01/18] feat: format code --- riffle-server/src/store/mem/buffer.rs | 126 +++++++++++++++++--------- 1 file changed, 85 insertions(+), 41 deletions(-) diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index 24bea6af..e586f839 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -13,6 +13,7 @@ use std::mem; use std::ops::{Deref, DerefMut}; use std::sync::Arc; +#[derive(Debug)] pub struct MemoryBuffer { buffer: Mutex, } @@ -56,7 +57,9 @@ pub struct BufferInternal { staging_size: i64, flight_size: i64, - staging: BatchMemoryBlock, + staging: Vec, + batch_boundaries: Vec, // Track where each batch starts + block_position_index: HashMap, // Maps block_id to Vec index flight: HashMap>, flight_counter: u64, @@ -68,7 +71,9 @@ impl BufferInternal { total_size: 0, staging_size: 0, flight_size: 0, - staging: Default::default(), + staging: Vec::new(), + batch_boundaries: Vec::new(), + block_position_index: HashMap::new(), flight: Default::default(), flight_counter: 0, } @@ -124,40 +129,16 @@ impl MemoryBuffer { let mut read_len = 0i64; let mut flight_found = false; - let mut exit = false; - while !exit { - exit = true; - { - if last_block_id == INVALID_BLOCK_ID { - flight_found = true; - } - for (_, batch_block) in buffer.flight.iter() { - for blocks in batch_block.iter() { - for block in blocks { - if !flight_found && block.block_id == last_block_id { - flight_found = true; - continue; - } - if !flight_found { - continue; - } - if read_len >= read_bytes_limit_len { - break; - } - if let Some(ref expected_task_id) = task_ids { - if !expected_task_id.contains(block.task_attempt_id as u64) { - continue; - } - } - read_len += block.length as i64; - read_result.push(block); - } - } - } - } + const FIRST_ATTEMP: u8 = 0; + const FALLBACK: u8 = 1; + let strategies = [FIRST_ATTEMP, FALLBACK]; - { - for blocks in buffer.staging.iter() { + for loop_index in strategies { + if last_block_id == INVALID_BLOCK_ID { + flight_found = true; + } + for (_, batch_block) in buffer.flight.iter() { + for blocks in batch_block.iter() { for block in blocks { if !flight_found && block.block_id == last_block_id { flight_found = true; @@ -180,9 +161,38 @@ impl MemoryBuffer { } } - if !flight_found { + // Handle staging with Vec + index optimization + let staging_start_idx = if loop_index == FIRST_ATTEMP && !flight_found { + // Try to find position after last_block_id + // Always set flight_found = true for the next searching flight_found = true; - exit = false; + if let Some(&position) = buffer.block_position_index.get(&last_block_id) { + position + 1 + } else { + // Not found in staging, will handle in fallback + continue; + } + } else { + // Fallback: read from beginning + 0 + }; + + for block in &buffer.staging[staging_start_idx..] { + if read_len >= read_bytes_limit_len { + break; + } + if let Some(ref expected_task_id) = task_ids { + if !expected_task_id.contains(block.task_attempt_id as u64) { + continue; + } + } + read_len += block.length as i64; + read_result.push(block); + } + + // // If we found data in first attempt, no need for fallback + if flight_found && loop_index == FIRST_ATTEMP { + break; } } @@ -225,7 +235,33 @@ impl MemoryBuffer { return Ok(None); } - let staging: BatchMemoryBlock = { mem::replace(&mut buffer.staging, Default::default()) }; + // Reconstruct batches from boundaries + let mut batches = Vec::new(); + let mut start = 0; + for i in 0..buffer.batch_boundaries.len() { + let end = buffer.batch_boundaries[i]; + if end >= buffer.staging.len() { + break; + } + + // Find next boundary or use end of staging + let next_boundary = if i + 1 < buffer.batch_boundaries.len() { + buffer.batch_boundaries[i + 1] + } else { + buffer.staging.len() + }; + + batches.push(buffer.staging[start..next_boundary].to_vec()); + start = next_boundary; + } + + let staging: BatchMemoryBlock = BatchMemoryBlock(batches); + + // Clear everything + buffer.staging.clear(); + buffer.block_position_index.clear(); + buffer.batch_boundaries.clear(); + let staging_ref = Arc::new(staging); let flight_id = buffer.flight_counter; @@ -247,12 +283,20 @@ impl MemoryBuffer { #[trace] pub fn append(&self, blocks: Vec, size: u64) -> Result<()> { let mut buffer = self.buffer.lock(); - let mut staging = &mut buffer.staging; - staging.push(blocks); + let current_position = buffer.staging.len(); + // Record batch boundary + if !blocks.is_empty() { + buffer.batch_boundaries.push(current_position); + } + for (idx, block) in blocks.into_iter().enumerate() { + buffer + .block_position_index + .insert(block.block_id, current_position + idx); + buffer.staging.push(block); + } buffer.staging_size += size as i64; buffer.total_size += size as i64; - Ok(()) } } From 8becaa04a7a1cda47f71031abeb5a44f7f352943 Mon Sep 17 00:00:00 2001 From: thuong Date: Thu, 25 Dec 2025 20:51:10 +0700 Subject: [PATCH 02/18] feat: pre-allocate capabilities --- riffle-server/src/store/mem/buffer.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index e586f839..5b4db80b 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -284,11 +284,17 @@ impl MemoryBuffer { pub fn append(&self, blocks: Vec, size: u64) -> Result<()> { let mut buffer = self.buffer.lock(); let current_position = buffer.staging.len(); + let block_count = blocks.len(); + + // Pre-allocate capacities + buffer.staging.reserve(block_count); + buffer.block_position_index.reserve(block_count); // Record batch boundary if !blocks.is_empty() { buffer.batch_boundaries.push(current_position); } + for (idx, block) in blocks.into_iter().enumerate() { buffer .block_position_index From 904bf5a2e3a2db8434c52ab806967317a7533d5d Mon Sep 17 00:00:00 2001 From: thuong Date: Thu, 25 Dec 2025 21:39:12 +0700 Subject: [PATCH 03/18] feat: create buffer_core.rs where it stores core data structure and operations, implement trait for memory_buffer --- riffle-server/src/store/mem/buffer.rs | 80 +++------------- riffle-server/src/store/mem/buffer_core.rs | 106 +++++++++++++++++++++ riffle-server/src/store/mem/mod.rs | 2 +- 3 files changed, 118 insertions(+), 70 deletions(-) create mode 100644 riffle-server/src/store/mem/buffer_core.rs diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index 5b4db80b..1f88a0d3 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -1,3 +1,4 @@ +use super::buffer_core::{BatchMemoryBlock, BufferInternal, BufferOps, BufferSpillResult}; use crate::composed_bytes; use crate::composed_bytes::ComposedBytes; use crate::constant::INVALID_BLOCK_ID; @@ -18,92 +19,32 @@ pub struct MemoryBuffer { buffer: Mutex, } -#[derive(Default, Debug)] -pub struct BatchMemoryBlock(Vec>); -impl Deref for BatchMemoryBlock { - type Target = Vec>; - fn deref(&self) -> &Self::Target { - &self.0 - } -} -impl DerefMut for BatchMemoryBlock { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -#[derive(Debug)] -pub struct BufferSpillResult { - flight_id: u64, - flight_len: u64, - blocks: Arc, -} - -impl BufferSpillResult { - pub fn flight_id(&self) -> u64 { - self.flight_id - } - pub fn flight_len(&self) -> u64 { - self.flight_len - } - pub fn blocks(&self) -> Arc { - self.blocks.clone() - } -} - -#[derive(Debug)] -pub struct BufferInternal { - total_size: i64, - staging_size: i64, - flight_size: i64, - - staging: Vec, - batch_boundaries: Vec, // Track where each batch starts - block_position_index: HashMap, // Maps block_id to Vec index - - flight: HashMap>, - flight_counter: u64, -} - -impl BufferInternal { - fn new() -> Self { - BufferInternal { - total_size: 0, - staging_size: 0, - flight_size: 0, - staging: Vec::new(), - batch_boundaries: Vec::new(), - block_position_index: HashMap::new(), - flight: Default::default(), - flight_counter: 0, - } - } -} - impl MemoryBuffer { pub fn new() -> MemoryBuffer { MemoryBuffer { buffer: Mutex::new(BufferInternal::new()), } } +} +impl BufferOps for MemoryBuffer { #[trace] - pub fn total_size(&self) -> Result { + fn total_size(&self) -> Result { return Ok(self.buffer.lock().total_size); } #[trace] - pub fn flight_size(&self) -> Result { + fn flight_size(&self) -> Result { return Ok(self.buffer.lock().flight_size); } #[trace] - pub fn staging_size(&self) -> Result { + fn staging_size(&self) -> Result { return Ok(self.buffer.lock().staging_size); } #[trace] - pub fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()> { + fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()> { let mut buffer = self.buffer.lock(); let flight = &mut buffer.flight; let removed = flight.remove(&flight_id); @@ -114,7 +55,7 @@ impl MemoryBuffer { Ok(()) } - pub fn get( + fn get( &self, last_block_id: i64, read_bytes_limit_len: i64, @@ -229,7 +170,7 @@ impl MemoryBuffer { } // when there is no any staging data, it will return the None - pub fn spill(&self) -> Result> { + fn spill(&self) -> Result> { let mut buffer = self.buffer.lock(); if buffer.staging_size == 0 { return Ok(None); @@ -281,7 +222,7 @@ impl MemoryBuffer { } #[trace] - pub fn append(&self, blocks: Vec, size: u64) -> Result<()> { + fn append(&self, blocks: Vec, size: u64) -> Result<()> { let mut buffer = self.buffer.lock(); let current_position = buffer.staging.len(); let block_count = blocks.len(); @@ -317,6 +258,7 @@ impl MemoryBuffer { #[cfg(test)] mod test { + use crate::store::mem::buffer::BufferOps; use crate::store::mem::buffer::MemoryBuffer; use crate::store::Block; use hashlink::LinkedHashMap; diff --git a/riffle-server/src/store/mem/buffer_core.rs b/riffle-server/src/store/mem/buffer_core.rs new file mode 100644 index 00000000..1acc36ac --- /dev/null +++ b/riffle-server/src/store/mem/buffer_core.rs @@ -0,0 +1,106 @@ +use crate::composed_bytes::ComposedBytes; +use crate::store::DataBytes; +use crate::store::{Block, DataSegment, PartitionedMemoryData}; +use anyhow::Result; +use croaring::Treemap; +use parking_lot::Mutex; +use std::collections::HashMap; +use std::hash::Hash; +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; + +#[derive(Debug)] +pub struct MemoryBuffer { + buffer: Mutex, +} + +#[derive(Default, Debug)] +pub struct BatchMemoryBlock(pub Vec>); +impl Deref for BatchMemoryBlock { + type Target = Vec>; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl DerefMut for BatchMemoryBlock { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +#[derive(Debug)] +pub struct BufferSpillResult { + pub flight_id: u64, + pub flight_len: u64, + pub blocks: Arc, +} + +impl BufferSpillResult { + pub fn flight_id(&self) -> u64 { + self.flight_id + } + pub fn flight_len(&self) -> u64 { + self.flight_len + } + pub fn blocks(&self) -> Arc { + self.blocks.clone() + } +} + +#[derive(Debug)] +pub struct BufferInternal { + pub total_size: i64, + pub staging_size: i64, + pub flight_size: i64, + + pub staging: Vec, + pub batch_boundaries: Vec, // Track where each batch starts + pub block_position_index: HashMap, // Maps block_id to Vec index + + pub flight: HashMap>, + pub flight_counter: u64, +} + +impl BufferInternal { + pub fn new() -> Self { + BufferInternal { + total_size: 0, + staging_size: 0, + flight_size: 0, + staging: Vec::new(), + batch_boundaries: Vec::new(), + block_position_index: HashMap::new(), + flight: Default::default(), + flight_counter: 0, + } + } +} + + +pub trait BufferOps { + /// Returns the total size of the buffer. + fn total_size(&self) -> Result; + + /// Returns the size of data in flight (spilled but not cleared). + fn flight_size(&self) -> Result; + + /// Returns the size of data in staging (not yet spilled). + fn staging_size(&self) -> Result; + + /// Clears a specific flight by ID and size. + fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()>; + + /// Reads data starting after last_block_id, up to read_bytes_limit_len. + fn get( + &self, + last_block_id: i64, + read_bytes_limit_len: i64, + task_ids: Option, + ) -> Result; + + /// Spills staging data to flight, returns None if no staging data. + fn spill(&self) -> Result>; + + /// Appends blocks to staging area. + fn append(&self, blocks: Vec, size: u64) -> Result<()>; +} diff --git a/riffle-server/src/store/mem/mod.rs b/riffle-server/src/store/mem/mod.rs index 8a23f07d..52a1a369 100644 --- a/riffle-server/src/store/mem/mod.rs +++ b/riffle-server/src/store/mem/mod.rs @@ -19,5 +19,5 @@ pub mod budget; pub mod buffer; pub mod capacity; pub mod ticket; - +pub mod buffer_core; pub use await_tree::InstrumentAwait; From c991380dffe4e1f84fbf8e9eb2563d5fa96eaed5 Mon Sep 17 00:00:00 2001 From: thuong Date: Thu, 25 Dec 2025 21:55:01 +0700 Subject: [PATCH 04/18] feat: add buffer operations trait to all related components --- riffle-server/src/store/hybrid.rs | 2 ++ riffle-server/src/store/mem/buffer_core.rs | 1 - riffle-server/src/store/mem/mod.rs | 2 +- riffle-server/src/store/memory.rs | 1 + riffle-server/src/store/spill/mod.rs | 2 +- riffle-server/src/store/spill/spill_test.rs | 1 + 6 files changed, 6 insertions(+), 3 deletions(-) diff --git a/riffle-server/src/store/hybrid.rs b/riffle-server/src/store/hybrid.rs index efc71977..5d4a6b65 100644 --- a/riffle-server/src/store/hybrid.rs +++ b/riffle-server/src/store/hybrid.rs @@ -31,12 +31,14 @@ use crate::metric::{ #[cfg(feature = "hdfs")] use crate::store::hdfs::HdfsStore; use crate::store::localfile::LocalFileStore; +use crate::store::mem::buffer_core::BufferOps; use crate::store::memory::MemoryStore; use crate::store::{Persistent, RequireBufferResponse, ResponseData, ResponseDataIndex, Store}; use anyhow::{anyhow, Result}; use async_trait::async_trait; +use bytes::buf; use log::{error, info, warn}; use prometheus::core::Atomic; use std::any::Any; diff --git a/riffle-server/src/store/mem/buffer_core.rs b/riffle-server/src/store/mem/buffer_core.rs index 1acc36ac..20dd3fc6 100644 --- a/riffle-server/src/store/mem/buffer_core.rs +++ b/riffle-server/src/store/mem/buffer_core.rs @@ -76,7 +76,6 @@ impl BufferInternal { } } - pub trait BufferOps { /// Returns the total size of the buffer. fn total_size(&self) -> Result; diff --git a/riffle-server/src/store/mem/mod.rs b/riffle-server/src/store/mem/mod.rs index 52a1a369..cbbdf05e 100644 --- a/riffle-server/src/store/mem/mod.rs +++ b/riffle-server/src/store/mem/mod.rs @@ -17,7 +17,7 @@ pub mod budget; pub mod buffer; +pub mod buffer_core; pub mod capacity; pub mod ticket; -pub mod buffer_core; pub use await_tree::InstrumentAwait; diff --git a/riffle-server/src/store/memory.rs b/riffle-server/src/store/memory.rs index d7dfb995..3b554613 100644 --- a/riffle-server/src/store/memory.rs +++ b/riffle-server/src/store/memory.rs @@ -38,6 +38,7 @@ use crate::ddashmap::DDashMap; use crate::runtime::manager::RuntimeManager; use crate::store::mem::budget::MemoryBudget; use crate::store::mem::buffer::MemoryBuffer; +use crate::store::mem::buffer_core::BufferOps; use crate::store::mem::capacity::CapacitySnapshot; use crate::store::mem::ticket::TicketManager; use crate::store::spill::SpillWritingViewContext; diff --git a/riffle-server/src/store/spill/mod.rs b/riffle-server/src/store/spill/mod.rs index 0e099c4f..3babaeca 100644 --- a/riffle-server/src/store/spill/mod.rs +++ b/riffle-server/src/store/spill/mod.rs @@ -8,7 +8,7 @@ use crate::metric::{ TOTAL_SPILL_EVENTS_DROPPED_WITH_APP_NOT_FOUND, }; use crate::store::hybrid::{HybridStore, PersistentStore}; -use crate::store::mem::buffer::BatchMemoryBlock; +use crate::store::mem::buffer_core::BatchMemoryBlock; use log::{debug, error, warn}; use once_cell::sync::OnceCell; use parking_lot::Mutex; diff --git a/riffle-server/src/store/spill/spill_test.rs b/riffle-server/src/store/spill/spill_test.rs index d44326e2..a0623054 100644 --- a/riffle-server/src/store/spill/spill_test.rs +++ b/riffle-server/src/store/spill/spill_test.rs @@ -15,6 +15,7 @@ pub mod tests { }; use crate::runtime::manager::RuntimeManager; use crate::store::hybrid::{HybridStore, PersistentStore}; + use crate::store::mem::buffer_core::BufferOps; use crate::store::spill::spill_test::mock::MockStore; use crate::store::spill::storage_flush_handler::StorageFlushHandler; use crate::store::spill::storage_select_handler::StorageSelectHandler; From c22594e6e54c4aea5bc5301f5cd837fb84aa108e Mon Sep 17 00:00:00 2001 From: thuong Date: Thu, 25 Dec 2025 23:36:04 +0700 Subject: [PATCH 05/18] feat: update test cases for supporting both memory buffer implementation --- riffle-server/src/store/mem/buffer.rs | 326 ++++++++++++++++++++- riffle-server/src/store/mem/buffer_core.rs | 34 --- 2 files changed, 311 insertions(+), 49 deletions(-) diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index 1f88a0d3..b7f55137 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -1,4 +1,4 @@ -use super::buffer_core::{BatchMemoryBlock, BufferInternal, BufferOps, BufferSpillResult}; +use super::buffer_core::{BatchMemoryBlock, BufferOps, BufferSpillResult}; use crate::composed_bytes; use crate::composed_bytes::ComposedBytes; use crate::constant::INVALID_BLOCK_ID; @@ -15,19 +15,48 @@ use std::ops::{Deref, DerefMut}; use std::sync::Arc; #[derive(Debug)] -pub struct MemoryBuffer { - buffer: Mutex, +pub struct OptStagingBufferInternal { + pub total_size: i64, + pub staging_size: i64, + pub flight_size: i64, + + pub staging: Vec, + pub batch_boundaries: Vec, // Track where each batch starts + pub block_position_index: HashMap, // Maps block_id to Vec index + + pub flight: HashMap>, + pub flight_counter: u64, } -impl MemoryBuffer { - pub fn new() -> MemoryBuffer { - MemoryBuffer { - buffer: Mutex::new(BufferInternal::new()), +impl OptStagingBufferInternal { + pub fn new() -> Self { + OptStagingBufferInternal { + total_size: 0, + staging_size: 0, + flight_size: 0, + staging: Vec::new(), + batch_boundaries: Vec::new(), + block_position_index: HashMap::new(), + flight: Default::default(), + flight_counter: 0, } } } -impl BufferOps for MemoryBuffer { +#[derive(Debug)] +pub struct OptStagingMemoryBuffer { + buffer: Mutex, +} + +impl OptStagingMemoryBuffer { + pub fn new() -> OptStagingMemoryBuffer { + OptStagingMemoryBuffer { + buffer: Mutex::new(OptStagingBufferInternal::new()), + } + } +} + +impl BufferOps for OptStagingMemoryBuffer { #[trace] fn total_size(&self) -> Result { return Ok(self.buffer.lock().total_size); @@ -248,6 +277,225 @@ impl BufferOps for MemoryBuffer { } } +/// for tests. +impl OptStagingMemoryBuffer { + fn direct_push(&self, blocks: Vec) -> Result<()> { + let len: u64 = blocks.iter().map(|block| block.length).sum::() as u64; + self.append(blocks, len) + } +} + +#[derive(Debug)] +pub struct BufferInternal { + pub total_size: i64, + pub staging_size: i64, + pub flight_size: i64, + + pub staging: BatchMemoryBlock, + + pub flight: HashMap>, + pub flight_counter: u64, +} + +impl BufferInternal { + pub fn new() -> Self { + BufferInternal { + total_size: 0, + staging_size: 0, + flight_size: 0, + staging: Default::default(), + flight: Default::default(), + flight_counter: 0, + } + } +} +pub struct MemoryBuffer { + buffer: Mutex, +} + +impl MemoryBuffer { + pub fn new() -> MemoryBuffer { + MemoryBuffer { + buffer: Mutex::new(BufferInternal::new()), + } + } +} +impl BufferOps for MemoryBuffer { + #[trace] + fn total_size(&self) -> Result { + return Ok(self.buffer.lock().total_size); + } + + #[trace] + fn flight_size(&self) -> Result { + return Ok(self.buffer.lock().flight_size); + } + + #[trace] + fn staging_size(&self) -> Result { + return Ok(self.buffer.lock().staging_size); + } + + #[trace] + fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()> { + let mut buffer = self.buffer.lock(); + let flight = &mut buffer.flight; + let removed = flight.remove(&flight_id); + if let Some(block_ref) = removed { + buffer.total_size -= flight_size as i64; + buffer.flight_size -= flight_size as i64; + } + Ok(()) + } + + fn get( + &self, + last_block_id: i64, + read_bytes_limit_len: i64, + task_ids: Option, + ) -> Result { + /// read sequence + /// 1. from flight (expect: last_block_id not found or last_block_id == -1) + /// 2. from staging + let buffer = self.buffer.lock(); + + let mut read_result = vec![]; + let mut read_len = 0i64; + let mut flight_found = false; + + let mut exit = false; + while !exit { + exit = true; + { + if last_block_id == INVALID_BLOCK_ID { + flight_found = true; + } + for (_, batch_block) in buffer.flight.iter() { + for blocks in batch_block.iter() { + for block in blocks { + if !flight_found && block.block_id == last_block_id { + flight_found = true; + continue; + } + if !flight_found { + continue; + } + if read_len >= read_bytes_limit_len { + break; + } + if let Some(ref expected_task_id) = task_ids { + if !expected_task_id.contains(block.task_attempt_id as u64) { + continue; + } + } + read_len += block.length as i64; + read_result.push(block); + } + } + } + } + + { + for blocks in buffer.staging.iter() { + for block in blocks { + if !flight_found && block.block_id == last_block_id { + flight_found = true; + continue; + } + if !flight_found { + continue; + } + if read_len >= read_bytes_limit_len { + break; + } + if let Some(ref expected_task_id) = task_ids { + if !expected_task_id.contains(block.task_attempt_id as u64) { + continue; + } + } + read_len += block.length as i64; + read_result.push(block); + } + } + } + + if !flight_found { + flight_found = true; + exit = false; + } + } + + let mut block_bytes = Vec::with_capacity(read_result.len()); + let mut segments = Vec::with_capacity(read_result.len()); + let mut offset = 0; + for block in read_result { + let data = &block.data; + block_bytes.push(data.clone()); + segments.push(DataSegment { + block_id: block.block_id, + offset, + length: block.length, + uncompress_length: block.uncompress_length, + crc: block.crc, + task_attempt_id: block.task_attempt_id, + }); + offset += block.length as i64; + } + let total_bytes = offset as usize; + + // Note: is_end is computed as total_bytes < read_bytes_limit_len. This works in general, + // but it can incorrectly be false in the edge case where total_bytes == read_bytes_limit_len + // and the buffer has no more blocks left. In that situation, buffer is actually fully read, + // so the client code may need to perform an additional empty-check to handle this case. + let is_end = total_bytes < read_bytes_limit_len as usize; + + let composed_bytes = ComposedBytes::from(block_bytes, total_bytes); + Ok(PartitionedMemoryData { + shuffle_data_block_segments: segments, + data: DataBytes::Composed(composed_bytes), + is_end, + }) + } + + // when there is no any staging data, it will return the None + fn spill(&self) -> Result> { + let mut buffer = self.buffer.lock(); + if buffer.staging_size == 0 { + return Ok(None); + } + + let staging: BatchMemoryBlock = { mem::replace(&mut buffer.staging, Default::default()) }; + let staging_ref = Arc::new(staging); + let flight_id = buffer.flight_counter; + + let flight = &mut buffer.flight; + flight.insert(flight_id, staging_ref.clone()); + + let spill_size = buffer.staging_size; + buffer.flight_counter += 1; + buffer.flight_size += spill_size; + buffer.staging_size = 0; + + Ok(Some(BufferSpillResult { + flight_id, + flight_len: spill_size as u64, + blocks: staging_ref.clone(), + })) + } + + #[trace] + fn append(&self, blocks: Vec, size: u64) -> Result<()> { + let mut buffer = self.buffer.lock(); + let mut staging = &mut buffer.staging; + staging.push(blocks); + + buffer.staging_size += size as i64; + buffer.total_size += size as i64; + + Ok(()) + } +} + /// for tests. impl MemoryBuffer { fn direct_push(&self, blocks: Vec) -> Result<()> { @@ -258,8 +506,8 @@ impl MemoryBuffer { #[cfg(test)] mod test { - use crate::store::mem::buffer::BufferOps; - use crate::store::mem::buffer::MemoryBuffer; + use crate::store::mem::buffer::{MemoryBuffer, OptStagingMemoryBuffer}; + use crate::store::mem::buffer_core::BufferOps; use crate::store::Block; use hashlink::LinkedHashMap; use std::collections::LinkedList; @@ -292,8 +540,30 @@ mod test { } } - #[test] - fn test_with_block_id_zero() -> anyhow::Result<()> { + trait TestBuffer: BufferOps { + fn new() -> Self; + fn direct_push(&self, blocks: Vec) -> anyhow::Result<()>; + } + + impl TestBuffer for MemoryBuffer { + fn new() -> Self { + MemoryBuffer::new() + } + fn direct_push(&self, blocks: Vec) -> anyhow::Result<()> { + MemoryBuffer::direct_push(self, blocks) + } + } + + impl TestBuffer for OptStagingMemoryBuffer { + fn new() -> Self { + OptStagingMemoryBuffer::new() + } + fn direct_push(&self, blocks: Vec) -> anyhow::Result<()> { + OptStagingMemoryBuffer::direct_push(self, blocks) + } + } + + fn run_test_with_block_id_zero() -> anyhow::Result<()> { let mut buffer = MemoryBuffer::new(); let block_1 = create_block(10, 100); let block_2 = create_block(10, 0); @@ -321,7 +591,13 @@ mod test { } #[test] - fn test_put_get() -> anyhow::Result<()> { + fn test_with_block_id_zero() -> anyhow::Result<()> { + run_test_with_block_id_zero::()?; + run_test_with_block_id_zero::()?; + Ok(()) + } + + fn run_test_put_get() -> anyhow::Result<()> { let mut buffer = MemoryBuffer::new(); /// case1 @@ -423,7 +699,13 @@ mod test { } #[test] - fn test_get_v2_is_end_with_only_staging() -> anyhow::Result<()> { + fn test_put_get() -> anyhow::Result<()> { + run_test_put_get::()?; + run_test_put_get::()?; + Ok(()) + } + + fn run_test_get_v2_is_end_with_only_staging() -> anyhow::Result<()> { let buffer = MemoryBuffer::new(); // 0 -> 10 blocks with total 100 bytes let cnt = 10; @@ -453,7 +735,14 @@ mod test { } #[test] - fn test_get_v2_is_end_across_flight_and_staging() -> anyhow::Result<()> { + fn test_get_v2_is_end_with_only_staging() -> anyhow::Result<()> { + run_test_get_v2_is_end_with_only_staging::()?; + run_test_get_v2_is_end_with_only_staging::()?; + Ok(()) + } + + fn run_test_get_v2_is_end_across_flight_and_staging( + ) -> anyhow::Result<()> { let buffer = MemoryBuffer::new(); // staging: 0..2 @@ -476,4 +765,11 @@ mod test { assert_eq!(result2.is_end, true); // no more data after staging Ok(()) } + + #[test] + fn test_get_v2_is_end_across_flight_and_staging() -> anyhow::Result<()> { + run_test_get_v2_is_end_across_flight_and_staging::()?; + run_test_get_v2_is_end_across_flight_and_staging::()?; + Ok(()) + } } diff --git a/riffle-server/src/store/mem/buffer_core.rs b/riffle-server/src/store/mem/buffer_core.rs index 20dd3fc6..c39714d4 100644 --- a/riffle-server/src/store/mem/buffer_core.rs +++ b/riffle-server/src/store/mem/buffer_core.rs @@ -9,11 +9,6 @@ use std::hash::Hash; use std::ops::{Deref, DerefMut}; use std::sync::Arc; -#[derive(Debug)] -pub struct MemoryBuffer { - buffer: Mutex, -} - #[derive(Default, Debug)] pub struct BatchMemoryBlock(pub Vec>); impl Deref for BatchMemoryBlock { @@ -47,35 +42,6 @@ impl BufferSpillResult { } } -#[derive(Debug)] -pub struct BufferInternal { - pub total_size: i64, - pub staging_size: i64, - pub flight_size: i64, - - pub staging: Vec, - pub batch_boundaries: Vec, // Track where each batch starts - pub block_position_index: HashMap, // Maps block_id to Vec index - - pub flight: HashMap>, - pub flight_counter: u64, -} - -impl BufferInternal { - pub fn new() -> Self { - BufferInternal { - total_size: 0, - staging_size: 0, - flight_size: 0, - staging: Vec::new(), - batch_boundaries: Vec::new(), - block_position_index: HashMap::new(), - flight: Default::default(), - flight_counter: 0, - } - } -} - pub trait BufferOps { /// Returns the total size of the buffer. fn total_size(&self) -> Result; From 97ebedbb9a9f14d1e6f5e08d8633aea9e988f442 Mon Sep 17 00:00:00 2001 From: thuong Date: Thu, 25 Dec 2025 23:44:59 +0700 Subject: [PATCH 06/18] feat: update test cases for supporting both memory buffer implementation --- riffle-server/src/store/mem/buffer.rs | 23 ++++++---------------- riffle-server/src/store/mem/buffer_core.rs | 4 ++++ 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index b7f55137..77f4853a 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -275,10 +275,8 @@ impl BufferOps for OptStagingMemoryBuffer { buffer.total_size += size as i64; Ok(()) } -} -/// for tests. -impl OptStagingMemoryBuffer { + #[cfg(test)] fn direct_push(&self, blocks: Vec) -> Result<()> { let len: u64 = blocks.iter().map(|block| block.length).sum::() as u64; self.append(blocks, len) @@ -494,10 +492,8 @@ impl BufferOps for MemoryBuffer { Ok(()) } -} -/// for tests. -impl MemoryBuffer { + #[cfg(test)] fn direct_push(&self, blocks: Vec) -> Result<()> { let len: u64 = blocks.iter().map(|block| block.length).sum::() as u64; self.append(blocks, len) @@ -542,29 +538,22 @@ mod test { trait TestBuffer: BufferOps { fn new() -> Self; - fn direct_push(&self, blocks: Vec) -> anyhow::Result<()>; } impl TestBuffer for MemoryBuffer { fn new() -> Self { MemoryBuffer::new() } - fn direct_push(&self, blocks: Vec) -> anyhow::Result<()> { - MemoryBuffer::direct_push(self, blocks) - } } impl TestBuffer for OptStagingMemoryBuffer { fn new() -> Self { OptStagingMemoryBuffer::new() } - fn direct_push(&self, blocks: Vec) -> anyhow::Result<()> { - OptStagingMemoryBuffer::direct_push(self, blocks) - } } fn run_test_with_block_id_zero() -> anyhow::Result<()> { - let mut buffer = MemoryBuffer::new(); + let mut buffer = B::new(); let block_1 = create_block(10, 100); let block_2 = create_block(10, 0); @@ -598,7 +587,7 @@ mod test { } fn run_test_put_get() -> anyhow::Result<()> { - let mut buffer = MemoryBuffer::new(); + let mut buffer = B::new(); /// case1 buffer.direct_push(create_blocks(0, 10, 10))?; @@ -706,7 +695,7 @@ mod test { } fn run_test_get_v2_is_end_with_only_staging() -> anyhow::Result<()> { - let buffer = MemoryBuffer::new(); + let buffer = B::new(); // 0 -> 10 blocks with total 100 bytes let cnt = 10; let block_len = 10; @@ -743,7 +732,7 @@ mod test { fn run_test_get_v2_is_end_across_flight_and_staging( ) -> anyhow::Result<()> { - let buffer = MemoryBuffer::new(); + let buffer = B::new(); // staging: 0..2 buffer.direct_push(create_blocks(0, 3, 5))?; diff --git a/riffle-server/src/store/mem/buffer_core.rs b/riffle-server/src/store/mem/buffer_core.rs index c39714d4..8f583548 100644 --- a/riffle-server/src/store/mem/buffer_core.rs +++ b/riffle-server/src/store/mem/buffer_core.rs @@ -68,4 +68,8 @@ pub trait BufferOps { /// Appends blocks to staging area. fn append(&self, blocks: Vec, size: u64) -> Result<()>; + + /// push directly, just use only in test + #[cfg(test)] + fn direct_push(&self, blocks: Vec) -> anyhow::Result<()>; } From f53bfa3d84051f6754d9a0d1e137005bb0783366 Mon Sep 17 00:00:00 2001 From: thuong Date: Fri, 26 Dec 2025 07:16:23 +0700 Subject: [PATCH 07/18] feat: update test cases for using BufferOps --- riffle-server/src/store/mem/buffer.rs | 126 ++++++++++++++------- riffle-server/src/store/mem/buffer_core.rs | 38 +++++-- riffle-server/src/store/memory.rs | 76 ++++++++----- 3 files changed, 166 insertions(+), 74 deletions(-) diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index 77f4853a..adf17750 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -48,32 +48,50 @@ pub struct OptStagingMemoryBuffer { buffer: Mutex, } -impl OptStagingMemoryBuffer { - pub fn new() -> OptStagingMemoryBuffer { +// impl OptStagingMemoryBuffer { +// pub fn new() -> OptStagingMemoryBuffer { +// OptStagingMemoryBuffer { +// buffer: Mutex::new(OptStagingBufferInternal::new()), +// } +// } +// } + +impl BufferOps for OptStagingMemoryBuffer { + #[trace] + fn new() -> OptStagingMemoryBuffer { OptStagingMemoryBuffer { buffer: Mutex::new(OptStagingBufferInternal::new()), } } -} - -impl BufferOps for OptStagingMemoryBuffer { #[trace] - fn total_size(&self) -> Result { + fn total_size(&self) -> Result + where + Self: Send + Sync, + { return Ok(self.buffer.lock().total_size); } #[trace] - fn flight_size(&self) -> Result { + fn flight_size(&self) -> Result + where + Self: Send + Sync, + { return Ok(self.buffer.lock().flight_size); } #[trace] - fn staging_size(&self) -> Result { + fn staging_size(&self) -> Result + where + Self: Send + Sync, + { return Ok(self.buffer.lock().staging_size); } #[trace] - fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()> { + fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()> + where + Self: Send + Sync, + { let mut buffer = self.buffer.lock(); let flight = &mut buffer.flight; let removed = flight.remove(&flight_id); @@ -84,12 +102,16 @@ impl BufferOps for OptStagingMemoryBuffer { Ok(()) } + #[trace] fn get( &self, last_block_id: i64, read_bytes_limit_len: i64, task_ids: Option, - ) -> Result { + ) -> Result + where + Self: Send + Sync, + { /// read sequence /// 1. from flight (expect: last_block_id not found or last_block_id == -1) /// 2. from staging @@ -199,6 +221,7 @@ impl BufferOps for OptStagingMemoryBuffer { } // when there is no any staging data, it will return the None + #[trace] fn spill(&self) -> Result> { let mut buffer = self.buffer.lock(); if buffer.staging_size == 0 { @@ -277,6 +300,7 @@ impl BufferOps for OptStagingMemoryBuffer { } #[cfg(test)] + #[trace] fn direct_push(&self, blocks: Vec) -> Result<()> { let len: u64 = blocks.iter().map(|block| block.length).sum::() as u64; self.append(blocks, len) @@ -311,31 +335,49 @@ pub struct MemoryBuffer { buffer: Mutex, } -impl MemoryBuffer { - pub fn new() -> MemoryBuffer { +// impl MemoryBuffer { +// pub fn new() -> MemoryBuffer { +// MemoryBuffer { +// buffer: Mutex::new(BufferInternal::new()), +// } +// } +// } +impl BufferOps for MemoryBuffer { + #[trace] + fn new() -> MemoryBuffer { MemoryBuffer { buffer: Mutex::new(BufferInternal::new()), } } -} -impl BufferOps for MemoryBuffer { #[trace] - fn total_size(&self) -> Result { + fn total_size(&self) -> Result + where + Self: Send + Sync, + { return Ok(self.buffer.lock().total_size); } #[trace] - fn flight_size(&self) -> Result { + fn flight_size(&self) -> Result + where + Self: Send + Sync, + { return Ok(self.buffer.lock().flight_size); } #[trace] - fn staging_size(&self) -> Result { + fn staging_size(&self) -> Result + where + Self: Send + Sync, + { return Ok(self.buffer.lock().staging_size); } #[trace] - fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()> { + fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()> + where + Self: Send + Sync, + { let mut buffer = self.buffer.lock(); let flight = &mut buffer.flight; let removed = flight.remove(&flight_id); @@ -346,12 +388,16 @@ impl BufferOps for MemoryBuffer { Ok(()) } + #[trace] fn get( &self, last_block_id: i64, read_bytes_limit_len: i64, task_ids: Option, - ) -> Result { + ) -> Result + where + Self: Send + Sync, + { /// read sequence /// 1. from flight (expect: last_block_id not found or last_block_id == -1) /// 2. from staging @@ -456,7 +502,11 @@ impl BufferOps for MemoryBuffer { } // when there is no any staging data, it will return the None - fn spill(&self) -> Result> { + #[trace] + fn spill(&self) -> Result> + where + Self: Send + Sync, + { let mut buffer = self.buffer.lock(); if buffer.staging_size == 0 { return Ok(None); @@ -482,7 +532,10 @@ impl BufferOps for MemoryBuffer { } #[trace] - fn append(&self, blocks: Vec, size: u64) -> Result<()> { + fn append(&self, blocks: Vec, size: u64) -> Result<()> + where + Self: Send + Sync, + { let mut buffer = self.buffer.lock(); let mut staging = &mut buffer.staging; staging.push(blocks); @@ -494,7 +547,11 @@ impl BufferOps for MemoryBuffer { } #[cfg(test)] - fn direct_push(&self, blocks: Vec) -> Result<()> { + #[trace] + fn direct_push(&self, blocks: Vec) -> Result<()> + where + Self: Send + Sync, + { let len: u64 = blocks.iter().map(|block| block.length).sum::() as u64; self.append(blocks, len) } @@ -536,23 +593,13 @@ mod test { } } - trait TestBuffer: BufferOps { - fn new() -> Self; - } + trait TestBuffer: BufferOps {} - impl TestBuffer for MemoryBuffer { - fn new() -> Self { - MemoryBuffer::new() - } - } + impl TestBuffer for MemoryBuffer {} - impl TestBuffer for OptStagingMemoryBuffer { - fn new() -> Self { - OptStagingMemoryBuffer::new() - } - } + impl TestBuffer for OptStagingMemoryBuffer {} - fn run_test_with_block_id_zero() -> anyhow::Result<()> { + fn run_test_with_block_id_zero() -> anyhow::Result<()> { let mut buffer = B::new(); let block_1 = create_block(10, 100); let block_2 = create_block(10, 0); @@ -586,7 +633,7 @@ mod test { Ok(()) } - fn run_test_put_get() -> anyhow::Result<()> { + fn run_test_put_get() -> anyhow::Result<()> { let mut buffer = B::new(); /// case1 @@ -694,7 +741,8 @@ mod test { Ok(()) } - fn run_test_get_v2_is_end_with_only_staging() -> anyhow::Result<()> { + fn run_test_get_v2_is_end_with_only_staging( + ) -> anyhow::Result<()> { let buffer = B::new(); // 0 -> 10 blocks with total 100 bytes let cnt = 10; @@ -730,7 +778,7 @@ mod test { Ok(()) } - fn run_test_get_v2_is_end_across_flight_and_staging( + fn run_test_get_v2_is_end_across_flight_and_staging( ) -> anyhow::Result<()> { let buffer = B::new(); diff --git a/riffle-server/src/store/mem/buffer_core.rs b/riffle-server/src/store/mem/buffer_core.rs index 8f583548..b6e6c713 100644 --- a/riffle-server/src/store/mem/buffer_core.rs +++ b/riffle-server/src/store/mem/buffer_core.rs @@ -43,17 +43,29 @@ impl BufferSpillResult { } pub trait BufferOps { - /// Returns the total size of the buffer. - fn total_size(&self) -> Result; + /// Creates a new buffer instance + fn new() -> Self + where + Self: Sized; + /// Returns the total size of the buffer. + fn total_size(&self) -> Result + where + Self: Send + Sync; /// Returns the size of data in flight (spilled but not cleared). - fn flight_size(&self) -> Result; + fn flight_size(&self) -> Result + where + Self: Send + Sync; /// Returns the size of data in staging (not yet spilled). - fn staging_size(&self) -> Result; + fn staging_size(&self) -> Result + where + Self: Send + Sync; /// Clears a specific flight by ID and size. - fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()>; + fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()> + where + Self: Send + Sync; /// Reads data starting after last_block_id, up to read_bytes_limit_len. fn get( @@ -61,15 +73,23 @@ pub trait BufferOps { last_block_id: i64, read_bytes_limit_len: i64, task_ids: Option, - ) -> Result; + ) -> Result + where + Self: Send + Sync; /// Spills staging data to flight, returns None if no staging data. - fn spill(&self) -> Result>; + fn spill(&self) -> Result> + where + Self: Send + Sync; /// Appends blocks to staging area. - fn append(&self, blocks: Vec, size: u64) -> Result<()>; + fn append(&self, blocks: Vec, size: u64) -> Result<()> + where + Self: Send + Sync; /// push directly, just use only in test #[cfg(test)] - fn direct_push(&self, blocks: Vec) -> anyhow::Result<()>; + fn direct_push(&self, blocks: Vec) -> anyhow::Result<()> + where + Self: Send + Sync; } diff --git a/riffle-server/src/store/memory.rs b/riffle-server/src/store/memory.rs index 3b554613..a89f81e3 100644 --- a/riffle-server/src/store/memory.rs +++ b/riffle-server/src/store/memory.rs @@ -51,18 +51,18 @@ use fxhash::{FxBuildHasher, FxHasher}; use log::{debug, info, warn}; use std::sync::Arc; -pub struct MemoryStore { +pub struct MemoryStore { memory_capacity: i64, - state: DDashMap>, + state: DDashMap>, budget: MemoryBudget, runtime_manager: RuntimeManager, ticket_manager: TicketManager, } -unsafe impl Send for MemoryStore {} -unsafe impl Sync for MemoryStore {} +unsafe impl Send for MemoryStore {} +unsafe impl Sync for MemoryStore {} -impl MemoryStore { +impl MemoryStore { // only for test cases pub fn new(max_memory_size: i64) -> Self { let budget = MemoryBudget::new(max_memory_size); @@ -135,7 +135,7 @@ impl MemoryStore { pub fn lookup_spill_buffers( &self, expected_spill_total_bytes: i64, - ) -> Result>, anyhow::Error> { + ) -> Result>, anyhow::Error> { // 1. sort by the staging size. // 2. get the spill buffers until reaching the single max batch size @@ -204,12 +204,11 @@ impl MemoryStore { } // only invoked when inserting - pub fn get_or_create_buffer(&self, uid: PartitionUId) -> Arc { - self.state - .compute_if_absent(uid, || Arc::new(MemoryBuffer::new())) + pub fn get_or_create_buffer(&self, uid: PartitionUId) -> Arc { + self.state.compute_if_absent(uid, || Arc::new(B::new())) } - pub fn get_buffer(&self, uid: &PartitionUId) -> Result> { + pub fn get_buffer(&self, uid: &PartitionUId) -> Result> { let buffer = self.state.get(uid); if buffer.is_none() { return Err(anyhow!(format!( @@ -247,11 +246,10 @@ impl MemoryStore { } #[async_trait] -impl Store for MemoryStore { +impl Store for MemoryStore { fn start(self: Arc) { // ignore } - #[trace] async fn insert(&self, ctx: WritingViewContext) -> Result<(), WorkerError> { let uid = ctx.uid; @@ -409,6 +407,8 @@ mod test { WritingViewContext, }; + use crate::store::mem::buffer::{MemoryBuffer, OptStagingMemoryBuffer}; + use crate::store::mem::buffer_core::BufferOps; use crate::store::memory::MemoryStore; use crate::store::ResponseData::Mem; @@ -424,9 +424,8 @@ mod test { use anyhow::Result; use croaring::Treemap; - #[test] - fn test_read_buffer_in_flight() { - let store = MemoryStore::new(1024); + fn run_test_read_buffer_in_flight() { + let store: MemoryStore = MemoryStore::new(1024); let runtime = store.runtime_manager.clone(); let uid = PartitionUId::new(&Default::default(), 0, 0); @@ -592,10 +591,16 @@ mod test { // assert_eq!(0, mem_data.shuffle_data_block_segments.len()); } - async fn get_data_with_last_block_id( + #[test] + fn test_read_buffer_in_flight() { + run_test_read_buffer_in_flight::(); + run_test_read_buffer_in_flight::(); + } + + async fn get_data_with_last_block_id( default_single_read_size: i64, last_block_id: i64, - store: &MemoryStore, + store: &MemoryStore, uid: PartitionUId, ) -> PartitionedMemoryData { let ctx = ReadingViewContext::new( @@ -635,9 +640,8 @@ mod test { WritingViewContext::create_for_test(uid, data_blocks) } - #[test] - fn test_allocated_and_purge_for_memory() { - let store = MemoryStore::new(1024 * 1024 * 1024); + fn run_test_allocated_and_purge_for_memory() { + let store: MemoryStore = MemoryStore::new(1024 * 1024 * 1024); let runtime = store.runtime_manager.clone(); let app_id = ApplicationId::from("application_1_1_2"); @@ -663,8 +667,13 @@ mod test { } #[test] - fn test_purge() -> Result<()> { - let store = MemoryStore::new(1024); + fn test_allocated_and_purge_for_memory() { + run_test_allocated_and_purge_for_memory::(); + run_test_allocated_and_purge_for_memory::(); + } + + fn run_test_purge() -> Result<()> { + let store: MemoryStore = MemoryStore::new(1024); let runtime = store.runtime_manager.clone(); let shuffle_id = 1; @@ -741,8 +750,13 @@ mod test { } #[test] - fn test_put_and_get_for_memory() { - let store = MemoryStore::new(1024 * 1024 * 1024); + fn test_purge() { + run_test_purge::(); + run_test_purge::(); + } + + fn run_test_put_and_get_for_memory() { + let store: MemoryStore = MemoryStore::new(1024 * 1024 * 1024); let runtime = store.runtime_manager.clone(); let writing_ctx = WritingViewContext::create_for_test( @@ -785,8 +799,13 @@ mod test { } #[test] - fn test_block_id_filter_for_memory() { - let store = MemoryStore::new(1024 * 1024 * 1024); + fn test_put_and_get_for_memory() { + run_test_put_and_get_for_memory::(); + run_test_put_and_get_for_memory::(); + } + + fn run_test_block_id_filter_for_memory() { + let store: MemoryStore = MemoryStore::new(1024 * 1024 * 1024); let runtime = store.runtime_manager.clone(); // 1. insert 2 block @@ -851,4 +870,9 @@ mod test { _ => panic!("should not"), } } + #[test] + fn test_block_id_filter_for_memory() { + run_test_block_id_filter_for_memory::(); + run_test_block_id_filter_for_memory::(); + } } From e05f7b1c3eabeae670fcd1e848d9f1455101ad2d Mon Sep 17 00:00:00 2001 From: thuong Date: Fri, 26 Dec 2025 07:19:33 +0700 Subject: [PATCH 08/18] feat: remove the redundant import line --- riffle-server/src/store/hybrid.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/riffle-server/src/store/hybrid.rs b/riffle-server/src/store/hybrid.rs index 5d4a6b65..d4d194d7 100644 --- a/riffle-server/src/store/hybrid.rs +++ b/riffle-server/src/store/hybrid.rs @@ -38,7 +38,6 @@ use crate::store::{Persistent, RequireBufferResponse, ResponseData, ResponseData use anyhow::{anyhow, Result}; use async_trait::async_trait; -use bytes::buf; use log::{error, info, warn}; use prometheus::core::Atomic; use std::any::Any; From fcc1bf2ed90f58ac74a0650d8e46df787355b901 Mon Sep 17 00:00:00 2001 From: thuong Date: Fri, 26 Dec 2025 07:21:40 +0700 Subject: [PATCH 09/18] feat: remove redundant lines --- riffle-server/src/store/mem/buffer.rs | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index adf17750..a0e86954 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -48,14 +48,6 @@ pub struct OptStagingMemoryBuffer { buffer: Mutex, } -// impl OptStagingMemoryBuffer { -// pub fn new() -> OptStagingMemoryBuffer { -// OptStagingMemoryBuffer { -// buffer: Mutex::new(OptStagingBufferInternal::new()), -// } -// } -// } - impl BufferOps for OptStagingMemoryBuffer { #[trace] fn new() -> OptStagingMemoryBuffer { @@ -335,13 +327,6 @@ pub struct MemoryBuffer { buffer: Mutex, } -// impl MemoryBuffer { -// pub fn new() -> MemoryBuffer { -// MemoryBuffer { -// buffer: Mutex::new(BufferInternal::new()), -// } -// } -// } impl BufferOps for MemoryBuffer { #[trace] fn new() -> MemoryBuffer { From 615b3e18b4ea0d45fe44264328a32775389d8431 Mon Sep 17 00:00:00 2001 From: thuong Date: Fri, 26 Dec 2025 07:24:26 +0700 Subject: [PATCH 10/18] feat: change the ordering of memory_buffer implementations --- riffle-server/src/store/mem/buffer.rs | 334 +++++++++++++------------- 1 file changed, 166 insertions(+), 168 deletions(-) diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index a0e86954..594e6609 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -15,44 +15,38 @@ use std::ops::{Deref, DerefMut}; use std::sync::Arc; #[derive(Debug)] -pub struct OptStagingBufferInternal { +pub struct BufferInternal { pub total_size: i64, pub staging_size: i64, pub flight_size: i64, - pub staging: Vec, - pub batch_boundaries: Vec, // Track where each batch starts - pub block_position_index: HashMap, // Maps block_id to Vec index + pub staging: BatchMemoryBlock, pub flight: HashMap>, pub flight_counter: u64, } -impl OptStagingBufferInternal { +impl BufferInternal { pub fn new() -> Self { - OptStagingBufferInternal { + BufferInternal { total_size: 0, staging_size: 0, flight_size: 0, - staging: Vec::new(), - batch_boundaries: Vec::new(), - block_position_index: HashMap::new(), + staging: Default::default(), flight: Default::default(), flight_counter: 0, } } } - -#[derive(Debug)] -pub struct OptStagingMemoryBuffer { - buffer: Mutex, +pub struct MemoryBuffer { + buffer: Mutex, } -impl BufferOps for OptStagingMemoryBuffer { +impl BufferOps for MemoryBuffer { #[trace] - fn new() -> OptStagingMemoryBuffer { - OptStagingMemoryBuffer { - buffer: Mutex::new(OptStagingBufferInternal::new()), + fn new() -> MemoryBuffer { + MemoryBuffer { + buffer: Mutex::new(BufferInternal::new()), } } #[trace] @@ -113,16 +107,40 @@ impl BufferOps for OptStagingMemoryBuffer { let mut read_len = 0i64; let mut flight_found = false; - const FIRST_ATTEMP: u8 = 0; - const FALLBACK: u8 = 1; - let strategies = [FIRST_ATTEMP, FALLBACK]; - - for loop_index in strategies { - if last_block_id == INVALID_BLOCK_ID { - flight_found = true; + let mut exit = false; + while !exit { + exit = true; + { + if last_block_id == INVALID_BLOCK_ID { + flight_found = true; + } + for (_, batch_block) in buffer.flight.iter() { + for blocks in batch_block.iter() { + for block in blocks { + if !flight_found && block.block_id == last_block_id { + flight_found = true; + continue; + } + if !flight_found { + continue; + } + if read_len >= read_bytes_limit_len { + break; + } + if let Some(ref expected_task_id) = task_ids { + if !expected_task_id.contains(block.task_attempt_id as u64) { + continue; + } + } + read_len += block.length as i64; + read_result.push(block); + } + } + } } - for (_, batch_block) in buffer.flight.iter() { - for blocks in batch_block.iter() { + + { + for blocks in buffer.staging.iter() { for block in blocks { if !flight_found && block.block_id == last_block_id { flight_found = true; @@ -145,38 +163,9 @@ impl BufferOps for OptStagingMemoryBuffer { } } - // Handle staging with Vec + index optimization - let staging_start_idx = if loop_index == FIRST_ATTEMP && !flight_found { - // Try to find position after last_block_id - // Always set flight_found = true for the next searching + if !flight_found { flight_found = true; - if let Some(&position) = buffer.block_position_index.get(&last_block_id) { - position + 1 - } else { - // Not found in staging, will handle in fallback - continue; - } - } else { - // Fallback: read from beginning - 0 - }; - - for block in &buffer.staging[staging_start_idx..] { - if read_len >= read_bytes_limit_len { - break; - } - if let Some(ref expected_task_id) = task_ids { - if !expected_task_id.contains(block.task_attempt_id as u64) { - continue; - } - } - read_len += block.length as i64; - read_result.push(block); - } - - // // If we found data in first attempt, no need for fallback - if flight_found && loop_index == FIRST_ATTEMP { - break; + exit = false; } } @@ -214,39 +203,16 @@ impl BufferOps for OptStagingMemoryBuffer { // when there is no any staging data, it will return the None #[trace] - fn spill(&self) -> Result> { + fn spill(&self) -> Result> + where + Self: Send + Sync, + { let mut buffer = self.buffer.lock(); if buffer.staging_size == 0 { return Ok(None); } - // Reconstruct batches from boundaries - let mut batches = Vec::new(); - let mut start = 0; - for i in 0..buffer.batch_boundaries.len() { - let end = buffer.batch_boundaries[i]; - if end >= buffer.staging.len() { - break; - } - - // Find next boundary or use end of staging - let next_boundary = if i + 1 < buffer.batch_boundaries.len() { - buffer.batch_boundaries[i + 1] - } else { - buffer.staging.len() - }; - - batches.push(buffer.staging[start..next_boundary].to_vec()); - start = next_boundary; - } - - let staging: BatchMemoryBlock = BatchMemoryBlock(batches); - - // Clear everything - buffer.staging.clear(); - buffer.block_position_index.clear(); - buffer.batch_boundaries.clear(); - + let staging: BatchMemoryBlock = { mem::replace(&mut buffer.staging, Default::default()) }; let staging_ref = Arc::new(staging); let flight_id = buffer.flight_counter; @@ -266,72 +232,69 @@ impl BufferOps for OptStagingMemoryBuffer { } #[trace] - fn append(&self, blocks: Vec, size: u64) -> Result<()> { + fn append(&self, blocks: Vec, size: u64) -> Result<()> + where + Self: Send + Sync, + { let mut buffer = self.buffer.lock(); - let current_position = buffer.staging.len(); - let block_count = blocks.len(); - - // Pre-allocate capacities - buffer.staging.reserve(block_count); - buffer.block_position_index.reserve(block_count); - - // Record batch boundary - if !blocks.is_empty() { - buffer.batch_boundaries.push(current_position); - } + let mut staging = &mut buffer.staging; + staging.push(blocks); - for (idx, block) in blocks.into_iter().enumerate() { - buffer - .block_position_index - .insert(block.block_id, current_position + idx); - buffer.staging.push(block); - } buffer.staging_size += size as i64; buffer.total_size += size as i64; + Ok(()) } #[cfg(test)] #[trace] - fn direct_push(&self, blocks: Vec) -> Result<()> { + fn direct_push(&self, blocks: Vec) -> Result<()> + where + Self: Send + Sync, + { let len: u64 = blocks.iter().map(|block| block.length).sum::() as u64; self.append(blocks, len) } } - #[derive(Debug)] -pub struct BufferInternal { +pub struct OptStagingBufferInternal { pub total_size: i64, pub staging_size: i64, pub flight_size: i64, - pub staging: BatchMemoryBlock, + pub staging: Vec, + pub batch_boundaries: Vec, // Track where each batch starts + pub block_position_index: HashMap, // Maps block_id to Vec index pub flight: HashMap>, pub flight_counter: u64, } -impl BufferInternal { +impl OptStagingBufferInternal { pub fn new() -> Self { - BufferInternal { + OptStagingBufferInternal { total_size: 0, staging_size: 0, flight_size: 0, - staging: Default::default(), + staging: Vec::new(), + batch_boundaries: Vec::new(), + block_position_index: HashMap::new(), flight: Default::default(), flight_counter: 0, } } } -pub struct MemoryBuffer { - buffer: Mutex, + +#[derive(Debug)] +pub struct OptStagingMemoryBuffer { + buffer: Mutex, } -impl BufferOps for MemoryBuffer { +impl BufferOps for OptStagingMemoryBuffer { #[trace] - fn new() -> MemoryBuffer { - MemoryBuffer { - buffer: Mutex::new(BufferInternal::new()), + fn new() -> OptStagingMemoryBuffer { + OptStagingMemoryBuffer { + buffer: Mutex::new(OptStagingBufferInternal::new()), } } #[trace] @@ -392,40 +355,16 @@ impl BufferOps for MemoryBuffer { let mut read_len = 0i64; let mut flight_found = false; - let mut exit = false; - while !exit { - exit = true; - { - if last_block_id == INVALID_BLOCK_ID { - flight_found = true; - } - for (_, batch_block) in buffer.flight.iter() { - for blocks in batch_block.iter() { - for block in blocks { - if !flight_found && block.block_id == last_block_id { - flight_found = true; - continue; - } - if !flight_found { - continue; - } - if read_len >= read_bytes_limit_len { - break; - } - if let Some(ref expected_task_id) = task_ids { - if !expected_task_id.contains(block.task_attempt_id as u64) { - continue; - } - } - read_len += block.length as i64; - read_result.push(block); - } - } - } - } + const FIRST_ATTEMP: u8 = 0; + const FALLBACK: u8 = 1; + let strategies = [FIRST_ATTEMP, FALLBACK]; - { - for blocks in buffer.staging.iter() { + for loop_index in strategies { + if last_block_id == INVALID_BLOCK_ID { + flight_found = true; + } + for (_, batch_block) in buffer.flight.iter() { + for blocks in batch_block.iter() { for block in blocks { if !flight_found && block.block_id == last_block_id { flight_found = true; @@ -448,9 +387,38 @@ impl BufferOps for MemoryBuffer { } } - if !flight_found { + // Handle staging with Vec + index optimization + let staging_start_idx = if loop_index == FIRST_ATTEMP && !flight_found { + // Try to find position after last_block_id + // Always set flight_found = true for the next searching flight_found = true; - exit = false; + if let Some(&position) = buffer.block_position_index.get(&last_block_id) { + position + 1 + } else { + // Not found in staging, will handle in fallback + continue; + } + } else { + // Fallback: read from beginning + 0 + }; + + for block in &buffer.staging[staging_start_idx..] { + if read_len >= read_bytes_limit_len { + break; + } + if let Some(ref expected_task_id) = task_ids { + if !expected_task_id.contains(block.task_attempt_id as u64) { + continue; + } + } + read_len += block.length as i64; + read_result.push(block); + } + + // // If we found data in first attempt, no need for fallback + if flight_found && loop_index == FIRST_ATTEMP { + break; } } @@ -488,16 +456,39 @@ impl BufferOps for MemoryBuffer { // when there is no any staging data, it will return the None #[trace] - fn spill(&self) -> Result> - where - Self: Send + Sync, - { + fn spill(&self) -> Result> { let mut buffer = self.buffer.lock(); if buffer.staging_size == 0 { return Ok(None); } - let staging: BatchMemoryBlock = { mem::replace(&mut buffer.staging, Default::default()) }; + // Reconstruct batches from boundaries + let mut batches = Vec::new(); + let mut start = 0; + for i in 0..buffer.batch_boundaries.len() { + let end = buffer.batch_boundaries[i]; + if end >= buffer.staging.len() { + break; + } + + // Find next boundary or use end of staging + let next_boundary = if i + 1 < buffer.batch_boundaries.len() { + buffer.batch_boundaries[i + 1] + } else { + buffer.staging.len() + }; + + batches.push(buffer.staging[start..next_boundary].to_vec()); + start = next_boundary; + } + + let staging: BatchMemoryBlock = BatchMemoryBlock(batches); + + // Clear everything + buffer.staging.clear(); + buffer.block_position_index.clear(); + buffer.batch_boundaries.clear(); + let staging_ref = Arc::new(staging); let flight_id = buffer.flight_counter; @@ -517,31 +508,38 @@ impl BufferOps for MemoryBuffer { } #[trace] - fn append(&self, blocks: Vec, size: u64) -> Result<()> - where - Self: Send + Sync, - { + fn append(&self, blocks: Vec, size: u64) -> Result<()> { let mut buffer = self.buffer.lock(); - let mut staging = &mut buffer.staging; - staging.push(blocks); + let current_position = buffer.staging.len(); + let block_count = blocks.len(); + // Pre-allocate capacities + buffer.staging.reserve(block_count); + buffer.block_position_index.reserve(block_count); + + // Record batch boundary + if !blocks.is_empty() { + buffer.batch_boundaries.push(current_position); + } + + for (idx, block) in blocks.into_iter().enumerate() { + buffer + .block_position_index + .insert(block.block_id, current_position + idx); + buffer.staging.push(block); + } buffer.staging_size += size as i64; buffer.total_size += size as i64; - Ok(()) } #[cfg(test)] #[trace] - fn direct_push(&self, blocks: Vec) -> Result<()> - where - Self: Send + Sync, - { + fn direct_push(&self, blocks: Vec) -> Result<()> { let len: u64 = blocks.iter().map(|block| block.length).sum::() as u64; self.append(blocks, len) } } - #[cfg(test)] mod test { use crate::store::mem::buffer::{MemoryBuffer, OptStagingMemoryBuffer}; From a33a78287747eec0c12717660cd30e84e4c384c4 Mon Sep 17 00:00:00 2001 From: thuong Date: Fri, 26 Dec 2025 07:27:42 +0700 Subject: [PATCH 11/18] convert TestBuffer trait to BufferOps trait at test cases --- riffle-server/src/store/mem/buffer.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index 594e6609..fa8b52f7 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -576,13 +576,7 @@ mod test { } } - trait TestBuffer: BufferOps {} - - impl TestBuffer for MemoryBuffer {} - - impl TestBuffer for OptStagingMemoryBuffer {} - - fn run_test_with_block_id_zero() -> anyhow::Result<()> { + fn run_test_with_block_id_zero() -> anyhow::Result<()> { let mut buffer = B::new(); let block_1 = create_block(10, 100); let block_2 = create_block(10, 0); @@ -616,7 +610,7 @@ mod test { Ok(()) } - fn run_test_put_get() -> anyhow::Result<()> { + fn run_test_put_get() -> anyhow::Result<()> { let mut buffer = B::new(); /// case1 @@ -724,7 +718,7 @@ mod test { Ok(()) } - fn run_test_get_v2_is_end_with_only_staging( + fn run_test_get_v2_is_end_with_only_staging( ) -> anyhow::Result<()> { let buffer = B::new(); // 0 -> 10 blocks with total 100 bytes @@ -761,7 +755,7 @@ mod test { Ok(()) } - fn run_test_get_v2_is_end_across_flight_and_staging( + fn run_test_get_v2_is_end_across_flight_and_staging( ) -> anyhow::Result<()> { let buffer = B::new(); From 067220a88aa8c1c29c46c50a16f1c3cb58c5cbe1 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 26 Dec 2025 13:58:27 +0800 Subject: [PATCH 12/18] re-org folders --- riffle-server/src/store/hybrid.rs | 2 +- riffle-server/src/store/mem/buffer.rs | 554 ++---------------- .../src/store/mem/buffer/default_buffer.rs | 250 ++++++++ .../src/store/mem/buffer/opt_buffer.rs | 295 ++++++++++ riffle-server/src/store/mem/buffer_core.rs | 95 --- riffle-server/src/store/mem/mod.rs | 1 - riffle-server/src/store/memory.rs | 9 +- riffle-server/src/store/spill/mod.rs | 2 +- riffle-server/src/store/spill/spill_test.rs | 1 - 9 files changed, 608 insertions(+), 601 deletions(-) create mode 100644 riffle-server/src/store/mem/buffer/default_buffer.rs create mode 100644 riffle-server/src/store/mem/buffer/opt_buffer.rs delete mode 100644 riffle-server/src/store/mem/buffer_core.rs diff --git a/riffle-server/src/store/hybrid.rs b/riffle-server/src/store/hybrid.rs index d4d194d7..a55f25db 100644 --- a/riffle-server/src/store/hybrid.rs +++ b/riffle-server/src/store/hybrid.rs @@ -31,7 +31,6 @@ use crate::metric::{ #[cfg(feature = "hdfs")] use crate::store::hdfs::HdfsStore; use crate::store::localfile::LocalFileStore; -use crate::store::mem::buffer_core::BufferOps; use crate::store::memory::MemoryStore; use crate::store::{Persistent, RequireBufferResponse, ResponseData, ResponseDataIndex, Store}; @@ -62,6 +61,7 @@ use crate::app_manager::AppManagerRef; use crate::config_reconfigure::ReconfigurableConfManager; use crate::runtime::manager::RuntimeManager; use crate::store::local::LocalfileStoreStat; +use crate::store::mem::buffer::default_buffer::MemoryBuffer; use crate::store::mem::buffer::MemoryBuffer; use crate::store::mem::capacity::CapacitySnapshot; use crate::store::spill::hierarchy_event_bus::HierarchyEventBus; diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index fa8b52f7..01b010ba 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -1,94 +1,76 @@ -use super::buffer_core::{BatchMemoryBlock, BufferOps, BufferSpillResult}; -use crate::composed_bytes; +pub mod default_buffer; +pub mod opt_buffer; + use crate::composed_bytes::ComposedBytes; -use crate::constant::INVALID_BLOCK_ID; use crate::store::DataBytes; use crate::store::{Block, DataSegment, PartitionedMemoryData}; use anyhow::Result; use croaring::Treemap; -use fastrace::trace; use parking_lot::Mutex; use std::collections::HashMap; use std::hash::Hash; -use std::mem; use std::ops::{Deref, DerefMut}; use std::sync::Arc; -#[derive(Debug)] -pub struct BufferInternal { - pub total_size: i64, - pub staging_size: i64, - pub flight_size: i64, - - pub staging: BatchMemoryBlock, - - pub flight: HashMap>, - pub flight_counter: u64, +#[derive(Default, Debug)] +pub struct BatchMemoryBlock(pub Vec>); +impl Deref for BatchMemoryBlock { + type Target = Vec>; + fn deref(&self) -> &Self::Target { + &self.0 + } } - -impl BufferInternal { - pub fn new() -> Self { - BufferInternal { - total_size: 0, - staging_size: 0, - flight_size: 0, - staging: Default::default(), - flight: Default::default(), - flight_counter: 0, - } +impl DerefMut for BatchMemoryBlock { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 } } -pub struct MemoryBuffer { - buffer: Mutex, + +#[derive(Debug)] +pub struct BufferSpillResult { + pub flight_id: u64, + pub flight_len: u64, + pub blocks: Arc, } -impl BufferOps for MemoryBuffer { - #[trace] - fn new() -> MemoryBuffer { - MemoryBuffer { - buffer: Mutex::new(BufferInternal::new()), - } +impl BufferSpillResult { + pub fn flight_id(&self) -> u64 { + self.flight_id } - #[trace] - fn total_size(&self) -> Result - where - Self: Send + Sync, - { - return Ok(self.buffer.lock().total_size); + pub fn flight_len(&self) -> u64 { + self.flight_len } + pub fn blocks(&self) -> Arc { + self.blocks.clone() + } +} + +pub trait BufferOps { + /// Creates a new buffer instance + fn new() -> Self + where + Self: Sized; - #[trace] + /// Returns the total size of the buffer. + fn total_size(&self) -> Result + where + Self: Send + Sync; + /// Returns the size of data in flight (spilled but not cleared). fn flight_size(&self) -> Result where - Self: Send + Sync, - { - return Ok(self.buffer.lock().flight_size); - } + Self: Send + Sync; - #[trace] + /// Returns the size of data in staging (not yet spilled). fn staging_size(&self) -> Result where - Self: Send + Sync, - { - return Ok(self.buffer.lock().staging_size); - } + Self: Send + Sync; - #[trace] + /// Clears a specific flight by ID and size. fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()> where - Self: Send + Sync, - { - let mut buffer = self.buffer.lock(); - let flight = &mut buffer.flight; - let removed = flight.remove(&flight_id); - if let Some(block_ref) = removed { - buffer.total_size -= flight_size as i64; - buffer.flight_size -= flight_size as i64; - } - Ok(()) - } + Self: Send + Sync; - #[trace] + /// Reads data starting after last_block_id, up to read_bytes_limit_len. fn get( &self, last_block_id: i64, @@ -96,454 +78,30 @@ impl BufferOps for MemoryBuffer { task_ids: Option, ) -> Result where - Self: Send + Sync, - { - /// read sequence - /// 1. from flight (expect: last_block_id not found or last_block_id == -1) - /// 2. from staging - let buffer = self.buffer.lock(); - - let mut read_result = vec![]; - let mut read_len = 0i64; - let mut flight_found = false; - - let mut exit = false; - while !exit { - exit = true; - { - if last_block_id == INVALID_BLOCK_ID { - flight_found = true; - } - for (_, batch_block) in buffer.flight.iter() { - for blocks in batch_block.iter() { - for block in blocks { - if !flight_found && block.block_id == last_block_id { - flight_found = true; - continue; - } - if !flight_found { - continue; - } - if read_len >= read_bytes_limit_len { - break; - } - if let Some(ref expected_task_id) = task_ids { - if !expected_task_id.contains(block.task_attempt_id as u64) { - continue; - } - } - read_len += block.length as i64; - read_result.push(block); - } - } - } - } - - { - for blocks in buffer.staging.iter() { - for block in blocks { - if !flight_found && block.block_id == last_block_id { - flight_found = true; - continue; - } - if !flight_found { - continue; - } - if read_len >= read_bytes_limit_len { - break; - } - if let Some(ref expected_task_id) = task_ids { - if !expected_task_id.contains(block.task_attempt_id as u64) { - continue; - } - } - read_len += block.length as i64; - read_result.push(block); - } - } - } - - if !flight_found { - flight_found = true; - exit = false; - } - } - - let mut block_bytes = Vec::with_capacity(read_result.len()); - let mut segments = Vec::with_capacity(read_result.len()); - let mut offset = 0; - for block in read_result { - let data = &block.data; - block_bytes.push(data.clone()); - segments.push(DataSegment { - block_id: block.block_id, - offset, - length: block.length, - uncompress_length: block.uncompress_length, - crc: block.crc, - task_attempt_id: block.task_attempt_id, - }); - offset += block.length as i64; - } - let total_bytes = offset as usize; - - // Note: is_end is computed as total_bytes < read_bytes_limit_len. This works in general, - // but it can incorrectly be false in the edge case where total_bytes == read_bytes_limit_len - // and the buffer has no more blocks left. In that situation, buffer is actually fully read, - // so the client code may need to perform an additional empty-check to handle this case. - let is_end = total_bytes < read_bytes_limit_len as usize; - - let composed_bytes = ComposedBytes::from(block_bytes, total_bytes); - Ok(PartitionedMemoryData { - shuffle_data_block_segments: segments, - data: DataBytes::Composed(composed_bytes), - is_end, - }) - } + Self: Send + Sync; - // when there is no any staging data, it will return the None - #[trace] + /// Spills staging data to flight, returns None if no staging data. fn spill(&self) -> Result> where - Self: Send + Sync, - { - let mut buffer = self.buffer.lock(); - if buffer.staging_size == 0 { - return Ok(None); - } - - let staging: BatchMemoryBlock = { mem::replace(&mut buffer.staging, Default::default()) }; - let staging_ref = Arc::new(staging); - let flight_id = buffer.flight_counter; - - let flight = &mut buffer.flight; - flight.insert(flight_id, staging_ref.clone()); - - let spill_size = buffer.staging_size; - buffer.flight_counter += 1; - buffer.flight_size += spill_size; - buffer.staging_size = 0; - - Ok(Some(BufferSpillResult { - flight_id, - flight_len: spill_size as u64, - blocks: staging_ref.clone(), - })) - } + Self: Send + Sync; - #[trace] + /// Appends blocks to staging area. fn append(&self, blocks: Vec, size: u64) -> Result<()> where - Self: Send + Sync, - { - let mut buffer = self.buffer.lock(); - let mut staging = &mut buffer.staging; - staging.push(blocks); - - buffer.staging_size += size as i64; - buffer.total_size += size as i64; - - Ok(()) - } + Self: Send + Sync; + /// push directly, just use only in test #[cfg(test)] - #[trace] - fn direct_push(&self, blocks: Vec) -> Result<()> + fn direct_push(&self, blocks: Vec) -> anyhow::Result<()> where - Self: Send + Sync, - { - let len: u64 = blocks.iter().map(|block| block.length).sum::() as u64; - self.append(blocks, len) - } + Self: Send + Sync; } -#[derive(Debug)] -pub struct OptStagingBufferInternal { - pub total_size: i64, - pub staging_size: i64, - pub flight_size: i64, - pub staging: Vec, - pub batch_boundaries: Vec, // Track where each batch starts - pub block_position_index: HashMap, // Maps block_id to Vec index - - pub flight: HashMap>, - pub flight_counter: u64, -} - -impl OptStagingBufferInternal { - pub fn new() -> Self { - OptStagingBufferInternal { - total_size: 0, - staging_size: 0, - flight_size: 0, - staging: Vec::new(), - batch_boundaries: Vec::new(), - block_position_index: HashMap::new(), - flight: Default::default(), - flight_counter: 0, - } - } -} - -#[derive(Debug)] -pub struct OptStagingMemoryBuffer { - buffer: Mutex, -} - -impl BufferOps for OptStagingMemoryBuffer { - #[trace] - fn new() -> OptStagingMemoryBuffer { - OptStagingMemoryBuffer { - buffer: Mutex::new(OptStagingBufferInternal::new()), - } - } - #[trace] - fn total_size(&self) -> Result - where - Self: Send + Sync, - { - return Ok(self.buffer.lock().total_size); - } - - #[trace] - fn flight_size(&self) -> Result - where - Self: Send + Sync, - { - return Ok(self.buffer.lock().flight_size); - } - - #[trace] - fn staging_size(&self) -> Result - where - Self: Send + Sync, - { - return Ok(self.buffer.lock().staging_size); - } - - #[trace] - fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()> - where - Self: Send + Sync, - { - let mut buffer = self.buffer.lock(); - let flight = &mut buffer.flight; - let removed = flight.remove(&flight_id); - if let Some(block_ref) = removed { - buffer.total_size -= flight_size as i64; - buffer.flight_size -= flight_size as i64; - } - Ok(()) - } - - #[trace] - fn get( - &self, - last_block_id: i64, - read_bytes_limit_len: i64, - task_ids: Option, - ) -> Result - where - Self: Send + Sync, - { - /// read sequence - /// 1. from flight (expect: last_block_id not found or last_block_id == -1) - /// 2. from staging - let buffer = self.buffer.lock(); - - let mut read_result = vec![]; - let mut read_len = 0i64; - let mut flight_found = false; - - const FIRST_ATTEMP: u8 = 0; - const FALLBACK: u8 = 1; - let strategies = [FIRST_ATTEMP, FALLBACK]; - - for loop_index in strategies { - if last_block_id == INVALID_BLOCK_ID { - flight_found = true; - } - for (_, batch_block) in buffer.flight.iter() { - for blocks in batch_block.iter() { - for block in blocks { - if !flight_found && block.block_id == last_block_id { - flight_found = true; - continue; - } - if !flight_found { - continue; - } - if read_len >= read_bytes_limit_len { - break; - } - if let Some(ref expected_task_id) = task_ids { - if !expected_task_id.contains(block.task_attempt_id as u64) { - continue; - } - } - read_len += block.length as i64; - read_result.push(block); - } - } - } - - // Handle staging with Vec + index optimization - let staging_start_idx = if loop_index == FIRST_ATTEMP && !flight_found { - // Try to find position after last_block_id - // Always set flight_found = true for the next searching - flight_found = true; - if let Some(&position) = buffer.block_position_index.get(&last_block_id) { - position + 1 - } else { - // Not found in staging, will handle in fallback - continue; - } - } else { - // Fallback: read from beginning - 0 - }; - - for block in &buffer.staging[staging_start_idx..] { - if read_len >= read_bytes_limit_len { - break; - } - if let Some(ref expected_task_id) = task_ids { - if !expected_task_id.contains(block.task_attempt_id as u64) { - continue; - } - } - read_len += block.length as i64; - read_result.push(block); - } - - // // If we found data in first attempt, no need for fallback - if flight_found && loop_index == FIRST_ATTEMP { - break; - } - } - - let mut block_bytes = Vec::with_capacity(read_result.len()); - let mut segments = Vec::with_capacity(read_result.len()); - let mut offset = 0; - for block in read_result { - let data = &block.data; - block_bytes.push(data.clone()); - segments.push(DataSegment { - block_id: block.block_id, - offset, - length: block.length, - uncompress_length: block.uncompress_length, - crc: block.crc, - task_attempt_id: block.task_attempt_id, - }); - offset += block.length as i64; - } - let total_bytes = offset as usize; - - // Note: is_end is computed as total_bytes < read_bytes_limit_len. This works in general, - // but it can incorrectly be false in the edge case where total_bytes == read_bytes_limit_len - // and the buffer has no more blocks left. In that situation, buffer is actually fully read, - // so the client code may need to perform an additional empty-check to handle this case. - let is_end = total_bytes < read_bytes_limit_len as usize; - - let composed_bytes = ComposedBytes::from(block_bytes, total_bytes); - Ok(PartitionedMemoryData { - shuffle_data_block_segments: segments, - data: DataBytes::Composed(composed_bytes), - is_end, - }) - } - - // when there is no any staging data, it will return the None - #[trace] - fn spill(&self) -> Result> { - let mut buffer = self.buffer.lock(); - if buffer.staging_size == 0 { - return Ok(None); - } - - // Reconstruct batches from boundaries - let mut batches = Vec::new(); - let mut start = 0; - for i in 0..buffer.batch_boundaries.len() { - let end = buffer.batch_boundaries[i]; - if end >= buffer.staging.len() { - break; - } - - // Find next boundary or use end of staging - let next_boundary = if i + 1 < buffer.batch_boundaries.len() { - buffer.batch_boundaries[i + 1] - } else { - buffer.staging.len() - }; - - batches.push(buffer.staging[start..next_boundary].to_vec()); - start = next_boundary; - } - - let staging: BatchMemoryBlock = BatchMemoryBlock(batches); - - // Clear everything - buffer.staging.clear(); - buffer.block_position_index.clear(); - buffer.batch_boundaries.clear(); - - let staging_ref = Arc::new(staging); - let flight_id = buffer.flight_counter; - - let flight = &mut buffer.flight; - flight.insert(flight_id, staging_ref.clone()); - - let spill_size = buffer.staging_size; - buffer.flight_counter += 1; - buffer.flight_size += spill_size; - buffer.staging_size = 0; - - Ok(Some(BufferSpillResult { - flight_id, - flight_len: spill_size as u64, - blocks: staging_ref.clone(), - })) - } - - #[trace] - fn append(&self, blocks: Vec, size: u64) -> Result<()> { - let mut buffer = self.buffer.lock(); - let current_position = buffer.staging.len(); - let block_count = blocks.len(); - - // Pre-allocate capacities - buffer.staging.reserve(block_count); - buffer.block_position_index.reserve(block_count); - - // Record batch boundary - if !blocks.is_empty() { - buffer.batch_boundaries.push(current_position); - } - - for (idx, block) in blocks.into_iter().enumerate() { - buffer - .block_position_index - .insert(block.block_id, current_position + idx); - buffer.staging.push(block); - } - buffer.staging_size += size as i64; - buffer.total_size += size as i64; - Ok(()) - } - - #[cfg(test)] - #[trace] - fn direct_push(&self, blocks: Vec) -> Result<()> { - let len: u64 = blocks.iter().map(|block| block.length).sum::() as u64; - self.append(blocks, len) - } -} #[cfg(test)] mod test { - use crate::store::mem::buffer::{MemoryBuffer, OptStagingMemoryBuffer}; - use crate::store::mem::buffer_core::BufferOps; + use crate::store::mem::buffer::default_buffer::MemoryBuffer; + use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; + use crate::store::mem::buffer::BufferOps; use crate::store::Block; use hashlink::LinkedHashMap; use std::collections::LinkedList; diff --git a/riffle-server/src/store/mem/buffer/default_buffer.rs b/riffle-server/src/store/mem/buffer/default_buffer.rs new file mode 100644 index 00000000..a3c03100 --- /dev/null +++ b/riffle-server/src/store/mem/buffer/default_buffer.rs @@ -0,0 +1,250 @@ +use super::{BatchMemoryBlock, BufferOps, BufferSpillResult}; +use crate::composed_bytes; +use crate::composed_bytes::ComposedBytes; +use crate::constant::INVALID_BLOCK_ID; +use crate::store::DataBytes; +use crate::store::{Block, DataSegment, PartitionedMemoryData}; +use anyhow::Result; +use croaring::Treemap; +use fastrace::trace; +use parking_lot::Mutex; +use std::collections::HashMap; +use std::hash::Hash; +use std::mem; +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; + +#[derive(Debug)] +pub struct BufferInternal { + pub total_size: i64, + pub staging_size: i64, + pub flight_size: i64, + + pub staging: BatchMemoryBlock, + + pub flight: HashMap>, + pub flight_counter: u64, +} + +impl BufferInternal { + pub fn new() -> Self { + BufferInternal { + total_size: 0, + staging_size: 0, + flight_size: 0, + staging: Default::default(), + flight: Default::default(), + flight_counter: 0, + } + } +} +pub struct MemoryBuffer { + buffer: Mutex, +} + +impl BufferOps for MemoryBuffer { + fn new() -> MemoryBuffer { + MemoryBuffer { + buffer: Mutex::new(BufferInternal::new()), + } + } + + fn total_size(&self) -> Result + where + Self: Send + Sync, + { + return Ok(self.buffer.lock().total_size); + } + + fn flight_size(&self) -> Result + where + Self: Send + Sync, + { + return Ok(self.buffer.lock().flight_size); + } + + fn staging_size(&self) -> Result + where + Self: Send + Sync, + { + return Ok(self.buffer.lock().staging_size); + } + + fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()> + where + Self: Send + Sync, + { + let mut buffer = self.buffer.lock(); + let flight = &mut buffer.flight; + let removed = flight.remove(&flight_id); + if let Some(block_ref) = removed { + buffer.total_size -= flight_size as i64; + buffer.flight_size -= flight_size as i64; + } + Ok(()) + } + + fn get( + &self, + last_block_id: i64, + read_bytes_limit_len: i64, + task_ids: Option, + ) -> Result + where + Self: Send + Sync, + { + /// read sequence + /// 1. from flight (expect: last_block_id not found or last_block_id == -1) + /// 2. from staging + let buffer = self.buffer.lock(); + + let mut read_result = vec![]; + let mut read_len = 0i64; + let mut flight_found = false; + + let mut exit = false; + while !exit { + exit = true; + { + if last_block_id == INVALID_BLOCK_ID { + flight_found = true; + } + for (_, batch_block) in buffer.flight.iter() { + for blocks in batch_block.iter() { + for block in blocks { + if !flight_found && block.block_id == last_block_id { + flight_found = true; + continue; + } + if !flight_found { + continue; + } + if read_len >= read_bytes_limit_len { + break; + } + if let Some(ref expected_task_id) = task_ids { + if !expected_task_id.contains(block.task_attempt_id as u64) { + continue; + } + } + read_len += block.length as i64; + read_result.push(block); + } + } + } + } + + { + for blocks in buffer.staging.iter() { + for block in blocks { + if !flight_found && block.block_id == last_block_id { + flight_found = true; + continue; + } + if !flight_found { + continue; + } + if read_len >= read_bytes_limit_len { + break; + } + if let Some(ref expected_task_id) = task_ids { + if !expected_task_id.contains(block.task_attempt_id as u64) { + continue; + } + } + read_len += block.length as i64; + read_result.push(block); + } + } + } + + if !flight_found { + flight_found = true; + exit = false; + } + } + + let mut block_bytes = Vec::with_capacity(read_result.len()); + let mut segments = Vec::with_capacity(read_result.len()); + let mut offset = 0; + for block in read_result { + let data = &block.data; + block_bytes.push(data.clone()); + segments.push(DataSegment { + block_id: block.block_id, + offset, + length: block.length, + uncompress_length: block.uncompress_length, + crc: block.crc, + task_attempt_id: block.task_attempt_id, + }); + offset += block.length as i64; + } + let total_bytes = offset as usize; + + // Note: is_end is computed as total_bytes < read_bytes_limit_len. This works in general, + // but it can incorrectly be false in the edge case where total_bytes == read_bytes_limit_len + // and the buffer has no more blocks left. In that situation, buffer is actually fully read, + // so the client code may need to perform an additional empty-check to handle this case. + let is_end = total_bytes < read_bytes_limit_len as usize; + + let composed_bytes = ComposedBytes::from(block_bytes, total_bytes); + Ok(PartitionedMemoryData { + shuffle_data_block_segments: segments, + data: DataBytes::Composed(composed_bytes), + is_end, + }) + } + + // when there is no any staging data, it will return the None + fn spill(&self) -> Result> + where + Self: Send + Sync, + { + let mut buffer = self.buffer.lock(); + if buffer.staging_size == 0 { + return Ok(None); + } + + let staging: BatchMemoryBlock = { mem::replace(&mut buffer.staging, Default::default()) }; + let staging_ref = Arc::new(staging); + let flight_id = buffer.flight_counter; + + let flight = &mut buffer.flight; + flight.insert(flight_id, staging_ref.clone()); + + let spill_size = buffer.staging_size; + buffer.flight_counter += 1; + buffer.flight_size += spill_size; + buffer.staging_size = 0; + + Ok(Some(BufferSpillResult { + flight_id, + flight_len: spill_size as u64, + blocks: staging_ref.clone(), + })) + } + + fn append(&self, blocks: Vec, size: u64) -> Result<()> + where + Self: Send + Sync, + { + let mut buffer = self.buffer.lock(); + let mut staging = &mut buffer.staging; + staging.push(blocks); + + buffer.staging_size += size as i64; + buffer.total_size += size as i64; + + Ok(()) + } + + #[cfg(test)] + fn direct_push(&self, blocks: Vec) -> Result<()> + where + Self: Send + Sync, + { + let len: u64 = blocks.iter().map(|block| block.length).sum::() as u64; + self.append(blocks, len) + } +} diff --git a/riffle-server/src/store/mem/buffer/opt_buffer.rs b/riffle-server/src/store/mem/buffer/opt_buffer.rs new file mode 100644 index 00000000..30063998 --- /dev/null +++ b/riffle-server/src/store/mem/buffer/opt_buffer.rs @@ -0,0 +1,295 @@ +use crate::composed_bytes::ComposedBytes; +use crate::constant::INVALID_BLOCK_ID; +use crate::store::mem::buffer::{BatchMemoryBlock, BufferOps, BufferSpillResult}; +use crate::store::{Block, DataBytes, DataSegment, PartitionedMemoryData}; +use croaring::Treemap; +use fastrace::trace; +use parking_lot::Mutex; +use std::collections::HashMap; +use std::sync::Arc; + +/// the optimized implementation is from https://github.com/zuston/riffle/pull/564 +#[derive(Debug)] +pub struct OptStagingBufferInternal { + pub total_size: i64, + pub staging_size: i64, + pub flight_size: i64, + + pub staging: Vec, + pub batch_boundaries: Vec, // Track where each batch starts + pub block_position_index: HashMap, // Maps block_id to Vec index + + pub flight: HashMap>, + pub flight_counter: u64, +} + +impl OptStagingBufferInternal { + pub fn new() -> Self { + OptStagingBufferInternal { + total_size: 0, + staging_size: 0, + flight_size: 0, + staging: Vec::new(), + batch_boundaries: Vec::new(), + block_position_index: HashMap::new(), + flight: Default::default(), + flight_counter: 0, + } + } +} + +#[derive(Debug)] +pub struct OptStagingMemoryBuffer { + buffer: Mutex, +} + +impl BufferOps for OptStagingMemoryBuffer { + #[trace] + fn new() -> OptStagingMemoryBuffer { + OptStagingMemoryBuffer { + buffer: Mutex::new(OptStagingBufferInternal::new()), + } + } + #[trace] + fn total_size(&self) -> anyhow::Result + where + Self: Send + Sync, + { + return Ok(self.buffer.lock().total_size); + } + + #[trace] + fn flight_size(&self) -> anyhow::Result + where + Self: Send + Sync, + { + return Ok(self.buffer.lock().flight_size); + } + + #[trace] + fn staging_size(&self) -> anyhow::Result + where + Self: Send + Sync, + { + return Ok(self.buffer.lock().staging_size); + } + + #[trace] + fn clear(&self, flight_id: u64, flight_size: u64) -> anyhow::Result<()> + where + Self: Send + Sync, + { + let mut buffer = self.buffer.lock(); + let flight = &mut buffer.flight; + let removed = flight.remove(&flight_id); + if let Some(block_ref) = removed { + buffer.total_size -= flight_size as i64; + buffer.flight_size -= flight_size as i64; + } + Ok(()) + } + + #[trace] + fn get( + &self, + last_block_id: i64, + read_bytes_limit_len: i64, + task_ids: Option, + ) -> anyhow::Result + where + Self: Send + Sync, + { + /// read sequence + /// 1. from flight (expect: last_block_id not found or last_block_id == -1) + /// 2. from staging + let buffer = self.buffer.lock(); + + let mut read_result = vec![]; + let mut read_len = 0i64; + let mut flight_found = false; + + const FIRST_ATTEMP: u8 = 0; + const FALLBACK: u8 = 1; + let strategies = [FIRST_ATTEMP, FALLBACK]; + + for loop_index in strategies { + if last_block_id == INVALID_BLOCK_ID { + flight_found = true; + } + for (_, batch_block) in buffer.flight.iter() { + for blocks in batch_block.iter() { + for block in blocks { + if !flight_found && block.block_id == last_block_id { + flight_found = true; + continue; + } + if !flight_found { + continue; + } + if read_len >= read_bytes_limit_len { + break; + } + if let Some(ref expected_task_id) = task_ids { + if !expected_task_id.contains(block.task_attempt_id as u64) { + continue; + } + } + read_len += block.length as i64; + read_result.push(block); + } + } + } + + // Handle staging with Vec + index optimization + let staging_start_idx = if loop_index == FIRST_ATTEMP && !flight_found { + // Try to find position after last_block_id + // Always set flight_found = true for the next searching + flight_found = true; + if let Some(&position) = buffer.block_position_index.get(&last_block_id) { + position + 1 + } else { + // Not found in staging, will handle in fallback + continue; + } + } else { + // Fallback: read from beginning + 0 + }; + + for block in &buffer.staging[staging_start_idx..] { + if read_len >= read_bytes_limit_len { + break; + } + if let Some(ref expected_task_id) = task_ids { + if !expected_task_id.contains(block.task_attempt_id as u64) { + continue; + } + } + read_len += block.length as i64; + read_result.push(block); + } + + // // If we found data in first attempt, no need for fallback + if flight_found && loop_index == FIRST_ATTEMP { + break; + } + } + + let mut block_bytes = Vec::with_capacity(read_result.len()); + let mut segments = Vec::with_capacity(read_result.len()); + let mut offset = 0; + for block in read_result { + let data = &block.data; + block_bytes.push(data.clone()); + segments.push(DataSegment { + block_id: block.block_id, + offset, + length: block.length, + uncompress_length: block.uncompress_length, + crc: block.crc, + task_attempt_id: block.task_attempt_id, + }); + offset += block.length as i64; + } + let total_bytes = offset as usize; + + // Note: is_end is computed as total_bytes < read_bytes_limit_len. This works in general, + // but it can incorrectly be false in the edge case where total_bytes == read_bytes_limit_len + // and the buffer has no more blocks left. In that situation, buffer is actually fully read, + // so the client code may need to perform an additional empty-check to handle this case. + let is_end = total_bytes < read_bytes_limit_len as usize; + + let composed_bytes = ComposedBytes::from(block_bytes, total_bytes); + Ok(PartitionedMemoryData { + shuffle_data_block_segments: segments, + data: DataBytes::Composed(composed_bytes), + is_end, + }) + } + + // when there is no any staging data, it will return the None + #[trace] + fn spill(&self) -> anyhow::Result> { + let mut buffer = self.buffer.lock(); + if buffer.staging_size == 0 { + return Ok(None); + } + + // Reconstruct batches from boundaries + let mut batches = Vec::new(); + let mut start = 0; + for i in 0..buffer.batch_boundaries.len() { + let end = buffer.batch_boundaries[i]; + if end >= buffer.staging.len() { + break; + } + + // Find next boundary or use end of staging + let next_boundary = if i + 1 < buffer.batch_boundaries.len() { + buffer.batch_boundaries[i + 1] + } else { + buffer.staging.len() + }; + + batches.push(buffer.staging[start..next_boundary].to_vec()); + start = next_boundary; + } + + let staging: BatchMemoryBlock = BatchMemoryBlock(batches); + + // Clear everything + buffer.staging.clear(); + buffer.block_position_index.clear(); + buffer.batch_boundaries.clear(); + + let staging_ref = Arc::new(staging); + let flight_id = buffer.flight_counter; + + let flight = &mut buffer.flight; + flight.insert(flight_id, staging_ref.clone()); + + let spill_size = buffer.staging_size; + buffer.flight_counter += 1; + buffer.flight_size += spill_size; + buffer.staging_size = 0; + + Ok(Some(BufferSpillResult { + flight_id, + flight_len: spill_size as u64, + blocks: staging_ref.clone(), + })) + } + + #[trace] + fn append(&self, blocks: Vec, size: u64) -> anyhow::Result<()> { + let mut buffer = self.buffer.lock(); + let current_position = buffer.staging.len(); + let block_count = blocks.len(); + + // Pre-allocate capacities + buffer.staging.reserve(block_count); + buffer.block_position_index.reserve(block_count); + + // Record batch boundary + if !blocks.is_empty() { + buffer.batch_boundaries.push(current_position); + } + + for (idx, block) in blocks.into_iter().enumerate() { + buffer + .block_position_index + .insert(block.block_id, current_position + idx); + buffer.staging.push(block); + } + buffer.staging_size += size as i64; + buffer.total_size += size as i64; + Ok(()) + } + + #[cfg(test)] + #[trace] + fn direct_push(&self, blocks: Vec) -> anyhow::Result<()> { + let len: u64 = blocks.iter().map(|block| block.length).sum::() as u64; + self.append(blocks, len) + } +} diff --git a/riffle-server/src/store/mem/buffer_core.rs b/riffle-server/src/store/mem/buffer_core.rs deleted file mode 100644 index b6e6c713..00000000 --- a/riffle-server/src/store/mem/buffer_core.rs +++ /dev/null @@ -1,95 +0,0 @@ -use crate::composed_bytes::ComposedBytes; -use crate::store::DataBytes; -use crate::store::{Block, DataSegment, PartitionedMemoryData}; -use anyhow::Result; -use croaring::Treemap; -use parking_lot::Mutex; -use std::collections::HashMap; -use std::hash::Hash; -use std::ops::{Deref, DerefMut}; -use std::sync::Arc; - -#[derive(Default, Debug)] -pub struct BatchMemoryBlock(pub Vec>); -impl Deref for BatchMemoryBlock { - type Target = Vec>; - fn deref(&self) -> &Self::Target { - &self.0 - } -} -impl DerefMut for BatchMemoryBlock { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -#[derive(Debug)] -pub struct BufferSpillResult { - pub flight_id: u64, - pub flight_len: u64, - pub blocks: Arc, -} - -impl BufferSpillResult { - pub fn flight_id(&self) -> u64 { - self.flight_id - } - pub fn flight_len(&self) -> u64 { - self.flight_len - } - pub fn blocks(&self) -> Arc { - self.blocks.clone() - } -} - -pub trait BufferOps { - /// Creates a new buffer instance - fn new() -> Self - where - Self: Sized; - - /// Returns the total size of the buffer. - fn total_size(&self) -> Result - where - Self: Send + Sync; - /// Returns the size of data in flight (spilled but not cleared). - fn flight_size(&self) -> Result - where - Self: Send + Sync; - - /// Returns the size of data in staging (not yet spilled). - fn staging_size(&self) -> Result - where - Self: Send + Sync; - - /// Clears a specific flight by ID and size. - fn clear(&self, flight_id: u64, flight_size: u64) -> Result<()> - where - Self: Send + Sync; - - /// Reads data starting after last_block_id, up to read_bytes_limit_len. - fn get( - &self, - last_block_id: i64, - read_bytes_limit_len: i64, - task_ids: Option, - ) -> Result - where - Self: Send + Sync; - - /// Spills staging data to flight, returns None if no staging data. - fn spill(&self) -> Result> - where - Self: Send + Sync; - - /// Appends blocks to staging area. - fn append(&self, blocks: Vec, size: u64) -> Result<()> - where - Self: Send + Sync; - - /// push directly, just use only in test - #[cfg(test)] - fn direct_push(&self, blocks: Vec) -> anyhow::Result<()> - where - Self: Send + Sync; -} diff --git a/riffle-server/src/store/mem/mod.rs b/riffle-server/src/store/mem/mod.rs index cbbdf05e..2b501a9b 100644 --- a/riffle-server/src/store/mem/mod.rs +++ b/riffle-server/src/store/mem/mod.rs @@ -17,7 +17,6 @@ pub mod budget; pub mod buffer; -pub mod buffer_core; pub mod capacity; pub mod ticket; pub use await_tree::InstrumentAwait; diff --git a/riffle-server/src/store/memory.rs b/riffle-server/src/store/memory.rs index a89f81e3..77b37584 100644 --- a/riffle-server/src/store/memory.rs +++ b/riffle-server/src/store/memory.rs @@ -37,8 +37,8 @@ use crate::app_manager::partition_identifier::PartitionUId; use crate::ddashmap::DDashMap; use crate::runtime::manager::RuntimeManager; use crate::store::mem::budget::MemoryBudget; -use crate::store::mem::buffer::MemoryBuffer; -use crate::store::mem::buffer_core::BufferOps; +use crate::store::mem::buffer::default_buffer::MemoryBuffer; +use crate::store::mem::buffer::BufferOps; use crate::store::mem::capacity::CapacitySnapshot; use crate::store::mem::ticket::TicketManager; use crate::store::spill::SpillWritingViewContext; @@ -407,8 +407,6 @@ mod test { WritingViewContext, }; - use crate::store::mem::buffer::{MemoryBuffer, OptStagingMemoryBuffer}; - use crate::store::mem::buffer_core::BufferOps; use crate::store::memory::MemoryStore; use crate::store::ResponseData::Mem; @@ -421,6 +419,9 @@ mod test { use crate::app_manager::application_identifier::ApplicationId; use crate::app_manager::partition_identifier::PartitionUId; use crate::app_manager::purge_event::PurgeReason; + use crate::store::mem::buffer::default_buffer::MemoryBuffer; + use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; + use crate::store::mem::buffer::BufferOps; use anyhow::Result; use croaring::Treemap; diff --git a/riffle-server/src/store/spill/mod.rs b/riffle-server/src/store/spill/mod.rs index 3babaeca..0e099c4f 100644 --- a/riffle-server/src/store/spill/mod.rs +++ b/riffle-server/src/store/spill/mod.rs @@ -8,7 +8,7 @@ use crate::metric::{ TOTAL_SPILL_EVENTS_DROPPED_WITH_APP_NOT_FOUND, }; use crate::store::hybrid::{HybridStore, PersistentStore}; -use crate::store::mem::buffer_core::BatchMemoryBlock; +use crate::store::mem::buffer::BatchMemoryBlock; use log::{debug, error, warn}; use once_cell::sync::OnceCell; use parking_lot::Mutex; diff --git a/riffle-server/src/store/spill/spill_test.rs b/riffle-server/src/store/spill/spill_test.rs index a0623054..d44326e2 100644 --- a/riffle-server/src/store/spill/spill_test.rs +++ b/riffle-server/src/store/spill/spill_test.rs @@ -15,7 +15,6 @@ pub mod tests { }; use crate::runtime::manager::RuntimeManager; use crate::store::hybrid::{HybridStore, PersistentStore}; - use crate::store::mem::buffer_core::BufferOps; use crate::store::spill::spill_test::mock::MockStore; use crate::store::spill::storage_flush_handler::StorageFlushHandler; use crate::store::spill::storage_select_handler::StorageSelectHandler; From 4e74b4daf0cbe6371c5565aa8b187c27b8ff5b13 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 26 Dec 2025 14:20:41 +0800 Subject: [PATCH 13/18] enum to avoid generic type leak --- riffle-server/src/config.rs | 10 +++ riffle-server/src/store/hybrid.rs | 2 +- riffle-server/src/store/mem/buffer.rs | 12 +++ .../src/store/mem/buffer/route_buffer.rs | 82 +++++++++++++++++++ riffle-server/src/store/memory.rs | 1 + 5 files changed, 106 insertions(+), 1 deletion(-) create mode 100644 riffle-server/src/store/mem/buffer/route_buffer.rs diff --git a/riffle-server/src/config.rs b/riffle-server/src/config.rs index aba3e72f..38c66c06 100644 --- a/riffle-server/src/config.rs +++ b/riffle-server/src/config.rs @@ -16,6 +16,7 @@ // under the License. use crate::block_id_manager::BlockIdManagerType; +use crate::store::mem::buffer::MemoryBufferType; use crate::store::ResponseDataIndex::Local; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -31,6 +32,13 @@ pub struct MemoryStoreConfig { pub buffer_ticket_timeout_sec: i64, #[serde(default = "as_default_buffer_ticket_timeout_check_interval_sec")] pub buffer_ticket_check_interval_sec: i64, + + #[serde(default = "as_default_buffer_type")] + pub buffer_type: MemoryBufferType, +} + +fn as_default_buffer_type() -> MemoryBufferType { + MemoryBufferType::DEFAULT } fn as_default_buffer_ticket_timeout_check_interval_sec() -> i64 { @@ -47,6 +55,7 @@ impl MemoryStoreConfig { capacity, buffer_ticket_timeout_sec: as_default_buffer_ticket_timeout_sec(), buffer_ticket_check_interval_sec: as_default_buffer_ticket_timeout_check_interval_sec(), + buffer_type: MemoryBufferType::DEFAULT, } } @@ -55,6 +64,7 @@ impl MemoryStoreConfig { capacity, buffer_ticket_timeout_sec, buffer_ticket_check_interval_sec: as_default_buffer_ticket_timeout_check_interval_sec(), + buffer_type: MemoryBufferType::DEFAULT, } } } diff --git a/riffle-server/src/store/hybrid.rs b/riffle-server/src/store/hybrid.rs index a55f25db..2f28539e 100644 --- a/riffle-server/src/store/hybrid.rs +++ b/riffle-server/src/store/hybrid.rs @@ -62,7 +62,7 @@ use crate::config_reconfigure::ReconfigurableConfManager; use crate::runtime::manager::RuntimeManager; use crate::store::local::LocalfileStoreStat; use crate::store::mem::buffer::default_buffer::MemoryBuffer; -use crate::store::mem::buffer::MemoryBuffer; +use crate::store::mem::buffer::BufferOps; use crate::store::mem::capacity::CapacitySnapshot; use crate::store::spill::hierarchy_event_bus::HierarchyEventBus; use crate::store::spill::storage_flush_handler::StorageFlushHandler; diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index 01b010ba..605bdf07 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -1,17 +1,29 @@ pub mod default_buffer; pub mod opt_buffer; +mod route_buffer; use crate::composed_bytes::ComposedBytes; +use crate::store::mem::buffer::default_buffer::MemoryBuffer; +use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; use crate::store::DataBytes; use crate::store::{Block, DataSegment, PartitionedMemoryData}; use anyhow::Result; use croaring::Treemap; use parking_lot::Mutex; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::hash::Hash; use std::ops::{Deref, DerefMut}; use std::sync::Arc; +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub enum MemoryBufferType { + // the default memory_buffer type + DEFAULT, + // the experimental memory_buffer type + EXPERIMENTAL, +} + #[derive(Default, Debug)] pub struct BatchMemoryBlock(pub Vec>); impl Deref for BatchMemoryBlock { diff --git a/riffle-server/src/store/mem/buffer/route_buffer.rs b/riffle-server/src/store/mem/buffer/route_buffer.rs new file mode 100644 index 00000000..9294e51d --- /dev/null +++ b/riffle-server/src/store/mem/buffer/route_buffer.rs @@ -0,0 +1,82 @@ +use crate::store::mem::buffer::default_buffer::MemoryBuffer; +use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; +use crate::store::mem::buffer::{BufferOps, BufferSpillResult}; +use crate::store::{Block, PartitionedMemoryData}; +use croaring::Treemap; + +/// this is the router to delegate to the underlying concrete implementation without any cost +pub enum RouterBuffer { + DEFAULT(MemoryBuffer), + EXPERIMENTAL(OptStagingMemoryBuffer), +} + +impl BufferOps for RouterBuffer { + fn new() -> Self + where + Self: Sized, + { + todo!() + } + + fn total_size(&self) -> anyhow::Result + where + Self: Send + Sync, + { + todo!() + } + + fn flight_size(&self) -> anyhow::Result + where + Self: Send + Sync, + { + todo!() + } + + fn staging_size(&self) -> anyhow::Result + where + Self: Send + Sync, + { + todo!() + } + + fn clear(&self, flight_id: u64, flight_size: u64) -> anyhow::Result<()> + where + Self: Send + Sync, + { + todo!() + } + + fn get( + &self, + last_block_id: i64, + read_bytes_limit_len: i64, + task_ids: Option, + ) -> anyhow::Result + where + Self: Send + Sync, + { + todo!() + } + + fn spill(&self) -> anyhow::Result> + where + Self: Send + Sync, + { + todo!() + } + + fn append(&self, blocks: Vec, size: u64) -> anyhow::Result<()> + where + Self: Send + Sync, + { + todo!() + } + + #[cfg(test)] + fn direct_push(&self, blocks: Vec) -> anyhow::Result<()> + where + Self: Send + Sync, + { + todo!() + } +} diff --git a/riffle-server/src/store/memory.rs b/riffle-server/src/store/memory.rs index 77b37584..cc3f7f4f 100644 --- a/riffle-server/src/store/memory.rs +++ b/riffle-server/src/store/memory.rs @@ -38,6 +38,7 @@ use crate::ddashmap::DDashMap; use crate::runtime::manager::RuntimeManager; use crate::store::mem::budget::MemoryBudget; use crate::store::mem::buffer::default_buffer::MemoryBuffer; +use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; use crate::store::mem::buffer::BufferOps; use crate::store::mem::capacity::CapacitySnapshot; use crate::store::mem::ticket::TicketManager; From cc96c1c982102bc9a92d97799ef6a26eddc7d40c Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 26 Dec 2025 14:27:03 +0800 Subject: [PATCH 14/18] activate this --- riffle-server/src/store/hybrid.rs | 23 +++++++++++++++-------- riffle-server/src/store/mem/buffer.rs | 4 ++-- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/riffle-server/src/store/hybrid.rs b/riffle-server/src/store/hybrid.rs index 2f28539e..540032ea 100644 --- a/riffle-server/src/store/hybrid.rs +++ b/riffle-server/src/store/hybrid.rs @@ -62,12 +62,14 @@ use crate::config_reconfigure::ReconfigurableConfManager; use crate::runtime::manager::RuntimeManager; use crate::store::local::LocalfileStoreStat; use crate::store::mem::buffer::default_buffer::MemoryBuffer; -use crate::store::mem::buffer::BufferOps; +use crate::store::mem::buffer::route_buffer::RouterBuffer; +use crate::store::mem::buffer::{BufferOps, MemoryBufferType}; use crate::store::mem::capacity::CapacitySnapshot; use crate::store::spill::hierarchy_event_bus::HierarchyEventBus; use crate::store::spill::storage_flush_handler::StorageFlushHandler; use crate::store::spill::storage_select_handler::StorageSelectHandler; use crate::store::spill::{SpillMessage, SpillWritingViewContext}; +use crate::store::ResponseData::Mem; use tokio::time::Instant; pub trait PersistentStore: Store + Persistent + Send + Sync + Any { @@ -90,7 +92,7 @@ const DEFAULT_MEMORY_SPILL_MAX_CONCURRENCY: i32 = 20; pub struct HybridStore { // Box will build fail - pub(crate) hot_store: Arc, + pub(crate) hot_store: Arc>, pub(crate) warm_store: Option>, pub(crate) cold_store: Option>, @@ -173,11 +175,16 @@ impl HybridStore { } let async_watermark_spill_enable = hybrid_conf.async_watermark_spill_trigger_enable; + let mem_buffer_type = config + .memory_store + .as_ref() + .map(|x| x.buffer_type) + .unwrap_or(MemoryBufferType::DEFAULT); + let mem_store: MemoryStore = + MemoryStore::from(config.memory_store.unwrap(), runtime_manager.clone()); + let store = HybridStore { - hot_store: Arc::new(MemoryStore::from( - config.memory_store.unwrap(), - runtime_manager.clone(), - )), + hot_store: Arc::new(mem_store), warm_store: persistent_stores.pop_front(), cold_store: persistent_stores.pop_front(), config: hybrid_conf, @@ -427,7 +434,7 @@ impl HybridStore { Ok(Default::default()) } - pub async fn get_memory_buffer(&self, uid: &PartitionUId) -> Result> { + pub async fn get_memory_buffer(&self, uid: &PartitionUId) -> Result> { self.hot_store.get_buffer(uid) } @@ -470,7 +477,7 @@ impl HybridStore { async fn buffer_spill_impl( &self, uid: &PartitionUId, - buffer: Arc, + buffer: Arc, ) -> Result { let spill_result = buffer.spill()?; if spill_result.is_none() { diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index 605bdf07..0ad1d3e8 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -1,6 +1,6 @@ pub mod default_buffer; pub mod opt_buffer; -mod route_buffer; +pub mod route_buffer; use crate::composed_bytes::ComposedBytes; use crate::store::mem::buffer::default_buffer::MemoryBuffer; @@ -16,7 +16,7 @@ use std::hash::Hash; use std::ops::{Deref, DerefMut}; use std::sync::Arc; -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Copy)] pub enum MemoryBufferType { // the default memory_buffer type DEFAULT, From 4bd211292f83aca9e398c2523f80030b76a51ee0 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 26 Dec 2025 14:29:53 +0800 Subject: [PATCH 15/18] memoryBuffer rename to DefaultMemoryBuffer --- riffle-server/src/store/hybrid.rs | 2 +- riffle-server/src/store/mem/buffer.rs | 12 +++++------ .../src/store/mem/buffer/default_buffer.rs | 21 ++++++++++--------- .../src/store/mem/buffer/route_buffer.rs | 4 ++-- riffle-server/src/store/memory.rs | 16 +++++++------- 5 files changed, 28 insertions(+), 27 deletions(-) diff --git a/riffle-server/src/store/hybrid.rs b/riffle-server/src/store/hybrid.rs index 540032ea..d5fe8e37 100644 --- a/riffle-server/src/store/hybrid.rs +++ b/riffle-server/src/store/hybrid.rs @@ -61,7 +61,7 @@ use crate::app_manager::AppManagerRef; use crate::config_reconfigure::ReconfigurableConfManager; use crate::runtime::manager::RuntimeManager; use crate::store::local::LocalfileStoreStat; -use crate::store::mem::buffer::default_buffer::MemoryBuffer; +use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; use crate::store::mem::buffer::route_buffer::RouterBuffer; use crate::store::mem::buffer::{BufferOps, MemoryBufferType}; use crate::store::mem::capacity::CapacitySnapshot; diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index 0ad1d3e8..04be434c 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -3,7 +3,7 @@ pub mod opt_buffer; pub mod route_buffer; use crate::composed_bytes::ComposedBytes; -use crate::store::mem::buffer::default_buffer::MemoryBuffer; +use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; use crate::store::DataBytes; use crate::store::{Block, DataSegment, PartitionedMemoryData}; @@ -111,7 +111,7 @@ pub trait BufferOps { #[cfg(test)] mod test { - use crate::store::mem::buffer::default_buffer::MemoryBuffer; + use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; use crate::store::mem::buffer::BufferOps; use crate::store::Block; @@ -175,7 +175,7 @@ mod test { #[test] fn test_with_block_id_zero() -> anyhow::Result<()> { - run_test_with_block_id_zero::()?; + run_test_with_block_id_zero::()?; run_test_with_block_id_zero::()?; Ok(()) } @@ -283,7 +283,7 @@ mod test { #[test] fn test_put_get() -> anyhow::Result<()> { - run_test_put_get::()?; + run_test_put_get::()?; run_test_put_get::()?; Ok(()) } @@ -320,7 +320,7 @@ mod test { #[test] fn test_get_v2_is_end_with_only_staging() -> anyhow::Result<()> { - run_test_get_v2_is_end_with_only_staging::()?; + run_test_get_v2_is_end_with_only_staging::()?; run_test_get_v2_is_end_with_only_staging::()?; Ok(()) } @@ -352,7 +352,7 @@ mod test { #[test] fn test_get_v2_is_end_across_flight_and_staging() -> anyhow::Result<()> { - run_test_get_v2_is_end_across_flight_and_staging::()?; + run_test_get_v2_is_end_across_flight_and_staging::()?; run_test_get_v2_is_end_across_flight_and_staging::()?; Ok(()) } diff --git a/riffle-server/src/store/mem/buffer/default_buffer.rs b/riffle-server/src/store/mem/buffer/default_buffer.rs index a3c03100..4f1d6c9e 100644 --- a/riffle-server/src/store/mem/buffer/default_buffer.rs +++ b/riffle-server/src/store/mem/buffer/default_buffer.rs @@ -14,8 +14,12 @@ use std::mem; use std::ops::{Deref, DerefMut}; use std::sync::Arc; +pub struct DefaultMemoryBuffer { + buffer: Mutex, +} + #[derive(Debug)] -pub struct BufferInternal { +struct Inner { pub total_size: i64, pub staging_size: i64, pub flight_size: i64, @@ -26,9 +30,9 @@ pub struct BufferInternal { pub flight_counter: u64, } -impl BufferInternal { +impl Inner { pub fn new() -> Self { - BufferInternal { + Inner { total_size: 0, staging_size: 0, flight_size: 0, @@ -38,14 +42,11 @@ impl BufferInternal { } } } -pub struct MemoryBuffer { - buffer: Mutex, -} -impl BufferOps for MemoryBuffer { - fn new() -> MemoryBuffer { - MemoryBuffer { - buffer: Mutex::new(BufferInternal::new()), +impl BufferOps for DefaultMemoryBuffer { + fn new() -> DefaultMemoryBuffer { + DefaultMemoryBuffer { + buffer: Mutex::new(Inner::new()), } } diff --git a/riffle-server/src/store/mem/buffer/route_buffer.rs b/riffle-server/src/store/mem/buffer/route_buffer.rs index 9294e51d..7c786fd4 100644 --- a/riffle-server/src/store/mem/buffer/route_buffer.rs +++ b/riffle-server/src/store/mem/buffer/route_buffer.rs @@ -1,4 +1,4 @@ -use crate::store::mem::buffer::default_buffer::MemoryBuffer; +use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; use crate::store::mem::buffer::{BufferOps, BufferSpillResult}; use crate::store::{Block, PartitionedMemoryData}; @@ -6,7 +6,7 @@ use croaring::Treemap; /// this is the router to delegate to the underlying concrete implementation without any cost pub enum RouterBuffer { - DEFAULT(MemoryBuffer), + DEFAULT(DefaultMemoryBuffer), EXPERIMENTAL(OptStagingMemoryBuffer), } diff --git a/riffle-server/src/store/memory.rs b/riffle-server/src/store/memory.rs index cc3f7f4f..b83312ec 100644 --- a/riffle-server/src/store/memory.rs +++ b/riffle-server/src/store/memory.rs @@ -37,7 +37,7 @@ use crate::app_manager::partition_identifier::PartitionUId; use crate::ddashmap::DDashMap; use crate::runtime::manager::RuntimeManager; use crate::store::mem::budget::MemoryBudget; -use crate::store::mem::buffer::default_buffer::MemoryBuffer; +use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; use crate::store::mem::buffer::BufferOps; use crate::store::mem::capacity::CapacitySnapshot; @@ -52,7 +52,7 @@ use fxhash::{FxBuildHasher, FxHasher}; use log::{debug, info, warn}; use std::sync::Arc; -pub struct MemoryStore { +pub struct MemoryStore { memory_capacity: i64, state: DDashMap>, budget: MemoryBudget, @@ -420,7 +420,7 @@ mod test { use crate::app_manager::application_identifier::ApplicationId; use crate::app_manager::partition_identifier::PartitionUId; use crate::app_manager::purge_event::PurgeReason; - use crate::store::mem::buffer::default_buffer::MemoryBuffer; + use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; use crate::store::mem::buffer::BufferOps; use anyhow::Result; @@ -595,7 +595,7 @@ mod test { #[test] fn test_read_buffer_in_flight() { - run_test_read_buffer_in_flight::(); + run_test_read_buffer_in_flight::(); run_test_read_buffer_in_flight::(); } @@ -670,7 +670,7 @@ mod test { #[test] fn test_allocated_and_purge_for_memory() { - run_test_allocated_and_purge_for_memory::(); + run_test_allocated_and_purge_for_memory::(); run_test_allocated_and_purge_for_memory::(); } @@ -753,7 +753,7 @@ mod test { #[test] fn test_purge() { - run_test_purge::(); + run_test_purge::(); run_test_purge::(); } @@ -802,7 +802,7 @@ mod test { #[test] fn test_put_and_get_for_memory() { - run_test_put_and_get_for_memory::(); + run_test_put_and_get_for_memory::(); run_test_put_and_get_for_memory::(); } @@ -874,7 +874,7 @@ mod test { } #[test] fn test_block_id_filter_for_memory() { - run_test_block_id_filter_for_memory::(); + run_test_block_id_filter_for_memory::(); run_test_block_id_filter_for_memory::(); } } From 376e8b1d115e71adc76ac9f85ac4964cb5932f8c Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 26 Dec 2025 14:31:23 +0800 Subject: [PATCH 16/18] rename to unifiedBuffer --- riffle-server/src/store/hybrid.rs | 10 +++++----- riffle-server/src/store/mem/buffer.rs | 2 +- .../mem/buffer/{route_buffer.rs => unified_buffer.rs} | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) rename riffle-server/src/store/mem/buffer/{route_buffer.rs => unified_buffer.rs} (96%) diff --git a/riffle-server/src/store/hybrid.rs b/riffle-server/src/store/hybrid.rs index d5fe8e37..ff0947f7 100644 --- a/riffle-server/src/store/hybrid.rs +++ b/riffle-server/src/store/hybrid.rs @@ -62,7 +62,7 @@ use crate::config_reconfigure::ReconfigurableConfManager; use crate::runtime::manager::RuntimeManager; use crate::store::local::LocalfileStoreStat; use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; -use crate::store::mem::buffer::route_buffer::RouterBuffer; +use crate::store::mem::buffer::unified_buffer::UnifiedBuffer; use crate::store::mem::buffer::{BufferOps, MemoryBufferType}; use crate::store::mem::capacity::CapacitySnapshot; use crate::store::spill::hierarchy_event_bus::HierarchyEventBus; @@ -92,7 +92,7 @@ const DEFAULT_MEMORY_SPILL_MAX_CONCURRENCY: i32 = 20; pub struct HybridStore { // Box will build fail - pub(crate) hot_store: Arc>, + pub(crate) hot_store: Arc>, pub(crate) warm_store: Option>, pub(crate) cold_store: Option>, @@ -180,7 +180,7 @@ impl HybridStore { .as_ref() .map(|x| x.buffer_type) .unwrap_or(MemoryBufferType::DEFAULT); - let mem_store: MemoryStore = + let mem_store: MemoryStore = MemoryStore::from(config.memory_store.unwrap(), runtime_manager.clone()); let store = HybridStore { @@ -434,7 +434,7 @@ impl HybridStore { Ok(Default::default()) } - pub async fn get_memory_buffer(&self, uid: &PartitionUId) -> Result> { + pub async fn get_memory_buffer(&self, uid: &PartitionUId) -> Result> { self.hot_store.get_buffer(uid) } @@ -477,7 +477,7 @@ impl HybridStore { async fn buffer_spill_impl( &self, uid: &PartitionUId, - buffer: Arc, + buffer: Arc, ) -> Result { let spill_result = buffer.spill()?; if spill_result.is_none() { diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index 04be434c..88aeafa5 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -1,6 +1,6 @@ pub mod default_buffer; pub mod opt_buffer; -pub mod route_buffer; +pub mod unified_buffer; use crate::composed_bytes::ComposedBytes; use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; diff --git a/riffle-server/src/store/mem/buffer/route_buffer.rs b/riffle-server/src/store/mem/buffer/unified_buffer.rs similarity index 96% rename from riffle-server/src/store/mem/buffer/route_buffer.rs rename to riffle-server/src/store/mem/buffer/unified_buffer.rs index 7c786fd4..496d9347 100644 --- a/riffle-server/src/store/mem/buffer/route_buffer.rs +++ b/riffle-server/src/store/mem/buffer/unified_buffer.rs @@ -5,12 +5,12 @@ use crate::store::{Block, PartitionedMemoryData}; use croaring::Treemap; /// this is the router to delegate to the underlying concrete implementation without any cost -pub enum RouterBuffer { +pub enum UnifiedBuffer { DEFAULT(DefaultMemoryBuffer), EXPERIMENTAL(OptStagingMemoryBuffer), } -impl BufferOps for RouterBuffer { +impl BufferOps for UnifiedBuffer { fn new() -> Self where Self: Sized, From 81d9a89a588324af260e5074dd38be16106ad45a Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 26 Dec 2025 14:58:13 +0800 Subject: [PATCH 17/18] use bufferOptions --- riffle-server/src/app_manager.rs | 1 + riffle-server/src/config.rs | 12 ++--- riffle-server/src/store/hybrid.rs | 8 +-- riffle-server/src/store/mem/buffer.rs | 34 +++++++++---- .../src/store/mem/buffer/default_buffer.rs | 10 ++-- .../src/store/mem/buffer/opt_buffer.rs | 9 ++-- .../src/store/mem/buffer/unified_buffer.rs | 50 +++++++++++++++---- riffle-server/src/store/memory.rs | 15 +++++- riffle-server/src/store/spill/mod.rs | 6 +-- riffle-server/src/store/spill/spill_test.rs | 1 + 10 files changed, 98 insertions(+), 48 deletions(-) diff --git a/riffle-server/src/app_manager.rs b/riffle-server/src/app_manager.rs index 8ddb0583..9c5d7652 100644 --- a/riffle-server/src/app_manager.rs +++ b/riffle-server/src/app_manager.rs @@ -562,6 +562,7 @@ pub(crate) mod test { capacity: "20B".to_string(), buffer_ticket_timeout_sec: 1, buffer_ticket_check_interval_sec: 1, + buffer_type: Default::default(), }), ); let _ = std::mem::replace( diff --git a/riffle-server/src/config.rs b/riffle-server/src/config.rs index 38c66c06..bf11951d 100644 --- a/riffle-server/src/config.rs +++ b/riffle-server/src/config.rs @@ -16,7 +16,7 @@ // under the License. use crate::block_id_manager::BlockIdManagerType; -use crate::store::mem::buffer::MemoryBufferType; +use crate::store::mem::buffer::BufferType; use crate::store::ResponseDataIndex::Local; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -34,11 +34,11 @@ pub struct MemoryStoreConfig { pub buffer_ticket_check_interval_sec: i64, #[serde(default = "as_default_buffer_type")] - pub buffer_type: MemoryBufferType, + pub buffer_type: BufferType, } -fn as_default_buffer_type() -> MemoryBufferType { - MemoryBufferType::DEFAULT +fn as_default_buffer_type() -> BufferType { + BufferType::DEFAULT } fn as_default_buffer_ticket_timeout_check_interval_sec() -> i64 { @@ -55,7 +55,7 @@ impl MemoryStoreConfig { capacity, buffer_ticket_timeout_sec: as_default_buffer_ticket_timeout_sec(), buffer_ticket_check_interval_sec: as_default_buffer_ticket_timeout_check_interval_sec(), - buffer_type: MemoryBufferType::DEFAULT, + buffer_type: BufferType::DEFAULT, } } @@ -64,7 +64,7 @@ impl MemoryStoreConfig { capacity, buffer_ticket_timeout_sec, buffer_ticket_check_interval_sec: as_default_buffer_ticket_timeout_check_interval_sec(), - buffer_type: MemoryBufferType::DEFAULT, + buffer_type: BufferType::DEFAULT, } } } diff --git a/riffle-server/src/store/hybrid.rs b/riffle-server/src/store/hybrid.rs index ff0947f7..5784e1eb 100644 --- a/riffle-server/src/store/hybrid.rs +++ b/riffle-server/src/store/hybrid.rs @@ -63,7 +63,7 @@ use crate::runtime::manager::RuntimeManager; use crate::store::local::LocalfileStoreStat; use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; use crate::store::mem::buffer::unified_buffer::UnifiedBuffer; -use crate::store::mem::buffer::{BufferOps, MemoryBufferType}; +use crate::store::mem::buffer::{BufferOps, BufferType}; use crate::store::mem::capacity::CapacitySnapshot; use crate::store::spill::hierarchy_event_bus::HierarchyEventBus; use crate::store::spill::storage_flush_handler::StorageFlushHandler; @@ -175,11 +175,7 @@ impl HybridStore { } let async_watermark_spill_enable = hybrid_conf.async_watermark_spill_trigger_enable; - let mem_buffer_type = config - .memory_store - .as_ref() - .map(|x| x.buffer_type) - .unwrap_or(MemoryBufferType::DEFAULT); + // use the unified buffer to delegate the underlying concrete buffer let mem_store: MemoryStore = MemoryStore::from(config.memory_store.unwrap(), runtime_manager.clone()); diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index 88aeafa5..1d21fbda 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -5,6 +5,7 @@ pub mod unified_buffer; use crate::composed_bytes::ComposedBytes; use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; +use crate::store::mem::buffer::BufferType::DEFAULT; use crate::store::DataBytes; use crate::store::{Block, DataSegment, PartitionedMemoryData}; use anyhow::Result; @@ -17,22 +18,33 @@ use std::ops::{Deref, DerefMut}; use std::sync::Arc; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Copy)] -pub enum MemoryBufferType { +pub enum BufferType { // the default memory_buffer type DEFAULT, // the experimental memory_buffer type EXPERIMENTAL, } +impl Default for BufferType { + fn default() -> Self { + DEFAULT + } +} + +#[derive(Default)] +pub struct BufferOptions { + pub buffer_type: BufferType, +} + #[derive(Default, Debug)] -pub struct BatchMemoryBlock(pub Vec>); -impl Deref for BatchMemoryBlock { +pub struct MemBlockBatch(pub Vec>); +impl Deref for MemBlockBatch { type Target = Vec>; fn deref(&self) -> &Self::Target { &self.0 } } -impl DerefMut for BatchMemoryBlock { +impl DerefMut for MemBlockBatch { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } @@ -42,7 +54,7 @@ impl DerefMut for BatchMemoryBlock { pub struct BufferSpillResult { pub flight_id: u64, pub flight_len: u64, - pub blocks: Arc, + pub blocks: Arc, } impl BufferSpillResult { @@ -52,14 +64,14 @@ impl BufferSpillResult { pub fn flight_len(&self) -> u64 { self.flight_len } - pub fn blocks(&self) -> Arc { + pub fn blocks(&self) -> Arc { self.blocks.clone() } } pub trait BufferOps { /// Creates a new buffer instance - fn new() -> Self + fn new(options: BufferOptions) -> Self where Self: Sized; @@ -147,7 +159,7 @@ mod test { } fn run_test_with_block_id_zero() -> anyhow::Result<()> { - let mut buffer = B::new(); + let mut buffer = B::new(Default::default()); let block_1 = create_block(10, 100); let block_2 = create_block(10, 0); @@ -181,7 +193,7 @@ mod test { } fn run_test_put_get() -> anyhow::Result<()> { - let mut buffer = B::new(); + let mut buffer = B::new(Default::default()); /// case1 buffer.direct_push(create_blocks(0, 10, 10))?; @@ -290,7 +302,7 @@ mod test { fn run_test_get_v2_is_end_with_only_staging( ) -> anyhow::Result<()> { - let buffer = B::new(); + let buffer = B::new(Default::default()); // 0 -> 10 blocks with total 100 bytes let cnt = 10; let block_len = 10; @@ -327,7 +339,7 @@ mod test { fn run_test_get_v2_is_end_across_flight_and_staging( ) -> anyhow::Result<()> { - let buffer = B::new(); + let buffer = B::new(Default::default()); // staging: 0..2 buffer.direct_push(create_blocks(0, 3, 5))?; diff --git a/riffle-server/src/store/mem/buffer/default_buffer.rs b/riffle-server/src/store/mem/buffer/default_buffer.rs index 4f1d6c9e..06696ac0 100644 --- a/riffle-server/src/store/mem/buffer/default_buffer.rs +++ b/riffle-server/src/store/mem/buffer/default_buffer.rs @@ -1,4 +1,4 @@ -use super::{BatchMemoryBlock, BufferOps, BufferSpillResult}; +use super::{BufferOps, BufferOptions, BufferSpillResult, MemBlockBatch}; use crate::composed_bytes; use crate::composed_bytes::ComposedBytes; use crate::constant::INVALID_BLOCK_ID; @@ -24,9 +24,9 @@ struct Inner { pub staging_size: i64, pub flight_size: i64, - pub staging: BatchMemoryBlock, + pub staging: MemBlockBatch, - pub flight: HashMap>, + pub flight: HashMap>, pub flight_counter: u64, } @@ -44,7 +44,7 @@ impl Inner { } impl BufferOps for DefaultMemoryBuffer { - fn new() -> DefaultMemoryBuffer { + fn new(opt: BufferOptions) -> DefaultMemoryBuffer { DefaultMemoryBuffer { buffer: Mutex::new(Inner::new()), } @@ -207,7 +207,7 @@ impl BufferOps for DefaultMemoryBuffer { return Ok(None); } - let staging: BatchMemoryBlock = { mem::replace(&mut buffer.staging, Default::default()) }; + let staging: MemBlockBatch = { mem::replace(&mut buffer.staging, Default::default()) }; let staging_ref = Arc::new(staging); let flight_id = buffer.flight_counter; diff --git a/riffle-server/src/store/mem/buffer/opt_buffer.rs b/riffle-server/src/store/mem/buffer/opt_buffer.rs index 30063998..95e66680 100644 --- a/riffle-server/src/store/mem/buffer/opt_buffer.rs +++ b/riffle-server/src/store/mem/buffer/opt_buffer.rs @@ -1,6 +1,7 @@ use crate::composed_bytes::ComposedBytes; use crate::constant::INVALID_BLOCK_ID; -use crate::store::mem::buffer::{BatchMemoryBlock, BufferOps, BufferSpillResult}; +use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; +use crate::store::mem::buffer::{BufferOps, BufferOptions, BufferSpillResult, MemBlockBatch}; use crate::store::{Block, DataBytes, DataSegment, PartitionedMemoryData}; use croaring::Treemap; use fastrace::trace; @@ -19,7 +20,7 @@ pub struct OptStagingBufferInternal { pub batch_boundaries: Vec, // Track where each batch starts pub block_position_index: HashMap, // Maps block_id to Vec index - pub flight: HashMap>, + pub flight: HashMap>, pub flight_counter: u64, } @@ -45,7 +46,7 @@ pub struct OptStagingMemoryBuffer { impl BufferOps for OptStagingMemoryBuffer { #[trace] - fn new() -> OptStagingMemoryBuffer { + fn new(opt: BufferOptions) -> Self { OptStagingMemoryBuffer { buffer: Mutex::new(OptStagingBufferInternal::new()), } @@ -235,7 +236,7 @@ impl BufferOps for OptStagingMemoryBuffer { start = next_boundary; } - let staging: BatchMemoryBlock = BatchMemoryBlock(batches); + let staging: MemBlockBatch = MemBlockBatch(batches); // Clear everything buffer.staging.clear(); diff --git a/riffle-server/src/store/mem/buffer/unified_buffer.rs b/riffle-server/src/store/mem/buffer/unified_buffer.rs index 496d9347..0de64129 100644 --- a/riffle-server/src/store/mem/buffer/unified_buffer.rs +++ b/riffle-server/src/store/mem/buffer/unified_buffer.rs @@ -1,6 +1,7 @@ use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; -use crate::store::mem::buffer::{BufferOps, BufferSpillResult}; +use crate::store::mem::buffer::unified_buffer::UnifiedBuffer::{DEFAULT, EXPERIMENTAL}; +use crate::store::mem::buffer::{BufferOps, BufferOptions, BufferSpillResult, BufferType}; use crate::store::{Block, PartitionedMemoryData}; use croaring::Treemap; @@ -11,39 +12,54 @@ pub enum UnifiedBuffer { } impl BufferOps for UnifiedBuffer { - fn new() -> Self + fn new(opts: BufferOptions) -> Self where Self: Sized, { - todo!() + match opts.buffer_type { + BufferType::DEFAULT => DEFAULT(DefaultMemoryBuffer::new(opts)), + BufferType::EXPERIMENTAL => EXPERIMENTAL(OptStagingMemoryBuffer::new(opts)), + } } fn total_size(&self) -> anyhow::Result where Self: Send + Sync, { - todo!() + match &self { + DEFAULT(x) => x.total_size(), + EXPERIMENTAL(x) => x.total_size(), + } } fn flight_size(&self) -> anyhow::Result where Self: Send + Sync, { - todo!() + match &self { + DEFAULT(x) => x.flight_size(), + EXPERIMENTAL(x) => x.flight_size(), + } } fn staging_size(&self) -> anyhow::Result where Self: Send + Sync, { - todo!() + match &self { + DEFAULT(x) => x.staging_size(), + EXPERIMENTAL(x) => x.staging_size(), + } } fn clear(&self, flight_id: u64, flight_size: u64) -> anyhow::Result<()> where Self: Send + Sync, { - todo!() + match &self { + DEFAULT(x) => x.clear(flight_id, flight_size), + EXPERIMENTAL(x) => x.clear(flight_id, flight_size), + } } fn get( @@ -55,21 +71,30 @@ impl BufferOps for UnifiedBuffer { where Self: Send + Sync, { - todo!() + match &self { + DEFAULT(x) => x.get(last_block_id, read_bytes_limit_len, task_ids), + EXPERIMENTAL(x) => x.get(last_block_id, read_bytes_limit_len, task_ids), + } } fn spill(&self) -> anyhow::Result> where Self: Send + Sync, { - todo!() + match &self { + DEFAULT(x) => x.spill(), + EXPERIMENTAL(x) => x.spill(), + } } fn append(&self, blocks: Vec, size: u64) -> anyhow::Result<()> where Self: Send + Sync, { - todo!() + match &self { + DEFAULT(x) => x.append(blocks, size), + EXPERIMENTAL(x) => x.append(blocks, size), + } } #[cfg(test)] @@ -77,6 +102,9 @@ impl BufferOps for UnifiedBuffer { where Self: Send + Sync, { - todo!() + match &self { + DEFAULT(x) => x.direct_push(blocks), + EXPERIMENTAL(x) => x.direct_push(blocks), + } } } diff --git a/riffle-server/src/store/memory.rs b/riffle-server/src/store/memory.rs index b83312ec..44a37370 100644 --- a/riffle-server/src/store/memory.rs +++ b/riffle-server/src/store/memory.rs @@ -39,7 +39,7 @@ use crate::runtime::manager::RuntimeManager; use crate::store::mem::budget::MemoryBudget; use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; -use crate::store::mem::buffer::BufferOps; +use crate::store::mem::buffer::{BufferOps, BufferOptions, BufferType}; use crate::store::mem::capacity::CapacitySnapshot; use crate::store::mem::ticket::TicketManager; use crate::store::spill::SpillWritingViewContext; @@ -58,6 +58,7 @@ pub struct MemoryStore, } unsafe impl Send for MemoryStore {} @@ -81,6 +82,7 @@ impl MemoryStore { memory_capacity: max_memory_size, ticket_manager, runtime_manager, + cfg: None, } } @@ -105,6 +107,7 @@ impl MemoryStore { memory_capacity: capacity.as_u64() as i64, ticket_manager, runtime_manager, + cfg: Some(conf), } } @@ -206,7 +209,15 @@ impl MemoryStore { // only invoked when inserting pub fn get_or_create_buffer(&self, uid: PartitionUId) -> Arc { - self.state.compute_if_absent(uid, || Arc::new(B::new())) + let buf_opts = BufferOptions { + buffer_type: self + .cfg + .as_ref() + .map(|x| x.buffer_type) + .unwrap_or(BufferType::DEFAULT), + }; + self.state + .compute_if_absent(uid, || Arc::new(B::new(buf_opts))) } pub fn get_buffer(&self, uid: &PartitionUId) -> Result> { diff --git a/riffle-server/src/store/spill/mod.rs b/riffle-server/src/store/spill/mod.rs index 0e099c4f..1b191dda 100644 --- a/riffle-server/src/store/spill/mod.rs +++ b/riffle-server/src/store/spill/mod.rs @@ -8,7 +8,7 @@ use crate::metric::{ TOTAL_SPILL_EVENTS_DROPPED_WITH_APP_NOT_FOUND, }; use crate::store::hybrid::{HybridStore, PersistentStore}; -use crate::store::mem::buffer::BatchMemoryBlock; +use crate::store::mem::buffer::MemBlockBatch; use log::{debug, error, warn}; use once_cell::sync::OnceCell; use parking_lot::Mutex; @@ -82,14 +82,14 @@ unsafe impl Sync for SpillMessage {} #[derive(Clone)] pub struct SpillWritingViewContext { pub uid: PartitionUId, - pub data_blocks: Arc, + pub data_blocks: Arc, app_is_exist_func: Arc bool + 'static>>, } unsafe impl Send for SpillWritingViewContext {} unsafe impl Sync for SpillWritingViewContext {} impl SpillWritingViewContext { - pub fn new(uid: PartitionUId, blocks: Arc, func: F) -> Self + pub fn new(uid: PartitionUId, blocks: Arc, func: F) -> Self where F: Fn(&ApplicationId) -> bool + 'static, { diff --git a/riffle-server/src/store/spill/spill_test.rs b/riffle-server/src/store/spill/spill_test.rs index d44326e2..bb027c75 100644 --- a/riffle-server/src/store/spill/spill_test.rs +++ b/riffle-server/src/store/spill/spill_test.rs @@ -15,6 +15,7 @@ pub mod tests { }; use crate::runtime::manager::RuntimeManager; use crate::store::hybrid::{HybridStore, PersistentStore}; + use crate::store::mem::buffer::BufferOps; use crate::store::spill::spill_test::mock::MockStore; use crate::store::spill::storage_flush_handler::StorageFlushHandler; use crate::store::spill::storage_select_handler::StorageSelectHandler; From 187707a4e9dba0b9b0d8af36e1f58a397614895a Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 26 Dec 2025 15:12:18 +0800 Subject: [PATCH 18/18] rename to MemoryBuffer as a trait name --- riffle-server/src/store/hybrid.rs | 2 +- riffle-server/src/store/mem/buffer.rs | 13 +++++----- .../src/store/mem/buffer/default_buffer.rs | 4 +-- .../src/store/mem/buffer/opt_buffer.rs | 4 +-- .../src/store/mem/buffer/unified_buffer.rs | 4 +-- riffle-server/src/store/memory.rs | 26 +++++++++---------- riffle-server/src/store/spill/spill_test.rs | 2 +- 7 files changed, 28 insertions(+), 27 deletions(-) diff --git a/riffle-server/src/store/hybrid.rs b/riffle-server/src/store/hybrid.rs index 5784e1eb..209a1183 100644 --- a/riffle-server/src/store/hybrid.rs +++ b/riffle-server/src/store/hybrid.rs @@ -63,7 +63,7 @@ use crate::runtime::manager::RuntimeManager; use crate::store::local::LocalfileStoreStat; use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; use crate::store::mem::buffer::unified_buffer::UnifiedBuffer; -use crate::store::mem::buffer::{BufferOps, BufferType}; +use crate::store::mem::buffer::{BufferType, MemoryBuffer}; use crate::store::mem::capacity::CapacitySnapshot; use crate::store::spill::hierarchy_event_bus::HierarchyEventBus; use crate::store::spill::storage_flush_handler::StorageFlushHandler; diff --git a/riffle-server/src/store/mem/buffer.rs b/riffle-server/src/store/mem/buffer.rs index 1d21fbda..4cae1de5 100644 --- a/riffle-server/src/store/mem/buffer.rs +++ b/riffle-server/src/store/mem/buffer.rs @@ -69,7 +69,7 @@ impl BufferSpillResult { } } -pub trait BufferOps { +pub trait MemoryBuffer { /// Creates a new buffer instance fn new(options: BufferOptions) -> Self where @@ -125,7 +125,7 @@ pub trait BufferOps { mod test { use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; - use crate::store::mem::buffer::BufferOps; + use crate::store::mem::buffer::MemoryBuffer; use crate::store::Block; use hashlink::LinkedHashMap; use std::collections::LinkedList; @@ -158,7 +158,8 @@ mod test { } } - fn run_test_with_block_id_zero() -> anyhow::Result<()> { + fn run_test_with_block_id_zero() -> anyhow::Result<()> + { let mut buffer = B::new(Default::default()); let block_1 = create_block(10, 100); let block_2 = create_block(10, 0); @@ -192,7 +193,7 @@ mod test { Ok(()) } - fn run_test_put_get() -> anyhow::Result<()> { + fn run_test_put_get() -> anyhow::Result<()> { let mut buffer = B::new(Default::default()); /// case1 @@ -300,7 +301,7 @@ mod test { Ok(()) } - fn run_test_get_v2_is_end_with_only_staging( + fn run_test_get_v2_is_end_with_only_staging( ) -> anyhow::Result<()> { let buffer = B::new(Default::default()); // 0 -> 10 blocks with total 100 bytes @@ -337,7 +338,7 @@ mod test { Ok(()) } - fn run_test_get_v2_is_end_across_flight_and_staging( + fn run_test_get_v2_is_end_across_flight_and_staging( ) -> anyhow::Result<()> { let buffer = B::new(Default::default()); diff --git a/riffle-server/src/store/mem/buffer/default_buffer.rs b/riffle-server/src/store/mem/buffer/default_buffer.rs index 06696ac0..db6b1509 100644 --- a/riffle-server/src/store/mem/buffer/default_buffer.rs +++ b/riffle-server/src/store/mem/buffer/default_buffer.rs @@ -1,4 +1,4 @@ -use super::{BufferOps, BufferOptions, BufferSpillResult, MemBlockBatch}; +use super::{BufferOptions, BufferSpillResult, MemBlockBatch, MemoryBuffer}; use crate::composed_bytes; use crate::composed_bytes::ComposedBytes; use crate::constant::INVALID_BLOCK_ID; @@ -43,7 +43,7 @@ impl Inner { } } -impl BufferOps for DefaultMemoryBuffer { +impl MemoryBuffer for DefaultMemoryBuffer { fn new(opt: BufferOptions) -> DefaultMemoryBuffer { DefaultMemoryBuffer { buffer: Mutex::new(Inner::new()), diff --git a/riffle-server/src/store/mem/buffer/opt_buffer.rs b/riffle-server/src/store/mem/buffer/opt_buffer.rs index 95e66680..00c3f314 100644 --- a/riffle-server/src/store/mem/buffer/opt_buffer.rs +++ b/riffle-server/src/store/mem/buffer/opt_buffer.rs @@ -1,7 +1,7 @@ use crate::composed_bytes::ComposedBytes; use crate::constant::INVALID_BLOCK_ID; use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; -use crate::store::mem::buffer::{BufferOps, BufferOptions, BufferSpillResult, MemBlockBatch}; +use crate::store::mem::buffer::{BufferOptions, BufferSpillResult, MemBlockBatch, MemoryBuffer}; use crate::store::{Block, DataBytes, DataSegment, PartitionedMemoryData}; use croaring::Treemap; use fastrace::trace; @@ -44,7 +44,7 @@ pub struct OptStagingMemoryBuffer { buffer: Mutex, } -impl BufferOps for OptStagingMemoryBuffer { +impl MemoryBuffer for OptStagingMemoryBuffer { #[trace] fn new(opt: BufferOptions) -> Self { OptStagingMemoryBuffer { diff --git a/riffle-server/src/store/mem/buffer/unified_buffer.rs b/riffle-server/src/store/mem/buffer/unified_buffer.rs index 0de64129..e6e82ddf 100644 --- a/riffle-server/src/store/mem/buffer/unified_buffer.rs +++ b/riffle-server/src/store/mem/buffer/unified_buffer.rs @@ -1,7 +1,7 @@ use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; use crate::store::mem::buffer::unified_buffer::UnifiedBuffer::{DEFAULT, EXPERIMENTAL}; -use crate::store::mem::buffer::{BufferOps, BufferOptions, BufferSpillResult, BufferType}; +use crate::store::mem::buffer::{BufferOptions, BufferSpillResult, BufferType, MemoryBuffer}; use crate::store::{Block, PartitionedMemoryData}; use croaring::Treemap; @@ -11,7 +11,7 @@ pub enum UnifiedBuffer { EXPERIMENTAL(OptStagingMemoryBuffer), } -impl BufferOps for UnifiedBuffer { +impl MemoryBuffer for UnifiedBuffer { fn new(opts: BufferOptions) -> Self where Self: Sized, diff --git a/riffle-server/src/store/memory.rs b/riffle-server/src/store/memory.rs index 44a37370..5fcf1284 100644 --- a/riffle-server/src/store/memory.rs +++ b/riffle-server/src/store/memory.rs @@ -39,7 +39,7 @@ use crate::runtime::manager::RuntimeManager; use crate::store::mem::budget::MemoryBudget; use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; -use crate::store::mem::buffer::{BufferOps, BufferOptions, BufferType}; +use crate::store::mem::buffer::{BufferOptions, BufferType, MemoryBuffer}; use crate::store::mem::capacity::CapacitySnapshot; use crate::store::mem::ticket::TicketManager; use crate::store::spill::SpillWritingViewContext; @@ -52,7 +52,7 @@ use fxhash::{FxBuildHasher, FxHasher}; use log::{debug, info, warn}; use std::sync::Arc; -pub struct MemoryStore { +pub struct MemoryStore { memory_capacity: i64, state: DDashMap>, budget: MemoryBudget, @@ -61,10 +61,10 @@ pub struct MemoryStore, } -unsafe impl Send for MemoryStore {} -unsafe impl Sync for MemoryStore {} +unsafe impl Send for MemoryStore {} +unsafe impl Sync for MemoryStore {} -impl MemoryStore { +impl MemoryStore { // only for test cases pub fn new(max_memory_size: i64) -> Self { let budget = MemoryBudget::new(max_memory_size); @@ -258,7 +258,7 @@ impl MemoryStore { } #[async_trait] -impl Store for MemoryStore { +impl Store for MemoryStore { fn start(self: Arc) { // ignore } @@ -433,11 +433,11 @@ mod test { use crate::app_manager::purge_event::PurgeReason; use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer; use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer; - use crate::store::mem::buffer::BufferOps; + use crate::store::mem::buffer::MemoryBuffer; use anyhow::Result; use croaring::Treemap; - fn run_test_read_buffer_in_flight() { + fn run_test_read_buffer_in_flight() { let store: MemoryStore = MemoryStore::new(1024); let runtime = store.runtime_manager.clone(); @@ -610,7 +610,7 @@ mod test { run_test_read_buffer_in_flight::(); } - async fn get_data_with_last_block_id( + async fn get_data_with_last_block_id( default_single_read_size: i64, last_block_id: i64, store: &MemoryStore, @@ -653,7 +653,7 @@ mod test { WritingViewContext::create_for_test(uid, data_blocks) } - fn run_test_allocated_and_purge_for_memory() { + fn run_test_allocated_and_purge_for_memory() { let store: MemoryStore = MemoryStore::new(1024 * 1024 * 1024); let runtime = store.runtime_manager.clone(); @@ -685,7 +685,7 @@ mod test { run_test_allocated_and_purge_for_memory::(); } - fn run_test_purge() -> Result<()> { + fn run_test_purge() -> Result<()> { let store: MemoryStore = MemoryStore::new(1024); let runtime = store.runtime_manager.clone(); @@ -768,7 +768,7 @@ mod test { run_test_purge::(); } - fn run_test_put_and_get_for_memory() { + fn run_test_put_and_get_for_memory() { let store: MemoryStore = MemoryStore::new(1024 * 1024 * 1024); let runtime = store.runtime_manager.clone(); @@ -817,7 +817,7 @@ mod test { run_test_put_and_get_for_memory::(); } - fn run_test_block_id_filter_for_memory() { + fn run_test_block_id_filter_for_memory() { let store: MemoryStore = MemoryStore::new(1024 * 1024 * 1024); let runtime = store.runtime_manager.clone(); diff --git a/riffle-server/src/store/spill/spill_test.rs b/riffle-server/src/store/spill/spill_test.rs index bb027c75..8b5509e4 100644 --- a/riffle-server/src/store/spill/spill_test.rs +++ b/riffle-server/src/store/spill/spill_test.rs @@ -15,7 +15,7 @@ pub mod tests { }; use crate::runtime::manager::RuntimeManager; use crate::store::hybrid::{HybridStore, PersistentStore}; - use crate::store::mem::buffer::BufferOps; + use crate::store::mem::buffer::MemoryBuffer; use crate::store::spill::spill_test::mock::MockStore; use crate::store::spill::storage_flush_handler::StorageFlushHandler; use crate::store::spill::storage_select_handler::StorageSelectHandler;