Skip to content

Drop memos on separate thread #660

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ rust-version = "1.76"
arc-swap = "1"
compact_str = { version = "0.8", optional = true }
crossbeam-queue = "0.3.11"
crossbeam-utils = "0.8.21"
dashmap = { version = "6", features = ["raw-api"] }
hashlink = "0.9"
hashbrown = "0.14.3"
Expand Down
7 changes: 5 additions & 2 deletions benches/compare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::mem::transmute;
use codspeed_criterion_compat::{
criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion,
};
use salsa::Setter;
use salsa::{Database, Setter};

#[salsa::input]
pub struct Input {
Expand Down Expand Up @@ -34,13 +34,16 @@ fn mutating_inputs(c: &mut Criterion) {
group.bench_function(BenchmarkId::new("mutating", n), |b| {
b.iter_batched_ref(
|| {
let db = salsa::DatabaseImpl::default();
let mut db = salsa::DatabaseImpl::default();
let base_string = "hello, world!".to_owned();
let base_len = base_string.len();

let string = base_string.clone().repeat(*n);
let new_len = string.len();

// spawn the LRU thread
db.synthetic_write(salsa::Durability::HIGH);

let input = Input::new(&db, base_string.clone());
let actual_len = length(&db, input);
assert_eq!(base_len, actual_len);
Expand Down
6 changes: 4 additions & 2 deletions benches/incremental.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use codspeed_criterion_compat::{criterion_group, criterion_main, BatchSize, Criterion};
use salsa::Setter;
use salsa::{Database, Setter};

#[salsa::input]
struct Input {
Expand All @@ -26,7 +26,9 @@ fn many_tracked_structs(criterion: &mut Criterion) {
criterion.bench_function("many_tracked_structs", |b| {
b.iter_batched_ref(
|| {
let db = salsa::DatabaseImpl::new();
let mut db = salsa::DatabaseImpl::new();
// spawn the LRU thread
db.synthetic_write(salsa::Durability::HIGH);

let input = Input::new(&db, 1_000);
let input2 = Input::new(&db, 1);
Expand Down
2 changes: 2 additions & 0 deletions components/salsa-macro-rules/src/setup_tracked_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ macro_rules! setup_tracked_fn {
&self,
aux: &dyn $zalsa::JarAux,
first_index: $zalsa::IngredientIndex,
memo_drop_sender: $zalsa::MemoDropSender,
) -> Vec<Box<dyn $zalsa::Ingredient>> {
let struct_index = $zalsa::macro_if! {
if $needs_interner {
Expand All @@ -227,6 +228,7 @@ macro_rules! setup_tracked_fn {
struct_index,
first_index,
aux,
memo_drop_sender
);
fn_ingredient.set_capacity($lru);
$zalsa::macro_if! {
Expand Down
6 changes: 4 additions & 2 deletions src/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use accumulated::AnyAccumulated;
use crate::{
cycle::CycleRecoveryStrategy,
ingredient::{fmt_index, Ingredient, Jar, MaybeChangedAfter},
plumbing::JarAux,
plumbing::{JarAux, MemoDropSender},
table::Table,
zalsa::IngredientIndex,
zalsa_local::QueryOrigin,
Database, DatabaseKeyIndex, Id, Revision,
Expand Down Expand Up @@ -49,6 +50,7 @@ impl<A: Accumulator> Jar for JarImpl<A> {
&self,
_aux: &dyn JarAux,
first_index: IngredientIndex,
_: MemoDropSender,
) -> Vec<Box<dyn Ingredient>> {
vec![Box::new(<IngredientImpl<A>>::new(first_index))]
}
Expand Down Expand Up @@ -137,7 +139,7 @@ impl<A: Accumulator> Ingredient for IngredientImpl<A> {
false
}

fn reset_for_new_revision(&mut self) {
fn reset_for_new_revision(&mut self, _: &mut Table) {
panic!("unexpected reset on accumulator")
}

Expand Down
20 changes: 20 additions & 0 deletions src/exclusive.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//! Bare-bones polyfill for the unstable [`std::sync::Exclusive`] type.

pub struct Exclusive<T: ?Sized> {
inner: T,
}

// SAFETY: We only hand out mutable access to the inner value through a mutable reference to the
// wrapper.
// Therefore we cannot alias the inner value making it trivially sync.
unsafe impl<T> Sync for Exclusive<T> {}

impl<T> Exclusive<T> {
pub fn new(inner: T) -> Self {
Self { inner }
}

pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
}
43 changes: 24 additions & 19 deletions src/function.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{any::Any, fmt, sync::Arc};
use std::{any::Any, fmt, marker::PhantomData, sync::Arc};

use crate::{
accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues},
Expand All @@ -7,18 +7,16 @@ use crate::{
key::DatabaseKeyIndex,
plumbing::JarAux,
salsa_struct::SalsaStructInDb,
table::{memo::MemoDropSender, Table},
zalsa::{IngredientIndex, MemoIngredientIndex, Zalsa},
zalsa_local::QueryOrigin,
Cycle, Database, Id, Revision,
};

use self::delete::DeletedEntries;

use super::ingredient::Ingredient;

mod accumulated;
mod backdate;
mod delete;
mod diff_outputs;
mod execute;
mod fetch;
Expand Down Expand Up @@ -112,7 +110,8 @@ pub struct IngredientImpl<C: Configuration> {
/// current revision: you would be right, but we are being defensive, because
/// we don't know that we can trust the database to give us the same runtime
/// everytime and so forth.
deleted_entries: DeletedEntries<C>,
delete: MemoDropSender,
config: PhantomData<fn(C) -> C>,
}

/// True if `old_value == new_value`. Invoked by the generated
Expand All @@ -126,12 +125,18 @@ impl<C> IngredientImpl<C>
where
C: Configuration,
{
pub fn new(struct_index: IngredientIndex, index: IngredientIndex, aux: &dyn JarAux) -> Self {
pub fn new(
struct_index: IngredientIndex,
index: IngredientIndex,
aux: &dyn JarAux,
delete: MemoDropSender,
) -> Self {
Self {
index,
memo_ingredient_index: aux.next_memo_ingredient_index(struct_index, index),
lru: Default::default(),
deleted_entries: Default::default(),
delete,
config: PhantomData,
}
}

Expand Down Expand Up @@ -167,16 +172,10 @@ where
memo: memo::Memo<C::Output<'db>>,
) -> &'db memo::Memo<C::Output<'db>> {
let memo = Arc::new(memo);
let db_memo = unsafe {
// Unsafety conditions: memo must be in the map (it's not yet, but it will be by the time this
// value is returned) and anything removed from map is added to deleted entries (ensured elsewhere).
self.extend_memo_lifetime(&memo)
};
if let Some(old_value) = self.insert_memo_into_table_for(zalsa, id, memo) {
// In case there is a reference to the old memo out there, we have to store it
// in the deleted entries. This will get cleared when a new revision starts.
self.deleted_entries.push(old_value);
}
// Unsafety conditions: memo must be in the map (it's not yet, but it will be by the time this
// value is returned) and anything removed from map is added to deleted entries (ensured elsewhere).
let db_memo = unsafe { self.extend_memo_lifetime(&memo) };
self.insert_memo_into_table_for(zalsa, id, memo);
db_memo
}
}
Expand Down Expand Up @@ -231,8 +230,14 @@ where
true
}

fn reset_for_new_revision(&mut self) {
std::mem::take(&mut self.deleted_entries);
fn reset_for_new_revision(&mut self, table: &mut Table) {
self.lru.for_each_evicted(|evict| {
Self::evict_value_from_memo_for(
table.memos_mut(evict),
self.memo_ingredient_index,
&self.delete,
)
});
}

fn fmt_index(&self, index: Option<crate::Id>, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down
25 changes: 0 additions & 25 deletions src/function/delete.rs

This file was deleted.

6 changes: 2 additions & 4 deletions src/function/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ where
C: Configuration,
{
pub fn fetch<'db>(&'db self, db: &'db C::DbView, id: Id) -> &'db C::Output<'db> {
let (zalsa, zalsa_local) = db.zalsas();
let zalsa_local = db.zalsa_local();
zalsa_local.unwind_if_revision_cancelled(db.as_dyn_database());

let memo = self.refresh_memo(db, id);
Expand All @@ -19,9 +19,7 @@ where
changed_at,
} = memo.revisions.stamped_value(memo.value.as_ref().unwrap());

if let Some(evicted) = self.lru.record_use(id) {
self.evict_value_from_memo_for(zalsa, evicted);
}
self.lru.record_use(id);

zalsa_local.report_tracked_read(
self.database_key_index(id).into(),
Expand Down
26 changes: 15 additions & 11 deletions src/function/lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,35 @@ pub(super) struct Lru {
}

impl Lru {
pub(super) fn record_use(&self, index: Id) -> Option<Id> {
pub(super) fn record_use(&self, index: Id) {
// Relaxed should be fine, we don't need to synchronize on this.
let capacity = self.capacity.load(Ordering::Relaxed);

if capacity == 0 {
// LRU is disabled
return None;
return;
}

let mut set = self.set.lock();
set.insert(index);
if set.len() > capacity {
return set.pop_front();
}

None
}

pub(super) fn set_capacity(&self, capacity: usize) {
// Relaxed should be fine, we don't need to synchronize on this.
self.capacity.store(capacity, Ordering::Relaxed);
}

if capacity == 0 {
let mut set = self.set.lock();
*set = FxLinkedHashSet::default();
pub(super) fn for_each_evicted(&mut self, mut cb: impl FnMut(Id)) {
let set = self.set.get_mut();
// Relaxed should be fine, we don't need to synchronize on this.
let cap = self.capacity.load(Ordering::Relaxed);
if set.len() <= cap || cap == 0 {
return;
}
while let Some(id) = set.pop_front() {
cb(id);
if set.len() <= cap {
break;
}
}
}
}
Loading