Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e08389c
Save progress
Synicix Jul 9, 2025
6b7913d
Save progress
Synicix Jul 9, 2025
5cd16d2
Save progress
Synicix Jul 9, 2025
81b8a49
Save redesign
Synicix Jul 9, 2025
54d38be
Fix bugs and implemented missing parts
Synicix Jul 10, 2025
3058eb7
Add basic design struct
Synicix Jul 10, 2025
c45df99
Replace broadcast arch with MPSC
Synicix Jul 10, 2025
415eb05
Added output handling
Synicix Jul 10, 2025
fb8ace1
Fix bugs and clippy suggestions
Synicix Jul 10, 2025
08453b3
Added a lot of docs and fix input.txt issue for test
Synicix Jul 10, 2025
713e471
fix mistake in docs
Synicix Jul 10, 2025
d3701c9
Added joiner processor unit test and fix bug for case where we don't …
Synicix Jul 15, 2025
75b9739
Merge remote-tracking branch 'upstream/dev' into pipeline_runner
Synicix Jul 17, 2025
4e3afc4
Merge remote-tracking branch 'upstream/dev' into pipeline_runner
Synicix Jul 17, 2025
b911f79
Save progress
Synicix Jul 18, 2025
b77d3f2
Save progress
Synicix Jul 18, 2025
75d1e79
Save progress
Synicix Jul 18, 2025
99b2d1e
Save progress
Synicix Jul 19, 2025
630e27e
Save progress
Synicix Jul 20, 2025
31f925e
Save progress
Synicix Jul 20, 2025
b369549
Save progress
Synicix Jul 21, 2025
dc3be33
Remove bincode and switch to json. Fix a few joining error
Synicix Jul 21, 2025
83c9eca
Add group and host name
Synicix Jul 23, 2025
1df8295
fix unit test
Synicix Jul 23, 2025
4ed76a8
Readd gpu
Synicix Jul 23, 2025
f6bd697
Update comments
Synicix Jul 23, 2025
d421221
Merge remote-tracking branch 'origin/dev' into pipeline_runner
Synicix Jul 25, 2025
16e0cbb
Save progress
Synicix Jul 26, 2025
65cbbdd
Save progress
Synicix Jul 28, 2025
72e44db
Merge remote-tracking branch 'upstream/dev' into pipeline_runner (NOT…
Synicix Jul 30, 2025
58864a0
Fix majority of merge errors
Synicix Jul 31, 2025
d9ade76
Merge remote-tracking branch 'upstream/dev' into pipeline_runner
Synicix Jul 31, 2025
0fca094
Add pipeline util func to handle new pipeline input_spec behavior (Ru…
Synicix Aug 1, 2025
0c59a9d
Save progress
Synicix Aug 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .clippy.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
excessive-nesting-threshold = 4
excessive-nesting-threshold = 6
too-many-arguments-threshold = 10
allowed-idents-below-min-chars = ["..", "k", "v", "f", "re", "id", "Ok", "'_"]
1 change: 1 addition & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
},
"runArgs": [
"--name=${localWorkspaceFolderBasename}_devcontainer",
"--gpus=all",
"--privileged",
"--cgroupns=host"
],
Expand Down
1 change: 0 additions & 1 deletion .devcontainer/gpu/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
},
"runArgs": [
"--name=${localWorkspaceFolderBasename}_devcontainer",
"--gpus=all",
"--privileged",
"--cgroupns=host"
],
Expand Down
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
"python.terminal.activateEnvironment": false,
"notebook.formatOnSave.enabled": true,
"notebook.output.scrolling": true
}
}
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ getset = { version = "0.1.5", git = "https://github.com/guzman-raphael/getset.gi
glob = "0.3.1"
# strings to snake_case
heck = "0.5.0"
hostname = "0.4.1"
# convert bytes to hex strings
hex = "0.4.3"
# hashmaps that preserve insertion order
indexmap = { version = "2.9.0", features = ["serde"] }
itertools = "0.14.0"
layout-rs = "0.1.3"
# random name generator
names = "0.14.0"
# graph algorithms
Expand All @@ -73,6 +76,7 @@ sha2 = "0.10.8"
snafu = { version = "0.8.5", features = ["futures"] }
# a runtime for async applications
tokio = { version = "1.41.0", features = ["full"] }
tokio-stream = "0.1.17"
# utilities for async calls
tokio-util = "0.7.13"
# automated CFFI + bindings in other languages
Expand Down
9 changes: 8 additions & 1 deletion cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,14 @@
"strsim",
"getrandom",
"wasi",
"patchelf"
"petgraph",
"rfind",
"itertools",
"oneshot",
"patchelf",
"colinianking",
"bitcode",
"pathset"
],
"useGitignore": false,
"ignorePaths": [
Expand Down
3 changes: 3 additions & 0 deletions output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1
2
2
19 changes: 19 additions & 0 deletions src/core/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
io,
path::{self},
};
use tokio::sync::oneshot;
use tokio::task;

