Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/core/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl fmt::Debug for OrcaError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match &self.kind {
Kind::AgentCommunicationFailure { backtrace, .. }
| Kind::FailedToStartPod { backtrace, .. }
| Kind::IncompletePacket { backtrace, .. }
| Kind::InvalidFilepath { backtrace, .. }
| Kind::MissingInfo { backtrace, .. }
Expand Down
2 changes: 1 addition & 1 deletion src/core/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Operator for JoinOperator {
.filter_map(|(parent_stream, parent_packets)| {
(parent_stream != &stream_name).then_some(parent_packets.clone())
})
.chain(vec![vec![packet.clone()]].into_iter())
.chain(vec![vec![packet.clone()]])
.collect::<Vec<_>>();
drop(received_packets);

Expand Down
191 changes: 114 additions & 77 deletions src/core/orchestrator/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
use bollard::{
container::{Config, CreateContainerOptions, ListContainersOptions},
models::{ContainerStateStatusEnum, HostConfig},
secret::{ContainerInspectResponse, ContainerSummary},
};
use chrono::DateTime;
use futures_util::future::join_all;
Expand Down Expand Up @@ -165,12 +166,8 @@ impl LocalDockerOrchestrator {
))
}
#[expect(
clippy::cast_sign_loss,
clippy::string_slice,
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::indexing_slicing,
clippy::too_many_lines,
reason = r#"
- Timestamp and memory should always have a value > 0
- Container will always have a name with more than 1 character
Expand Down Expand Up @@ -210,79 +207,119 @@ impl LocalDockerOrchestrator {
.await
.into_iter()
.filter_map(|result: Result<_>| {
let (container_name, container_summary, container_spec) = result.ok()?;
let terminated_timestamp =
DateTime::parse_from_rfc3339(container_spec.state.as_ref()?.finished_at.as_ref()?)
.ok()?
.timestamp();
Some((
container_name,
PodRunInfo {
image: container_spec.config.as_ref()?.image.as_ref()?.clone(),
created: container_summary.created? as u64,
terminated: (terminated_timestamp > 0).then_some(terminated_timestamp as u64),
env_vars: container_spec
.config
.as_ref()?
.env
.as_ref()?
.iter()
.filter_map(|x| {
x.split_once('=')
.map(|(key, value)| (key.to_owned(), value.to_owned()))
})
.collect(),
command: [
container_spec.config.as_ref()?.entrypoint.as_ref()?.clone(),
container_spec.config.as_ref()?.cmd.as_ref()?.clone(),
]
.concat(),
status: match (
container_spec.state.as_ref()?.status.as_ref()?,
container_spec.state.as_ref()?.exit_code? as i16,
) {
(ContainerStateStatusEnum::RUNNING, _) => PodStatus::Running,
(
ContainerStateStatusEnum::EXITED
| ContainerStateStatusEnum::REMOVING
| ContainerStateStatusEnum::DEAD,
0,
) => PodStatus::Completed,
(
ContainerStateStatusEnum::EXITED
| ContainerStateStatusEnum::REMOVING
| ContainerStateStatusEnum::DEAD,
code,
) => PodStatus::Failed(code),
(_, code) => {
todo!(
"Unhandled container state: {}, exit code: {code}.",
container_spec.state.as_ref()?.status.as_ref()?
)
}
},
mounts: container_spec
.mounts
.as_ref()?
.iter()
.map(|mount_point| {
Some(format!(
"{}:{}{}",
mount_point.source.as_ref()?,
mount_point.destination.as_ref()?,
mount_point
.mode
.as_ref()
.map_or_else(String::new, |mode| format!(":{mode}"))
))
})
.collect::<Option<_>>()?,
labels: container_spec.config.as_ref()?.labels.as_ref()?.clone(),
cpu_limit: container_spec.host_config.as_ref()?.nano_cpus? as f32
/ 10_f32.powi(9), // ncpu, ucores=3, mcores=6, cores=9
memory_limit: container_spec.host_config.as_ref()?.memory? as u64,
},
))
let (container_name, container_summary, container_inspect_response) = result.ok()?;

Self::extract_run_info(&container_summary, &container_inspect_response)
.map(|run_info| (container_name.clone(), run_info))
}))
}

