diff --git a/mini-lsm-mvcc/src/compact.rs b/mini-lsm-mvcc/src/compact.rs index b578d3628..c8d1f89a4 100644 --- a/mini-lsm-mvcc/src/compact.rs +++ b/mini-lsm-mvcc/src/compact.rs @@ -218,6 +218,61 @@ impl LsmStorageInner { Ok(new_sst) } + fn compact_two_levels( + &self, + snapshot: &LsmStorageState, + upper_level: &Option, + upper_level_sst_ids: &Vec, + _lower_level: &usize, + lower_level_sst_ids: &Vec, + compact_to_bottom_level: bool, + ) -> Result>> { + match upper_level { + Some(_) => { + let mut upper_sstables: Vec> = + Vec::with_capacity(upper_level_sst_ids.len()); + for sst_id in upper_level_sst_ids { + upper_sstables.push(snapshot.sstables[sst_id].clone()); + } + let mut lower_sstables: Vec> = + Vec::with_capacity(lower_level_sst_ids.len()); + for sst_id in lower_level_sst_ids { + lower_sstables.push(snapshot.sstables[sst_id].clone()); + } + + self.compact_generate_sst_from_iter( + TwoMergeIterator::create( + SstConcatIterator::create_and_seek_to_first(upper_sstables)?, + SstConcatIterator::create_and_seek_to_first(lower_sstables)?, + )?, + compact_to_bottom_level, + ) + } + None => { + let mut upper_iters: Vec> = + Vec::with_capacity(upper_level_sst_ids.len()); + for sst_id in upper_level_sst_ids { + upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first( + snapshot.sstables[sst_id].clone(), + )?)); + } + let mut lower_sstables: Vec> = + Vec::with_capacity(lower_level_sst_ids.len()); + for sst_id in lower_level_sst_ids { + lower_sstables.push(snapshot.sstables[sst_id].clone()); + } + + self.compact_generate_sst_from_iter( + TwoMergeIterator::create( + MergeIterator::create(upper_iters), + SstConcatIterator::create_and_seek_to_first(lower_sstables)?, + )?, + compact_to_bottom_level, + ) + } + } + } + fn compact(&self, task: &CompactionTask) -> Result>> { let snapshot = { let state = self.state.read(); @@ -227,23 +282,14 @@ impl LsmStorageInner { CompactionTask::ForceFullCompaction { l0_sstables, l1_sstables, - } => { - let mut l0_iters = Vec::with_capacity(l0_sstables.len()); - for id in l0_sstables.iter() { - l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_first( - snapshot.sstables.get(id).unwrap().clone(), - )?)); - } - let mut l1_iters = Vec::with_capacity(l1_sstables.len()); - for id in l1_sstables.iter() { - l1_iters.push(snapshot.sstables.get(id).unwrap().clone()); - } - let iter = TwoMergeIterator::create( - MergeIterator::create(l0_iters), - SstConcatIterator::create_and_seek_to_first(l1_iters)?, - )?; - self.compact_generate_sst_from_iter(iter, task.compact_to_bottom_level()) - } + } => self.compact_two_levels( + &snapshot, + &None, + l0_sstables, + &1, + l1_sstables, + task.compact_to_bottom_level(), + ), CompactionTask::Simple(SimpleLeveledCompactionTask { upper_level, upper_level_sst_ids, @@ -257,42 +303,14 @@ impl LsmStorageInner { lower_level: _, lower_level_sst_ids, .. - }) => match upper_level { - Some(_) => { - let mut upper_ssts = Vec::with_capacity(upper_level_sst_ids.len()); - for id in upper_level_sst_ids.iter() { - upper_ssts.push(snapshot.sstables.get(id).unwrap().clone()); - } - let upper_iter = SstConcatIterator::create_and_seek_to_first(upper_ssts)?; - let mut lower_ssts = Vec::with_capacity(lower_level_sst_ids.len()); - for id in lower_level_sst_ids.iter() { - lower_ssts.push(snapshot.sstables.get(id).unwrap().clone()); - } - let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?; - self.compact_generate_sst_from_iter( - TwoMergeIterator::create(upper_iter, lower_iter)?, - task.compact_to_bottom_level(), - ) - } - None => { - let mut upper_iters = Vec::with_capacity(upper_level_sst_ids.len()); - for id in upper_level_sst_ids.iter() { - upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first( - snapshot.sstables.get(id).unwrap().clone(), - )?)); - } - let upper_iter = MergeIterator::create(upper_iters); - let mut lower_ssts = Vec::with_capacity(lower_level_sst_ids.len()); - for id in lower_level_sst_ids.iter() { - lower_ssts.push(snapshot.sstables.get(id).unwrap().clone()); - } - let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?; - self.compact_generate_sst_from_iter( - TwoMergeIterator::create(upper_iter, lower_iter)?, - task.compact_to_bottom_level(), - ) - } - }, + }) => self.compact_two_levels( + &snapshot, + upper_level, + upper_level_sst_ids, + &1, + lower_level_sst_ids, + task.compact_to_bottom_level(), + ), CompactionTask::Tiered(TieredCompactionTask { tiers, .. }) => { let mut iters = Vec::with_capacity(tiers.len()); for (_, tier_sst_ids) in tiers { diff --git a/mini-lsm-mvcc/src/compact/leveled.rs b/mini-lsm-mvcc/src/compact/leveled.rs index f1948d11a..c2ff6403e 100644 --- a/mini-lsm-mvcc/src/compact/leveled.rs +++ b/mini-lsm-mvcc/src/compact/leveled.rs @@ -51,6 +51,14 @@ impl LeveledCompactionController { sst_ids: &[usize], in_level: usize, ) -> Vec { + let mut sstables = Vec::with_capacity(snapshot.levels[in_level - 1].1.len()); + for sst_id in &snapshot.levels[in_level - 1].1 { + sstables.push(snapshot.sstables[sst_id].clone()); + } + if sstables.is_empty() { + return Vec::new(); + } + let begin_key = sst_ids .iter() .map(|id| snapshot.sstables[id].first_key()) @@ -63,15 +71,22 @@ impl LeveledCompactionController { .max() .cloned() .unwrap(); + let mut overlap_ssts = Vec::new(); - for sst_id in &snapshot.levels[in_level - 1].1 { - let sst = &snapshot.sstables[sst_id]; - let first_key = sst.first_key(); - let last_key = sst.last_key(); - if !(last_key < &begin_key || first_key > &end_key) { - overlap_ssts.push(*sst_id); - } - } + let lower = sstables.partition_point(|table| *table.last_key() < begin_key); + let upper = sstables + .partition_point(|table| *table.first_key() <= end_key) + .saturating_sub(1); + println!( + "[leveled compaction] level {} find overlapping ssts, lower: {}, upper: {}", + in_level, lower, upper + ); + overlap_ssts.extend( + sstables[lower..=upper] + .iter() + .map(|table| table.sst_id()) + .collect::>(), + ); overlap_ssts } diff --git a/mini-lsm/src/compact.rs b/mini-lsm/src/compact.rs index e531e1196..41915389e 100644 --- a/mini-lsm/src/compact.rs +++ b/mini-lsm/src/compact.rs @@ -172,6 +172,61 @@ impl LsmStorageInner { Ok(new_sst) } + fn compact_two_levels( + &self, + snapshot: &LsmStorageState, + upper_level: &Option, + upper_level_sst_ids: &Vec, + _lower_level: &usize, + lower_level_sst_ids: &Vec, + compact_to_bottom_level: bool, + ) -> Result>> { + match upper_level { + Some(_) => { + let mut upper_sstables: Vec> = + Vec::with_capacity(upper_level_sst_ids.len()); + for sst_id in upper_level_sst_ids { + upper_sstables.push(snapshot.sstables[sst_id].clone()); + } + let mut lower_sstables: Vec> = + Vec::with_capacity(lower_level_sst_ids.len()); + for sst_id in lower_level_sst_ids { + lower_sstables.push(snapshot.sstables[sst_id].clone()); + } + + self.compact_generate_sst_from_iter( + TwoMergeIterator::create( + SstConcatIterator::create_and_seek_to_first(upper_sstables)?, + SstConcatIterator::create_and_seek_to_first(lower_sstables)?, + )?, + compact_to_bottom_level, + ) + } + None => { + let mut upper_iters: Vec> = + Vec::with_capacity(upper_level_sst_ids.len()); + for sst_id in upper_level_sst_ids { + upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first( + snapshot.sstables[sst_id].clone(), + )?)); + } + let mut lower_sstables: Vec> = + Vec::with_capacity(lower_level_sst_ids.len()); + for sst_id in lower_level_sst_ids { + lower_sstables.push(snapshot.sstables[sst_id].clone()); + } + + self.compact_generate_sst_from_iter( + TwoMergeIterator::create( + MergeIterator::create(upper_iters), + SstConcatIterator::create_and_seek_to_first(lower_sstables)?, + )?, + compact_to_bottom_level, + ) + } + } + } + fn compact(&self, task: &CompactionTask) -> Result>> { let snapshot = { let state = self.state.read(); @@ -181,23 +236,14 @@ impl LsmStorageInner { CompactionTask::ForceFullCompaction { l0_sstables, l1_sstables, - } => { - let mut l0_iters = Vec::with_capacity(l0_sstables.len()); - for id in l0_sstables.iter() { - l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_first( - snapshot.sstables.get(id).unwrap().clone(), - )?)); - } - let mut l1_iters = Vec::with_capacity(l1_sstables.len()); - for id in l1_sstables.iter() { - l1_iters.push(snapshot.sstables.get(id).unwrap().clone()); - } - let iter = TwoMergeIterator::create( - MergeIterator::create(l0_iters), - SstConcatIterator::create_and_seek_to_first(l1_iters)?, - )?; - self.compact_generate_sst_from_iter(iter, task.compact_to_bottom_level()) - } + } => self.compact_two_levels( + &snapshot, + &None, + l0_sstables, + &1, + l1_sstables, + task.compact_to_bottom_level(), + ), CompactionTask::Simple(SimpleLeveledCompactionTask { upper_level, upper_level_sst_ids, @@ -211,42 +257,14 @@ impl LsmStorageInner { lower_level: _, lower_level_sst_ids, .. - }) => match upper_level { - Some(_) => { - let mut upper_ssts = Vec::with_capacity(upper_level_sst_ids.len()); - for id in upper_level_sst_ids.iter() { - upper_ssts.push(snapshot.sstables.get(id).unwrap().clone()); - } - let upper_iter = SstConcatIterator::create_and_seek_to_first(upper_ssts)?; - let mut lower_ssts = Vec::with_capacity(lower_level_sst_ids.len()); - for id in lower_level_sst_ids.iter() { - lower_ssts.push(snapshot.sstables.get(id).unwrap().clone()); - } - let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?; - self.compact_generate_sst_from_iter( - TwoMergeIterator::create(upper_iter, lower_iter)?, - task.compact_to_bottom_level(), - ) - } - None => { - let mut upper_iters = Vec::with_capacity(upper_level_sst_ids.len()); - for id in upper_level_sst_ids.iter() { - upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first( - snapshot.sstables.get(id).unwrap().clone(), - )?)); - } - let upper_iter = MergeIterator::create(upper_iters); - let mut lower_ssts = Vec::with_capacity(lower_level_sst_ids.len()); - for id in lower_level_sst_ids.iter() { - lower_ssts.push(snapshot.sstables.get(id).unwrap().clone()); - } - let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?; - self.compact_generate_sst_from_iter( - TwoMergeIterator::create(upper_iter, lower_iter)?, - task.compact_to_bottom_level(), - ) - } - }, + }) => self.compact_two_levels( + &snapshot, + upper_level, + upper_level_sst_ids, + &1, + lower_level_sst_ids, + task.compact_to_bottom_level(), + ), CompactionTask::Tiered(TieredCompactionTask { tiers, .. }) => { let mut iters = Vec::with_capacity(tiers.len()); for (_, tier_sst_ids) in tiers { diff --git a/mini-lsm/src/compact/leveled.rs b/mini-lsm/src/compact/leveled.rs index 71925a57d..c98da59de 100644 --- a/mini-lsm/src/compact/leveled.rs +++ b/mini-lsm/src/compact/leveled.rs @@ -51,6 +51,14 @@ impl LeveledCompactionController { sst_ids: &[usize], in_level: usize, ) -> Vec { + let mut sstables = Vec::with_capacity(snapshot.levels[in_level - 1].1.len()); + for sst_id in &snapshot.levels[in_level - 1].1 { + sstables.push(snapshot.sstables[sst_id].clone()); + } + if sstables.is_empty() { + return Vec::new(); + } + let begin_key = sst_ids .iter() .map(|id| snapshot.sstables[id].first_key()) @@ -63,15 +71,22 @@ impl LeveledCompactionController { .max() .cloned() .unwrap(); + let mut overlap_ssts = Vec::new(); - for sst_id in &snapshot.levels[in_level - 1].1 { - let sst = &snapshot.sstables[sst_id]; - let first_key = sst.first_key(); - let last_key = sst.last_key(); - if !(last_key < &begin_key || first_key > &end_key) { - overlap_ssts.push(*sst_id); - } - } + let lower = sstables.partition_point(|table| *table.last_key() < begin_key); + let upper = sstables + .partition_point(|table| *table.first_key() <= end_key) + .saturating_sub(1); + println!( + "[leveled compaction] level {} find overlapping ssts, lower: {}, upper: {}", + in_level, lower, upper + ); + overlap_ssts.extend( + sstables[lower..=upper] + .iter() + .map(|table| table.sst_id()) + .collect::>(), + ); overlap_ssts }