Commit 993fa36

gouslu and Copilot authored
feat(engine): Extension System - Capability Registry & Resolver (#2732)
# Extension system: Phase 1 (capabilities, registry, builder, proc macro)

Implements the Phase-1 extension/capability system for the OTAP dataflow engine. Extensions are first-class config siblings of nodes; nodes explicitly bind to extension instances via named capabilities and receive typed handles resolved once at factory time, with no hot-path registry lookups.

Tracking docs: [`docs/extension-system-architecture.md`](rust/otap-dataflow/docs/extension-system-architecture.md) (rewritten in this PR).

## What's in this PR

### `#[capability]` proc macro (`engine-macros`)

- New `capability.rs` expansion: from a single `#[capability] trait Foo { ... }` source it generates `local::Foo` (`!Send`-friendly) and `shared::Foo` (`Send + Clone`) trait variants plus a `SharedAsLocal` adapter and an `ExtensionCapability` impl. The dual variants are derived from one source, so authors can't accidentally let local and shared semantics diverge.
- New `pipeline_factory.rs` expansion to build the static `PipelineFactory` registry used by `main.rs`.
- All emitted paths use fully-qualified `::std::...` / `::async_trait::...` / `::otap_df_engine::...` so generated code is hygienic in any caller crate.

### Capability registry (`engine::capability`)

- `CapabilityRegistry`: typed-keyed (`(extension_name, TypeId)`) storage with **typestate-enforced** single `.shared()` / `.local()` registration per builder; duplicates are unrepresentable rather than runtime errors.
- Two execution models: native local (`Rc<dyn Local>`, lock-free) and native shared (`Box<dyn Shared>`, `Send + Clone`). A shared-only extension serves local consumers transparently via the `SharedAsLocal` adapter generated by the proc macro.
- Two **instance policies** chosen at build time, invisible to consumers: `.cloned()` (clone a stored prototype) and `.constructed()` (per-consumer construction via a closure; Passive-only, since `Active + Constructed` is statically rejected).
- `resolve_bindings`: walks a node's `capabilities:` declaration and produces a per-node `Capabilities` bundle with all bindings resolved, surfacing config errors (unknown extension, unknown capability, capability not provided by bound extension, multiple bindings for the same capability).
- `Capabilities`: per-node consumer API with `require_local`, `require_shared`, `optional_local`, `optional_shared`. Instances are minted lazily at the call site, not at resolution time.
- `ConsumedTracker`: cross-node, per-(capability, extension) consumption flags driving `drop_local()` / `drop_shared()` cleanup for extensions no node ever claimed.

### One-shot consumption contract

A binding is claimable **at most once per node**, regardless of execution model. The guard is the `Cell<Option<_>>::take()` on each resolved entry's `produce` closure, with no auxiliary flag.

- Same accessor twice → `CapabilityAlreadyConsumed`.
- Different accessors on a SharedAsLocal-fallback binding share one underlying entry, so claiming either side consumes the other naturally.
- Different accessors on a native-dual binding (extension registered both native local **and** native shared) take and drop the alternative entry's `produce` closure on success, so the per-binding contract holds uniformly. The cross-node tracker is only flipped by actual consumption, not by invalidation, so `drop_*` cleanup remains correct.

### Documentation

- `docs/extension-system-architecture.md`: rewritten to describe the capability-based design, the local/shared duality, instance policies, Active vs Passive lifecycle, and the typestate builder.

## Tests

New, focused unit tests cover:

- Registry: typestate single-registration, duplicate rejection, `SharedAsLocal` adapter freshness per node, double-`Box` envelope for shared `produce`.
- `resolve_bindings`: every error path (unknown extension / unknown capability / capability not provided / wrong extension), local-only and shared-only binding shapes, fallback path, native-dual path.
- One-shot contract: second-call rejection on each of `require_local`, `require_shared`, `optional_local`, `optional_shared`; fallback cross-side rejection; native-dual cross-side rejection (both directions).
- `ConsumedTracker`: per-extension consumption flags, with the invariant that mere invalidation does not flip a bucket.
- Proc-macro end-to-end: `local-only`, `shared-only`, and `dual` forms of `extension_capabilities!` against the registry.

## Validation

```text
cargo xtask check
✅ Cargo workspace structure complies with project policies.
✅ Formatting completed successfully.
✅ Clippy linting passed without warnings.
✅ All tests passed successfully.
```

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
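The one-shot consumption contract described in the commit message can be illustrated with a small, std-only sketch. This is not the engine's code: `ResolvedEntry`, `DualBinding`, `claim`, and `invalidate` are hypothetical names, and the `CapabilityAlreadyConsumed` type here is a stand-in for the error the message mentions. It only shows how `Cell<Option<_>>::take()` on a `produce` closure can act as the guard by itself, and how a native-dual binding might drop the alternative closure after a successful claim.

```rust
use std::cell::Cell;

/// Stand-in for the error named in the commit message (definition assumed).
#[derive(Debug, PartialEq, Eq)]
pub struct CapabilityAlreadyConsumed;

/// Hypothetical resolved entry holding a one-shot `produce` closure.
pub struct ResolvedEntry<T> {
    produce: Cell<Option<Box<dyn FnOnce() -> T>>>,
}

impl<T> ResolvedEntry<T> {
    pub fn new<F>(produce: F) -> Self
    where
        F: FnOnce() -> T + 'static,
    {
        let boxed: Box<dyn FnOnce() -> T> = Box::new(produce);
        Self { produce: Cell::new(Some(boxed)) }
    }

    /// `take()` leaves `None` behind, so the closure itself is the guard:
    /// a second call finds nothing to run and reports the error.
    pub fn claim(&self) -> Result<T, CapabilityAlreadyConsumed> {
        match self.produce.take() {
            Some(produce) => Ok(produce()),
            None => Err(CapabilityAlreadyConsumed),
        }
    }

    /// Drops the closure without running it (invalidation, not consumption).
    pub fn invalidate(&self) {
        drop(self.produce.take());
    }
}

/// Hypothetical native-dual binding: one entry per execution model.
pub struct DualBinding<L, S> {
    local: ResolvedEntry<L>,
    shared: ResolvedEntry<S>,
}

impl<L, S> DualBinding<L, S> {
    pub fn new(local: ResolvedEntry<L>, shared: ResolvedEntry<S>) -> Self {
        Self { local, shared }
    }

    /// Claiming one side invalidates the other, so the binding as a whole
    /// stays claimable at most once.
    pub fn require_local(&self) -> Result<L, CapabilityAlreadyConsumed> {
        let value = self.local.claim()?;
        self.shared.invalidate();
        Ok(value)
    }

    pub fn require_shared(&self) -> Result<S, CapabilityAlreadyConsumed> {
        let value = self.shared.claim()?;
        self.local.invalidate();
        Ok(value)
    }
}

fn main() {
    let binding = DualBinding::new(
        ResolvedEntry::new(|| "local handle"),
        ResolvedEntry::new(|| "shared handle"),
    );
    assert_eq!(binding.require_local(), Ok("local handle"));
    // Same accessor twice and the cross-side accessor are both rejected.
    assert_eq!(binding.require_local(), Err(CapabilityAlreadyConsumed));
    assert_eq!(binding.require_shared(), Err(CapabilityAlreadyConsumed));
}
```

Because the guard is the `Option` itself, there is no separate boolean that could drift out of sync with the closure's presence; that is the property the "no auxiliary flag" remark points at.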
1 parent 74bcd98 commit 993fa36

26 files changed

Lines changed: 6189 additions & 2281 deletions

rust/otap-dataflow/crates/config/src/node.rs

Lines changed: 5 additions & 5 deletions
@@ -11,7 +11,7 @@
 use crate::error::Error;
 use crate::pipeline::telemetry::{AttributeValue, TelemetryAttribute};
 use crate::transport_headers_policy::{HeaderCapturePolicy, HeaderPropagationPolicy};
-use crate::{CapabilityId, Description, NodeId, NodeUrn, PortName};
+use crate::{CapabilityId, Description, ExtensionId, NodeUrn, PortName};
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
@@ -25,7 +25,7 @@ use std::collections::HashMap;
 /// and returns an error so the user gets immediate feedback.
 fn deserialize_no_dup_keys<'de, D>(
     deserializer: D,
-) -> Result<HashMap<CapabilityId, NodeId>, D::Error>
+) -> Result<HashMap<CapabilityId, ExtensionId>, D::Error>
 where
     D: serde::Deserializer<'de>,
 {
@@ -35,7 +35,7 @@ where
     struct NoDupVisitor;
 
     impl<'de> Visitor<'de> for NoDupVisitor {
-        type Value = HashMap<CapabilityId, NodeId>;
+        type Value = HashMap<CapabilityId, ExtensionId>;
 
         fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
             f.write_str("a map with no duplicate keys")
@@ -49,7 +49,7 @@ where
                         "duplicate capability key '{key}'"
                     )));
                 }
-                let _ = result.insert(CapabilityId::from(key), NodeId::from(value));
+                let _ = result.insert(CapabilityId::from(key), ExtensionId::from(value));
             }
             Ok(result)
         }
@@ -115,7 +115,7 @@ pub struct NodeUserConfig {
         skip_serializing_if = "HashMap::is_empty",
         deserialize_with = "deserialize_no_dup_keys"
     )]
-    pub capabilities: HashMap<CapabilityId, NodeId>,
+    pub capabilities: HashMap<CapabilityId, ExtensionId>,
 
     /// Entity configuration for the node.
     ///
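The diff above retypes the values of the `capabilities` map from `NodeId` to `ExtensionId`. As a rough illustration of why that distinction can matter, here is a std-only sketch that assumes the identifiers are distinct string newtypes (the real definitions in the `config` crate are not shown in this commit and may differ). `find_unknown_extension` is a hypothetical helper, not a function from the repository.

```rust
use std::collections::{HashMap, HashSet};

// Assumed shapes: the actual ID types live in the config crate and are not
// part of this diff.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CapabilityId(String);

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExtensionId(String);

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NodeId(String);

impl From<&str> for CapabilityId {
    fn from(s: &str) -> Self {
        CapabilityId(s.to_owned())
    }
}

impl From<&str> for ExtensionId {
    fn from(s: &str) -> Self {
        ExtensionId(s.to_owned())
    }
}

/// Hypothetical check: returns a binding whose target extension was never
/// declared, if there is one.
pub fn find_unknown_extension<'a>(
    capabilities: &'a HashMap<CapabilityId, ExtensionId>,
    declared: &HashSet<ExtensionId>,
) -> Option<(&'a CapabilityId, &'a ExtensionId)> {
    capabilities
        .iter()
        .find(|(_, extension)| !declared.contains(*extension))
}

fn main() {
    let declared: HashSet<ExtensionId> = [ExtensionId::from("auth_main")].into_iter().collect();

    let mut capabilities: HashMap<CapabilityId, ExtensionId> = HashMap::new();
    let _ = capabilities.insert(CapabilityId::from("bearer_token"), ExtensionId::from("auth_main"));
    assert!(find_unknown_extension(&capabilities, &declared).is_none());

    let _ = capabilities.insert(CapabilityId::from("kv_store"), ExtensionId::from("missing"));
    let (capability, extension) = find_unknown_extension(&capabilities, &declared).unwrap();
    assert_eq!(capability, &CapabilityId::from("kv_store"));
    assert_eq!(extension, &ExtensionId::from("missing"));

    // With distinct newtypes, inserting a `NodeId` as the value would be a
    // compile-time type error rather than a silent mix-up:
    // capabilities.insert(CapabilityId::from("x"), NodeId("node".into()));
}
```

Under that assumption, the compiler keeps node identifiers out of a map meant to point at extension instances, which fits the commit's framing of extensions as config siblings of nodes rather than nodes themselves.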