/// Converts raw Docker container metadata into a [`PodRunInfo`].
///
/// Returns `None` whenever a field required to describe the run is absent
/// from the Docker API response (missing state, config, host config, mounts,
/// or an unparsable `finished_at` timestamp).
#[expect(
    clippy::cast_sign_loss,
    clippy::cast_precision_loss,
    clippy::cast_possible_truncation,
    reason = r#"
    - Timestamp and memory should always have a value > 0
    - No issue in core casting if between 0 - 3.40e38(f32:MAX)
    - No issue in exit code casting if between -3.27e4(i16:MIN) - 3.27e4(i16:MAX)
    - This functions requires a lot of boilerplate code to extract the run info
    "#
)]
fn extract_run_info(
    container_summary: &ContainerSummary,
    container_inspect_response: &ContainerInspectResponse,
) -> Option<PodRunInfo> {
    // Hoist the nested Option chains once instead of repeating `.as_ref()?`
    // for every field; any missing section still short-circuits to `None`.
    let state = container_inspect_response.state.as_ref()?;
    let config = container_inspect_response.config.as_ref()?;
    let host_config = container_inspect_response.host_config.as_ref()?;

    // Keep the parsed timestamp SIGNED until after the `> 0` check: Docker
    // reports the sentinel "0001-01-01T00:00:00Z" in `finished_at` for
    // containers that never finished, which parses to a large negative epoch
    // value. Casting to u64 first (as the previous code did) wraps that
    // sentinel into a huge positive number and reports a bogus termination
    // time; checking the signed value filters it out correctly.
    let terminated_timestamp = DateTime::parse_from_rfc3339(state.finished_at.as_ref()?)
        .ok()?
        .timestamp();
    Some(PodRunInfo {
        image: config.image.as_ref()?.clone(),
        created: container_summary.created? as u64,
        terminated: (terminated_timestamp > 0).then_some(terminated_timestamp as u64),
        // Docker stores env vars as "KEY=VALUE" strings; split on the first
        // '=' and silently skip malformed entries without one.
        env_vars: config
            .env
            .as_ref()?
            .iter()
            .filter_map(|entry| {
                entry
                    .split_once('=')
                    .map(|(key, value)| (key.to_owned(), value.to_owned()))
            })
            .collect(),
        // Effective command line is the entrypoint followed by cmd arguments.
        command: [
            config.entrypoint.as_ref()?.clone(),
            config.cmd.as_ref()?.clone(),
        ]
        .concat(),
        status: match (state.status?, state.exit_code? as i16) {
            (ContainerStateStatusEnum::RUNNING | ContainerStateStatusEnum::RESTARTING, _) => {
                PodStatus::Running
            }
            (ContainerStateStatusEnum::EXITED, 0) => PodStatus::Completed,
            (ContainerStateStatusEnum::EXITED | ContainerStateStatusEnum::DEAD, code) => {
                PodStatus::Failed(code)
            }
            // A CREATED container with a recorded error never started;
            // otherwise it is still pending startup and treated as running.
            (ContainerStateStatusEnum::CREATED, code) => {
                if state.error.is_some() {
                    PodStatus::Failed(code)
                } else {
                    PodStatus::Running
                }
            }
            _ => PodStatus::Undefined,
        },
        // Render each mount as "source:destination[:mode]"; mode is optional.
        mounts: container_inspect_response
            .mounts
            .as_ref()?
            .iter()
            .map(|mount_point| {
                Some(format!(
                    "{}:{}{}",
                    mount_point.source.as_ref()?,
                    mount_point.destination.as_ref()?,
                    mount_point
                        .mode
                        .as_ref()
                        .map_or_else(String::new, |mode| format!(":{mode}"))
                ))
            })
            .collect::<Option<Vec<_>>>()?,
        labels: config.labels.as_ref()?.clone(),
        // nano_cpus is expressed in 1e-9 CPUs; convert to whole cores.
        cpu_limit: host_config.nano_cpus? as f32 / 10_f32.powi(9), // ncpu, ucores=3, mcores=6, cores=9
        memory_limit: host_config.memory? as u64,
    })
}
}
20 changes: 20 additions & 0 deletions src/uniffi/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ pub(crate) enum Kind {
missing_keys: Vec<String>,
backtrace: Option<Backtrace>,
},
#[snafu(display(
"Fail to start pod with container_name: {container_name} with error: {reason}"
))]
FailedToStartPod {
container_name: String,
reason: String,
backtrace: Option<Backtrace>,
},
#[snafu(display("{source} ({path:?})."))]
InvalidFilepath {
path: PathBuf,
Expand Down Expand Up @@ -111,4 +119,16 @@ impl OrcaError {
pub fn is_purged_pod_run(&self) -> bool {
matches!(&self.kind, Kind::MissingInfo { details, .. } if details.contains("pod run"))
}
/// Returns `true` if the error was caused by a pod that failed to start
/// (i.e. the underlying kind is [`Kind::FailedToStartPod`]).
pub const fn is_failed_to_start_pod(&self) -> bool {
    matches!(self.kind, Kind::FailedToStartPod { .. })
}
/// Returns the container name when the error is a [`Kind::FailedToStartPod`];
/// returns `None` for every other error kind.
pub fn get_container_name(&self) -> Option<String> {
    match &self.kind {
        Kind::FailedToStartPod { container_name, .. } => Some(container_name.clone()),
        _ => None,
    }
}
}
5 changes: 3 additions & 2 deletions src/uniffi/orchestrator/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,11 @@ impl Agent {
async |agent, inner_namespace_lookup, _, pod_job| {
let pod_run = agent
.orchestrator
.start(&inner_namespace_lookup, &pod_job)
.start(&pod_job, &inner_namespace_lookup)
.await?;
let pod_result = agent
.orchestrator
.get_result(&inner_namespace_lookup, &pod_run)
.get_result(&pod_run, &inner_namespace_lookup)
.await?;
agent.orchestrator.delete(&pod_run).await?;
Ok(pod_result)
Expand All @@ -190,6 +190,7 @@ impl Agent {
PodStatus::Completed => "success",
PodStatus::Running
| PodStatus::Failed(_)
| PodStatus::Undefined
| PodStatus::Unset => "failure",
}
.to_owned(),
Expand Down
Loading
Loading