Skip to content

Commit

Permalink
Merge pull request #679 from Veykril/veykril/push-wwqrktzzmypo
Browse files Browse the repository at this point in the history
Replace all uses of `AtomicCell` with appropriate atomic primitives
  • Loading branch information
Veykril authored Feb 11, 2025
2 parents 351d9cf + 7faf13c commit ff3ef18
Show file tree
Hide file tree
Showing 20 changed files with 213 additions and 60 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ rust-version = "1.76"
[dependencies]
arc-swap = "1"
compact_str = { version = "0.8", optional = true }
crossbeam = "0.8"
crossbeam-queue = "0.3.11"
dashmap = { version = "6", features = ["raw-api"] }
hashlink = "0.9"
hashbrown = "0.14.3"
Expand Down Expand Up @@ -41,6 +41,7 @@ ordered-float = "4.2.1"
rustversion = "1.0"
test-log = { version = "0.2.11", features = ["trace"] }
trybuild = "1.0"
crossbeam-channel = "0.5.14"

[[bench]]
name = "compare"
Expand Down
2 changes: 1 addition & 1 deletion examples/lazy-input/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
time::Duration,
};

use crossbeam::channel::{unbounded, Sender};
use crossbeam_channel::{unbounded, Sender};
use dashmap::{mapref::entry::Entry, DashMap};
use eyre::{eyre, Context, Report, Result};
use notify_debouncer_mini::{
Expand Down
51 changes: 49 additions & 2 deletions src/accumulator/accumulated_map.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::ops;
use std::{
ops,
sync::atomic::{AtomicBool, Ordering},
};

use rustc_hash::FxHashMap;

Expand Down Expand Up @@ -55,7 +58,7 @@ impl Clone for AccumulatedMap {
///
/// Knowning whether any input has accumulated values makes aggregating the accumulated values
/// cheaper because we can skip over entire subtrees.
#[derive(Copy, Clone, Debug, Default)]
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum InputAccumulatedValues {
/// The query nor any of its inputs have any accumulated values.
#[default]
Expand Down Expand Up @@ -91,3 +94,47 @@ impl ops::BitOrAssign for InputAccumulatedValues {
*self = *self | rhs;
}
}

#[derive(Debug, Default)]
pub struct AtomicInputAccumulatedValues(AtomicBool);

impl Clone for AtomicInputAccumulatedValues {
fn clone(&self) -> Self {
Self(AtomicBool::new(self.0.load(Ordering::Relaxed)))
}
}

impl AtomicInputAccumulatedValues {
pub(crate) fn new(accumulated_inputs: InputAccumulatedValues) -> Self {
Self(AtomicBool::new(accumulated_inputs.is_any()))
}

pub(crate) fn store(&self, accumulated: InputAccumulatedValues) {
self.0.store(accumulated.is_any(), Ordering::Release);
}

pub(crate) fn load(&self) -> InputAccumulatedValues {
if self.0.load(Ordering::Acquire) {
InputAccumulatedValues::Any
} else {
InputAccumulatedValues::Empty
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn atomic_input_accumulated_values() {
let val = AtomicInputAccumulatedValues::new(InputAccumulatedValues::Empty);
assert_eq!(val.load(), InputAccumulatedValues::Empty);
val.store(InputAccumulatedValues::Any);
assert_eq!(val.load(), InputAccumulatedValues::Any);
let val = AtomicInputAccumulatedValues::new(InputAccumulatedValues::Any);
assert_eq!(val.load(), InputAccumulatedValues::Any);
val.store(InputAccumulatedValues::Empty);
assert_eq!(val.load(), InputAccumulatedValues::Empty);
}
}
5 changes: 2 additions & 3 deletions src/active_query.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::ops::Not;

use crossbeam::atomic::AtomicCell;

use super::zalsa_local::{QueryEdges, QueryOrigin, QueryRevisions};
use crate::accumulator::accumulated_map::AtomicInputAccumulatedValues;
use crate::key::OutputDependencyIndex;
use crate::tracked_struct::{DisambiguatorMap, IdentityHash, IdentityMap};
use crate::zalsa_local::QueryEdge;
Expand Down Expand Up @@ -131,7 +130,7 @@ impl ActiveQuery {
origin,
durability: self.durability,
tracked_struct_ids: self.tracked_struct_ids,
accumulated_inputs: AtomicCell::new(self.accumulated_inputs),
accumulated_inputs: AtomicInputAccumulatedValues::new(self.accumulated_inputs),
accumulated,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/function/delete.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crossbeam::queue::SegQueue;
use crossbeam_queue::SegQueue;

use super::{memo::ArcMemo, Configuration};

Expand Down
11 changes: 7 additions & 4 deletions src/function/lru.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::{hash::FxLinkedHashSet, Id};

use crossbeam::atomic::AtomicCell;
use parking_lot::Mutex;

#[derive(Default)]
pub(super) struct Lru {
capacity: AtomicCell<usize>,
capacity: AtomicUsize,
set: Mutex<FxLinkedHashSet<Id>>,
}

impl Lru {
pub(super) fn record_use(&self, index: Id) -> Option<Id> {
let capacity = self.capacity.load();
// Relaxed should be fine, we don't need to synchronize on this.
let capacity = self.capacity.load(Ordering::Relaxed);

if capacity == 0 {
// LRU is disabled
Expand All @@ -28,7 +30,8 @@ impl Lru {
}

pub(super) fn set_capacity(&self, capacity: usize) {
self.capacity.store(capacity);
// Relaxed should be fine, we don't need to synchronize on this.
self.capacity.store(capacity, Ordering::Relaxed);

if capacity == 0 {
let mut set = self.set.lock();
Expand Down
9 changes: 4 additions & 5 deletions src/function/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use crossbeam::atomic::AtomicCell;

use crate::accumulator::accumulated_map::InputAccumulatedValues;
use crate::revision::AtomicRevision;
use crate::zalsa_local::QueryOrigin;
use crate::{
key::DatabaseKeyIndex, zalsa::Zalsa, zalsa_local::QueryRevisions, Event, EventKind, Id,
Expand Down Expand Up @@ -95,7 +94,7 @@ impl<C: Configuration> IngredientImpl<C> {
origin: origin.clone(),
tracked_struct_ids: tracked_struct_ids.clone(),
accumulated: accumulated.clone(),
accumulated_inputs: AtomicCell::new(accumulated_inputs.load()),
accumulated_inputs: accumulated_inputs.clone(),
},
))
}
Expand All @@ -117,7 +116,7 @@ pub(super) struct Memo<V> {

/// Last revision when this memo was verified; this begins
/// as the current revision.
pub(super) verified_at: AtomicCell<Revision>,
pub(super) verified_at: AtomicRevision,

/// Revision information
pub(super) revisions: QueryRevisions,
Expand All @@ -132,7 +131,7 @@ impl<V> Memo<V> {
pub(super) fn new(value: Option<V>, revision_now: Revision, revisions: QueryRevisions) -> Self {
Memo {
value,
verified_at: AtomicCell::new(revision_now),
verified_at: AtomicRevision::from(revision_now),
revisions,
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/function/specify.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crossbeam::atomic::AtomicCell;

use crate::{
accumulator::accumulated_map::InputAccumulatedValues,
revision::AtomicRevision,
tracked_struct::TrackedStructInDb,
zalsa::ZalsaDatabase,
zalsa_local::{QueryOrigin, QueryRevisions},
Expand Down Expand Up @@ -81,7 +80,7 @@ where

let memo = Memo {
value: Some(value),
verified_at: AtomicCell::new(revision),
verified_at: AtomicRevision::from(revision),
revisions,
};

Expand Down
2 changes: 1 addition & 1 deletion src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl<C: Configuration> IngredientImpl<C> {
pub fn new_input(&self, db: &dyn Database, fields: C::Fields, stamps: C::Stamps) -> C::Struct {
let (zalsa, zalsa_local) = db.zalsas();

let id = self.singleton.with_lock(|| {
let id = self.singleton.with_scope(|| {
zalsa_local.allocate(zalsa.table(), self.ingredient_index, |_| Value::<C> {
fields,
stamps,
Expand Down
34 changes: 19 additions & 15 deletions src/input/singleton.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,56 @@
use crossbeam::atomic::AtomicCell;
use parking_lot::Mutex;
use std::sync::atomic::{AtomicU32, Ordering};

use crate::{id::FromId, Id};
use crate::Id;

mod sealed {
pub trait Sealed {}
}

pub trait SingletonChoice: sealed::Sealed + Default {
fn with_lock(&self, cb: impl FnOnce() -> Id) -> Id;
fn with_scope(&self, cb: impl FnOnce() -> Id) -> Id;
fn index(&self) -> Option<Id>;
}

pub struct Singleton {
index: AtomicCell<Option<Id>>,
lock: Mutex<()>,
index: AtomicU32,
}
impl sealed::Sealed for Singleton {}
impl SingletonChoice for Singleton {
fn with_lock(&self, cb: impl FnOnce() -> Id) -> Id {
let _guard = self.lock.lock();
if self.index.load().is_some() {
fn with_scope(&self, cb: impl FnOnce() -> Id) -> Id {
if self.index.load(Ordering::Acquire) != 0 {
panic!("singleton struct may not be duplicated");
}
let id = cb();
self.index.store(Some(id));
drop(_guard);
if self
.index
.compare_exchange(0, id.as_u32() + 1, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
panic!("singleton struct may not be duplicated");
}
id
}

fn index(&self) -> Option<Id> {
self.index.load().map(FromId::from_id)
match self.index.load(Ordering::Acquire) {
0 => None,
id => Some(Id::from_u32(id - 1)),
}
}
}

impl Default for Singleton {
fn default() -> Self {
Self {
index: AtomicCell::new(None),
lock: Default::default(),
index: AtomicU32::new(0),
}
}
}
#[derive(Default)]
pub struct NotSingleton;
impl sealed::Sealed for NotSingleton {}
impl SingletonChoice for NotSingleton {
fn with_lock(&self, cb: impl FnOnce() -> Id) -> Id {
fn with_scope(&self, cb: impl FnOnce() -> Id) -> Id {
cb()
}
fn index(&self) -> Option<Id> {
Expand Down
Loading

0 comments on commit ff3ef18

Please sign in to comment.