Skip to content

Commit 5ade7e4

Browse files
committed
feat(rkl): Add the container probe functionality.
Signed-off-by: Luxian <[email protected]>
1 parent 6eac0e4 commit 5ade7e4

File tree

9 files changed

+1147
-27
lines changed

9 files changed

+1147
-27
lines changed

project/common/src/lib.rs

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,94 @@ pub struct ContainerSpec {
6868
pub args: Vec<String>,
6969

7070
pub resources: Option<ContainerRes>,
71+
72+
#[serde(rename = "livenessProbe", default)]
73+
pub liveness_probe: Option<Probe>,
74+
75+
#[serde(rename = "readinessProbe", default)]
76+
pub readiness_probe: Option<Probe>,
77+
78+
#[serde(rename = "startupProbe", default)]
79+
pub startup_probe: Option<Probe>,
80+
}
81+
82+
/// The kind of check a probe performs, together with that check's settings.
///
/// `rename_all = "camelCase"` makes serde use the variant keys `exec`,
/// `httpGet` and `tcpSocket` in serialized form.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
83+
#[serde(rename_all = "camelCase")]
84+
pub enum ProbeAction {
85+
/// Check described by an `ExecAction` (a command line).
Exec(ExecAction),
86+
/// Check described by an `HttpGetAction` (path, port, optional host).
HttpGet(HttpGetAction),
87+
/// Check described by a `TcpSocketAction` (port, optional host).
TcpSocket(TcpSocketAction),
88+
}
89+
90+
/// Probe configuration; used for the liveness, readiness and startup probe
/// fields of `ContainerSpec`.
///
/// Every field is an `Option` and is `None` when it is absent from the input.
///
/// NOTE(review): `action` carries no `flatten`/rename attribute, so the variant
/// key (`exec`, `httpGet`, `tcpSocket`) is nested under an `action` key instead
/// of sitting directly on the probe as in the Kubernetes schema — confirm this
/// is the intended manifest shape.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
91+
pub struct Probe {
92+
/// The check to perform. `Probe::validate` rejects `None`.
pub action: Option<ProbeAction>,
93+
94+
/// Serialized as `initialDelaySeconds`.
/// Presumably the wait before the first probe run — confirm in the probe runner.
#[serde(rename = "initialDelaySeconds", default)]
95+
pub initial_delay_seconds: Option<u32>,
96+
97+
/// Serialized as `periodSeconds`.
/// Presumably the interval between probe runs — confirm in the probe runner.
#[serde(rename = "periodSeconds", default)]
98+
pub period_seconds: Option<u32>,
99+
100+
/// Serialized as `timeoutSeconds`.
/// Presumably the per-attempt timeout — confirm in the probe runner.
#[serde(rename = "timeoutSeconds", default)]
101+
pub timeout_seconds: Option<u32>,
102+
103+
/// Serialized as `successThreshold`.
/// Presumably the number of consecutive successes needed to pass — confirm.
#[serde(rename = "successThreshold", default)]
104+
pub success_threshold: Option<u32>,
105+
106+
/// Serialized as `failureThreshold`.
/// Presumably the number of consecutive failures needed to fail — confirm.
#[serde(rename = "failureThreshold", default)]
107+
pub failure_threshold: Option<u32>,
108+
}
109+
110+
impl Probe {
111+
/// Checks that the probe has an action configured.
///
/// `action` is a single `Option<ProbeAction>`, so it can never hold more
/// than one action; this method only verifies that it is present. It does
/// not inspect the action's own fields (command, port, ...) or the numeric
/// timing/threshold fields.
///
/// # Errors
///
/// Returns a human-readable message when `action` is `None`.
112+
pub fn validate(&self) -> Result<(), String> {
113+
match &self.action {
114+
Some(_) => Ok(()),
115+
None => Err(
116+
"probe must specify exactly one action (exec, httpGet, or tcpSocket)".to_string(),
117+
),
118+
}
119+
}
120+
}
121+
122+
/// Settings for the `ProbeAction::Exec` variant.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
123+
pub struct ExecAction {
124+
/// Command line as a list of strings; an empty list when the key is
/// missing from the input (`serde(default)`).
/// NOTE(review): an empty command is accepted here — confirm the probe
/// runner rejects or handles it.
#[serde(default)]
125+
pub command: Vec<String>,
126+
}
127+
128+
/// Settings for the `ProbeAction::HttpGet` variant.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
129+
pub struct HttpGetAction {
130+
/// Request path; falls back to `default_http_path()` (`"/"`) when missing.
#[serde(default = "default_http_path")]
131+
pub path: String,
132+
133+
/// Port to contact. No serde default, so it must be present in the input.
pub port: u16,
134+
135+
/// Optional host; `None` when missing.
/// Presumably the probe runner substitutes the pod IP in that case — confirm.
#[serde(default)]
136+
pub host: Option<String>,
137+
}
138+
139+
/// Returns the fallback HTTP probe path, `"/"`.
///
/// Referenced by name from `HttpGetAction::path`'s `serde(default = ...)`
/// attribute and called by `HttpGetAction`'s `Default` impl.
fn default_http_path() -> String {
140+
"/".to_string()
141+
}
142+
143+
// Written by hand rather than derived so that `path` defaults to "/" (the same
// value serde uses for a missing key); a derived `Default` would produce an
// empty string.
impl Default for HttpGetAction {
144+
/// Builds an action with path `"/"`, port `0` and no host.
fn default() -> Self {
145+
Self {
146+
path: default_http_path(),
147+
// 0 is only a placeholder value; presumably callers set a real port — confirm.
port: 0,
148+
host: None,
149+
}
150+
}
151+
}
152+
153+
/// Settings for the `ProbeAction::TcpSocket` variant.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
154+
pub struct TcpSocketAction {
155+
/// Port to contact. No serde default, so it must be present in the input;
/// the derived `Default` sets it to 0.
pub port: u16,
156+
157+
/// Optional host; `None` when missing.
#[serde(default)]
158+
pub host: Option<String>,
71159
}
72160

