@@ -24,6 +24,7 @@ pub fn check_dataflow(
2424) -> eyre:: Result < ( ) > {
2525 let nodes = dataflow. resolve_aliases_and_set_defaults ( ) ?;
2626 let mut has_python_operator = false ;
27+ let mut errors: Vec < String > = Vec :: new ( ) ;
2728
2829 // check that nodes and operators exist
2930 for node in nodes. values ( ) {
@@ -47,29 +48,41 @@ pub fn check_dataflow(
4748 }
4849 } else if custom. build . is_some ( ) {
4950 info ! ( "skipping path check for node with build command" ) ;
50- } else {
51- resolve_path ( source, working_dir) . wrap_err_with ( || {
52- format ! ( "Could not find source path `{source}`" )
53- } ) ?;
51+ } else if let Err ( err) = resolve_path ( source, working_dir) {
52+ errors. push ( format ! ( "node `{}`: {err}" , node. id) ) ;
5453 } ;
5554 }
5655 } ,
5756 dora_message:: descriptor:: NodeSource :: GitBranch { .. } => {
5857 info ! ( "skipping check for node with git source" ) ;
5958 }
6059 } ,
61- descriptor:: CoreNodeKind :: Runtime ( node ) => {
62- for operator_definition in & node . operators {
60+ descriptor:: CoreNodeKind :: Runtime ( runtime_node ) => {
61+ for operator_definition in & runtime_node . operators {
6362 match & operator_definition. config . source {
6463 OperatorSource :: SharedLibrary ( path) => {
6564 if source_is_url ( path) {
6665 info ! ( "{path} is a URL." ) ; // TODO: Implement url check.
6766 } else if operator_definition. config . build . is_some ( ) {
6867 info ! ( "skipping path check for operator with build command" ) ;
6968 } else {
70- let path = adjust_shared_library_path ( Path :: new ( & path) ) ?;
71- if !working_dir. join ( & path) . exists ( ) {
72- bail ! ( "no shared library at `{}`" , path. display( ) ) ;
69+ match adjust_shared_library_path ( Path :: new ( & path) ) {
70+ Ok ( path) => {
71+ if !working_dir. join ( & path) . exists ( ) {
72+ errors. push ( format ! (
73+ "node `{}`, operator `{}`: no shared library at `{}`" ,
74+ node. id,
75+ operator_definition. id,
76+ path. display( )
77+ ) ) ;
78+ }
79+ }
80+ Err ( err) => {
81+ errors. push ( format ! (
82+ "node `{}`, operator `{}`: {err}" ,
83+ node. id, operator_definition. id,
84+ ) ) ;
85+ }
7386 }
7487 }
7588 }
@@ -79,14 +92,20 @@ pub fn check_dataflow(
7992 if source_is_url ( path) {
8093 info ! ( "{path} is a URL." ) ; // TODO: Implement url check.
8194 } else if !working_dir. join ( path) . exists ( ) {
82- bail ! ( "no Python library at `{path}`" ) ;
95+ errors. push ( format ! (
96+ "node `{}`, operator `{}`: no Python library at `{path}`" ,
97+ node. id, operator_definition. id,
98+ ) ) ;
8399 }
84100 }
85101 OperatorSource :: Wasm ( path) => {
86102 if source_is_url ( path) {
87103 info ! ( "{path} is a URL." ) ; // TODO: Implement url check.
88104 } else if !working_dir. join ( path) . exists ( ) {
89- bail ! ( "no WASM library at `{path}`" ) ;
105+ errors. push ( format ! (
106+ "node `{}`, operator `{}`: no WASM library at `{path}`" ,
107+ node. id, operator_definition. id,
108+ ) ) ;
90109 }
91110 }
92111 }
@@ -100,17 +119,22 @@ pub fn check_dataflow(
100119 match & node. kind {
101120 descriptor:: CoreNodeKind :: Custom ( custom_node) => {
102121 for ( input_id, input) in & custom_node. run_config . inputs {
103- check_input ( input, & nodes, & format ! ( "{}/{input_id}" , node. id) ) ?;
122+ if let Err ( err) = check_input ( input, & nodes, & format ! ( "{}/{input_id}" , node. id) )
123+ {
124+ errors. push ( format ! ( "{err}" ) ) ;
125+ }
104126 }
105127 }
106128 descriptor:: CoreNodeKind :: Runtime ( runtime_node) => {
107129 for operator_definition in & runtime_node. operators {
108130 for ( input_id, input) in & operator_definition. config . inputs {
109- check_input (
131+ if let Err ( err ) = check_input (
110132 input,
111133 & nodes,
112134 & format ! ( "{}/{}/{input_id}" , operator_definition. id, node. id) ,
113- ) ?;
135+ ) {
136+ errors. push ( format ! ( "{err}" ) ) ;
137+ }
114138 }
115139 }
116140 }
@@ -119,15 +143,34 @@ pub fn check_dataflow(
119143
120144 // Check that nodes can resolve `send_stdout_as`
121145 for node in nodes. values ( ) {
122- node. send_stdout_as ( )
123- . context ( "Could not resolve `send_stdout_as` configuration" ) ?;
146+ if let Err ( err) = node. send_stdout_as ( ) {
147+ errors. push ( format ! (
148+ "node `{}`: could not resolve `send_stdout_as` configuration: {err}" ,
149+ node. id
150+ ) ) ;
151+ }
124152 }
125153
126154 if has_python_operator {
127- check_python_runtime ( ) ?;
155+ if let Err ( err) = check_python_runtime ( ) {
156+ errors. push ( format ! ( "{err}" ) ) ;
157+ }
128158 }
129159
130- Ok ( ( ) )
160+ if errors. is_empty ( ) {
161+ Ok ( ( ) )
162+ } else {
163+ let error_list = errors
164+ . iter ( )
165+ . map ( |e| format ! ( " - {e}" ) )
166+ . collect :: < Vec < _ > > ( )
167+ . join ( "\n " ) ;
168+ bail ! (
169+ "found {} validation error(s):\n {}" ,
170+ errors. len( ) ,
171+ error_list
172+ ) ;
173+ }
131174}
132175
133176pub trait ResolvedNodeExt {
0 commit comments