Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions boomerang_builder/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ impl EnvBuilder {
bank_info: Option<runtime::BankInfo>,
state: S,
is_enclave: bool,
) -> ReactorBuilderState {
) -> ReactorBuilderState<'_> {
ReactorBuilderState::new(name, parent, bank_info, state, is_enclave, self)
}

/// Get a previously built reactor
pub fn get_reactor_builder(
&mut self,
reactor_key: BuilderReactorKey,
) -> Result<ReactorBuilderState, BuilderError> {
) -> Result<ReactorBuilderState<'_>, BuilderError> {
if !self.reactor_builders.contains_key(reactor_key) {
return Err(BuilderError::ReactorKeyNotFound(reactor_key));
}
Expand Down Expand Up @@ -249,7 +249,7 @@ impl EnvBuilder {
name: &str,
reactor_key: BuilderReactorKey,
reaction_builder_fn: F,
) -> ReactionBuilderState
) -> ReactionBuilderState<'_>
where
F: FnOnce(&BuilderRuntimeParts) -> runtime::BoxedReactionFn + 'static,
{
Expand Down
4 changes: 2 additions & 2 deletions boomerang_builder/src/fqn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::{fmt::Display, ops::Index};

use crate::{
runtime, ActionBuilder, BasePortBuilder, BuilderActionKey, BuilderPortKey, BuilderReactionKey,
ActionBuilder, BasePortBuilder, BuilderActionKey, BuilderPortKey, BuilderReactionKey,
BuilderReactorKey, EnvBuilder, ParentReactorBuilder, ReactionBuilder, ReactorBuilder,
};

Expand Down Expand Up @@ -291,7 +291,7 @@ impl Fqn for BuilderPortKey {

#[cfg(test)]
mod tests {
use crate::{Input, PortBuilder};
use crate::{runtime, Input, PortBuilder};

use super::*;

Expand Down
2 changes: 1 addition & 1 deletion boomerang_builder/src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ impl<'a> ReactorBuilderState<'a> {
}

/// Add a new reaction to this reactor.
pub fn add_reaction<F>(&mut self, name: &str, reaction_builder_fn: F) -> ReactionBuilderState
pub fn add_reaction<F>(&mut self, name: &str, reaction_builder_fn: F) -> ReactionBuilderState<'_>
where
F: for<'any> FnOnce(&'any BuilderRuntimeParts) -> runtime::BoxedReactionFn + 'static,
{
Expand Down
1 change: 1 addition & 0 deletions boomerang_runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ kanal = "0.1"
document-features = { workspace = true }
downcast-rs = "1.2"
itertools.workspace = true
pin-project = "1.1"
rayon = { version = "1.7", optional = true }
serde = { workspace = true, optional = true, features = ["derive"] }
thiserror.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion boomerang_runtime/src/port/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl<'a, T: ReactorData> Deref for InputRef<'a, T> {
}
}

impl<'a, T: ReactorData> From<&'a (dyn BasePort)> for InputRef<'a, T> {
impl<'a, T: ReactorData> From<&'a dyn BasePort> for InputRef<'a, T> {
fn from(port: &'a dyn BasePort) -> Self {
InputRef::from(
port.downcast_ref::<Port<T>>()
Expand Down
2 changes: 1 addition & 1 deletion boomerang_runtime/src/refs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ where
// PartitionMut for BasePort arrays
impl<'a, P, const N: usize> PartitionMut<'a, dyn BasePort> for [P; N]
where
P: From<&'a mut (dyn BasePort)>,
P: From<&'a mut dyn BasePort>,
{
fn part_mut(mut refs: RefsMut<'a, dyn BasePort>) -> Option<(Self, RefsMut<'a, dyn BasePort>)> {
if refs.len() < N {
Expand Down
179 changes: 110 additions & 69 deletions boomerang_runtime/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Runtime data storage

use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull};
use std::{pin::Pin, ptr::NonNull};

use crate::{
refs::{Refs, RefsMut},
Expand Down Expand Up @@ -108,11 +108,13 @@ struct Inner {
}

#[derive(Debug)]
#[pin_project::pin_project]
pub struct Store {
#[pin]
inner: Inner,
/// Internal caches of `ReactionTriggerCtxPtrs`
#[pin]
caches: tinymap::TinySecondaryMap<ReactionKey, ReactionTriggerCtxPtrs>,
_pin: PhantomPinned,
}

impl Store {
Expand Down Expand Up @@ -140,80 +142,76 @@ impl Store {
ports: env.ports,
},
caches: ptrs,
_pin: PhantomPinned,
};

let mut boxed = Box::new(res);

let contexts = unsafe {
boxed
.inner
// Pin the Box first, then use projection for safe access
let mut pinned = Box::pin(res);

// SAFETY: We're initializing the caches with self-references. This is safe because:
// 1. The data is already pinned and won't move
// 2. We're creating pointers to pinned data
// 3. The Store will remain pinned for its entire lifetime
unsafe {
// Get a mutable reference to the pinned struct through projection
let pin_mut = pinned.as_mut();
let ptr = pin_mut.get_unchecked_mut() as *mut Store;

// Access fields through the raw pointer for initialization
let inner = &mut (*ptr).inner;
let caches = &mut (*ptr).caches;

let contexts = inner
.contexts
.iter_many_unchecked_mut(boxed.inner.reactions.keys())
.map(|c| NonNull::new_unchecked(c))
};
.iter_many_unchecked_mut(inner.reactions.keys())
.map(|c| NonNull::new_unchecked(c));

let reactor_keys = boxed
.inner
.reactions
.keys()
.map(|reaction_key| reaction_graph.reaction_reactors[reaction_key]);
let reactor_keys = inner
.reactions
.keys()
.map(|reaction_key| reaction_graph.reaction_reactors[reaction_key]);

let reactors = unsafe {
boxed
.inner
let reactors = inner
.reactors
.iter_many_unchecked_ptrs_mut(reactor_keys)
.map(|r| NonNull::new_unchecked(&mut **r as *mut _))
};
.map(|r| NonNull::new_unchecked(&mut **r as *mut _));

let reactions = unsafe {
boxed
.inner
let reactions = inner
.reactions
.iter_many_unchecked_mut(boxed.inner.reactions.keys())
.map(|r| NonNull::new_unchecked(r))
};
.iter_many_unchecked_mut(inner.reactions.keys())
.map(|r| NonNull::new_unchecked(r));

let action_keys = reaction_graph
.reaction_actions
.values()
.map(|actions| actions.iter());
let action_keys = reaction_graph
.reaction_actions
.values()
.map(|actions| actions.iter());

let (_, grouped_actions) = unsafe {
boxed
.inner
let (_, grouped_actions) = inner
.actions
.iter_ptr_chunks_split_unchecked(std::iter::empty(), action_keys)
};
.iter_ptr_chunks_split_unchecked(std::iter::empty(), action_keys);

let port_ref_keys = reaction_graph
.reaction_use_ports
.values()
.map(|ports| ports.iter());
let port_ref_keys = reaction_graph
.reaction_use_ports
.values()
.map(|ports| ports.iter());

let port_mut_keys = reaction_graph
.reaction_effect_ports
.values()
.map(|ports| ports.iter());
let port_mut_keys = reaction_graph
.reaction_effect_ports
.values()
.map(|ports| ports.iter());

let (grouped_ref_ports, grouped_mut_ports) = unsafe {
boxed
.inner
let (grouped_ref_ports, grouped_mut_ports) = inner
.ports
.iter_ptr_chunks_split_unchecked(port_ref_keys, port_mut_keys)
};
.iter_ptr_chunks_split_unchecked(port_ref_keys, port_mut_keys);

for ((_, cache), context, reactor, reaction, actions, ref_ports, mut_ports) in itertools::izip!(
boxed.caches.iter_mut(),
contexts,
reactors,
reactions,
grouped_actions,
grouped_ref_ports,
grouped_mut_ports,
) {
unsafe {
for ((_, cache), context, reactor, reaction, actions, ref_ports, mut_ports) in itertools::izip!(
caches.iter_mut(),
contexts,
reactors,
reactions,
grouped_actions,
grouped_ref_ports,
grouped_mut_ports,
) {
cache.context = context;
cache.reactor = Some(reactor);
cache.reaction = reaction;
Expand All @@ -229,7 +227,7 @@ impl Store {
}
}

Box::into_pin(boxed)
pinned
}

pub fn push_action_value(
Expand All @@ -238,39 +236,82 @@ impl Store {
tag: Tag,
value: Box<dyn ReactorData>,
) {
// SAFETY: we are not moving anything from self
let actions = &mut unsafe { self.as_mut().get_unchecked_mut() }.inner.actions;
// SAFETY: we are projecting to a field, not moving anything from self
let actions = &mut self.as_mut().project().inner.actions;
actions[action_key].push_value(tag, value);
}

/// Returns an `Iterator` of `ReactionTriggerCtx` for each `Reaction` in the given
/// `reaction_keys`.
///
/// This uses the previously stored `ReactionTriggerCtxPtrs`.
/// This uses the previously stored `ReactionTriggerCtxPtrs` which contain `NonNull` pointers
/// to data within the `Store`'s pinned `inner` fields.
///
/// # Safety
/// TODO: Document safety
///
/// This method is unsafe because it extends the lifetime of mutable references to the cache
/// from the projection's temporary lifetime to `'a` (the lifetime of `&'a mut Pin<Box<Self>>`).
///
/// ## Caller Requirements:
///
/// 1. **No Aliasing**: The caller must ensure that no other references (mutable or immutable)
/// to the data pointed to by the returned `ReactionTriggerCtx` instances exist while the
/// returned iterator is alive. This includes:
/// - Other calls to `iter_borrow_storage` with overlapping `ReactionKey`s
/// - Direct access to `inner.contexts`, `inner.reactors`, `inner.reactions`, etc.
///
/// 2. **Sequential Processing**: Reactions returned by this iterator should be processed
/// sequentially (or with non-overlapping keys in parallel) to avoid multiple mutable
/// borrows of the same underlying data.
///
/// 3. **No Store Modification**: The `Store` must not be modified (e.g., through
/// `push_action_value`, `reset_ports`, etc.) while the returned iterator or any
/// `ReactionTriggerCtx` instances derived from it are alive.
///
/// ## Why This is Sound:
///
/// - The `Store` is pinned, so the data pointed to by the cached `NonNull` pointers
/// will not move or be invalidated.
/// - The `ReactionTriggerCtxPtrs` contain pointers that were created during `Store::new()`
/// and point to pinned fields within `inner`.
/// - Each `ReactionTriggerCtx` provides exclusive mutable access to a specific reaction's
/// resources (context, reactor, reaction, actions, ports), which doesn't overlap with
/// other reactions' resources when used correctly.
/// - The lifetime `'a` ensures that the returned contexts cannot outlive the `Store` itself.
pub unsafe fn iter_borrow_storage<'a>(
self: &'a mut Pin<Box<Self>>,
keys: impl Iterator<Item = ReactionKey> + 'a,
) -> impl Iterator<Item = ReactionTriggerCtx<'a>> + 'a {
let ptrs = &mut self.as_mut().get_unchecked_mut().caches;
ptrs.iter_many_unchecked_mut(keys)
// SAFETY: We use get_unchecked_mut to extend the lifetime of the mutable reference
// to the caches from the projection's temporary scope to 'a. This is safe because:
// 1. The Store is pinned and won't move
// 2. The caller must uphold the safety requirements documented above
// 3. The returned ReactionTriggerCtx instances borrow from data that lives as long as
// the Store itself (lifetime 'a)
let caches = self.as_mut().project().caches.get_unchecked_mut();
caches
.iter_many_unchecked_mut(keys)
.map(ReactionTriggerCtx::from)
}

/// Returns an `Iterator` of `PortKey`s that currently have a value set.
pub fn iter_set_port_keys(self: &Pin<Box<Self>>) -> impl Iterator<Item = PortKey> + '_ {
self.inner
self.as_ref()
.get_ref()
.inner
.ports
.iter()
.filter(|&(_, port)| port.is_set())
.map(|(key, _)| key)
}

pub fn reset_ports(self: &mut Pin<Box<Self>>) {
let store = unsafe { self.as_mut().get_unchecked_mut() };
store.inner.ports.values_mut().for_each(|p| p.cleanup());
self.as_mut()
.project()
.inner
.ports
.values_mut()
.for_each(|p| p.cleanup());
}

/// Turn this `Store` back into the `Env` it was built from.
Expand Down
2 changes: 1 addition & 1 deletion boomerang_tinymap/src/secondary_map/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl<K: Key, V> TinySecondaryMap<K, V> {
}

/// Returns an iterator over the values in the map, ordered by key.
pub fn values(&self) -> ValuesIter<V> {
pub fn values(&self) -> ValuesIter<'_, V> {
ValuesIter {
num_values: self.num_values,
inner: self.data.iter().flatten(),
Expand Down
Loading