Skip to content

Commit ce9f9dd

Browse files
authored
Keep last flushed memtable for history (#360)
1 parent 685af19 commit ce9f9dd

1 file changed

Lines changed: 99 additions & 26 deletions

File tree

src/lsm.rs

Lines changed: 99 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,18 @@ pub trait CompactionOperations: Send + Sync {
7676
/// progressively larger SSTables with non-overlapping key ranges (except L0).
7777
/// - **Compaction**: Background process that merges SSTables to maintain read performance and
7878
/// remove deleted entries.
79+
///
80+
/// # Lock Ordering
81+
///
82+
/// To prevent deadlocks, locks must be acquired in this order:
83+
/// 1. `active_memtable` - serializes writes
84+
/// 2. `level_manifest` - SST metadata and table IDs
85+
/// 3. `immutable_memtables` - pending flush queue
86+
/// 4. `flushed_history` - conflict detection history
87+
///
88+
/// Read locks and write locks follow the same ordering.
89+
/// If a function needs multiple locks, it must acquire them in this order.
90+
/// See `rotate_memtable()`, `flush_immutable_to_sst()` for examples.
7991
pub(crate) struct CoreInner {
8092
/// The active memtable (write buffer) that receives all new writes.
8193
///
@@ -93,6 +105,13 @@ pub(crate) struct CoreInner {
93105
/// to flush them to disk as SSTables.
94106
pub(crate) immutable_memtables: Arc<RwLock<ImmutableMemtables>>,
95107

108+
/// Most recently flushed memtable kept for conflict detection.
109+
/// When a memtable is flushed to SST, it's moved here so that long-running
110+
/// transactions can still detect conflicts against recently flushed data.
111+
/// This prevents spurious TransactionRetry errors when immutable_memtables
112+
/// becomes empty after a flush.
113+
pub(crate) flushed_history: Arc<RwLock<Option<Arc<MemTable>>>>,
114+
96115
/// The level structure managing all SSTables on disk.
97116
///
98117
/// LSM trees organize SSTables into levels:
@@ -195,21 +214,30 @@ impl CoreInner {
195214
lockfile: Mutex::new(lockfile),
196215
error_handler: Arc::new(BackgroundErrorHandler::new()),
197216
visible_seq_num,
217+
flushed_history: Arc::new(RwLock::new(None)),
198218
})
199219
}
200220

201221
/// Get the earliest sequence number across all memtables.
202-
/// Any key with seq >= this value is guaranteed to be in memtables.
203-
/// Keys modified with seq < this value may have been flushed to SST.
222+
/// Any key with seq >= this value is guaranteed to be in memtables or flushed history.
204223
pub(crate) fn get_earliest_memtable_seq(&self) -> Result<u64> {
205-
// Check immutable memtables - the first (oldest by table_id) has the earliest seq
206-
let immutables = self.immutable_memtables.read()?;
207-
if let Some(oldest) = immutables.first() {
208-
return Ok(oldest.memtable.earliest_seq());
224+
// Check flushed history first (has oldest data)
225+
{
226+
let history = self.flushed_history.read()?;
227+
if let Some(ref oldest) = *history {
228+
return Ok(oldest.earliest_seq());
229+
}
230+
}
231+
232+
// Then check immutable memtables
233+
{
234+
let immutables = self.immutable_memtables.read()?;
235+
if let Some(oldest) = immutables.first() {
236+
return Ok(oldest.memtable.earliest_seq());
237+
}
209238
}
210-
drop(immutables);
211239

212-
// No immutables - use active memtable
240+
// Fall back to active memtable
213241
let memtable = self.active_memtable.read()?;
214242
Ok(memtable.earliest_seq())
215243
}
@@ -218,9 +246,10 @@ impl CoreInner {
218246
where
219247
I: Iterator<Item = &'a [u8]>,
220248
{
221-
// Acquire locks once for all keys - this is the key optimization
249+
// Lock order: active → immutables → history
222250
let memtable = self.active_memtable.read()?;
223251
let immutables = self.immutable_memtables.read()?;
252+
let history = self.flushed_history.read()?;
224253

225254
for key in keys {
226255
// Check active memtable first (most recent writes)
@@ -233,16 +262,31 @@ impl CoreInner {
233262
}
234263

235264
// Check immutable memtables (newest to oldest by iterating in reverse)
265+
let mut found_in_immutable = false;
236266
for entry in immutables.iter().rev() {
237267
if let Some((ikey, _)) = entry.memtable.get(key, None) {
238268
if ikey.seq_num() > start_seq {
239269
return Err(Error::TransactionWriteConflict);
240270
}
241271
// Key exists but was written before our transaction started - no conflict
272+
found_in_immutable = true;
242273
break;
243274
}
244275
}
245-
// Key not found in any memtable - no conflict for this key
276+
if found_in_immutable {
277+
continue;
278+
}
279+
280+
// Check flushed history
281+
if let Some(ref flushed) = *history {
282+
if let Some((ikey, _)) = flushed.get(key, None) {
283+
if ikey.seq_num() > start_seq {
284+
return Err(Error::TransactionWriteConflict);
285+
}
286+
// Key exists but was written before our transaction started - no conflict
287+
}
288+
}
289+
// Key not found in any memtable or history - no conflict for this key
246290
}
247291

248292
// No conflicts found for any key
@@ -264,9 +308,9 @@ impl CoreInner {
264308
///
265309
/// # Returns
266310
/// The flushed SSTable
267-
fn flush_and_update_manifest(
311+
fn flush_immutable_to_sst(
268312
&self,
269-
memtable: &MemTable,
313+
memtable: Arc<MemTable>,
270314
table_id: u64,
271315
wal_number: u64,
272316
) -> Result<Arc<Table>> {
@@ -324,8 +368,10 @@ impl CoreInner {
324368
);
325369

326370
// Step 4: Apply changeset atomically
371+
// Lock order: level_manifest → immutable_memtables → flushed_history
327372
let mut manifest = self.level_manifest.write()?;
328373
let mut memtable_lock = self.immutable_memtables.write()?;
374+
let mut history_lock = self.flushed_history.write()?;
329375

330376
let rollback = manifest.apply_changeset(&changeset)?;
331377
if let Err(e) = write_manifest_to_disk(&manifest) {
@@ -340,9 +386,13 @@ impl CoreInner {
340386
return Err(error);
341387
}
342388

343-
// Remove successfully flushed memtable from tracking
389+
// Remove successfully flushed memtable from immutables tracking
344390
memtable_lock.remove(table_id);
345391

392+
// Atomically add to flushed history for conflict detection
393+
// This ensures no visibility gap where memtable is in neither collection
394+
*history_lock = Some(memtable);
395+
346396
log::info!(
347397
"Manifest updated atomically: table_id={}, log_number={}, last_sequence={}",
348398
table_id,
@@ -402,9 +452,16 @@ impl CoreInner {
402452
// Set the WAL number on the new (empty) active memtable
403453
active_memtable.set_wal_number(new_wal_number);
404454

405-
// Get table ID and track immutable memtable with its WAL number
406-
let mut immutable_memtables = self.immutable_memtables.write()?;
455+
// LOCK ORDER: Get table_id from manifest BEFORE acquiring immutable_memtables lock.
456+
// This maintains consistent ordering: level_manifest -> immutable_memtables
457+
// which matches flush_immutable_to_sst() and prevents deadlock.
458+
//
459+
// Deadlock scenario prevented:
460+
// Thread A (rotate): holds imm.write, waits manifest.read
461+
// Thread B (flush): holds manifest.write, waits imm.write
462+
// By acquiring manifest.read first, we ensure no circular wait.
407463
let table_id = self.level_manifest.read()?.next_table_id();
464+
let mut immutable_memtables = self.immutable_memtables.write()?;
408465
immutable_memtables.add(table_id, flushed_wal_number, Arc::clone(&flushed_memtable));
409466

410467
// Release locks
@@ -425,7 +482,7 @@ impl CoreInner {
425482
///
426483
/// This method:
427484
/// 1. Gets the oldest entry from immutable queue (lowest table_id)
428-
/// 2. Flushes it to SST via flush_and_update_manifest (which also removes from queue)
485+
/// 2. Flushes it to SST via flush_immutable_to_sst (which also removes from queue)
429486
/// 3. Schedules async WAL cleanup
430487
fn flush_oldest_immutable_to_sst(&self) -> Result<Option<Arc<Table>>> {
431488
// Get the oldest immutable entry (clone to release lock before I/O)
@@ -460,8 +517,11 @@ impl CoreInner {
460517
);
461518

462519
// Flush to SST (this also removes from immutable queue and updates manifest)
463-
let table =
464-
self.flush_and_update_manifest(&entry.memtable, entry.table_id, entry.wal_number)?;
520+
let table = self.flush_immutable_to_sst(
521+
Arc::clone(&entry.memtable),
522+
entry.table_id,
523+
entry.wal_number,
524+
)?;
465525

466526
// Schedule async WAL cleanup
467527
let wal_dir = self.wal.read().get_dir_path().to_path_buf();
@@ -543,6 +603,11 @@ impl CoreInner {
543603
return Ok(None);
544604
}
545605

606+
// LOCK ORDER: Get table_id from manifest BEFORE acquiring immutable_memtables lock.
607+
// This maintains consistent ordering: active -> level_manifest -> immutable_memtables
608+
// which matches flush_immutable_to_sst() and prevents deadlock with background flush.
609+
let table_id = self.level_manifest.read()?.next_table_id();
610+
546611
let mut immutable_memtables = self.immutable_memtables.write()?;
547612

548613
// Get the current WAL number for the new memtable
@@ -561,9 +626,6 @@ impl CoreInner {
561626
// Set the WAL number on the new active memtable
562627
active_memtable.set_wal_number(current_wal_number);
563628

564-
// Get table ID for the SST file
565-
let table_id = self.level_manifest.read()?.next_table_id();
566-
567629
// Get the WAL number from the memtable (set when it started receiving writes)
568630
// or use the current WAL if not set
569631
let memtable_wal_number = flushed_memtable.get_wal_number();
@@ -586,8 +648,11 @@ impl CoreInner {
586648
};
587649

588650
// Step 3: Flush the immutable memtable to disk and update manifest
589-
let table =
590-
self.flush_and_update_manifest(&flushed_memtable, table_id, wal_that_was_flushed)?;
651+
let table = self.flush_immutable_to_sst(
652+
Arc::clone(&flushed_memtable),
653+
table_id,
654+
wal_that_was_flushed,
655+
)?;
591656

592657
Ok(Some(table))
593658
}
@@ -648,7 +713,11 @@ impl CoreInner {
648713

649714
// Fail-fast: return immediately on error
650715
// WAL replay will recover this and subsequent memtables on restart
651-
self.flush_and_update_manifest(&entry.memtable, entry.table_id, entry.wal_number)?;
716+
self.flush_immutable_to_sst(
717+
Arc::clone(&entry.memtable),
718+
entry.table_id,
719+
entry.wal_number,
720+
)?;
652721

653722
flushed_count += 1;
654723
log::debug!(
@@ -1151,7 +1220,7 @@ impl Core {
11511220
|memtable, wal_number| {
11521221
// Flush intermediate memtable to SST during recovery
11531222
let table_id = inner.level_manifest.read()?.next_table_id();
1154-
inner.flush_and_update_manifest(&memtable, table_id, wal_number)?;
1223+
inner.flush_immutable_to_sst(Arc::clone(&memtable), table_id, wal_number)?;
11551224
log::info!(
11561225
"Recovery: flushed memtable to SST table_id={}, wal_number={}",
11571226
table_id,
@@ -1528,7 +1597,11 @@ impl Tree {
15281597
|memtable, wal_number| {
15291598
// Flush intermediate memtable to SST during recovery
15301599
let table_id = self.core.inner.level_manifest.read()?.next_table_id();
1531-
self.core.inner.flush_and_update_manifest(&memtable, table_id, wal_number)?;
1600+
self.core.inner.flush_immutable_to_sst(
1601+
Arc::clone(&memtable),
1602+
table_id,
1603+
wal_number,
1604+
)?;
15321605
log::info!(
15331606
"Restore: flushed memtable to SST table_id={}, wal_number={}",
15341607
table_id,

0 commit comments

Comments
 (0)