Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
8e1b2f3
Cut out a features to deffer till later. MVP implementation needed fo…
Synicix Jul 7, 2025
e2705c7
Rename error to be more clear
Synicix Jul 7, 2025
770ac57
Added uniffi bindings
Synicix Jul 7, 2025
e3929b3
Fix OOM issue with stress test
Synicix Jul 24, 2025
0c557c2
Adjust tolerance for pi
Synicix Jul 24, 2025
075543c
Merge branch 'dev' of github.com:walkerlab/orcapod into Synicix/pipeline
guzman-raphael Jul 27, 2025
76b4a95
Apply review feedback.
guzman-raphael Jul 27, 2025
eacd2cf
Remove unneeded PartialEq trait implementation.
guzman-raphael Jul 27, 2025
dfdee37
Add pipeline job test that validates input checksums, extract pod_cus…
guzman-raphael Jul 28, 2025
8eb6c34
Cover container dead case in docker orchestrator.
guzman-raphael Jul 28, 2025
70631ef
Debug GHA.
guzman-raphael Jul 28, 2025
abe0a7e
Add dot error test.
guzman-raphael Jul 28, 2025
18873c6
Add packet validation.
guzman-raphael Jul 28, 2025
ea40e21
Fix bug in packet validation.
guzman-raphael Jul 28, 2025
a0102b4
Set project coverage check to 95%.
guzman-raphael Jul 28, 2025
d57484c
Add test for incomplete packet.
guzman-raphael Jul 28, 2025
c5e7285
Allow dashes in agent group, logically isolate agent communication in…
guzman-raphael Jul 28, 2025
c2c77c9
Merge remote-tracking branch 'upstream/dev' into pipeline
Synicix Jul 28, 2025
832ff47
Merge branch 'dev' into pipeline
Synicix Jul 28, 2025
8d9a633
Merge branch 'pipeline' of github.com:Synicix/orcapod into Synicix/pi…
guzman-raphael Jul 28, 2025
674dea1
Split model module in uniffi to make it easier to maintain.
guzman-raphael Jul 29, 2025
5ad6e67
Merge pull request #7 from guzman-raphael/Synicix/pipeline
Synicix Jul 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .codecov.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# https://docs.codecov.com/docs/codecovyml-reference
# https://docs.codecov.com/docs/commit-status
# curl -X POST --data-binary @.codecov.yaml https://codecov.io/validate
coverage:
status:
Expand All @@ -7,3 +8,7 @@ coverage:
target: 100%
threshold: 0%
informational: true
project:
default:
target: 95%
threshold: 0%
11 changes: 10 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ chrono = "0.4.39"
colored = "2.1.0"
# derive utilities for new types
derive_more = { version = "2.0.1", features = ["display"] }
# traverse DOT graphs
dot-parser = "0.5.1"
# allow sync functions to be called from async using `block_on`
futures-executor = "0.3.31"
# chaining async calls and processing stream data in local docker orchestrator
Expand All @@ -47,10 +49,16 @@ getset = { version = "0.1.5", git = "https://github.com/guzman-raphael/getset.gi
glob = "0.3.1"
# strings to snake_case
heck = "0.5.0"
# convert bytes to hex strings
hex = "0.4.3"
# hashmaps that preserve insertion order
indexmap = { version = "2.9.0", features = ["serde"] }
# random name generator
names = "0.14.0"
# graph algorithms
petgraph = { version = "0.8.2", features = ["serde-1", "dot_parser"] }
# generate random data
rand = "0.9.1"
# complex pattern matching in strings
regex = "1.11.0"
# serialization/deserialization to/from filestore
Expand All @@ -70,7 +78,7 @@ tokio-util = "0.7.13"
# automated CFFI + bindings in other languages
uniffi = { version = "0.29.1", features = ["cli", "tokio"] }
# shared, distributed memory via communication
zenoh = { version = "1.3.4" }
zenoh = "1.3.4"

[[bin]]
name = "uniffi-bindgen"
Expand Down Expand Up @@ -113,6 +121,7 @@ impl_trait_in_params = { level = "allow", priority = 127 } # impl in
implicit_return = { level = "allow", priority = 127 } # missing return ok
inline_asm_x86_intel_syntax = { level = "allow", priority = 127 } # intel syntax ok
integer_division = { level = "allow", priority = 127 } # allow discarding remainder
iter_over_hash_type = { level = "allow", priority = 127 } # allow iterating over unordered iterables like `HashMap`
little_endian_bytes = { level = "allow", priority = 127 } # allow to_le_bytes / from_le_bytes
missing_asserts_for_indexing = { level = "allow", priority = 127 } # missing assert before indexing ok
missing_docs_in_private_items = { level = "allow", priority = 127 } # missing docs on private ok
Expand Down
3 changes: 2 additions & 1 deletion cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"cffi",
"zenoh",
"PodJob",
"stresser"
"stresser",
"petgraph"
],
"ignoreWords": [
"relpath",
Expand Down
14 changes: 11 additions & 3 deletions src/core/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use crate::{
core::util::get,
uniffi::{
error::{Result, selector},
model::{Blob, BlobKind},
model::packet::{Blob, BlobKind},
},
};
use hex;
use rand::{self, RngCore as _};
use serde_yaml;
use sha2::{Digest as _, Sha256};
use snafu::ResultExt as _;
Expand Down Expand Up @@ -88,14 +90,20 @@ pub fn hash_dir(dirpath: impl AsRef<Path>) -> Result<String> {
/// Will return error if hashing fails on file or directory.
pub(crate) fn hash_blob(
namespace_lookup: &HashMap<String, PathBuf, RandomState>,
blob: Blob,
blob: &Blob,
) -> Result<Blob> {
let blob_path = get(namespace_lookup, &blob.location.namespace)?.join(&blob.location.path);
Ok(Blob {
checksum: match blob.kind {
BlobKind::File => hash_file(blob_path)?,
BlobKind::Directory => hash_dir(blob_path)?,
},
..blob
..blob.clone()
})
}

pub(crate) fn make_random_hash() -> String {
let mut bytes = [0; 32];
rand::rng().fill_bytes(&mut bytes);
hex::encode(bytes)
}
13 changes: 13 additions & 0 deletions src/core/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::uniffi::error::{Kind, OrcaError};
use bollard::errors::Error as BollardError;
use dot_parser::ast::PestError;
use glob;
use serde_json;
use serde_yaml;
Expand All @@ -21,6 +22,16 @@ impl From<BollardError> for OrcaError {
}
}
}
impl From<PestError> for OrcaError {
fn from(error: PestError) -> Self {
Self {
kind: Kind::DOTError {
source: error.into(),
backtrace: Some(Backtrace::capture()),
},
}
}
}
impl From<glob::PatternError> for OrcaError {
fn from(error: glob::PatternError) -> Self {
Self {
Expand Down Expand Up @@ -103,6 +114,7 @@ impl fmt::Debug for OrcaError {
Kind::AgentCommunicationFailure { backtrace, .. }
| Kind::EmptyResponseWhenLoadingContainerAltImage { backtrace, .. }
| Kind::GeneratedNamesOverflow { backtrace, .. }
| Kind::IncompletePacket { backtrace, .. }
| Kind::InvalidFilepath { backtrace, .. }
| Kind::InvalidPodResultTerminatedDatetime { backtrace, .. }
| Kind::KeyMissing { backtrace, .. }
Expand All @@ -113,6 +125,7 @@ impl fmt::Debug for OrcaError {
| Kind::NoRemainingServices { backtrace, .. }
| Kind::NoTagFoundInContainerAltImage { backtrace, .. }
| Kind::BollardError { backtrace, .. }
| Kind::DOTError { backtrace, .. }
| Kind::GlobPatternError { backtrace, .. }
| Kind::IoError { backtrace, .. }
| Kind::PathPrefixError { backtrace, .. }
Expand Down
37 changes: 37 additions & 0 deletions src/core/graph.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use crate::{
core::{pipeline::PipelineNode, util::get},
uniffi::{error::Result, model::pipeline::Kernel},
};
use dot_parser::ast::Graph as DOTGraph;
use petgraph::{
dot::dot_parser::{DotAttrList, DotNodeWeight, ParseFromDot as _},
graph::DiGraph,
};
use std::collections::HashMap;

#[expect(
clippy::needless_pass_by_value,
clippy::panic_in_result_fn,
clippy::panic,
reason = "
- Drop metadata and encourage usage from the graph.
- `node_map` does not allow returning results.
"
)]
pub fn make_graph(
input_dot: &str,
metadata: HashMap<String, Kernel>,
) -> Result<DiGraph<PipelineNode, ()>> {
let graph =
DiGraph::<DotNodeWeight, DotAttrList>::from_dot_graph(DOTGraph::try_from(input_dot)?).map(
|_, node| PipelineNode {
name: node.id.clone(),
kernel: get(&metadata, &node.id)
.unwrap_or_else(|error| panic!("{error}"))
.clone(),
},
|_, _| (),
);

Ok(graph)
}
3 changes: 3 additions & 0 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
/// State change verification via cryptographic utilities.
pub mod crypto;
pub(crate) mod error;
pub(crate) mod graph;
/// Components of the data model.
pub mod model;
pub(crate) mod orchestrator;
pub(crate) mod pipeline;
pub(crate) mod store;
pub(crate) mod util;
pub(crate) mod validation;
2 changes: 1 addition & 1 deletion src/core/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
core::util::get_type_name,
uniffi::{
error::Result,
model::{Pod, PodJob},
model::pod::{Pod, PodJob},
},
};
use heck::ToSnakeCase as _;
Expand Down
4 changes: 2 additions & 2 deletions src/core/orchestrator/agent.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::uniffi::{
error::{OrcaError, Result, selector},
model::{PodJob, PodResult},
model::pod::{PodJob, PodResult},
orchestrator::agent::{Agent, AgentClient},
store::ModelID,
};
Expand All @@ -25,7 +25,7 @@ static RE_PODJOB_ACTION: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(
r"(?x)
^
group\/(?<group>[a-z_]+)\/
group\/(?<group>[a-z_\-]+)\/
(?<action>request|reservation|success|failure)\/
pod_job\/(?<pod_job_hash>[0-9a-f]+)\/
host\/(?<host>[a-z_]+)\/
Expand Down
12 changes: 8 additions & 4 deletions src/core/orchestrator/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
core::util::get,
uniffi::{
error::{Result, selector},
model::{PathSet, PodJob},
model::{packet::PathSet, pod::PodJob},
orchestrator::{RunInfo, Status, docker::LocalDockerOrchestrator},
},
};
Expand Down Expand Up @@ -245,11 +245,15 @@ impl LocalDockerOrchestrator {
) {
(ContainerStateStatusEnum::RUNNING, _) => Status::Running,
(
ContainerStateStatusEnum::EXITED | ContainerStateStatusEnum::REMOVING,
ContainerStateStatusEnum::EXITED
| ContainerStateStatusEnum::REMOVING
| ContainerStateStatusEnum::DEAD,
0,
) => Status::Completed,
(
ContainerStateStatusEnum::EXITED | ContainerStateStatusEnum::REMOVING,
ContainerStateStatusEnum::EXITED
| ContainerStateStatusEnum::REMOVING
| ContainerStateStatusEnum::DEAD,
code,
) => Status::Failed(code),
(_, code) => {
Expand All @@ -274,7 +278,7 @@ impl LocalDockerOrchestrator {
.map_or_else(String::new, |mode| format!(":{mode}"))
))
})
.collect::<Option<Vec<_>>>()?,
.collect::<Option<_>>()?,
labels: container_spec.config.as_ref()?.labels.as_ref()?.clone(),
cpu_limit: container_spec.host_config.as_ref()?.nano_cpus? as f32
/ 10_f32.powi(9), // ncpu, ucores=3, mcores=6, cores=9
Expand Down
2 changes: 1 addition & 1 deletion src/core/orchestrator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
core::util::get_type_name,
uniffi::{
model::PodJob,
model::pod::PodJob,
orchestrator::{Orchestrator, PodRun},
},
};
Expand Down
8 changes: 8 additions & 0 deletions src/core/pipeline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use crate::uniffi::model::pipeline::Kernel;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct PipelineNode {
pub name: String,
pub kernel: Kernel,
}
10 changes: 7 additions & 3 deletions src/core/util.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::uniffi::error::{Result, selector};
use snafu::OptionExt as _;
use std::{any::type_name, collections::HashMap, fmt};
use std::{any::type_name, borrow::Borrow, collections::HashMap, fmt, hash};

