@@ -22,6 +22,7 @@ use dora_message::{
2222 daemon_to_node:: { NodeConfig , RuntimeConfig } ,
2323 id:: NodeId ,
2424 DataflowId ,
25+ descriptor:: EnvValue
2526} ;
2627use dora_node_api:: {
2728 arrow:: array:: ArrayData ,
@@ -34,6 +35,7 @@ use std::{
3435 path:: { Path , PathBuf } ,
3536 process:: Stdio ,
3637 sync:: Arc ,
38+ collections:: BTreeMap
3739} ;
3840use tokio:: {
3941 fs:: File ,
@@ -126,7 +128,7 @@ impl Spawner {
126128 let ( command, error_msg) = match & node. kind {
127129 dora_core:: descriptor:: CoreNodeKind :: Custom ( n) => {
128130 let mut command =
129- path_spawn_command ( & node_working_dir, self . uv , logger, n, true ) . await ?;
131+ path_spawn_command ( & node_working_dir, self . uv , logger, n, & node . env , true ) . await ?;
130132
131133 if let Some ( command) = & mut command {
132134 command. current_dir ( & node_working_dir) ;
@@ -608,6 +610,7 @@ async fn path_spawn_command(
608610 uv : bool ,
609611 logger : & mut NodeLogger < ' _ > ,
610612 node : & dora_core:: descriptor:: CustomNode ,
613+ env : & Option < BTreeMap < String , EnvValue > > ,
611614 permit_url : bool ,
612615) -> eyre:: Result < Option < tokio:: process:: Command > > {
613616 let cmd = match node. path . as_str ( ) {
@@ -634,7 +637,32 @@ async fn path_spawn_command(
634637 . await
635638 . wrap_err ( "failed to download custom node" ) ?
636639 } else {
637- resolve_path ( source, working_dir)
640+ let replacements: Vec < eyre:: Result < ( String , String ) > > = source. find ( '$' ) . map ( |start| {
641+ let end = source[ start..] . find ( '/' ) . unwrap_or ( source. len ( ) ) ;
642+ let var = & source[ start + 1 ..start + end] ;
643+ if let Some ( envs) = env {
644+ if let Some ( val) = envs. get ( var) {
645+ Ok ( ( var. to_string ( ) , val. to_string ( ) ) )
646+ } else {
647+ eyre:: bail!( "environment variable `{}` for node `{}` not found" , var, source)
648+ }
649+ } else {
650+ eyre:: bail!( "environment variable `{}` for node `{}` not found" , var, source)
651+ }
652+ } ) . into_iter ( ) . collect ( ) ;
653+ let mut source = String :: from ( source) ;
654+ for kv in replacements. into_iter ( ) {
655+ match kv {
656+ Ok ( ( var, value) ) => {
657+ source = source. replace ( & format ! ( "${}" , var) , & value) ;
658+ }
659+ Err ( err) => {
660+ return Err ( err) ;
661+ }
662+ }
663+ }
664+
665+ resolve_path ( & source, working_dir)
638666 . wrap_err_with ( || format ! ( "failed to resolve node source `{source}`" ) ) ?
639667 } ;
640668
0 commit comments