Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions project/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,94 @@ pub struct ContainerSpec {
pub args: Vec<String>,

pub resources: Option<ContainerRes>,

#[serde(rename = "livenessProbe", default)]
pub liveness_probe: Option<Probe>,

#[serde(rename = "readinessProbe", default)]
pub readiness_probe: Option<Probe>,

#[serde(rename = "startupProbe", default)]
pub startup_probe: Option<Probe>,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub enum ProbeAction {
Exec(ExecAction),
HttpGet(HttpGetAction),
TcpSocket(TcpSocketAction),
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
pub struct Probe {
pub action: Option<ProbeAction>,

#[serde(rename = "initialDelaySeconds", default)]
pub initial_delay_seconds: Option<u32>,

#[serde(rename = "periodSeconds", default)]
pub period_seconds: Option<u32>,

#[serde(rename = "timeoutSeconds", default)]
pub timeout_seconds: Option<u32>,

#[serde(rename = "successThreshold", default)]
pub success_threshold: Option<u32>,

#[serde(rename = "failureThreshold", default)]
pub failure_threshold: Option<u32>,
}

impl Probe {
/// Validates that the probe has exactly one action specified
pub fn validate(&self) -> Result<(), String> {
match &self.action {
Some(_) => Ok(()),
None => Err(
"probe must specify exactly one action (exec, httpGet, or tcpSocket)".to_string(),
),
}
}
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
pub struct ExecAction {
#[serde(default)]
pub command: Vec<String>,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct HttpGetAction {
#[serde(default = "default_http_path")]
pub path: String,

pub port: u16,

#[serde(default)]
pub host: Option<String>,
}

fn default_http_path() -> String {
"/".to_string()
}

impl Default for HttpGetAction {
fn default() -> Self {
Self {
path: default_http_path(),
port: 0,
host: None,
}
}
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
pub struct TcpSocketAction {
pub port: u16,

#[serde(default)]
pub host: Option<String>,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -102,6 +190,46 @@ pub struct PodTask {
pub struct PodStatus {
#[serde(rename = "podIP")]
pub pod_ip: Option<String>,

#[serde(rename = "containerStatuses", default)]
pub container_statuses: Vec<ContainerStatus>,
}

#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
pub struct ContainerStatus {
pub name: String,

#[serde(rename = "readinessProbe", default)]
pub readiness_probe: Option<ContainerProbeStatus>,

#[serde(rename = "livenessProbe", default)]
pub liveness_probe: Option<ContainerProbeStatus>,

#[serde(rename = "startupProbe", default)]
pub startup_probe: Option<ContainerProbeStatus>,
}

#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "PascalCase")]
pub enum ProbeCondition {
#[default]
Pending,
Ready,
Failing,
}

#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
pub struct ContainerProbeStatus {
pub state: ProbeCondition,

#[serde(rename = "consecutiveSuccesses", default)]
pub consecutive_successes: u32,

#[serde(rename = "consecutiveFailures", default)]
pub consecutive_failures: u32,

#[serde(rename = "lastError", default)]
pub last_error: Option<String>,
}

#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
Expand Down
3 changes: 3 additions & 0 deletions project/rkl/src/commands/compose/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ impl ComposeManager {
ports: container_ports,
args: srv.command.clone(),
resources: None,
liveness_probe: None,
readiness_probe: None,
startup_probe: None,
};

// handle the services volume name
Expand Down
9 changes: 9 additions & 0 deletions project/rkl/src/commands/container/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ impl ContainerRunner {
ports: vec![],
args: vec![],
resources: None,
liveness_probe: None,
readiness_probe: None,
startup_probe: None,
},
config: None,
container_id: container_id.to_string(),
Expand Down Expand Up @@ -756,6 +759,9 @@ mod test {
ports: vec![],
args: vec!["/bin/echo".to_string(), "hi".to_string()],
resources: None,
liveness_probe: None,
readiness_probe: None,
startup_probe: None,
};
let runner = ContainerRunner::from_spec(spec.clone(), None).unwrap();
assert_eq!(runner.container_id, "demo1");
Expand All @@ -779,6 +785,9 @@ mod test {
ports: vec![],
args: vec![],
resources: None,
liveness_probe: None,
readiness_probe: None,
startup_probe: None,
},
None,
)
Expand Down
27 changes: 22 additions & 5 deletions project/rkl/src/commands/pod/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,21 @@ use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::info;

use common::PodTask;

pub mod cluster;
pub mod standalone;

#[derive(Debug, Clone)]
pub struct PodRunResult {
#[allow(unused)]
pub pod_sandbox_id: String,
pub pod_ip: String,
#[allow(unused)]
pub container_names: Vec<String>,
Comment on lines +24 to +28
Copy link

Copilot AI Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pod_sandbox_id and container_names fields are marked with #[allow(unused)]. Since this is a public API, consider removing these fields if they are truly unused, or remove the attribute if they will be used in the future. Keeping unused public fields in a struct makes the API unclear.

Suggested change
#[allow(unused)]
pub pod_sandbox_id: String,
pub pod_ip: String,
#[allow(unused)]
pub container_names: Vec<String>,
pub pod_ip: String,

Copilot uses AI. Check for mistakes.
pub pod_task: PodTask,
}

#[derive(Subcommand)]
pub enum PodCommand {
#[command(about = "Run a pod from a YAML file using rkl run pod.yaml")]
Expand Down Expand Up @@ -144,7 +156,7 @@ impl PodInfo {
}
}

pub fn run_pod_from_taskrunner(mut task_runner: TaskRunner) -> Result<String, anyhow::Error> {
pub fn run_pod_from_taskrunner(mut task_runner: TaskRunner) -> Result<PodRunResult, anyhow::Error> {
let pod_name = task_runner.task.metadata.name.clone();
let (pod_sandbox_id, podip) = task_runner.run()?;
info!("PodSandbox ID: {}", pod_sandbox_id);
Expand All @@ -159,18 +171,23 @@ pub fn run_pod_from_taskrunner(mut task_runner: TaskRunner) -> Result<String, an

let root_path = rootpath::determine(None)?;
let pod_info = PodInfo {
pod_sandbox_id,
container_names,
pod_sandbox_id: pod_sandbox_id.clone(),
container_names: container_names.clone(),
};
pod_info.save(&root_path, &pod_name)?;

info!("Pod {} created and started successfully", pod_name);
Ok(podip)
Ok(PodRunResult {
pod_sandbox_id,
pod_ip: podip,
container_names,
pod_task: task_runner.task.clone(),
})
}

pub fn run_pod(pod_yaml: &str) -> Result<String, anyhow::Error> {
let task_runner = TaskRunner::from_file(pod_yaml)?;
run_pod_from_taskrunner(task_runner)
run_pod_from_taskrunner(task_runner).map(|res| res.pod_ip)
}

#[allow(dead_code)]
Expand Down
37 changes: 37 additions & 0 deletions project/rkl/src/commands/pod/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use anyhow::{Result, anyhow};
use liboci_cli::{Delete, Start, State};
use tracing::{error, info};

use crate::daemon::probe::collect_container_statuses;

pub fn delete_pod(pod_name: &str) -> Result<(), anyhow::Error> {
let root_path = rootpath::determine(None)?;
let pod_info = PodInfo::load(&root_path, pod_name)?;
Expand Down Expand Up @@ -159,6 +161,41 @@ pub fn state_pod(pod_name: &str) -> Result<(), anyhow::Error> {
);
}

let probe_statuses = collect_container_statuses(pod_name);
if !probe_statuses.is_empty() {
println!("Probe status:");
for status in probe_statuses {
println!(" container: {}", status.name);
if let Some(probe) = status.readiness_probe {
println!(
" readiness: {:?} (successes: {}, failures: {}, last_error: {})",
probe.state,
probe.consecutive_successes,
probe.consecutive_failures,
probe.last_error.unwrap_or_else(|| "<none>".to_string())
);
}
if let Some(probe) = status.liveness_probe {
println!(
" liveness: {:?} (successes: {}, failures: {}, last_error: {})",
probe.state,
probe.consecutive_successes,
probe.consecutive_failures,
probe.last_error.unwrap_or_else(|| "<none>".to_string())
);
}
if let Some(probe) = status.startup_probe {
println!(
" startup: {:?} (successes: {}, failures: {}, last_error: {})",
probe.state,
probe.consecutive_successes,
probe.consecutive_failures,
probe.last_error.unwrap_or_else(|| "<none>".to_string())
);
}
}
}

Ok(())
}

Expand Down
42 changes: 40 additions & 2 deletions project/rkl/src/daemon/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{env, fs, net::SocketAddr, path::Path, sync::Arc, time::Duration};
use tokio::time;

use crate::commands::pod;
use crate::daemon::probe::{build_probe_registrations, deregister_pod_probes, register_pod_probes};
use crate::network::receiver::{NetworkConfigMessage, NetworkReceiver};
use crate::task::TaskRunner;
use chrono::Utc;
Expand All @@ -22,6 +23,7 @@ use rustls::crypto::CryptoProvider;
use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
use rustls::{ClientConfig as RustlsClientConfig, RootCertStore, SignatureScheme};
use std::collections::HashMap;
use tracing::info;

use sysinfo::{Disks, System};

Expand Down Expand Up @@ -333,10 +335,37 @@ pub async fn run_once(
};

match pod::run_pod_from_taskrunner(runner) {
Ok(podip) => {
Ok(result) => {
let pod_name = result.pod_task.metadata.name.clone();

match build_probe_registrations(
&result.pod_task,
&result.pod_ip,
) {
Ok(registrations) => {
if let Err(err) =
register_pod_probes(&pod_name, registrations)
{
eprintln!(
"[worker] registering probes for pod {} failed: {err:?}",
pod_name
);
Comment on lines +349 to +352
Copy link

Copilot AI Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using eprintln! for error logging is inconsistent with the tracing infrastructure used elsewhere in the codebase (imported on line 8). Use tracing::error! or tracing::warn! instead for consistent structured logging and observability.

Copilot uses AI. Check for mistakes.
}
}
Err(err) => {
eprintln!(
"[worker] building probe registrations for pod {} failed: {err:?}",
pod_name
);
Comment on lines +356 to +359
Copy link

Copilot AI Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using eprintln! for error logging is inconsistent with the tracing infrastructure used elsewhere in the codebase. Use tracing::error! or tracing::warn! instead for consistent structured logging and observability.

Copilot uses AI. Check for mistakes.
}
}

let _ = send_uni(
&connection,
&RksMessage::SetPodip((pod.metadata.name.clone(), podip)),
&RksMessage::SetPodip((
pod.metadata.name.clone(),
result.pod_ip.clone(),
)),
)
.await;
}
Expand All @@ -357,6 +386,15 @@ pub async fn run_once(
println!("[worker] DeletePod {name}");
match pod::standalone::delete_pod(&name) {
Ok(_) => {
// Ensure probe deregistration completes before sending the Ack.
// Previously this was spawned as a detached task which could
// panic or fail silently. Awaiting here surfaces errors and
// ensures cleanup has finished when the controller receives
// the acknowledgement.
info!(pod = %name, "deregistering probes for pod");
deregister_pod_probes(&name).await;
info!(pod = %name, "probes deregistered for pod");

let _ = send_uni(&connection, &RksMessage::Ack).await;
}
Err(e) => {
Expand Down
1 change: 1 addition & 0 deletions project/rkl/src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use sync_loop::SyncLoop;
pub mod static_pods;
//mod status_access;
pub mod client;
pub mod probe;
use client::init_crypto;

#[tokio::main]
Expand Down
Loading