11use clap:: { Args , Subcommand } ;
22use serde_json:: { json, Value } ;
33
4- use crate :: api:: admin:: { self , jobs} ;
4+ use crate :: api:: admin:: { self , jobs, runs } ;
55use crate :: core:: config:: Config ;
6- use crate :: core:: error:: Result ;
6+ use crate :: core:: error:: { DbtpError , Result } ;
77use crate :: core:: rest_client:: RestClient ;
88
99#[ derive( Debug , Args ) ]
@@ -51,10 +51,30 @@ pub enum JobsCommand {
5151 git_branch : Option < String > ,
5252 #[ arg( long) ]
5353 git_sha : Option < String > ,
54+ /// Wait for the triggered run to reach a final state
55+ #[ arg( long) ]
56+ wait : bool ,
57+ /// Polling interval in seconds (used with --wait)
58+ #[ arg( long, default_value = "10" ) ]
59+ interval : u64 ,
60+ /// Timeout in seconds (used with --wait)
61+ #[ arg( long, default_value = "3600" ) ]
62+ timeout : u64 ,
5463 } ,
5564 /// Rerun a job from its point of failure
5665 #[ command( name = "trigger-from-failure" ) ]
57- TriggerFromFailure { id : u64 } ,
66+ TriggerFromFailure {
67+ id : u64 ,
68+ /// Wait for the triggered run to reach a final state
69+ #[ arg( long) ]
70+ wait : bool ,
71+ /// Polling interval in seconds (used with --wait)
72+ #[ arg( long, default_value = "10" ) ]
73+ interval : u64 ,
74+ /// Timeout in seconds (used with --wait)
75+ #[ arg( long, default_value = "3600" ) ]
76+ timeout : u64 ,
77+ } ,
5878}
5979
6080pub async fn exec ( args : & JobsArgs , client : & RestClient , config : & Config ) -> Result < Value > {
@@ -107,6 +127,9 @@ pub async fn exec(args: &JobsArgs, client: &RestClient, config: &Config) -> Resu
107127 cause,
108128 git_branch,
109129 git_sha,
130+ wait,
131+ interval,
132+ timeout,
110133 } => {
111134 let mut body = json ! ( {
112135 "cause" : cause. as_deref( ) . unwrap_or( "Triggered via dbtp CLI" ) ,
@@ -117,10 +140,76 @@ pub async fn exec(args: &JobsArgs, client: &RestClient, config: &Config) -> Resu
117140 if let Some ( sha) = git_sha {
118141 body[ "git_sha" ] = json ! ( sha) ;
119142 }
120- jobs:: trigger ( client, * id, & body) . await
143+ let run = jobs:: trigger ( client, * id, & body) . await ?;
144+ if * wait {
145+ let run_id = run[ "id" ]
146+ . as_u64 ( )
147+ . ok_or_else ( || DbtpError :: config ( "Trigger response missing run id" ) ) ?;
148+ wait_for_run ( client, run_id, * interval, * timeout) . await
149+ } else {
150+ Ok ( run)
151+ }
152+ }
153+ JobsCommand :: TriggerFromFailure {
154+ id,
155+ wait,
156+ interval,
157+ timeout,
158+ } => {
159+ let run = jobs:: trigger_from_failure ( client, * id) . await ?;
160+ if * wait {
161+ let run_id = run[ "id" ]
162+ . as_u64 ( )
163+ . ok_or_else ( || DbtpError :: config ( "Trigger response missing run id" ) ) ?;
164+ wait_for_run ( client, run_id, * interval, * timeout) . await
165+ } else {
166+ Ok ( run)
167+ }
168+ }
169+ }
170+ }
171+
172+ const STATUS_SUCCESS : u64 = 10 ;
173+ const STATUS_ERROR : u64 = 20 ;
174+ const STATUS_CANCELLED : u64 = 30 ;
175+
176+ fn is_terminal_status ( status : u64 ) -> bool {
177+ matches ! ( status, STATUS_SUCCESS | STATUS_ERROR | STATUS_CANCELLED )
178+ }
179+
180+ async fn wait_for_run (
181+ client : & RestClient ,
182+ run_id : u64 ,
183+ interval : u64 ,
184+ timeout : u64 ,
185+ ) -> Result < Value > {
186+ eprintln ! ( "Waiting for run {run_id} (polling every {interval}s, timeout {timeout}s)..." ) ;
187+ let start = std:: time:: Instant :: now ( ) ;
188+ let mut last_status = String :: new ( ) ;
189+
190+ loop {
191+ let run = runs:: get ( client, run_id) . await ?;
192+ let status_code = run[ "status" ] . as_u64 ( ) . unwrap_or ( 0 ) ;
193+ let status_human = run[ "status_humanized" ]
194+ . as_str ( )
195+ . unwrap_or ( "Unknown" )
196+ . to_string ( ) ;
197+
198+ if status_human != last_status {
199+ eprintln ! ( "Run {run_id}: {status_human}" ) ;
200+ last_status = status_human;
201+ }
202+
203+ if is_terminal_status ( status_code) {
204+ return Ok ( run) ;
121205 }
122- JobsCommand :: TriggerFromFailure { id } => {
123- jobs:: trigger_from_failure ( client, * id) . await
206+
207+ if start. elapsed ( ) . as_secs ( ) >= timeout {
208+ return Err ( DbtpError :: config ( format ! (
209+ "Timeout waiting for run {run_id} after {timeout}s"
210+ ) ) ) ;
124211 }
212+
213+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( interval) ) . await ;
125214 }
126215}
0 commit comments