73161
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
@@ -102,6 +190,46 @@ pub struct PodTask {
102190
pub struct PodStatus {
103191
#[serde(rename = "podIP")]
104192
pub pod_ip: Option<String>,
193+
194+
#[serde(rename = "containerStatuses", default)]
195+
pub container_statuses: Vec<ContainerStatus>,
196+
}
197+
198+
/// Probe status for one container; entries of `PodStatus::container_statuses`.
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
199+
pub struct ContainerStatus {
200+
/// Container name. No serde default, so it must be present in the input.
pub name: String,
201+
202+
/// Readiness probe status, serialized as `readinessProbe`; `None` when missing.
#[serde(rename = "readinessProbe", default)]
203+
pub readiness_probe: Option<ContainerProbeStatus>,
204+
205+
/// Liveness probe status, serialized as `livenessProbe`; `None` when missing.
#[serde(rename = "livenessProbe", default)]
206+
pub liveness_probe: Option<ContainerProbeStatus>,
207+
208+
/// Startup probe status, serialized as `startupProbe`; `None` when missing.
#[serde(rename = "startupProbe", default)]
209+
pub startup_probe: Option<ContainerProbeStatus>,
210+
}
211+
212+
/// State of a single probe, stored in `ContainerProbeStatus::state`.
///
/// The variants are already PascalCase, so `rename_all = "PascalCase"` keeps
/// their serialized names as `Pending`, `Ready` and `Failing`.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
213+
#[serde(rename_all = "PascalCase")]
214+
pub enum ProbeCondition {
215+
/// The `Default` value.
/// Presumably means no verdict has been reached yet — confirm in the probe runner.
#[default]
216+
Pending,
217+
/// Presumably set once the probe is passing — confirm in the probe runner.
Ready,
218+
/// Presumably set once the probe is failing — confirm in the probe runner.
Failing,
219+
}
220+
221+
/// Reported status of one probe: its state, two counters and the last error.
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
222+
pub struct ContainerProbeStatus {
223+
/// Current probe state.
/// NOTE(review): unlike the other fields this one has no `serde(default)`,
/// so the `state` key must be present when deserializing even though
/// `ProbeCondition` implements `Default` — confirm that is intended.
pub state: ProbeCondition,
224+
225+
/// Serialized as `consecutiveSuccesses`; 0 when missing.
#[serde(rename = "consecutiveSuccesses", default)]
226+
pub consecutive_successes: u32,
227+
228+
/// Serialized as `consecutiveFailures`; 0 when missing.
#[serde(rename = "consecutiveFailures", default)]
229+
pub consecutive_failures: u32,
230+
231+
/// Serialized as `lastError`; `None` when missing.
#[serde(rename = "lastError", default)]
232+
pub last_error: Option<String>,
105233
}
106234

107235
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]

project/rkl/src/commands/compose/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,9 @@ impl ComposeManager {
223223
ports: container_ports,
224224
args: srv.command.clone(),
225225
resources: None,
226+
liveness_probe: None,
227+
readiness_probe: None,
228+
startup_probe: None,
226229
};
227230

