Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions boomerang_builder/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ impl EnvBuilder {
bank_info: Option<runtime::BankInfo>,
state: S,
is_enclave: bool,
) -> ReactorBuilderState {
) -> ReactorBuilderState<'_> {
ReactorBuilderState::new(name, parent, bank_info, state, is_enclave, self)
}

/// Get a previously built reactor
pub fn get_reactor_builder(
&mut self,
reactor_key: BuilderReactorKey,
) -> Result<ReactorBuilderState, BuilderError> {
) -> Result<ReactorBuilderState<'_>, BuilderError> {
if !self.reactor_builders.contains_key(reactor_key) {
return Err(BuilderError::ReactorKeyNotFound(reactor_key));
}
Expand Down Expand Up @@ -249,7 +249,7 @@ impl EnvBuilder {
name: &str,
reactor_key: BuilderReactorKey,
reaction_builder_fn: F,
) -> ReactionBuilderState
) -> ReactionBuilderState<'_>
where
F: FnOnce(&BuilderRuntimeParts) -> runtime::BoxedReactionFn + 'static,
{
Expand Down
4 changes: 2 additions & 2 deletions boomerang_builder/src/fqn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::{fmt::Display, ops::Index};

use crate::{
runtime, ActionBuilder, BasePortBuilder, BuilderActionKey, BuilderPortKey, BuilderReactionKey,
ActionBuilder, BasePortBuilder, BuilderActionKey, BuilderPortKey, BuilderReactionKey,
BuilderReactorKey, EnvBuilder, ParentReactorBuilder, ReactionBuilder, ReactorBuilder,
};

Expand Down Expand Up @@ -291,7 +291,7 @@ impl Fqn for BuilderPortKey {

#[cfg(test)]
mod tests {
use crate::{Input, PortBuilder};
use crate::{runtime, Input, PortBuilder};

use super::*;

Expand Down
2 changes: 1 addition & 1 deletion boomerang_builder/src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ impl<'a> ReactorBuilderState<'a> {
}

/// Add a new reaction to this reactor.
pub fn add_reaction<F>(&mut self, name: &str, reaction_builder_fn: F) -> ReactionBuilderState
pub fn add_reaction<F>(&mut self, name: &str, reaction_builder_fn: F) -> ReactionBuilderState<'_>
where
F: for<'any> FnOnce(&'any BuilderRuntimeParts) -> runtime::BoxedReactionFn + 'static,
{
Expand Down
1 change: 1 addition & 0 deletions boomerang_runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ kanal = "0.1"
document-features = { workspace = true }
downcast-rs = "1.2"
itertools.workspace = true
pin-project = "1.1"
rayon = { version = "1.7", optional = true }
serde = { workspace = true, optional = true, features = ["derive"] }
thiserror.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion boomerang_runtime/src/port/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl<'a, T: ReactorData> Deref for InputRef<'a, T> {
}
}

impl<'a, T: ReactorData> From<&'a (dyn BasePort)> for InputRef<'a, T> {
impl<'a, T: ReactorData> From<&'a dyn BasePort> for InputRef<'a, T> {
fn from(port: &'a dyn BasePort) -> Self {
InputRef::from(
port.downcast_ref::<Port<T>>()
Expand Down
2 changes: 1 addition & 1 deletion boomerang_runtime/src/refs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ where
// PartitionMut for BasePort arrays
impl<'a, P, const N: usize> PartitionMut<'a, dyn BasePort> for [P; N]
where
P: From<&'a mut (dyn BasePort)>,
P: From<&'a mut dyn BasePort>,
{
fn part_mut(mut refs: RefsMut<'a, dyn BasePort>) -> Option<(Self, RefsMut<'a, dyn BasePort>)> {
if refs.len() < N {
Expand Down
174 changes: 106 additions & 68 deletions boomerang_runtime/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Runtime data storage

use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull};
use std::{pin::Pin, ptr::NonNull};

use crate::{
refs::{Refs, RefsMut},
Expand Down Expand Up @@ -108,11 +108,13 @@ struct Inner {
}

#[derive(Debug)]
#[pin_project::pin_project]
pub struct Store {
#[pin]
inner: Inner,
/// Internal caches of `ReactionTriggerCtxPtrs`
#[pin]
caches: tinymap::TinySecondaryMap<ReactionKey, ReactionTriggerCtxPtrs>,
_pin: PhantomPinned,
}

impl Store {
Expand Down Expand Up @@ -140,80 +142,73 @@ impl Store {
ports: env.ports,
},
caches: ptrs,
_pin: PhantomPinned,
};

let mut boxed = Box::new(res);
// Pin the Box first, then use projection for safe access
let mut pinned = Box::pin(res);

let contexts = unsafe {
boxed
.inner
// Use pin-project's projection to safely access the pinned fields
let this = pinned.as_mut().project();
let inner = this.inner.get_mut();
let caches = this.caches.get_mut();

// SAFETY: We're initializing the caches with self-references. This is safe because:
// 1. The data is already pinned and won't move
// 2. We're creating pointers to pinned data
// 3. The Store will remain pinned for its entire lifetime
unsafe {
let contexts = inner
.contexts
.iter_many_unchecked_mut(boxed.inner.reactions.keys())
.map(|c| NonNull::new_unchecked(c))
};
.iter_many_unchecked_mut(inner.reactions.keys())
.map(|c| NonNull::new_unchecked(c));

let reactor_keys = boxed
.inner
.reactions
.keys()
.map(|reaction_key| reaction_graph.reaction_reactors[reaction_key]);
let reactor_keys = inner
.reactions
.keys()
.map(|reaction_key| reaction_graph.reaction_reactors[reaction_key]);

let reactors = unsafe {
boxed
.inner
let reactors = inner
.reactors
.iter_many_unchecked_ptrs_mut(reactor_keys)
.map(|r| NonNull::new_unchecked(&mut **r as *mut _))
};
.map(|r| NonNull::new_unchecked(&mut **r as *mut _));

let reactions = unsafe {
boxed
.inner
let reactions = inner
.reactions
.iter_many_unchecked_mut(boxed.inner.reactions.keys())
.map(|r| NonNull::new_unchecked(r))
};
.iter_many_unchecked_mut(inner.reactions.keys())
.map(|r| NonNull::new_unchecked(r));

let action_keys = reaction_graph
.reaction_actions
.values()
.map(|actions| actions.iter());
let action_keys = reaction_graph
.reaction_actions
.values()
.map(|actions| actions.iter());

let (_, grouped_actions) = unsafe {
boxed
.inner
let (_, grouped_actions) = inner
.actions
.iter_ptr_chunks_split_unchecked(std::iter::empty(), action_keys)
};
.iter_ptr_chunks_split_unchecked(std::iter::empty(), action_keys);

let port_ref_keys = reaction_graph
.reaction_use_ports
.values()
.map(|ports| ports.iter());
let port_ref_keys = reaction_graph
.reaction_use_ports
.values()
.map(|ports| ports.iter());

let port_mut_keys = reaction_graph
.reaction_effect_ports
.values()
.map(|ports| ports.iter());
let port_mut_keys = reaction_graph
.reaction_effect_ports
.values()
.map(|ports| ports.iter());

let (grouped_ref_ports, grouped_mut_ports) = unsafe {
boxed
.inner
let (grouped_ref_ports, grouped_mut_ports) = inner
.ports
.iter_ptr_chunks_split_unchecked(port_ref_keys, port_mut_keys)
};
.iter_ptr_chunks_split_unchecked(port_ref_keys, port_mut_keys);

for ((_, cache), context, reactor, reaction, actions, ref_ports, mut_ports) in itertools::izip!(
boxed.caches.iter_mut(),
contexts,
reactors,
reactions,
grouped_actions,
grouped_ref_ports,
grouped_mut_ports,
) {
unsafe {
for ((_, cache), context, reactor, reaction, actions, ref_ports, mut_ports) in itertools::izip!(
caches.iter_mut(),
contexts,
reactors,
reactions,
grouped_actions,
grouped_ref_ports,
grouped_mut_ports,
) {
cache.context = context;
cache.reactor = Some(reactor);
cache.reaction = reaction;
Expand All @@ -229,7 +224,7 @@ impl Store {
}
}

Box::into_pin(boxed)
pinned
}

pub fn push_action_value(
Expand All @@ -238,39 +233,82 @@ impl Store {
tag: Tag,
value: Box<dyn ReactorData>,
) {
// SAFETY: we are not moving anything from self
let actions = &mut unsafe { self.as_mut().get_unchecked_mut() }.inner.actions;
// SAFETY: we are projecting to a field, not moving anything from self
let actions = &mut self.as_mut().project().inner.actions;
actions[action_key].push_value(tag, value);
}

/// Returns an `Iterator` of `ReactionTriggerCtx` for each `Reaction` in the given
/// `reaction_keys`.
///
/// This uses the previously stored `ReactionTriggerCtxPtrs`.
/// This uses the previously stored `ReactionTriggerCtxPtrs` which contain `NonNull` pointers
/// to data within the `Store`'s pinned `inner` fields.
///
/// # Safety
/// TODO: Document safety
///
/// This method is unsafe because it extends the lifetime of mutable references to the cache
/// from the projection's temporary lifetime to `'a` (the lifetime of `&'a mut Pin<Box<Self>>`).
///
/// ## Caller Requirements:
///
/// 1. **No Aliasing**: The caller must ensure that no other references (mutable or immutable)
/// to the data pointed to by the returned `ReactionTriggerCtx` instances exist while the
/// returned iterator is alive. This includes:
/// - Other calls to `iter_borrow_storage` with overlapping `ReactionKey`s
/// - Direct access to `inner.contexts`, `inner.reactors`, `inner.reactions`, etc.
///
/// 2. **Sequential Processing**: Reactions returned by this iterator should be processed
/// sequentially (or with non-overlapping keys in parallel) to avoid multiple mutable
/// borrows of the same underlying data.
///
/// 3. **No Store Modification**: The `Store` must not be modified (e.g., through
/// `push_action_value`, `reset_ports`, etc.) while the returned iterator or any
/// `ReactionTriggerCtx` instances derived from it are alive.
///
/// ## Why This is Sound:
///
/// - The `Store` is pinned, so the data pointed to by the cached `NonNull` pointers
/// will not move or be invalidated.
/// - The `ReactionTriggerCtxPtrs` contain pointers that were created during `Store::new()`
/// and point to pinned fields within `inner`.
/// - Each `ReactionTriggerCtx` provides exclusive mutable access to a specific reaction's
/// resources (context, reactor, reaction, actions, ports), which doesn't overlap with
/// other reactions' resources when used correctly.
/// - The lifetime `'a` ensures that the returned contexts cannot outlive the `Store` itself.
pub unsafe fn iter_borrow_storage<'a>(
self: &'a mut Pin<Box<Self>>,
keys: impl Iterator<Item = ReactionKey> + 'a,
) -> impl Iterator<Item = ReactionTriggerCtx<'a>> + 'a {
let ptrs = &mut self.as_mut().get_unchecked_mut().caches;
ptrs.iter_many_unchecked_mut(keys)
// SAFETY: We use pin-project's projection to safely access the caches field.
// The lifetime 'a properly represents the borrow of the Store. This is safe because:
// 1. The Store is pinned and won't move
// 2. The caller must uphold the safety requirements documented above
// 3. The returned ReactionTriggerCtx instances borrow from data that lives as long as
// the Store itself (lifetime 'a)
let caches = self.as_mut().project().caches.get_mut();
caches
.iter_many_unchecked_mut(keys)
.map(ReactionTriggerCtx::from)
}

/// Returns an `Iterator` of `PortKey`s that currently have a value set.
pub fn iter_set_port_keys(self: &Pin<Box<Self>>) -> impl Iterator<Item = PortKey> + '_ {
self.inner
self.as_ref()
.get_ref()
.inner
.ports
.iter()
.filter(|&(_, port)| port.is_set())
.map(|(key, _)| key)
}

pub fn reset_ports(self: &mut Pin<Box<Self>>) {
let store = unsafe { self.as_mut().get_unchecked_mut() };
store.inner.ports.values_mut().for_each(|p| p.cleanup());
self.as_mut()
.project()
.inner
.ports
.values_mut()
.for_each(|p| p.cleanup());
}

/// Turn this `Store` back into the `Env` it was built from.
Expand Down
2 changes: 1 addition & 1 deletion boomerang_tinymap/src/secondary_map/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl<K: Key, V> TinySecondaryMap<K, V> {
}

/// Returns an iterator over the values in the map, ordered by key.
pub fn values(&self) -> ValuesIter<V> {
pub fn values(&self) -> ValuesIter<'_, V> {
ValuesIter {
num_values: self.num_values,
inner: self.data.iter().flatten(),
Expand Down
Loading