impl From<BollardError> for OrcaError {
Expand All @@ -22,6 +23,16 @@ impl From<BollardError> for OrcaError {
}
}
}
/// Converts a tokio `oneshot` receive failure (the sender half was dropped
/// before sending a value) into an [`OrcaError`], capturing a backtrace at
/// the conversion site to aid diagnostics.
impl From<oneshot::error::RecvError> for OrcaError {
    fn from(error: oneshot::error::RecvError) -> Self {
        Self {
            kind: Kind::ChannelReceiveError {
                source: error,
                backtrace: Some(Backtrace::capture()),
            },
        }
    }
}
impl From<PestError> for OrcaError {
fn from(error: PestError) -> Self {
Self {
Expand Down Expand Up @@ -113,9 +124,11 @@ impl fmt::Debug for OrcaError {
match &self.kind {
Kind::AgentCommunicationFailure { backtrace, .. }
| Kind::EmptyResponseWhenLoadingContainerAltImage { backtrace, .. }
| Kind::FailedToParseDot { backtrace, .. }
| Kind::GeneratedNamesOverflow { backtrace, .. }
| Kind::IncompletePacket { backtrace, .. }
| Kind::InvalidFilepath { backtrace, .. }
| Kind::InvalidIndex { backtrace, .. }
| Kind::InvalidPodResultTerminatedDatetime { backtrace, .. }
| Kind::KeyMissing { backtrace, .. }
| Kind::NoAnnotationFound { backtrace, .. }
Expand All @@ -124,11 +137,17 @@ impl fmt::Debug for OrcaError {
| Kind::NoMatchingPodRun { backtrace, .. }
| Kind::NoRemainingServices { backtrace, .. }
| Kind::NoTagFoundInContainerAltImage { backtrace, .. }
| Kind::PodJobSubmissionFailed { backtrace, .. }
| Kind::PodJobProcessingError { backtrace, .. }
| Kind::StatusConversionFailure { backtrace, .. }
| Kind::UnsupportedPathType { backtrace, .. }
| Kind::BollardError { backtrace, .. }
| Kind::ChannelReceiveError { backtrace, .. }
| Kind::DOTError { backtrace, .. }
| Kind::GlobPatternError { backtrace, .. }
| Kind::IoError { backtrace, .. }
| Kind::PathPrefixError { backtrace, .. }
| Kind::SendError { backtrace, .. }
| Kind::SerdeJsonError { backtrace, .. }
| Kind::SerdeYamlError { backtrace, .. }
| Kind::TokioTaskJoinError { backtrace, .. } => {
Expand Down
2 changes: 1 addition & 1 deletion src/core/graph.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
core::{pipeline::PipelineNode, util::get},
core::{model::pipeline::PipelineNode, util::get},
uniffi::{error::Result, model::pipeline::Kernel},
};
use dot_parser::ast::Graph as DOTGraph;
Expand Down
3 changes: 1 addition & 2 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
pub mod crypto;
pub(crate) mod error;
pub(crate) mod graph;
/// Components of the data model.
/// Model definitions and utilities.
pub mod model;
pub(crate) mod orchestrator;
pub(crate) mod pipeline;
pub(crate) mod store;
pub(crate) mod util;
pub(crate) mod validation;
65 changes: 5 additions & 60 deletions src/core/model.rs → src/core/model/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
use crate::{
core::util::get_type_name,
uniffi::{
error::Result,
model::pod::{Pod, PodJob},
},
};
use crate::{core::util::get_type_name, uniffi::error::Result};
use heck::ToSnakeCase as _;
use indexmap::IndexMap;
use serde::{Deserialize as _, Deserializer, Serialize, Serializer};
use serde::{Serialize, Serializer};
use serde_yaml::{self, Value};
use std::{
collections::{BTreeMap, HashMap},
result,
sync::Arc,
};

/// Converts a model instance into a consistent yaml.
///
/// # Errors
Expand Down Expand Up @@ -63,54 +57,5 @@ where
sorted.serialize(serializer)
}

#[expect(
clippy::expect_used,
reason = "Function signature required by serde API."
)]
pub(crate) fn deserialize_pod<'de, D>(deserializer: D) -> result::Result<Arc<Pod>, D::Error>
where
D: Deserializer<'de>,
{
let value = Value::deserialize(deserializer)?;
(value).as_str().map_or_else(
|| {
Ok(serde_yaml::from_value(value.clone())
.expect("Failed to convert from serde value to specific type."))
},
|hash| {
Ok({
Pod {
hash: hash.to_owned(),
..Pod::default()
}
.into()
})
},
)
}

#[expect(
clippy::expect_used,
reason = "Function signature required by serde API."
)]
pub(crate) fn deserialize_pod_job<'de, D>(deserializer: D) -> result::Result<Arc<PodJob>, D::Error>
where
D: Deserializer<'de>,
{
let value = Value::deserialize(deserializer)?;
(value).as_str().map_or_else(
|| {
Ok(serde_yaml::from_value(value.clone())
.expect("Failed to convert from serde value to specific type."))
},
|hash| {
Ok({
PodJob {
hash: hash.to_owned(),
..PodJob::default()
}
.into()
})
},
)
}
pub(crate) mod pipeline;
pub(crate) mod pod;
43 changes: 43 additions & 0 deletions src/core/model/pipeline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::collections::HashSet;

use crate::uniffi::model::pipeline::{Kernel, Pipeline};
use petgraph::Direction::Incoming;
use serde::{Deserialize, Serialize};

/// A single node in a pipeline graph: a named instance of a kernel.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct PipelineNode {
    // Name identifying this node within the pipeline graph.
    pub name: String,
    // The kernel associated with this node — presumably the processing unit
    // the node runs; confirm against `Kernel`'s definition.
    pub kernel: Kernel,
}

