Skip to content

Commit 446e251

Browse files
feat(config): add optional field to EngineConfig (#2727)
Allow applications embedding the dataflow engine to carry their own engine-level configuration under `engine.custom`. The engine ignores this field entirely; embedding binaries can read namespaced keys for concerns like remote management, auth, or fleet coordination. # Change Summary Add an optional `custom: HashMap<String, serde_json::Value>` field to `EngineConfig`. This gives embedding applications an escape hatch for engine-level config without forking the config crate or pre-parsing YAML. The field defaults to an empty map and is omitted from serialized output when empty. ## What issue does this PR close? * Closes #2561 ## How are these changes tested? Three new unit tests in `engine.rs`: - `from_yaml_accepts_custom_config` — parses a config with multiple namespaced custom keys and verifies values - `custom_defaults_to_empty` — confirms the field defaults to an empty map when omitted - `custom_roundtrips_through_json` — serializes to JSON and deserializes back, verifying data is preserved ## Are there any user-facing changes? Yes. A new optional `custom` key is available under the `engine` section of the YAML/JSON config. Example: ```yaml engine: custom: remote_management: server_url: "ws://mgmt.example.com/v1" heartbeat_interval_secs: 10 custom_auth: provider: "oidc" token_endpoint: "https://auth.example.com/token" ``` Existing configs are unaffected since the field defaults to empty. --------- Co-authored-by: Lalit Kumar Bhasin <lalit_fin@yahoo.com>
1 parent ebf2403 commit 446e251

2 files changed

Lines changed: 117 additions & 3 deletions

File tree

rust/otap-dataflow/crates/config/src/engine.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::policy::{ChannelCapacityPolicy, Policies, TelemetryPolicy};
1818
use crate::topic::{TopicImplSelectionPolicy, TopicSpec};
1919
use schemars::JsonSchema;
2020
use serde::{Deserialize, Serialize};
21+
use serde_json::Value;
2122
use std::collections::HashMap;
2223

2324
pub use self::resolve::{
@@ -77,6 +78,15 @@ pub struct EngineConfig {
7778
/// Engine observability declarations.
7879
#[serde(default)]
7980
pub observability: EngineObservabilityConfig,
81+
82+
/// Opaque metadata for applications that embed the dataflow engine.
83+
///
84+
/// The engine ignores this field entirely — embedding binaries can
85+
/// read namespaced keys for their own engine-level concerns
86+
/// (remote management, auth, fleet coordination, etc.).
87+
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
88+
#[schemars(extend("x-kubernetes-preserve-unknown-fields" = true))]
89+
pub custom: HashMap<String, Value>,
8090
}
8191

8292
/// Engine-wide topic runtime settings.
@@ -319,6 +329,92 @@ groups:
319329
assert!(config.engine.observability.pipeline.is_some());
320330
}
321331

332+
#[test]
333+
fn from_yaml_accepts_custom_config() {
334+
let yaml = r#"
335+
version: otel_dataflow/v1
336+
engine:
337+
custom:
338+
remote_management:
339+
server_url: "ws://mgmt.example.com/v1"
340+
heartbeat_interval_secs: 10
341+
custom_auth:
342+
provider: "oidc"
343+
token_endpoint: "https://auth.example.com/token"
344+
groups:
345+
default:
346+
pipelines:
347+
main:
348+
nodes:
349+
receiver:
350+
type: "urn:test:receiver:example"
351+
config: null
352+
exporter:
353+
type: "urn:test:exporter:example"
354+
config: null
355+
connections:
356+
- from: receiver
357+
to: exporter
358+
"#;
359+
360+
let config = OtelDataflowSpec::from_yaml(yaml).expect("should parse custom config");
361+
assert_eq!(config.engine.custom.len(), 2);
362+
363+
let mgmt = config
364+
.engine
365+
.custom
366+
.get("remote_management")
367+
.expect("should have remote_management key");
368+
assert_eq!(mgmt["server_url"], "ws://mgmt.example.com/v1");
369+
assert_eq!(mgmt["heartbeat_interval_secs"], 10);
370+
371+
let auth = config
372+
.engine
373+
.custom
374+
.get("custom_auth")
375+
.expect("should have custom_auth key");
376+
assert_eq!(auth["provider"], "oidc");
377+
}
378+
379+
#[test]
380+
fn custom_defaults_to_empty() {
381+
let yaml = valid_engine_yaml(ENGINE_CONFIG_VERSION_V1);
382+
let config = OtelDataflowSpec::from_yaml(&yaml).expect("should parse");
383+
assert!(config.engine.custom.is_empty());
384+
}
385+
386+
#[test]
387+
fn custom_roundtrips_through_json() {
388+
let yaml = r#"
389+
version: otel_dataflow/v1
390+
engine:
391+
custom:
392+
my_app:
393+
key: "value"
394+
groups:
395+
default:
396+
pipelines:
397+
main:
398+
nodes:
399+
receiver:
400+
type: "urn:test:receiver:example"
401+
config: null
402+
exporter:
403+
type: "urn:test:exporter:example"
404+
config: null
405+
connections:
406+
- from: receiver
407+
to: exporter
408+
"#;
409+
410+
let config = OtelDataflowSpec::from_yaml(yaml).expect("should parse");
411+
let json = serde_json::to_string(&config).expect("should serialize to json");
412+
let roundtripped: OtelDataflowSpec =
413+
serde_json::from_str(&json).expect("should deserialize from json");
414+
assert_eq!(roundtripped.engine.custom.len(), 1);
415+
assert_eq!(roundtripped.engine.custom["my_app"]["key"], "value");
416+
}
417+
322418
#[test]
323419
fn from_yaml_requires_explicit_memory_limiter_mode() {
324420
let yaml = r#"

rust/otap-dataflow/docs/configuration-model.md

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,9 @@ Important behavior:
152152
- Graph topology is explicit via `connections`.
153153
- `kind` is inferred from node `type`.
154154
- For single-output nodes, you do not need `outputs` or `default_output`.
155-
- Parsing is strict: unknown fields are rejected (might be relaxed in the future
156-
if we find good use cases for extensibility, but for now this is intentional
157-
to catch typos and mistakes early).
155+
- Parsing is strict: unknown fields are rejected to catch typos and mistakes
156+
early. Applications that embed the engine can place their own engine-level
157+
configuration under `engine.custom` (see below).
158158

159159
## Engine Section
160160

@@ -165,6 +165,7 @@ Important behavior:
165165
- `telemetry`
166166
- `observed_state`
167167
- `observability`
168+
- `custom`
168169

169170
### Engine Topic Settings
170171

@@ -207,6 +208,23 @@ Optional observability policies are supported at:
207208

208209
`resources` is intentionally not supported for observability and is rejected.
209210

211+
### Custom Embedder Configuration
212+
213+
`engine.custom` is an optional map for applications that embed the dataflow
214+
engine as a library. The engine ignores this field entirely -- embedding
215+
binaries can read namespaced keys for their own engine-level concerns.
216+
217+
```yaml
218+
engine:
219+
custom:
220+
remote_management:
221+
server_url: "ws://mgmt.example.com/v1"
222+
heartbeat_interval_secs: 10
223+
custom_auth:
224+
provider: "oidc"
225+
token_endpoint: "https://auth.example.com/token"
226+
```
227+
210228
## Policy Hierarchy
211229

212230
Policies include channel capacity, health, runtime telemetry, resources

0 commit comments

Comments
 (0)