Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 71 additions & 53 deletions mini-lsm-mvcc/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,61 @@ impl LsmStorageInner {
Ok(new_sst)
}

fn compact_two_levels(
&self,
snapshot: &LsmStorageState,
upper_level: &Option<usize>,
upper_level_sst_ids: &Vec<usize>,
_lower_level: &usize,
lower_level_sst_ids: &Vec<usize>,
compact_to_bottom_level: bool,
) -> Result<Vec<Arc<SsTable>>> {
match upper_level {
Some(_) => {
let mut upper_sstables: Vec<Arc<SsTable>> =
Vec::with_capacity(upper_level_sst_ids.len());
for sst_id in upper_level_sst_ids {
upper_sstables.push(snapshot.sstables[sst_id].clone());
}
let mut lower_sstables: Vec<Arc<SsTable>> =
Vec::with_capacity(lower_level_sst_ids.len());
for sst_id in lower_level_sst_ids {
lower_sstables.push(snapshot.sstables[sst_id].clone());
}

self.compact_generate_sst_from_iter(
TwoMergeIterator::create(
SstConcatIterator::create_and_seek_to_first(upper_sstables)?,
SstConcatIterator::create_and_seek_to_first(lower_sstables)?,
)?,
compact_to_bottom_level,
)
}
None => {
let mut upper_iters: Vec<Box<SsTableIterator>> =
Vec::with_capacity(upper_level_sst_ids.len());
for sst_id in upper_level_sst_ids {
upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
snapshot.sstables[sst_id].clone(),
)?));
}
let mut lower_sstables: Vec<Arc<SsTable>> =
Vec::with_capacity(lower_level_sst_ids.len());
for sst_id in lower_level_sst_ids {
lower_sstables.push(snapshot.sstables[sst_id].clone());
}

self.compact_generate_sst_from_iter(
TwoMergeIterator::create(
MergeIterator::create(upper_iters),
SstConcatIterator::create_and_seek_to_first(lower_sstables)?,
)?,
compact_to_bottom_level,
)
}
}
}

