Skip to content

Commit e21f0b8

Browse files
committed
fix commit feedback
1 parent f785506 commit e21f0b8

10 files changed

Lines changed: 417 additions & 1066 deletions

File tree

rust/otap-dataflow/crates/engine-macros/src/capability.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,18 @@
2828
//! These limitations are fundamental to the type-erased `HashMap<TypeId, Entry>`
2929
//! registry design and the `SharedAsLocal` adapter delegation pattern:
3030
//!
31-
//! - **Trait-level generics / lifetimes** — `TypeId::of::<T>()` requires a
32-
//! concrete monomorphized type; a generic registration struct cannot produce
33-
//! a single `TypeId` for the registry.
31+
//! - **Trait-level generics or lifetime parameters** — e.g.
32+
//! `#[capability] trait Foo<T>` or `#[capability] trait Bar<'a>`.
33+
//! Method-level generics (one bullet up) *are* supported because the
34+
//! trait object `dyn Foo` exists for the trait as a whole. Trait-level
35+
//! parameters, by contrast, mean the trait isn't one type but a
36+
//! *family* — `Foo<u32>`, `Foo<String>`, etc. — each with its own
37+
//! `TypeId::of::<Foo<T>>()`. The registry keys entries by a single
38+
//! `TypeId` per capability, so there is no monomorphized type for the
39+
//! generated registration struct to advertise. Concretize the
40+
//! parameter at the trait definition (e.g. work over `String` or a
41+
//! sealed enum) or split the family into separate `#[capability]`
42+
//! traits.
3443
//! - **Supertraits** (`trait Foo: Bar`) — the `SharedAsLocal` adapter only
3544
//! delegates methods defined directly on the `#[capability]` trait. It cannot
3645
//! auto-implement supertrait methods. Define all methods directly on the

rust/otap-dataflow/crates/engine/src/capability/registry/capabilities.rs

