@@ -3,39 +3,176 @@ use std::{
33 collections:: { HashMap , HashSet } ,
44} ;
55
6- use crate :: uniffi:: {
7- error:: { Kind , OrcaError , Result } ,
8- model:: {
9- packet:: PathSet ,
10- pipeline:: { Kernel , Pipeline , PipelineJob } ,
6+ use crate :: {
7+ core:: crypto:: hash_buffer,
8+ uniffi:: {
9+ error:: { Kind , OrcaError , Result , selector} ,
10+ model:: {
11+ packet:: PathSet ,
12+ pipeline:: { Kernel , NodeURI , Pipeline , PipelineJob } ,
13+ } ,
1114 } ,
1215} ;
1316use itertools:: Itertools as _;
14- use petgraph:: Direction :: Incoming ;
17+ use petgraph:: {
18+ Direction :: Incoming ,
19+ graph:: { self , NodeIndex } ,
20+ } ;
1521use serde:: { Deserialize , Serialize } ;
22+ use snafu:: OptionExt as _;
1623
1724#[ derive( Debug , Clone , Deserialize , Serialize , PartialEq ) ]
1825pub struct PipelineNode {
19- pub id : String ,
26+ // Hash that represent the node
27+ pub hash : String ,
28+ /// Kernel associated with the node
2029 pub kernel : Kernel ,
30+ /// User provided label for the node
31+ pub label : String ,
32+ /// This is meant for internal use only to track the node index in the graph
33+ pub node_idx : NodeIndex ,
2134}
2235
2336impl Pipeline {
37+ /// Validate the pipeline to ensure that, based on user labels:
38+ /// 1. Each node's `input_spec` is covered by either its parent nodes or the pipeline's `input_spec`
39+ pub ( crate ) fn validate ( & self ) -> Result < ( ) > {
40+ // For verification we check that each node has it's input_spec covered by either it's parent or input_spec of the pipeline
41+ // Build a map from input_spec where HashMap<Node_id (Should be label when coming in from new), HashSet<Key covered by input_spec>,
42+ let mut keys_covered_by_input_spec_lut: HashMap < & String , HashSet < & String > > =
43+ HashMap :: < & String , HashSet < & String > > :: new ( ) ;
44+ for node_uris in self . input_spec . values ( ) {
45+ for node_uri in node_uris {
46+ keys_covered_by_input_spec_lut
47+ . entry ( & node_uri. node_id )
48+ . or_default ( )
49+ . insert ( & node_uri. key ) ;
50+ }
51+ }
52+
53+ // Iterate over each node in the graph and verify that its input spec is met
54+ for node_idx in self . graph . node_indices ( ) {
55+ self . validate_valid_input_spec (
56+ node_idx,
57+ keys_covered_by_input_spec_lut. get ( & self . graph [ node_idx] . hash ) ,
58+ ) ?;
59+ }
60+
61+ // Build a LUT for all node_hash to idx
62+ let node_hash_to_idx_lut: HashMap < & String , NodeIndex > = self
63+ . graph
64+ . node_indices ( )
65+ . map ( |idx| ( & self . graph [ idx] . hash , idx) )
66+ . collect ( ) ;
67+
68+ // Validate that all output_keys are valid
69+ self . output_spec . iter ( ) . try_for_each ( |( _, node_uri) | {
70+ if !self
71+ . get_output_spec_for_node ( * node_hash_to_idx_lut. get ( & node_uri. node_id ) . context (
72+ selector:: InvalidOutputSpecNodeNotInGraph {
73+ node_name : node_uri. node_id . clone ( ) ,
74+ } ,
75+ ) ?)
76+ . contains ( & node_uri. key )
77+ {
78+ return Err ( OrcaError {
79+ kind : Kind :: InvalidOutputSpecKeyNotInNode {
80+ node_name : node_uri. node_id . clone ( ) ,
81+ key : node_uri. key . clone ( ) ,
82+ backtrace : Some ( Backtrace :: capture ( ) ) ,
83+ } ,
84+ } ) ;
85+ }
86+ Ok ( ( ) )
87+ } ) ?;
88+
89+ Ok ( ( ) )
90+ }
91+
92+ /// Validates that the input spec for a given node is valid based on its parents and the input spec of the pipeline
93+ fn validate_valid_input_spec (
94+ & self ,
95+ node_idx : NodeIndex ,
96+ keys_covered_by_input_spec : Option < & HashSet < & String > > ,
97+ ) -> Result < ( ) > {
98+ // We need to get the input spec of the current node and build the packet based on the
99+ // parent nodes output spec + input
100+
101+ // Get the parent nodes input specs and combine them into
102+ let incoming_packet_keys = self
103+ . get_parent_node_indices ( node_idx)
104+ . flat_map ( |parent_idx| self . get_output_spec_for_node ( parent_idx) )
105+ . collect :: < HashSet < & String > > ( ) ;
106+
107+ // Get this node input_spec
108+ let missing_keys: HashSet < & String > = self
109+ . get_input_spec_for_node ( node_idx)
110+ . into_iter ( )
111+ . filter ( |expected_key| {
112+ !( incoming_packet_keys. contains ( expected_key)
113+ || keys_covered_by_input_spec. is_some_and ( |keys| keys. contains ( expected_key) ) )
114+ } )
115+ . collect ( ) ;
116+
117+ // Verify that there are no missing keys, otherwise return error
118+ if !missing_keys. is_empty ( ) {
119+ return Err ( OrcaError {
120+ kind : Kind :: PipelineValidationErrorMissingKeys {
121+ node_name : self . graph [ node_idx] . label . clone ( ) ,
122+ missing_keys : missing_keys. into_iter ( ) . cloned ( ) . collect ( ) ,
123+ backtrace : Some ( Backtrace :: capture ( ) ) ,
124+ } ,
125+ } ) ;
126+ }
127+ Ok ( ( ) )
128+ }
129+
130+ fn get_input_spec_for_node ( & self , node_idx : NodeIndex ) -> HashSet < & String > {
131+ match & self . graph [ node_idx] . kernel {
132+ Kernel :: Pod { pod } => pod. input_spec . keys ( ) . collect ( ) ,
133+ Kernel :: JoinOperator => {
134+ // JoinOperator input_spec is derived from its parents
135+ self . get_parent_node_indices ( node_idx)
136+ . flat_map ( |parent_idx| self . get_output_spec_for_node ( parent_idx) )
137+ . collect ( )
138+ }
139+ Kernel :: MapOperator { mapper } => mapper. map . keys ( ) . collect ( ) ,
140+ }
141+ }
142+
143+ fn get_output_spec_for_node ( & self , node_idx : NodeIndex ) -> HashSet < & String > {
144+ match & self . graph [ node_idx] . kernel {
145+ Kernel :: Pod { pod } => pod. output_spec . keys ( ) . collect ( ) ,
146+ Kernel :: JoinOperator => {
147+ // JoinOperator output_spec is derived from its parents
148+ self . get_parent_node_indices ( node_idx)
149+ . flat_map ( |parent_idx| self . get_output_spec_for_node ( parent_idx) )
150+ . collect ( )
151+ }
152+ Kernel :: MapOperator { mapper } => mapper. map . values ( ) . collect ( ) ,
153+ }
154+ }
155+
24156 /// Function to get the parents of a node
25157 pub ( crate ) fn get_node_parents (
26158 & self ,
27159 node : & PipelineNode ,
28- ) -> impl Iterator < Item = & PipelineNode > {
160+ ) -> Result < impl Iterator < Item = & PipelineNode > > {
29161 // Find the NodeIndex for the given node_key
30- let node_index = self
162+ let node_idx = self
31163 . graph
32164 . node_indices ( )
33- . find ( |& idx| self . graph [ idx] == * node) ;
34- node_index. into_iter ( ) . flat_map ( move |idx| {
35- self . graph
36- . neighbors_directed ( idx, Incoming )
37- . map ( move |parent_idx| & self . graph [ parent_idx] )
38- } )
165+ . find ( |& idx| self . graph [ idx] == * node)
166+ . ok_or ( OrcaError {
167+ kind : Kind :: KeyMissing {
168+ key : node. label . clone ( ) ,
169+ backtrace : Some ( Backtrace :: capture ( ) ) ,
170+ } ,
171+ } ) ?;
172+
173+ Ok ( self
174+ . get_parent_node_indices ( node_idx)
175+ . map ( |parent_idx| & self . graph [ parent_idx] ) )
39176 }
40177
41178 /// Return a vec of `node_names` that takes in inputs based on the `input_spec`
@@ -50,6 +187,78 @@ impl Pipeline {
50187
51188 input_nodes
52189 }
190+
191+ fn get_parent_node_indices ( & self , node_idx : NodeIndex ) -> impl Iterator < Item = NodeIndex > {
192+ self . graph . neighbors_directed ( node_idx, Incoming )
193+ }
194+
195+ /// Find the leaf nodes in the graph (nodes with no outgoing edges)
196+ /// # Returns
197+ /// A vector of `NodeIndex` representing the leaf nodes in the graph
198+ pub fn find_leaf_nodes ( & self ) -> Vec < NodeIndex > {
199+ self . graph
200+ . node_indices ( )
201+ . filter ( |& idx| {
202+ self . graph
203+ . neighbors_directed ( idx, petgraph:: Direction :: Outgoing )
204+ . next ( )
205+ . is_none ( )
206+ } )
207+ . collect ( )
208+ }
209+
210+ /// Compute the hash for each node in the graph which is defined as the hash of its kernel + the hashes of its parents
211+ pub ( crate ) fn compute_hash_for_node_and_parents < ' g > (
212+ node_idx : NodeIndex ,
213+ input_spec : & HashMap < String , Vec < NodeURI > > ,
214+ graph : & ' g mut graph:: Graph < PipelineNode , ( ) > ,
215+ ) -> & ' g str {
216+ if graph[ node_idx] . hash . is_empty ( ) {
217+ // Collect parent indices first to avoid borrowing issues
218+ let parent_indices: Vec < NodeIndex > =
219+ graph. neighbors_directed ( node_idx, Incoming ) . collect ( ) ;
220+
221+ // Sort the parent hashes to ensure consistent ordering
222+ let mut parent_hashes: Vec < String > = if parent_indices. is_empty ( ) {
223+ // This is parent node, thus we will need to use the input_spec to generate a unique hash for the node
224+ // Find all the input keys that map to this node
225+ input_spec
226+ . iter ( )
227+ . filter_map ( |( input_key, node_uris) | {
228+ node_uris. iter ( ) . find_map ( |node_uri| {
229+ ( node_uri. node_id == graph[ node_idx] . label ) . then ( || input_key. clone ( ) )
230+ } )
231+ } )
232+ . collect ( )
233+ } else {
234+ parent_indices
235+ . into_iter ( )
236+ . map ( |parent_idx| {
237+ // Check if hash has been computed for this node, if not trigger computation
238+ Self :: compute_hash_for_node_and_parents ( parent_idx, input_spec, graph)
239+ . to_owned ( )
240+ } )
241+ . collect ( )
242+ } ;
243+
244+ parent_hashes. sort ( ) ;
245+
246+ // Combine the node's kernel hash + the parent_hashes by concatenation only if there are parents hashes, else it is just the kernel hash
247+ if parent_hashes. is_empty ( ) {
248+ let kernel_hash = graph[ node_idx] . kernel . get_hash ( ) . to_owned ( ) ;
249+ graph[ node_idx] . hash . clone_from ( & kernel_hash) ;
250+ } else {
251+ let hash_for_node = format ! (
252+ "{}{}" ,
253+ & graph[ node_idx] . kernel. get_hash( ) ,
254+ parent_hashes. into_iter( ) . join( "" )
255+ ) ;
256+ graph[ node_idx] . hash = hash_buffer ( hash_for_node. as_bytes ( ) ) ;
257+ }
258+ }
259+
260+ & graph[ node_idx] . hash
261+ }
53262}
54263
55264impl PipelineJob {
0 commit comments