Skip to content

Commit 0a4b3fd

Browse files
committed
Move memo dropping off to another thread
1 parent 6f603ec commit 0a4b3fd

File tree

15 files changed

+258
-113
lines changed

15 files changed

+258
-113
lines changed

Diff for: components/salsa-macro-rules/src/setup_tracked_fn.rs

+2
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ macro_rules! setup_tracked_fn {
201201
&self,
202202
aux: &dyn $zalsa::JarAux,
203203
first_index: $zalsa::IngredientIndex,
204+
memo_drop_sender: $zalsa::MemoDropSender,
204205
) -> Vec<Box<dyn $zalsa::Ingredient>> {
205206
let struct_index = $zalsa::macro_if! {
206207
if $needs_interner {
@@ -217,6 +218,7 @@ macro_rules! setup_tracked_fn {
217218
struct_index,
218219
first_index,
219220
aux,
221+
memo_drop_sender
220222
);
221223
fn_ingredient.set_capacity($lru);
222224
$zalsa::macro_if! {

Diff for: src/accumulator.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use accumulated::AnyAccumulated;
1212
use crate::{
1313
cycle::CycleRecoveryStrategy,
1414
ingredient::{fmt_index, Ingredient, Jar, MaybeChangedAfter},
15-
plumbing::JarAux,
15+
plumbing::{JarAux, MemoDropSender},
1616
table::Table,
1717
zalsa::IngredientIndex,
1818
zalsa_local::QueryOrigin,
@@ -50,6 +50,7 @@ impl<A: Accumulator> Jar for JarImpl<A> {
5050
&self,
5151
_aux: &dyn JarAux,
5252
first_index: IngredientIndex,
53+
_: MemoDropSender,
5354
) -> Vec<Box<dyn Ingredient>> {
5455
vec![Box::new(<IngredientImpl<A>>::new(first_index))]
5556
}

Diff for: src/exclusive.rs

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/// `std::sync::Exclusive`
2+
pub struct Exclusive<T: ?Sized> {
3+
inner: T,
4+
}
5+
6+
unsafe impl<T> Sync for Exclusive<T> {}
7+
8+
impl<T> Exclusive<T> {
9+
pub fn new(inner: T) -> Self {
10+
Self { inner }
11+
}
12+
pub fn get_mut(&mut self) -> &mut T {
13+
&mut self.inner
14+
}
15+
}

Diff for: src/function.rs

+13-17
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{any::Any, fmt, mem::ManuallyDrop, sync::Arc};
1+
use std::{any::Any, fmt, marker::PhantomData, sync::Arc};
22

33
use crate::{
44
accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues},
@@ -7,19 +7,16 @@ use crate::{
77
key::DatabaseKeyIndex,
88
plumbing::JarAux,
99
salsa_struct::SalsaStructInDb,
10-
table::Table,
10+
table::{memo::MemoDropSender, Table},
1111
zalsa::{IngredientIndex, MemoIngredientIndex, Zalsa},
1212
zalsa_local::QueryOrigin,
1313
Cycle, Database, Id, Revision,
1414
};
1515

16-
use self::delete::DeletedEntries;
17-
1816
use super::ingredient::Ingredient;
1917

2018
mod accumulated;
2119
mod backdate;
22-
mod delete;
2320
mod diff_outputs;
2421
mod execute;
2522
mod fetch;
@@ -113,7 +110,8 @@ pub struct IngredientImpl<C: Configuration> {
113110
/// current revision: you would be right, but we are being defensive, because
114111
/// we don't know that we can trust the database to give us the same runtime
115112
/// everytime and so forth.
116-
deleted_entries: DeletedEntries<C>,
113+
delete: MemoDropSender,
114+
config: PhantomData<fn(C) -> C>,
117115
}
118116

119117
/// True if `old_value == new_value`. Invoked by the generated
@@ -127,12 +125,18 @@ impl<C> IngredientImpl<C>
127125
where
128126
C: Configuration,
129127
{
130-
pub fn new(struct_index: IngredientIndex, index: IngredientIndex, aux: &dyn JarAux) -> Self {
128+
pub fn new(
129+
struct_index: IngredientIndex,
130+
index: IngredientIndex,
131+
aux: &dyn JarAux,
132+
delete: MemoDropSender,
133+
) -> Self {
131134
Self {
132135
index,
133136
memo_ingredient_index: aux.next_memo_ingredient_index(struct_index, index),
134137
lru: Default::default(),
135-
deleted_entries: Default::default(),
138+
delete,
139+
config: PhantomData,
136140
}
137141
}
138142

@@ -171,14 +175,7 @@ where
171175
// Unsafety conditions: memo must be in the map (it's not yet, but it will be by the time this
172176
// value is returned) and anything removed from map is added to deleted entries (ensured elsewhere).
173177
let db_memo = unsafe { self.extend_memo_lifetime(&memo) };
174-
// Safety: We delay the drop of `old_value` until a new revision starts which ensures no
175-
// references will exist for the memo contents.
176-
if let Some(old_value) = unsafe { self.insert_memo_into_table_for(zalsa, id, memo) } {
177-
// In case there is a reference to the old memo out there, we have to store it
178-
// in the deleted entries. This will get cleared when a new revision starts.
179-
self.deleted_entries
180-
.push(ManuallyDrop::into_inner(old_value));
181-
}
178+
self.insert_memo_into_table_for(zalsa, id, memo);
182179
db_memo
183180
}
184181
}
@@ -236,7 +233,6 @@ where
236233
fn reset_for_new_revision(&mut self, table: &mut Table) {
237234
self.lru
238235
.for_each_evicted(|evict| self.evict_value_from_memo_for(table.memos_mut(evict)));
239-
std::mem::take(&mut self.deleted_entries);
240236
}
241237

242238
fn fmt_index(&self, index: Option<crate::Id>, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {

Diff for: src/function/delete.rs

-25
This file was deleted.

Diff for: src/function/memo.rs

+13-17
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,22 @@ impl<C: Configuration> IngredientImpl<C> {
3535
}
3636

3737
/// Inserts the memo for the given key; (atomically) overwrites and returns any previously existing memo
38-
///
39-
/// # Safety
40-
///
41-
/// The caller needs to make sure to not drop the returned value until no more references into
42-
/// the database exist as there may be outstanding borrows into the `Arc` contents.
43-
pub(super) unsafe fn insert_memo_into_table_for<'db>(
38+
pub(super) fn insert_memo_into_table_for<'db>(
4439
&'db self,
4540
zalsa: &'db Zalsa,
4641
id: Id,
4742
memo: ArcMemo<'db, C>,
48-
) -> Option<ManuallyDrop<ArcMemo<'db, C>>> {
43+
) {
4944
let static_memo = unsafe { self.to_static(memo) };
50-
let old_static_memo = unsafe {
45+
// SAFETY: We delay the deletion of the old memo until the next revision starts.
46+
let old_memo = unsafe {
5147
zalsa
5248
.memo_table_for(id)
5349
.insert(self.memo_ingredient_index, static_memo)
54-
}?;
55-
let old_static_memo = ManuallyDrop::into_inner(old_static_memo);
56-
Some(ManuallyDrop::new(unsafe { self.to_self(old_static_memo) }))
50+
};
51+
if let Some(old_memo) = old_memo {
52+
self.delete.delay(ManuallyDrop::into_inner(old_memo));
53+
}
5754
}
5855

5956
/// Loads the current memo for `key_index`. This does not hold any sort of
@@ -111,12 +108,11 @@ impl<C: Configuration> IngredientImpl<C> {
111108
}
112109
}
113110
};
114-
// SAFETY: We queue the old value for deletion, delaying its drop until the next revision bump.
115-
let old = unsafe { table.map_memo(self.memo_ingredient_index, map) };
116-
if let Some(old) = old {
117-
// In case there is a reference to the old memo out there, we have to store it
118-
// in the deleted entries. This will get cleared when a new revision starts.
119-
self.deleted_entries.push(ManuallyDrop::into_inner(old));
111+
// SAFETY: We queue the old value for deletion, delaying its drop until the next revision
112+
// bump.
113+
let old_memo = unsafe { table.map_memo(self.memo_ingredient_index, map) };
114+
if let Some(old_memo) = old_memo {
115+
self.delete.delay(ManuallyDrop::into_inner(old_memo));
120116
}
121117
}
122118
}

Diff for: src/ingredient.rs

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
use crate::{
77
accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues},
88
cycle::CycleRecoveryStrategy,
9+
plumbing::MemoDropSender,
910
table::Table,
1011
zalsa::{IngredientIndex, MemoIngredientIndex},
1112
zalsa_local::QueryOrigin,
@@ -23,6 +24,7 @@ pub trait Jar: Any {
2324
&self,
2425
aux: &dyn JarAux,
2526
first_index: IngredientIndex,
27+
memo_drop_sender: MemoDropSender,
2628
) -> Vec<Box<dyn Ingredient>>;
2729

2830
/// If this jar's first ingredient is a salsa struct, return its `TypeId`

Diff for: src/input.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::{
1717
ingredient::{fmt_index, Ingredient, MaybeChangedAfter},
1818
input::singleton::{Singleton, SingletonChoice},
1919
key::{DatabaseKeyIndex, InputDependencyIndex},
20-
plumbing::{Jar, JarAux, Stamp},
20+
plumbing::{Jar, JarAux, MemoDropSender, Stamp},
2121
table::{memo::MemoTable, sync::SyncTable, Slot, Table},
2222
zalsa::{IngredientIndex, Zalsa},
2323
zalsa_local::QueryOrigin,
@@ -58,6 +58,7 @@ impl<C: Configuration> Jar for JarImpl<C> {
5858
&self,
5959
_aux: &dyn JarAux,
6060
struct_index: crate::zalsa::IngredientIndex,
61+
_: MemoDropSender,
6162
) -> Vec<Box<dyn Ingredient>> {
6263
let struct_ingredient: IngredientImpl<C> = IngredientImpl::new(struct_index);
6364

Diff for: src/interned.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::accumulator::accumulated_map::InputAccumulatedValues;
44
use crate::durability::Durability;
55
use crate::ingredient::{fmt_index, MaybeChangedAfter};
66
use crate::key::InputDependencyIndex;
7-
use crate::plumbing::{Jar, JarAux};
7+
use crate::plumbing::{Jar, JarAux, MemoDropSender};
88
use crate::table::memo::MemoTable;
99
use crate::table::sync::SyncTable;
1010
use crate::table::{Slot, Table};
@@ -103,6 +103,7 @@ impl<C: Configuration> Jar for JarImpl<C> {
103103
&self,
104104
_aux: &dyn JarAux,
105105
first_index: IngredientIndex,
106+
_: MemoDropSender,
106107
) -> Vec<Box<dyn Ingredient>> {
107108
vec![Box::new(IngredientImpl::<C>::new(first_index)) as _]
108109
}

Diff for: src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod database;
88
mod database_impl;
99
mod durability;
1010
mod event;
11+
mod exclusive;
1112
mod function;
1213
mod hash;
1314
mod id;
@@ -90,6 +91,7 @@ pub mod plumbing {
9091
pub use crate::salsa_struct::SalsaStructInDb;
9192
pub use crate::storage::HasStorage;
9293
pub use crate::storage::Storage;
94+
pub use crate::table::memo::MemoDropSender;
9395
pub use crate::tracked_struct::TrackedStructInDb;
9496
pub use crate::update::always_update;
9597
pub use crate::update::helper::Dispatch as UpdateDispatch;

Diff for: src/runtime.rs

+64-3
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,24 @@ use std::{
55
atomic::{AtomicBool, Ordering},
66
Arc,
77
},
8-
thread::ThreadId,
8+
thread::{JoinHandle, ThreadId},
99
};
1010

11+
use crossbeam::sync::{Parker, Unparker};
1112
use parking_lot::Mutex;
1213

1314
use crate::{
14-
active_query::ActiveQuery, cycle::CycleRecoveryStrategy, durability::Durability,
15-
key::DatabaseKeyIndex, revision::AtomicRevision, table::Table, zalsa_local::ZalsaLocal,
15+
active_query::ActiveQuery,
16+
cycle::CycleRecoveryStrategy,
17+
durability::Durability,
18+
exclusive::Exclusive,
19+
key::DatabaseKeyIndex,
20+
revision::AtomicRevision,
21+
table::{
22+
memo::{memo_drop_channel, spawn_memo_drop_thread, MemoDropReceiver, MemoDropSender},
23+
Table,
24+
},
25+
zalsa_local::ZalsaLocal,
1626
Cancelled, Cycle, Database, Event, EventKind, Revision,
1727
};
1828

@@ -43,6 +53,14 @@ pub struct Runtime {
4353

4454
/// Data for instances
4555
table: Table,
56+
// DROP ORDER: `memo_drop_sender` and `unparker` need to drop before `memo_drop_thread`
57+
// as `memo_drop_thread` exits once `memo_drop_sender` drops and its last parker barrier unparks
58+
memo_drop_unparker: UnparkOnDrop,
59+
memo_drop_sender: MemoDropSender,
60+
memo_drop_receiver: Exclusive<Option<MemoDropReceiver>>,
61+
memo_drop_thread: JoinOnDrop,
62+
// Do this once we can do via TAIT instead of storing the receiver temporarily in a field.
63+
// memo_drop_thread: LazyLock<Thread, impl FnOnce() -> Thread>,
4664
}
4765

4866
#[derive(Clone, Debug)]
@@ -80,11 +98,20 @@ impl<V> StampedValue<V> {
8098

8199
impl Default for Runtime {
82100
fn default() -> Self {
101+
let (memo_drop_sender, memo_drop_receiver) = memo_drop_channel();
102+
103+
let parker = Parker::new();
104+
let unparker = UnparkOnDrop(parker.unparker().clone());
105+
memo_drop_sender.park(parker);
83106
Runtime {
84107
revisions: [const { AtomicRevision::start() }; Durability::LEN],
85108
revision_canceled: Default::default(),
86109
dependency_graph: Default::default(),
87110
table: Default::default(),
111+
memo_drop_unparker: unparker,
112+
memo_drop_sender,
113+
memo_drop_receiver: Exclusive::new(Some(memo_drop_receiver)),
114+
memo_drop_thread: Default::default(),
88115
}
89116
}
90117
}
@@ -151,9 +178,20 @@ impl Runtime {
151178
let r_new = r_old.next();
152179
self.revisions[0].store(r_new);
153180
self.revision_canceled.store(false, Ordering::Release);
181+
self.memo_drop_thread.0.get_or_insert_with(|| {
182+
spawn_memo_drop_thread(self.memo_drop_receiver.get_mut().take().unwrap())
183+
});
154184
r_new
155185
}
156186

187+
/// Releases the previous barrier and acquires a new one, effectively kicking off a destruction
188+
/// cycle for all collected memos up to this point.
189+
pub(crate) fn memo_drop_barrier(&mut self) {
190+
let parker = Parker::new();
191+
mem::replace(&mut self.memo_drop_unparker.0, parker.unparker().clone()).unpark();
192+
self.memo_drop_sender.park(parker);
193+
}
194+
157195
/// Block until `other_id` completes executing `database_key`;
158196
/// panic or unwind in the case of a cycle.
159197
///
@@ -353,4 +391,27 @@ impl Runtime {
353391
.lock()
354392
.unblock_runtimes_blocked_on(database_key, wait_result);
355393
}
394+
395+
pub(crate) fn memo_drop_sender(&self) -> MemoDropSender {
396+
self.memo_drop_sender.clone()
397+
}
398+
}
399+
400+
#[derive(Default)]
401+
struct JoinOnDrop(Option<JoinHandle<()>>);
402+
403+
impl Drop for JoinOnDrop {
404+
fn drop(&mut self) {
405+
if let Some(drop_thread) = self.0.take() {
406+
drop_thread.join().unwrap();
407+
}
408+
}
409+
}
410+
411+
struct UnparkOnDrop(Unparker);
412+
413+
impl Drop for UnparkOnDrop {
414+
fn drop(&mut self) {
415+
self.0.unpark();
416+
}
356417
}

0 commit comments

Comments
 (0)