Skip to content

PBS alloc checker #381

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions crates/hyperqueue/src/client/commands/autoalloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ async fn dry_run_command(mut connection: ClientConnection, opts: DryRunOpts) ->
DryRunCommand::Slurm(params) => (ManagerType::Slurm, args_to_params(params)),
};
let message = FromClientMessage::AutoAlloc(AutoAllocRequest::DryRun {
manager,
parameters,
manager: manager.clone(),
parameters: parameters.clone(),
});

rpc_call!(connection, message,
Expand Down Expand Up @@ -248,8 +248,8 @@ async fn add_queue(mut connection: ClientConnection, opts: AddQueueOpts) -> anyh
};

let message = FromClientMessage::AutoAlloc(AutoAllocRequest::AddQueue {
manager,
parameters,
manager: manager.clone(),
parameters: parameters.clone(),
dry_run,
});

Expand All @@ -260,8 +260,8 @@ async fn add_queue(mut connection: ClientConnection, opts: AddQueueOpts) -> anyh

if dry_run {
log::info!(
"A trial allocation was submitted successfully. It was immediately canceled to avoid \
wasting resources."
"A trial allocation was submitted successfully. \
It was immediately canceled to avoid wasting resources."
);
}

Expand Down
243 changes: 239 additions & 4 deletions crates/hyperqueue/src/server/client/autoalloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ use crate::transfer::messages::{
AllocationQueueParams, AutoAllocListResponse, AutoAllocRequest, AutoAllocResponse,
QueueDescriptorData, ToClientMessage,
};
use anyhow::Context;
use chrono::{NaiveTime, Timelike};
use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
use std::time::{Duration, SystemTime};
use tempdir::TempDir;

Expand Down Expand Up @@ -238,6 +242,179 @@ fn create_queue(
}
}

struct QueueLimits {
walltime: Option<Duration>,
backlog: Option<u32>,
workers: Option<u32>,
}

// Returns value of given arg, in formats: keyVALUE, key=VALUE, [key, VALUE]
fn get_arg_by_key(key: String, args: &[String]) -> Option<String> {
let index_opt = args.iter().position(|r| r.contains(&key));

if let Some(index) = index_opt {
let mut value = args[index].clone();

value = value.replace(&key, "");
value = value.replace('=', "");

return if !value.is_empty() {
Some(value)
} else {
args.get(index + 1).cloned()
};
}
None
}

// Returns value of given path in json, can include nests
fn get_value_by_key_json(
json: &serde_json::Value,
path: String,
) -> anyhow::Result<&serde_json::Value> {
let json_keys = path.split('/');
let mut value = json;
for json_key in json_keys {
value = value
.get(json_key)
.context(format!("key {json_key} not found"))?;
}
Ok(value)
}

fn get_pbs_queue_limit(queue_type: String) -> anyhow::Result<Option<QueueLimits>> {
let queue_info = Command::new("qstat")
.arg("-Q")
.arg(&queue_type)
.arg("-f")
.arg("-F")
.arg("json")
.output()?;

let output = String::from_utf8_lossy(&queue_info.stdout);
let json: serde_json::Value =
serde_json::from_str(&output).context("JSON was not well-formatted")?;
let queue = json
.get("Queue")
.and_then(|x| x.get(&queue_type))
.context("can't find queue info")?;

let (walltime, backlog, workers_max, workers_available) = (
"resources_max/walltime",
"max_queued",
"resources_max/nodect",
"resources_available/nodes",
);

let walltime = match get_value_by_key_json(queue, String::from(walltime)) {
Ok(value) => {
let walltime = NaiveTime::from_str(value.as_str().context("can't convert str")?)?;
let secs: u64 =
(walltime.hour() * 3600 + walltime.minute() * 60 + walltime.second()) as u64;
Some(Duration::from_secs(secs))
}
Err(_) => None,
};

let backlog = match get_value_by_key_json(queue, String::from(backlog)) {
Ok(value) => Some(
value
.to_string()
.chars()
.into_iter()
.filter(|x| x.is_numeric())
.collect::<String>()
.parse()?,
),
Err(_) => None,
};

let workers: Option<u32> = {
let possible_workers = match get_value_by_key_json(queue, String::from(workers_max)) {
Ok(max) => Some(max),
Err(_) => match get_value_by_key_json(queue, String::from(workers_available)) {
Ok(available) => Some(available),
Err(_) => None,
},
};

match possible_workers {
None => None,
Some(value) => Some(
value
.to_string()
.chars()
.into_iter()
.filter(|x| x.is_numeric())
.collect::<String>()
.parse()?,
),
}
};
Ok(Option::from(QueueLimits {
walltime,
backlog,
workers,
}))
}