228231
// handle the services volume name

project/rkl/src/commands/container/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,9 @@ impl ContainerRunner {
166166
ports: vec![],
167167
args: vec![],
168168
resources: None,
169+
liveness_probe: None,
170+
readiness_probe: None,
171+
startup_probe: None,
169172
},
170173
config: None,
171174
container_id: container_id.to_string(),
@@ -756,6 +759,9 @@ mod test {
756759
ports: vec![],
757760
args: vec!["/bin/echo".to_string(), "hi".to_string()],
758761
resources: None,
762+
liveness_probe: None,
763+
readiness_probe: None,
764+
startup_probe: None,
759765
};
760766
let runner = ContainerRunner::from_spec(spec.clone(), None).unwrap();
761767
assert_eq!(runner.container_id, "demo1");
@@ -779,6 +785,9 @@ mod test {
779785
ports: vec![],
780786
args: vec![],
781787
resources: None,
788+
liveness_probe: None,
789+
readiness_probe: None,
790+
startup_probe: None,
782791
},
783792
None,
784793
)

project/rkl/src/commands/pod/mod.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,20 @@ use std::path::{Path, PathBuf};
1414
use std::time::{SystemTime, UNIX_EPOCH};
1515
use tracing::info;
1616

17+
use common::PodTask;
18+
1719
pub mod cluster;
1820
pub mod standalone;
1921

22+
/// Everything `run_pod_from_taskrunner` reports about a pod it has started.
// NOTE(review): `allow(dead_code)` presumably silences warnings for fields that
// no caller reads yet — confirm, and drop the attribute once all fields are used.
#[allow(dead_code)]
23+
#[derive(Debug, Clone)]
24+
pub struct PodRunResult {
25+
/// Sandbox ID, the first element of the tuple returned by `TaskRunner::run`.
pub pod_sandbox_id: String,
26+
/// Pod IP, the second element of the tuple returned by `TaskRunner::run`.
pub pod_ip: String,
27+
/// The same container names that are saved to the pod's `PodInfo`.
pub container_names: Vec<String>,
28+
/// Copy of the task (`task_runner.task`) the pod was started from.
pub pod_task: PodTask,
29+
}
30+
2031
#[derive(Subcommand)]
2132
pub enum PodCommand {
2233
#[command(about = "Run a pod from a YAML file using rkl run pod.yaml")]
@@ -144,7 +155,7 @@ impl PodInfo {
144155
}
145156
}
146157

