Skip to content

Commit dfd6d50

Browse files
committed
dft
1 parent 5a4ba93 commit dfd6d50

3 files changed

Lines changed: 102 additions & 8 deletions

File tree

src/compaction/leveled.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1023,7 +1023,15 @@ impl LeveledCompaction {
10231023
CompactionFilterDecision::Keep => {}
10241024
CompactionFilterDecision::Remove => continue,
10251025
CompactionFilterDecision::ChangeValue(_) => {
1026-
// force_merge_level doesn't rewrite values; treat as Keep
1026+
// force_merge_level doesn't rewrite values — the output
1027+
// SST reuses the existing encoded block. Silently treat
1028+
// as Keep. Regular level-to-level compaction handles
1029+
// ChangeValue correctly.
1030+
debug_assert!(
1031+
false,
1032+
"CompactionFilter returned ChangeValue in force_merge_level, \
1033+
which cannot rewrite values — treating as Keep"
1034+
);
10271035
}
10281036
}
10291037
}

src/db.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use crate::manifest::version_edit::{FileMetaData, VersionEdit};
2424
use crate::manifest::version_set::VersionSet;
2525
use crate::memtable::MemTable;
2626
use crate::memtable::skiplist::MemTableCursorIter;
27-
use crate::options::{CompactionFilter, CompactionFilterDecision, DbOptions, ReadOptions, WriteOptions};
27+
use crate::options::{
28+
CompactionFilter, CompactionFilterDecision, DbOptions, ReadOptions, WriteOptions,
29+
};
2830
use crate::rate_limiter::RateLimiter;
2931
use crate::sst::table_builder::{TableBuildOptions, TableBuilder};
3032
use crate::sst::table_reader::TableIterator;
@@ -1582,25 +1584,35 @@ impl DB {
15821584
/// **Note:** The key remains readable via `get()` until compaction
15831585
/// physically removes it. Use regular `delete()` if immediate
15841586
/// invisibility is required.
1587+
///
1588+
/// **Warning:** Dead keys are held in memory only and are **not**
1589+
/// persisted to WAL or SST. If the process crashes or restarts
1590+
/// before compaction removes them, the registrations are lost and
1591+
/// the keys will remain in the database permanently unless
1592+
/// re-registered.
15851593
pub fn lazy_delete(&self, key: &[u8]) {
15861594
let mut set = self.dead_keys.write().unwrap();
15871595
set.insert(key.to_vec());
15881596
}
15891597

15901598
/// Batch version of [`lazy_delete`](Self::lazy_delete).
15911599
///
1592-
/// If the number of accumulated dead keys exceeds
1600+
/// Dead keys are **not** persisted — see [`lazy_delete`](Self::lazy_delete)
1601+
/// for durability caveats.
1602+
///
1603+
/// If the number of accumulated dead keys newly crosses
15931604
/// `lazy_delete_compaction_threshold`, a background compaction is
15941605
/// automatically signalled.
1595-
pub fn lazy_delete_batch(&self, keys: &[Vec<u8>]) {
1606+
pub fn lazy_delete_batch(&self, keys: impl IntoIterator<Item = impl AsRef<[u8]>>) {
15961607
let threshold = self.options.lazy_delete_compaction_threshold;
15971608
let mut set = self.dead_keys.write().unwrap();
1609+
let prev_len = set.len();
15981610
for k in keys {
1599-
set.insert(k.clone());
1611+
set.insert(k.as_ref().to_vec());
16001612
}
16011613
let len = set.len();
16021614
drop(set);
1603-
if threshold > 0 && len >= threshold {
1615+
if threshold > 0 && len >= threshold && prev_len < threshold {
16041616
self.signal_compaction();
16051617
}
16061618
}
@@ -1610,8 +1622,13 @@ impl DB {
16101622
self.dead_keys.read().unwrap().len()
16111623
}
16121624

1613-
/// Clear all dead keys. Call after compaction has processed all levels
1614-
/// to reclaim memory used by the dead-keys set.
1625+
/// Clear all dead keys to reclaim memory used by the dead-keys set.
1626+
///
1627+
/// **Caution:** Only call this after a full compaction
1628+
/// ([`compact_range(None, None)`](Self::compact_range)) has
1629+
/// finished. Any dead keys that have not yet been visited by
1630+
/// compaction will be silently abandoned — those keys will remain
1631+
/// in the database and will not be removed unless re-registered.
16151632
pub fn clear_dead_keys(&self) {
16161633
self.dead_keys.write().unwrap().clear();
16171634
}

tests/lazy_delete.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,72 @@ fn clear_dead_keys_resets_count() {
9797
db.clear_dead_keys();
9898
assert_eq!(db.dead_key_count(), 0);
9999
}
100+
101+
/// Verify that `lazy_delete_batch` triggers background compaction when the
102+
/// dead-key count newly crosses `lazy_delete_compaction_threshold`.
103+
#[test]
104+
fn lazy_delete_batch_auto_triggers_compaction() {
105+
let dir = tempfile::tempdir().unwrap();
106+
let db = DB::open(
107+
DbOptions {
108+
create_if_missing: true,
109+
write_buffer_size: 1024,
110+
lazy_delete_compaction_threshold: 5,
111+
..Default::default()
112+
},
113+
dir.path(),
114+
)
115+
.unwrap();
116+
117+
// Write enough data across flushes so that compaction has work to do.
118+
for i in 0u32..20 {
119+
db.put(&i.to_be_bytes(), &[i as u8; 64]).unwrap();
120+
}
121+
122+
// Register 6 keys — crosses the threshold of 5, should signal compaction.
123+
let dead: Vec<Vec<u8>> = (0u32..6).map(|i| i.to_be_bytes().to_vec()).collect();
124+
db.lazy_delete_batch(&dead);
125+
126+
// Give background compaction a moment to run.
127+
std::thread::sleep(std::time::Duration::from_secs(2));
128+
129+
// The dead keys should have been removed by background compaction.
130+
for i in 0u32..6 {
131+
assert!(
132+
db.get(&i.to_be_bytes()).unwrap().is_none(),
133+
"key {} should have been removed by auto-triggered compaction",
134+
i
135+
);
136+
}
137+
138+
// Surviving keys should still be readable.
139+
for i in 6u32..20 {
140+
assert!(
141+
db.get(&i.to_be_bytes()).unwrap().is_some(),
142+
"key {} should survive",
143+
i
144+
);
145+
}
146+
}
147+
148+
/// Verify that `lazy_delete_batch` accepts `&[&[u8]]` slices (not just
149+
/// `Vec<u8>`), confirming the generic `impl AsRef<[u8]>` signature works.
150+
#[test]
151+
fn lazy_delete_batch_accepts_slices() {
152+
let dir = tempfile::tempdir().unwrap();
153+
let db = make_db(dir.path());
154+
155+
db.put(b"a", b"1").unwrap();
156+
db.put(b"pad", &[0u8; 1024]).unwrap();
157+
db.put(b"b", b"2").unwrap();
158+
159+
// Pass &[u8] slices — must compile with the generic signature.
160+
let keys: Vec<&[u8]> = vec![b"a", b"b"];
161+
db.lazy_delete_batch(keys);
162+
assert_eq!(db.dead_key_count(), 2);
163+
164+
db.compact_range(None::<&[u8]>, None::<&[u8]>).unwrap();
165+
166+
assert!(db.get(b"a").unwrap().is_none());
167+
assert!(db.get(b"b").unwrap().is_none());
168+
}

0 commit comments

Comments
 (0)