Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion coman/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "coman"
version = "0.2.5"
version = "0.3.1"
edition = "2024"
description = "Compute Manager for managing HPC compute"
authors = ["Ralf Grubenmann <ralf.grubenmann@sdsc.ethz.ch>"]
Expand Down
15 changes: 10 additions & 5 deletions coman/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use clap::{Parser, Subcommand, builder::TypedValueParser};
use strum::VariantNames;

use crate::{
config::{ComputePlatform, get_config_dir, get_data_dir},
config::{ComputePlatform, get_config_dir, get_data_dir, get_project_local_config_file},
util::types::DockerImageUrl,
};

Expand All @@ -31,6 +31,7 @@ pub enum CliCommands {
},
}

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand, Debug)]
pub enum CscsCommands {
#[clap(about = "Log in to CSCS")]
Expand Down Expand Up @@ -87,6 +88,10 @@ pub enum CscsJobCommands {
mount: Vec<(String, String)>,
#[clap(short, long, help = "The docker image to use")]
image: Option<DockerImageUrl>,
#[clap(short, long, help = "Path where stdout of the job gets written to")]
stdout: Option<PathBuf>,
#[clap(short, long, help = "Path where stderr of the job gets written to")]
stderr: Option<PathBuf>,
#[clap(trailing_var_arg = true, help = "The command to run in the container")]
command: Option<Vec<String>>,
},
Expand Down Expand Up @@ -155,18 +160,18 @@ const VERSION_MESSAGE: &str = concat!(
);

pub fn version() -> String {
let author = clap::crate_authors!();

// let current_exe_path = PathBuf::from(clap::crate_name!()).display().to_string();
let config_dir_path = get_config_dir().display().to_string();
let data_dir_path = get_data_dir().display().to_string();
let project_config_dir = get_project_local_config_file()
.map(|p| p.display().to_string())
.unwrap_or("".to_owned());

format!(
"\
{VERSION_MESSAGE}

Authors: {author}

Project config directory: {project_config_dir}
Config directory: {config_dir_path}
Data directory: {data_dir_path}"
)
Expand Down
51 changes: 40 additions & 11 deletions coman/src/cscs/api_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use eyre::eyre;
use firecrest_client::{
client::FirecrestClient,
compute_api::{
cancel_compute_system_job, get_compute_system_job, get_compute_system_job_metadata, get_compute_system_jobs,
post_compute_system_job,
JobOptions, cancel_compute_system_job, get_compute_system_job, get_compute_system_job_metadata,
get_compute_system_jobs, post_compute_system_job,
},
filesystem_api::{
get_filesystem_ops_download, get_filesystem_ops_ls, get_filesystem_ops_stat, get_filesystem_ops_tail,
Expand All @@ -22,8 +22,21 @@ use crate::{
config::{ComputePlatform, Config},
cscs::api_client::types::{FileStat, Job, JobDetail, PathEntry, S3Upload, System, UserInfo},
trace_dbg,
util::types::DockerImageUrl,
};

#[derive(Debug, Clone, Default)]
pub struct JobStartOptions {
pub script_file: Option<PathBuf>,
pub image: Option<DockerImageUrl>,
pub command: Option<Vec<String>>,
pub stdout: Option<PathBuf>,
pub stderr: Option<PathBuf>,
pub container_workdir: Option<String>,
pub env: Vec<(String, String)>,
pub mount: Vec<(String, String)>,
}

pub struct CscsApi {
client: FirecrestClient,
}
Expand All @@ -46,6 +59,7 @@ impl CscsApi {
name: &str,
script_path: PathBuf,
envvars: HashMap<String, String>,
options: JobStartOptions,
) -> Result<()> {
let workingdir = script_path.clone();
let workingdir = workingdir.parent();
Expand All @@ -54,10 +68,14 @@ impl CscsApi {
system_name,
account,
name,
None,
Some(script_path),
workingdir.map(|p| p.to_path_buf()),
envvars,
JobOptions {
script: None,
script_path: Some(script_path),
working_dir: workingdir.map(|p| p.to_path_buf()),
envvars,
stdout: options.stdout,
stderr: options.stderr,
},
)
.await?;

Expand Down Expand Up @@ -231,10 +249,14 @@ mod tests {
"",
None,
"",
None,
None,
None,
HashMap::new()
JobOptions {
script: None,
script_path: None,
working_dir: None,
envvars: HashMap::new(),
stdout: None,
stderr: None
}
),
Result<PostJobSubmissionResponse>
))
Expand All @@ -243,7 +265,14 @@ mod tests {
Result<PostJobSubmissionResponse>
));
let result = client
.start_job("test", None, "test", PathBuf::from("/test"), HashMap::new())
.start_job(
"test",
None,
"test",
PathBuf::from("/test"),
HashMap::new(),
JobStartOptions::default(),
)
.await;
assert_ok!(result);
}
Expand Down
25 changes: 3 additions & 22 deletions coman/src/cscs/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ use tokio::{
use crate::{
config::ComputePlatform,
cscs::{
api_client::types::JobStatus,
api_client::{client::JobStartOptions, types::JobStatus},
handlers::{
cscs_file_download, cscs_file_list, cscs_file_upload, cscs_job_cancel, cscs_job_details, cscs_job_list,
cscs_job_log, cscs_login, cscs_start_job, cscs_system_list, cscs_system_set,
},
},
util::types::DockerImageUrl,
};

pub(crate) async fn cli_cscs_login() -> Result<()> {
Expand Down Expand Up @@ -103,30 +102,12 @@ pub(crate) async fn cli_cscs_job_log(
#[allow(clippy::too_many_arguments)]
pub(crate) async fn cli_cscs_job_start(
name: Option<String>,
script_file: Option<PathBuf>,
image: Option<DockerImageUrl>,
command: Option<Vec<String>>,
workdir: Option<String>,
env: Vec<(String, String)>,
mount: Vec<(String, String)>,
options: JobStartOptions,
system: Option<String>,
platform: Option<ComputePlatform>,
account: Option<String>,
) -> Result<()> {
match cscs_start_job(
name,
script_file,
image,
command,
workdir,
env,
mount,
system,
platform,
account,
)
.await
{
match cscs_start_job(name, options, system, platform, account).await {
Ok(_) => {
println!("Job started");
Ok(())
Expand Down
54 changes: 34 additions & 20 deletions coman/src/cscs/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,15 @@ use crate::{
config::{ComputePlatform, Config},
cscs::{
api_client::{
client::CscsApi,
client::{CscsApi, JobStartOptions},
types::{FileStat, FileSystemType, Job, JobDetail, PathEntry, PathType, S3Upload, System, UserInfo},
},
oauth2::{
CLIENT_ID_SECRET_NAME, CLIENT_SECRET_SECRET_NAME, client_credentials_login, finish_cscs_device_login,
start_cscs_device_login,
},
},
util::{
keyring::{Secret, get_secret, store_secret},
types::DockerImageUrl,
},
util::keyring::{Secret, get_secret, store_secret},
};

const CSCS_MAX_DIRECT_SIZE: usize = 5242880;
Expand Down Expand Up @@ -134,10 +131,19 @@ pub async fn cscs_job_log(
return Err(eyre!("couldn't find job {}", job_id));
}
let path = if stderr {
PathBuf::from(job.unwrap().stderr)
job.unwrap().stderr
} else {
PathBuf::from(job.unwrap().stdout)
job.unwrap().stdout
};
if path.is_empty() {
return Err(eyre!(
"No {} log exists for job {}",
if stderr { "stderr" } else { "stdout" },
job_id
));
}

let path = PathBuf::from(path);
api_client.tail(current_system, path, 100).await
}
Err(e) => Err(e),
Expand All @@ -160,12 +166,7 @@ pub async fn cscs_job_cancel(job_id: i64, system: Option<String>, platform: Opti
#[allow(clippy::too_many_arguments)]
pub async fn cscs_start_job(
name: Option<String>,
script_file: Option<PathBuf>,
image: Option<DockerImageUrl>,
command: Option<Vec<String>>,
container_workdir: Option<String>,
env: Vec<(String, String)>,
mount: Vec<(String, String)>,
options: JobStartOptions,
system: Option<String>,
platform: Option<ComputePlatform>,
account: Option<String>,
Expand Down Expand Up @@ -193,12 +194,15 @@ pub async fn cscs_start_job(
return Err(eyre!("couldn't get system description for {}", current_system));
}
};
let container_workdir = container_workdir.unwrap_or(config.cscs.workdir.unwrap_or("/scratch".to_owned()));
let container_workdir = options
.container_workdir
.clone()
.unwrap_or(config.cscs.workdir.unwrap_or("/scratch".to_owned()));
let base_path = scratch.join(user_info.name.clone()).join(&job_name);

let mut envvars = config.cscs.env.clone();
envvars.extend(env);
let mut mount: HashMap<String, String> = mount.into_iter().collect();
envvars.extend(options.env.clone());
let mut mount: HashMap<String, String> = options.mount.clone().into_iter().collect();
mount.entry("${SCRATCH}".to_owned()).or_insert("/scratch".to_owned());

let mut tera = tera::Tera::default();
Expand All @@ -207,7 +211,7 @@ pub async fn cscs_start_job(
let environment_template = config.cscs.edf_file_template;
tera.add_raw_template("environment.toml", &environment_template)?;

let docker_image = image.unwrap_or(config.cscs.image.try_into()?);
let docker_image = options.image.clone().unwrap_or(config.cscs.image.try_into()?);
let meta = docker_image.inspect().await?;
if let Some(system_info) = config.cscs.systems.get(current_system) {
let mut compatible = false;
Expand Down Expand Up @@ -246,13 +250,18 @@ pub async fn cscs_start_job(

// upload script
let script_path = base_path.join("script.sh");
let script_template = script_file
let script_template = options
.script_file
.clone()
.map(std::fs::read_to_string)
.unwrap_or(Ok(config.cscs.sbatch_script_template))?;
tera.add_raw_template("script.sh", &script_template)?;
let mut context = tera::Context::new();
context.insert("name", &job_name);
context.insert("command", &command.unwrap_or(config.cscs.command).join(" "));
context.insert(
"command",
&options.command.clone().unwrap_or(config.cscs.command).join(" "),
);
context.insert("environment_file", &environment_path);
context.insert("container_workdir", &container_workdir);
let script = tera.render("script.sh", &context)?;
Expand All @@ -262,7 +271,7 @@ pub async fn cscs_start_job(

// start job
api_client
.start_job(current_system, account, &job_name, script_path, envvars)
.start_job(current_system, account, &job_name, script_path, envvars, options)
.await?;
Ok(())
}
Expand Down Expand Up @@ -294,6 +303,11 @@ pub async fn cscs_file_download(
system: Option<String>,
platform: Option<ComputePlatform>,
) -> Result<Option<(i64, Url, usize)>> {
let local = if local.is_dir() {
local.join(remote.file_name().ok_or(eyre!("couldn't get name of remote file"))?)
} else {
local
};
match get_access_token().await {
Ok(access_token) => {
let api_client = CscsApi::new(access_token.0, platform).unwrap();
Expand Down
18 changes: 14 additions & 4 deletions coman/src/cscs/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ impl PollAsync<UserEvent> for AsyncJobLogPort {
match val {
JobLogAction::Job(jobid) => {
self.current_job = Some(jobid);
self.stderr = false;
}
JobLogAction::SwitchLog => {
self.stderr = !self.stderr;
Expand All @@ -200,10 +201,19 @@ impl PollAsync<UserEvent> for AsyncJobLogPort {
if let Some(job_id) = self.current_job {
match cscs_job_log(job_id as i64, self.stderr, None, None).await {
Ok(log) => Ok(Some(Event::User(UserEvent::Cscs(CscsEvent::GotJobLog(log))))),
Err(e) => Ok(Some(Event::User(UserEvent::Error(format!(
"{:?}",
Err::<(), Report>(e).wrap_err("couldn't get log")
))))),
Err(e) => {
// if there was an error getting the log, if it's stderr, switch to stdout which should
// always exist. If we're on stdout and it doesn't exist, unset log watching to not spam errors
if self.stderr {
self.stderr = false;
} else {
self.current_job = None;
}
Ok(Some(Event::User(UserEvent::Error(format!(
"{:?}",
Err::<(), Report>(e).wrap_err("couldn't get log")
)))))
}
}
} else {
Ok(Some(Event::None))
Expand Down
Loading