fn get_slurm_queue_limit(_args: &[String]) -> anyhow::Result<Option<QueueLimits>> {
Ok(None)
}

pub fn validate_queue_parameters(
info: &AllocationQueueParams,
manager: &ManagerType,
) -> anyhow::Result<()> {
let queue_limit = match manager {
ManagerType::Pbs => {
let opt_queue_type = get_arg_by_key(String::from("-q"), &info.additional_args);
match opt_queue_type {
Some(queue_type) => get_pbs_queue_limit(queue_type.replace("-q", "")),
None => Ok(None),
}
}
ManagerType::Slurm => get_slurm_queue_limit(&info.additional_args),
};

match queue_limit {
Err(err) => Err(anyhow::anyhow!("Error while getting queue limit: {err}")),
Ok(None) => Err(anyhow::anyhow!(
"Queue limit wasn't retrieved, can't provide additional information."
)),
Ok(Some(limit)) => {
let mut msg = String::new();

if let Some(walltime) = limit.walltime {
if walltime < info.timelimit {
msg.push_str(
"Walltime is larger than the walltime limit of the given queue.\n",
);
}
}
if let Some(backlog) = limit.backlog {
if backlog < info.backlog {
msg.push_str(
"Backlog size is larger than the max. queue limit of the given queue.\n",
);
}
}
if let Some(workers) = limit.workers {
if workers < info.workers_per_alloc {
msg.push_str("Workers per allocation is larger than the max. number of workers of the given queue.\n");
}
}

if !msg.is_empty() {
return Err(anyhow::anyhow!(
"Allocation parameters exceeded queue limits: ".to_owned() + &msg
));
}
Ok(())
}
}
}

async fn try_submit_allocation(
manager: ManagerType,
params: AllocationQueueParams,
Expand All @@ -246,17 +423,33 @@ async fn try_submit_allocation(
let mut handler =
create_allocation_handler(&manager, params.name.clone(), tmpdir.as_ref().to_path_buf())?;
let worker_count = params.workers_per_alloc;
let queue_info = create_queue_info(params);
let queue_info = create_queue_info(params.clone());

let allocation = handler
.submit_allocation(0, &queue_info, worker_count as u64, SubmitMode::DryRun)
.await
.map_err(|e| anyhow::anyhow!("Could not submit allocation: {:?}", e))?;

let working_dir = allocation.working_dir().to_path_buf();
let id = allocation
.into_id()
.map_err(|e| anyhow::anyhow!("Could not submit allocation: {:?}", e))?;
let id = match allocation.into_id() {
Ok(result) => result,
Err(basic_error) => {
return {
match validate_queue_parameters(&params, &manager) {
Ok(_) => Err(anyhow::anyhow!(
"Could not submit allocation: {:?}",
basic_error
)),
Err(additional_error) => Err(anyhow::anyhow!(
"Could not submit allocation: {:?} \nAdditional errors: {:?}",
basic_error,
additional_error
)),
}
}
}
};

let allocation = Allocation {
id: id.to_string(),
worker_count: 1,
Expand All @@ -271,3 +464,45 @@ async fn try_submit_allocation(

Ok(())
}

#[cfg(test)]
mod tests {
use crate::server::client::autoalloc::get_arg_by_key;

#[test]
fn test_get_arg_by_key() {
assert_eq!(
get_arg_by_key(
String::from("-q"),
&[String::from("x"), String::from("-qqexp"), String::from("y")]
)
.is_some(),
true
);
assert_eq!(
get_arg_by_key(
String::from("-q"),
&[
String::from("x"),
String::from("-q=qexp"),
String::from("y")
]
)
.is_some(),
true
);
assert_eq!(
get_arg_by_key(
String::from("-q"),
&[
String::from("x"),
String::from("-q"),
String::from("qexp"),
String::from("y")
]
)
.is_some(),
true
);
}
}