1- use std:: { error:: Error , path:: PathBuf , thread, time:: Duration } ;
1+ use std:: { error:: Error , path:: PathBuf , str :: FromStr , thread, time:: Duration } ;
22
33use base64:: prelude:: * ;
44use clap:: { Args , Command , Parser , Subcommand , ValueHint , builder:: TypedValueParser } ;
5- use clap_complete:: { Generator , Shell , generate} ;
6- use color_eyre:: { Result , eyre:: eyre} ;
5+ use clap_complete:: { ArgValueCompleter , CompletionCandidate , Generator , Shell , generate} ;
6+ use color_eyre:: { Report , Result , eyre:: eyre} ;
77use iroh_ssh:: IrohSsh ;
8+ use itertools:: Itertools ;
89use pid1:: Pid1Settings ;
910use rust_supervisor:: { ChildType , Supervisor , SupervisorConfig } ;
1011use strum:: VariantNames ;
12+ use tokio:: sync:: mpsc;
1113
1214use crate :: {
1315 config:: { ComputePlatform , Config , get_config_dir, get_data_dir, get_project_local_config_file} ,
1416 cscs:: {
1517 api_client:: {
1618 client:: { EdfSpec as EdfSpecEnum , ScriptSpec as ScriptSpecEnum } ,
17- types:: JobStatus ,
19+ types:: { JobStatus , PathType } ,
1820 } ,
19- handlers:: cscs_job_details,
21+ handlers:: { cscs_file_list , cscs_job_details, cscs_job_list , file_system_roots } ,
2022 } ,
2123 util:: types:: DockerImageUrl ,
2224} ;
@@ -143,7 +145,7 @@ pub struct ScriptSpec {
143145 generate_script : bool ,
144146 #[ arg( long, value_name = "PATH" , help = "upload local script file" , value_hint=ValueHint :: FilePath ) ]
145147 local_script : Option < PathBuf > ,
146- #[ arg( long, value_name = "PATH" , help = "use script file already present on remote" , value_hint= ValueHint :: Other ) ]
148+ #[ arg( long, value_name = "PATH" , help = "use script file already present on remote" , add = ArgValueCompleter :: new ( remote_path_completer ) ) ]
147149 remote_script : Option < PathBuf > ,
148150}
149151impl Default for ScriptSpec {
@@ -178,7 +180,7 @@ pub struct EdfSpec {
178180 generate_edf : bool ,
179181 #[ arg( long, value_name = "PATH" , help = "upload local edf file" , value_hint=ValueHint :: FilePath ) ]
180182 local_edf : Option < PathBuf > ,
181- #[ arg( long, value_name = "PATH" , help = "use edf file already present on remote" , value_hint= ValueHint :: Other ) ]
183+ #[ arg( long, value_name = "PATH" , help = "use edf file already present on remote" , add = ArgValueCompleter :: new ( remote_path_completer ) ) ]
182184 remote_edf : Option < PathBuf > ,
183185}
184186
@@ -204,22 +206,38 @@ impl From<EdfSpec> for EdfSpecEnum {
204206 }
205207}
206208
209+ #[ derive( Debug , Clone ) ]
210+ pub enum JobIdOrName {
211+ Id ( i64 ) ,
212+ Name ( String ) ,
213+ }
214+
215+ impl FromStr for JobIdOrName {
216+ type Err = Report ;
217+
218+ fn from_str ( s : & str ) -> std:: result:: Result < Self , Self :: Err > {
219+ Ok ( s. parse :: < i64 > ( )
220+ . map ( JobIdOrName :: Id )
221+ . unwrap_or_else ( |_| JobIdOrName :: Name ( s. to_string ( ) ) ) )
222+ }
223+ }
224+
207225#[ allow( clippy:: large_enum_variant) ]
208226#[ derive( Subcommand , Debug ) ]
209227pub enum CscsJobCommands {
210228 #[ clap( alias( "ls" ) , about = "List all jobs [aliases: ls]" ) ]
211229 List ,
212230 #[ clap( alias( "g" ) , about = "Get metadata for a specific job [aliases: g]" ) ]
213231 Get {
214- #[ arg( help="id of the job" , value_hint= ValueHint :: Other ) ]
215- job_id : i64 ,
232+ #[ arg( help="id or name of the job (name uses newest job of that name) " , add = ArgValueCompleter :: new ( job_id_or_name_completer ) ) ]
233+ job : JobIdOrName ,
216234 } ,
217235 #[ clap( about = "Get the stdout of a job" ) ]
218236 Log {
219237 #[ clap( short, long, action, help = "whether to get stderr instead of stdout" ) ]
220238 stderr : bool ,
221- #[ arg( help="id of the job" , value_hint= ValueHint :: Other ) ]
222- job_id : i64 ,
239+ #[ arg( help="id or name of the job (name uses newest job of that name) " , add = ArgValueCompleter :: new ( job_id_or_name_completer ) ) ]
240+ job : JobIdOrName ,
223241 } ,
224242
225243 #[ clap( alias( "s" ) , about = "Submit a new compute job [aliases: s]" ) ]
@@ -269,26 +287,80 @@ pub enum CscsJobCommands {
269287 about = "Cancel a running job, fails if the job isn't running [aliases: c]"
270288 ) ]
271289 Cancel {
272- #[ clap( help="id of the job" , value_hint= ValueHint :: Other ) ]
273- job_id : i64 ,
290+ #[ clap( help="id or name of the job (name uses newest job of that name) " , add = ArgValueCompleter :: new ( job_id_or_name_completer ) ) ]
291+ job : JobIdOrName ,
274292 } ,
275293}
294+ fn job_id_or_name_completer ( current : & std:: ffi:: OsStr ) -> Vec < CompletionCandidate > {
295+ let mut completions = vec ! [ ] ;
296+ let Some ( current) = current. to_str ( ) else {
297+ return completions;
298+ } ;
299+ let jn = JobIdOrName :: from_str ( current) . unwrap ( ) ;
300+ // the tokio shenanigans here are to be able to call async code from this sync method,
301+ // with an already running async runtime from tokio::main, and getting back the result,
302+ // all without blocking the async runtime in sync code (hence the extra thread).
303+ let ( send, mut recv) = mpsc:: unbounded_channel ( ) ;
304+ match jn {
305+ JobIdOrName :: Id ( id) => {
306+ tokio:: spawn ( async move {
307+ let jobs = cscs_job_list ( None , None ) . await . unwrap ( ) ;
308+ let partial_id = id. to_string ( ) ;
309+ let ids: Vec < _ > = jobs
310+ . iter ( )
311+ . map ( |j| ( j. id . to_string ( ) , j. name . clone ( ) ) )
312+ . filter ( |i| i. 0 . starts_with ( & partial_id) )
313+ . sorted_by_key ( |i| i. 0 . clone ( ) )
314+ . collect ( ) ;
315+ for ( id, name) in ids {
316+ send. send ( CompletionCandidate :: new ( id) . help ( Some ( name. into ( ) ) ) ) . unwrap ( ) ;
317+ }
318+ } ) ;
319+ }
320+ JobIdOrName :: Name ( name) => {
321+ tokio:: spawn ( async move {
322+ let jobs = cscs_job_list ( None , None ) . await . unwrap ( ) ;
323+ let names: Vec < _ > = jobs
324+ . into_iter ( )
325+ . map ( |j| j. name )
326+ . filter ( |n| n. starts_with ( & name) )
327+ . sorted ( )
328+ . dedup ( )
329+ . collect ( ) ;
330+ for name in names {
331+ send. send ( CompletionCandidate :: new ( name) ) . unwrap ( ) ;
332+ }
333+ } ) ;
334+ }
335+ }
336+ let sync_recv = thread:: spawn ( move || {
337+ let mut completions = vec ! [ ] ;
338+ while let Some ( candidate) = recv. blocking_recv ( ) {
339+ completions. push ( candidate) ;
340+ }
341+ completions
342+ } ) ;
343+ let comp = sync_recv. join ( ) . unwrap ( ) ;
344+ completions. extend ( comp) ;
345+
346+ completions
347+ }
276348
277349#[ derive( Subcommand , Debug ) ]
278350pub enum CscsFileCommands {
279351 #[ clap( alias( "ls" ) , about = "List folders and files in a remote path [aliases: ls]" ) ]
280352 List {
281- #[ arg( help ="remote path to list" , value_hint= ValueHint :: Other ) ]
353+ #[ arg( help ="remote path to list" , add = ArgValueCompleter :: new ( remote_path_completer ) ) ]
282354 path : PathBuf ,
283355 } ,
284356 #[ clap( alias( "rm" ) , about = "Remove remote files or folders [aliases: rm]" ) ]
285357 Remove {
286- #[ arg( help ="remote path to remove" , value_hint= ValueHint :: Other ) ]
358+ #[ arg( help ="remote path to remove" , add = ArgValueCompleter :: new ( remote_path_completer ) ) ]
287359 path : PathBuf ,
288360 } ,
289361 #[ clap( alias( "dl" ) , about = "Download a remote file [aliases: dl]" ) ]
290362 Download {
291- #[ clap( help = "The path in the cluster to download" , value_hint= ValueHint :: Other ) ]
363+ #[ clap( help = "The path in the cluster to download" , add = ArgValueCompleter :: new ( remote_path_completer ) ) ]
292364 remote : PathBuf ,
293365 #[ clap( help = "The local path to download the file to" , value_hint=ValueHint :: AnyPath ) ]
294366 local : PathBuf ,
@@ -298,11 +370,78 @@ pub enum CscsFileCommands {
298370 #[ clap( help = "The local path to upload to the cluster" , value_hint=ValueHint :: AnyPath ) ]
299371 local : PathBuf ,
300372
301- #[ clap( help = "the path in the cluster to upload to" , value_hint= ValueHint :: Other ) ]
373+ #[ clap( help = "the path in the cluster to upload to" , add = ArgValueCompleter :: new ( remote_path_completer ) ) ]
302374 remote : PathBuf ,
303375 } ,
304376}
305377
378+ fn remote_path_completer ( current : & std:: ffi:: OsStr ) -> Vec < CompletionCandidate > {
379+ let mut completions = vec ! [ ] ;
380+ let Some ( current) = current. to_str ( ) else {
381+ return completions;
382+ } ;
383+
384+ // the tokio shenanigans here are to be able to call async code from this sync method,
385+ // with an already running async runtime from tokio::main, and getting back the result,
386+ // all without blocking the async runtime in sync code (hence the extra thread).
387+ let ( send, mut recv) = mpsc:: unbounded_channel ( ) ;
388+ if current. is_empty ( ) || current == "/" {
389+ tokio:: spawn ( async move {
390+ let roots = file_system_roots ( ) . await ;
391+ if let Ok ( roots) = roots {
392+ for root in roots {
393+ send. send ( CompletionCandidate :: new ( root. name . clone ( ) ) ) . unwrap ( ) ;
394+ }
395+ }
396+ } ) ;
397+ } else {
398+ let current = PathBuf :: from ( current) ;
399+ tokio:: spawn ( async move {
400+ let parent = current. parent ( ) . unwrap ( ) ;
401+ let roots = cscs_file_list ( current. clone ( ) , None , None ) . await ;
402+ if let Ok ( roots) = roots {
403+ for root in roots {
404+ if root. path_type == PathType :: File {
405+ send. send ( CompletionCandidate :: new ( current. join ( root. name . clone ( ) ) ) )
406+ . unwrap ( ) ;
407+ } else {
408+ // joining with "" ensures trailing slash
409+ send. send ( CompletionCandidate :: new ( current. join ( root. name . clone ( ) ) . join ( "" ) ) )
410+ . unwrap ( ) ;
411+ }
412+ }
413+ } else {
414+ // file listing only work for full paths, so if we want to complet a partial result, we need
415+ // to list the parent folder and take it from there
416+ if let Ok ( roots) = cscs_file_list ( parent. to_path_buf ( ) , None , None ) . await {
417+ let partial = current. file_name ( ) . unwrap ( ) . to_string_lossy ( ) . into_owned ( ) ;
418+ for root in roots {
419+ if root. name . starts_with ( & partial) {
420+ if root. path_type == PathType :: File {
421+ send. send ( CompletionCandidate :: new ( parent. join ( root. name ) ) ) . unwrap ( ) ;
422+ } else {
423+ // joining with "" ensures trailing slash
424+ send. send ( CompletionCandidate :: new ( parent. join ( root. name . clone ( ) ) . join ( "" ) ) )
425+ . unwrap ( ) ;
426+ }
427+ }
428+ }
429+ }
430+ }
431+ } ) ;
432+ }
433+ let sync_recv = thread:: spawn ( move || {
434+ let mut completions = vec ! [ ] ;
435+ while let Some ( candidate) = recv. blocking_recv ( ) {
436+ completions. push ( candidate) ;
437+ }
438+ completions
439+ } ) ;
440+ let comp = sync_recv. join ( ) . unwrap ( ) ;
441+ completions. extend ( comp) ;
442+
443+ completions
444+ }
306445#[ derive( Subcommand , Debug ) ]
307446pub enum CscsSystemCommands {
308447 #[ clap( alias( "ls" ) , about = "List available compute systems [aliases: ls]" ) ]
0 commit comments