147-
pub fn run_pod_from_taskrunner(mut task_runner: TaskRunner) -> Result<String, anyhow::Error> {
158+
pub fn run_pod_from_taskrunner(mut task_runner: TaskRunner) -> Result<PodRunResult, anyhow::Error> {
148159
let pod_name = task_runner.task.metadata.name.clone();
149160
let (pod_sandbox_id, podip) = task_runner.run()?;
150161
info!("PodSandbox ID: {}", pod_sandbox_id);
@@ -159,18 +170,23 @@ pub fn run_pod_from_taskrunner(mut task_runner: TaskRunner) -> Result<String, an
159170

160171
let root_path = rootpath::determine(None)?;
161172
let pod_info = PodInfo {
162-
pod_sandbox_id,
163-
container_names,
173+
pod_sandbox_id: pod_sandbox_id.clone(),
174+
container_names: container_names.clone(),
164175
};
165176
pod_info.save(&root_path, &pod_name)?;
166177

167178
info!("Pod {} created and started successfully", pod_name);
168-
Ok(podip)
179+
Ok(PodRunResult {
180+
pod_sandbox_id,
181+
pod_ip: podip,
182+
container_names,
183+
pod_task: task_runner.task.clone(),
184+
})
169185
}
170186

171187
/// Builds a `TaskRunner` from the file at `pod_yaml`, runs the pod through
/// `run_pod_from_taskrunner`, and returns only the pod IP.
///
/// The `Result<String, _>` return type is unchanged by this commit: the
/// `PodRunResult` now produced by `run_pod_from_taskrunner` is narrowed to
/// its `pod_ip` field.
///
/// # Errors
///
/// Propagates any error from `TaskRunner::from_file` or
/// `run_pod_from_taskrunner`.
pub fn run_pod(pod_yaml: &str) -> Result<String, anyhow::Error> {
172188
let task_runner = TaskRunner::from_file(pod_yaml)?;
173-
run_pod_from_taskrunner(task_runner)
189+
run_pod_from_taskrunner(task_runner).map(|res| res.pod_ip)
174190
}
175191

176192
#[allow(dead_code)]

project/rkl/src/commands/pod/standalone.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use anyhow::{Result, anyhow};
77
use liboci_cli::{Delete, Start, State};
88
use tracing::{error, info};
99

10+
use crate::daemon::probe::collect_container_statuses;
11+
1012
pub fn delete_pod(pod_name: &str) -> Result<(), anyhow::Error> {
1113
let root_path = rootpath::determine(None)?;
1214
let pod_info = PodInfo::load(&root_path, pod_name)?;
@@ -159,6 +161,41 @@ pub fn state_pod(pod_name: &str) -> Result<(), anyhow::Error> {
159161
);
160162
}
161163

164+
let probe_statuses = collect_container_statuses(pod_name);
165+
if !probe_statuses.is_empty() {
166+
println!("Probe status:");
167+
for status in probe_statuses {
168+
println!(" container: {}", status.name);
169+
if let Some(probe) = status.readiness_probe {
170+
println!(
171+
" readiness: {:?} (successes: {}, failures: {}, last_error: {})",
172+
probe.state,
173+
probe.consecutive_successes,
174+
probe.consecutive_failures,
175+
probe.last_error.unwrap_or_else(|| "<none>".to_string())
176+
);
177+
}
178+
if let Some(probe) = status.liveness_probe {
179+
println!(
180+
" liveness: {:?} (successes: {}, failures: {}, last_error: {})",
181+
probe.state,
182+
probe.consecutive_successes,
183+
probe.consecutive_failures,
184+
probe.last_error.unwrap_or_else(|| "<none>".to_string())
185+
);
186+
}
187+
if let Some(probe) = status.startup_probe {
188+
println!(
189+
" startup: {:?} (successes: {}, failures: {}, last_error: {})",
190+
probe.state,
191+
probe.consecutive_successes,
192+
probe.consecutive_failures,
193+
probe.last_error.unwrap_or_else(|| "<none>".to_string())
194+
);
195+
}
196+
}
197+
}
198+
162199
Ok(())
163200
}
164201

project/rkl/src/daemon/client.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{env, fs, net::SocketAddr, path::Path, sync::Arc, time::Duration};
66
use tokio::time;
77

88
use crate::commands::pod;
9+
use crate::daemon::probe::{build_probe_registrations, deregister_pod_probes, register_pod_probes};
910
use crate::network::receiver::{NetworkConfigMessage, NetworkReceiver};
1011
use crate::task::TaskRunner;
1112
use chrono::Utc;
@@ -333,10 +334,37 @@ pub async fn run_once(
333334
};
334335

335336
match pod::run_pod_from_taskrunner(runner) {
336-
Ok(podip) => {
337+
Ok(result) => {
338+
let pod_name = result.pod_task.metadata.name.clone();
339+
340+
match build_probe_registrations(
341+
&result.pod_task,
342+
&result.pod_ip,
343+
) {
344+
Ok(registrations) => {
345+
if let Err(err) =
346+
register_pod_probes(&pod_name, registrations)
347+
{
348+
eprintln!(
349+
"[worker] registering probes for pod {} failed: {err:?}",
350+
pod_name
351+
);
352+
}
353+
}
354+
Err(err) => {
355+
eprintln!(
356+
"[worker] building probe registrations for pod {} failed: {err:?}",
357+
pod_name
358+
);
359+
}
360+
}
361+
337362
let _ = send_uni(
338363
&connection,
339-
&RksMessage::SetPodip((pod.metadata.name.clone(), podip)),
364+
&RksMessage::SetPodip((
365+
pod.metadata.name.clone(),
366+
result.pod_ip.clone(),
367+
)),
340368
)
341369
.await;
342370
}
@@ -357,6 +385,12 @@ pub async fn run_once(
357385
println!("[worker] DeletePod {name}");
358386
match pod::standalone::delete_pod(&name) {
359387
Ok(_) => {
388+
tokio::spawn({
389+
let pod_name = name.clone();
390+
async move {
391+
deregister_pod_probes(&pod_name).await;
392+
}
393+
});
360394
let _ = send_uni(&connection, &RksMessage::Ack).await;
361395
}
362396
Err(e) => {

project/rkl/src/daemon/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use sync_loop::SyncLoop;
33
pub mod static_pods;
44
//mod status_access;
55
pub mod client;
6+
pub mod probe;
67
use client::init_crypto;
78

89
#[tokio::main]

0 commit comments

Comments
 (0)