Skip to content

Commit 187707a

Browse files
committed
rename to MemoryBuffer as a trait name
1 parent 81d9a89 commit 187707a

File tree

7 files changed

+28
-27
lines changed

7 files changed

+28
-27
lines changed

riffle-server/src/store/hybrid.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ use crate::runtime::manager::RuntimeManager;
6363
use crate::store::local::LocalfileStoreStat;
6464
use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer;
6565
use crate::store::mem::buffer::unified_buffer::UnifiedBuffer;
66-
use crate::store::mem::buffer::{BufferOps, BufferType};
66+
use crate::store::mem::buffer::{BufferType, MemoryBuffer};
6767
use crate::store::mem::capacity::CapacitySnapshot;
6868
use crate::store::spill::hierarchy_event_bus::HierarchyEventBus;
6969
use crate::store::spill::storage_flush_handler::StorageFlushHandler;

riffle-server/src/store/mem/buffer.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ impl BufferSpillResult {
6969
}
7070
}
7171

72-
pub trait BufferOps {
72+
pub trait MemoryBuffer {
7373
/// Creates a new buffer instance
7474
fn new(options: BufferOptions) -> Self
7575
where
@@ -125,7 +125,7 @@ pub trait BufferOps {
125125
mod test {
126126
use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer;
127127
use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer;
128-
use crate::store::mem::buffer::BufferOps;
128+
use crate::store::mem::buffer::MemoryBuffer;
129129
use crate::store::Block;
130130
use hashlink::LinkedHashMap;
131131
use std::collections::LinkedList;
@@ -158,7 +158,8 @@ mod test {
158158
}
159159
}
160160

161-
fn run_test_with_block_id_zero<B: BufferOps + Send + Sync + 'static>() -> anyhow::Result<()> {
161+
fn run_test_with_block_id_zero<B: MemoryBuffer + Send + Sync + 'static>() -> anyhow::Result<()>
162+
{
162163
let mut buffer = B::new(Default::default());
163164
let block_1 = create_block(10, 100);
164165
let block_2 = create_block(10, 0);
@@ -192,7 +193,7 @@ mod test {
192193
Ok(())
193194
}
194195

195-
fn run_test_put_get<B: BufferOps + Send + Sync + 'static>() -> anyhow::Result<()> {
196+
fn run_test_put_get<B: MemoryBuffer + Send + Sync + 'static>() -> anyhow::Result<()> {
196197
let mut buffer = B::new(Default::default());
197198

198199
/// case1
@@ -300,7 +301,7 @@ mod test {
300301
Ok(())
301302
}
302303

303-
fn run_test_get_v2_is_end_with_only_staging<B: BufferOps + Send + Sync + 'static>(
304+
fn run_test_get_v2_is_end_with_only_staging<B: MemoryBuffer + Send + Sync + 'static>(
304305
) -> anyhow::Result<()> {
305306
let buffer = B::new(Default::default());
306307
// 0 -> 10 blocks with total 100 bytes
@@ -337,7 +338,7 @@ mod test {
337338
Ok(())
338339
}
339340

340-
fn run_test_get_v2_is_end_across_flight_and_staging<B: BufferOps + Send + Sync + 'static>(
341+
fn run_test_get_v2_is_end_across_flight_and_staging<B: MemoryBuffer + Send + Sync + 'static>(
341342
) -> anyhow::Result<()> {
342343
let buffer = B::new(Default::default());
343344

riffle-server/src/store/mem/buffer/default_buffer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::{BufferOps, BufferOptions, BufferSpillResult, MemBlockBatch};
1+
use super::{BufferOptions, BufferSpillResult, MemBlockBatch, MemoryBuffer};
22
use crate::composed_bytes;
33
use crate::composed_bytes::ComposedBytes;
44
use crate::constant::INVALID_BLOCK_ID;
@@ -43,7 +43,7 @@ impl Inner {
4343
}
4444
}
4545

46-
impl BufferOps for DefaultMemoryBuffer {
46+
impl MemoryBuffer for DefaultMemoryBuffer {
4747
fn new(opt: BufferOptions) -> DefaultMemoryBuffer {
4848
DefaultMemoryBuffer {
4949
buffer: Mutex::new(Inner::new()),

riffle-server/src/store/mem/buffer/opt_buffer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::composed_bytes::ComposedBytes;
22
use crate::constant::INVALID_BLOCK_ID;
33
use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer;
4-
use crate::store::mem::buffer::{BufferOps, BufferOptions, BufferSpillResult, MemBlockBatch};
4+
use crate::store::mem::buffer::{BufferOptions, BufferSpillResult, MemBlockBatch, MemoryBuffer};
55
use crate::store::{Block, DataBytes, DataSegment, PartitionedMemoryData};
66
use croaring::Treemap;
77
use fastrace::trace;
@@ -44,7 +44,7 @@ pub struct OptStagingMemoryBuffer {
4444
buffer: Mutex<OptStagingBufferInternal>,
4545
}
4646

47-
impl BufferOps for OptStagingMemoryBuffer {
47+
impl MemoryBuffer for OptStagingMemoryBuffer {
4848
#[trace]
4949
fn new(opt: BufferOptions) -> Self {
5050
OptStagingMemoryBuffer {

riffle-server/src/store/mem/buffer/unified_buffer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer;
22
use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer;
33
use crate::store::mem::buffer::unified_buffer::UnifiedBuffer::{DEFAULT, EXPERIMENTAL};
4-
use crate::store::mem::buffer::{BufferOps, BufferOptions, BufferSpillResult, BufferType};
4+
use crate::store::mem::buffer::{BufferOptions, BufferSpillResult, BufferType, MemoryBuffer};
55
use crate::store::{Block, PartitionedMemoryData};
66
use croaring::Treemap;
77

@@ -11,7 +11,7 @@ pub enum UnifiedBuffer {
1111
EXPERIMENTAL(OptStagingMemoryBuffer),
1212
}
1313

14-
impl BufferOps for UnifiedBuffer {
14+
impl MemoryBuffer for UnifiedBuffer {
1515
fn new(opts: BufferOptions) -> Self
1616
where
1717
Self: Sized,

riffle-server/src/store/memory.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::runtime::manager::RuntimeManager;
3939
use crate::store::mem::budget::MemoryBudget;
4040
use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer;
4141
use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer;
42-
use crate::store::mem::buffer::{BufferOps, BufferOptions, BufferType};
42+
use crate::store::mem::buffer::{BufferOptions, BufferType, MemoryBuffer};
4343
use crate::store::mem::capacity::CapacitySnapshot;
4444
use crate::store::mem::ticket::TicketManager;
4545
use crate::store::spill::SpillWritingViewContext;
@@ -52,7 +52,7 @@ use fxhash::{FxBuildHasher, FxHasher};
5252
use log::{debug, info, warn};
5353
use std::sync::Arc;
5454

55-
pub struct MemoryStore<B: BufferOps + Send + Sync + 'static = DefaultMemoryBuffer> {
55+
pub struct MemoryStore<B: MemoryBuffer + Send + Sync + 'static = DefaultMemoryBuffer> {
5656
memory_capacity: i64,
5757
state: DDashMap<PartitionUId, Arc<B>>,
5858
budget: MemoryBudget,
@@ -61,10 +61,10 @@ pub struct MemoryStore<B: BufferOps + Send + Sync + 'static = DefaultMemoryBuffe
6161
cfg: Option<MemoryStoreConfig>,
6262
}
6363

64-
unsafe impl<B: BufferOps + Send + Sync> Send for MemoryStore<B> {}
65-
unsafe impl<B: BufferOps + Send + Sync> Sync for MemoryStore<B> {}
64+
unsafe impl<B: MemoryBuffer + Send + Sync> Send for MemoryStore<B> {}
65+
unsafe impl<B: MemoryBuffer + Send + Sync> Sync for MemoryStore<B> {}
6666

67-
impl<B: BufferOps + Send + Sync + 'static> MemoryStore<B> {
67+
impl<B: MemoryBuffer + Send + Sync + 'static> MemoryStore<B> {
6868
// only for test cases
6969
pub fn new(max_memory_size: i64) -> Self {
7070
let budget = MemoryBudget::new(max_memory_size);
@@ -258,7 +258,7 @@ impl<B: BufferOps + Send + Sync + 'static> MemoryStore<B> {
258258
}
259259

260260
#[async_trait]
261-
impl<B: BufferOps + Send + Sync + 'static> Store for MemoryStore<B> {
261+
impl<B: MemoryBuffer + Send + Sync + 'static> Store for MemoryStore<B> {
262262
fn start(self: Arc<Self>) {
263263
// ignore
264264
}
@@ -433,11 +433,11 @@ mod test {
433433
use crate::app_manager::purge_event::PurgeReason;
434434
use crate::store::mem::buffer::default_buffer::DefaultMemoryBuffer;
435435
use crate::store::mem::buffer::opt_buffer::OptStagingMemoryBuffer;
436-
use crate::store::mem::buffer::BufferOps;
436+
use crate::store::mem::buffer::MemoryBuffer;
437437
use anyhow::Result;
438438
use croaring::Treemap;
439439

440-
fn run_test_read_buffer_in_flight<B: BufferOps + Send + Sync + 'static>() {
440+
fn run_test_read_buffer_in_flight<B: MemoryBuffer + Send + Sync + 'static>() {
441441
let store: MemoryStore<B> = MemoryStore::new(1024);
442442
let runtime = store.runtime_manager.clone();
443443

@@ -610,7 +610,7 @@ mod test {
610610
run_test_read_buffer_in_flight::<OptStagingMemoryBuffer>();
611611
}
612612

613-
async fn get_data_with_last_block_id<B: BufferOps + Send + Sync + 'static>(
613+
async fn get_data_with_last_block_id<B: MemoryBuffer + Send + Sync + 'static>(
614614
default_single_read_size: i64,
615615
last_block_id: i64,
616616
store: &MemoryStore<B>,
@@ -653,7 +653,7 @@ mod test {
653653
WritingViewContext::create_for_test(uid, data_blocks)
654654
}
655655

656-
fn run_test_allocated_and_purge_for_memory<B: BufferOps + Send + Sync + 'static>() {
656+
fn run_test_allocated_and_purge_for_memory<B: MemoryBuffer + Send + Sync + 'static>() {
657657
let store: MemoryStore<B> = MemoryStore::new(1024 * 1024 * 1024);
658658
let runtime = store.runtime_manager.clone();
659659

@@ -685,7 +685,7 @@ mod test {
685685
run_test_allocated_and_purge_for_memory::<OptStagingMemoryBuffer>();
686686
}
687687

688-
fn run_test_purge<B: BufferOps + Send + Sync + 'static>() -> Result<()> {
688+
fn run_test_purge<B: MemoryBuffer + Send + Sync + 'static>() -> Result<()> {
689689
let store: MemoryStore<B> = MemoryStore::new(1024);
690690
let runtime = store.runtime_manager.clone();
691691

@@ -768,7 +768,7 @@ mod test {
768768
run_test_purge::<OptStagingMemoryBuffer>();
769769
}
770770

771-
fn run_test_put_and_get_for_memory<B: BufferOps + Send + Sync + 'static>() {
771+
fn run_test_put_and_get_for_memory<B: MemoryBuffer + Send + Sync + 'static>() {
772772
let store: MemoryStore<B> = MemoryStore::new(1024 * 1024 * 1024);
773773
let runtime = store.runtime_manager.clone();
774774

@@ -817,7 +817,7 @@ mod test {
817817
run_test_put_and_get_for_memory::<OptStagingMemoryBuffer>();
818818
}
819819

820-
fn run_test_block_id_filter_for_memory<B: BufferOps + Send + Sync + 'static>() {
820+
fn run_test_block_id_filter_for_memory<B: MemoryBuffer + Send + Sync + 'static>() {
821821
let store: MemoryStore<B> = MemoryStore::new(1024 * 1024 * 1024);
822822
let runtime = store.runtime_manager.clone();
823823

riffle-server/src/store/spill/spill_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub mod tests {
1515
};
1616
use crate::runtime::manager::RuntimeManager;
1717
use crate::store::hybrid::{HybridStore, PersistentStore};
18-
use crate::store::mem::buffer::BufferOps;
18+
use crate::store::mem::buffer::MemoryBuffer;
1919
use crate::store::spill::spill_test::mock::MockStore;
2020
use crate::store::spill::storage_flush_handler::StorageFlushHandler;
2121
use crate::store::spill::storage_select_handler::StorageSelectHandler;

0 commit comments

Comments
 (0)