diff --git a/crates/hyperqueue/src/client/commands/autoalloc.rs b/crates/hyperqueue/src/client/commands/autoalloc.rs index 31be532f1..91207343a 100644 --- a/crates/hyperqueue/src/client/commands/autoalloc.rs +++ b/crates/hyperqueue/src/client/commands/autoalloc.rs @@ -219,8 +219,8 @@ async fn dry_run_command(mut connection: ClientConnection, opts: DryRunOpts) -> DryRunCommand::Slurm(params) => (ManagerType::Slurm, args_to_params(params)), }; let message = FromClientMessage::AutoAlloc(AutoAllocRequest::DryRun { - manager, - parameters, + manager: manager.clone(), + parameters: parameters.clone(), }); rpc_call!(connection, message, @@ -248,8 +248,8 @@ async fn add_queue(mut connection: ClientConnection, opts: AddQueueOpts) -> anyh }; let message = FromClientMessage::AutoAlloc(AutoAllocRequest::AddQueue { - manager, - parameters, + manager: manager.clone(), + parameters: parameters.clone(), dry_run, }); @@ -260,8 +260,8 @@ async fn add_queue(mut connection: ClientConnection, opts: AddQueueOpts) -> anyh if dry_run { log::info!( - "A trial allocation was submitted successfully. It was immediately canceled to avoid \ -wasting resources." + "A trial allocation was submitted successfully. \ + It was immediately canceled to avoid wasting resources." ); } diff --git a/crates/hyperqueue/src/server/client/autoalloc.rs b/crates/hyperqueue/src/server/client/autoalloc.rs index 8a0b541d3..b83051976 100644 --- a/crates/hyperqueue/src/server/client/autoalloc.rs +++ b/crates/hyperqueue/src/server/client/autoalloc.rs @@ -9,7 +9,12 @@ use crate::transfer::messages::{ AllocationQueueParams, AutoAllocListResponse, AutoAllocRequest, AutoAllocResponse, QueueDescriptorData, ToClientMessage, }; +use anyhow::Context; +use chrono::{NaiveTime, Timelike}; +use serde_json::Value; use std::path::PathBuf; +use std::process::Command; +use std::str::FromStr; use std::time::{Duration, SystemTime}; use tempdir::TempDir; @@ -238,6 +243,183 @@ fn create_queue( } } +struct QueueLimits { + walltime: Option, + backlog: Option, + workers: Option, +} + +// Returns value of given arg, in formats: keyVALUE, key=VALUE, [key, VALUE] +fn get_arg_by_key(key: String, args: &[String]) -> Option { + let index_opt = args.iter().position(|r| r.contains(&key)); + + if let Some(index) = index_opt { + let mut value = args[index].clone(); + + value = value.replace(&key, ""); + value = value.replace('=', ""); + + return if !value.is_empty() { + Some(value) + } else { + args.get(index + 1).cloned() + }; + } + None +} + +// Returns value of given path in json, can include nests +fn get_value_by_key_json( + json: &serde_json::Value, + path: String, +) -> anyhow::Result<&serde_json::Value> { + let json_keys = path.split('/'); + let mut value = json; + for json_key in json_keys { + value = value + .get(json_key) + .context(format!("key {json_key} not found"))?; + } + Ok(value) +} + +fn get_pbs_queue_limit(queue_type: String) -> anyhow::Result> { + let queue_info = Command::new("qstat") + .arg("-Q") + .arg(&queue_type) + .arg("-f") + .arg("-F") + .arg("json") + .output()?; + + let output = String::from_utf8_lossy(&queue_info.stdout); + let json: serde_json::Value = + serde_json::from_str(&output).context("JSON was not well-formatted")?; + let queue = json + .get("Queue") + .and_then(|x| x.get(&queue_type)) + .context("can't find queue info")?; + + parse_pbs_queue_limit(queue) +} + +fn parse_pbs_queue_limit(queue: &Value) -> anyhow::Result> { + let (walltime, backlog, workers_max, workers_available) = ( + "resources_max/walltime", + "max_queued", + "resources_max/nodect", + "resources_available/nodes", + ); + + let walltime = match get_value_by_key_json(queue, String::from(walltime)) { + Ok(value) => { + let walltime = NaiveTime::from_str(value.as_str().context("can't convert str")?)?; + let secs: u64 = + (walltime.hour() * 3600 + walltime.minute() * 60 + walltime.second()) as u64; + Some(Duration::from_secs(secs)) + } + Err(_) => None, + }; + + let backlog = match get_value_by_key_json(queue, String::from(backlog)) { + Ok(value) => Some( + value + .to_string() + .chars() + .into_iter() + .filter(|x| x.is_numeric()) + .collect::() + .parse()?, + ), + Err(_) => None, + }; + + let workers: Option = { + let possible_workers = match get_value_by_key_json(queue, String::from(workers_max)) { + Ok(max) => Some(max), + Err(_) => match get_value_by_key_json(queue, String::from(workers_available)) { + Ok(available) => Some(available), + Err(_) => None, + }, + }; + + match possible_workers { + None => None, + Some(value) => Some( + value + .to_string() + .chars() + .into_iter() + .filter(|x| x.is_numeric()) + .collect::() + .parse()?, + ), + } + }; + Ok(Option::from(QueueLimits { + walltime, + backlog, + workers, + })) +} + +fn get_slurm_queue_limit(_args: &[String]) -> anyhow::Result> { + Ok(None) +} + +pub fn validate_queue_parameters( + info: &AllocationQueueParams, + manager: &ManagerType, +) -> anyhow::Result<()> { + let queue_limit = match manager { + ManagerType::Pbs => { + let opt_queue_type = get_arg_by_key(String::from("-q"), &info.additional_args); + match opt_queue_type { + Some(queue_type) => get_pbs_queue_limit(queue_type.replace("-q", "")), + None => Ok(None), + } + } + ManagerType::Slurm => get_slurm_queue_limit(&info.additional_args), + }; + + match queue_limit { + Err(err) => Err(anyhow::anyhow!("Error while getting queue limit: {err}")), + Ok(None) => Err(anyhow::anyhow!( + "Queue limit wasn't retrieved, can't provide additional information." + )), + Ok(Some(limit)) => { + let mut msg = String::new(); + + if let Some(walltime) = limit.walltime { + if walltime < info.timelimit { + msg.push_str( + "Walltime is larger than the walltime limit of the given queue.\n", + ); + } + } + if let Some(backlog) = limit.backlog { + if backlog < info.backlog { + msg.push_str( + "Backlog size is larger than the max. queue limit of the given queue.\n", + ); + } + } + if let Some(workers) = limit.workers { + if workers < info.workers_per_alloc { + msg.push_str("Workers per allocation is larger than the max. number of workers of the given queue.\n"); + } + } + + if !msg.is_empty() { + return Err(anyhow::anyhow!( + "Allocation parameters exceeded queue limits: ".to_owned() + &msg + )); + } + Ok(()) + } + } +} + async fn try_submit_allocation( manager: ManagerType, params: AllocationQueueParams, @@ -246,7 +428,7 @@ async fn try_submit_allocation( let mut handler = create_allocation_handler(&manager, params.name.clone(), tmpdir.as_ref().to_path_buf())?; let worker_count = params.workers_per_alloc; - let queue_info = create_queue_info(params); + let queue_info = create_queue_info(params.clone()); let allocation = handler .submit_allocation(0, &queue_info, worker_count as u64, SubmitMode::DryRun) @@ -254,9 +436,25 @@ async fn try_submit_allocation( .map_err(|e| anyhow::anyhow!("Could not submit allocation: {:?}", e))?; let working_dir = allocation.working_dir().to_path_buf(); - let id = allocation - .into_id() - .map_err(|e| anyhow::anyhow!("Could not submit allocation: {:?}", e))?; + let id = match allocation.into_id() { + Ok(result) => result, + Err(basic_error) => { + return { + match validate_queue_parameters(¶ms, &manager) { + Ok(_) => Err(anyhow::anyhow!( + "Could not submit allocation: {:?}", + basic_error + )), + Err(additional_error) => Err(anyhow::anyhow!( + "Could not submit allocation: {:?} \nAdditional errors: {:?}", + basic_error, + additional_error + )), + } + } + } + }; + let allocation = Allocation { id: id.to_string(), worker_count: 1, @@ -271,3 +469,114 @@ async fn try_submit_allocation( Ok(()) } + +#[cfg(test)] +mod tests { + use crate::server::client::autoalloc::{get_arg_by_key, parse_pbs_queue_limit}; + use std::time::Duration; + + #[test] + fn test_get_arg_by_key() { + assert_eq!( + get_arg_by_key( + String::from("-q"), + &[String::from("x"), String::from("-qqexp"), String::from("y")] + ) + .is_some(), + true + ); + assert_eq!( + get_arg_by_key( + String::from("-q"), + &[ + String::from("x"), + String::from("-q=qexp"), + String::from("y") + ] + ) + .is_some(), + true + ); + assert_eq!( + get_arg_by_key( + String::from("-q"), + &[ + String::from("x"), + String::from("-q"), + String::from("qexp"), + String::from("y") + ] + ) + .is_some(), + true + ); + } + + #[test] + fn test_get_pbs_queue_limit() { + let queue_json = r#"{ + "timestamp":1646895019, + "pbs_version":"20.0.1", + "pbs_server":"isrv1.barbora.it4i.cz", + "Queue":{ + "qexp":{ + "queue_type":"Execution", + "Priority":150, + "total_jobs":2, + "state_count":"Transit:0 Queued:0 Held:2 Waiting:0 Running:0 Exiting:0 Begun:0 ", + "max_queued":"[u:PBS_GENERIC=5]", + "resources_max":{ + "ncpus":144, + "nodect":4, + "walltime":"01:00:00" + }, + "resources_default":{ + "ncpus":36, + "walltime":"01:00:00" + }, + "default_chunk":{ + "ncpus":36, + "Qlist":"qexp" + }, + "resources_available":{ + "ncpus":576, + "nodect":16 + }, + "resources_assigned":{ + "mem":"0kb", + "mpiprocs":0, + "ncpus":0, + "nodect":0 + }, + "max_run":"[u:PBS_GENERIC=1]", + "max_run_res":{ + "ncpus":"[u:PBS_GENERIC=144]", + "nodect":"[u:PBS_GENERIC=4]" + }, + "enabled":"True", + "started":"True" + } + } +}"#; + // Get queue info + let json: serde_json::Value = serde_json::from_str(&queue_json).unwrap(); + let queue = json + .get("Queue") + .and_then(|x| x.get(&String::from("qexp"))) + .unwrap(); + + // Check values + let result = parse_pbs_queue_limit(queue); + match result { + Ok(Some(limit)) => { + // Max-queued + assert_eq!(limit.backlog == Some(5), true); + // resources_max/nodect for this queue + assert_eq!(limit.workers == Some(4), true); + // resources_max/walltime + assert_eq!(limit.walltime == Some(Duration::from_secs(3600)), true); + } + _ => assert!(false), + } + } +}