fn compact(&self, task: &CompactionTask) -> Result<Vec<Arc<SsTable>>> {
let snapshot = {
let state = self.state.read();
Expand All @@ -227,23 +282,14 @@ impl LsmStorageInner {
CompactionTask::ForceFullCompaction {
l0_sstables,
l1_sstables,
} => {
let mut l0_iters = Vec::with_capacity(l0_sstables.len());
for id in l0_sstables.iter() {
l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
snapshot.sstables.get(id).unwrap().clone(),
)?));
}
let mut l1_iters = Vec::with_capacity(l1_sstables.len());
for id in l1_sstables.iter() {
l1_iters.push(snapshot.sstables.get(id).unwrap().clone());
}
let iter = TwoMergeIterator::create(
MergeIterator::create(l0_iters),
SstConcatIterator::create_and_seek_to_first(l1_iters)?,
)?;
self.compact_generate_sst_from_iter(iter, task.compact_to_bottom_level())
}
} => self.compact_two_levels(
&snapshot,
&None,
l0_sstables,
&1,
l1_sstables,
task.compact_to_bottom_level(),
),
CompactionTask::Simple(SimpleLeveledCompactionTask {
upper_level,
upper_level_sst_ids,
Expand All @@ -257,42 +303,14 @@ impl LsmStorageInner {
lower_level: _,
lower_level_sst_ids,
..
}) => match upper_level {
Some(_) => {
let mut upper_ssts = Vec::with_capacity(upper_level_sst_ids.len());
for id in upper_level_sst_ids.iter() {
upper_ssts.push(snapshot.sstables.get(id).unwrap().clone());
}
let upper_iter = SstConcatIterator::create_and_seek_to_first(upper_ssts)?;
let mut lower_ssts = Vec::with_capacity(lower_level_sst_ids.len());
for id in lower_level_sst_ids.iter() {
lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
}
let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
self.compact_generate_sst_from_iter(
TwoMergeIterator::create(upper_iter, lower_iter)?,
task.compact_to_bottom_level(),
)
}
None => {
let mut upper_iters = Vec::with_capacity(upper_level_sst_ids.len());
for id in upper_level_sst_ids.iter() {
upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
snapshot.sstables.get(id).unwrap().clone(),
)?));
}
let upper_iter = MergeIterator::create(upper_iters);
let mut lower_ssts = Vec::with_capacity(lower_level_sst_ids.len());
for id in lower_level_sst_ids.iter() {
lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
}
let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
self.compact_generate_sst_from_iter(
TwoMergeIterator::create(upper_iter, lower_iter)?,
task.compact_to_bottom_level(),
)
}
},
}) => self.compact_two_levels(
&snapshot,
upper_level,
upper_level_sst_ids,
&1,
lower_level_sst_ids,
task.compact_to_bottom_level(),
),
CompactionTask::Tiered(TieredCompactionTask { tiers, .. }) => {
let mut iters = Vec::with_capacity(tiers.len());
for (_, tier_sst_ids) in tiers {
Expand Down
31 changes: 23 additions & 8 deletions mini-lsm-mvcc/src/compact/leveled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ impl LeveledCompactionController {
sst_ids: &[usize],
in_level: usize,
) -> Vec<usize> {
let mut sstables = Vec::with_capacity(snapshot.levels[in_level - 1].1.len());
for sst_id in &snapshot.levels[in_level - 1].1 {
sstables.push(snapshot.sstables[sst_id].clone());
}
if sstables.is_empty() {
return Vec::new();
}

let begin_key = sst_ids
.iter()
.map(|id| snapshot.sstables[id].first_key())
Expand All @@ -63,15 +71,22 @@ impl LeveledCompactionController {
.max()
.cloned()
.unwrap();

let mut overlap_ssts = Vec::new();
for sst_id in &snapshot.levels[in_level - 1].1 {
let sst = &snapshot.sstables[sst_id];
let first_key = sst.first_key();
let last_key = sst.last_key();
if !(last_key < &begin_key || first_key > &end_key) {
overlap_ssts.push(*sst_id);
}
}
let lower = sstables.partition_point(|table| *table.last_key() < begin_key);
let upper = sstables
.partition_point(|table| *table.first_key() <= end_key)
.saturating_sub(1);
println!(
"[leveled compaction] level {} find overlapping ssts, lower: {}, upper: {}",
in_level, lower, upper
);
overlap_ssts.extend(
sstables[lower..=upper]
.iter()
.map(|table| table.sst_id())
.collect::<Vec<usize>>(),
);
overlap_ssts
}

Expand Down
124 changes: 71 additions & 53 deletions mini-lsm/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,61 @@ impl LsmStorageInner {
Ok(new_sst)
}

fn compact_two_levels(
&self,
snapshot: &LsmStorageState,
upper_level: &Option<usize>,
upper_level_sst_ids: &Vec<usize>,
_lower_level: &usize,
lower_level_sst_ids: &Vec<usize>,
compact_to_bottom_level: bool,
) -> Result<Vec<Arc<SsTable>>> {
match upper_level {
Some(_) => {
let mut upper_sstables: Vec<Arc<SsTable>> =
Vec::with_capacity(upper_level_sst_ids.len());
for sst_id in upper_level_sst_ids {
upper_sstables.push(snapshot.sstables[sst_id].clone());
}
let mut lower_sstables: Vec<Arc<SsTable>> =
Vec::with_capacity(lower_level_sst_ids.len());
for sst_id in lower_level_sst_ids {
lower_sstables.push(snapshot.sstables[sst_id].clone());
}

self.compact_generate_sst_from_iter(
TwoMergeIterator::create(
SstConcatIterator::create_and_seek_to_first(upper_sstables)?,
SstConcatIterator::create_and_seek_to_first(lower_sstables)?,
)?,
compact_to_bottom_level,
)
}
None => {
let mut upper_iters: Vec<Box<SsTableIterator>> =
Vec::with_capacity(upper_level_sst_ids.len());
for sst_id in upper_level_sst_ids {
upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
snapshot.sstables[sst_id].clone(),
)?));
}
let mut lower_sstables: Vec<Arc<SsTable>> =
Vec::with_capacity(lower_level_sst_ids.len());
for sst_id in lower_level_sst_ids {
lower_sstables.push(snapshot.sstables[sst_id].clone());
}

self.compact_generate_sst_from_iter(
TwoMergeIterator::create(
MergeIterator::create(upper_iters),
SstConcatIterator::create_and_seek_to_first(lower_sstables)?,
)?,
compact_to_bottom_level,
)
}
}
}

