diff --git a/Cargo.toml b/Cargo.toml index c78fd2f93..0f65c36fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ rust-version = "1.76" arc-swap = "1" compact_str = { version = "0.8", optional = true } crossbeam-queue = "0.3.11" +crossbeam-utils = "0.8.21" dashmap = { version = "6", features = ["raw-api"] } hashlink = "0.9" hashbrown = "0.14.3" diff --git a/benches/compare.rs b/benches/compare.rs index d2ed9e1b7..1aee2c64b 100644 --- a/benches/compare.rs +++ b/benches/compare.rs @@ -3,7 +3,7 @@ use std::mem::transmute; use codspeed_criterion_compat::{ criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, }; -use salsa::Setter; +use salsa::{Database, Setter}; #[salsa::input] pub struct Input { @@ -34,13 +34,16 @@ fn mutating_inputs(c: &mut Criterion) { group.bench_function(BenchmarkId::new("mutating", n), |b| { b.iter_batched_ref( || { - let db = salsa::DatabaseImpl::default(); + let mut db = salsa::DatabaseImpl::default(); let base_string = "hello, world!".to_owned(); let base_len = base_string.len(); let string = base_string.clone().repeat(*n); let new_len = string.len(); + // spawn the LRU thread + db.synthetic_write(salsa::Durability::HIGH); + let input = Input::new(&db, base_string.clone()); let actual_len = length(&db, input); assert_eq!(base_len, actual_len); diff --git a/benches/incremental.rs b/benches/incremental.rs index 5e5aa5f42..1f5090307 100644 --- a/benches/incremental.rs +++ b/benches/incremental.rs @@ -1,5 +1,5 @@ use codspeed_criterion_compat::{criterion_group, criterion_main, BatchSize, Criterion}; -use salsa::Setter; +use salsa::{Database, Setter}; #[salsa::input] struct Input { @@ -26,7 +26,9 @@ fn many_tracked_structs(criterion: &mut Criterion) { criterion.bench_function("many_tracked_structs", |b| { b.iter_batched_ref( || { - let db = salsa::DatabaseImpl::new(); + let mut db = salsa::DatabaseImpl::new(); + // spawn the LRU thread + db.synthetic_write(salsa::Durability::HIGH); let input = Input::new(&db, 1_000); let 
input2 = Input::new(&db, 1); diff --git a/components/salsa-macro-rules/src/setup_tracked_fn.rs b/components/salsa-macro-rules/src/setup_tracked_fn.rs index 68bc68e0a..5200cf776 100644 --- a/components/salsa-macro-rules/src/setup_tracked_fn.rs +++ b/components/salsa-macro-rules/src/setup_tracked_fn.rs @@ -211,6 +211,7 @@ macro_rules! setup_tracked_fn { &self, aux: &dyn $zalsa::JarAux, first_index: $zalsa::IngredientIndex, + memo_drop_sender: $zalsa::MemoDropSender, ) -> Vec> { let struct_index = $zalsa::macro_if! { if $needs_interner { @@ -227,6 +228,7 @@ macro_rules! setup_tracked_fn { struct_index, first_index, aux, + memo_drop_sender ); fn_ingredient.set_capacity($lru); $zalsa::macro_if! { diff --git a/src/accumulator.rs b/src/accumulator.rs index 3518c9106..e5c2136ec 100644 --- a/src/accumulator.rs +++ b/src/accumulator.rs @@ -12,7 +12,8 @@ use accumulated::AnyAccumulated; use crate::{ cycle::CycleRecoveryStrategy, ingredient::{fmt_index, Ingredient, Jar, MaybeChangedAfter}, - plumbing::JarAux, + plumbing::{JarAux, MemoDropSender}, + table::Table, zalsa::IngredientIndex, zalsa_local::QueryOrigin, Database, DatabaseKeyIndex, Id, Revision, @@ -49,6 +50,7 @@ impl Jar for JarImpl { &self, _aux: &dyn JarAux, first_index: IngredientIndex, + _: MemoDropSender, ) -> Vec> { vec![Box::new(>::new(first_index))] } @@ -137,7 +139,7 @@ impl Ingredient for IngredientImpl { false } - fn reset_for_new_revision(&mut self) { + fn reset_for_new_revision(&mut self, _: &mut Table) { panic!("unexpected reset on accumulator") } diff --git a/src/exclusive.rs b/src/exclusive.rs new file mode 100644 index 000000000..aafb2951f --- /dev/null +++ b/src/exclusive.rs @@ -0,0 +1,20 @@ +//! Bare-bones polyfill for the unstable [`std::sync::Exclusive`] type. + +pub struct Exclusive { + inner: T, +} + +// SAFETY: We only hand out mutable access to the inner value through a mutable reference to the +// wrapper. +// Therefore we cannot alias the inner value making it trivially sync. 
+unsafe impl Sync for Exclusive {} + +impl Exclusive { + pub fn new(inner: T) -> Self { + Self { inner } + } + + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} diff --git a/src/function.rs b/src/function.rs index 639b1819b..6cc6b4aae 100644 --- a/src/function.rs +++ b/src/function.rs @@ -1,4 +1,4 @@ -use std::{any::Any, fmt, sync::Arc}; +use std::{any::Any, fmt, marker::PhantomData, sync::Arc}; use crate::{ accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues}, @@ -7,18 +7,16 @@ use crate::{ key::DatabaseKeyIndex, plumbing::JarAux, salsa_struct::SalsaStructInDb, + table::{memo::MemoDropSender, Table}, zalsa::{IngredientIndex, MemoIngredientIndex, Zalsa}, zalsa_local::QueryOrigin, Cycle, Database, Id, Revision, }; -use self::delete::DeletedEntries; - use super::ingredient::Ingredient; mod accumulated; mod backdate; -mod delete; mod diff_outputs; mod execute; mod fetch; @@ -112,7 +110,8 @@ pub struct IngredientImpl { /// current revision: you would be right, but we are being defensive, because /// we don't know that we can trust the database to give us the same runtime /// everytime and so forth. - deleted_entries: DeletedEntries, + delete: MemoDropSender, + config: PhantomData C>, } /// True if `old_value == new_value`. 
Invoked by the generated @@ -126,12 +125,18 @@ impl IngredientImpl where C: Configuration, { - pub fn new(struct_index: IngredientIndex, index: IngredientIndex, aux: &dyn JarAux) -> Self { + pub fn new( + struct_index: IngredientIndex, + index: IngredientIndex, + aux: &dyn JarAux, + delete: MemoDropSender, + ) -> Self { Self { index, memo_ingredient_index: aux.next_memo_ingredient_index(struct_index, index), lru: Default::default(), - deleted_entries: Default::default(), + delete, + config: PhantomData, } } @@ -167,16 +172,10 @@ where memo: memo::Memo>, ) -> &'db memo::Memo> { let memo = Arc::new(memo); - let db_memo = unsafe { - // Unsafety conditions: memo must be in the map (it's not yet, but it will be by the time this - // value is returned) and anything removed from map is added to deleted entries (ensured elsewhere). - self.extend_memo_lifetime(&memo) - }; - if let Some(old_value) = self.insert_memo_into_table_for(zalsa, id, memo) { - // In case there is a reference to the old memo out there, we have to store it - // in the deleted entries. This will get cleared when a new revision starts. - self.deleted_entries.push(old_value); - } + // Unsafety conditions: memo must be in the map (it's not yet, but it will be by the time this + // value is returned) and anything removed from map is added to deleted entries (ensured elsewhere). 
+ let db_memo = unsafe { self.extend_memo_lifetime(&memo) }; + self.insert_memo_into_table_for(zalsa, id, memo); db_memo } } @@ -231,8 +230,14 @@ where true } - fn reset_for_new_revision(&mut self) { - std::mem::take(&mut self.deleted_entries); + fn reset_for_new_revision(&mut self, table: &mut Table) { + self.lru.for_each_evicted(|evict| { + Self::evict_value_from_memo_for( + table.memos_mut(evict), + self.memo_ingredient_index, + &self.delete, + ) + }); } fn fmt_index(&self, index: Option, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { diff --git a/src/function/delete.rs b/src/function/delete.rs deleted file mode 100644 index 01507f2e8..000000000 --- a/src/function/delete.rs +++ /dev/null @@ -1,25 +0,0 @@ -use crossbeam_queue::SegQueue; - -use super::{memo::ArcMemo, Configuration}; - -/// Stores the list of memos that have been deleted so they can be freed -/// once the next revision starts. See the comment on the field -/// `deleted_entries` of [`FunctionIngredient`][] for more details. 
-pub(super) struct DeletedEntries { - seg_queue: SegQueue>, -} - -impl Default for DeletedEntries { - fn default() -> Self { - Self { - seg_queue: Default::default(), - } - } -} - -impl DeletedEntries { - pub(super) fn push(&self, memo: ArcMemo<'_, C>) { - let memo = unsafe { std::mem::transmute::, ArcMemo<'static, C>>(memo) }; - self.seg_queue.push(memo); - } -} diff --git a/src/function/fetch.rs b/src/function/fetch.rs index 8ffb4639c..7828f33b9 100644 --- a/src/function/fetch.rs +++ b/src/function/fetch.rs @@ -9,7 +9,7 @@ where C: Configuration, { pub fn fetch<'db>(&'db self, db: &'db C::DbView, id: Id) -> &'db C::Output<'db> { - let (zalsa, zalsa_local) = db.zalsas(); + let zalsa_local = db.zalsa_local(); zalsa_local.unwind_if_revision_cancelled(db.as_dyn_database()); let memo = self.refresh_memo(db, id); @@ -19,9 +19,7 @@ where changed_at, } = memo.revisions.stamped_value(memo.value.as_ref().unwrap()); - if let Some(evicted) = self.lru.record_use(id) { - self.evict_value_from_memo_for(zalsa, evicted); - } + self.lru.record_use(id); zalsa_local.report_tracked_read( self.database_key_index(id).into(), diff --git a/src/function/lru.rs b/src/function/lru.rs index 6322be0e4..e9e8af7e8 100644 --- a/src/function/lru.rs +++ b/src/function/lru.rs @@ -11,31 +11,35 @@ pub(super) struct Lru { } impl Lru { - pub(super) fn record_use(&self, index: Id) -> Option { + pub(super) fn record_use(&self, index: Id) { // Relaxed should be fine, we don't need to synchronize on this. let capacity = self.capacity.load(Ordering::Relaxed); - if capacity == 0 { // LRU is disabled - return None; + return; } let mut set = self.set.lock(); set.insert(index); - if set.len() > capacity { - return set.pop_front(); - } - - None } pub(super) fn set_capacity(&self, capacity: usize) { // Relaxed should be fine, we don't need to synchronize on this. 
self.capacity.store(capacity, Ordering::Relaxed); + } - if capacity == 0 { - let mut set = self.set.lock(); - *set = FxLinkedHashSet::default(); + pub(super) fn for_each_evicted(&mut self, mut cb: impl FnMut(Id)) { + let set = self.set.get_mut(); + // Relaxed should be fine, we don't need to synchronize on this. + let cap = self.capacity.load(Ordering::Relaxed); + if set.len() <= cap || cap == 0 { + return; + } + while let Some(id) = set.pop_front() { + cb(id); + if set.len() <= cap { + break; + } } } } diff --git a/src/function/memo.rs b/src/function/memo.rs index 1e4c4cf98..9451ca47e 100644 --- a/src/function/memo.rs +++ b/src/function/memo.rs @@ -1,10 +1,14 @@ use std::any::Any; use std::fmt::Debug; use std::fmt::Formatter; +use std::mem::ManuallyDrop; use std::sync::Arc; use crate::accumulator::accumulated_map::InputAccumulatedValues; +use crate::plumbing::MemoDropSender; use crate::revision::AtomicRevision; +use crate::table::memo::MemoTable; +use crate::zalsa::MemoIngredientIndex; use crate::zalsa_local::QueryOrigin; use crate::{ key::DatabaseKeyIndex, zalsa::Zalsa, zalsa_local::QueryRevisions, Event, EventKind, Id, @@ -31,18 +35,23 @@ impl IngredientImpl { unsafe { std::mem::transmute(memo) } } - /// Inserts the memo for the given key; (atomically) overwrites any previously existing memo. + /// Inserts the memo for the given key; (atomically) overwrites and returns any previously existing memo pub(super) fn insert_memo_into_table_for<'db>( &'db self, zalsa: &'db Zalsa, id: Id, memo: ArcMemo<'db, C>, - ) -> Option> { + ) { let static_memo = unsafe { self.to_static(memo) }; - let old_static_memo = zalsa - .memo_table_for(id) - .insert(self.memo_ingredient_index, static_memo)?; - unsafe { Some(self.to_self(old_static_memo)) } + // SAFETY: We delay the deletion of the old memo until the next revision starts. 
+ let old_memo = unsafe { + zalsa + .memo_table_for(id) + .insert(self.memo_ingredient_index, static_memo) + }; + if let Some(old_memo) = old_memo { + self.delete.delay(ManuallyDrop::into_inner(old_memo)); + } } /// Loads the current memo for `key_index`. This does not hold any sort of @@ -60,51 +69,35 @@ impl IngredientImpl { /// Evicts the existing memo for the given key, replacing it /// with an equivalent memo that has no value. If the memo is untracked, BaseInput, /// or has values assigned as output of another query, this has no effect. - pub(super) fn evict_value_from_memo_for<'db>(&'db self, zalsa: &'db Zalsa, id: Id) { - let old = zalsa.memo_table_for(id).map_memo::>>( - self.memo_ingredient_index, - |memo| { - match memo.revisions.origin { - QueryOrigin::Assigned(_) - | QueryOrigin::DerivedUntracked(_) - | QueryOrigin::BaseInput => { - // Careful: Cannot evict memos whose values were - // assigned as output of another query - // or those with untracked inputs - // as their values cannot be reconstructed. 
- memo - } - QueryOrigin::Derived(_) => { - // QueryRevisions: !Clone to discourage cloning, we need it here though - let &QueryRevisions { - changed_at, - durability, - ref origin, - ref tracked_struct_ids, - ref accumulated, - ref accumulated_inputs, - } = &memo.revisions; - // Re-assemble the memo but with the value set to `None` - Arc::new(Memo::new( - None, - memo.verified_at.load(), - QueryRevisions { - changed_at, - durability, - origin: origin.clone(), - tracked_struct_ids: tracked_struct_ids.clone(), - accumulated: accumulated.clone(), - accumulated_inputs: accumulated_inputs.clone(), - }, - )) - } + pub(super) fn evict_value_from_memo_for( + table: &mut MemoTable, + memo_ingredient_index: MemoIngredientIndex, + memo_drop: &MemoDropSender, + ) { + let map = |memo: ArcMemo<'static, C>| -> ArcMemo<'static, C> { + match &memo.revisions.origin { + QueryOrigin::Assigned(_) + | QueryOrigin::DerivedUntracked(_) + | QueryOrigin::BaseInput => { + // Careful: Cannot evict memos whose values were + // assigned as output of another query + // or those with untracked inputs + // as their values cannot be reconstructed. + memo } - }, - ); - if let Some(old) = old { - // In case there is a reference to the old memo out there, we have to store it - // in the deleted entries. This will get cleared when a new revision starts. - self.deleted_entries.push(old); + QueryOrigin::Derived(_) => { + // Note that we cannot use `Arc::get_mut` here as the use of `ArcSwap` makes it + // impossible to get unique access to the interior Arc so we need to construct + // a new allocation + Arc::new(memo.without_value()) + } + } + }; + // SAFETY: We queue the old value for deletion, delaying its drop until the next revision + // bump. 
+ let old_memo = unsafe { table.map_memo(memo_ingredient_index, map) }; + if let Some(old_memo) = old_memo { + memo_drop.delay(ManuallyDrop::into_inner(old_memo)); } } } @@ -135,6 +128,31 @@ impl Memo { revisions, } } + + pub(super) fn without_value(&self) -> Self { + let &QueryRevisions { + changed_at, + durability, + ref origin, + ref tracked_struct_ids, + ref accumulated, + ref accumulated_inputs, + } = &self.revisions; + // Re-assemble the memo but with the value set to `None` + Memo::new( + None, + self.verified_at.load(), + QueryRevisions { + changed_at, + durability, + origin: origin.clone(), + tracked_struct_ids: tracked_struct_ids.clone(), + accumulated: accumulated.clone(), + accumulated_inputs: accumulated_inputs.clone(), + }, + ) + } + /// True if this memo is known not to have changed based on its durability. pub(super) fn check_durability(&self, zalsa: &Zalsa) -> bool { let last_changed = zalsa.last_changed_revision(self.revisions.durability); diff --git a/src/ingredient.rs b/src/ingredient.rs index 84f6d3e33..1cb94a252 100644 --- a/src/ingredient.rs +++ b/src/ingredient.rs @@ -6,6 +6,8 @@ use std::{ use crate::{ accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues}, cycle::CycleRecoveryStrategy, + plumbing::MemoDropSender, + table::Table, zalsa::{IngredientIndex, MemoIngredientIndex}, zalsa_local::QueryOrigin, Database, DatabaseKeyIndex, Id, @@ -22,6 +24,7 @@ pub trait Jar: Any { &self, aux: &dyn JarAux, first_index: IngredientIndex, + memo_drop_sender: MemoDropSender, ) -> Vec>; /// If this jar's first ingredient is a salsa struct, return its `TypeId` @@ -122,7 +125,7 @@ pub trait Ingredient: Any + std::fmt::Debug + Send + Sync { /// /// **Important:** to actually receive resets, the ingredient must set /// [`IngredientRequiresReset::RESET_ON_NEW_REVISION`] to true. 
- fn reset_for_new_revision(&mut self); + fn reset_for_new_revision(&mut self, table: &mut Table); fn fmt_index(&self, index: Option, fmt: &mut fmt::Formatter<'_>) -> fmt::Result; } diff --git a/src/input.rs b/src/input.rs index e671c3254..b38e2d3a0 100644 --- a/src/input.rs +++ b/src/input.rs @@ -17,7 +17,7 @@ use crate::{ ingredient::{fmt_index, Ingredient, MaybeChangedAfter}, input::singleton::{Singleton, SingletonChoice}, key::{DatabaseKeyIndex, InputDependencyIndex}, - plumbing::{Jar, JarAux, Stamp}, + plumbing::{Jar, JarAux, MemoDropSender, Stamp}, table::{memo::MemoTable, sync::SyncTable, Slot, Table}, zalsa::{IngredientIndex, Zalsa}, zalsa_local::QueryOrigin, @@ -58,6 +58,7 @@ impl Jar for JarImpl { &self, _aux: &dyn JarAux, struct_index: crate::zalsa::IngredientIndex, + _: MemoDropSender, ) -> Vec> { let struct_ingredient: IngredientImpl = IngredientImpl::new(struct_index); @@ -260,7 +261,7 @@ impl Ingredient for IngredientImpl { false } - fn reset_for_new_revision(&mut self) { + fn reset_for_new_revision(&mut self, _: &mut Table) { panic!("unexpected call to `reset_for_new_revision`") } @@ -328,6 +329,10 @@ where &self.memos } + fn memos_mut(&mut self) -> &mut crate::table::memo::MemoTable { + &mut self.memos + } + unsafe fn syncs(&self, _current_revision: Revision) -> &SyncTable { &self.syncs } diff --git a/src/input/input_field.rs b/src/input/input_field.rs index cbb511904..b22d03b2d 100644 --- a/src/input/input_field.rs +++ b/src/input/input_field.rs @@ -1,6 +1,7 @@ use crate::cycle::CycleRecoveryStrategy; use crate::ingredient::{fmt_index, Ingredient, MaybeChangedAfter}; use crate::input::Configuration; +use crate::table::Table; use crate::zalsa::IngredientIndex; use crate::zalsa_local::QueryOrigin; use crate::{Database, DatabaseKeyIndex, Id, Revision}; @@ -85,7 +86,7 @@ where false } - fn reset_for_new_revision(&mut self) { + fn reset_for_new_revision(&mut self, _: &mut Table) { panic!("unexpected call: input fields don't register for resets"); } 
diff --git a/src/interned.rs b/src/interned.rs index 92c358c83..90ee3f688 100644 --- a/src/interned.rs +++ b/src/interned.rs @@ -4,10 +4,10 @@ use crate::accumulator::accumulated_map::InputAccumulatedValues; use crate::durability::Durability; use crate::ingredient::{fmt_index, MaybeChangedAfter}; use crate::key::InputDependencyIndex; -use crate::plumbing::{Jar, JarAux}; +use crate::plumbing::{Jar, JarAux, MemoDropSender}; use crate::table::memo::MemoTable; use crate::table::sync::SyncTable; -use crate::table::Slot; +use crate::table::{Slot, Table}; use crate::zalsa::IngredientIndex; use crate::zalsa_local::QueryOrigin; use crate::{Database, DatabaseKeyIndex, Id}; @@ -103,6 +103,7 @@ impl Jar for JarImpl { &self, _aux: &dyn JarAux, first_index: IngredientIndex, + _: MemoDropSender, ) -> Vec> { vec![Box::new(IngredientImpl::::new(first_index)) as _] } @@ -327,7 +328,7 @@ where false } - fn reset_for_new_revision(&mut self) { + fn reset_for_new_revision(&mut self, _: &mut Table) { // Interned ingredients do not, normally, get deleted except when they are "reset" en masse. // There ARE methods (e.g., `clear_deleted_entries` and `remove`) for deleting individual // items, but those are only used for tracked struct ingredients. 
@@ -362,6 +363,10 @@ where &self.memos } + fn memos_mut(&mut self) -> &mut MemoTable { + &mut self.memos + } + unsafe fn syncs(&self, _current_revision: Revision) -> &crate::table::sync::SyncTable { &self.syncs } diff --git a/src/lib.rs b/src/lib.rs index 7b029f735..ed5ca141f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ mod database; mod database_impl; mod durability; mod event; +mod exclusive; mod function; mod hash; mod id; @@ -90,6 +91,7 @@ pub mod plumbing { pub use crate::salsa_struct::SalsaStructInDb; pub use crate::storage::HasStorage; pub use crate::storage::Storage; + pub use crate::table::memo::MemoDropSender; pub use crate::tracked_struct::TrackedStructInDb; pub use crate::update::always_update; pub use crate::update::helper::Dispatch as UpdateDispatch; diff --git a/src/runtime.rs b/src/runtime.rs index ae07afa4e..77ffa3dba 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -5,14 +5,24 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc, }, - thread::ThreadId, + thread::{JoinHandle, ThreadId}, }; +use crossbeam_utils::sync::{Parker, Unparker}; use parking_lot::Mutex; use crate::{ - active_query::ActiveQuery, cycle::CycleRecoveryStrategy, durability::Durability, - key::DatabaseKeyIndex, revision::AtomicRevision, table::Table, zalsa_local::ZalsaLocal, + active_query::ActiveQuery, + cycle::CycleRecoveryStrategy, + durability::Durability, + exclusive::Exclusive, + key::DatabaseKeyIndex, + revision::AtomicRevision, + table::{ + memo::{memo_drop_channel, spawn_memo_drop_thread, MemoDropReceiver, MemoDropSender}, + Table, + }, + zalsa_local::ZalsaLocal, Cancelled, Cycle, Database, Event, EventKind, Revision, }; @@ -43,6 +53,14 @@ pub struct Runtime { /// Data for instances table: Table, + // DROP ORDER: `memo_drop_sender` and `unparker` need to drop before `memo_drop_thread` + // as `memo_drop_thread` exits once `memo_drop_sender` drops and its last parker barrier unparks + memo_drop_unparker: UnparkOnDrop, + memo_drop_sender: MemoDropSender, + 
memo_drop_receiver: Exclusive>, + memo_drop_thread: JoinOnDrop, + // Replace `memo_drop_receiver` and `memo_drop_thread` with the following once TAIT are stable. + // memo_drop_thread: LazyCell JoinOnDrop>, } #[derive(Clone, Debug)] @@ -80,11 +98,20 @@ impl StampedValue { impl Default for Runtime { fn default() -> Self { + let (memo_drop_sender, memo_drop_receiver) = memo_drop_channel(); + + let parker = Parker::new(); + let unparker = UnparkOnDrop(parker.unparker().clone()); + memo_drop_sender.park(parker); Runtime { revisions: [const { AtomicRevision::start() }; Durability::LEN], revision_canceled: Default::default(), dependency_graph: Default::default(), table: Default::default(), + memo_drop_unparker: unparker, + memo_drop_sender, + memo_drop_receiver: Exclusive::new(Some(memo_drop_receiver)), + memo_drop_thread: Default::default(), } } } @@ -138,6 +165,10 @@ impl Runtime { &self.table } + pub(crate) fn table_mut(&mut self) -> &mut Table { + &mut self.table + } + /// Increments the "current revision" counter and clears /// the cancellation flag. /// @@ -147,9 +178,20 @@ impl Runtime { let r_new = r_old.next(); self.revisions[0].store(r_new); self.revision_canceled.store(false, Ordering::Release); + self.memo_drop_thread.0.get_or_insert_with(|| { + spawn_memo_drop_thread(self.memo_drop_receiver.get_mut().take().unwrap()) + }); r_new } + /// Releases the previous barrier and acquires a new one, effectively kicking off a destruction + /// cycle for all collected memos up to this point. + pub(crate) fn memo_drop_barrier(&mut self) { + let parker = Parker::new(); + mem::replace(&mut self.memo_drop_unparker.0, parker.unparker().clone()).unpark(); + self.memo_drop_sender.park(parker); + } + /// Block until `other_id` completes executing `database_key`; /// panic or unwind in the case of a cycle. 
/// @@ -349,4 +391,27 @@ impl Runtime { .lock() .unblock_runtimes_blocked_on(database_key, wait_result); } + + pub(crate) fn memo_drop_sender(&self) -> MemoDropSender { + self.memo_drop_sender.clone() + } +} + +#[derive(Default)] +struct JoinOnDrop(Option>); + +impl Drop for JoinOnDrop { + fn drop(&mut self) { + if let Some(drop_thread) = self.0.take() { + drop_thread.join().unwrap(); + } + } +} + +struct UnparkOnDrop(Unparker); + +impl Drop for UnparkOnDrop { + fn drop(&mut self) { + self.0.unpark(); + } } diff --git a/src/table.rs b/src/table.rs index 1b88320e7..16b75130c 100644 --- a/src/table.rs +++ b/src/table.rs @@ -23,7 +23,7 @@ const PAGE_LEN_MASK: usize = PAGE_LEN - 1; const PAGE_LEN: usize = 1 << PAGE_LEN_BITS; const MAX_PAGES: usize = 1 << (32 - PAGE_LEN_BITS); -pub(crate) struct Table { +pub struct Table { pub(crate) pages: AppendOnlyVec>, } @@ -37,6 +37,9 @@ pub(crate) trait TablePage: Any + Send + Sync { /// The `current_revision` MUST be the current revision of the database owning this table page. unsafe fn memos(&self, slot: SlotIndex, current_revision: Revision) -> &MemoTable; + /// Access the memos attached to `slot`. + fn memos_mut(&mut self, slot: SlotIndex) -> &mut MemoTable; + /// Access the syncs attached to `slot`. /// /// # Safety condition @@ -75,6 +78,9 @@ pub(crate) trait Slot: Any + Send + Sync { /// The current revision MUST be the current revision of the database containing this slot. unsafe fn memos(&self, current_revision: Revision) -> &MemoTable; + /// Mutably access the [`MemoTable`] for this slot. + fn memos_mut(&mut self) -> &mut MemoTable; + /// Access the [`SyncTable`][] for this slot. /// /// # Safety condition @@ -123,7 +129,7 @@ impl Table { /// # Panics /// /// If `id` is out of bounds or the does not have the type `T`. 
- pub fn get(&self, id: Id) -> &T { + pub(crate) fn get(&self, id: Id) -> &T { let (page, slot) = split_id(id); let page_ref = self.page::(page); page_ref.get(slot) @@ -138,7 +144,7 @@ impl Table { /// # Safety /// /// See [`Page::get_raw`][]. - pub fn get_raw(&self, id: Id) -> *mut T { + pub(crate) fn get_raw(&self, id: Id) -> *mut T { let (page, slot) = split_id(id); let page_ref = self.page::(page); page_ref.get_raw(slot) @@ -149,12 +155,12 @@ impl Table { /// # Panics /// /// If `page` is out of bounds or the type `T` is incorrect. - pub fn page(&self, page: PageIndex) -> &Page { + pub(crate) fn page(&self, page: PageIndex) -> &Page { self.pages[page.0].assert_type::>() } /// Allocate a new page for the given ingredient and with slots of type `T` - pub fn push_page(&self, ingredient: IngredientIndex) -> PageIndex { + pub(crate) fn push_page(&self, ingredient: IngredientIndex) -> PageIndex { let page = Box::new(>::new(ingredient)); PageIndex::new(self.pages.push(page)) } @@ -165,18 +171,24 @@ impl Table { /// /// The parameter `current_revision` MUST be the current revision /// of the owner of database owning this table. - pub unsafe fn memos(&self, id: Id, current_revision: Revision) -> &MemoTable { + pub(crate) unsafe fn memos(&self, id: Id, current_revision: Revision) -> &MemoTable { let (page, slot) = split_id(id); self.pages[page.0].memos(slot, current_revision) } + /// Get the memo table associated with `id` + pub(crate) fn memos_mut(&mut self, id: Id) -> &mut MemoTable { + let (page, slot) = split_id(id); + self.pages[page.0].memos_mut(slot) + } + /// Get the sync table associated with `id` /// /// # Safety condition /// /// The parameter `current_revision` MUST be the current revision /// of the owner of database owning this table. 
- pub unsafe fn syncs(&self, id: Id, current_revision: Revision) -> &SyncTable { + pub(crate) unsafe fn syncs(&self, id: Id, current_revision: Revision) -> &SyncTable { let (page, slot) = split_id(id); self.pages[page.0].syncs(slot, current_revision) } @@ -211,6 +223,16 @@ impl Page { unsafe { (*self.data[slot.0].get()).assume_init_ref() } } + /// Returns a reference to the given slot. + /// + /// # Panics + /// + /// If slot is out of bounds + pub(crate) fn get_mut(&mut self, slot: SlotIndex) -> &mut T { + self.check_bounds(slot); + unsafe { (*self.data[slot.0].get()).assume_init_mut() } + } + pub(crate) fn slots(&self) -> impl Iterator { let len = self.allocated.load(std::sync::atomic::Ordering::Acquire); let mut idx = 0; @@ -274,6 +296,10 @@ impl TablePage for Page { self.get(slot).memos(current_revision) } + fn memos_mut(&mut self, slot: SlotIndex) -> &mut MemoTable { + self.get_mut(slot).memos_mut() + } + unsafe fn syncs(&self, slot: SlotIndex, current_revision: Revision) -> &SyncTable { self.get(slot).syncs(current_revision) } diff --git a/src/table/memo.rs b/src/table/memo.rs index e06fe853e..30544ccde 100644 --- a/src/table/memo.rs +++ b/src/table/memo.rs @@ -1,10 +1,12 @@ use std::{ any::{Any, TypeId}, fmt::Debug, + mem::ManuallyDrop, sync::Arc, }; use arc_swap::ArcSwap; +use crossbeam_utils::sync::Parker; use parking_lot::RwLock; use crate::{zalsa::MemoIngredientIndex, zalsa_local::QueryOrigin}; @@ -22,6 +24,71 @@ pub(crate) trait Memo: Any + Send + Sync + Debug { fn origin(&self) -> &QueryOrigin; } +/// An untyped memo that can only be dropped. 
+pub struct MemoDrop(ManuallyDrop>, unsafe fn(Arc)); + +impl MemoDrop { + pub fn new(memo: Arc) -> Self { + Self( + ManuallyDrop::new(to_dummy::(memo)), + // SAFETY: `M` is the same as used in `to_dummy` + |memo| unsafe { + from_dummy::(memo); + }, + ) + } +} + +impl Drop for MemoDrop { + fn drop(&mut self) { + // SAFETY: We only construct this type with a valid drop function pointer + unsafe { self.1(ManuallyDrop::take(&mut self.0)) }; + } +} + +pub fn spawn_memo_drop_thread(receiver: MemoDropReceiver) -> std::thread::JoinHandle<()> { + std::thread::spawn(|| { + receiver.0.into_iter().for_each(|e| match e { + MemoDropAction::Drop(memo_drop) => drop(memo_drop), + // We were instructed to park the thread, this means we have dropped everything for the + // last revision. + MemoDropAction::Park(parker) => parker.park(), + }); + }) +} + +pub fn memo_drop_channel() -> (MemoDropSender, MemoDropReceiver) { + let (tx, rx) = std::sync::mpsc::channel(); + (MemoDropSender(tx), MemoDropReceiver(rx)) +} + +#[derive(Clone)] +pub struct MemoDropSender(std::sync::mpsc::Sender); + +impl MemoDropSender { + /// Put the memo into the drop queue. + pub(crate) fn delay(&self, memo: Arc) { + self.0 + .send(MemoDropAction::Drop(MemoDrop::new(memo))) + .unwrap(); + } + + /// Emit a park barrier for the receiver side instructing it to not drop any memos following the + /// barrier until the given parker is unparked. + pub(crate) fn park(&self, parker: Parker) { + self.0.send(MemoDropAction::Park(parker)).unwrap(); + } +} + +pub struct MemoDropReceiver(std::sync::mpsc::Receiver); + +/// A drop action to be performed by the owner of the `MemoDropReceiver`. +enum MemoDropAction { + /// Drop the contained memo. + Drop(MemoDrop), + /// Park the current thread on the given parker. + Park(Parker), +} /// Wraps the data stored for a memoized entry. /// This struct has a customized Drop that will /// ensure that its `data` field is properly freed. 
@@ -62,28 +129,15 @@ struct MemoEntryData { struct DummyMemo {} impl MemoTable { - fn to_dummy(memo: Arc) -> Arc { - unsafe { std::mem::transmute::, Arc>(memo) } - } - - unsafe fn from_dummy(memo: Arc) -> Arc { - unsafe { std::mem::transmute::, Arc>(memo) } - } - - fn to_dyn_fn() -> fn(Arc) -> Arc { - let f: fn(Arc) -> Arc = |x| x; - unsafe { - std::mem::transmute::) -> Arc, fn(Arc) -> Arc>( - f, - ) - } - } - - pub(crate) fn insert( + /// # Safety + /// + /// The caller needs to make sure to not drop the returned value until no more references into + /// the database exist as there may be outstanding borrows into the `Arc` contents. + pub(crate) unsafe fn insert( &self, memo_ingredient_index: MemoIngredientIndex, memo: Arc, - ) -> Option> { + ) -> Option>> { // If the memo slot is already occupied, it must already have the // right type info etc, and we only need the read-lock. if let Some(MemoEntry { @@ -100,19 +154,24 @@ impl MemoTable { TypeId::of::(), "inconsistent type-id for `{memo_ingredient_index:?}`" ); - let old_memo = arc_swap.swap(Self::to_dummy(memo)); - return unsafe { Some(Self::from_dummy(old_memo)) }; + let old_memo = arc_swap.swap(to_dummy(memo)); + return Some(ManuallyDrop::new(unsafe { from_dummy(old_memo) })); } // Otherwise we need the write lock. - self.insert_cold(memo_ingredient_index, memo) + // SAFETY: The caller is responsible for dropping + unsafe { self.insert_cold(memo_ingredient_index, memo) } } - fn insert_cold( + /// # Safety + /// + /// The caller needs to make sure to not drop the returned value until no more references into + /// the database exist as there may be outstanding borrows into the `Arc` contents. 
+ unsafe fn insert_cold( &self, memo_ingredient_index: MemoIngredientIndex, memo: Arc, - ) -> Option> { + ) -> Option>> { let mut memos = self.memos.write(); let memo_ingredient_index = memo_ingredient_index.as_usize(); if memos.len() < memo_ingredient_index + 1 { @@ -122,17 +181,19 @@ impl MemoTable { &mut memos[memo_ingredient_index].data, Some(MemoEntryData { type_id: TypeId::of::(), - to_dyn_fn: Self::to_dyn_fn::(), - arc_swap: ArcSwap::new(Self::to_dummy(memo)), + to_dyn_fn: to_dyn_fn::(), + arc_swap: ArcSwap::new(to_dummy(memo)), }), ); - old_entry.map( - |MemoEntryData { - type_id: _, - to_dyn_fn: _, - arc_swap, - }| unsafe { Self::from_dummy(arc_swap.into_inner()) }, - ) + old_entry + .map( + |MemoEntryData { + type_id: _, + to_dyn_fn: _, + arc_swap, + }| unsafe { from_dummy(arc_swap.into_inner()) }, + ) + .map(ManuallyDrop::new) } pub(crate) fn get( @@ -160,19 +221,22 @@ impl MemoTable { ); // SAFETY: type_id check asserted above - unsafe { Some(Self::from_dummy(arc_swap.load_full())) } + Some(unsafe { from_dummy(arc_swap.load_full()) }) } /// Calls `f` on the memo at `memo_ingredient_index` and replaces the memo with the result of `f`. /// If the memo is not present, `f` is not called. - pub(crate) fn map_memo( - &self, + /// + /// # Safety + /// + /// The caller needs to make sure to not drop the returned value until no more references into + /// the database exist as there may be outstanding borrows into the `Arc` contents. + pub(crate) unsafe fn map_memo( + &mut self, memo_ingredient_index: MemoIngredientIndex, f: impl FnOnce(Arc) -> Arc, - ) -> Option> { - // If the memo slot is already occupied, it must already have the - // right type info etc, and we only need the read-lock. 
- let memos = self.memos.read(); + ) -> Option>> { + let memos = self.memos.get_mut(); let Some(MemoEntry { data: Some(MemoEntryData { @@ -189,12 +253,24 @@ impl MemoTable { TypeId::of::(), "inconsistent type-id for `{memo_ingredient_index:?}`" ); + // arc-swap does not expose accessing the interior mutably at all unfortunately + // https://github.com/vorner/arc-swap/issues/131 + // so we are required to allocate a new arc within `f` instead of being able + // to swap out the interior // SAFETY: type_id check asserted above - let memo = f(unsafe { Self::from_dummy(arc_swap.load_full()) }); - Some(unsafe { Self::from_dummy::(arc_swap.swap(Self::to_dummy(memo))) }) + let memo = f(unsafe { from_dummy(arc_swap.load_full()) }); + Some(ManuallyDrop::new(unsafe { + from_dummy::(arc_swap.swap(to_dummy(memo))) + })) } - pub(crate) fn into_memos(self) -> impl Iterator)> { + /// # Safety + /// + /// The caller needs to make sure to not drop the returned value until no more references into + /// the database exist as there may be outstanding borrows into the `Arc` contents. 
+ pub(crate) unsafe fn into_memos( + self, + ) -> impl Iterator>)> { self.memos .into_inner() .into_iter() @@ -211,7 +287,7 @@ impl MemoTable { )| { ( MemoIngredientIndex::from_usize(index), - to_dyn_fn(arc_swap.into_inner()), + ManuallyDrop::new(to_dyn_fn(arc_swap.into_inner())), ) }, ) @@ -243,3 +319,18 @@ impl std::fmt::Debug for MemoTable { f.debug_struct("MemoTable").finish() } } + +fn to_dyn_fn() -> fn(Arc) -> Arc { + let f: fn(Arc) -> Arc = |x| x as Arc; + unsafe { + std::mem::transmute::) -> Arc, fn(Arc) -> Arc>(f) + } +} + +fn to_dummy(memo: Arc) -> Arc { + unsafe { std::mem::transmute::, Arc>(memo) } +} + +unsafe fn from_dummy(memo: Arc) -> Arc { + unsafe { std::mem::transmute::, Arc>(memo) } +} diff --git a/src/tracked_struct.rs b/src/tracked_struct.rs index 9c11d1a15..b5483914e 100644 --- a/src/tracked_struct.rs +++ b/src/tracked_struct.rs @@ -1,4 +1,4 @@ -use std::{any::TypeId, fmt, hash::Hash, marker::PhantomData, ops::DerefMut}; +use std::{any::TypeId, fmt, hash::Hash, marker::PhantomData, mem::ManuallyDrop, ops::DerefMut}; use crossbeam_queue::SegQueue; use tracked_field::FieldIngredientImpl; @@ -8,7 +8,7 @@ use crate::{ cycle::CycleRecoveryStrategy, ingredient::{fmt_index, Ingredient, Jar, JarAux, MaybeChangedAfter}, key::{DatabaseKeyIndex, InputDependencyIndex}, - plumbing::ZalsaLocal, + plumbing::{MemoDropSender, ZalsaLocal}, revision::OptionalAtomicRevision, runtime::StampedValue, salsa_struct::SalsaStructInDb, @@ -111,6 +111,7 @@ impl Jar for JarImpl { &self, _aux: &dyn JarAux, struct_index: crate::zalsa::IngredientIndex, + _: MemoDropSender, ) -> Vec> { let struct_ingredient = >::new(struct_index); @@ -580,15 +581,10 @@ where None => { panic!("cannot delete write-locked id `{id:?}`; value leaked across threads"); } - + Some(r) if r == current_revision => panic!( + "cannot delete read-locked id `{id:?}`; value leaked across threads or user functions not deterministic" + ), Some(r) => { - if r == current_revision { - panic!( - "cannot delete 
read-locked id `{id:?}`; \ - value leaked across threads or user functions not deterministic" - ) - } - if data_ref.updated_at.compare_exchange(Some(r), None).is_err() { panic!("race occurred when deleting value `{id:?}`") } @@ -598,7 +594,10 @@ where // Take the memo table. This is safe because we have modified `data_ref.updated_at` to `None` // and the code that references the memo-table has a read-lock. let memo_table = unsafe { (*data).take_memo_table() }; - for (memo_ingredient_index, memo) in memo_table.into_memos() { + // SAFETY: We have verified that no more references to these memos exist and so we are good + // to drop them. + for (memo_ingredient_index, memo) in unsafe { memo_table.into_memos() } { + let memo = ManuallyDrop::into_inner(memo); let ingredient_index = zalsa.ingredient_index_for_memo(self.ingredient_index, memo_ingredient_index); @@ -759,7 +758,9 @@ where false } - fn reset_for_new_revision(&mut self) {} + fn reset_for_new_revision(&mut self, _: &mut Table) { + panic!("tracked struct ingredients do not require reset") + } } impl std::fmt::Debug for IngredientImpl @@ -831,6 +832,10 @@ where &self.memos } + fn memos_mut(&mut self) -> &mut crate::table::memo::MemoTable { + &mut self.memos + } + unsafe fn syncs(&self, current_revision: Revision) -> &crate::table::sync::SyncTable { // Acquiring the read lock here with the current revision // ensures that there is no danger of a race diff --git a/src/tracked_struct/tracked_field.rs b/src/tracked_struct/tracked_field.rs index a94533598..e3d1e07a0 100644 --- a/src/tracked_struct/tracked_field.rs +++ b/src/tracked_struct/tracked_field.rs @@ -2,6 +2,7 @@ use std::marker::PhantomData; use crate::{ ingredient::{Ingredient, MaybeChangedAfter}, + table::Table, zalsa::IngredientIndex, Database, Id, }; @@ -97,7 +98,7 @@ where false } - fn reset_for_new_revision(&mut self) { + fn reset_for_new_revision(&mut self, _: &mut Table) { panic!("tracked field ingredients do not require reset") } diff --git 
a/src/zalsa.rs b/src/zalsa.rs index cfd775d0c..09f6eb14e 100644 --- a/src/zalsa.rs +++ b/src/zalsa.rs @@ -205,7 +205,8 @@ impl Zalsa { }); if should_create { let aux = JarAuxImpl(self, &jar_map); - let ingredients = jar.create_ingredients(&aux, index); + let ingredients = + jar.create_ingredients(&aux, index, self.runtime.memo_drop_sender()); for ingredient in ingredients { let expected_index = ingredient.ingredient_index(); @@ -272,13 +273,17 @@ impl Zalsa { let new_revision = self.runtime.new_revision(); for index in self.ingredients_requiring_reset.iter() { - self.ingredients_vec[index.as_usize()].reset_for_new_revision(); + self.ingredients_vec[index.as_usize()].reset_for_new_revision(self.runtime.table_mut()); } + // Call `memo_drop_barrier` after having called `reset_for_new_revision` on all ingredients + // so that we collect all LRU evictions that happened during the reset. + self.runtime.memo_drop_barrier(); + new_revision } - /// See [`Runtime::block_on_or_unwind`][] + /// See [`Runtime::block_on_or_unwind`] pub(crate) fn block_on_or_unwind( &self, db: &dyn Database, diff --git a/tests/lru.rs b/tests/lru.rs index e8ad930db..bdd1ba68a 100644 --- a/tests/lru.rs +++ b/tests/lru.rs @@ -11,23 +11,32 @@ use common::LogDatabase; use salsa::Database as _; use test_log::test; -#[derive(Debug, PartialEq, Eq)] -struct HotPotato(u32); +#[derive(Debug)] +struct HotPotato(u32, Arc); + +impl Eq for HotPotato {} +impl PartialEq for HotPotato { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } +} thread_local! 
{ - static N_POTATOES: AtomicUsize = const { AtomicUsize::new(0) } + static N_POTATOES: Arc = Arc::new(AtomicUsize::new(0)) } impl HotPotato { fn new(id: u32) -> HotPotato { - N_POTATOES.with(|n| n.fetch_add(1, Ordering::SeqCst)); - HotPotato(id) + N_POTATOES.with(|n| { + n.fetch_add(1, Ordering::SeqCst); + HotPotato(id, n.clone()) + }) } } impl Drop for HotPotato { fn drop(&mut self) { - N_POTATOES.with(|n| n.fetch_sub(1, Ordering::SeqCst)); + self.1.fetch_sub(1, Ordering::SeqCst); } } @@ -36,7 +45,7 @@ struct MyInput { field: u32, } -#[salsa::tracked(lru = 32)] +#[salsa::tracked(lru = 16)] fn get_hot_potato(db: &dyn LogDatabase, input: MyInput) -> Arc { db.push_log(format!("get_hot_potato({:?})", input.field(db))); Arc::new(HotPotato::new(input.field(db))) @@ -48,17 +57,24 @@ fn get_hot_potato2(db: &dyn LogDatabase, input: MyInput) -> u32 { get_hot_potato(db, input).0 } -#[salsa::tracked(lru = 32)] -fn get_volatile(db: &dyn LogDatabase, _input: MyInput) -> usize { - static COUNTER: AtomicUsize = AtomicUsize::new(0); - db.report_untracked_read(); - COUNTER.fetch_add(1, Ordering::SeqCst) -} - fn load_n_potatoes() -> usize { N_POTATOES.with(|n| n.load(Ordering::SeqCst)) } +fn wait_until_n_potatoes(n: usize) { + let now = std::time::Instant::now(); + while load_n_potatoes() != n { + std::thread::yield_now(); + if now.elapsed().as_secs() > 10 { + panic!( + "timed out waiting for {} potatoes, we've got {} instead", + n, + load_n_potatoes() + ); + } + } +} + #[test] fn lru_works() { let mut db = common::LoggerDatabase::default(); @@ -67,30 +83,13 @@ fn lru_works() { for i in 0..128u32 { let input = MyInput::new(&db, i); let p = get_hot_potato(&db, input); - assert_eq!(p.0, i) + assert_eq!(p.0, i); } + assert_eq!(load_n_potatoes(), 128); // trigger the GC db.synthetic_write(salsa::Durability::HIGH); - assert_eq!(load_n_potatoes(), 32); -} - -#[test] -fn lru_doesnt_break_volatile_queries() { - let db = common::LoggerDatabase::default(); - - // Create all inputs first, so 
that there are no revision changes among calls to `get_volatile` - let inputs: Vec = (0..128usize).map(|i| MyInput::new(&db, i as u32)).collect(); - - // Here, we check that we execute each volatile query at most once, despite - // LRU. That does mean that we have more values in DB than the LRU capacity, - // but it's much better than inconsistent results from volatile queries! - for _ in 0..3 { - for (i, input) in inputs.iter().enumerate() { - let x = get_volatile(&db, *input); - assert_eq!(x, i); - } - } + wait_until_n_potatoes(16); } #[test] @@ -98,39 +97,42 @@ fn lru_can_be_changed_at_runtime() { let mut db = common::LoggerDatabase::default(); assert_eq!(load_n_potatoes(), 0); - let inputs: Vec<(u32, MyInput)> = (0..128).map(|i| (i, MyInput::new(&db, i))).collect(); + let inputs: Vec<(u32, MyInput)> = (0..64).map(|i| (i, MyInput::new(&db, i))).collect(); for &(i, input) in inputs.iter() { let p = get_hot_potato(&db, input); - assert_eq!(p.0, i) + assert_eq!(p.0, i); } + assert_eq!(load_n_potatoes(), 64); // trigger the GC db.synthetic_write(salsa::Durability::HIGH); - assert_eq!(load_n_potatoes(), 32); + std::thread::sleep(std::time::Duration::from_millis(100)); + + wait_until_n_potatoes(16); - get_hot_potato::set_lru_capacity(&db, 64); - assert_eq!(load_n_potatoes(), 32); + get_hot_potato::set_lru_capacity(&db, 32); for &(i, input) in inputs.iter() { let p = get_hot_potato(&db, input); - assert_eq!(p.0, i) + assert_eq!(p.0, i); } + assert_eq!(load_n_potatoes(), 64); // trigger the GC db.synthetic_write(salsa::Durability::HIGH); - assert_eq!(load_n_potatoes(), 64); + wait_until_n_potatoes(32); // Special case: setting capacity to zero disables LRU get_hot_potato::set_lru_capacity(&db, 0); - assert_eq!(load_n_potatoes(), 64); for &(i, input) in inputs.iter() { let p = get_hot_potato(&db, input); - assert_eq!(p.0, i) + assert_eq!(p.0, i); } + assert_eq!(load_n_potatoes(), 64); // trigger the GC db.synthetic_write(salsa::Durability::HIGH); - 
assert_eq!(load_n_potatoes(), 128); + wait_until_n_potatoes(64); drop(db); assert_eq!(load_n_potatoes(), 0); @@ -139,10 +141,10 @@ fn lru_can_be_changed_at_runtime() { #[test] fn lru_keeps_dependency_info() { let mut db = common::LoggerDatabase::default(); - let capacity = 32; + let capacity = 16; // Invoke `get_hot_potato2` 33 times. This will (in turn) invoke - // `get_hot_potato`, which will trigger LRU after 32 executions. + // `get_hot_potato`, which will trigger LRU after 16 executions. let inputs: Vec = (0..(capacity + 1)) .map(|i| MyInput::new(&db, i as u32)) .collect();