Skip to content

Commit d56b4e3

Browse files
authored
Merge pull request #118 from Synicix/pipeline_hash
Add Pipeline hashing, saving, loading, list, and delete
2 parents 853cc4f + 537a92a commit d56b4e3

File tree

16 files changed

+711
-151
lines changed

16 files changed

+711
-151
lines changed

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"gitlens.showWhatsNewAfterUpgrades": false,
1313
"lldb.consoleMode": "evaluate",
1414
"rust-analyzer.check.command": "clippy",
15+
"rust-analyzer.checkOnSave": true,
1516
"rust-analyzer.runnables.extraTestBinaryArgs": [
1617
"--nocapture"
1718
],

src/core/error.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ impl fmt::Debug for OrcaError {
132132
| Kind::InvalidOutputSpecNodeNotInGraph { backtrace, .. }
133133
| Kind::KeyMissing { backtrace, .. }
134134
| Kind::MissingInfo { backtrace, .. }
135+
| Kind::FailedToGetLabelHashFromFileName { backtrace, .. }
135136
| Kind::FailedToGetPodJobOutput { backtrace, .. }
136137
| Kind::PipelineValidationErrorMissingKeys { backtrace, .. }
137138
| Kind::PodJobProcessingError { backtrace, .. }

src/core/graph.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@ pub fn make_graph(
2323
) -> Result<DiGraph<PipelineNode, ()>> {
2424
let graph =
2525
DiGraph::<DotNodeWeight, DotAttrList>::from_dot_graph(DOTGraph::try_from(input_dot)?).map(
26-
|node_idx, node| PipelineNode {
27-
hash: String::new(),
28-
kernel: get(metadata, &node.id)
29-
.unwrap_or_else(|error| panic!("{error}"))
30-
.clone(),
31-
label: node.id.clone(),
32-
node_idx,
26+
|node_idx, node| {
27+
let node_id_without_quotes = node.id.replace('"', "");
28+
PipelineNode {
29+
hash: String::new(),
30+
kernel: get(metadata, &node_id_without_quotes)
31+
.unwrap_or_else(|error| panic!("{error}"))
32+
.clone(),
33+
label: node_id_without_quotes.clone(),
34+
node_idx,
35+
}
3336
},
3437
|_, _| (),
3538
);

src/core/model/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ use serde::{Serialize, Serializer};
55
use serde_yaml::{self, Value};
66
use std::{
77
collections::{BTreeMap, HashMap},
8+
fmt::Debug,
89
hash::BuildHasher,
910
result,
1011
};
1112

1213
/// Trait to handle serialization to yaml for `OrcaPod` models
13-
pub trait ToYaml: Serialize + Sized {
14+
pub trait ToYaml: Serialize + Sized + Debug {
1415
/// Serializes the instance to a YAML string.
1516
/// # Errors
1617
/// Will return `Err` if it fail to serialize instance to string

src/core/model/pipeline.rs

Lines changed: 192 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use std::{
22
backtrace::Backtrace,
3-
collections::{HashMap, HashSet},
3+
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
4+
result,
45
};
56

67
use crate::{
7-
core::crypto::hash_buffer,
8+
core::{crypto::hash_buffer, model::ToYaml},
89
uniffi::{
910
error::{Kind, OrcaError, Result, selector},
1011
model::{
@@ -18,10 +19,10 @@ use petgraph::{
1819
Direction::Incoming,
1920
graph::{self, NodeIndex},
2021
};
21-
use serde::{Deserialize, Serialize};
22+
use serde::{Deserialize, Serialize, ser::SerializeStruct as _};
2223
use snafu::OptionExt as _;
2324

24-
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
25+
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
2526
pub struct PipelineNode {
2627
// Hash that represent the node
2728
pub hash: String,
@@ -259,6 +260,108 @@ impl Pipeline {
259260

260261
&graph[node_idx].hash
261262
}
263+
264+
fn to_dot_lex(&self) -> String {
265+
// Get all the nodes and their children in lexicographical order
266+
let nodes_and_edges = self.graph.node_indices().fold(
267+
BTreeMap::<&String, BTreeSet<&String>>::new(),
268+
|mut acc, node_idx| {
269+
let children = self
270+
.graph
271+
.neighbors_directed(node_idx, petgraph::Direction::Outgoing)
272+
.map(|child_idx| &self.graph[child_idx].hash)
273+
.collect::<BTreeSet<&String>>();
274+
acc.insert(&self.graph[node_idx].hash, children);
275+
acc
276+
},
277+
);
278+
279+
// Build the dot representation string line by line, one entry per node or edge
280+
let mut lines = Vec::new();
281+
for (node, node_children) in nodes_and_edges {
282+
if node_children.is_empty() {
283+
lines.push(format!(" \"{node}\""));
284+
} else {
285+
for child in node_children {
286+
lines.push(format!(" \"{node}\" -> \"{child}\""));
287+
}
288+
}
289+
}
290+
291+
// Convert lines into a single string with a proper new line between each entry
292+
format!("digraph {{\n{}\n}}", lines.join("\n"))
293+
}
294+
295+
/// Get a `BTreeMap` of <`kernel_hash`, `BTreeSet`<`node_hashes`>> for all nodes in the graph. Mainly used for serialization
296+
pub(crate) fn get_kernel_to_node_lut(&self) -> BTreeMap<String, BTreeSet<String>> {
297+
self.graph.node_indices().fold(
298+
BTreeMap::<String, BTreeSet<String>>::new(),
299+
|mut acc, node_idx| {
300+
acc.entry(self.graph[node_idx].kernel.get_hash().to_owned())
301+
.or_default()
302+
.insert(self.graph[node_idx].hash.clone());
303+
acc
304+
},
305+
)
306+
}
307+
308+
pub(crate) fn get_kernel_lut(&self) -> HashSet<&Kernel> {
309+
self.graph
310+
.node_indices()
311+
.fold(HashSet::<&Kernel>::new(), |mut acc, node_idx| {
312+
acc.insert(&self.graph[node_idx].kernel);
313+
acc
314+
})
315+
}
316+
317+
/// Get an iterator of (`node_hash`, `node_label`) pairs for all nodes in the graph whose label is not empty. Mainly used for serialization
318+
pub(crate) fn get_label_lut(&self) -> impl Iterator<Item = (&String, &String)> {
319+
self.graph.node_indices().filter_map(|node_idx| {
320+
let label = &self.graph[node_idx].label;
321+
if label.is_empty() {
322+
None
323+
} else {
324+
Some((&self.graph[node_idx].hash, label))
325+
}
326+
})
327+
}
328+
}
329+
330+
impl Serialize for Pipeline {
331+
fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
332+
where
333+
S: serde::Serializer,
334+
{
335+
let mut state = serializer.serialize_struct("Pipeline", 4)?;
336+
state.serialize_field("kernel_lut", &self.get_kernel_to_node_lut())?;
337+
state.serialize_field("dot", &self.to_dot_lex())?;
338+
339+
// Input spec needs to be sorted for consistent serialization
340+
let input_spec_sorted: BTreeMap<_, Vec<NodeURI>> = self
341+
.input_spec
342+
.iter()
343+
.map(|(k, v)| {
344+
let mut sorted_v = v.clone();
345+
sorted_v.sort();
346+
(k, sorted_v)
347+
})
348+
.collect();
349+
state.serialize_field("input_spec", &input_spec_sorted)?;
350+
state.serialize_field("output_spec", &self.output_spec)?;
351+
state.end()
352+
}
353+
}
354+
355+
impl ToYaml for Pipeline {
356+
fn process_field(
357+
field_name: &str,
358+
field_value: &serde_yaml::Value,
359+
) -> Option<(String, serde_yaml::Value)> {
360+
match field_name {
361+
"hash" | "annotation" => None, // Skip annotation field
362+
_ => Some((field_name.to_owned(), field_value.clone())),
363+
}
364+
}
262365
}
263366

264367
impl PipelineJob {
@@ -316,3 +419,88 @@ impl PipelineJob {
316419
Ok(node_input_packets)
317420
}
318421
}
422+
423+
#[cfg(test)]
424+
mod tests {
425+
use crate::{
426+
core::model::ToYaml as _,
427+
uniffi::{
428+
error::Result,
429+
model::{
430+
Annotation,
431+
pipeline::{NodeURI, Pipeline},
432+
},
433+
operator::MapOperator,
434+
},
435+
};
436+
use indoc::indoc;
437+
use pretty_assertions::assert_eq;
438+
use std::collections::HashMap;
439+
440+
#[test]
441+
fn to_yaml() -> Result<()> {
442+
let pipeline = Pipeline::new(
443+
indoc! {"
444+
digraph {
445+
A -> B -> C
446+
}
447+
"},
448+
&HashMap::from([
449+
(
450+
"A".into(),
451+
MapOperator::new(HashMap::from([("node_key_1".into(), "node_key_2".into())]))?
452+
.into(),
453+
),
454+
(
455+
"B".into(),
456+
MapOperator::new(HashMap::from([("node_key_2".into(), "node_key_1".into())]))?
457+
.into(),
458+
),
459+
(
460+
"C".into(),
461+
MapOperator::new(HashMap::from([("node_key_1".into(), "node_key_2".into())]))?
462+
.into(),
463+
),
464+
]),
465+
HashMap::from([(
466+
"pipeline_key_1".into(),
467+
vec![NodeURI {
468+
node_id: "A".into(),
469+
key: "node_key_1".into(),
470+
}],
471+
)]),
472+
HashMap::new(),
473+
Some(Annotation {
474+
name: "test".into(),
475+
version: "0.1".into(),
476+
description: "Test pipeline".into(),
477+
}),
478+
)?;
479+
480+
assert_eq!(
481+
pipeline.to_yaml()?,
482+
indoc! {r#"
483+
class: pipeline
484+
kernel_lut:
485+
2980eb39e3702442cc31656d6ec3995f91680ab042a27160a00ffe33b91419af:
486+
- 4b498582ed57ca6a10809d7480bd3f159542ad139e402698e3f525fc6b0d4dea
487+
c8f036079b69beee914434c1e01be638972ce05cd2e640fc1e9be7bf3d9e76be:
488+
- 368c7a517f3fbdd7cab10c90ebc44e44765fc33f66b4f4f5151a6bee322d8217
489+
- 6c84111298d0cfe811dff3f10ab444795c0a9d60609ba1b9391c45e642a69afa
490+
dot: |-
491+
digraph {
492+
"368c7a517f3fbdd7cab10c90ebc44e44765fc33f66b4f4f5151a6bee322d8217"
493+
"4b498582ed57ca6a10809d7480bd3f159542ad139e402698e3f525fc6b0d4dea" -> "368c7a517f3fbdd7cab10c90ebc44e44765fc33f66b4f4f5151a6bee322d8217"
494+
"6c84111298d0cfe811dff3f10ab444795c0a9d60609ba1b9391c45e642a69afa" -> "4b498582ed57ca6a10809d7480bd3f159542ad139e402698e3f525fc6b0d4dea"
495+
}
496+
input_spec:
497+
pipeline_key_1:
498+
- node_id: 6c84111298d0cfe811dff3f10ab444795c0a9d60609ba1b9391c45e642a69afa
499+
key: node_key_1
500+
output_spec: {}
501+
"#},
502+
);
503+
504+
Ok(())
505+
}
506+
}

0 commit comments

Comments
 (0)