@@ -18,6 +18,7 @@ use crate::policy::{ChannelCapacityPolicy, Policies, TelemetryPolicy};
1818use crate :: topic:: { TopicImplSelectionPolicy , TopicSpec } ;
1919use schemars:: JsonSchema ;
2020use serde:: { Deserialize , Serialize } ;
21+ use serde_json:: Value ;
2122use std:: collections:: HashMap ;
2223
2324pub use self :: resolve:: {
@@ -77,6 +78,15 @@ pub struct EngineConfig {
7778 /// Engine observability declarations.
7879 #[ serde( default ) ]
7980 pub observability : EngineObservabilityConfig ,
81+
82+ /// Opaque metadata for applications that embed the dataflow engine.
83+ ///
84+ /// The engine ignores this field entirely — embedding binaries can
85+ /// read namespaced keys for their own engine-level concerns
86+ /// (remote management, auth, fleet coordination, etc.).
87+ #[ serde( default , skip_serializing_if = "HashMap::is_empty" ) ]
88+ #[ schemars( extend( "x-kubernetes-preserve-unknown-fields" = true ) ) ]
89+ pub custom : HashMap < String , Value > ,
8090}
8191
8292/// Engine-wide topic runtime settings.
@@ -319,6 +329,92 @@ groups:
319329 assert ! ( config. engine. observability. pipeline. is_some( ) ) ;
320330 }
321331
332+ #[ test]
333+ fn from_yaml_accepts_custom_config ( ) {
334+ let yaml = r#"
335+ version: otel_dataflow/v1
336+ engine:
337+ custom:
338+ remote_management:
339+ server_url: "ws://mgmt.example.com/v1"
340+ heartbeat_interval_secs: 10
341+ custom_auth:
342+ provider: "oidc"
343+ token_endpoint: "https://auth.example.com/token"
344+ groups:
345+ default:
346+ pipelines:
347+ main:
348+ nodes:
349+ receiver:
350+ type: "urn:test:receiver:example"
351+ config: null
352+ exporter:
353+ type: "urn:test:exporter:example"
354+ config: null
355+ connections:
356+ - from: receiver
357+ to: exporter
358+ "# ;
359+
360+ let config = OtelDataflowSpec :: from_yaml ( yaml) . expect ( "should parse custom config" ) ;
361+ assert_eq ! ( config. engine. custom. len( ) , 2 ) ;
362+
363+ let mgmt = config
364+ . engine
365+ . custom
366+ . get ( "remote_management" )
367+ . expect ( "should have remote_management key" ) ;
368+ assert_eq ! ( mgmt[ "server_url" ] , "ws://mgmt.example.com/v1" ) ;
369+ assert_eq ! ( mgmt[ "heartbeat_interval_secs" ] , 10 ) ;
370+
371+ let auth = config
372+ . engine
373+ . custom
374+ . get ( "custom_auth" )
375+ . expect ( "should have custom_auth key" ) ;
376+ assert_eq ! ( auth[ "provider" ] , "oidc" ) ;
377+ }
378+
379+ #[ test]
380+ fn custom_defaults_to_empty ( ) {
381+ let yaml = valid_engine_yaml ( ENGINE_CONFIG_VERSION_V1 ) ;
382+ let config = OtelDataflowSpec :: from_yaml ( & yaml) . expect ( "should parse" ) ;
383+ assert ! ( config. engine. custom. is_empty( ) ) ;
384+ }
385+
386+ #[ test]
387+ fn custom_roundtrips_through_json ( ) {
388+ let yaml = r#"
389+ version: otel_dataflow/v1
390+ engine:
391+ custom:
392+ my_app:
393+ key: "value"
394+ groups:
395+ default:
396+ pipelines:
397+ main:
398+ nodes:
399+ receiver:
400+ type: "urn:test:receiver:example"
401+ config: null
402+ exporter:
403+ type: "urn:test:exporter:example"
404+ config: null
405+ connections:
406+ - from: receiver
407+ to: exporter
408+ "# ;
409+
410+ let config = OtelDataflowSpec :: from_yaml ( yaml) . expect ( "should parse" ) ;
411+ let json = serde_json:: to_string ( & config) . expect ( "should serialize to json" ) ;
412+ let roundtripped: OtelDataflowSpec =
413+ serde_json:: from_str ( & json) . expect ( "should deserialize from json" ) ;
414+ assert_eq ! ( roundtripped. engine. custom. len( ) , 1 ) ;
415+ assert_eq ! ( roundtripped. engine. custom[ "my_app" ] [ "key" ] , "value" ) ;
416+ }
417+
322418 #[ test]
323419 fn from_yaml_requires_explicit_memory_limiter_mode ( ) {
324420 let yaml = r#"
0 commit comments