#[expect(
clippy::unwrap_used,
Expand All @@ -26,8 +26,12 @@ pub fn parse_debug_name<T: fmt::Debug>(instance: &T) -> String {
.unwrap()
}

pub fn get<'map, T>(map: &'map HashMap<String, T>, key: &str) -> Result<&'map T> {
pub fn get<'map, K, V, Q>(map: &'map HashMap<K, V>, key: &Q) -> Result<&'map V>
where
Q: ?Sized + hash::Hash + Eq + fmt::Debug,
K: Borrow<Q> + hash::Hash + Eq,
{
Ok(map.get(key).context(selector::KeyMissing {
key: key.to_owned(),
key: format!("{key:?}"),
})?)
}
24 changes: 24 additions & 0 deletions src/core/validation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use crate::uniffi::error::{Result, selector};
use snafu::OptionExt as _;
use std::collections::{HashMap, HashSet};

pub fn validate_packet<SV, PV>(
kind: String,
spec: &HashMap<String, SV>,
packet: &HashMap<String, PV>,
) -> Result<()> {
let spec_keys = spec.keys().collect::<HashSet<_>>();
let packet_keys = packet.keys().collect::<HashSet<_>>();
let missing_keys = spec_keys
.difference(&packet_keys)
.copied()
.cloned()
.collect::<Vec<_>>();

missing_keys
.is_empty()
.then_some(())
.context(selector::IncompletePacket { kind, missing_keys })?;

Ok(())
}
12 changes: 12 additions & 0 deletions src/uniffi/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
)]

