Skip to content

Commit 84c7795

Browse files
committed
improvements
1 parent 61d3bac commit 84c7795

7 files changed

Lines changed: 222 additions & 56 deletions

File tree

rust/otap-dataflow/crates/engine-macros/src/capability.rs

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,22 @@ fn trait_methods(trait_item: &ItemTrait) -> Vec<&TraitItemFn> {
178178
.collect()
179179
}
180180

181+
/// Emit a trait-method signature for a generated `local::` / `shared::`
182+
/// trait: keep the attributes, preserve a default body if present, or
183+
/// emit a `;`-terminated signature otherwise. Both generated traits
184+
/// want the same shape \u2014 the only difference between them is the
185+
/// `#[async_trait]` vs `#[async_trait(?Send)]` outer attribute added by
186+
/// the caller.
187+
fn emit_method(m: &TraitItemFn) -> TokenStream {
188+
let sig = &m.sig;
189+
let attrs = &m.attrs;
190+
if let Some(body) = &m.default {
191+
quote! { #(#attrs)* #sig #body }
192+
} else {
193+
quote! { #(#attrs)* #sig; }
194+
}
195+
}
196+
181197
/// Generate the full output for a `#[capability(...)]` annotation.
182198
pub(crate) fn expand_capability(args: CapabilityArgs, trait_item: ItemTrait) -> TokenStream {
183199
if let Err(err) = validate_trait(&trait_item) {
@@ -202,34 +218,11 @@ pub(crate) fn expand_capability(args: CapabilityArgs, trait_item: ItemTrait) ->
202218
}
203219
let known_cap_static = format_ident!("_KNOWN_CAP_{}", static_suffix);
204220

205-
// Generate method signatures for local trait (#[async_trait(?Send)])
206-
// Methods with default bodies preserve them; methods without get `;`.
207-
let local_methods: Vec<TokenStream> = methods
208-
.iter()
209-
.map(|m| {
210-
let sig = &m.sig;
211-
let attrs = &m.attrs;
212-
if let Some(body) = &m.default {
213-
quote! { #(#attrs)* #sig #body }
214-
} else {
215-
quote! { #(#attrs)* #sig; }
216-
}
217-
})
218-
.collect();
219-
220-
// Generate method signatures for shared trait (#[async_trait] + Send)
221-
let shared_methods: Vec<TokenStream> = methods
222-
.iter()
223-
.map(|m| {
224-
let sig = &m.sig;
225-
let attrs = &m.attrs;
226-
if let Some(body) = &m.default {
227-
quote! { #(#attrs)* #sig #body }
228-
} else {
229-
quote! { #(#attrs)* #sig; }
230-
}
231-
})
232-
.collect();
221+
// Generate method signatures for the local trait (#[async_trait(?Send)])
222+
// and the shared trait (#[async_trait] + Send). Shape is identical
223+
// between the two; only the outer async_trait attribute differs.
224+
let local_methods: Vec<TokenStream> = methods.iter().map(|m| emit_method(m)).collect();
225+
let shared_methods: Vec<TokenStream> = methods.iter().map(|m| emit_method(m)).collect();
233226

234227
// Generate SharedAsLocal adapter delegation methods
235228
let adapter_methods: Vec<TokenStream> = methods

rust/otap-dataflow/crates/engine/src/capability/mod.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,10 @@ pub trait ExtensionCapability: private::Sealed + 'static {
100100
/// inside this crate, so external crates still can't reach `Sealed` to
101101
/// forge an `ExtensionCapability` impl.
102102
#[doc(hidden)]
103-
#[allow(unused_imports)] // used by future `#[capability]` invocations
103+
// Suppressed until the first `#[capability]` invocation lands alongside
104+
// its first consumer (PR4+). Until then no generated code references the
105+
// re-export and rustc would otherwise warn.
106+
#[allow(unused_imports)]
104107
pub(crate) use private::Sealed as CapabilitySealed;
105108

106109
// ── KNOWN_CAPABILITIES (link-time registration) ──────────────────────────────
@@ -155,6 +158,9 @@ pub static KNOWN_CAPABILITIES: [KnownCapability] = [..];
155158
/// appropriate `*InstanceFactory`. The fn pointer internally builds one
156159
/// [`SharedCapabilityEntry`](registry::SharedCapabilityEntry) per
157160
/// listed capability and inserts it into the registry.
161+
///
162+
/// Returns [`registry::Error::InternalError`] on a duplicate
163+
/// `(capability, extension)` insert.
158164
#[derive(Clone)]
159165
pub struct ExtensionCapabilities {
160166
/// Capability names provided by the **shared** variant.
@@ -212,8 +218,9 @@ impl ExtensionCapabilities {
212218

213219
/// Declares which capabilities an extension provides.
214220
///
215-
/// The left side names the extension type(s); the right side is a single
216-
/// capability list that applies to both sides (no per-side divergence).
221+
/// The left-hand side names the extension type(s) — one or two,
222+
/// depending on form — and the right-hand side is a single capability
223+
/// list shared by both execution models.
217224
/// Three forms:
218225
///
219226
/// ```rust,ignore

rust/otap-dataflow/crates/engine/src/capability/registry/capabilities.rs

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,10 @@ impl Capabilities {
5959
///
6060
/// # Errors
6161
///
62-
/// - [`Error::ConfigError`] if no extension is bound to this
63-
/// capability for this node. The user must add a binding to the
64-
/// node's config.
62+
/// - [`Error::CapabilityNotBound`] if no extension is bound to this
63+
/// capability for this node. Either add the binding to the
64+
/// node's capability declaration or switch to
65+
/// [`Self::optional_local`].
6566
/// - [`Error::CapabilityAlreadyConsumed`] if the capability was
6667
/// already claimed on this node.
6768
///
@@ -74,14 +75,13 @@ impl Capabilities {
7475
&self,
7576
) -> Result<Rc<C::Local>, Error> {
7677
let id = TypeId::of::<C>();
77-
let entry = self.local.get(&id).ok_or_else(|| {
78-
Error::ConfigError(Box::new(otap_df_config::error::Error::InvalidUserConfig {
79-
error: format!(
80-
"required local capability '{}' not bound for this node",
81-
C::name(),
82-
),
83-
}))
84-
})?;
78+
let entry = self
79+
.local
80+
.get(&id)
81+
.ok_or_else(|| Error::CapabilityNotBound {
82+
capability: C::name().to_owned(),
83+
execution_model: "local",
84+
})?;
8585
if entry.claimed.replace(true) {
8686
return Err(Error::CapabilityAlreadyConsumed {
8787
capability: C::name().to_owned(),
@@ -112,9 +112,10 @@ impl Capabilities {
112112
///
113113
/// # Errors
114114
///
115-
/// - [`Error::ConfigError`] if no extension is bound to this
116-
/// capability for this node. The user must add a binding to the
117-
/// node's config.
115+
/// - [`Error::CapabilityNotBound`] if no extension is bound to this
116+
/// capability for this node. Either add the binding to the
117+
/// node's capability declaration or switch to
118+
/// [`Self::optional_shared`].
118119
/// - [`Error::CapabilityAlreadyConsumed`] if the capability was
119120
/// already claimed on this node.
120121
///
@@ -127,14 +128,13 @@ impl Capabilities {
127128
&self,
128129
) -> Result<Box<C::Shared>, Error> {
129130
let id = TypeId::of::<C>();
130-
let entry = self.shared.get(&id).ok_or_else(|| {
131-
Error::ConfigError(Box::new(otap_df_config::error::Error::InvalidUserConfig {
132-
error: format!(
133-
"required shared capability '{}' not bound for this node",
134-
C::name(),
135-
),
136-
}))
137-
})?;
131+
let entry = self
132+
.shared
133+
.get(&id)
134+
.ok_or_else(|| Error::CapabilityNotBound {
135+
capability: C::name().to_owned(),
136+
execution_model: "shared",
137+
})?;
138138
if entry.claimed.replace(true) {
139139
return Err(Error::CapabilityAlreadyConsumed {
140140
capability: C::name().to_owned(),

rust/otap-dataflow/crates/engine/src/capability/registry/resolve.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,19 @@ pub(crate) fn resolve_bindings(
121121
local_entry.extension_id.clone(),
122122
);
123123

124-
let _ = local_entries.insert(
124+
let prior = local_entries.insert(
125125
cap_type_id,
126126
ResolvedLocalEntry {
127127
produce: local_entry.produce.clone_box(),
128128
claimed: std::cell::Cell::new(false),
129129
tracker_consumed,
130130
},
131131
);
132+
debug_assert!(
133+
prior.is_none(),
134+
"resolve_bindings: duplicate local entry for capability '{cap_name_str}' \
135+
- the config layer should prevent two bindings with the same capability name",
136+
);
132137
} else if let Some(shared_entry) = shared_entry {
133138
// SharedAsLocal fallback: defer the shared mint to the
134139
// (single, one-shot) `require_local` call. The closure
@@ -159,14 +164,19 @@ pub(crate) fn resolve_bindings(
159164
shared_entry.extension_id.clone(),
160165
);
161166

162-
let _ = local_entries.insert(
167+
let prior = local_entries.insert(
163168
cap_type_id,
164169
ResolvedLocalEntry {
165170
produce: local_produce,
166171
claimed: std::cell::Cell::new(false),
167172
tracker_consumed,
168173
},
169174
);
175+
debug_assert!(
176+
prior.is_none(),
177+
"resolve_bindings: duplicate local entry for capability '{cap_name_str}' \
178+
(SharedAsLocal fallback)",
179+
);
170180
}
171181

172182
// Resolve shared entry. If the SharedAsLocal fallback above
@@ -181,14 +191,18 @@ pub(crate) fn resolve_bindings(
181191
shared_entry.extension_id.clone(),
182192
);
183193

184-
let _ = shared_entries.insert(
194+
let prior = shared_entries.insert(
185195
cap_type_id,
186196
ResolvedSharedEntry {
187197
produce: shared_entry.produce.clone_box(),
188198
claimed: std::cell::Cell::new(false),
189199
tracker_consumed,
190200
},
191201
);
202+
debug_assert!(
203+
prior.is_none(),
204+
"resolve_bindings: duplicate shared entry for capability '{cap_name_str}'",
205+
);
192206
}
193207
}
194208

rust/otap-dataflow/crates/engine/src/capability/registry/tests.rs

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -939,3 +939,110 @@ fn test_end_to_end_shared_fresh_policy_mints_independent_instances() {
939939
"fresh factory should have been invoked exactly 2x (one per require_shared consumer)"
940940
);
941941
}
942+
943+
// ── Envelope / error-shape regression tests ─────────────────────────────────
944+
945+
/// The shared-side registry envelope must be `Box<Box<dyn C::Shared>>`
946+
/// erased as `Box<dyn Any + Send>`. `require_shared` downcasts the
947+
/// outer `Any` to `Box<dyn C::Shared>` then dereferences. This test
948+
/// pins the shape so anyone refactoring the macro cannot collapse the
949+
/// double-box without the consumer path failing loudly.
950+
#[test]
951+
fn test_shared_entry_produce_uses_double_box_envelope() {
952+
let factory = shared_instance_factory("envelope-val");
953+
let entry = TestCap::shared_entry::<SharedImpl>("ext-env".into(), factory);
954+
955+
// Directly invoke the stored produce closure and downcast using
956+
// the exact shape the consumer (`require_shared`) relies on.
957+
let erased: Box<dyn Any + Send> = (entry.produce)();
958+
let boxed_trait_object: Box<Box<dyn TestCapShared>> = erased
959+
.downcast::<Box<dyn TestCapShared>>()
960+
.expect("shared_entry must emit Box<Box<dyn C::Shared>> erased as Box<dyn Any + Send>");
961+
assert_eq!((*boxed_trait_object).value(), "envelope-val");
962+
}
963+
964+
/// `require_local` on an unbound capability must return the dedicated
965+
/// `CapabilityNotBound` variant (not a generic `ConfigError`) so the
966+
/// diagnostic points at "node-code/declaration mismatch", not
967+
/// "user YAML problem".
968+
#[test]
969+
fn test_require_local_unbound_returns_capability_not_bound() {
970+
let reg = CapabilityRegistry::new();
971+
let mut tracker = ConsumedTracker::new();
972+
let caps = resolve_bindings(&HashMap::new(), &reg, &known_exts(&[]), &mut tracker).unwrap();
973+
974+
match caps.require_local::<TestCap>() {
975+
Err(Error::CapabilityNotBound {
976+
capability,
977+
execution_model,
978+
}) => {
979+
assert_eq!(capability, "test_cap");
980+
assert_eq!(execution_model, "local");
981+
}
982+
Err(other) => panic!("expected CapabilityNotBound, got {other:?}"),
983+
Ok(_) => panic!("expected CapabilityNotBound, got Ok"),
984+
}
985+
}
986+
987+
#[test]
988+
fn test_require_shared_unbound_returns_capability_not_bound() {
989+
let reg = CapabilityRegistry::new();
990+
let mut tracker = ConsumedTracker::new();
991+
let caps = resolve_bindings(&HashMap::new(), &reg, &known_exts(&[]), &mut tracker).unwrap();
992+
993+
match caps.require_shared::<TestCap>() {
994+
Err(Error::CapabilityNotBound {
995+
capability,
996+
execution_model,
997+
}) => {
998+
assert_eq!(capability, "test_cap");
999+
assert_eq!(execution_model, "shared");
1000+
}
1001+
Err(other) => panic!("expected CapabilityNotBound, got {other:?}"),
1002+
Ok(_) => panic!("expected CapabilityNotBound, got Ok"),
1003+
}
1004+
}
1005+
1006+
/// When a node binds a shared-only extension through its *local*-facing
1007+
/// capability (the `SharedAsLocal` fallback), consumption of the
1008+
/// fallback adapter must flip the shared bucket of `ConsumedTracker`
1009+
/// — not a phantom local bucket. Otherwise `unconsumed_shared` would
1010+
/// claim the shared variant is unused and the engine would drop it
1011+
/// while the adapter still depended on it.
1012+
#[test]
1013+
fn test_fallback_local_consumption_flips_shared_bucket() {
1014+
let mut reg = CapabilityRegistry::new();
1015+
register_shared(&mut reg, "ext-only-shared", "val");
1016+
1017+
let mut tracker = ConsumedTracker::new();
1018+
let caps = resolve_bindings(
1019+
&bindings("test_cap", "ext-only-shared"),
1020+
&reg,
1021+
&known_exts(&["ext-only-shared"]),
1022+
&mut tracker,
1023+
)
1024+
.unwrap();
1025+
1026+
// Before claim: both buckets show the shared extension unconsumed.
1027+
assert!(
1028+
tracker
1029+
.unconsumed_shared()
1030+
.iter()
1031+
.any(|(ext, _)| ext.as_ref() == "ext-only-shared"),
1032+
"shared bucket should list ext-only-shared before the fallback claim",
1033+
);
1034+
assert!(
1035+
tracker.unconsumed_local().is_empty(),
1036+
"no native local registration was made, so the local bucket must be empty",
1037+
);
1038+
1039+
// Fallback claim (local-facing, but backed by the shared factory).
1040+
let _ = caps.require_local::<TestCap>().unwrap();
1041+
1042+
// After claim: shared bucket flips (the shared *variant* is what
1043+
// got used).
1044+
assert!(
1045+
tracker.unconsumed_shared().is_empty(),
1046+
"fallback consumption must flip the shared bucket",
1047+
);
1048+
}

rust/otap-dataflow/crates/engine/src/error.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,26 @@ pub enum Error {
506506
/// The capability name.
507507
capability: String,
508508
},
509+
510+
/// A node factory called `require_local` / `require_shared` for a
511+
/// capability that the node's own configuration did not bind.
512+
///
513+
/// This almost always indicates a node-code / node-declaration
514+
/// mismatch: either the node's factory signals a required
515+
/// capability the node template forgot to declare, or the user's
516+
/// pipeline config is missing the binding entry for an optional
517+
/// provider the node expects. The message surfaces both
518+
/// interpretations.
519+
#[error(
520+
"required {execution_model} capability '{capability}' is not bound for this node; \
521+
add it to the node's capability bindings or switch to optional_{execution_model}"
522+
)]
523+
CapabilityNotBound {
524+
/// The capability name.
525+
capability: String,
526+
/// Execution model requested by the caller: `"local"` or `"shared"`.
527+
execution_model: &'static str,
528+
},
509529
}
510530

511531
impl Error {
@@ -555,6 +575,7 @@ impl Error {
555575
Error::SubscribeSingleGroupViolation => "SubscribeSingleGroupViolation",
556576
Error::SubscriptionClosed => "SubscriptionClosed",
557577
Error::CapabilityAlreadyConsumed { .. } => "CapabilityAlreadyConsumed",
578+
Error::CapabilityNotBound { .. } => "CapabilityNotBound",
558579
}
559580
.to_owned()
560581
}

0 commit comments

Comments
 (0)