Skip to content

Commit e9326ee

Browse files
committed
Merge branch 'test_split' into pod_model_hash
2 parents f3115da + e185b65 commit e9326ee

File tree

17 files changed

+493
-197
lines changed

17 files changed

+493
-197
lines changed

.clippy.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
excessive-nesting-threshold = 5
1+
excessive-nesting-threshold = 6
22
too-many-arguments-threshold = 10
33
allowed-idents-below-min-chars = ["..", "k", "v", "f", "re", "id", "Ok", "'_"]

src/core/crypto.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,9 @@ pub fn hash_buffer(buffer: impl AsRef<[u8]>) -> String {
5050
///
5151
/// Will return error if unable to access file.
5252
pub fn hash_file(filepath: impl AsRef<Path>) -> Result<String> {
53-
hash_stream(
54-
&mut File::open(&filepath).context(selector::InvalidFilepath {
55-
path: filepath.as_ref(),
56-
})?,
57-
)
53+
hash_stream(&mut File::open(&filepath).context(selector::InvalidPath {
54+
path: filepath.as_ref(),
55+
})?)
5856
}
5957
/// Evaluate checksum hash of a directory.
6058
///
@@ -64,7 +62,10 @@ pub fn hash_file(filepath: impl AsRef<Path>) -> Result<String> {
6462
pub fn hash_dir(dirpath: impl AsRef<Path>) -> Result<String> {
6563
let summary: BTreeMap<String, String> = dirpath
6664
.as_ref()
67-
.read_dir()?
65+
.read_dir()
66+
.context(selector::InvalidPath {
67+
path: dirpath.as_ref(),
68+
})?
6869
.map(|path| {
6970
let access_path = path?.path();
7071
Ok((

src/core/error.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,10 @@ impl fmt::Debug for OrcaError {
122122
match &self.kind {
123123
Kind::AgentCommunicationFailure { backtrace, .. }
124124
| Kind::EmptyDir { backtrace, .. }
125+
| Kind::FailedToStartPod { backtrace, .. }
126+
| Kind::FailedToExtractRunInfo { backtrace, .. }
125127
| Kind::IncompletePacket { backtrace, .. }
126-
| Kind::InvalidFilepath { backtrace, .. }
128+
| Kind::InvalidPath { backtrace, .. }
127129
| Kind::InvalidIndex { backtrace, .. }
128130
| Kind::KeyMissing { backtrace, .. }
129131
| Kind::MissingInfo { backtrace, .. }

src/core/orchestrator/docker.rs

Lines changed: 114 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::{
99
use bollard::{
1010
container::{Config, CreateContainerOptions, ListContainersOptions},
1111
models::{ContainerStateStatusEnum, HostConfig},
12+
secret::{ContainerInspectResponse, ContainerSummary},
1213
};
1314
use chrono::DateTime;
1415
use futures_util::future::join_all;
@@ -165,12 +166,8 @@ impl LocalDockerOrchestrator {
165166
))
166167
}
167168
#[expect(
168-
clippy::cast_sign_loss,
169169
clippy::string_slice,
170-
clippy::cast_precision_loss,
171-
clippy::cast_possible_truncation,
172170
clippy::indexing_slicing,
173-
clippy::too_many_lines,
174171
reason = r#"
175172
- Timestamp and memory should always have a value > 0
176173
- Container will always have a name with more than 1 character
@@ -210,79 +207,119 @@ impl LocalDockerOrchestrator {
210207
.await
211208
.into_iter()
212209
.filter_map(|result: Result<_>| {
213-
let (container_name, container_summary, container_spec) = result.ok()?;
214-
let terminated_timestamp =
215-
DateTime::parse_from_rfc3339(container_spec.state.as_ref()?.finished_at.as_ref()?)
216-
.ok()?
217-
.timestamp();
218-
Some((
219-
container_name,
220-
PodRunInfo {
221-
image: container_spec.config.as_ref()?.image.as_ref()?.clone(),
222-
created: container_summary.created? as u64,
223-
terminated: (terminated_timestamp > 0).then_some(terminated_timestamp as u64),
224-
env_vars: container_spec
225-
.config
226-
.as_ref()?
227-
.env
228-
.as_ref()?
229-
.iter()
230-
.filter_map(|x| {
231-
x.split_once('=')
232-
.map(|(key, value)| (key.to_owned(), value.to_owned()))
233-
})
234-
.collect(),
235-
command: [
236-
container_spec.config.as_ref()?.entrypoint.as_ref()?.clone(),
237-
container_spec.config.as_ref()?.cmd.as_ref()?.clone(),
238-
]
239-
.concat(),
240-
status: match (
241-
container_spec.state.as_ref()?.status.as_ref()?,
242-
container_spec.state.as_ref()?.exit_code? as i16,
243-
) {
244-
(ContainerStateStatusEnum::RUNNING, _) => PodStatus::Running,
245-
(
246-
ContainerStateStatusEnum::EXITED
247-
| ContainerStateStatusEnum::REMOVING
248-
| ContainerStateStatusEnum::DEAD,
249-
0,
250-
) => PodStatus::Completed,
251-
(
252-
ContainerStateStatusEnum::EXITED
253-
| ContainerStateStatusEnum::REMOVING
254-
| ContainerStateStatusEnum::DEAD,
255-
code,
256-
) => PodStatus::Failed(code),
257-
(_, code) => {
258-
todo!(
259-
"Unhandled container state: {}, exit code: {code}.",
260-
container_spec.state.as_ref()?.status.as_ref()?
261-
)
262-
}
263-
},
264-
mounts: container_spec
265-
.mounts
266-
.as_ref()?
267-
.iter()
268-
.map(|mount_point| {
269-
Some(format!(
270-
"{}:{}{}",
271-
mount_point.source.as_ref()?,
272-
mount_point.destination.as_ref()?,
273-
mount_point
274-
.mode
275-
.as_ref()
276-
.map_or_else(String::new, |mode| format!(":{mode}"))
277-
))
278-
})
279-
.collect::<Option<_>>()?,
280-
labels: container_spec.config.as_ref()?.labels.as_ref()?.clone(),
281-
cpu_limit: container_spec.host_config.as_ref()?.nano_cpus? as f32
282-
/ 10_f32.powi(9), // ncpu, ucores=3, mcores=6, cores=9
283-
memory_limit: container_spec.host_config.as_ref()?.memory? as u64,
284-
},
285-
))
210+
let (container_name, container_summary, container_inspect_response) = result.ok()?;
211+
212+
Self::extract_run_info(&container_summary, &container_inspect_response)
213+
.map(|run_info| (container_name.clone(), run_info))
286214
}))
287215
}
216+
217+
#[expect(
218+
clippy::cast_sign_loss,
219+
clippy::cast_precision_loss,
220+
clippy::cast_possible_truncation,
221+
reason = r#"
222+
- Timestamp and memory should always have a value > 0
223+
- Container will always have a name with more than 1 character
224+
- No issue in core casting if between 0 - 3.40e38(f32:MAX)
225+
- No issue in exit code casting if between -3.27e4(i16:MIN) - 3.27e4(i16:MAX)
226+
- Containers will always have at least 1 name with at least 2 characters
227+
- This functions requires a lot of boilerplate code to extract the run info
228+
"#
229+
)]
230+
fn extract_run_info(
231+
container_summary: &ContainerSummary,
232+
container_inspect_response: &ContainerInspectResponse,
233+
) -> Option<PodRunInfo> {
234+
let terminated_timestamp = DateTime::parse_from_rfc3339(
235+
container_inspect_response
236+
.state
237+
.as_ref()?
238+
.finished_at
239+
.as_ref()?,
240+
)
241+
.ok()?
242+
.timestamp() as u64;
243+
Some(PodRunInfo {
244+
image: container_inspect_response
245+
.config
246+
.as_ref()?
247+
.image
248+
.as_ref()?
249+
.clone(),
250+
created: container_summary.created? as u64,
251+
terminated: (terminated_timestamp > 0).then_some(terminated_timestamp),
252+
env_vars: container_inspect_response
253+
.config
254+
.as_ref()?
255+
.env
256+
.as_ref()?
257+
.iter()
258+
.filter_map(|x| {
259+
x.split_once('=')
260+
.map(|(key, value)| (key.to_owned(), value.to_owned()))
261+
})
262+
.collect(),
263+
command: [
264+
container_inspect_response
265+
.config
266+
.as_ref()?
267+
.entrypoint
268+
.as_ref()?
269+
.clone(),
270+
container_inspect_response
271+
.config
272+
.as_ref()?
273+
.cmd
274+
.as_ref()?
275+
.clone(),
276+
]
277+
.concat(),
278+
status: match (
279+
container_inspect_response.state.as_ref()?.status?,
280+
container_inspect_response.state.as_ref()?.exit_code? as i16,
281+
) {
282+
(ContainerStateStatusEnum::RUNNING | ContainerStateStatusEnum::RESTARTING, _) => {
283+
PodStatus::Running
284+
}
285+
(ContainerStateStatusEnum::EXITED, 0) => PodStatus::Completed,
286+
(ContainerStateStatusEnum::EXITED | ContainerStateStatusEnum::DEAD, code) => {
287+
PodStatus::Failed(code)
288+
}
289+
(ContainerStateStatusEnum::CREATED, code) => {
290+
if container_inspect_response.state.as_ref()?.error.is_some() {
291+
PodStatus::Failed(code)
292+
} else {
293+
PodStatus::Running
294+
}
295+
}
296+
_ => PodStatus::Undefined,
297+
},
298+
mounts: container_inspect_response
299+
.mounts
300+
.as_ref()?
301+
.iter()
302+
.map(|mount_point| {
303+
Some(format!(
304+
"{}:{}{}",
305+
mount_point.source.as_ref()?,
306+
mount_point.destination.as_ref()?,
307+
mount_point
308+
.mode
309+
.as_ref()
310+
.map_or_else(String::new, |mode| format!(":{mode}"))
311+
))
312+
})
313+
.collect::<Option<Vec<_>>>()?,
314+
labels: container_inspect_response
315+
.config
316+
.as_ref()?
317+
.labels
318+
.as_ref()?
319+
.clone(),
320+
cpu_limit: container_inspect_response.host_config.as_ref()?.nano_cpus? as f32
321+
/ 10_f32.powi(9), // ncpu, ucores=3, mcores=6, cores=9
322+
memory_limit: container_inspect_response.host_config.as_ref()?.memory? as u64,
323+
})
324+
}
288325
}

src/core/pipeline_runner.rs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use crate::{
2323
};
2424
use async_trait::async_trait;
2525
use names::{Generator, Name};
26-
use serde::{Deserialize, Serialize};
2726
use serde_yaml::Serializer;
2827
use snafu::{OptionExt as _, ResultExt as _};
2928
use std::{
@@ -39,18 +38,6 @@ use tokio::{
3938
static NODE_OUTPUT_KEY_EXPR: &str = "output";
4039
static FAILURE_KEY_EXP: &str = "failure";
4140

42-
#[derive(Serialize, Deserialize, Clone, Debug)]
43-
enum NodeOutput {
44-
Packet(String, HashMap<String, PathSet>),
45-
ProcessingCompleted(String),
46-
}
47-
48-
#[derive(Serialize, Deserialize, Clone, Debug)]
49-
struct ProcessingFailure {
50-
node_id: String,
51-
error: String,
52-
}
53-
5441
/// Internal representation of a pipeline run, which should not be made public.
5542
#[derive(Debug)]
5643
struct PipelineRunInternal {
@@ -736,7 +723,7 @@ impl PodProcessor {
736723
},
737724
});
738725
}
739-
PodStatus::Running | PodStatus::Unset => {
726+
PodStatus::Running | PodStatus::Unset | PodStatus::Undefined => {
740727
// This should not happen, but if it does, we will return an error
741728
return Err(OrcaError {
742729
kind: Kind::PodJobProcessingError {

src/core/store/filestore.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ impl LocalFileStore {
194194
Ok((
195195
serde_yaml::from_str(
196196
&fs::read_to_string(path.clone())
197-
.context(selector::InvalidFilepath { path })?,
197+
.context(selector::InvalidPath { path })?,
198198
)?,
199199
None,
200200
hash.to_owned(),

src/uniffi/error.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ pub(crate) enum Kind {
3636
dir: PathBuf,
3737
backtrace: Option<Backtrace>,
3838
},
39+
#[snafu(display(
40+
"Failed to extract run info from the container image file: {container_name}."
41+
))]
42+
FailedToExtractRunInfo {
43+
container_name: String,
44+
backtrace: Option<Backtrace>,
45+
},
3946
#[snafu(display(
4047
"Missing expected output file or dir with key {packet_key} at path {path:?} for pod job (hash: {pod_job_hash})."
4148
))]
@@ -52,8 +59,16 @@ pub(crate) enum Kind {
5259
missing_keys: Vec<String>,
5360
backtrace: Option<Backtrace>,
5461
},
62+
#[snafu(display(
63+
"Fail to start pod with container_name: {container_name} with error: {reason}"
64+
))]
65+
FailedToStartPod {
66+
container_name: String,
67+
reason: String,
68+
backtrace: Option<Backtrace>,
69+
},
5570
#[snafu(display("{source} ({path:?})."))]
56-
InvalidFilepath {
71+
InvalidPath {
5772
path: PathBuf,
5873
source: io::Error,
5974
backtrace: Option<Backtrace>,
@@ -153,4 +168,16 @@ impl OrcaError {
153168
pub fn is_purged_pod_run(&self) -> bool {
154169
matches!(&self.kind, Kind::MissingInfo { details, .. } if details.contains("pod run"))
155170
}
171+
/// Returns `true` if the error was caused by a pod that failed to start.
172+
pub const fn is_failed_to_start_pod(&self) -> bool {
173+
matches!(self.kind, Kind::FailedToStartPod { .. })
174+
}
175+
/// Returns the container name if the error is a failed-to-start-pod error, otherwise `None`.
176+
pub fn get_container_name(&self) -> Option<String> {
177+
if let Kind::FailedToStartPod { container_name, .. } = &self.kind {
178+
Some(container_name.clone())
179+
} else {
180+
None
181+
}
182+
}
156183
}

src/uniffi/model/pod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,8 @@ pub struct PodResult {
267267
pub created: u64,
268268
/// Time in epoch when terminated in seconds.
269269
pub terminated: u64,
270+
/// Logs from stdout and stderr, where stderr is appended at the end.
271+
pub logs: String,
270272
}
271273

272274
impl PodResult {
@@ -283,6 +285,7 @@ impl PodResult {
283285
created: u64,
284286
terminated: u64,
285287
namespace_lookup: &HashMap<String, PathBuf>,
288+
logs: String,
286289
) -> Result<Self> {
287290
let output_packet = pod_job
288291
.pod
@@ -339,6 +342,7 @@ impl PodResult {
339342
status,
340343
created,
341344
terminated,
345+
logs,
342346
};
343347
Ok(Self {
344348
hash: hash_buffer(pod_result_no_hash.to_yaml()?),

src/uniffi/orchestrator/agent.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,11 @@ impl Agent {
171171
async |agent, inner_namespace_lookup, _, pod_job| {
172172
let pod_run = agent
173173
.orchestrator
174-
.start(&inner_namespace_lookup, &pod_job)
174+
.start(&pod_job, &inner_namespace_lookup)
175175
.await?;
176176
let pod_result = agent
177177
.orchestrator
178-
.get_result(&inner_namespace_lookup, &pod_run)
178+
.get_result(&pod_run, &inner_namespace_lookup)
179179
.await?;
180180
agent.orchestrator.delete(&pod_run).await?;
181181
Ok(pod_result)
@@ -191,6 +191,7 @@ impl Agent {
191191
PodStatus::Completed => "success",
192192
PodStatus::Running
193193
| PodStatus::Failed(_)
194+
| PodStatus::Undefined
194195
| PodStatus::Unset => "failure",
195196
}
196197
.to_owned(),

0 commit comments

Comments
 (0)