Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions riffle-server/src/app_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ pub(crate) mod test {
capacity: "20B".to_string(),
buffer_ticket_timeout_sec: 1,
buffer_ticket_check_interval_sec: 1,
buffer_type: Default::default(),
}),
);
let _ = std::mem::replace(
Expand Down
10 changes: 10 additions & 0 deletions riffle-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::block_id_manager::BlockIdManagerType;
use crate::store::mem::buffer::BufferType;
use crate::store::ResponseDataIndex::Local;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
Expand All @@ -31,6 +32,13 @@ pub struct MemoryStoreConfig {
pub buffer_ticket_timeout_sec: i64,
#[serde(default = "as_default_buffer_ticket_timeout_check_interval_sec")]
pub buffer_ticket_check_interval_sec: i64,

#[serde(default = "as_default_buffer_type")]
pub buffer_type: BufferType,
}

fn as_default_buffer_type() -> BufferType {
BufferType::DEFAULT
}

fn as_default_buffer_ticket_timeout_check_interval_sec() -> i64 {
Expand All @@ -47,6 +55,7 @@ impl MemoryStoreConfig {
capacity,
buffer_ticket_timeout_sec: as_default_buffer_ticket_timeout_sec(),
buffer_ticket_check_interval_sec: as_default_buffer_ticket_timeout_check_interval_sec(),
buffer_type: BufferType::DEFAULT,
}
}

Expand All @@ -55,6 +64,7 @@ impl MemoryStoreConfig {
capacity,
buffer_ticket_timeout_sec,
buffer_ticket_check_interval_sec: as_default_buffer_ticket_timeout_check_interval_sec(),
buffer_type: BufferType::DEFAULT,
}
}
}
Expand Down
20 changes: 12 additions & 8 deletions riffle-server/src/store/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,15 @@ use crate::app_manager::AppManagerRef;
use crate::config_reconfigure::ReconfigurableConfManager;
use crate::runtime::manager::RuntimeManager;
use crate::store::local::LocalfileStoreStat;
use crate::store::mem::buffer::MemoryBuffer;
use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer;
use crate::store::mem::buffer::unified_buffer::UnifiedBuffer;
use crate::store::mem::buffer::{BufferType, MemoryBuffer};
use crate::store::mem::capacity::CapacitySnapshot;
use crate::store::spill::hierarchy_event_bus::HierarchyEventBus;
use crate::store::spill::storage_flush_handler::StorageFlushHandler;
use crate::store::spill::storage_select_handler::StorageSelectHandler;
use crate::store::spill::{SpillMessage, SpillWritingViewContext};
use crate::store::ResponseData::Mem;
use tokio::time::Instant;

pub trait PersistentStore: Store + Persistent + Send + Sync + Any {
Expand All @@ -89,7 +92,7 @@ const DEFAULT_MEMORY_SPILL_MAX_CONCURRENCY: i32 = 20;

pub struct HybridStore {
// Box<dyn Store> will build fail
pub(crate) hot_store: Arc<MemoryStore>,
pub(crate) hot_store: Arc<MemoryStore<UnifiedBuffer>>,

pub(crate) warm_store: Option<Box<dyn PersistentStore>>,
pub(crate) cold_store: Option<Box<dyn PersistentStore>>,
Expand Down Expand Up @@ -172,11 +175,12 @@ impl HybridStore {
}
let async_watermark_spill_enable = hybrid_conf.async_watermark_spill_trigger_enable;

// use the unified buffer to delegate the underlying concrete buffer
let mem_store: MemoryStore<UnifiedBuffer> =
MemoryStore::from(config.memory_store.unwrap(), runtime_manager.clone());

let store = HybridStore {
hot_store: Arc::new(MemoryStore::from(
config.memory_store.unwrap(),
runtime_manager.clone(),
)),
hot_store: Arc::new(mem_store),
warm_store: persistent_stores.pop_front(),
cold_store: persistent_stores.pop_front(),
config: hybrid_conf,
Expand Down Expand Up @@ -426,7 +430,7 @@ impl HybridStore {
Ok(Default::default())
}

pub async fn get_memory_buffer(&self, uid: &PartitionUId) -> Result<Arc<MemoryBuffer>> {
pub async fn get_memory_buffer(&self, uid: &PartitionUId) -> Result<Arc<UnifiedBuffer>> {
self.hot_store.get_buffer(uid)
}

Expand Down Expand Up @@ -469,7 +473,7 @@ impl HybridStore {
async fn buffer_spill_impl(
&self,
uid: &PartitionUId,
buffer: Arc<MemoryBuffer>,
buffer: Arc<UnifiedBuffer>,
) -> Result<u64> {
let spill_result = buffer.spill()?;
if spill_result.is_none() {
Expand Down
Loading
Loading