Skip to content

Commit a2207cf

Browse files
committed
[CONTINT-4562] Add a parameter to continuously stop and recreate containers
1 parent 1044172 commit a2207cf

File tree

3 files changed

+123
-105
lines changed

3 files changed

+123
-105
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

77
## [Unreleased]
8+
## Added
9+
- Add a `max_lifetime` parameter to the `container` generator to make it generate
10+
a continuous stream of docker containers deletion and re-creation.
811

912
## [0.25.9]
1013
## Added
1114
- Introduce a `container` generator able to generate an arbitrary number
12-
of docker containers
15+
of docker containers.
1316
## Changed
1417
- Lading's byte-unit crate is now updated to 5.x. This version of the crate is
1518
very strict about the difference between MiB, Mb etc.

examples/lading-container.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@ generator:
1919
network_disabled: false
2020
exposed_ports:
2121
- 5000/tcp
22+
max_lifetime: 20
2223
number_of_containers: 10

lading/src/generator/container.rs

Lines changed: 118 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@ use bollard::container::{
1212
use bollard::image::CreateImageOptions;
1313
use bollard::secret::ContainerCreateResponse;
1414
use serde::{Deserialize, Serialize};
15-
use std::collections::HashMap;
15+
use std::collections::{HashMap, VecDeque};
1616
use tokio_stream::StreamExt;
1717
use tracing::{debug, info, warn};
1818
use uuid::Uuid;
1919

2020
use super::General;
2121

22-
#[derive(Debug, Deserialize, Serialize, PartialEq)]
22+
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
2323
#[serde(deny_unknown_fields)]
2424
/// Configuration of the container generator.
2525
pub struct Config {
@@ -37,8 +37,10 @@ pub struct Config {
3737
pub network_disabled: Option<bool>,
3838
/// Ports to expose from the container
3939
pub exposed_ports: Option<Vec<String>>,
40+
/// Maximum lifetime of containers before being replaced
41+
pub max_lifetime: Option<u64>,
4042
/// Number of containers to spin up (defaults to 1)
41-
pub number_of_containers: Option<u32>,
43+
pub number_of_containers: Option<usize>,
4244
}
4345

4446
/// Errors produced by the `Container` generator.
@@ -55,14 +57,7 @@ pub enum Error {
5557
#[derive(Debug)]
5658
/// Represents a container that can be spun up from a configured image.
5759
pub struct Container {
58-
image: String,
59-
tag: String,
60-
args: Option<Vec<String>>,
61-
env: Option<Vec<String>>,
62-
labels: Option<HashMap<String, String>>,
63-
network_disabled: Option<bool>,
64-
exposed_ports: Option<Vec<String>>,
65-
number_of_containers: usize,
60+
config: Config,
6661
shutdown: lading_signal::Watcher,
6762
}
6863

@@ -80,50 +75,11 @@ impl Container {
8075
shutdown: lading_signal::Watcher,
8176
) -> Result<Self, Error> {
8277
Ok(Self {
83-
image: config.repository.clone(),
84-
tag: config.tag.clone(),
85-
args: config.args.clone(),
86-
env: config.env.clone(),
87-
labels: config.labels.clone(),
88-
network_disabled: config.network_disabled,
89-
exposed_ports: config.exposed_ports.clone(),
90-
number_of_containers: config.number_of_containers.unwrap_or(1) as usize,
78+
config: config.clone(),
9179
shutdown,
9280
})
9381
}
9482

95-
/// Convert the `Container` instance to a `ContainerConfig` for the Docker API.
96-
#[must_use]
97-
fn to_container_config<'a>(&'a self, full_image: &'a str) -> ContainerConfig<&'a str> {
98-
ContainerConfig {
99-
image: Some(full_image),
100-
tty: Some(true),
101-
cmd: self
102-
.args
103-
.as_ref()
104-
.map(|args| args.iter().map(String::as_str).collect()),
105-
env: self
106-
.env
107-
.as_ref()
108-
.map(|env| env.iter().map(String::as_str).collect()),
109-
labels: self.labels.as_ref().map(|labels| {
110-
labels
111-
.iter()
112-
.map(|(key, value)| (key.as_str(), value.as_str()))
113-
.collect()
114-
}),
115-
network_disabled: self.network_disabled,
116-
#[allow(clippy::zero_sized_map_values)]
117-
exposed_ports: self.exposed_ports.as_ref().map(|ports| {
118-
ports
119-
.iter()
120-
.map(|port| (port.as_str(), HashMap::new()))
121-
.collect()
122-
}),
123-
..Default::default()
124-
}
125-
}
126-
12783
/// Run the `Container` generator.
12884
///
12985
/// # Errors
@@ -138,11 +94,14 @@ impl Container {
13894
/// 4. Wait for shutdown signal.
13995
/// 5. On shutdown, stop and remove the container.
14096
pub async fn spin(self) -> Result<(), Error> {
141-
info!("Container generator running: {}:{}", self.image, self.tag);
97+
info!(
98+
"Container generator running: {}:{}",
99+
self.config.repository, self.config.tag
100+
);
142101

143102
let docker = Docker::connect_with_local_defaults()?;
144103

145-
let full_image = format!("{}:{}", self.image, self.tag);
104+
let full_image = format!("{}:{}", self.config.repository, self.config.tag);
146105
debug!("Ensuring image is available: {full_image}");
147106

148107
// Pull the image
@@ -169,46 +128,35 @@ impl Container {
169128
}
170129
}
171130

172-
let mut containers = Vec::with_capacity(self.number_of_containers);
173-
174-
for _ in 0..self.number_of_containers {
175-
let container_name = format!("lading_container_{}", Uuid::new_v4());
176-
debug!("Creating container: {container_name}");
177-
178-
let container = docker
179-
.create_container(
180-
Some(CreateContainerOptions {
181-
name: &container_name,
182-
platform: None,
183-
}),
184-
self.to_container_config(&full_image),
185-
)
186-
.await?;
187-
188-
debug!("Created container with id: {id}", id = container.id);
189-
for warning in &container.warnings {
190-
warn!("Container warning: {warning}");
191-
}
192-
193-
containers.push(container);
194-
}
195-
196-
for container in &containers {
197-
docker
198-
.start_container(&container.id, None::<StartContainerOptions<String>>)
199-
.await?;
131+
let number_of_containers = self.config.number_of_containers.unwrap_or(1);
132+
let mut containers = VecDeque::with_capacity(number_of_containers);
200133

201-
debug!("Started container: {id}", id = container.id);
134+
for _ in 0..number_of_containers {
135+
containers.push_back(
136+
self.config
137+
.create_and_start_container(&docker, &full_image)
138+
.await?,
139+
);
202140
}
203141

204142
// Wait for shutdown signal
205143
let shutdown_wait = self.shutdown.recv();
206144
tokio::pin!(shutdown_wait);
207-
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
145+
let mut recreate_interval = self.config.max_lifetime.map(|max_lifetime| {
146+
tokio::time::interval(tokio::time::Duration::from_millis(
147+
1_000 * max_lifetime / number_of_containers as u64,
148+
))
149+
});
150+
let mut liveness_interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
208151
loop {
209152
tokio::select! {
153+
// Destroy and replace containers
154+
_ = if let Some(ref mut interval) = recreate_interval { interval.tick() } else { std::future::pending().await } => {
155+
stop_and_remove_container(&docker, &containers.pop_front().ok_or(Error::Generic(String::from("No container left")))?).await?;
156+
containers.push_back(self.config.create_and_start_container(&docker, &full_image).await?);
157+
}
210158
// Check that containers are still running every 10 seconds
211-
_ = interval.tick() => {
159+
_ = liveness_interval.tick() => {
212160
for container in &containers {
213161
if let Some(state) = docker.inspect_container(&container.id, None).await?.state {
214162
if !state.running.unwrap_or(false) {
@@ -223,39 +171,105 @@ impl Container {
223171
() = &mut shutdown_wait => {
224172
debug!("shutdown signal received");
225173
for container in &containers {
226-
Self::stop_and_remove_container(&docker, container).await?;
174+
stop_and_remove_container(&docker, container).await?;
227175
}
228176

229177
return Ok(())
230178
}
231179
}
232180
}
233181
}
182+
}
234183

235-
async fn stop_and_remove_container(
236-
docker: &Docker,
237-
container: &ContainerCreateResponse,
238-
) -> Result<(), Error> {
239-
info!("Stopping container: {id}", id = container.id);
240-
if let Err(e) = docker
241-
.stop_container(&container.id, Some(StopContainerOptions { t: 5 }))
242-
.await
243-
{
244-
warn!("Error stopping container {id}: {e}", id = container.id);
184+
impl Config {
185+
/// Convert the `Container` instance to a `ContainerConfig` for the Docker API.
186+
#[must_use]
187+
fn to_container_config<'a>(&'a self, full_image: &'a str) -> ContainerConfig<&'a str> {
188+
ContainerConfig {
189+
image: Some(full_image),
190+
tty: Some(true),
191+
cmd: self
192+
.args
193+
.as_ref()
194+
.map(|args| args.iter().map(String::as_str).collect()),
195+
env: self
196+
.env
197+
.as_ref()
198+
.map(|env| env.iter().map(String::as_str).collect()),
199+
labels: self.labels.as_ref().map(|labels| {
200+
labels
201+
.iter()
202+
.map(|(key, value)| (key.as_str(), value.as_str()))
203+
.collect()
204+
}),
205+
network_disabled: self.network_disabled,
206+
#[allow(clippy::zero_sized_map_values)]
207+
exposed_ports: self.exposed_ports.as_ref().map(|ports| {
208+
ports
209+
.iter()
210+
.map(|port| (port.as_str(), HashMap::new()))
211+
.collect()
212+
}),
213+
..Default::default()
245214
}
215+
}
246216

247-
debug!("Removing container: {id}", id = container.id);
248-
docker
249-
.remove_container(
250-
&container.id,
251-
Some(RemoveContainerOptions {
252-
force: true,
253-
..Default::default()
217+
async fn create_and_start_container(
218+
&self,
219+
docker: &Docker,
220+
full_image: &str,
221+
) -> Result<ContainerCreateResponse, Error> {
222+
let container_name = format!("lading_container_{}", Uuid::new_v4());
223+
debug!("Creating container: {container_name}");
224+
225+
let container = docker
226+
.create_container(
227+
Some(CreateContainerOptions {
228+
name: &container_name,
229+
platform: None,
254230
}),
231+
self.to_container_config(full_image),
255232
)
256233
.await?;
257234

258-
debug!("Removed container: {id}", id = container.id);
259-
Ok(())
235+
debug!("Created container with id: {id}", id = container.id);
236+
for warning in &container.warnings {
237+
warn!("Container warning: {warning}");
238+
}
239+
240+
docker
241+
.start_container(&container.id, None::<StartContainerOptions<String>>)
242+
.await?;
243+
244+
debug!("Started container: {id}", id = container.id);
245+
246+
Ok(container)
260247
}
261248
}
249+
250+
async fn stop_and_remove_container(
251+
docker: &Docker,
252+
container: &ContainerCreateResponse,
253+
) -> Result<(), Error> {
254+
info!("Stopping container: {id}", id = container.id);
255+
if let Err(e) = docker
256+
.stop_container(&container.id, Some(StopContainerOptions { t: 0 }))
257+
.await
258+
{
259+
warn!("Error stopping container {id}: {e}", id = container.id);
260+
}
261+
262+
debug!("Removing container: {id}", id = container.id);
263+
docker
264+
.remove_container(
265+
&container.id,
266+
Some(RemoveContainerOptions {
267+
force: true,
268+
..Default::default()
269+
}),
270+
)
271+
.await?;
272+
273+
debug!("Removed container: {id}", id = container.id);
274+
Ok(())
275+
}

0 commit comments

Comments
 (0)