Lines changed: 64 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,25 @@ impl Capabilities {
4848
///
4949
/// # One-shot contract
5050
///
51-
/// Each capability binding may be claimed **at most once per
52-
/// node** across all four accessors (`require_local`,
53-
/// `require_shared`, `optional_local`, `optional_shared`). Node
54-
/// factories are expected to call this once at construction,
55-
/// store the returned handle, and clone/share it within the node
56-
/// as needed. A second call for the same capability — including
57-
/// the implicit shared claim made by the `SharedAsLocal` fallback
58-
/// — returns [`Error::CapabilityAlreadyConsumed`].
51+
/// Each resolved entry is one-shot per node: a `require_local`
52+
/// claim consumes the local entry, and a `require_shared` claim
53+
/// consumes the shared entry. Node factories are expected to
54+
/// call each accessor at most once at construction, store the
55+
/// returned handle, and clone/share it within the node as needed.
56+
///
57+
/// **SharedAsLocal fallback.** When a binding's extension only
58+
/// registered a shared variant, `require_local` falls through to
59+
/// the shared entry and routes its output through the
60+
/// capability's adapter. In that case there is only one
61+
/// underlying produce closure, so `require_local` and
62+
/// `require_shared` share a single one-shot guard \u2014 claiming
63+
/// either accessor consumes the binding for both. A second call
64+
/// returns [`Error::CapabilityAlreadyConsumed`].
65+
///
66+
/// **Native dual.** When a binding's extension registered both a
67+
/// native local and a native shared variant, the two are
68+
/// distinct objects with independent guards: a node may claim
69+
/// `require_local` and `require_shared` and receive both.
5970
///
6071
/// # Errors
6172
///
@@ -75,25 +86,54 @@ impl Capabilities {
7586
&self,
7687
) -> Result<Rc<C::Local>, Error> {
7788
let id = TypeId::of::<C>();
89+
90+
// Native local path. `Cell::take()` is the one-shot guard.
91+
if let Some(entry) = self.local.get(&id) {
92+
let produce = entry
93+
.produce
94+
.take()
95+
.ok_or_else(|| Error::CapabilityAlreadyConsumed {
96+
capability: C::name().to_owned(),
97+
})?;
98+
let rc_any = produce();
99+
let trait_object = rc_any
100+
.downcast_ref::<Rc<C::Local>>()
101+
.cloned()
102+
.unwrap_or_else(|| {
103+
panic!(
104+
"BUG: capability '{}': local entry type mismatch in registry",
105+
C::name(),
106+
)
107+
});
108+
entry.tracker_consumed.set(true);
109+
return Ok(trait_object);
110+
}
111+
112+
// SharedAsLocal fallback. The same `Cell::take()` on the
113+
// shared entry is the binding's one-shot guard, so claiming
114+
// the local-via-fallback accessor here naturally consumes
115+
// the native shared accessor too — a subsequent
116+
// `require_shared` returns [`Error::CapabilityAlreadyConsumed`].
78117
let entry = self
79-
.local
118+
.shared
80119
.get(&id)
81120
.ok_or_else(|| Error::CapabilityNotBound {
82121
capability: C::name().to_owned(),
83122
execution_model: "local",
84123
})?;
85-
if entry.claimed.replace(true) {
86-
return Err(Error::CapabilityAlreadyConsumed {
124+
let produce = entry
125+
.produce
126+
.take()
127+
.ok_or_else(|| Error::CapabilityAlreadyConsumed {
87128
capability: C::name().to_owned(),
88-
});
89-
}
90-
let rc_any = (entry.produce)();
129+
})?;
130+
let rc_any = (entry.adapt_as_local)(produce());
91131
let trait_object = rc_any
92132
.downcast_ref::<Rc<C::Local>>()
93133
.cloned()
94134
.unwrap_or_else(|| {
95135
panic!(
96-
"BUG: capability '{}': local entry type mismatch in registry",
136+
"BUG: capability '{}': SharedAsLocal adapter type mismatch in registry",
97137
C::name(),
98138
)
99139
});
@@ -135,12 +175,13 @@ impl Capabilities {
135175
capability: C::name().to_owned(),
136176
execution_model: "shared",
137177
})?;
138-
if entry.claimed.replace(true) {
139-
return Err(Error::CapabilityAlreadyConsumed {
178+
let produce = entry
179+
.produce
180+
.take()
181+
.ok_or_else(|| Error::CapabilityAlreadyConsumed {
140182
capability: C::name().to_owned(),
141-
});
142-
}
143-
let trait_object = (entry.produce)()
183+
})?;
184+
let trait_object = produce()
144185
.downcast::<Box<C::Shared>>()
145186
.map(|b| *b)
146187
.unwrap_or_else(|_| {
@@ -174,7 +215,9 @@ impl Capabilities {
174215
&self,
175216
) -> Result<Option<Rc<C::Local>>, Error> {
176217
let id = TypeId::of::<C>();
177-
if !self.local.contains_key(&id) {
218+
// Available either as a native local entry or as a
219+
// SharedAsLocal fallback through the shared entry.
220+
if !self.local.contains_key(&id) && !self.shared.contains_key(&id) {
178221
return Ok(None);
179222
}
180223
self.require_local::<C>().map(Some)

rust/otap-dataflow/crates/engine/src/capability/registry/entry.rs

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -141,31 +141,57 @@ impl SharedCapabilityEntry {
141141
// ── Resolved entries (per-node) ─────────────────────────────────────────────
142142

143143
/// A resolved local capability entry for a specific node.
144+
///
145+
/// Inserted only for **native** local registrations. The
146+
/// SharedAsLocal fallback path does *not* produce a separate local
147+
/// entry — instead, [`Capabilities::require_local`] falls through to
148+
/// the [`ResolvedSharedEntry`] when no native local entry exists,
149+
/// taking the shared `produce` cell and routing its output through
150+
/// the entry's `adapt_as_local` fn pointer. This collapses the
151+
/// fallback's two former entries into one, so the per-binding
152+
/// one-shot guard is encoded directly in the shared entry's
153+
/// `Cell::take()` — no separate claim flag needed.
144154
pub(crate) struct ResolvedLocalEntry {
145155
/// Per-node produce closure, cloned from the registry entry.
146-
pub(crate) produce: Box<dyn LocalProduce>,
147-
/// Per-node one-shot consumption flag for the consumer-side
148-
/// "claim each binding at most once per node" contract enforced
149-
/// by [`Capabilities`](super::Capabilities).
150-
pub(crate) claimed: Cell<bool>,
151-
/// Cross-node consumption flag for the underlying extension
152-
/// variant, shared with the [`ConsumedTracker`].
153156
///
154-
/// - For native local bindings: cell lives under the tracker's
155-
/// *local* bucket for the `(capability, extension)` pair.
156-
/// - For `SharedAsLocal` bindings: cell lives under the tracker's
157-
/// *shared* bucket — consuming the adapter is considered a use
158-
/// of the shared variant.
157+
/// Wrapped in `Cell<Option<_>>` so the consumer-side one-shot
158+
/// guard is encoded in the type: `Some` = unclaimed, `None` =
159+
/// already claimed. Consumption is a single `Cell::take()` that
160+
/// atomically moves the closure out and marks the binding as
161+
/// claimed; a second `take()` returns `None` and yields
162+
/// [`Error::CapabilityAlreadyConsumed`](super::Error::CapabilityAlreadyConsumed).
163+
pub(crate) produce: Cell<Option<Box<dyn LocalProduce>>>,
164+
/// Cross-node consumption flag for the local variant, shared
165+
/// with the [`ConsumedTracker`].
159166
pub(crate) tracker_consumed: Rc<Cell<bool>>,
160167
}
161168

162169
/// A resolved shared capability entry for a specific node.
170+
///
171+
/// Serves two roles. As a native shared entry, [`Capabilities::require_shared`]
172+
/// takes its `produce` cell to mint a `Box<dyn C::Shared>`. As the
173+
/// SharedAsLocal fallback for a binding with no native local entry,
174+
/// [`Capabilities::require_local`] takes the same cell and routes the
175+
/// output through `adapt_as_local` to mint an `Rc<dyn C::Local>`.
176+
/// Either path consumes the single underlying [`Cell::take()`], so the
177+
/// per-binding one-shot contract is enforced naturally without an
178+
/// auxiliary claim flag.
163179
pub(crate) struct ResolvedSharedEntry {
164180
/// Per-node produce closure, cloned from the registry entry.
165-
pub(crate) produce: Box<dyn SharedProduce>,
166-
/// Per-node one-shot consumption flag.
167-
pub(crate) claimed: Cell<bool>,
181+
///
182+
/// Wrapped in `Cell<Option<_>>` for the same reason as
183+
/// [`ResolvedLocalEntry::produce`]: `Cell::take()` doubles as
184+
/// the one-shot consumer-side guard, and is shared between the
185+
/// native shared and SharedAsLocal-fallback paths.
186+
pub(crate) produce: Cell<Option<Box<dyn SharedProduce>>>,
168187
/// Cross-node consumption flag, shared with the
169188
/// [`ConsumedTracker`].
170189
pub(crate) tracker_consumed: Rc<Cell<bool>>,
190+
/// Adapter fn pointer used by the SharedAsLocal fallback path
191+
/// in [`Capabilities::require_local`]. Takes a produced shared
192+
/// trait object (erased as `Box<dyn Any + Send>`) and returns a
193+
/// local trait object (erased as `Rc<dyn Any>` wrapping
194+
/// `Rc<dyn C::Local>`). Originally minted by the
195+
/// `#[capability]`-generated `shared_entry::<E>` caster.
196+
pub(crate) adapt_as_local: fn(Box<dyn Any + Send>) -> Rc<dyn Any>,
171197
}

rust/otap-dataflow/crates/engine/src/capability/registry/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub use storage::CapabilityRegistry;
3939

4040
// ── Crate-internal re-exports ────────────────────────────────────────────────
4141

42-
pub(crate) use entry::{LocalProduce, ResolvedLocalEntry, ResolvedSharedEntry};
42+
pub(crate) use entry::{ResolvedLocalEntry, ResolvedSharedEntry};
4343
// TODO(extension-system): wired by engine build phase in a follow-up PR;
4444
// until then only the test module imports it via `use super::*;`.
4545
#[allow(unused_imports)]

rust/otap-dataflow/crates/engine/src/capability/registry/resolve.rs

Lines changed: 18 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,12 @@
66
//! [`Capabilities`] for consumption.
77
88
use super::{
9-
Capabilities, CapabilityRegistry, ConsumedTracker, Error, LocalProduce, ResolvedLocalEntry,
9+
Capabilities, CapabilityRegistry, ConsumedTracker, Error, ResolvedLocalEntry,
1010
ResolvedSharedEntry,
1111
};
1212
use otap_df_config::ExtensionId;
1313
use std::any::TypeId;
1414
use std::collections::{HashMap, HashSet};
15-
use std::rc::Rc;
1615

1716
/// Resolves a node's capability bindings against the registry.
1817
///
@@ -108,13 +107,17 @@ pub(crate) fn resolve_bindings(
108107
)));
109108
}
110109

111-
// Resolve local entry: prefer a native local registration; else,
112-
// if the extension registered a shared entry, invoke the
113-
// capability's `SharedAsLocal` adapter to build a new local
114-
// wrapper around this node's own clone of the shared instance.
115-
let native_local = local_entry;
116-
117-
if let Some(local_entry) = native_local {
110+
// Resolve local entry. Native local registrations get their
111+
// own resolved entry; the SharedAsLocal fallback is *not*
112+
// materialized here — instead, [`Capabilities::require_local`]
113+
// falls through to the shared entry below and runs its
114+
// `adapt_as_local` adapter at consumption time. This collapses
115+
// the fallback's two former entries into one, so the
116+
// per-binding one-shot guard is the shared entry's
117+
// `Cell::take()`: claiming the local-via-fallback execution
118+
// model naturally consumes the native shared execution model
119+
// too.
120+
if let Some(local_entry) = local_entry {
118121
let tracker_consumed = tracker.ensure_local_consumer_slot(
119122
cap_type_id,
120123
known_cap.name,
@@ -124,8 +127,7 @@ pub(crate) fn resolve_bindings(
124127
let prior = local_entries.insert(
125128
cap_type_id,
126129
ResolvedLocalEntry {
127-
produce: local_entry.produce.clone_box(),
128-
claimed: std::cell::Cell::new(false),
130+
produce: std::cell::Cell::new(Some(local_entry.produce.clone_box())),
129131
tracker_consumed,
130132
},
131133
);
@@ -134,56 +136,11 @@ pub(crate) fn resolve_bindings(
134136
"resolve_bindings: duplicate local entry for capability '{cap_name_str}' \
135137
- the config layer should prevent two bindings with the same capability name",
136138
);
137-
} else if let Some(shared_entry) = shared_entry {
138-
// SharedAsLocal fallback: defer the shared mint to the
139-
// (single, one-shot) `require_local` call. The closure
140-
// captures a refcounted handle to the shared produce
141-
// closure plus the capability's `adapt_as_local` fn
142-
// pointer, then on invocation produces a new shared
143-
// instance and routes it through the adapter to yield a
144-
// type-erased `Rc<dyn Any>` wrapping the local trait
145-
// object.
146-
let adapt = shared_entry.adapt_as_local;
147-
let produce_shared: Rc<Box<dyn super::entry::SharedProduce>> =
148-
Rc::new(shared_entry.produce.clone_box());
149-
let local_produce: Box<dyn LocalProduce> = Box::new(move || adapt((*produce_shared)()));
150-
151-
// The extension only provided a shared variant — record
152-
// the adapter consumer's cross-node consumption against
153-
// the shared bucket. The `// Resolve shared entry` block
154-
// below also calls `ensure_shared_consumer_slot` for
155-
// this `(cap, ext)` pair; that method is get-or-insert,
156-
// so both users share the same `Rc<Cell<bool>>` for
157-
// tracker accounting. Per-node one-shot enforcement is
158-
// tracked separately in each entry's `claimed` cell, so
159-
// a node *can* claim both the local-via-shared and
160-
// native-shared sides of a fallback binding.
161-
let tracker_consumed = tracker.ensure_shared_consumer_slot(
162-
cap_type_id,
163-
known_cap.name,
164-
shared_entry.extension_id.clone(),
165-
);
166-
167-
let prior = local_entries.insert(
168-
cap_type_id,
169-
ResolvedLocalEntry {
170-
produce: local_produce,
171-
claimed: std::cell::Cell::new(false),
172-
tracker_consumed,
173-
},
174-
);
175-
debug_assert!(
176-
prior.is_none(),
177-
"resolve_bindings: duplicate local entry for capability '{cap_name_str}' \
178-
(SharedAsLocal fallback)",
179-
);
180139
}
181140

182-
// Resolve shared entry. If the SharedAsLocal fallback above
183-
// already called `ensure_shared_consumer_slot` for this
184-
// `(cap, ext)` pair, this call is a no-op lookup that returns
185-
// the same cell — get-or-insert idempotency keeps the two
186-
// users pointing at one slot.
141+
// Resolve shared entry. Used both as the native shared
142+
// binding and as the SharedAsLocal fallback target when no
143+
// native local entry exists.
187144
if let Some(shared_entry) = shared_entry {
188145
let tracker_consumed = tracker.ensure_shared_consumer_slot(
189146
cap_type_id,
@@ -194,9 +151,9 @@ pub(crate) fn resolve_bindings(
194151
let prior = shared_entries.insert(
195152
cap_type_id,
196153
ResolvedSharedEntry {
197-
produce: shared_entry.produce.clone_box(),
198-
claimed: std::cell::Cell::new(false),
154+
produce: std::cell::Cell::new(Some(shared_entry.produce.clone_box())),
199155
tracker_consumed,
156+
adapt_as_local: shared_entry.adapt_as_local,
200157
},
201158
);
202159
debug_assert!(

0 commit comments

Comments
 (0)