Skip to content

Commit 27576c9

Browse files
thuongle2210zuston
andauthored
feat(memory): Introduce a pluggable mem-buffer mechanism and an optimized staging buffer impl (#564)
### Description Resolves [issues-546](#546) . This PR introduces a pluggable mem-buffer mechanism, and based on this, this also introduces a optimized staging lookup implementaion of mem-buffer. ### Changelogs of `optimized staging lookup impl` 1. Converts staging buffer from previous structure to `Vec<Block>` for contiguous memory access 2. Adds `batch_boundaries` vector to track batch start positions 3. Introduces `block_position_index` for O(1) block ID to vector index mapping ### Performance Impact - Improves search performance in staging buffers from O(number of staging blocks) to O(1 + number of selected staging blocks) by eliminating linear scans during get operations - Easily extendable with new memory buffer implementations for benchmarking, testing, etc., and switch to the most efficient approach. --------- Co-authored-by: thuong <ledacthuong2210> Co-authored-by: Junfan Zhang <[email protected]>
1 parent ff67a5c commit 27576c9

File tree

11 files changed

+864
-269
lines changed

11 files changed

+864
-269
lines changed

riffle-server/src/app_manager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,7 @@ pub(crate) mod test {
562562
capacity: "20B".to_string(),
563563
buffer_ticket_timeout_sec: 1,
564564
buffer_ticket_check_interval_sec: 1,
565+
buffer_type: Default::default(),
565566
}),
566567
);
567568
let _ = std::mem::replace(

riffle-server/src/config.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use crate::block_id_manager::BlockIdManagerType;
19+
use crate::store::mem::buffer::BufferType;
1920
use crate::store::ResponseDataIndex::Local;
2021
use serde::{Deserialize, Serialize};
2122
use std::collections::HashMap;
@@ -31,6 +32,13 @@ pub struct MemoryStoreConfig {
3132
pub buffer_ticket_timeout_sec: i64,
3233
#[serde(default = "as_default_buffer_ticket_timeout_check_interval_sec")]
3334
pub buffer_ticket_check_interval_sec: i64,
35+
36+
#[serde(default = "as_default_buffer_type")]
37+
pub buffer_type: BufferType,
38+
}
39+
40+
fn as_default_buffer_type() -> BufferType {
41+
BufferType::DEFAULT
3442
}
3543

3644
fn as_default_buffer_ticket_timeout_check_interval_sec() -> i64 {
@@ -47,6 +55,7 @@ impl MemoryStoreConfig {
4755
capacity,
4856
buffer_ticket_timeout_sec: as_default_buffer_ticket_timeout_sec(),
4957
buffer_ticket_check_interval_sec: as_default_buffer_ticket_timeout_check_interval_sec(),
58+
buffer_type: BufferType::DEFAULT,
5059
}
5160
}
5261

@@ -55,6 +64,7 @@ impl MemoryStoreConfig {
5564
capacity,
5665
buffer_ticket_timeout_sec,
5766
buffer_ticket_check_interval_sec: as_default_buffer_ticket_timeout_check_interval_sec(),
67+
buffer_type: BufferType::DEFAULT,
5868
}
5969
}
6070
}

riffle-server/src/store/hybrid.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,15 @@ use crate::app_manager::AppManagerRef;
6161
use crate::config_reconfigure::ReconfigurableConfManager;
6262
use crate::runtime::manager::RuntimeManager;
6363
use crate::store::local::LocalfileStoreStat;
64-
use crate::store::mem::buffer::MemoryBuffer;
64+
use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer;
65+
use crate::store::mem::buffer::unified_buffer::UnifiedBuffer;
66+
use crate::store::mem::buffer::{BufferType, MemoryBuffer};
6567
use crate::store::mem::capacity::CapacitySnapshot;
6668
use crate::store::spill::hierarchy_event_bus::HierarchyEventBus;
6769
use crate::store::spill::storage_flush_handler::StorageFlushHandler;
6870
use crate::store::spill::storage_select_handler::StorageSelectHandler;
6971
use crate::store::spill::{SpillMessage, SpillWritingViewContext};
72+
use crate::store::ResponseData::Mem;
7073
use tokio::time::Instant;
7174

7275
pub trait PersistentStore: Store + Persistent + Send + Sync + Any {
@@ -89,7 +92,7 @@ const DEFAULT_MEMORY_SPILL_MAX_CONCURRENCY: i32 = 20;
8992

9093
pub struct HybridStore {
9194
// Box<dyn Store> will build fail
92-
pub(crate) hot_store: Arc<MemoryStore>,
95+
pub(crate) hot_store: Arc<MemoryStore<UnifiedBuffer>>,
9396

9497
pub(crate) warm_store: Option<Box<dyn PersistentStore>>,
9598
pub(crate) cold_store: Option<Box<dyn PersistentStore>>,
@@ -172,11 +175,12 @@ impl HybridStore {
172175
}
173176
let async_watermark_spill_enable = hybrid_conf.async_watermark_spill_trigger_enable;
174177

178+
// use the unified buffer to delegate the underlying concrete buffer
179+
let mem_store: MemoryStore<UnifiedBuffer> =
180+
MemoryStore::from(config.memory_store.unwrap(), runtime_manager.clone());
181+
175182
let store = HybridStore {
176-
hot_store: Arc::new(MemoryStore::from(
177-
config.memory_store.unwrap(),
178-
runtime_manager.clone(),
179-
)),
183+
hot_store: Arc::new(mem_store),
180184
warm_store: persistent_stores.pop_front(),
181185
cold_store: persistent_stores.pop_front(),
182186
config: hybrid_conf,
@@ -426,7 +430,7 @@ impl HybridStore {
426430
Ok(Default::default())
427431
}
428432

429-
pub async fn get_memory_buffer(&self, uid: &PartitionUId) -> Result<Arc<MemoryBuffer>> {
433+
pub async fn get_memory_buffer(&self, uid: &PartitionUId) -> Result<Arc<UnifiedBuffer>> {
430434
self.hot_store.get_buffer(uid)
431435
}
432436

@@ -469,7 +473,7 @@ impl HybridStore {
469473
async fn buffer_spill_impl(
470474
&self,
471475
uid: &PartitionUId,
472-
buffer: Arc<MemoryBuffer>,
476+
buffer: Arc<UnifiedBuffer>,
473477
) -> Result<u64> {
474478
let spill_result = buffer.spill()?;
475479
if spill_result.is_none() {

0 commit comments

Comments
 (0)