use bollard::errors::Error as BollardError;
use dot_parser::ast::PestError;
use glob;
use serde_json;
use serde_yaml;
Expand Down Expand Up @@ -38,6 +39,12 @@ pub(crate) enum Kind {
},
#[snafu(display("Out of generated random names."))]
GeneratedNamesOverflow { backtrace: Option<Backtrace> },
#[snafu(display("Incomplete {kind} packet. Missing `{missing_keys:?}` keys."))]
IncompletePacket {
kind: String,
missing_keys: Vec<String>,
backtrace: Option<Backtrace>,
},
#[snafu(display("{source} ({path:?})."))]
InvalidFilepath {
path: PathBuf,
Expand Down Expand Up @@ -88,6 +95,11 @@ pub(crate) enum Kind {
backtrace: Option<Backtrace>,
},
#[snafu(transparent)]
DOTError {
source: Box<PestError>,
backtrace: Option<Backtrace>,
},
#[snafu(transparent)]
GlobPatternError {
source: glob::PatternError,
backtrace: Option<Backtrace>,
Expand Down
37 changes: 37 additions & 0 deletions src/uniffi/model/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use serde::{Deserialize, Serialize};
use std::path::PathBuf;

/// Available models.
#[derive(uniffi::Enum, Debug)]
pub enum ModelType {
/// See [`crate::uniffi::model::pod::Pod`].
Pod,
/// See [`crate::uniffi::model::pod::PodJob`].
PodJob,
/// See [`crate::uniffi::model::pod::PodResult`].
PodResult,
}

/// Standard metadata structure for all model instances.
#[derive(uniffi::Record, Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
pub struct Annotation {
/// A unique name.
pub name: String,
/// A unique semantic version.
pub version: String,
/// A long form description.
pub description: String,
}

uniffi::custom_type!(PathBuf, String, {
remote,
try_lift: |val| Ok(PathBuf::from(&val)),
lower: |obj| obj.display().to_string(),
});

/// Utility types for describing packets.
pub mod packet;
/// Models and utility types for pipelines.
pub mod pipeline;
/// Models and utility types for pods.
pub mod pod;
Loading