Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
844fdff
Move to dev
Aug 4, 2025
764387d
Remove uneeded results and change to return iterator
Synicix Aug 5, 2025
a03db37
Remove accident comment
Synicix Aug 5, 2025
abbb5ce
Add ground work for pod operator
Synicix Aug 5, 2025
450a90f
Add async operation for JoinOperator
Synicix Aug 5, 2025
e4b3a54
Add pod operator + better error handling for pod_result:::new()
Synicix Aug 5, 2025
fee8d9c
Improve error handling for file io in localfilestore
Synicix Aug 5, 2025
c04335f
Update operator to be in operation again
Synicix Aug 5, 2025
ebe04d1
Fix pod result hashing bug
Synicix Aug 5, 2025
20f2345
Apply feedback from review
Synicix Aug 6, 2025
51dfea6
Merge branch 'worktree-docker-patch' into 106
Synicix Aug 8, 2025
cb72829
Remove stale error
Synicix Aug 8, 2025
d36ec71
Merge branch 'worktree-docker-patch' into 106
Synicix Aug 10, 2025
2986e0d
Save changes
Synicix Aug 10, 2025
0e3dc71
Merge branch '106' into pipeline_runner
Synicix Aug 10, 2025
595a12e
Split model in core to match uniffi
Synicix Aug 10, 2025
b671dba
Merge branch 'model_split' into 106
Synicix Aug 10, 2025
ac363f2
Merge branch '106' into runner
Synicix Aug 10, 2025
86d5f70
Save progress
Synicix Aug 11, 2025
aead411
Merge branch 'dev' into model_split
Synicix Aug 11, 2025
58ba198
Fix missing stuff
Synicix Aug 11, 2025
9c72bd8
Merge branch 'model_split' into runner
Synicix Aug 11, 2025
7ee5251
save change
Synicix Aug 11, 2025
fda43aa
Update pipeline_runner to use new operator + improvments
Synicix Aug 11, 2025
1e9a19f
Readded tests
Synicix Aug 11, 2025
32299f8
save progress
Synicix Aug 12, 2025
e93c32b
Merge remote-tracking branch 'upstream' into runner
Synicix Aug 13, 2025
ab900b6
Fix logic bug
Synicix Aug 13, 2025
4f511ce
Update tests and test fixture to merge sentence correctly
Synicix Aug 13, 2025
2a4f1f7
Fix agent event to make it more efficient
Synicix Aug 13, 2025
c89c700
Remove empty impl
Synicix Aug 13, 2025
a834ed9
Fix remaining test to deal with issue
Synicix Aug 14, 2025
7863cad
Fix memory bug
Synicix Aug 15, 2025
b8c2de2
Update rust version
Synicix Aug 15, 2025
acbd967
Remove into iter()
Synicix Aug 15, 2025
ae137ed
Fix old lint that doesn't apply anymore, was hidden by rust analyzer
Synicix Aug 15, 2025
f5fefb0
Add PipelineStatus and failure logs
Synicix Aug 15, 2025
75336de
Remove notebook
Synicix Aug 15, 2025
f9adf0a
Remove stale functions
Synicix Aug 15, 2025
9795458
Remove unused imports
Synicix Aug 15, 2025
4fae330
Merge remote-tracking branch 'upstream/dev' into logging
Synicix Aug 28, 2025
53e972f
Merge branch 'orch_tests' into logging_patch
Synicix Aug 28, 2025
9de08e1
reintergrate logging into podresult
Synicix Aug 28, 2025
3ace99d
Fix code according to copilot suggestions
Synicix Aug 29, 2025
d16da30
Fix copilot suggestions
Synicix Aug 29, 2025
9a38076
Fix copilot suggestions
Synicix Aug 29, 2025
a2f44cf
Merge branch 'dev' into runner
eywalker Oct 14, 2025
af14f92
Fix merging issues
Synicix Oct 17, 2025
78ceed6
Fix clippy issues and orchestrator bugs
Synicix Oct 17, 2025
dd5c44f
Remove clippy except since github action complained
Synicix Oct 17, 2025
f9b53ba
Apply fixes requests from clippy
Synicix Oct 17, 2025
44b4245
Merge branch 'runner' into logging_patch
Synicix Oct 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .clippy.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
excessive-nesting-threshold = 4
excessive-nesting-threshold = 6
too-many-arguments-threshold = 10
allowed-idents-below-min-chars = ["..", "k", "v", "f", "re", "id", "Ok", "'_"]
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- name: Install Rust + components
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: 1.87
toolchain: 1.89
components: rustfmt,clippy
- name: Install code coverage
uses: taiki-e/install-action@cargo-llvm-cov
Expand Down
1 change: 0 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
],
"files.autoSave": "off",
"files.insertFinalNewline": true,
"gitlens.showWelcomeOnInstall": false,
"gitlens.showWhatsNewAfterUpgrades": false,
"lldb.consoleMode": "evaluate",
"rust-analyzer.cargo.features": [
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ glob = "0.3.1"
heck = "0.5.0"
# convert bytes to hex strings
hex = "0.4.3"
hostname = "0.4.1"
# hashmaps that preserve insertion order
indexmap = { version = "2.9.0", features = ["serde"] }
# utilities for iterables e.g. cartesian products
Expand Down
4 changes: 3 additions & 1 deletion cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@
"wasi",
"patchelf",
"itertools",
"colinianking"
"colinianking",
"itertools",
"pathset",
],
"useGitignore": false,
"ignorePaths": [
Expand Down
7 changes: 7 additions & 0 deletions src/core/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,16 @@ impl fmt::Debug for OrcaError {
match &self.kind {
Kind::AgentCommunicationFailure { backtrace, .. }
| Kind::FailedToStartPod { backtrace, .. }
| Kind::FailedToExtractRunInfo { backtrace, .. }
| Kind::IncompletePacket { backtrace, .. }
| Kind::InvalidPath { backtrace, .. }
| Kind::InvalidIndex { backtrace, .. }
| Kind::KeyMissing { backtrace, .. }
| Kind::MissingInfo { backtrace, .. }
| Kind::FailedToGetPodJobOutput { backtrace, .. }
| Kind::PodJobProcessingError { backtrace, .. }
| Kind::PodJobSubmissionFailed { backtrace, .. }
| Kind::UnexpectedPathType { backtrace, .. }
| Kind::BollardError { backtrace, .. }
| Kind::ChronoParseError { backtrace, .. }
| Kind::DOTError { backtrace, .. }
Expand Down
4 changes: 2 additions & 2 deletions src/core/graph.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
core::{pipeline::PipelineNode, util::get},
core::{model::pipeline::PipelineNode, util::get},
uniffi::{error::Result, model::pipeline::Kernel},
};
use dot_parser::ast::Graph as DOTGraph;
Expand All @@ -25,7 +25,7 @@ pub fn make_graph(
let graph =
DiGraph::<DotNodeWeight, DotAttrList>::from_dot_graph(DOTGraph::try_from(input_dot)?).map(
|_, node| PipelineNode {
name: node.id.clone(),
id: node.id.clone(),
kernel: get(&metadata, &node.id)
.unwrap_or_else(|error| panic!("{error}"))
.clone(),
Expand Down
5 changes: 3 additions & 2 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ macro_rules! inner_attr_to_each {

pub(crate) mod error;
pub(crate) mod graph;
pub(crate) mod pipeline;
pub(crate) mod store;
pub(crate) mod util;
pub(crate) mod validation;
Expand All @@ -20,6 +19,7 @@ inner_attr_to_each! {
pub(crate) mod model;
pub(crate) mod operator;
pub(crate) mod orchestrator;
pub(crate) mod pipeline_runner;
}

#[cfg(feature = "test")]
Expand All @@ -35,6 +35,7 @@ inner_attr_to_each! {
)]
pub mod crypto;
pub mod model;
pub mod operator;
pub mod orchestrator;
pub mod pipeline_runner;
pub mod operator;
}
1 change: 1 addition & 0 deletions src/core/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,5 @@ where
sorted.serialize(serializer)
}

pub mod pipeline;
pub mod pod;
109 changes: 109 additions & 0 deletions src/core/model/pipeline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use std::{
backtrace::Backtrace,
collections::{HashMap, HashSet},
};

use crate::uniffi::{
error::{Kind, OrcaError, Result},
model::{
packet::PathSet,
pipeline::{Kernel, Pipeline, PipelineJob},
},
};
use itertools::Itertools as _;
use petgraph::Direction::Incoming;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct PipelineNode {
pub id: String,
pub kernel: Kernel,
}

impl Pipeline {
/// Function to get the parents of a node
pub(crate) fn get_node_parents(
&self,
node: &PipelineNode,
) -> impl Iterator<Item = &PipelineNode> {
// Find the NodeIndex for the given node_key
let node_index = self
.graph
.node_indices()
.find(|&idx| self.graph[idx] == *node);
node_index.into_iter().flat_map(move |idx| {
self.graph
.neighbors_directed(idx, Incoming)
.map(move |parent_idx| &self.graph[parent_idx])
})
}

/// Return a vec of `node_names` that takes in inputs based on the `input_spec`
pub(crate) fn get_input_nodes(&self) -> HashSet<&String> {
let mut input_nodes = HashSet::new();

self.input_spec.iter().for_each(|(_, node_uris)| {
for node_uri in node_uris {
input_nodes.insert(&node_uri.node_id);
}
});

input_nodes
}
}

impl PipelineJob {
/// Helpful function to get the input packet for input nodes of the pipeline based on the `pipeline_job` an`pipeline_spec`ec
/// # Errors
/// Will return `Err` if there is an issue getting the input packet per node.
/// # Returns
/// A `HashMap` where the key is the node name and the value is a vector of `HashMap<String, PathSet>` representing the input packets for that node.
pub fn get_input_packet_per_node(
&self,
) -> Result<HashMap<String, Vec<HashMap<String, PathSet>>>> {
// For each node in the input specification, we will iterate over its mapping
// nodes_input_spec contains <node_id, HashMap<key, PathSet>>
let mut nodes_input_spec = HashMap::new();
for (input_key, node_uris) in &self.pipeline.input_spec {
for node_uri in node_uris {
let input_path_sets = self.input_packet.get(input_key).ok_or(OrcaError {
kind: Kind::KeyMissing {
key: input_key.clone(),
backtrace: Some(Backtrace::capture()),
},
})?;
// There shouldn't be a duplicate key in the input packet as this will be handle by pipeline verify
let input_spec = nodes_input_spec
.entry(&node_uri.node_id)
.or_insert_with(HashMap::new);
input_spec.insert(&node_uri.key, input_path_sets);
}
}

// For each node, compute the cartesian product of the path_sets for each unique combination of keys
let node_input_packets = nodes_input_spec
.into_iter()
.map(|(node_id, input_node_keys)| {
// We need to pull them out at the same time to ensure the key order is preserve to match the cartesian product
let (keys, values): (Vec<_>, Vec<_>) = input_node_keys.into_iter().unzip();

// Covert each combo into a packet
let packets = values
.into_iter()
.multi_cartesian_product()
.map(|combo| {
keys.iter()
.copied()
.zip(combo)
.map(|(key, pathset)| (key.to_owned(), pathset.to_owned()))
.collect::<HashMap<_, _>>()
})
.collect::<Vec<HashMap<String, PathSet>>>();

(node_id.to_owned(), packets)
})
.collect::<HashMap<_, _>>();

Ok(node_input_packets)
}
}
10 changes: 6 additions & 4 deletions src/core/operator.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::uniffi::{error::Result, model::packet::Packet};
use async_trait;
use itertools::Itertools as _;
use serde::{Deserialize, Serialize};
use std::{clone::Clone, collections::HashMap, iter::IntoIterator, sync::Arc};
use tokio::sync::Mutex;

#[async_trait::async_trait]
pub trait Operator {
async fn next(&self, stream_name: String, packet: Packet) -> Result<Vec<Packet>>;
async fn process_packet(&self, stream_name: String, packet: Packet) -> Result<Vec<Packet>>;
}

pub struct JoinOperator {
Expand All @@ -25,7 +26,7 @@ impl JoinOperator {

#[async_trait::async_trait]
impl Operator for JoinOperator {
async fn next(&self, stream_name: String, packet: Packet) -> Result<Vec<Packet>> {
async fn process_packet(&self, stream_name: String, packet: Packet) -> Result<Vec<Packet>> {
let mut received_packets = self.received_packets.lock().await;
received_packets
.entry(stream_name.clone())
Expand Down Expand Up @@ -61,8 +62,9 @@ impl Operator for JoinOperator {
}
}

#[derive(uniffi::Object, Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct MapOperator {
map: HashMap<String, String>,
pub map: HashMap<String, String>,
}

impl MapOperator {
Expand All @@ -73,7 +75,7 @@ impl MapOperator {

#[async_trait::async_trait]
impl Operator for MapOperator {
async fn next(&self, _: String, packet: Packet) -> Result<Vec<Packet>> {
async fn process_packet(&self, _: String, packet: Packet) -> Result<Vec<Packet>> {
Ok(vec![
packet
.iter()
Expand Down
8 changes: 0 additions & 8 deletions src/core/pipeline.rs

This file was deleted.

Loading