Skip to content

Commit 516802f

Browse files
authored
Validation framework testcontainer env var update (open-telemetry#2506)
# Change Summary Added a new struct to describe a template env var to allow for situations where env var needs the host port which the framework sets dynamically ## What issue does this PR close? * Closes open-telemetry#2499 ## How are these changes tested? unit test ## Are there any user-facing changes? no
1 parent ca718d4 commit 516802f

3 files changed

Lines changed: 335 additions & 1 deletion

File tree

rust/otap-dataflow/crates/validation/README.md

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,9 +472,45 @@ let localstack = ContainerConfig::new("localstack/localstack", "3.4")
472472
Docker image and tag
473473
- `.env(key, value)` - set an environment variable on the container
474474
- can be chained multiple times
475+
- `.env_host_port(key, value_template, internal_port)` - set an environment
476+
variable whose value is a Jinja2 template; `{{ host_port }}` is replaced
477+
with the host port mapped to `internal_port` after port allocation
478+
- can be chained multiple times
479+
- if no connection maps the given `internal_port`, the framework
480+
auto-allocates a host port during config wiring
481+
- the resolved host port is consistent with the port used for Docker
482+
port mapping and any `ContainerConnection` or
483+
`PipelineContainerConnection` referencing the same internal port
475484
- `.entrypoint(cmd)` - override the container's entrypoint
476485
- optional
477486

487+
##### Templated environment variables
488+
489+
Some containers require environment variables that reference the host port
490+
assigned to one of their exposed ports. For example, Kafka's advertised
491+
listeners must contain the host-reachable address so that clients outside
492+
the container can connect. Use `env_host_port` for this:
493+
494+
```rust
495+
use otap_df_validation::ContainerConfig;
496+
497+
let kafka = ContainerConfig::new("confluentinc/cp-kafka", "7.5.0")
498+
.env("KAFKA_NODE_ID", "1")
499+
.env_host_port(
500+
"KAFKA_ADVERTISED_LISTENERS",
501+
"PLAINTEXT://127.0.0.1:{{ host_port }}",
502+
9092,
503+
);
504+
```
505+
506+
After config wiring, if host port 54321 was allocated for container port
507+
9092, the container starts with `KAFKA_ADVERTISED_LISTENERS` set to
508+
`PLAINTEXT://127.0.0.1:54321`.
509+
510+
If a `PipelineContainerConnection` or `ContainerConnection` also
511+
references internal port 9092 on the same container, they all share the
512+
same allocated host port.
513+
478514
Register the container on the scenario with `add_container`:
479515

480516
```rust
@@ -671,7 +707,12 @@ Scenario::new()
671707
"localstack",
672708
ContainerConfig::new("localstack/localstack", "3.4")
673709
.env("SERVICES", "s3")
674-
.env("DEFAULT_REGION", "us-east-1"),
710+
.env("DEFAULT_REGION", "us-east-1")
711+
.env_host_port(
712+
"LOCALSTACK_HOST",
713+
"127.0.0.1:{{ host_port }}",
714+
4566,
715+
),
675716
)
676717
.add_generator(
677718
"gen",

rust/otap-dataflow/crates/validation/src/container.rs

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,24 @@
1111
//! interact with plain Rust types defined here.
1212
1313
use crate::error::ValidationError;
14+
use crate::template::render_jinja;
15+
use minijinja::context;
1416
use std::collections::HashMap;
1517
use testcontainers::core::IntoContainerPort;
1618
use testcontainers::core::WaitFor;
1719
use testcontainers::runners::AsyncRunner;
1820
use testcontainers::{ContainerAsync, GenericImage, ImageExt};
1921

22+
/// An environment variable whose value is a Jinja2 template resolved
23+
/// after host port allocation. `{{ host_port }}` in the template is
24+
/// replaced with the host port mapped to `internal_port`.
25+
#[derive(Debug, Clone)]
26+
pub(crate) struct TemplatedEnvVar {
27+
pub(crate) key: String,
28+
pub(crate) value_template: String,
29+
pub(crate) internal_port: u16,
30+
}
31+
2032
/// Describes a Docker container to run alongside the validation scenario.
2133
///
2234
/// Use the builder methods to configure the image and environment variables.
@@ -43,6 +55,9 @@ pub struct ContainerConfig {
4355
/// with host port as the value. Populated by the framework during
4456
/// config wiring for container connections.
4557
pub(crate) mapped_ports: HashMap<u16, u16>,
58+
/// Templated environment variables to be resolved after port allocation.
59+
/// `None` when no templated env vars have been set.
60+
pub(crate) templated_env_vars: Option<Vec<TemplatedEnvVar>>,
4661
}
4762

4863
impl ContainerConfig {
@@ -55,6 +70,7 @@ impl ContainerConfig {
5570
env_vars: Vec::new(),
5671
entrypoint: None,
5772
mapped_ports: HashMap::new(),
73+
templated_env_vars: None,
5874
}
5975
}
6076

@@ -72,6 +88,61 @@ impl ContainerConfig {
7288
self
7389
}
7490

91+
/// Set an environment variable whose value is a Jinja2 template.
92+
/// After port allocation, `{{ host_port }}` is replaced with the host
93+
/// port mapped to `internal_port`. If no connection maps that port,
94+
/// the framework auto-allocates one during config wiring.
95+
///
96+
/// # Example
97+
///
98+
/// ```ignore
99+
/// ContainerConfig::new("confluentinc/cp-kafka", "7.5.0")
100+
/// .env_host_port(
101+
/// "KAFKA_ADVERTISED_LISTENERS",
102+
/// "PLAINTEXT://127.0.0.1:{{ host_port }}",
103+
/// 9092,
104+
/// )
105+
/// ```
106+
#[must_use]
107+
pub fn env_host_port(
108+
mut self,
109+
key: impl Into<String>,
110+
value_template: impl Into<String>,
111+
internal_port: u16,
112+
) -> Self {
113+
self.templated_env_vars
114+
.get_or_insert_with(Vec::new)
115+
.push(TemplatedEnvVar {
116+
key: key.into(),
117+
value_template: value_template.into(),
118+
internal_port,
119+
});
120+
self
121+
}
122+
123+
/// Resolve all templated environment variables by rendering their
124+
/// Jinja2 templates with `host_port` set to the allocated host port
125+
/// from `mapped_ports`. Resolved values are appended to `env_vars`
126+
/// in FIFO order. After resolution, `templated_env_vars` is set to
127+
/// `None`.
128+
pub(crate) fn resolve_templated_env_vars(&mut self) -> Result<(), ValidationError> {
129+
let vars = match self.templated_env_vars.take() {
130+
Some(v) => v,
131+
None => return Ok(()),
132+
};
133+
for tev in vars {
134+
let &host_port = self.mapped_ports.get(&tev.internal_port).ok_or_else(|| {
135+
ValidationError::Config(format!(
136+
"templated env var '{}' references unmapped internal port {}",
137+
tev.key, tev.internal_port
138+
))
139+
})?;
140+
let rendered = render_jinja(&tev.value_template, context! { host_port => host_port })?;
141+
self.env_vars.push((tev.key, rendered));
142+
}
143+
Ok(())
144+
}
145+
75146
/// Start the container described by this configuration.
76147
///
77148
/// Builds the Docker image, applies port mappings, environment variables,
@@ -119,6 +190,7 @@ mod tests {
119190
assert!(config.env_vars.is_empty());
120191
assert!(config.entrypoint.is_none());
121192
assert!(config.mapped_ports.is_empty());
193+
assert!(config.templated_env_vars.is_none());
122194
}
123195

124196
#[test]
@@ -160,4 +232,98 @@ mod tests {
160232
// Stop the container via testcontainers stop().
161233
container.stop().await.expect("container should stop");
162234
}
235+
236+
#[test]
237+
fn env_host_port_stores_templated_var() {
238+
let config = ContainerConfig::new("kafka", "7.5.0").env_host_port(
239+
"KAFKA_ADVERTISED_LISTENERS",
240+
"PLAINTEXT://127.0.0.1:{{ host_port }}",
241+
9092,
242+
);
243+
let tevs = config.templated_env_vars.as_ref().expect("should be Some");
244+
assert_eq!(tevs.len(), 1);
245+
assert_eq!(tevs[0].key, "KAFKA_ADVERTISED_LISTENERS");
246+
assert_eq!(
247+
tevs[0].value_template,
248+
"PLAINTEXT://127.0.0.1:{{ host_port }}"
249+
);
250+
assert_eq!(tevs[0].internal_port, 9092);
251+
}
252+
253+
#[test]
254+
fn env_host_port_multiple_preserves_order() {
255+
let config = ContainerConfig::new("kafka", "7.5.0")
256+
.env_host_port("FIRST", "{{ host_port }}", 9092)
257+
.env_host_port("SECOND", "{{ host_port }}", 8080);
258+
let tevs = config.templated_env_vars.as_ref().expect("should be Some");
259+
assert_eq!(tevs.len(), 2);
260+
assert_eq!(tevs[0].key, "FIRST");
261+
assert_eq!(tevs[0].internal_port, 9092);
262+
assert_eq!(tevs[1].key, "SECOND");
263+
assert_eq!(tevs[1].internal_port, 8080);
264+
}
265+
266+
#[test]
267+
fn resolve_templated_env_vars_renders_and_consumes() {
268+
let mut config = ContainerConfig::new("kafka", "7.5.0").env_host_port(
269+
"LISTENERS",
270+
"PLAINTEXT://127.0.0.1:{{ host_port }}",
271+
9092,
272+
);
273+
// Simulate port allocation during wiring.
274+
let _ = config.mapped_ports.insert(9092, 54321);
275+
276+
config
277+
.resolve_templated_env_vars()
278+
.expect("resolve should succeed");
279+
280+
assert!(config.templated_env_vars.is_none());
281+
assert_eq!(config.env_vars.len(), 1);
282+
assert_eq!(config.env_vars[0].0, "LISTENERS");
283+
assert_eq!(config.env_vars[0].1, "PLAINTEXT://127.0.0.1:54321");
284+
}
285+
286+
#[test]
287+
fn resolve_templated_env_vars_preserves_order() {
288+
let mut config = ContainerConfig::new("img", "tag")
289+
.env_host_port("FIRST", "a:{{ host_port }}", 9092)
290+
.env_host_port("SECOND", "b:{{ host_port }}", 8080);
291+
let _ = config.mapped_ports.insert(9092, 11111);
292+
let _ = config.mapped_ports.insert(8080, 22222);
293+
294+
config
295+
.resolve_templated_env_vars()
296+
.expect("resolve should succeed");
297+
298+
assert!(config.templated_env_vars.is_none());
299+
assert_eq!(config.env_vars.len(), 2);
300+
assert_eq!(config.env_vars[0], ("FIRST".into(), "a:11111".into()));
301+
assert_eq!(config.env_vars[1], ("SECOND".into(), "b:22222".into()));
302+
}
303+
304+
#[test]
305+
fn resolve_templated_env_vars_unmapped_port_errors() {
306+
let mut config =
307+
ContainerConfig::new("img", "tag").env_host_port("MY_VAR", "{{ host_port }}", 5432);
308+
309+
let err = config
310+
.resolve_templated_env_vars()
311+
.expect_err("should error on unmapped port");
312+
assert!(matches!(err, ValidationError::Config(_)));
313+
assert!(err.to_string().contains("unmapped internal port"));
314+
assert!(err.to_string().contains("MY_VAR"));
315+
}
316+
317+
#[test]
318+
fn resolve_templated_env_vars_none_is_noop() {
319+
let mut config = ContainerConfig::new("img", "tag");
320+
assert!(config.templated_env_vars.is_none());
321+
322+
config
323+
.resolve_templated_env_vars()
324+
.expect("noop resolve should succeed");
325+
326+
assert!(config.templated_env_vars.is_none());
327+
assert!(config.env_vars.is_empty());
328+
}
163329
}

0 commit comments

Comments
 (0)