Merge JournalManager, FlushManager and WriteBuffer into one FlushTracker #118

Open · wants to merge 14 commits into main
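For orientation, here is a rough sketch of the write-buffer half of the merged FlushTracker, inferred only from the call sites visible in the diff below (increment_buffer_size, buffer_size). The atomic counter and everything else about the struct are assumptions, not this PR's actual implementation; the real FlushTracker also absorbs the journal bookkeeping and the flush task queue shown further down.

use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical, simplified FlushTracker: only the write-buffer accounting
/// that the batch commit path below relies on is shown here.
pub struct FlushTracker {
    /// Total size of all active/sealed memtables, in bytes
    /// (the role previously played by the write buffer manager).
    buffer_size: AtomicU64,
}

impl FlushTracker {
    pub fn new() -> Self {
        Self {
            buffer_size: AtomicU64::new(0),
        }
    }

    /// Adds `n` bytes to the tracked write buffer size and returns the new total.
    pub fn increment_buffer_size(&self, n: u64) -> u64 {
        self.buffer_size.fetch_add(n, Ordering::AcqRel) + n
    }

    /// Returns the current write buffer size in bytes.
    pub fn buffer_size(&self) -> u64 {
        self.buffer_size.load(Ordering::Acquire)
    }
}

With such a facade, the batch commit in the first hunk becomes: add the batch size to the buffer, then use buffer_size() when checking each affected partition for a write stall/halt.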
6 changes: 4 additions & 2 deletions src/batch/mod.rs
@@ -135,7 +135,9 @@ impl Batch {

// IMPORTANT: Add batch size to current write buffer size
// Otherwise write buffer growth is unbounded when using batches
self.keyspace.write_buffer_manager.allocate(batch_size);
self.keyspace
.flush_tracker
.increment_buffer_size(batch_size);

// Check each affected partition for write stall/halt
for partition in partitions_with_possible_stall {
@@ -147,7 +149,7 @@

// IMPORTANT: Check write buffer as well
// Otherwise batch writes are never stalled/halted
let write_buffer_size = self.keyspace.write_buffer_manager.get();
let write_buffer_size = self.keyspace.flush_tracker.buffer_size();
partition.check_write_buffer_size(write_buffer_size);
}

39 changes: 12 additions & 27 deletions src/compaction/manager.rs
@@ -3,24 +3,29 @@
// (found in the LICENSE-* files in the repository)

use crate::PartitionHandle;
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
};
use std::{collections::VecDeque, sync::Mutex};
use std_semaphore::Semaphore;

pub struct CompactionManagerInner {
/// The compaction manager keeps track of which partitions
/// have recently been flushed in a FIFO queue.
///
/// Its semaphore notifies compaction threads which will wake
/// up and consume the queue items.
///
/// The semaphore is incremented by the flush worker and optionally
/// by the individual partitions in case of write halting.
pub struct CompactionManager {
partitions: Mutex<VecDeque<PartitionHandle>>,
semaphore: Semaphore,
}

impl Drop for CompactionManagerInner {
impl Drop for CompactionManager {
fn drop(&mut self) {
log::trace!("Dropping compaction manager");
}
}

impl Default for CompactionManagerInner {
impl Default for CompactionManager {
fn default() -> Self {
Self {
partitions: Mutex::new(VecDeque::with_capacity(10)),
@@ -29,26 +34,6 @@ impl Default for CompactionManagerInner {
}
}

/// The compaction manager keeps track of which partitions
/// have recently been flushed in a FIFO queue.
///
/// Its semaphore notifies compaction threads which will wake
/// up and consume the queue items.
///
/// The semaphore is incremented by the flush worker and optionally
/// by the individual partitions in case of write halting.
#[derive(Clone, Default)]
#[allow(clippy::module_name_repetitions)]
pub struct CompactionManager(Arc<CompactionManagerInner>);

impl std::ops::Deref for CompactionManager {
type Target = CompactionManagerInner;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl CompactionManager {
pub fn clear(&self) {
self.partitions.lock().expect("lock is poisoned").clear();
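The doc comment on CompactionManager above describes the intended flow: the flush worker (or a write-halted partition) pushes a partition onto the FIFO queue and releases the semaphore, and a compaction thread acquires the semaphore and pops the oldest entry. Below is a minimal sketch of that pattern; the notify/pop names, the String partition key and the constructor are placeholders for illustration, not the crate's actual API.

use std::{collections::VecDeque, sync::Mutex};
use std_semaphore::Semaphore;

/// Simplified stand-in for `CompactionManager` (illustration only).
struct Notifier {
    partitions: Mutex<VecDeque<String>>, // FIFO of recently flushed partitions
    semaphore: Semaphore,                // one permit per queued partition
}

impl Notifier {
    fn new() -> Self {
        Self {
            partitions: Mutex::new(VecDeque::new()),
            semaphore: Semaphore::new(0),
        }
    }

    /// Flush worker side: enqueue the partition and wake one compaction thread.
    fn notify(&self, partition: String) {
        self.partitions
            .lock()
            .expect("lock is poisoned")
            .push_back(partition);
        self.semaphore.release();
    }

    /// Compaction thread side: block until work is available,
    /// then take the oldest queued partition.
    fn pop(&self) -> Option<String> {
        self.semaphore.acquire();
        self.partitions
            .lock()
            .expect("lock is poisoned")
            .pop_front()
    }
}

A compaction worker would loop on pop() and compact whichever partition it receives.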
2 changes: 1 addition & 1 deletion src/compaction/worker.rs
@@ -22,7 +22,7 @@ pub fn run(
item.0.name
);

let strategy = item.config.compaction_strategy.clone();
let strategy = &item.config.compaction_strategy;

// TODO: loop if there's more work to do

140 changes: 84 additions & 56 deletions src/flush/manager.rs
@@ -3,19 +3,19 @@
// (found in the LICENSE-* files in the repository)

use super::queue::FlushQueue;
use crate::{batch::PartitionKey, HashMap, HashSet, PartitionHandle};
use crate::{batch::PartitionKey, space_tracker::SpaceTracker, HashMap, PartitionHandle};
use lsm_tree::{Memtable, SegmentId};
use std::sync::Arc;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};

pub struct Task {
/// ID of memtable
pub(crate) id: SegmentId,
pub id: SegmentId,

/// Memtable to flush
pub(crate) sealed_memtable: Arc<Memtable>,
pub sealed_memtable: Arc<Memtable>,

/// Partition
pub(crate) partition: PartitionHandle,
pub partition: PartitionHandle,
}

impl std::fmt::Debug for Task {
@@ -24,44 +24,48 @@ impl std::fmt::Debug for Task {
}
}

// TODO: accessing flush manager shouldn't take RwLock... but changing its internals should

/// The [`FlushManager`] stores a dictionary of queues, each queue
/// The [`FlushTaskQueue`] stores a dictionary of queues, each queue
/// containing some flush tasks.
///
/// Each flush task references a sealed memtable and the given partition.
#[allow(clippy::module_name_repetitions)]
#[derive(Debug)]
pub struct FlushManager {
queues: HashMap<PartitionKey, FlushQueue>,
pub struct FlushTaskQueue {
queues: RwLock<HashMap<PartitionKey, FlushQueue>>,
/// Keeps track of write buffer size
buffer_size: SpaceTracker,
}

impl Drop for FlushManager {
fn drop(&mut self) {
log::trace!("Dropping flush manager");
impl FlushTaskQueue {
pub fn new() -> Self {
Self {
queues: RwLock::default(),
buffer_size: SpaceTracker::new(),
}
}

#[cfg(feature = "__internal_whitebox")]
crate::drop::decrement_drop_counter();
#[track_caller]
fn queues_read_lock(&self) -> RwLockReadGuard<'_, HashMap<Arc<str>, FlushQueue>> {
self.queues.read().expect("lock is poisoned")
}
}

impl FlushManager {
pub(crate) fn new() -> Self {
#[cfg(feature = "__internal_whitebox")]
crate::drop::increment_drop_counter();
#[track_caller]
fn queues_write_lock(&self) -> RwLockWriteGuard<'_, HashMap<Arc<str>, FlushQueue>> {
self.queues.write().expect("lock is poisoned")
}

Self {
queues: HashMap::default(),
}
pub fn buffer_size(&self) -> &SpaceTracker {
&self.buffer_size
}

pub(crate) fn clear(&mut self) {
self.queues.clear();
#[track_caller]
pub fn clear(&self) {
self.queues_write_lock().clear();
}

/// Gets the names of partitions that have queued tasks.
pub(crate) fn get_partitions_with_tasks(&self) -> HashSet<PartitionKey> {
self.queues
#[track_caller]
pub fn get_partitions_with_tasks(&self) -> Vec<PartitionKey> {
self.queues_read_lock()
.iter()
.filter(|(_, v)| !v.is_empty())
.map(|(k, _)| k)
@@ -70,50 +74,63 @@ impl FlushManager {
}

/// Returns the number of queues.
pub(crate) fn queue_count(&self) -> usize {
self.queues.len()
#[track_caller]
pub fn queue_count(&self) -> usize {
self.queues_read_lock().len()
}

/// Returns the number of bytes queued.
pub(crate) fn queued_size(&self) -> u64 {
self.queues.values().map(FlushQueue::size).sum::<u64>()
#[track_caller]
pub fn queued_size(&self) -> u64 {
self.queues_read_lock()
.values()
.map(FlushQueue::size)
.sum::<u64>()
}

// NOTE: is actually used in tests
#[allow(dead_code)]
/// Returns the number of tasks that are queued to be flushed.
pub(crate) fn len(&self) -> usize {
self.queues.values().map(FlushQueue::len).sum::<usize>()
#[track_caller]
pub fn task_count(&self) -> usize {
self.queues_read_lock()
.values()
.map(FlushQueue::len)
.sum::<usize>()
}

// NOTE: is actually used in tests
#[allow(dead_code)]
#[must_use]
pub(crate) fn is_empty(&self) -> bool {
self.len() == 0
#[track_caller]
pub fn is_empty(&self) -> bool {
self.queues_read_lock().values().all(FlushQueue::is_empty)
}

pub(crate) fn remove_partition(&mut self, name: &str) {
self.queues.remove(name);
#[track_caller]
pub fn remove_partition(&self, name: &str) {
self.queues_write_lock().remove(name);
}

pub(crate) fn enqueue_task(&mut self, partition_name: PartitionKey, task: Task) {
#[track_caller]
pub fn enqueue(&self, task: Task) {
let partition_name = task.partition.name.clone();
log::debug!(
"Enqueuing {partition_name}:{} for flushing ({} B)",
task.id,
task.sealed_memtable.size()
);

self.queues
self.queues_write_lock()
.entry(partition_name)
.or_default()
.enqueue(Arc::new(task));
}

/// Returns a list of tasks per partition.
pub(crate) fn collect_tasks(&mut self, limit: usize) -> HashMap<PartitionKey, Vec<Arc<Task>>> {
let mut collected: HashMap<_, Vec<_>> = HashMap::default();
let mut cnt = 0;
#[track_caller]
pub fn collect_tasks(&self, limit: usize) -> Vec<Vec<Arc<Task>>> {
let mut collected = Vec::new();

// NOTE: Returning multiple tasks per partition is fine and will
// help with flushing very active partitions.
@@ -122,25 +139,36 @@ impl FlushManager {
// we will never cover up a lower seqno of some other segment.
// For this to work, all tasks need to be successful and atomically
// applied (all-or-nothing).
'outer: for (partition_name, queue) in &self.queues {
for item in queue.iter() {
if cnt == limit {
break 'outer;
for (partition_name, item) in self
.queues_read_lock()
.iter()
.flat_map(|(partition_name, queue)| {
queue.iter().map(move |item| (partition_name, item))
})
.take(limit)
{
match collected.last_mut() {
None => collected.push(vec![item.clone()]),
Some(items) => {
if items
.last()
.map_or(true, |item| &item.partition.name == partition_name)
{
items.push(item.clone());
} else {
collected.push(vec![item.clone()]);
}
}

collected
.entry(partition_name.clone())
.or_default()
.push(item.clone());

cnt += 1;
}
}

collected
}

pub(crate) fn dequeue_tasks(&mut self, partition_name: PartitionKey, cnt: usize) {
self.queues.entry(partition_name).or_default().dequeue(cnt);
#[track_caller]
pub fn dequeue(&self, partition_name: &PartitionKey, cnt: usize) {
if let Some(queue) = self.queues_write_lock().get_mut(partition_name) {
queue.dequeue(cnt)
}
}
}
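The NOTE inside collect_tasks above states the key invariant: tasks are drained in queue order per partition, so a segment with a higher seqno can never be registered before one with a lower seqno, and consecutive tasks for the same partition end up in one batch. The following standalone sketch reproduces just that grouping logic, with plain tuples standing in for real Tasks; the names and types here are illustrative only, not part of the crate.

fn collect(queues: &[(&'static str, Vec<u64>)], limit: usize) -> Vec<Vec<(&'static str, u64)>> {
    let mut collected: Vec<Vec<(&'static str, u64)>> = Vec::new();

    // Walk all queues in order, take at most `limit` tasks in total,
    // and group consecutive tasks of the same partition into one batch.
    for (partition, id) in queues
        .iter()
        .flat_map(|(partition, ids)| ids.iter().map(move |id| (*partition, *id)))
        .take(limit)
    {
        match collected.last_mut() {
            // First task overall: start the first batch.
            None => collected.push(vec![(partition, id)]),
            Some(batch) => {
                if batch.last().map_or(true, |(p, _)| *p == partition) {
                    // Same partition as the previous task: extend the batch.
                    batch.push((partition, id));
                } else {
                    // Different partition: start a new batch.
                    collected.push(vec![(partition, id)]);
                }
            }
        }
    }

    collected
}

fn main() {
    let queues = [("default", vec![1, 2]), ("logs", vec![7, 8])];
    // limit = 3: two tasks from "default", then one from "logs".
    assert_eq!(
        collect(&queues, 3),
        vec![vec![("default", 1), ("default", 2)], vec![("logs", 7)]]
    );
}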