fn compact(&self, task: &CompactionTask) -> Result<Vec<Arc<SsTable>>> {
let snapshot = {
let state = self.state.read();
Expand All @@ -181,23 +236,14 @@ impl LsmStorageInner {
CompactionTask::ForceFullCompaction {
l0_sstables,
l1_sstables,
} => {
let mut l0_iters = Vec::with_capacity(l0_sstables.len());
for id in l0_sstables.iter() {
l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
snapshot.sstables.get(id).unwrap().clone(),
)?));
}
let mut l1_iters = Vec::with_capacity(l1_sstables.len());
for id in l1_sstables.iter() {
l1_iters.push(snapshot.sstables.get(id).unwrap().clone());
}
let iter = TwoMergeIterator::create(
MergeIterator::create(l0_iters),
SstConcatIterator::create_and_seek_to_first(l1_iters)?,
)?;
self.compact_generate_sst_from_iter(iter, task.compact_to_bottom_level())
}
} => self.compact_two_levels(
&snapshot,
&None,
l0_sstables,
&1,
l1_sstables,
task.compact_to_bottom_level(),
),
CompactionTask::Simple(SimpleLeveledCompactionTask {
upper_level,
upper_level_sst_ids,
Expand All @@ -211,42 +257,14 @@ impl LsmStorageInner {
lower_level: _,
lower_level_sst_ids,
..
}) => match upper_level {
Some(_) => {
let mut upper_ssts = Vec::with_capacity(upper_level_sst_ids.len());
for id in upper_level_sst_ids.iter() {
upper_ssts.push(snapshot.sstables.get(id).unwrap().clone());
}
let upper_iter = SstConcatIterator::create_and_seek_to_first(upper_ssts)?;
let mut lower_ssts = Vec::with_capacity(lower_level_sst_ids.len());
for id in lower_level_sst_ids.iter() {
lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
}
let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
self.compact_generate_sst_from_iter(
TwoMergeIterator::create(upper_iter, lower_iter)?,
task.compact_to_bottom_level(),
)
}
None => {
let mut upper_iters = Vec::with_capacity(upper_level_sst_ids.len());
for id in upper_level_sst_ids.iter() {
upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
snapshot.sstables.get(id).unwrap().clone(),
)?));
}
let upper_iter = MergeIterator::create(upper_iters);
let mut lower_ssts = Vec::with_capacity(lower_level_sst_ids.len());
for id in lower_level_sst_ids.iter() {
lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
}
let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
self.compact_generate_sst_from_iter(
TwoMergeIterator::create(upper_iter, lower_iter)?,
task.compact_to_bottom_level(),
)
}
},
}) => self.compact_two_levels(
&snapshot,
upper_level,
upper_level_sst_ids,
&1,
lower_level_sst_ids,
task.compact_to_bottom_level(),
),
CompactionTask::Tiered(TieredCompactionTask { tiers, .. }) => {
let mut iters = Vec::with_capacity(tiers.len());
for (_, tier_sst_ids) in tiers {
Expand Down
31 changes: 23 additions & 8 deletions mini-lsm/src/compact/leveled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ impl LeveledCompactionController {
sst_ids: &[usize],
in_level: usize,
) -> Vec<usize> {
let mut sstables = Vec::with_capacity(snapshot.levels[in_level - 1].1.len());
for sst_id in &snapshot.levels[in_level - 1].1 {
sstables.push(snapshot.sstables[sst_id].clone());
}
if sstables.is_empty() {
return Vec::new();
}

let begin_key = sst_ids
.iter()
.map(|id| snapshot.sstables[id].first_key())
Expand All @@ -63,15 +71,22 @@ impl LeveledCompactionController {
.max()
.cloned()
.unwrap();

let mut overlap_ssts = Vec::new();
for sst_id in &snapshot.levels[in_level - 1].1 {
let sst = &snapshot.sstables[sst_id];
let first_key = sst.first_key();
let last_key = sst.last_key();
if !(last_key < &begin_key || first_key > &end_key) {
overlap_ssts.push(*sst_id);
}
}
let lower = sstables.partition_point(|table| *table.last_key() < begin_key);
let upper = sstables
.partition_point(|table| *table.first_key() <= end_key)
.saturating_sub(1);
println!(
"[leveled compaction] level {} find overlapping ssts, lower: {}, upper: {}",
in_level, lower, upper
);
overlap_ssts.extend(
sstables[lower..=upper]
.iter()
.map(|table| table.sst_id())
.collect::<Vec<usize>>(),
);
overlap_ssts
}

Expand Down