Skip to content

Commit 1178e72

Browse files
committed
[CONTINT-4562] Handle containers deletion and recreation asynchronously
1 parent 7add0a2 commit 1178e72

File tree

1 file changed

+32
-6
lines changed

1 file changed

+32
-6
lines changed

lading/src/generator/container.rs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use bollard::image::CreateImageOptions;
1313
use bollard::secret::ContainerCreateResponse;
1414
use serde::{Deserialize, Serialize};
1515
use std::collections::{HashMap, VecDeque};
16+
use tokio::sync::mpsc;
1617
use tokio_stream::StreamExt;
1718
use tracing::{debug, info, warn};
1819
use uuid::Uuid;
@@ -52,6 +53,9 @@ pub enum Error {
5253
/// Error produced by the Bollard Docker client.
5354
#[error("Bollard/Docker error: {0}")]
5455
Bollard(#[from] bollard::errors::Error),
56+
/// Error produced by tokio
57+
#[error("Tokio error: {0}")]
58+
Tokio(#[from] mpsc::error::SendError<ContainerCreateResponse>),
5559
}
5660

5761
#[derive(Debug)]
@@ -140,6 +144,7 @@ impl Container {
140144
// Wait for shutdown signal
141145
let shutdown_wait = self.shutdown.recv();
142146
tokio::pin!(shutdown_wait);
147+
let (new_container_tx, mut new_container_rx) = mpsc::channel(32);
143148
let mut recreate_interval = self.config.max_lifetime.map(|max_lifetime| {
144149
tokio::time::interval(tokio::time::Duration::from_millis(
145150
1_000 * max_lifetime / number_of_containers as u64,
@@ -150,9 +155,26 @@ impl Container {
150155
tokio::select! {
151156
// Destroy and replace containers
152157
_ = if let Some(ref mut interval) = recreate_interval { interval.tick() } else { std::future::pending().await } => {
153-
stop_and_remove_container(&docker, &containers.pop_front().ok_or(Error::Generic(String::from("No container left")))?).await?;
154-
containers.push_back(self.config.create_and_start_container(&docker, &full_image).await?);
158+
if let Some(container) = containers.pop_front() {
159+
let docker = docker.clone();
160+
let config = self.config.clone();
161+
let full_image = full_image.clone();
162+
let new_container_tx = new_container_tx.clone();
163+
tokio::spawn(async move {
164+
stop_and_remove_container(&docker, &container).await?;
165+
new_container_tx.send(config.create_and_start_container(&docker, &full_image).await?).await?;
166+
Ok::<(), Error>(())
167+
});
168+
}
155169
}
170+
171+
// Handle new containers from recycling tasks
172+
new_container = new_container_rx.recv() => {
173+
if let Some(container) = new_container {
174+
containers.push_back(container);
175+
}
176+
}
177+
156178
// Check that containers are still running every 10 seconds
157179
_ = liveness_interval.tick() => {
158180
for container in &containers {
@@ -166,11 +188,15 @@ impl Container {
166188
}
167189
}
168190
}
191+
192+
// Shutdown
169193
() = &mut shutdown_wait => {
170194
debug!("shutdown signal received");
171-
for container in &containers {
172-
stop_and_remove_container(&docker, container).await?;
173-
}
195+
drop(new_container_tx);
196+
197+
futures::future::join_all(
198+
containers.iter().map(|container| stop_and_remove_container(&docker, container))
199+
).await;
174200

175201
return Ok(())
176202
}
@@ -251,7 +277,7 @@ async fn stop_and_remove_container(
251277
) -> Result<(), Error> {
252278
info!("Stopping container: {id}", id = container.id);
253279
if let Err(e) = docker
254-
.stop_container(&container.id, Some(StopContainerOptions { t: 0 }))
280+
.stop_container(&container.id, Some(StopContainerOptions { t: 5 }))
255281
.await
256282
{
257283
warn!("Error stopping container {id}: {e}", id = container.id);

0 commit comments

Comments
 (0)