diff --git a/Cargo.lock b/Cargo.lock
index 56a3385..636b58c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -291,7 +291,7 @@ dependencies = [
 
 [[package]]
 name = "dispenser"
-version = "0.7.0"
+version = "0.8.0"
 dependencies = [
  "base64",
  "chrono",
diff --git a/Cargo.toml b/Cargo.toml
index ee74afc..72c63a9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "dispenser"
-version = "0.7.0"
+version = "0.8.0"
 edition = "2021"
 license = "MIT"
 
diff --git a/INSTALL.deb.md b/INSTALL.deb.md
index fd57ac6..8f26bfa 100644
--- a/INSTALL.deb.md
+++ b/INSTALL.deb.md
@@ -21,7 +21,7 @@
 wget ...
 
 ```sh
-sudo apt install ./dispenser-0.7-0.x86_64.deb
+sudo apt install ./dispenser-0.8.0-0.x86_64.deb
 ```
 
 You can validate that it was successfully installed by switching to the
diff --git a/INSTALL.redhat.md b/INSTALL.redhat.md
index 3247dce..ebb9192 100644
--- a/INSTALL.redhat.md
+++ b/INSTALL.redhat.md
@@ -23,7 +23,7 @@
 wget ...
 
 ```sh
-sudo dnf install ./dispenser-0.7-0.x86_64.rpm
+sudo dnf install ./dispenser-0.8.0-0.x86_64.rpm
 ```
 
 You can validate that it was successfully installed by switching to the
diff --git a/README.md b/README.md
index 56b566d..ce37dd9 100644
--- a/README.md
+++ b/README.md
@@ -30,9 +30,9 @@ Download the latest `.deb` or `.rpm` package from the [releases page](https://gi
 
 ```sh
 # Download the .deb package
-# wget https://github.com/ixpantia/dispenser/releases/download/v0.7.0/dispenser-0.7-0.x86_64.deb
+# wget https://github.com/ixpantia/dispenser/releases/download/v0.8.0/dispenser-0.8.0-0.x86_64.deb
 
-sudo apt install ./dispenser-0.7-0.x86_64.deb
+sudo apt install ./dispenser-0.8.0-0.x86_64.deb
 ```
 
 ### RHEL / CentOS / Fedora
@@ -41,7 +41,7 @@ sudo apt install ./dispenser-0.7-0.x86_64.deb
 # Download the .rpm package
 # wget ...
 
-sudo dnf install ./dispenser-0.7-0.x86_64.rpm
+sudo dnf install ./dispenser-0.8.0-0.x86_64.rpm
 ```
 
 The installation process will:
diff --git a/SERVICE_CONFIG.md b/SERVICE_CONFIG.md
index c360f41..bd14f18 100644
--- a/SERVICE_CONFIG.md
+++ b/SERVICE_CONFIG.md
@@ -346,6 +346,26 @@ cron = "0 3 * * *" # Every day at 3 AM
 
 See [CRON.md](CRON.md) for more details on cron scheduling.
 
+### `pull` (optional)
+
+Controls when Dispenser should pull the Docker image from the registry. This is useful for ensuring that services (especially scheduled jobs) are always up-to-date with the latest image when they run, without necessarily triggering a redeployment on every image update if `watch` is `false`.
+
+**Valid values:**
+- `always` - Pull the image from the registry every time the container is started or recreated.
+- `on-startup` - Pull the image only if the container does not exist. (default)
+
+**Default:** `on-startup`
+
+```toml
+[dispenser]
+# For a background scheduled job that should always run the latest image,
+# but not necessarily restart if the image updates outside of its schedule.
+watch = false
+initialize = "on-trigger"
+cron = "0 3 * * *" # Every day at 3 AM
+pull = "always"
+```
+
 ## Service Dependencies
 
 The `[depends_on]` section defines dependencies between services.
diff --git a/deb/DEBIAN/control b/deb/DEBIAN/control
index 86027ef..07ec0a4 100644
--- a/deb/DEBIAN/control
+++ b/deb/DEBIAN/control
@@ -1,5 +1,5 @@
 Package: dispenser
-Version: 0.7
+Version: VERSION_PLACEHOLDER
 Maintainer: ixpantia S.A.
 Architecture: amd64
 Description: Continously Deploy services with Docker Compose
diff --git a/justfile b/justfile
index e697a45..07c8d72 100644
--- a/justfile
+++ b/justfile
@@ -1,20 +1,23 @@
 # justfile for dispenser project
 
-DISPENSER_VERSION := "0.7"
+DISPENSER_VERSION := shell('grep "^version" Cargo.toml | head -n1 | cut -d \" -f 2')
 TARGET_BIN := "target/x86_64-unknown-linux-musl/release/dispenser"
-USR_BIN_DEB := "deb/usr/local/bin/dispenser"
 USR_BIN_RPM := "rpm/usr/local/bin/dispenser"
 
+version:
+    echo "{{DISPENSER_VERSION}}"
+
 build:
    CARGO_TARGET_DIR="./target" cargo build --release --target "x86_64-unknown-linux-musl"
 
 build-deb: build
-    mkdir -p deb/usr/local/bin/
-    rm -f {{USR_BIN_DEB}}
-    mv {{TARGET_BIN}} {{USR_BIN_DEB}}
-    dpkg-deb --build deb
-    rm -f dispenser.deb
-    mv deb.deb dispenser-{{DISPENSER_VERSION}}-0.x86_64.deb
+    rm -rf target/deb_stage
+    mkdir -p target/deb_stage
+    cp -R deb/* target/deb_stage/
+    mkdir -p target/deb_stage/usr/local/bin
+    cp {{TARGET_BIN}} target/deb_stage/usr/local/bin/dispenser
+    sed 's/VERSION_PLACEHOLDER/{{DISPENSER_VERSION}}/' deb/DEBIAN/control > target/deb_stage/DEBIAN/control
+    dpkg-deb --build target/deb_stage dispenser-{{DISPENSER_VERSION}}-0.x86_64.deb
 
 build-rpm: build
     rm -rf rpmstage rpmout
@@ -24,4 +27,14 @@ build-rpm: build
     mkdir -p rpmout
     cp {{TARGET_BIN}} rpmstage/usr/local/bin/dispenser
     cp rpm/usr/lib/systemd/system/dispenser.service rpmstage/usr/lib/systemd/system/
-    rpmbuild --target=x86_64 --buildroot $(pwd)/rpmstage --define "_topdir $(pwd)/rpmout" -bb rpm/dispenser.spec --noclean
+    rpmbuild --target=x86_64 --buildroot $(pwd)/rpmstage --define "_topdir $(pwd)/rpmout" --define "version {{DISPENSER_VERSION}}" -bb rpm/dispenser.spec --noclean
+
+# Usage: just bump 0.8.0
+bump NEW_VERSION:
+    @echo "Bumping version to {{NEW_VERSION}}..."
+    # Update Cargo.toml
+    sed -i '' 's/^version = ".*"/version = "{{NEW_VERSION}}"/' Cargo.toml
+    # Update Documentation (URLs and filenames)
+    sed -i '' -E 's/v[0-9]+\.[0-9]+\.[0-9]+/v{{NEW_VERSION}}/g' README.md INSTALL*.md
+    sed -i '' -E 's/dispenser-[0-9]+\.[0-9]+/dispenser-{{NEW_VERSION}}/g' README.md INSTALL*.md
+    @echo "Done. Don't forget to commit and tag!"
diff --git a/rpm/dispenser.spec b/rpm/dispenser.spec
index 0bdc587..c60a524 100644
--- a/rpm/dispenser.spec
+++ b/rpm/dispenser.spec
@@ -1,5 +1,5 @@
 Name: dispenser
-Version: 0.7
+Version: %{version}
 Release: 0
 Summary: Continously Deploy services with Docker Compose
 License: see /usr/share/doc/dispenser/copyright
diff --git a/src/main.rs b/src/main.rs
index 9ac4400..ea5fc91 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,7 +1,6 @@
 use std::{process::ExitCode, sync::Arc};
 
 use crate::service::{
-    file::EntrypointFile,
     manager::{ServiceMangerConfig, ServicesManager},
     vars::ServiceConfigError,
 };
@@ -59,6 +58,12 @@ async fn main() -> ExitCode {
         }
     };
 
+    if let Err(e) = manager.validate_containers_not_present().await {
+        log::error!("{e}");
+        log::error!("It seems that some of the containers declared already exist. This prevents dispenser from properly managing the life-cycle of these containers. Please remove them and restart dispenser.");
+        std::process::exit(1);
+    }
+
     // Wrap the manager in a Mutex so we can replace it on reload
     let manager_holder = Arc::new(Mutex::new(manager));
 
diff --git a/src/service/file.rs b/src/service/file.rs
index fcc8632..0195a83 100644
--- a/src/service/file.rs
+++ b/src/service/file.rs
@@ -4,7 +4,7 @@ use std::{collections::HashMap, path::PathBuf};
 
 use super::vars::{render_template, ServiceConfigError, ServiceVarsMaterialized};
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct EntrypointFile {
     #[serde(rename = "service")]
     pub services: Vec<EntrypointFileEntry>,
@@ -28,13 +28,13 @@ impl EntrypointFile {
         // Render the template with variables
         let rendered_config =
-            render_template(&config, &vars).map_err(|e| ServiceConfigError::Template((path, e)))?;
+            render_template(&config, vars).map_err(|e| ServiceConfigError::Template((path, e)))?;
 
         // Parse the rendered config as TOML
         Ok(toml::from_str(&rendered_config)?)
     }
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct NetworkDeclarationEntry {
     pub name: String,
     #[serde(default = "default_network_driver")]
@@ -61,7 +61,7 @@ fn default_true() -> bool {
     true
 }
 
-#[derive(Debug, Serialize, Deserialize, Default)]
+#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
 pub enum NetworkDriver {
     #[default]
     #[serde(alias = "bridge")]
@@ -76,7 +76,7 @@ pub enum NetworkDriver {
     None,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct EntrypointFileEntry {
     /// Path to the directory where a service.toml file is found.
     /// This toml file should be deserialized into a ServiceFile.
@@ -84,7 +84,7 @@ pub struct EntrypointFileEntry {
     pub path: PathBuf,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct ServiceFile {
     pub service: ServiceEntry,
     #[serde(default, rename = "port")]
@@ -119,7 +119,7 @@ pub enum Initialize {
     OnTrigger,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub enum DependsOnCondition {
     #[serde(
         alias = "service-started",
@@ -135,20 +135,31 @@
     ServiceCompleted,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
+pub enum PullOptions {
+    #[serde(alias = "always")]
+    Always,
+    #[default]
+    #[serde(alias = "on-startup", alias = "on_startup", alias = "onstartup")]
+    OnStartup,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct DispenserConfig {
     pub watch: bool,
     #[serde(default)]
     pub initialize: Initialize,
     pub cron: Option<String>,
+    #[serde(default)]
+    pub pull: PullOptions,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct Network {
     pub name: String,
 }
 
-#[derive(Debug, Serialize, Deserialize, Default)]
+#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
 pub enum Restart {
     #[serde(alias = "always")]
     Always,
@@ -165,13 +176,13 @@ pub enum Restart {
     UnlessStopped,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct PortEntry {
     pub host: u16,
     pub container: u16,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct VolumeEntry {
     pub source: String,
     pub target: String,
@@ -179,7 +190,7 @@
     pub readonly: bool,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct ServiceEntry {
     pub name: String,
     pub image: String,
diff --git a/src/service/instance.rs b/src/service/instance.rs
index bb6a908..851438b 100644
--- a/src/service/instance.rs
+++ b/src/service/instance.rs
@@ -5,12 +5,13 @@ use cron::Schedule;
 use crate::service::{
     file::{
-        DependsOnCondition, DispenserConfig, Initialize, Network, PortEntry, Restart, ServiceEntry,
-        VolumeEntry,
+        DependsOnCondition, DispenserConfig, Initialize, Network, PortEntry, PullOptions, Restart,
+        ServiceEntry, VolumeEntry,
     },
     manifest::{ImageWatcher, ImageWatcherStatus},
 };
 
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct CronWatcher {
     schedule: Schedule,
     next: Option<DateTime<Utc>>,
 }
@@ -33,6 +34,7 @@ impl CronWatcher {
     }
 }
 
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct ServiceInstance {
     pub dir: PathBuf,
     pub service: ServiceEntry,
@@ -54,35 +56,6 @@ pub enum ContainerStatus {
     NotFound,
 }
 
-/// Parse memory string (e.g., "512m", "2g") to bytes
-fn parse_memory_to_bytes(memory_str: &str) -> i64 {
-    let memory_str = memory_str.trim().to_lowercase();
-    let (value, unit) = if memory_str.ends_with("k") || memory_str.ends_with("kb") {
-        let val = memory_str.trim_end_matches("kb").trim_end_matches("k");
-        (val, 1024i64)
-    } else if memory_str.ends_with("m") || memory_str.ends_with("mb") {
-        let val = memory_str.trim_end_matches("mb").trim_end_matches("m");
-        (val, 1024i64 * 1024)
-    } else if memory_str.ends_with("g") || memory_str.ends_with("gb") {
-        let val = memory_str.trim_end_matches("gb").trim_end_matches("g");
-        (val, 1024i64 * 1024 * 1024)
-    } else if memory_str.ends_with("b") {
-        let val = memory_str.trim_end_matches("b");
-        (val, 1i64)
-    } else {
-        // Assume bytes if no unit
-        (memory_str.as_str(), 1i64)
-    };
-
-    value.parse::<i64>().unwrap_or(0) * unit
-}
-
-/// Parse CPU string (e.g., "1.5", "2") to nano CPUs (1 CPU = 1e9 nano CPUs)
-fn parse_cpus_to_nano(cpus_str: &str) -> i64 {
-    let cpus: f64 = cpus_str.trim().parse().unwrap_or(0.0);
-    (cpus * 1_000_000_000.0) as i64
-}
-
 /// This function queries the status of a container
 /// Returns whether it's up, exited successfully (0 exit status), or failed
 async fn get_container_status(container_name: &str) -> Result<ContainerStatus, std::io::Error> {
@@ -146,10 +119,9 @@
             tokio::time::sleep(Duration::from_secs(1)).await;
         }
 
-        if let Err(e) = self.pull_image().await {
-            log::error!("Failed to pull image for {}: {}", self.service.name, e);
+        if self.dispenser.pull == PullOptions::Always || self.container_does_not_exist().await {
+            self.recreate_container().await?;
         }
-        self.recreate_if_required().await;
 
         let output = tokio::process::Command::new("docker")
             .args(["start", &self.service.name])
@@ -166,8 +138,7 @@
                 self.service.name,
                 error_msg
             );
-            Err(std::io::Error::new(
-                std::io::ErrorKind::Other,
+            Err(std::io::Error::other(
                 format!("Failed to start container: {}", error_msg),
             ))
         }
@@ -185,8 +156,7 @@
         } else {
             let error_msg = String::from_utf8_lossy(&output.stderr);
             log::error!("Failed to pull image {}: {}", self.service.image, error_msg);
-            Err(std::io::Error::new(
-                std::io::ErrorKind::Other,
+            Err(std::io::Error::other(
                 format!("Failed to pull image: {}", error_msg),
             ))
         }
@@ -209,8 +179,7 @@
                 self.service.name,
                 error_msg
             );
-            Err(std::io::Error::new(
-                std::io::ErrorKind::Other,
+            Err(std::io::Error::other(
                 format!("Failed to warn container: {}", error_msg),
             ))
         }
@@ -233,8 +202,7 @@
                 self.service.name,
                 error_msg
             );
-            Err(std::io::Error::new(
-                std::io::ErrorKind::Other,
+            Err(std::io::Error::other(
                 format!("Failed to remove container: {}", error_msg),
             ))
         }
@@ -331,8 +299,7 @@
                 self.service.name,
                 error_msg
             );
-            Err(std::io::Error::new(
-                std::io::ErrorKind::Other,
+            Err(std::io::Error::other(
                 format!("Failed to create container: {}", error_msg),
             ))
         }
@@ -346,11 +313,7 @@
         Ok(())
     }
 
-    /// Validate if the current container is different from
-    /// this instance or if it does not exist.
-    ///
-    /// If anything has changed like: environment variables, volumes, ports, etc we need to recreate
-    pub async fn requires_recreate(&self) -> bool {
+    pub async fn container_does_not_exist(&self) -> bool {
         // Get the container inspection data
         let output = match tokio::process::Command::new("docker")
             .args(["inspect", "--format", "{{json .}}", &self.service.name])
@@ -371,206 +334,22 @@
             );
             return true;
         }
+        false
+    }
 
-        let inspect_str = String::from_utf8_lossy(&output.stdout);
-        let inspect_json: serde_json::Value = match serde_json::from_str(&inspect_str) {
-            Ok(json) => json,
-            Err(e) => {
-                log::warn!("Failed to parse docker inspect JSON: {}", e);
-                return true;
-            }
-        };
-
-        // Check if the image has changed
-        let current_image = inspect_json["Config"]["Image"].as_str().unwrap_or("");
-        if current_image != self.service.image {
-            log::info!(
-                "Image changed for {}: {} -> {}",
-                self.service.name,
-                current_image,
-                self.service.image
-            );
-            return true;
-        }
-
-        // Check restart policy
-        let current_restart = inspect_json["HostConfig"]["RestartPolicy"]["Name"]
-            .as_str()
-            .unwrap_or("");
-        let expected_restart = match self.restart {
-            Restart::Always => "always",
-            Restart::No => "no",
-            Restart::OnFailure => "on-failure",
-            Restart::UnlessStopped => "unless-stopped",
-        };
-        if current_restart != expected_restart {
-            log::info!(
-                "Restart policy changed for {}: {} -> {}",
-                self.service.name,
-                current_restart,
-                expected_restart
-            );
-            return true;
-        }
-
-        // Check environment variables
-        if let Some(current_env) = inspect_json["Config"]["Env"].as_array() {
-            let mut current_env_map = HashMap::new();
-            for env_str in current_env {
-                if let Some(s) = env_str.as_str() {
-                    if let Some(pos) = s.find('=') {
-                        let (key, value) = s.split_at(pos);
-                        current_env_map.insert(key.to_string(), value[1..].to_string());
-                    }
-                }
-            }
-
-            for (key, value) in &self.env {
-                if current_env_map.get(key) != Some(value) {
-                    log::info!(
-                        "Environment variable changed for {}: {}",
-                        self.service.name,
-                        key
-                    );
-                    return true;
-                }
-            }
-        }
-
-        // Check port bindings
-        if let Some(port_bindings) = inspect_json["HostConfig"]["PortBindings"].as_object() {
-            for port in &self.ports {
-                let container_port_key = format!("{}/tcp", port.container);
-                if let Some(bindings) = port_bindings.get(&container_port_key) {
-                    if let Some(binding_array) = bindings.as_array() {
-                        if binding_array.is_empty() {
-                            log::info!("Port binding changed for {}", self.service.name);
-                            return true;
-                        }
-                        let host_port = binding_array[0]["HostPort"].as_str().unwrap_or("");
-                        if host_port != port.host.to_string() {
-                            log::info!(
-                                "Port mapping changed for {}: {} -> {}",
-                                self.service.name,
-                                host_port,
-                                port.host
-                            );
-                            return true;
-                        }
-                    }
-                } else {
-                    log::info!("Port binding missing for {}", self.service.name);
-                    return true;
-                }
-            }
-        } else if !self.ports.is_empty() {
-            log::info!("Port bindings changed for {}", self.service.name);
-            return true;
-        }
-
-        // Check volume bindings
-        if let Some(binds) = inspect_json["HostConfig"]["Binds"].as_array() {
-            let current_binds: Vec<String> = binds
-                .iter()
-                .filter_map(|v| v.as_str().map(String::from))
-                .collect();
-
-            for volume in &self.volume {
-                // Normalize the source path to an absolute path for comparison
-                let source_path = if std::path::Path::new(&volume.source).is_relative() {
-                    self.dir
-                        .join(&volume.source)
-                        .canonicalize()
-                        .unwrap_or_else(|_| self.dir.join(&volume.source))
-                        .to_string_lossy()
-                        .to_string()
-                } else {
-                    volume.source.clone()
-                };
-
-                let expected_bind = format!("{}:{}", source_path, volume.target);
-                if !current_binds.iter().any(|b| b == &expected_bind) {
-                    log::info!(
-                        "Volume binding changed for {}: {}",
-                        self.service.name,
-                        expected_bind
-                    );
-                    return true;
-                }
-            }
-        } else if !self.volume.is_empty() {
-            log::info!("Volume bindings changed for {}", self.service.name);
-            return true;
-        }
-
-        // Check networks
-        if let Some(networks) = inspect_json["NetworkSettings"]["Networks"].as_object() {
-            for network in &self.network {
-                if !networks.contains_key(&network.name) {
-                    log::info!(
-                        "Network changed for {}: {}",
-                        self.service.name,
-                        network.name
-                    );
-                    return true;
-                }
-            }
-        } else if !self.network.is_empty() {
-            log::info!("Networks changed for {}", self.service.name);
-            return true;
-        }
-
-        // Check memory limit
-        if let Some(expected_memory) = &self.service.memory {
-            let current_memory = inspect_json["HostConfig"]["Memory"].as_i64().unwrap_or(0);
-            // Parse expected memory string (e.g., "512m", "2g") to bytes
-            let expected_bytes = parse_memory_to_bytes(expected_memory);
-            if current_memory != expected_bytes {
-                log::info!(
-                    "Memory limit changed for {}: {} -> {}",
-                    self.service.name,
-                    current_memory,
-                    expected_bytes
-                );
-                return true;
-            }
-        } else {
-            // Check if container has a memory limit but we don't expect one
-            let current_memory = inspect_json["HostConfig"]["Memory"].as_i64().unwrap_or(0);
-            if current_memory != 0 {
-                log::info!("Memory limit changed for {} (removed)", self.service.name);
-                return true;
-            }
-        }
-
-        // Check CPU limit
-        if let Some(expected_cpus) = &self.service.cpus {
-            let current_cpus = inspect_json["HostConfig"]["NanoCpus"].as_i64().unwrap_or(0);
-            // Parse expected CPUs string to nano CPUs (1 CPU = 1e9 nano CPUs)
-            let expected_nano_cpus = parse_cpus_to_nano(expected_cpus);
-            if current_cpus != expected_nano_cpus {
-                log::info!(
-                    "CPU limit changed for {}: {} -> {}",
-                    self.service.name,
-                    current_cpus,
-                    expected_nano_cpus
-                );
-                return true;
-            }
-        } else {
-            // Check if container has a CPU limit but we don't expect one
-            let current_cpus = inspect_json["HostConfig"]["NanoCpus"].as_i64().unwrap_or(0);
-            if current_cpus != 0 {
-                log::info!("CPU limit changed for {} (removed)", self.service.name);
-                return true;
-            }
-        }
-
-        false
+    /// Validate if the current container is different from
+    /// this instance or if it does not exist.
+    pub async fn requires_recreate(&self, other: &Self) -> bool {
+        if self.container_does_not_exist().await {
+            return true;
+        }
+        // If self and other are not equal we need to recreate the
+        // container
+        self != other
     }
 
-    pub async fn recreate_if_required(&self) {
-        if self.requires_recreate().await {
+    pub async fn recreate_if_required(&self, other: &Self) {
+        if self.requires_recreate(other).await {
             if let Err(e) = self.recreate_container().await {
                 log::error!("Failed to recreate container {}: {}", self.service.name, e);
             }
@@ -606,13 +385,20 @@ impl ServiceInstance {
         if self.dispenser.watch && poll_images {
             // try to update the watchers and check
             // if any of them were updated
-            if let Some(ref image_watcher) = self.image_watcher {
+            if let Some(image_watcher) = &mut self.image_watcher {
                 match image_watcher.update().await {
                     ImageWatcherStatus::Updated => {
                         log::info!(
                             "Image updated for service {}, recreating container...",
                             self.service.name
                         );
+                        if let Err(e) = self.recreate_container().await {
+                            log::error!(
+                                "Failed to recreate container {}: {}",
+                                self.service.name,
+                                e
+                            );
+                        }
                         if let Err(e) = self.run_container().await {
                             log::error!("Failed to run container {}: {}", self.service.name, e);
                         }
diff --git a/src/service/manager.rs b/src/service/manager.rs
index f5ab924..21748b2 100644
--- a/src/service/manager.rs
+++ b/src/service/manager.rs
@@ -1,6 +1,5 @@
 use std::{path::PathBuf, sync::Arc, time::Duration};
 
-use futures_util::future;
 use tokio::{sync::Mutex, task::JoinSet};
 
 use crate::service::{
@@ -48,19 +47,105 @@ impl ServiceMangerConfig {
 }
 
 struct ServiceManagerInner {
+    // These two are created together, so we can zip them.
+    pub service_names: Vec<String>,
     instances: Vec<Arc<Mutex<ServiceInstance>>>,
     networks: Vec<NetworkInstance>,
     delay: Duration,
 }
 
 pub struct ServicesManager {
-    pub service_names: Vec<String>,
-    inner: Mutex<ServiceManagerInner>,
+    inner: ServiceManagerInner,
     cancel_tx: tokio::sync::mpsc::Sender<()>,
     cancel_rx: Mutex<tokio::sync::mpsc::Receiver<()>>,
 }
 
 impl ServicesManager {
+    // We should ensure that the containers don't exist before startup.
+    // This is to make 100% sure that dispenser controls these containers
+    // and that they didn't exist previously.
+    pub async fn validate_containers_not_present(&self) -> Result<(), String> {
+        let mut join_set = JoinSet::new();
+
+        for instance in &self.inner.instances {
+            let instance_clone = Arc::clone(instance);
+            join_set.spawn(async move {
+                let instance = instance_clone.lock().await;
+                match instance.container_does_not_exist().await {
+                    true => Ok(()),
+                    false => Err(format!(
+                        "Container {} already exists",
+                        instance.service.name
+                    )),
+                }
+            });
+        }
+
+        while let Some(result) = join_set.join_next().await {
+            match result {
+                Ok(Ok(_)) => {
+                    // Container validation succeeded
+                }
+                Ok(Err(e)) => {
+                    log::error!("Container validation failed: {}", e);
+                    return Err(e);
+                }
+                Err(e) => {
+                    let error_msg = format!("Task join error: {}", e);
+                    log::error!("{}", error_msg);
+                    return Err(error_msg);
+                }
+            }
+        }
+
+        Ok(())
+    }
+    pub async fn recreate_if_changed_and_cleanup(&self, other: &Self) {
+        let mut join_set = JoinSet::new();
+
+        for this_instance in &self.inner.instances {
+            let this_instance_clone = Arc::clone(this_instance);
+            let other_service_names = other.inner.service_names.clone();
+            let other_instances = other.inner.instances.clone();
+
+            join_set.spawn(async move {
+                let this_instance = this_instance_clone.lock().await;
+                // If the instance is present in other, we check for recreation
+                match other_service_names
+                    .iter()
+                    .zip(other_instances.iter())
+                    .find(|(o, _)| *o == &this_instance.service.name)
+                {
+                    Some((_, other_instance)) => {
+                        let other_instance = other_instance.lock().await;
+                        this_instance.recreate_if_required(&other_instance).await;
+                    }
+                    None => {
+                        // If the new container does not exist in other
+                        // it will be recreated upon startup
+                    }
+                };
+            });
+        }
+
+        // Wait for all recreation tasks to complete
+        while let Some(result) = join_set.join_next().await {
+            if let Err(e) = result {
+                log::error!("Failed to recreate instance: {}", e);
+            }
+        }
+
+        // Cleanup: remove containers that exist in self but not in other
+        let removed_services = self
+            .inner
+            .service_names
+            .iter()
+            .filter(|s| !other.inner.service_names.contains(s))
+            .cloned()
+            .collect();
+
+        self.remove_containers(removed_services).await;
+    }
     pub async fn from_config(config: ServiceMangerConfig) -> Result<Self, ServiceConfigError> {
         // Get the delay from config (in seconds)
         let delay = Duration::from_secs(config.entrypoint_file.delay);
@@ -97,11 +182,7 @@
         };
 
         // Create cron watcher if cron schedule is specified
-        let cron_watcher = service_file
-            .dispenser
-            .cron
-            .as_ref()
-            .map(|schedule| CronWatcher::new(schedule));
+        let cron_watcher = service_file.dispenser.cron.as_ref().map(CronWatcher::new);
 
         let service_name = service_file.service.name.clone();
 
@@ -141,41 +222,42 @@
         let cancel_rx = Mutex::new(cancel_rx);
 
         let inner = ServiceManagerInner {
+            service_names,
             instances,
             networks,
             delay,
         };
 
         Ok(ServicesManager {
-            service_names,
-            inner: Mutex::new(inner),
+            inner,
            cancel_tx,
            cancel_rx,
         })
     }
 
     pub async fn cancel(&self) {
-        let _ = self.cancel_tx.send(());
+        let _ = self.cancel_tx.send(()).await;
     }
 
     pub async fn start_polling(&self) {
         log::info!("Starting polling task");
-        let inner = self.inner.lock().await;
-        let delay = inner.delay;
+        let delay = self.inner.delay;
 
-        let polls = inner
+        let polls = self
+            .inner
             .instances
             .iter()
-            .map(|instance| {
-                let instance = Arc::clone(instance);
-                async move {
-                    let mut last_image_poll = std::time::Instant::now();
-                    let mut init = true;
-                    loop {
-                        let poll_images = last_image_poll.elapsed() >= delay;
-                        if poll_images {
-                            last_image_poll = std::time::Instant::now();
-                        }
+            .cloned()
+            .map(|instance| async move {
+                let mut last_image_poll = std::time::Instant::now();
+                let mut init = true;
+                loop {
+                    let poll_images = last_image_poll.elapsed() >= delay;
+                    if poll_images {
+                        last_image_poll = std::time::Instant::now();
+                    }
+                    // Scope to release the lock
+                    {
                         let poll_start = std::time::Instant::now();
                         let mut instance = instance.lock().await;
                         instance.poll(poll_images, init).await;
@@ -186,8 +268,8 @@
                             poll_duration
                         );
                         init = false;
-                        tokio::time::sleep(Duration::from_secs(1)).await;
                     }
+                    tokio::time::sleep(Duration::from_secs(1)).await;
                 }
             })
             .collect::<Vec<_>>();
@@ -206,9 +288,8 @@
     /// This should be called on shutdown to remove non-external networks
     pub async fn cleanup_networks(&self) {
         log::info!("Cleaning up networks");
-        let inner = self.inner.lock().await;
 
-        for network in &inner.networks {
+        for network in &self.inner.networks {
             if let Err(e) = network.remove_network().await {
                 log::warn!("Failed to remove network {}: {}", network.name, e);
             }
@@ -216,17 +297,31 @@
     }
 
     pub async fn remove_containers(&self, names: Vec<String>) {
-        let instances = self.inner.lock().await;
-        for instance in &instances.instances {
-            let instance = instance.lock().await;
-            if names.contains(&instance.service.name) {
-                let _ = instance.stop_container().await;
-                let _ = instance.remove_container().await;
+        let mut join_set = JoinSet::new();
+
+        for instance in &self.inner.instances {
+            let instance_clone = Arc::clone(instance);
+            let names_clone = names.clone();
+
+            join_set.spawn(async move {
+                let instance = instance_clone.lock().await;
+                if names_clone.contains(&instance.service.name) {
+                    let _ = instance.stop_container().await;
+                    let _ = instance.remove_container().await;
+                }
+            });
+        }
+
+        while let Some(result) = join_set.join_next().await {
+            if let Err(e) = result {
+                log::error!("Failed to remove container: {}", e);
             }
         }
     }
+
     pub async fn shutdown(&self) {
-        self.remove_containers(self.service_names.clone()).await;
+        self.remove_containers(self.inner.service_names.clone())
+            .await;
         self.cleanup_networks().await;
     }
 }
diff --git a/src/service/manifest.rs b/src/service/manifest.rs
index ef905cb..978a76a 100644
--- a/src/service/manifest.rs
+++ b/src/service/manifest.rs
@@ -1,5 +1,4 @@
-use std::sync::Arc;
-use tokio::{process::Command, sync::Mutex};
+use tokio::process::Command;
 
 use thiserror::Error;
 
@@ -64,10 +63,17 @@ pub struct Sha256 {
     pub inner: [u8; 64],
 }
 
-#[derive(Clone)]
+impl std::fmt::Debug for Sha256 {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let hash_str = std::str::from_utf8(&self.inner).unwrap_or("");
+        write!(f, "Sha256(sha256:{})", hash_str)
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct ImageWatcher {
     image: Box<str>,
-    last_digest: Arc<Mutex<Option<Sha256>>>,
+    last_digest: Option<Sha256>,
 }
 
 #[derive(Debug, Copy, Clone)]
@@ -80,19 +86,19 @@ pub enum ImageWatcherStatus {
 impl ImageWatcher {
     pub async fn initialize(image: &str) -> Self {
         log::info!("Initializing watch for {image}");
-        let last_digest = Arc::new(Mutex::new(match get_latest_digest(image).await {
+        let last_digest = match get_latest_digest(image).await {
             Ok(digest) => Some(digest),
             Err(e) => {
                 log::warn!("{e}");
                 None
             }
-        }));
+        };
         let image = image.into();
         ImageWatcher { image, last_digest }
     }
 
-    pub async fn update(&self) -> ImageWatcherStatus {
-        let last_digest = *self.last_digest.lock().await;
+    pub async fn update(&mut self) -> ImageWatcherStatus {
+        let last_digest = self.last_digest;
         let new_sha256 = get_latest_digest(&self.image).await;
         match new_sha256 {
             Err(e) => {
@@ -101,8 +107,7 @@ impl ImageWatcher {
             Ok(new_sha256) if last_digest == Some(new_sha256) => ImageWatcherStatus::NotUpdated,
             Ok(new_sha256) => {
-                let mut last_digest = self.last_digest.lock().await;
-                *last_digest = Some(new_sha256);
+                self.last_digest = Some(new_sha256);
                 log::info!(
                     "Found a new version for {}, update will start soon...",
                     self.image,
diff --git a/src/service/network.rs b/src/service/network.rs
index 1bd03a2..34edb30 100644
--- a/src/service/network.rs
+++ b/src/service/network.rs
@@ -130,8 +130,7 @@ impl NetworkInstance {
         } else {
             let error_msg = String::from_utf8_lossy(&output.stderr);
             log::error!("Failed to create network {}: {}", self.name, error_msg);
-            Err(std::io::Error::new(
-                std::io::ErrorKind::Other,
+            Err(std::io::Error::other(
                 format!("Failed to create network: {}", error_msg),
             ))
         }
diff --git a/src/service/vars.rs b/src/service/vars.rs
index 41ec4be..4dcb39b 100644
--- a/src/service/vars.rs
+++ b/src/service/vars.rs
@@ -177,7 +177,7 @@ pub fn render_template(
     env.set_undefined_behavior(minijinja::UndefinedBehavior::Strict);
     let template = env.template_from_str(template_str)?;
 
-    Ok(template.render(vars)?)
+    template.render(vars)
 }
 
 #[cfg(test)]
diff --git a/src/signals.rs b/src/signals.rs
index bf13ba4..e12f634 100644
--- a/src/signals.rs
+++ b/src/signals.rs
@@ -1,5 +1,5 @@
 use crate::service::manager::ServicesManager;
-use crate::service::{file::EntrypointFile, manager::ServiceMangerConfig};
+use crate::service::manager::ServiceMangerConfig;
 use signal_hook::{
     consts::{SIGHUP, SIGINT},
     iterator::Signals,
@@ -8,17 +8,6 @@ use std::process::ExitCode;
 use std::sync::Arc;
 use tokio::sync::Mutex;
 
-pub async fn remove_unused_services(old_manager: &ServicesManager, new_manager: &ServicesManager) {
-    let removed_services = old_manager
-        .service_names
-        .iter()
-        .filter(|s| !new_manager.service_names.contains(s))
-        .cloned()
-        .collect();
-
-    old_manager.remove_containers(removed_services).await;
-}
-
 pub fn send_signal(signal: crate::cli::Signal) -> ExitCode {
     let pid_file = &crate::cli::get_cli_args().pid_file;
 
@@ -123,7 +112,9 @@ pub async fn reload_manager(
     log::info!("Canceling old manager...");
     old_manager.cancel().await;
 
-    remove_unused_services(&old_manager, &new_manager).await;
+    new_manager
+        .recreate_if_changed_and_cleanup(&old_manager)
+        .await;
 
     let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]);
     log::info!("Reload complete");