impl Pipeline {
    /// Returns an iterator over the direct parents (incoming neighbors) of
    /// `node` in the pipeline graph.
    ///
    /// Yields nothing when `node` is not present in the graph.
    pub(crate) fn get_node_parents(
        &self,
        node: &PipelineNode,
    ) -> impl Iterator<Item = &PipelineNode> {
        // Locate the graph index whose weight equals the requested node.
        let located = self
            .graph
            .node_indices()
            .find(|&candidate| self.graph[candidate] == *node);
        // If found, walk the incoming edges and yield each parent's weight;
        // an absent node produces an empty iterator.
        located.into_iter().flat_map(move |candidate| {
            self.graph
                .neighbors_directed(candidate, Incoming)
                .map(move |parent_idx| &self.graph[parent_idx])
        })
    }

    /// Returns the set of node names that receive external inputs according
    /// to the `input_spec`.
    pub(crate) fn get_input_nodes(&self) -> HashSet<&String> {
        self.input_spec
            .iter()
            .flat_map(|(_, node_uris)| node_uris)
            .map(|node_uri| &node_uri.node_name)
            .collect()
    }
}
56 changes: 56 additions & 0 deletions src/core/model/pod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use crate::uniffi::model::pod::{Pod, PodJob};
use serde::{Deserialize as _, Deserializer};
use serde_yaml::{self, Value};
use std::{result, sync::Arc};

#[expect(
clippy::expect_used,
reason = "Function signature required by serde API."
)]
pub fn deserialize_pod<'de, D>(deserializer: D) -> result::Result<Arc<Pod>, D::Error>
where
D: Deserializer<'de>,
{
let value = Value::deserialize(deserializer)?;
(value).as_str().map_or_else(
|| {
Ok(serde_yaml::from_value(value.clone())
.expect("Failed to convert from serde value to specific type."))
},
|hash| {
Ok({
Pod {
hash: hash.to_owned(),
..Pod::default()
}
.into()
})
},
)
}

#[expect(
clippy::expect_used,
reason = "Function signature required by serde API."
)]
pub fn deserialize_pod_job<'de, D>(deserializer: D) -> result::Result<Arc<PodJob>, D::Error>
where
D: Deserializer<'de>,
{
let value = Value::deserialize(deserializer)?;
(value).as_str().map_or_else(
|| {
Ok(serde_yaml::from_value(value.clone())
.expect("Failed to convert from serde value to specific type."))
},
|hash| {
Ok({
PodJob {
hash: hash.to_owned(),
..PodJob::default()
}
.into()
})
},
)
}
5 changes: 3 additions & 2 deletions src/core/orchestrator/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ static RE_PODJOB_ACTION: LazyLock<Regex> = LazyLock::new(|| {
group\/(?<group>[a-z_\-]+)\/
(?<action>request|reservation|success|failure)\/
pod_job\/(?<pod_job_hash>[0-9a-f]+)\/
host\/(?<host>[a-z_]+)\/
host\/(?<host>[0-9a-z_]+)\/
timestamp\/(?<timestamp>.*?)
$
",
Expand Down Expand Up @@ -101,8 +101,8 @@ impl AgentClient {
}

#[expect(
clippy::excessive_nesting,
clippy::let_underscore_must_use,
clippy::excessive_nesting,
reason = "`result::Result<(), SendError<_>>` is the only uncaptured result since it would mean we can't transmit results over mpsc."
)]
pub async fn start_service<
Expand Down Expand Up @@ -165,6 +165,7 @@ where
subgroup: metadata["pod_job_hash"].to_string(),
};
let _event_payload = event_classifier(&input);
println!("Sending it to request task.");
tasks.spawn({
let inner_request_task = request_task.clone();
let inner_inner_agent = Arc::clone(&inner_agent);
Expand Down
8 changes: 0 additions & 8 deletions src/core/pipeline.rs

This file was deleted.

Loading
Loading