
Merge JournalManager, FlushManager and WriteBuffer into one FlushTracker #118


Open
wants to merge 14 commits into main
Reduce clones and collects
* Replaces HashSet/HashMap with Vec where the results are immediately iterated over or collected into another Vec. The Vecs still behave as sets because they are derived from HashMaps, whose keys are unique.
hummingly committed Jan 2, 2025
commit 5a9c2cef63ce41b8a32f14763ce7a11bc88e7814
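
A minimal sketch of the idea behind the commit, using the standard library HashMap rather than the crate's xxh3-backed alias (names here are illustrative, not the crate's API): keys drawn from a HashMap are unique by construction, so collecting them straight into a Vec already yields a set, and the intermediate HashSet buys nothing when the result is only iterated or re-collected.

```rust
use std::collections::HashMap;

fn main() {
    let mut queues: HashMap<String, Vec<u32>> = HashMap::new();
    queues.insert("logs".into(), vec![1, 2]);
    queues.insert("users".into(), Vec::new());

    // HashMap keys are unique, so this Vec is already a set;
    // no HashSet round-trip is needed before iterating it.
    let partitions_with_tasks: Vec<String> = queues
        .iter()
        .filter(|(_, queue)| !queue.is_empty())
        .map(|(name, _)| name.clone())
        .collect();

    assert_eq!(partitions_with_tasks, vec!["logs".to_string()]);
}
```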
2 changes: 1 addition & 1 deletion src/compaction/worker.rs
@@ -17,7 +17,7 @@ pub fn run(compaction_manager: &CompactionManager, snapshot_tracker: &SnapshotTr
item.0.name
);

let strategy = item.config.compaction_strategy.clone();
let strategy = &item.config.compaction_strategy;

// TODO: loop if there's more work to do

50 changes: 27 additions & 23 deletions src/flush/manager.rs
@@ -3,9 +3,7 @@
// (found in the LICENSE-* files in the repository)

use super::queue::FlushQueue;
use crate::{
batch::PartitionKey, write_buffer_manager::SpaceTracker, HashMap, HashSet, PartitionHandle,
};
use crate::{batch::PartitionKey, write_buffer_manager::SpaceTracker, HashMap, PartitionHandle};
use lsm_tree::{Memtable, SegmentId};
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};

@@ -67,7 +65,7 @@ impl FlushTaskQueues {

/// Gets the names of partitions that have queued tasks.
#[track_caller]
pub fn get_partitions_with_tasks(&self) -> HashSet<PartitionKey> {
pub fn get_partitions_with_tasks(&self) -> Vec<PartitionKey> {
self.queues_read_lock()
.iter()
.filter(|(_, v)| !v.is_empty())
@@ -132,9 +130,8 @@ impl FlushTaskQueues {

/// Returns a list of tasks per partition.
#[track_caller]
pub fn collect_tasks(&self, limit: usize) -> HashMap<PartitionKey, Vec<Arc<Task>>> {
let mut collected: HashMap<_, Vec<_>> = HashMap::default();
let mut cnt = 0;
pub fn collect_tasks(&self, limit: usize) -> Vec<Vec<Arc<Task>>> {
let mut collected = Vec::new();

// NOTE: Returning multiple tasks per partition is fine and will
// help with flushing very active partitions.
@@ -143,29 +140,36 @@ impl FlushTaskQueues {
// we will never cover up a lower seqno of some other segment.
// For this to work, all tasks need to be successful and atomically
// applied (all-or-nothing).
'outer: for (partition_name, queue) in self.queues_read_lock().iter() {
for item in queue.iter() {
if cnt == limit {
break 'outer;
for (partition_name, item) in self
.queues_read_lock()
.iter()
.flat_map(|(partition_name, queue)| {
queue.iter().map(move |item| (partition_name, item))
})
.take(limit)
{
match collected.last_mut() {
None => collected.push(vec![item.clone()]),
Some(items) => {
if items
.last()
.map_or(true, |item| &item.partition.name == partition_name)
{
items.push(item.clone());
} else {
collected.push(vec![item.clone()]);
}
}

collected
.entry(partition_name.clone())
.or_default()
.push(item.clone());

cnt += 1;
}
}

collected
}

#[track_caller]
pub fn dequeue(&self, partition_name: PartitionKey, cnt: usize) {
self.queues_write_lock()
.entry(partition_name)
.or_default()
.dequeue(cnt);
pub fn dequeue(&self, partition_name: &PartitionKey, cnt: usize) {
if let Some(queue) = self.queues_write_lock().get_mut(partition_name) {
queue.dequeue(cnt)
}
}
}
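
A hedged sketch of the new collect_tasks shape, with a simplified stand-in Task type (the names below are illustrative, not the crate's API): because the queues are walked one partition at a time, tasks that share a partition name arrive consecutively, each run of equal names becomes one inner Vec, and the flat Vec<Vec<Task>> therefore still carries "tasks per partition" without keying by PartitionKey.

```rust
// Simplified stand-in for Arc<Task>; `partition` mirrors task.partition.name.
#[derive(Clone, Debug)]
struct Task {
    partition: String,
    id: u32,
}

/// Group a partition-ordered task stream into one Vec per run of equal
/// partition names, taking at most `limit` tasks overall.
fn group_tasks(tasks: &[Task], limit: usize) -> Vec<Vec<Task>> {
    let mut collected: Vec<Vec<Task>> = Vec::new();

    for task in tasks.iter().take(limit) {
        match collected.last_mut() {
            None => collected.push(vec![task.clone()]),
            Some(group) => {
                // Same partition as the previous task: extend the current run,
                // otherwise start a new inner Vec.
                if group.last().map_or(true, |t| t.partition == task.partition) {
                    group.push(task.clone());
                } else {
                    collected.push(vec![task.clone()]);
                }
            }
        }
    }

    collected
}

fn main() {
    let tasks = vec![
        Task { partition: "a".into(), id: 1 },
        Task { partition: "a".into(), id: 2 },
        Task { partition: "b".into(), id: 3 },
    ];

    let grouped = group_tasks(&tasks, 3);
    assert_eq!(grouped.len(), 2); // one chunk per partition
    assert_eq!(grouped[0].len(), 2); // both "a" tasks grouped together
    assert_eq!(grouped[1][0].id, 3);
}
```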
45 changes: 22 additions & 23 deletions src/flush/worker.rs
@@ -4,8 +4,8 @@

use super::manager::Task;
use crate::{
batch::PartitionKey, compaction::manager::CompactionManager, flush_tracker::FlushTracker,
snapshot_tracker::SnapshotTracker, HashMap, PartitionHandle,
compaction::manager::CompactionManager, flush_tracker::FlushTracker,
snapshot_tracker::SnapshotTracker, PartitionHandle,
};
use lsm_tree::{AbstractTree, Segment, SeqNo};
use std::sync::Arc;
@@ -47,31 +47,29 @@ type MultiFlushResults = Vec<crate::Result<MultiFlushResultItem>>;
///
/// Each thread is responsible for the tasks of one partition.
fn run_multi_flush(
partitioned_tasks: &HashMap<PartitionKey, Vec<Arc<Task>>>,
partitioned_tasks: Vec<Vec<Arc<Task>>>,
eviction_threshold: SeqNo,
) -> MultiFlushResults {
log::debug!("spawning {} worker threads", partitioned_tasks.len());

// NOTE: Don't trust clippy
#[allow(clippy::needless_collect)]
let threads = partitioned_tasks
.iter()
.map(|(partition_name, tasks)| {
let partition_name = partition_name.clone();
let tasks = tasks.clone();

.into_iter()
.map(|tasks| {
std::thread::spawn(move || {
log::trace!(
"flushing {} memtables for partition {partition_name:?}",
tasks.len()
);

let partition = tasks
.first()
.expect("should always have at least one task")
.partition
.clone();

log::trace!(
"flushing {} memtables for partition {:?}",
tasks.len(),
partition.name
);

let memtables_size: u64 = tasks
.iter()
.map(|t| u64::from(t.sealed_memtable.size()))
@@ -86,14 +84,17 @@ fn run_multi_flush(
})
.collect::<Vec<_>>();

let created_segments = flush_workers
.into_iter()
.map(|t| t.join().expect("should join"))
.collect::<crate::Result<Vec<_>>>()?;
let mut created_segments = Vec::with_capacity(flush_workers.len());

for t in flush_workers {
if let Some(segment) = t.join().expect("should join")? {
created_segments.push(segment);
}
}

Ok(MultiFlushResultItem {
partition,
created_segments: created_segments.into_iter().flatten().collect(),
created_segments,
size: memtables_size,
})
})
@@ -117,14 +118,12 @@ pub fn run(
log::debug!("read locking flush manager");
let partitioned_tasks = flush_tracker.collect_tasks(parallelism);

let task_count = partitioned_tasks.iter().map(|x| x.1.len()).sum::<usize>();

if task_count == 0 {
if partitioned_tasks.is_empty() {
log::debug!("No tasks collected");
return;
}

for result in run_multi_flush(&partitioned_tasks, snapshot_tracker.get_seqno_safe_to_gc()) {
for result in run_multi_flush(partitioned_tasks, snapshot_tracker.get_seqno_safe_to_gc()) {
match result {
Ok(MultiFlushResultItem {
partition,
@@ -142,7 +141,7 @@ pub fn run(
partition.name,
created_segments.len()
);
flush_tracker.dequeue_tasks(partition.name.clone(), created_segments.len());
flush_tracker.dequeue_tasks(&partition.name, created_segments.len());

flush_tracker.shrink_buffer(memtables_size);
compaction_manager.notify(partition);
37 changes: 22 additions & 15 deletions src/flush_tracker.rs
@@ -8,7 +8,6 @@ use crate::{
writer::Writer,
},
keyspace::Partitions,
HashMap, HashSet,
};
use lsm_tree::{AbstractTree, SequenceNumberCounter};
use std::{
@@ -68,7 +67,7 @@ impl FlushTracker {
let raw_reader = JournalReader::new(&journal_path)?;
let reader = JournalBatchReader::new(raw_reader);

let mut watermarks: HashMap<PartitionKey, EvictionWatermark> = HashMap::default();
let mut watermarks: Vec<EvictionWatermark> = Vec::new();

for batch in reader {
let batch = batch?;
@@ -77,15 +76,23 @@ impl FlushTracker {
if let Some(handle) = partitions_lock.get(&item.partition) {
let tree = &handle.tree;

watermarks
.entry(item.partition)
.and_modify(|prev| {
match watermarks.binary_search_by(|watermark| {
watermark.partition.name.cmp(&item.partition)
}) {
Ok(index) => {
let prev = &mut watermarks[index];
prev.lsn = prev.lsn.max(batch.seqno);
})
.or_insert_with(|| EvictionWatermark {
partition: handle.clone(),
lsn: batch.seqno,
});
}
Err(index) => {
watermarks.insert(
index,
EvictionWatermark {
partition: handle.clone(),
lsn: batch.seqno,
},
);
}
};

match item.value_type {
lsm_tree::ValueType::Value => {
@@ -105,7 +112,7 @@ impl FlushTracker {
log::debug!("Sealing recovered memtables");
let mut recovered_count = 0;

for handle in watermarks.values() {
for handle in &watermarks {
let tree = &handle.partition.tree;

let partition_lsn = tree.get_highest_persisted_seqno();
@@ -168,7 +175,7 @@ impl FlushTracker {

// IMPORTANT: Add sealed journal to journal manager
self.item_queue.enqueue(Item {
watermarks: watermarks.into_values().collect(),
watermarks,
path: journal_path.clone(),
size_in_bytes: journal_size,
});
@@ -189,7 +196,7 @@ impl FlushTracker {

/// Gets the names of partitions that have queued tasks.
#[inline(always)]
pub fn get_partitions_with_tasks(&self) -> HashSet<PartitionKey> {
pub fn get_partitions_with_tasks(&self) -> Vec<PartitionKey> {
self.task_queues.get_partitions_with_tasks()
}

@@ -233,12 +240,12 @@ impl FlushTracker {

/// Returns a list of tasks per partition.
#[inline(always)]
pub fn collect_tasks(&self, limit: usize) -> HashMap<PartitionKey, Vec<Arc<Task>>> {
pub fn collect_tasks(&self, limit: usize) -> Vec<Vec<Arc<Task>>> {
self.task_queues.collect_tasks(limit)
}

#[inline(always)]
pub fn dequeue_tasks(&self, partition_name: PartitionKey, cnt: usize) {
pub fn dequeue_tasks(&self, partition_name: &PartitionKey, cnt: usize) {
self.task_queues.dequeue(partition_name, cnt);
}
}
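
A minimal sketch of the sorted-Vec upsert that replaces the watermark HashMap (simplified types; the names are illustrative, not the crate's API): binary_search_by either finds an existing entry to update or returns the insertion index that keeps the Vec ordered by partition name, giving per-partition deduplication without a HashMap.

```rust
#[derive(Debug)]
struct Watermark {
    partition: String,
    lsn: u64,
}

/// Update an existing watermark or insert a new one, keeping `watermarks`
/// sorted by partition name.
fn upsert(watermarks: &mut Vec<Watermark>, partition: &str, seqno: u64) {
    match watermarks.binary_search_by(|w| w.partition.as_str().cmp(partition)) {
        Ok(index) => {
            // Seen this partition before: keep the highest seqno.
            let prev = &mut watermarks[index];
            prev.lsn = prev.lsn.max(seqno);
        }
        Err(index) => {
            // New partition: insert at the position that preserves the order.
            watermarks.insert(
                index,
                Watermark {
                    partition: partition.to_owned(),
                    lsn: seqno,
                },
            );
        }
    }
}

fn main() {
    let mut watermarks = Vec::new();
    upsert(&mut watermarks, "users", 5);
    upsert(&mut watermarks, "logs", 3);
    upsert(&mut watermarks, "users", 9);

    assert_eq!(watermarks[0].partition, "logs"); // sorted by name
    assert_eq!(watermarks[1].lsn, 9); // max seqno kept for "users"
}
```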
18 changes: 5 additions & 13 deletions src/journal/manager.rs
@@ -101,27 +101,17 @@ impl JournalItemQueue {
pub fn maintenance(&self) -> crate::Result<()> {
let mut items = self.items_write_lock();

loop {
let Some(item) = items.first() else {
return Ok(());
};

while let Some(item) = items.first() {
// TODO: unit test: check deleted partition does not prevent journal eviction
for item in &item.watermarks {
// Only check partition seqno if not deleted
if !item
.partition
.is_deleted
.load(std::sync::atomic::Ordering::Acquire)
&& item.partition.tree.get_highest_persisted_seqno() < Some(item.lsn)
{
let Some(partition_seqno) = item.partition.tree.get_highest_persisted_seqno()
else {
return Ok(());
};

if partition_seqno < item.lsn {
return Ok(());
}
return Ok(());
}
}

@@ -141,6 +131,8 @@ impl JournalItemQueue {

items.remove(0);
}

Ok(())
}

#[track_caller]
1 change: 0 additions & 1 deletion src/lib.rs
@@ -114,7 +114,6 @@ mod version;
mod write_buffer_manager;

pub(crate) type HashMap<K, V> = std::collections::HashMap<K, V, xxhash_rust::xxh3::Xxh3Builder>;
pub(crate) type HashSet<K> = std::collections::HashSet<K, xxhash_rust::xxh3::Xxh3Builder>;

pub use {
batch::Batch,
18 changes: 7 additions & 11 deletions src/monitor.rs
@@ -48,10 +48,11 @@ impl Monitor {
"monitor: try flushing affected partitions because journals have passed 50% of threshold"
);

let partitions = self.partitions.read().expect("lock is poisoned");

// TODO: this may not scale well for many partitions
let lowest_persisted_partition = partitions
let lowest_persisted_partition = self
.partitions
.read()
.expect("lock is poisoned")
.values()
.filter(|x| x.tree.active_memtable_size() > 0)
.min_by(|a, b| {
@@ -61,8 +62,6 @@ impl Monitor {
})
.cloned();

drop(partitions);

if let Some(lowest_persisted_partition) = lowest_persisted_partition {
let partitions_names_with_queued_tasks = self.flush_tracker.get_partitions_with_tasks();

@@ -89,12 +88,15 @@ impl Monitor {
log::trace!(
"monitor: flush inactive partition because write buffer has passed 50% of threshold"
);
let mut partitions_with_tasks = self.flush_tracker.get_partitions_with_tasks();
partitions_with_tasks.sort();

let mut partitions = self
.partitions
.read()
.expect("lock is poisoned")
.values()
.filter(|x| partitions_with_tasks.binary_search(&x.name).is_err())
.cloned()
.collect::<Vec<_>>();

@@ -104,12 +106,6 @@ impl Monitor {
.cmp(&a.tree.active_memtable_size())
});

let partitions_names_with_queued_tasks = self.flush_tracker.get_partitions_with_tasks();

let partitions = partitions
.into_iter()
.filter(|x| !partitions_names_with_queued_tasks.contains(&x.name));

for partition in partitions {
log::debug!("monitor: WB rotating {:?}", partition.name);