From 5d5822a1589a3bb920a3590633f270a67745f237 Mon Sep 17 00:00:00 2001 From: Ralf Grubenmann Date: Mon, 26 Jan 2026 14:10:20 +0100 Subject: [PATCH 1/5] add rpc implementation --- Cargo.lock | 162 +++++++++++++++++++++++++++++++++++++ coman/Cargo.toml | 7 ++ coman/src/cli/app.rs | 8 ++ coman/src/cli/exec.rs | 7 ++ coman/src/cli/mod.rs | 1 + coman/src/cli/rpc.rs | 52 ++++++++++++ coman/src/cscs/cli.rs | 14 +++- coman/src/cscs/handlers.rs | 49 ++++++++--- coman/src/main.rs | 7 +- 9 files changed, 294 insertions(+), 13 deletions(-) create mode 100644 coman/src/cli/rpc.rs diff --git a/Cargo.lock b/Cargo.lock index 0bfa737..a3ae127 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -785,6 +785,15 @@ dependencies = [ "syn 2.0.108", ] +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "2.9.4" @@ -1254,9 +1263,11 @@ dependencies = [ "strum 0.26.3", "strum_macros 0.27.2", "tabled", + "tarpc", "tempfile", "tera", "tokio", + "tokio-duplex", "tokio-util", "toml 0.9.7", "toml_edit 0.23.9", @@ -2162,6 +2173,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "educe" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4bd92664bf78c4d3dba9b7cdafce6fa15b13ed3ed16175218196942e99168a8" +dependencies = [ + "enum-ordinalize", + "proc-macro2", + "quote", + "syn 2.0.108", +] + [[package]] name = "either" version = "1.15.0" @@ -2254,6 +2277,26 @@ dependencies = [ "syn 2.0.108", ] +[[package]] +name = "enum-ordinalize" +version = "4.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a1091a7bb1f8f2c4b28f1fe2cef4980ca2d410a3d727d67ecc3178c9b0800f0" +dependencies = [ + "enum-ordinalize-derive", +] + +[[package]] +name = "enum-ordinalize-derive" +version = "4.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ca9601fb2d62598ee17836250842873a413586e5d7ed88b356e38ddbb0ec631" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.108", +] + [[package]] name = "enumflags2" version = "0.7.12" @@ -3885,6 +3928,12 @@ dependencies = [ "libm", ] +[[package]] +name = "humantime" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" + [[package]] name = "hybrid-array" version = "0.4.5" @@ -5860,6 +5909,41 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.17", + "tracing", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83d059a296a47436748557a353c5e6c5705b9470ef6c95cfc52c21a8814ddac2" + +[[package]] +name = "opentelemetry_sdk" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", + "percent-encoding", + "rand 0.9.2", + "thiserror 2.0.17", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -7966,6 +8050,42 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" +[[package]] +name = "tarpc" +version = "0.37.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5cb86a4b5b941909e9896289c01b46ff0bec756b01f366b0d5490a5e8ed7871" +dependencies = [ + "anyhow", + "fnv", + "futures", + "humantime", + "opentelemetry", + "opentelemetry-semantic-conventions", + "pin-project", + "rand 0.8.5", + "serde", + "static_assertions", + "tarpc-plugins", + "thiserror 2.0.17", + "tokio", + "tokio-serde", + "tokio-util", + "tracing", + "tracing-opentelemetry", +] + +[[package]] +name = "tarpc-plugins" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26ef4401b013b1f5218ba33ea8f1eddbfcc00ec8db073ef995c192e71f08f027" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.108", +] + [[package]] name = "tempfile" version = "3.24.0" @@ -8175,6 +8295,16 @@ dependencies = [ "windows-sys 0.61.1", ] +[[package]] +name = "tokio-duplex" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ad960ebbe79234217390742426a94df5e5d04689a8e02ae702a0fde8c38deb3" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.6.0" @@ -8216,6 +8346,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-serde" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "caf600e7036b17782571dd44fa0a5cea3c82f60db5137f774a325a76a0d6852b" +dependencies = [ + "bincode", + "bytes", + "educe", + "futures-core", + "futures-sink", + "pin-project", + "serde", +] + [[package]] name = "tokio-stream" version = "0.1.17" @@ -8239,6 +8384,7 @@ dependencies = [ "futures-sink", "futures-util", "pin-project-lite", + "slab", "tokio", ] @@ -8468,6 +8614,22 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "tracing", + "tracing-core", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-subscriber" version = "0.3.20" diff --git a/coman/Cargo.toml b/coman/Cargo.toml index 45728c9..47b4108 100644 --- a/coman/Cargo.toml +++ b/coman/Cargo.toml @@ -84,6 +84,13 @@ iroh = "0.95.1" rand = "0.9.2" regex = "1.12.2" sha2 = "0.10.9" +tarpc = { version = "0.37.0", features = [ + "serde-transport", + "serde-transport-bincode", + "tcp", + "tokio1", +] } +tokio-duplex = "1.0.1" [build-dependencies] anyhow = "1.0.90" diff --git a/coman/src/cli/app.rs b/coman/src/cli/app.rs index 788e4ce..87bbc69 100644 --- a/coman/src/cli/app.rs +++ b/coman/src/cli/app.rs @@ -303,6 +303,14 @@ pub enum CscsJobCommands { #[clap(help="id or name of the job (name uses newest job of that name)", add = ArgValueCompleter::new(job_id_or_name_completer))] job: JobIdOrName, }, + #[clap( + alias = "ru", + about = "show current resource usage of the job, needs coman to be injected in the session [aliases: ru]" + )] + ResourceUsage { + #[clap(help="id or name of the job (name uses newest job of that name)", add = ArgValueCompleter::new(job_id_or_name_completer))] + job: JobIdOrName, + }, } fn job_id_or_name_completer(current: &std::ffi::OsStr) -> Vec { let mut completions = vec![]; diff --git a/coman/src/cli/exec.rs b/coman/src/cli/exec.rs index 0b1de11..1b6765a 100644 --- a/coman/src/cli/exec.rs +++ b/coman/src/cli/exec.rs @@ -7,10 +7,13 @@ use iroh::{ endpoint::ConnectionError, protocol::{ProtocolHandler, Router}, }; +use nom::AsBytes; use pid1::Pid1Settings; use rust_supervisor::{ChildType, Supervisor, SupervisorConfig}; use tokio::{io::AsyncWriteExt, net::TcpStream}; +use crate::cli::rpc::RpcHandler; + const SECRET_KEY_ENV: &str = "COMAN_IROH_SECRET"; const PORT_FORWARD_ENV: &str = "COMAN_FORWARDED_PORTS"; const SSH_PORT: u16 = 15263; @@ -121,6 +124,10 @@ async fn port_forward() -> Result<()> { builder = builder.accept(alpn.clone().into_bytes(), handler); println!("set up port forwarding for port {port} ({alpn})"); } + + // add rpc server + let rpc_handler = RpcHandler; + builder = builder.accept(b"/coman/rpc/".as_bytes(), rpc_handler); let _router = builder.spawn(); println!("port forwarding started"); diff --git a/coman/src/cli/mod.rs b/coman/src/cli/mod.rs index 320987e..9baf44b 100644 --- a/coman/src/cli/mod.rs +++ b/coman/src/cli/mod.rs @@ -1,3 +1,4 @@ pub mod app; pub mod exec; pub mod proxy; +pub mod rpc; diff --git a/coman/src/cli/rpc.rs b/coman/src/cli/rpc.rs new file mode 100644 index 0000000..143b013 --- /dev/null +++ b/coman/src/cli/rpc.rs @@ -0,0 +1,52 @@ +use futures::StreamExt; +use iroh::protocol::ProtocolHandler; +use tarpc::{ + serde_transport as transport, server, server::Channel, tokio_serde::formats::Bincode, + tokio_util::codec::LengthDelimitedCodec, +}; +use tokio_duplex::Duplex; + +use crate::cli::app::COMAN_VERSION; + +#[tarpc::service] +pub trait ComanRPC { + async fn version() -> String; +} +#[derive(Debug, Clone)] +struct RpcServer; + +impl ComanRPC for RpcServer { + async fn version(self, _: tarpc::context::Context) -> String { + COMAN_VERSION.to_string() + } +} + +#[derive(Debug, Default)] +pub struct RpcHandler; + +impl ProtocolHandler for RpcHandler { + async fn accept(&self, connection: iroh::endpoint::Connection) -> Result<(), iroh::protocol::AcceptError> { + let endpoint_id = connection.remote_id(); + match connection.accept_bi().await { + Ok((iroh_send, iroh_recv)) => { + println!("Accepted bidirectional stream from {endpoint_id}"); + let codec_builder = LengthDelimitedCodec::builder(); + let combined = Duplex::new(iroh_recv, iroh_send); + let framed = codec_builder.new_framed(combined); + + let transport = transport::new(framed, Bincode::default()); + let server = server::BaseChannel::with_defaults(transport); + tokio::spawn(server.execute(RpcServer.serve()).for_each(spawn)); + } + Err(e) => { + println!("Failed to accept bidirectional stream to rpc: {e}"); + } + } + + Ok(()) + } +} + +async fn spawn(fut: impl Future + Send + 'static) { + tokio::spawn(fut); +} diff --git a/coman/src/cscs/cli.rs b/coman/src/cscs/cli.rs index dc2b7e3..f5199ec 100644 --- a/coman/src/cscs/cli.rs +++ b/coman/src/cscs/cli.rs @@ -22,8 +22,8 @@ use crate::{ api_client::{client::JobStartOptions, types::JobStatus}, handlers::{ cscs_file_delete, cscs_file_download, cscs_file_list, cscs_file_upload, cscs_job_cancel, cscs_job_details, - cscs_job_list, cscs_job_log, cscs_job_start, cscs_login, cscs_port_forward, cscs_system_list, - cscs_system_set, + cscs_job_list, cscs_job_log, cscs_job_start, cscs_login, cscs_port_forward, cscs_resource_usage, + cscs_system_list, cscs_system_set, }, }, }; @@ -136,6 +136,16 @@ pub(crate) async fn cli_cscs_port_forward( cscs_port_forward(job_id, source_port, destination_port, system).await } +pub(crate) async fn cli_cscs_job_resource_usage( + job: JobIdOrName, + system: Option, + platform: Option, +) -> Result<()> { + let job_id = maybe_job_id_from_name(job, system.clone(), platform.clone()).await?; + println!("running port forward for job {job_id}"); + cscs_resource_usage(job_id, system).await +} + #[allow(clippy::too_many_arguments)] pub(crate) async fn cli_cscs_job_start( name: Option, diff --git a/coman/src/cscs/handlers.rs b/coman/src/cscs/handlers.rs index 9f243e6..6652629 100644 --- a/coman/src/cscs/handlers.rs +++ b/coman/src/cscs/handlers.rs @@ -19,15 +19,18 @@ use itertools::Itertools; use regex::Regex; use reqwest::Url; use sha2::{Digest, Sha256}; +use tarpc::{client, context, serde_transport, tokio_serde::formats::Bincode}; use tokio::{ fs::File, io::AsyncWriteExt, net::{TcpListener, TcpStream}, }; +use tokio_duplex::Duplex; +use tokio_util::codec::LengthDelimitedCodec; use super::api_client::client::{EdfSpec, ScriptSpec}; use crate::{ - cli::app::COMAN_VERSION, + cli::{app::COMAN_VERSION, rpc::ComanRPCClient}, config::{ComputePlatform, Config, get_data_dir}, cscs::{ api_client::{ @@ -165,12 +168,47 @@ pub async fn cscs_job_log( } } +pub async fn cscs_resource_usage(job_id: i64, system: Option) -> Result<()> { + let endpoint_id = get_endpoint_id(job_id, system).await?; + + let alpn: Vec = "/coman/rpc".to_string().into_bytes(); + let secret_key = SecretKey::generate(&mut rand::rng()); + let endpoint = Endpoint::builder().secret_key(secret_key).bind().await?; + + match endpoint.connect(endpoint_id, &alpn).await { + Ok(connection) => { + let (iroh_send, iroh_recv) = connection.open_bi().await?; + let combined = Duplex::new(iroh_recv, iroh_send); + let codec_builder = LengthDelimitedCodec::builder(); + let framed = codec_builder.new_framed(combined); + let transport = serde_transport::new(framed, Bincode::default()); + let client = ComanRPCClient::new(client::Config::default(), transport); + let result = client.spawn().version(context::current()).await?; + let _ = dbg!(result); + + Ok(()) + } + Err(e) => Err(e).wrap_err("couldn't establish tunnel to remote"), + } +} + pub async fn cscs_port_forward( job_id: i64, source_port: u16, destination_port: u16, system: Option, ) -> Result<()> { + let endpoint_id = get_endpoint_id(job_id, system).await?; + let listener = TcpListener::bind(format!("127.0.0.1:{source_port}")).await?; + println!("forwarding connection for port {source_port}"); + + loop { + let (socket, _) = listener.accept().await?; + process_port_forward(endpoint_id, destination_port, socket).await?; + } +} + +async fn get_endpoint_id(job_id: i64, system: Option) -> Result { let data_dir = get_data_dir(); let config = Config::new().unwrap(); let current_system = &system.unwrap_or(config.values.cscs.current_system); @@ -190,13 +228,7 @@ pub async fn cscs_port_forward( } else { return Err(eyre!("invalid endpoint id length")); })?; - let listener = TcpListener::bind(format!("127.0.0.1:{source_port}")).await?; - println!("forwarding connection for port {source_port}"); - - loop { - let (socket, _) = listener.accept().await?; - process_port_forward(endpoint_id, destination_port, socket).await?; - } + Ok(endpoint_id) } async fn process_port_forward(endpoint_id: EndpointId, destination_port: u16, mut socket: TcpStream) -> Result<()> { @@ -204,7 +236,6 @@ async fn process_port_forward(endpoint_id: EndpointId, destination_port: u16, mu let alpn: Vec = format!("/coman/{destination_port}").into_bytes(); let secret_key = SecretKey::generate(&mut rand::rng()); let endpoint = Endpoint::builder().secret_key(secret_key).bind().await?; - // let _router = Router::builder(endpoint.clone()).spawn(); // start local iroh listener match endpoint.connect(endpoint_id, &alpn).await { Ok(connection) => { diff --git a/coman/src/main.rs b/coman/src/main.rs index 39f8b0f..9e9ecf1 100644 --- a/coman/src/main.rs +++ b/coman/src/main.rs @@ -35,8 +35,8 @@ use crate::{ api_client::client::JobStartOptions, cli::{ cli_cscs_file_delete, cli_cscs_file_download, cli_cscs_file_list, cli_cscs_file_upload, - cli_cscs_job_cancel, cli_cscs_job_detail, cli_cscs_job_list, cli_cscs_job_log, cli_cscs_job_start, - cli_cscs_login, cli_cscs_port_forward, cli_cscs_set_system, cli_cscs_system_list, + cli_cscs_job_cancel, cli_cscs_job_detail, cli_cscs_job_list, cli_cscs_job_log, cli_cscs_job_resource_usage, + cli_cscs_job_start, cli_cscs_login, cli_cscs_port_forward, cli_cscs_set_system, cli_cscs_system_list, }, ports::{ AsyncBackgroundTaskPort, AsyncFetchWorkloadsPort, AsyncJobLogPort, AsyncSelectSystemPort, @@ -137,6 +137,9 @@ async fn main() -> Result<()> { .await? } CscsJobCommands::Cancel { job } => cli_cscs_job_cancel(job, system, platform).await?, + CscsJobCommands::ResourceUsage { job } => { + cli_cscs_job_resource_usage(job, system, platform).await? + } }, CscsCommands::File { command } => match command { CscsFileCommands::List { path } => cli_cscs_file_list(path, system, platform).await?, From bcd1ec67b68a1348170fc7b6cbcbfbd29f9b8a38 Mon Sep 17 00:00:00 2001 From: Ralf Grubenmann Date: Mon, 26 Jan 2026 16:29:42 +0100 Subject: [PATCH 2/5] add getting system metrics to rpc --- .github/workflows/pr-review.yaml | 1 + Cargo.lock | 133 +++++++++++++++++++++++-------- coman/Cargo.toml | 4 +- coman/src/cli/exec.rs | 3 +- coman/src/cli/rpc.rs | 47 +++++++++++ coman/src/cscs/handlers.rs | 72 ++++++++++------- coman/src/util/types.rs | 20 +++-- 7 files changed, 207 insertions(+), 73 deletions(-) diff --git a/.github/workflows/pr-review.yaml b/.github/workflows/pr-review.yaml index f67094a..306650f 100644 --- a/.github/workflows/pr-review.yaml +++ b/.github/workflows/pr-review.yaml @@ -66,3 +66,4 @@ jobs: VCS__PIPELINE__PULL_NUMBER: ${{ github.event.number}} VCS__HTTP_CLIENT__API_URL: "https://api.github.com" VCS__HTTP_CLIENT__API_TOKEN: ${{ secrets.GITHUB_TOKEN }} + REVIEW__IGNORE_CHANGES: '["Cargo.lock"]' diff --git a/Cargo.lock b/Cargo.lock index a3ae127..7dba173 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -908,6 +908,12 @@ dependencies = [ "either", ] +[[package]] +name = "bytesize" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd91ee7b2422bcb158d90ef4d14f75ef67f340943fc4149891dcce8f8b972a3" + [[package]] name = "camino" version = "1.2.1" @@ -1216,6 +1222,7 @@ dependencies = [ "aws-sdk-s3", "base64 0.22.1", "better-panic", + "bytesize", "chrono", "claim", "clap", @@ -1227,7 +1234,6 @@ dependencies = [ "current_dir", "derive_deref", "directories", - "dirs", "docker_credential", "eyre", "firecrest_client", @@ -1243,6 +1249,7 @@ dependencies = [ "lazy_static", "libc", "nom 8.0.0", + "nvml-wrapper", "oci-distribution", "open", "openidconnect", @@ -1262,6 +1269,7 @@ dependencies = [ "strip-ansi-escapes", "strum 0.26.3", "strum_macros 0.27.2", + "sysinfo", "tabled", "tarpc", "tempfile", @@ -1996,16 +2004,7 @@ version = "5.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a49173b84e034382284f27f1af4dcbbd231ffa358c0fe316541a7337f376a35" dependencies = [ - "dirs-sys 0.4.1", -] - -[[package]] -name = "dirs" -version = "6.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3e8aa94d75141228480295a7d0e7feb620b1a5ad9f12bc40be62411e38cce4e" -dependencies = [ - "dirs-sys 0.5.0", + "dirs-sys", ] [[package]] @@ -2016,22 +2015,10 @@ checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" dependencies = [ "libc", "option-ext", - "redox_users 0.4.6", + "redox_users", "windows-sys 0.48.0", ] -[[package]] -name = "dirs-sys" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e01a3366d27ee9890022452ee61b2b63a67e6f13f58900b651ff5665f0bb1fab" -dependencies = [ - "libc", - "option-ext", - "redox_users 0.5.2", - "windows-sys 0.61.1", -] - [[package]] name = "displaydoc" version = "0.2.5" @@ -5026,6 +5013,16 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "libloading" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" +dependencies = [ + "cfg-if", + "windows-link 0.2.1", +] + [[package]] name = "libm" version = "0.2.15" @@ -5534,6 +5531,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "ntapi" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c70f219e21142367c70c0b30c6a9e3a14d55b4d12a204d897fbec83a0363f081" +dependencies = [ + "winapi", +] + [[package]] name = "ntimestamp" version = "1.0.0" @@ -5686,6 +5692,29 @@ dependencies = [ "libc", ] +[[package]] +name = "nvml-wrapper" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d5c6c0ef9702176a570f06ad94f3198bc29c524c8b498f1b9346e1b1bdcbb3a" +dependencies = [ + "bitflags", + "libloading", + "nvml-wrapper-sys", + "static_assertions", + "thiserror 1.0.69", + "wrapcenum-derive", +] + +[[package]] +name = "nvml-wrapper-sys" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd23dbe2eb8d8335d2bce0299e0a07d6a63c089243d626ca75b770a962ff49e6" +dependencies = [ + "libloading", +] + [[package]] name = "oas3" version = "0.20.1" @@ -5752,6 +5781,25 @@ dependencies = [ "url", ] +[[package]] +name = "objc2-core-foundation" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536" +dependencies = [ + "bitflags", +] + +[[package]] +name = "objc2-io-kit" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33fafba39597d6dc1fb709123dfa8289d39406734be322956a69f0931c73bb15" +dependencies = [ + "libc", + "objc2-core-foundation", +] + [[package]] name = "object" version = "0.37.3" @@ -6782,17 +6830,6 @@ dependencies = [ "thiserror 1.0.69", ] -[[package]] -name = "redox_users" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" -dependencies = [ - "getrandom 0.2.16", - "libredox", - "thiserror 2.0.17", -] - [[package]] name = "ref-cast" version = "1.0.25" @@ -7999,6 +8036,20 @@ dependencies = [ "syn 2.0.108", ] +[[package]] +name = "sysinfo" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe840c5b1afe259a5657392a4dbb74473a14c8db999c3ec2f4ae812e028a94da" +dependencies = [ + "libc", + "memchr", + "ntapi", + "objc2-core-foundation", + "objc2-io-kit", + "windows 0.62.2", +] + [[package]] name = "system-configuration" version = "0.6.1" @@ -9758,6 +9809,18 @@ dependencies = [ "windows-core 0.62.2", ] +[[package]] +name = "wrapcenum-derive" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a76ff259533532054cfbaefb115c613203c73707017459206380f03b3b3f266e" +dependencies = [ + "darling 0.20.11", + "proc-macro2", + "quote", + "syn 2.0.108", +] + [[package]] name = "writeable" version = "0.6.1" diff --git a/coman/Cargo.toml b/coman/Cargo.toml index 47b4108..8500d30 100644 --- a/coman/Cargo.toml +++ b/coman/Cargo.toml @@ -79,7 +79,6 @@ rust_supervisor = "0.2.0" iroh-ssh = "0.2.7" whoami = "1.6.1" base64 = "0.22.1" -dirs = "6.0.0" iroh = "0.95.1" rand = "0.9.2" regex = "1.12.2" @@ -91,6 +90,9 @@ tarpc = { version = "0.37.0", features = [ "tokio1", ] } tokio-duplex = "1.0.1" +sysinfo = "0.38.0" +nvml-wrapper = "0.11.0" +bytesize = "2.3.1" [build-dependencies] anyhow = "1.0.90" diff --git a/coman/src/cli/exec.rs b/coman/src/cli/exec.rs index 1b6765a..1a870cf 100644 --- a/coman/src/cli/exec.rs +++ b/coman/src/cli/exec.rs @@ -7,7 +7,6 @@ use iroh::{ endpoint::ConnectionError, protocol::{ProtocolHandler, Router}, }; -use nom::AsBytes; use pid1::Pid1Settings; use rust_supervisor::{ChildType, Supervisor, SupervisorConfig}; use tokio::{io::AsyncWriteExt, net::TcpStream}; @@ -127,7 +126,7 @@ async fn port_forward() -> Result<()> { // add rpc server let rpc_handler = RpcHandler; - builder = builder.accept(b"/coman/rpc/".as_bytes(), rpc_handler); + builder = builder.accept(b"/coman/rpc", rpc_handler); let _router = builder.spawn(); println!("port forwarding started"); diff --git a/coman/src/cli/rpc.rs b/coman/src/cli/rpc.rs index 143b013..0d655a7 100644 --- a/coman/src/cli/rpc.rs +++ b/coman/src/cli/rpc.rs @@ -1,5 +1,8 @@ use futures::StreamExt; use iroh::protocol::ProtocolHandler; +use nvml_wrapper::Nvml; +use serde::{Deserialize, Serialize}; +use sysinfo::System; use tarpc::{ serde_transport as transport, server, server::Channel, tokio_serde::formats::Bincode, tokio_util::codec::LengthDelimitedCodec, @@ -8,9 +11,18 @@ use tokio_duplex::Duplex; use crate::cli::app::COMAN_VERSION; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ResourceUsage { + pub cpu: f32, + pub mem_used: u64, + pub mem_total: u64, + pub gpu: Option, +} + #[tarpc::service] pub trait ComanRPC { async fn version() -> String; + async fn resource_usage() -> ResourceUsage; } #[derive(Debug, Clone)] struct RpcServer; @@ -19,6 +31,41 @@ impl ComanRPC for RpcServer { async fn version(self, _: tarpc::context::Context) -> String { COMAN_VERSION.to_string() } + + async fn resource_usage(self, _context: ::tarpc::context::Context) -> ResourceUsage { + let mut sys = System::new_all(); + sys.refresh_all(); + let mut cpu_usage = 0.0; + for cpu in sys.cpus() { + cpu_usage += cpu.cpu_usage(); + } + cpu_usage /= sys.cpus().len() as f32; + let gpu_usage = match Nvml::init() { + Ok(nvml) => match nvml.device_by_index(0) { + Ok(device) => match device.memory_info() { + Ok(memory_info) => Some(memory_info.used), + Err(e) => { + println!("Couldn't get GPU memory info: {e:?}"); + None + } + }, + Err(e) => { + println!("couldn't load nvidia device 0: {e:?}"); + None + } + }, + Err(e) => { + println!("Nvidia Device Info not available: {e:?}"); + None + } + }; + ResourceUsage { + cpu: cpu_usage, + mem_used: sys.used_memory(), + mem_total: sys.total_memory(), + gpu: gpu_usage, + } + } } #[derive(Debug, Default)] diff --git a/coman/src/cscs/handlers.rs b/coman/src/cscs/handlers.rs index 6652629..b9261ac 100644 --- a/coman/src/cscs/handlers.rs +++ b/coman/src/cscs/handlers.rs @@ -30,7 +30,10 @@ use tokio_util::codec::LengthDelimitedCodec; use super::api_client::client::{EdfSpec, ScriptSpec}; use crate::{ - cli::{app::COMAN_VERSION, rpc::ComanRPCClient}, + cli::{ + app::COMAN_VERSION, + rpc::{ComanRPCClient, ResourceUsage}, + }, config::{ComputePlatform, Config, get_data_dir}, cscs::{ api_client::{ @@ -168,10 +171,10 @@ pub async fn cscs_job_log( } } -pub async fn cscs_resource_usage(job_id: i64, system: Option) -> Result<()> { +pub async fn cscs_resource_usage(job_id: i64, system: Option) -> Result { let endpoint_id = get_endpoint_id(job_id, system).await?; - let alpn: Vec = "/coman/rpc".to_string().into_bytes(); + let alpn: Vec = b"/coman/rpc".to_vec(); let secret_key = SecretKey::generate(&mut rand::rng()); let endpoint = Endpoint::builder().secret_key(secret_key).bind().await?; @@ -183,10 +186,11 @@ pub async fn cscs_resource_usage(job_id: i64, system: Option) -> Result< let framed = codec_builder.new_framed(combined); let transport = serde_transport::new(framed, Bincode::default()); let client = ComanRPCClient::new(client::Config::default(), transport); - let result = client.spawn().version(context::current()).await?; - let _ = dbg!(result); - - Ok(()) + client + .spawn() + .resource_usage(context::current()) + .await + .wrap_err("couldn't get resource usage from remote") } Err(e) => Err(e).wrap_err("couldn't establish tunnel to remote"), } @@ -296,7 +300,10 @@ async fn setup_ssh( path.canonicalize().map(Some).wrap_err("couldn't get ssh key path")? } else { // try to figure our ssh key - let ssh_dir = dirs::home_dir().ok_or(eyre!("couldn't find home dir"))?.join(".ssh"); + let ssh_dir = directories::UserDirs::new() + .ok_or(eyre!("couldn't find home dir"))? + .home_dir() + .join(".ssh"); let mut ssh_path = None; for file in ["id_dsa.pub", "id_ecdsa.pub", "id_rsa.pub", "id_ed25519.pub"] { let path = ssh_dir.join(file); @@ -410,7 +417,10 @@ async fn store_ssh_information( current_system, job_id )?; - let ssh_dir = dirs::home_dir().ok_or(eyre!("couldn't find home dir"))?.join(".ssh"); + let ssh_dir = directories::UserDirs::new() + .ok_or(eyre!("couldn't find home dir"))? + .home_dir() + .join(".ssh"); let ssh_config_path = ssh_dir.join("config"); let mut ssh_config = std::fs::OpenOptions::new() .read(true) @@ -517,7 +527,7 @@ async fn inject_coman_squash( let config = Config::new().unwrap(); let local_squash_path = maybe_download_latest_squash(current_system, &config).await?; let target = base_path.join("coman.sqsh"); - let file_meta = std::fs::metadata(local_squash_path.clone())?; + let file_meta = std::fs::metadata(local_squash_path.clone()).wrap_err("couldn't load coman squash file")?; #[cfg(target_family = "unix")] let size = file_meta.size() as usize; @@ -641,26 +651,32 @@ async fn handle_edf( None }; if let Some(docker_image) = docker_image { - let meta = docker_image.inspect().await?; - if let Some(system_info) = config.values.cscs.systems.get(current_system) { - let mut compatible = false; - for sys_platform in system_info.architecture.iter() { - if meta.platforms.contains(&sys_platform.clone().into()) { - compatible = true; + match docker_image.inspect().await { + Ok(meta) => { + if let Some(system_info) = config.values.cscs.systems.get(current_system) { + let mut compatible = false; + for sys_platform in system_info.architecture.iter() { + if meta.platforms.contains(&sys_platform.clone().into()) { + compatible = true; + } + } + + if !compatible { + return Err(eyre!( + "System {} only supports images with architecture(s) '{}' but the supplied image is for architecture(s) '{}'", + current_system, + system_info.architecture.join(","), + meta.platforms + .iter() + .map(|p| p.to_string()) + .collect::>() + .join(",") + )); + } } } - - if !compatible { - return Err(eyre!( - "System {} only supports images with architecture(s) '{}' but the supplied image is for architecture(s) '{}'", - current_system, - system_info.architecture.join(","), - meta.platforms - .iter() - .map(|p| p.to_string()) - .collect::>() - .join(",") - )); + Err(e) => { + println!("couldn't get image information, skipping checks: {e:?}"); } } diff --git a/coman/src/util/types.rs b/coman/src/util/types.rs index b048f5e..2f58517 100644 --- a/coman/src/util/types.rs +++ b/coman/src/util/types.rs @@ -10,7 +10,7 @@ use nom::{ character::complete::{alphanumeric1, digit1}, combinator::{complete, opt, recognize}, multi::{many_m_n, many1, separated_list0, separated_list1}, - sequence::{preceded, terminated}, + sequence::{preceded, separated_pair, terminated}, }; use oci_distribution::{ Client, Reference, @@ -72,7 +72,10 @@ impl DockerImageUrl { }); let reference = self.to_string().parse()?; let auth = docker_auth(&reference)?; - let (manifest, _) = client.pull_manifest(&reference, &auth).await?; + let (manifest, _) = client + .pull_manifest(&reference, &auth) + .await + .wrap_err(format!("Couldn't get image manifest for image {reference}"))?; match manifest { OciManifest::Image(oci_image_manifest) => { // it's not clear what is returned in this case, I never hit this in my testing. @@ -129,15 +132,16 @@ impl FromStr for DockerImageUrl { fn from_str(s: &str) -> Result { // see https://ktomk.github.io/pipelines/doc/DOCKER-NAME-TAG.html#syntax let host = opt(terminated( - alt(( - recognize((separated_list1(tag("."), alphanumeric1), tag(":"), digit1)), - recognize(separated_list1(tag("."), alphanumeric1)), + recognize(separated_pair( + separated_pair(alphanumeric1, tag("."), separated_list1(tag("."), alphanumeric1)), + opt(tag(":")), + opt(digit1), )), tag("/"), )); - let image = recognize(separated_list0( + let image = recognize(separated_list1( tag("/"), - separated_list0( + separated_list1( alt(( tag("."), recognize(many_m_n(1, 2, tag("_"))), @@ -200,6 +204,8 @@ mod tests { #[rstest] #[case("ubuntu",(None,"ubuntu",None,None))] + #[case("nvidia/cuda:13.1.1-cudnn-devel-ubuntu24.04",(None,"nvidia/cuda",Some("13.1.1-cudnn-devel-ubuntu24.04"),None))] + #[case("nvidia/cuda@sha256:deadbeef",(None,"nvidia/cuda",None,Some("deadbeef")))] #[case("docker.io/library/hello-world:latest@sha256:deadbeef",(Some("docker.io"),"library/hello-world",Some("latest"),Some("deadbeef")))] #[case("ghcr.io/swissdatasciencecenter/renku-frontend-buildpacks/run-image:0.2.1",(Some("ghcr.io"),"swissdatasciencecenter/renku-frontend-buildpacks/run-image",Some("0.2.1"),None))] #[case("test.ghcr.io/a/b/c/d/e:a-1.f-2", (Some("test.ghcr.io"), "a/b/c/d/e", Some("a-1.f-2"), None))] From 43de0064b21d0024ca22ea2c41662a33f56af788 Mon Sep 17 00:00:00 2001 From: Ralf Grubenmann Date: Mon, 26 Jan 2026 16:29:42 +0100 Subject: [PATCH 3/5] add dcgm support and mount libs --- coman/.config/config.toml | 1 + coman/src/cli/app.rs | 2 +- coman/src/cli/rpc.rs | 27 ++++++------ coman/src/cscs/cli.rs | 13 +++++- coman/src/cscs/dcgm_enroot_hook.sh | 60 ++++++++++++++++++++++++++ coman/src/cscs/handlers.rs | 43 ++++++++++++++++-- coman/src/cscs/ports.rs | 2 +- firecrest_client/src/filesystem_api.rs | 2 +- 8 files changed, 130 insertions(+), 20 deletions(-) create mode 100644 coman/src/cscs/dcgm_enroot_hook.sh diff --git a/coman/.config/config.toml b/coman/.config/config.toml index b261991..6f0fb76 100644 --- a/coman/.config/config.toml +++ b/coman/.config/config.toml @@ -60,6 +60,7 @@ com.hooks.ssh.enabled = "true" com.hooks.ssh.authorize_ssh_key = "{{ ssh_public_key }}" com.hooks.ssh.port = "15263" {% endif %} +com.hooks.dcgm.enabled = "true" """ # set environment variables that should be passed to a job diff --git a/coman/src/cli/app.rs b/coman/src/cli/app.rs index 87bbc69..e845c20 100644 --- a/coman/src/cli/app.rs +++ b/coman/src/cli/app.rs @@ -408,7 +408,7 @@ fn remote_path_completer(current: &std::ffi::OsStr) -> Vec let (send, mut recv) = mpsc::unbounded_channel(); if current.is_empty() || current == "/" { tokio::spawn(async move { - let roots = file_system_roots().await; + let roots = file_system_roots(None).await; if let Ok(roots) = roots { for root in roots { send.send(CompletionCandidate::new(root.name.clone())).unwrap(); diff --git a/coman/src/cli/rpc.rs b/coman/src/cli/rpc.rs index 0d655a7..1ea6921 100644 --- a/coman/src/cli/rpc.rs +++ b/coman/src/cli/rpc.rs @@ -2,7 +2,7 @@ use futures::StreamExt; use iroh::protocol::ProtocolHandler; use nvml_wrapper::Nvml; use serde::{Deserialize, Serialize}; -use sysinfo::System; +use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, System, get_current_pid}; use tarpc::{ serde_transport as transport, server, server::Channel, tokio_serde::formats::Bincode, tokio_util::codec::LengthDelimitedCodec, @@ -11,11 +11,11 @@ use tokio_duplex::Duplex; use crate::cli::app::COMAN_VERSION; -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct ResourceUsage { pub cpu: f32, - pub mem_used: u64, - pub mem_total: u64, + pub rss: u64, + pub vss: u64, pub gpu: Option, } @@ -35,11 +35,14 @@ impl ComanRPC for RpcServer { async fn resource_usage(self, _context: ::tarpc::context::Context) -> ResourceUsage { let mut sys = System::new_all(); sys.refresh_all(); - let mut cpu_usage = 0.0; - for cpu in sys.cpus() { - cpu_usage += cpu.cpu_usage(); - } - cpu_usage /= sys.cpus().len() as f32; + tokio::time::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL).await; + sys.refresh_processes_specifics(ProcessesToUpdate::All, true, ProcessRefreshKind::nothing().with_cpu()); + let Ok(pid) = get_current_pid() else { + return ResourceUsage::default(); + }; + let Some(process) = sys.process(pid) else { + return ResourceUsage::default(); + }; let gpu_usage = match Nvml::init() { Ok(nvml) => match nvml.device_by_index(0) { Ok(device) => match device.memory_info() { @@ -60,9 +63,9 @@ impl ComanRPC for RpcServer { } }; ResourceUsage { - cpu: cpu_usage, - mem_used: sys.used_memory(), - mem_total: sys.total_memory(), + cpu: process.cpu_usage() / sys.cpus().len() as f32, + rss: process.memory(), + vss: process.virtual_memory(), gpu: gpu_usage, } } diff --git a/coman/src/cscs/cli.rs b/coman/src/cscs/cli.rs index f5199ec..5e3bb2b 100644 --- a/coman/src/cscs/cli.rs +++ b/coman/src/cscs/cli.rs @@ -4,6 +4,7 @@ use std::{ time::{Duration, Instant}, }; +use bytesize::ByteSize; use color_eyre::{Result, eyre::Context}; use eyre::eyre; use futures::StreamExt; @@ -142,8 +143,16 @@ pub(crate) async fn cli_cscs_job_resource_usage( platform: Option, ) -> Result<()> { let job_id = maybe_job_id_from_name(job, system.clone(), platform.clone()).await?; - println!("running port forward for job {job_id}"); - cscs_resource_usage(job_id, system).await + let result = cscs_resource_usage(job_id, system).await?; + println!( + "CPU: {}%, Memory: RSS {} VSS {}, GPU: {}", + result.cpu, + ByteSize::b(result.rss).display().iec(), + ByteSize::b(result.vss).display().iec(), + result.gpu.map(|g| g.to_string()).unwrap_or("N/A".to_string()) + ); + + Ok(()) } #[allow(clippy::too_many_arguments)] diff --git a/coman/src/cscs/dcgm_enroot_hook.sh b/coman/src/cscs/dcgm_enroot_hook.sh new file mode 100644 index 0000000..17d2a5e --- /dev/null +++ b/coman/src/cscs/dcgm_enroot_hook.sh @@ -0,0 +1,60 @@ +#!/usr/bin/env bash + +set -euo pipefail +shopt -s lastpipe nullglob + +export PATH=\"${PATH}:/usr/sbin:/sbin\" + +source \"${ENROOT_LIBRARY_PATH}/common.sh\" + +common::checkcmd grep sed ldd ldconfig + +if [ \"${OCI_ANNOTATION_com__hooks__dcgm__enabled:-}\" != \"true\" ]; then + exit 0 +fi + +# Mounting the specified DCGM libraries and directories explicitly +cat << EOF | enroot-mount --root \"${ENROOT_ROOTFS}\" - +/usr/local/dcgm /usr/local/dcgm none x-create=dir,bind,ro,nosuid,nodev,private +/usr/lib64/libnvperf_dcgm_host.so /usr/lib64/libnvperf_dcgm_host.so none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmodulesysmon.so.3.3.6 /usr/lib64/libdcgmmodulesysmon.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmodulesysmon.so.3 /usr/lib64/libdcgmmodulesysmon.so.3 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmodulesysmon.so /usr/lib64/libdcgmmodulesysmon.so none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmoduleprofiling.so.3.3.6 /usr/lib64/libdcgmmoduleprofiling.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmoduleprofiling.so.3 /usr/lib64/libdcgmmoduleprofiling.so.3 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmoduleprofiling.so /usr/lib64/libdcgmmoduleprofiling.so none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmodulepolicy.so.3.3.6 /usr/lib64/libdcgmmodulepolicy.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmodulepolicy.so.3 /usr/lib64/libdcgmmodulepolicy.so.3 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmodulepolicy.so /usr/lib64/libdcgmmodulepolicy.so none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmodulenvswitch.so.3.3.6 /usr/lib64/libdcgmmodulenvswitch.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmodulenvswitch.so.3 /usr/lib64/libdcgmmodulenvswitch.so.3 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmodulenvswitch.so /usr/lib64/libdcgmmodulenvswitch.so none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmoduleintrospect.so.3.3.6 /usr/lib64/libdcgmmoduleintrospect.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmoduleintrospect.so.3 /usr/lib64/libdcgmmoduleintrospect.so.3 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmoduleintrospect.so /usr/lib64/libdcgmmoduleintrospect.so none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmodulehealth.so.3.3.6 /usr/lib64/libdcgmmodulehealth.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmodulehealth.so.3 /usr/lib64/libdcgmmodulehealth.so.3 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmodulehealth.so /usr/lib64/libdcgmmodulehealth.so none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmodulediag.so.3.3.6 /usr/lib64/libdcgmmodulediag.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmodulediag.so.3 /usr/lib64/libdcgmmodulediag.so.3 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmodulediag.so /usr/lib64/libdcgmmodulediag.so none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmoduleconfig.so.3.3.6 /usr/lib64/libdcgmmoduleconfig.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmoduleconfig.so.3 /usr/lib64/libdcgmmoduleconfig.so.3 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgmmoduleconfig.so /usr/lib64/libdcgmmoduleconfig.so none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgm_stub.a /usr/lib64/libdcgm_stub.a none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgm_cublas_proxy12.so /usr/lib64/libdcgm_cublas_proxy12.so none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgm_cublas_proxy11.so /usr/lib64/libdcgm_cublas_proxy11.so none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgm.so.3.3.6 /usr/lib64/libdcgm.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgm.so.3 /usr/lib64/libdcgm.so.3 none x-create=file,bind,ro,nosuid,nodev,private +/usr/lib64/libdcgm.so /usr/lib64/libdcgm.so none x-create=file,bind,ro,nosuid,nodev,private +EOF + +# Refresh the dynamic linker cache to include newly mounted libs +cat << EOF > \"${ENROOT_ROOTFS}/etc/ld.so.conf.d/enroot-dcgm-hook.conf\" +/lib64 +/usr/lib64 +EOF + +if ! ${ldconfig:-ldconfig} -r \"${ENROOT_ROOTFS}\" >> \"${ENROOT_ROOTFS}/dcgm-hook.log\" 2>&1; then + common::err \"Failed to refresh the dynamic linker cache\" +fi diff --git a/coman/src/cscs/handlers.rs b/coman/src/cscs/handlers.rs index b9261ac..a1e9906 100644 --- a/coman/src/cscs/handlers.rs +++ b/coman/src/cscs/handlers.rs @@ -52,6 +52,7 @@ use crate::{ }; const CSCS_MAX_DIRECT_SIZE: usize = 5242880; +const DCGM_ENROOT_HOOK: &str = include_str!("./dcgm_enroot_hook.sh"); async fn get_access_token() -> Result { let client_id = match get_secret(CLIENT_ID_SECRET_NAME).await { @@ -750,6 +751,30 @@ async fn handle_script( Ok(script_path) } +async fn setup_dcgm_hook(api_client: &CscsApi, current_system: &str) -> Result<()> { + let user_dirs = file_system_roots(Some(FileSystemType::Users)).await?; + let user_dir = user_dirs + .first() + .ok_or(eyre!("couldn't find user root directory on remote"))?; + let path = PathBuf::from(user_dir.name.clone()) + .join(".config") + .join("enroot") + .join("hooks.d") + .join("cscs_jobreport_dcgm_hook.sh"); + + let response = api_client.checksum(current_system, path.clone()).await; + if let Ok(Some(_)) = response { + // file exists + return Ok(()); + } + api_client + .mkdir(current_system, path.parent().unwrap().to_path_buf()) + .await?; + api_client + .upload(current_system, path, DCGM_ENROOT_HOOK.as_bytes().to_vec()) + .await + .wrap_err("couldn't upload dcgm enroot hook ".to_string()) +} pub async fn cscs_job_start( name: Option, @@ -807,7 +832,9 @@ pub async fn cscs_job_start( if coman_squash.is_none() { println!("Warning: coman squash wasn't templated and is needed for ssh through coman to work"); } - + if let Err(e) = setup_dcgm_hook(&api_client, current_system).await { + println!("Warning: couldn't set up dcgm hook: {e:?}"); + } let environment_path = handle_edf( &api_client, &base_path, @@ -869,7 +896,7 @@ pub async fn cscs_file_list( Err(e) => Err(e), } } -pub async fn file_system_roots() -> Result> { +pub async fn file_system_roots(type_filter: Option) -> Result> { let config = Config::new().expect("couldn't load config"); let user_info = cscs_user_info(None, None).await?; let systems = cscs_system_list(None).await?; @@ -878,7 +905,17 @@ pub async fn file_system_roots() -> Result> { .find(|s| s.name == config.values.cscs.current_system) .unwrap_or_else(|| panic!("couldn't get info for system {}", config.values.cscs.current_system)); let mut subpaths = vec![]; - for fs in system.file_systems.clone() { + let filesystems = if let Some(filter) = type_filter { + system + .file_systems + .clone() + .into_iter() + .filter(|fs| fs.data_type == filter) + .collect() + } else { + system.file_systems.clone() + }; + for fs in filesystems { let entry = match cscs_stat_path(PathBuf::from(fs.path.clone()).join(user_info.name.clone()), None, None).await { Ok(Some(_)) => PathEntry { diff --git a/coman/src/cscs/ports.rs b/coman/src/cscs/ports.rs index 44f3bd5..6b3a59e 100644 --- a/coman/src/cscs/ports.rs +++ b/coman/src/cscs/ports.rs @@ -248,7 +248,7 @@ async fn list_files(id: PathBuf) -> Result>> { .map_err(|_| eyre!("couldn't convert id to string".to_owned()))?; if id_str == "/" { // load file system roots - let subpaths = file_system_roots().await?; + let subpaths = file_system_roots(None).await?; Ok(Some(Event::User(UserEvent::File(FileEvent::List(id_str, subpaths))))) } else { let subpaths = cscs_file_list(id, None, None).await?; diff --git a/firecrest_client/src/filesystem_api.rs b/firecrest_client/src/filesystem_api.rs index f87eaf4..0a32d46 100644 --- a/firecrest_client/src/filesystem_api.rs +++ b/firecrest_client/src/filesystem_api.rs @@ -21,7 +21,7 @@ pub async fn get_filesystem_ops_ls( let response = client .get( format!("filesystem/{system_name}/ops/ls").as_str(), - Some(vec![("path", path)]), + Some(vec![("path", path), ("showHidden", "true")]), ) .await?; let model: GetDirectoryLsResponse = serde_json::from_str(response.as_str())?; From 058098088a34af92dc3bf1ba74f907ecedf5e50d Mon Sep 17 00:00:00 2001 From: Ralf Grubenmann Date: Wed, 28 Jan 2026 08:56:05 +0100 Subject: [PATCH 4/5] switch to nvidia-smi --- Cargo.lock | 46 -------------------- coman/Cargo.toml | 1 - coman/src/cli/rpc.rs | 45 ++++++++++--------- coman/src/cscs/api_client/client.rs | 4 +- coman/src/cscs/cli.rs | 20 +++++++-- coman/src/cscs/dcgm_enroot_hook.sh | 60 -------------------------- coman/src/cscs/handlers.rs | 36 ++-------------- firecrest_client/src/filesystem_api.rs | 6 ++- 8 files changed, 53 insertions(+), 165 deletions(-) delete mode 100644 coman/src/cscs/dcgm_enroot_hook.sh diff --git a/Cargo.lock b/Cargo.lock index 7dba173..2b3e004 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1249,7 +1249,6 @@ dependencies = [ "lazy_static", "libc", "nom 8.0.0", - "nvml-wrapper", "oci-distribution", "open", "openidconnect", @@ -5013,16 +5012,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "libloading" -version = "0.8.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" -dependencies = [ - "cfg-if", - "windows-link 0.2.1", -] - [[package]] name = "libm" version = "0.2.15" @@ -5692,29 +5681,6 @@ dependencies = [ "libc", ] -[[package]] -name = "nvml-wrapper" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d5c6c0ef9702176a570f06ad94f3198bc29c524c8b498f1b9346e1b1bdcbb3a" -dependencies = [ - "bitflags", - "libloading", - "nvml-wrapper-sys", - "static_assertions", - "thiserror 1.0.69", - "wrapcenum-derive", -] - -[[package]] -name = "nvml-wrapper-sys" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd23dbe2eb8d8335d2bce0299e0a07d6a63c089243d626ca75b770a962ff49e6" -dependencies = [ - "libloading", -] - [[package]] name = "oas3" version = "0.20.1" @@ -9809,18 +9775,6 @@ dependencies = [ "windows-core 0.62.2", ] -[[package]] -name = "wrapcenum-derive" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a76ff259533532054cfbaefb115c613203c73707017459206380f03b3b3f266e" -dependencies = [ - "darling 0.20.11", - "proc-macro2", - "quote", - "syn 2.0.108", -] - [[package]] name = "writeable" version = "0.6.1" diff --git a/coman/Cargo.toml b/coman/Cargo.toml index 8500d30..cfbdf4b 100644 --- a/coman/Cargo.toml +++ b/coman/Cargo.toml @@ -91,7 +91,6 @@ tarpc = { version = "0.37.0", features = [ ] } tokio-duplex = "1.0.1" sysinfo = "0.38.0" -nvml-wrapper = "0.11.0" bytesize = "2.3.1" [build-dependencies] diff --git a/coman/src/cli/rpc.rs b/coman/src/cli/rpc.rs index 1ea6921..3426a3d 100644 --- a/coman/src/cli/rpc.rs +++ b/coman/src/cli/rpc.rs @@ -1,6 +1,6 @@ +use bytesize::ByteSize; use futures::StreamExt; use iroh::protocol::ProtocolHandler; -use nvml_wrapper::Nvml; use serde::{Deserialize, Serialize}; use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, System, get_current_pid}; use tarpc::{ @@ -16,7 +16,7 @@ pub struct ResourceUsage { pub cpu: f32, pub rss: u64, pub vss: u64, - pub gpu: Option, + pub gpu: Option>, } #[tarpc::service] @@ -43,25 +43,30 @@ impl ComanRPC for RpcServer { let Some(process) = sys.process(pid) else { return ResourceUsage::default(); }; - let gpu_usage = match Nvml::init() { - Ok(nvml) => match nvml.device_by_index(0) { - Ok(device) => match device.memory_info() { - Ok(memory_info) => Some(memory_info.used), - Err(e) => { - println!("Couldn't get GPU memory info: {e:?}"); - None - } - }, - Err(e) => { - println!("couldn't load nvidia device 0: {e:?}"); - None - } - }, - Err(e) => { - println!("Nvidia Device Info not available: {e:?}"); - None - } + let gpu_usage = if let Ok(output) = std::process::Command::new("nvidia-smi") + .args(vec![ + "--query-gpu=memory.total,memory.used", + "--format=csv,noheader,nounits", + ]) + .output() + { + let output = String::from_utf8_lossy(&output.stdout); + let usage = output + .lines() + .map(|l| l.split_once(",").unwrap()) + .map(|(total, used)| { + ( + ByteSize::mib(total.trim().parse::().unwrap()).as_u64(), + ByteSize::mib(used.trim().parse::().unwrap()).as_u64(), + ) + }) + .collect(); + Some(usage) + } else { + println!("Failed to execute nvidia-smi, maybe it's not installed"); + None }; + ResourceUsage { cpu: process.cpu_usage() / sys.cpus().len() as f32, rss: process.memory(), diff --git a/coman/src/cscs/api_client/client.rs b/coman/src/cscs/api_client/client.rs index be70dcf..76c0e14 100644 --- a/coman/src/cscs/api_client/client.rs +++ b/coman/src/cscs/api_client/client.rs @@ -222,8 +222,8 @@ impl CscsApi { None => Ok("".to_string()), } } - pub async fn list_path(&self, system_name: &str, path: PathBuf) -> Result> { - let result = get_filesystem_ops_ls(&self.client, system_name, path) + pub async fn list_path(&self, system_name: &str, path: PathBuf, show_hidden: bool) -> Result> { + let result = get_filesystem_ops_ls(&self.client, system_name, path, show_hidden) .await .wrap_err("couldn't list path")?; match result.output { diff --git a/coman/src/cscs/cli.rs b/coman/src/cscs/cli.rs index 5e3bb2b..d5fa986 100644 --- a/coman/src/cscs/cli.rs +++ b/coman/src/cscs/cli.rs @@ -144,12 +144,26 @@ pub(crate) async fn cli_cscs_job_resource_usage( ) -> Result<()> { let job_id = maybe_job_id_from_name(job, system.clone(), platform.clone()).await?; let result = cscs_resource_usage(job_id, system).await?; + println!("CPU: {:.1}%", result.cpu); println!( - "CPU: {}%, Memory: RSS {} VSS {}, GPU: {}", - result.cpu, + "Memory: RSS {:.1}, VSS: {:.1}", ByteSize::b(result.rss).display().iec(), ByteSize::b(result.vss).display().iec(), - result.gpu.map(|g| g.to_string()).unwrap_or("N/A".to_string()) + ); + println!( + "GPU: {}", + result + .gpu + .map(|g| g + .into_iter() + .map(|(total, used)| format!( + "{}/{}({:.1}%)", + ByteSize::b(used).display().iec(), + ByteSize::b(total).display().iec(), + used as f64 / total as f64 * 100.0 + )) + .join(", ")) + .unwrap_or("N/A".to_string()) ); Ok(()) diff --git a/coman/src/cscs/dcgm_enroot_hook.sh b/coman/src/cscs/dcgm_enroot_hook.sh deleted file mode 100644 index 17d2a5e..0000000 --- a/coman/src/cscs/dcgm_enroot_hook.sh +++ /dev/null @@ -1,60 +0,0 @@ -#!/usr/bin/env bash - -set -euo pipefail -shopt -s lastpipe nullglob - -export PATH=\"${PATH}:/usr/sbin:/sbin\" - -source \"${ENROOT_LIBRARY_PATH}/common.sh\" - -common::checkcmd grep sed ldd ldconfig - -if [ \"${OCI_ANNOTATION_com__hooks__dcgm__enabled:-}\" != \"true\" ]; then - exit 0 -fi - -# Mounting the specified DCGM libraries and directories explicitly -cat << EOF | enroot-mount --root \"${ENROOT_ROOTFS}\" - -/usr/local/dcgm /usr/local/dcgm none x-create=dir,bind,ro,nosuid,nodev,private -/usr/lib64/libnvperf_dcgm_host.so /usr/lib64/libnvperf_dcgm_host.so none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmodulesysmon.so.3.3.6 /usr/lib64/libdcgmmodulesysmon.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmodulesysmon.so.3 /usr/lib64/libdcgmmodulesysmon.so.3 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmodulesysmon.so /usr/lib64/libdcgmmodulesysmon.so none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmoduleprofiling.so.3.3.6 /usr/lib64/libdcgmmoduleprofiling.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmoduleprofiling.so.3 /usr/lib64/libdcgmmoduleprofiling.so.3 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmoduleprofiling.so /usr/lib64/libdcgmmoduleprofiling.so none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmodulepolicy.so.3.3.6 /usr/lib64/libdcgmmodulepolicy.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmodulepolicy.so.3 /usr/lib64/libdcgmmodulepolicy.so.3 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmodulepolicy.so /usr/lib64/libdcgmmodulepolicy.so none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmodulenvswitch.so.3.3.6 /usr/lib64/libdcgmmodulenvswitch.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmodulenvswitch.so.3 /usr/lib64/libdcgmmodulenvswitch.so.3 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmodulenvswitch.so /usr/lib64/libdcgmmodulenvswitch.so none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmoduleintrospect.so.3.3.6 /usr/lib64/libdcgmmoduleintrospect.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmoduleintrospect.so.3 /usr/lib64/libdcgmmoduleintrospect.so.3 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmoduleintrospect.so /usr/lib64/libdcgmmoduleintrospect.so none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmodulehealth.so.3.3.6 /usr/lib64/libdcgmmodulehealth.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmodulehealth.so.3 /usr/lib64/libdcgmmodulehealth.so.3 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmodulehealth.so /usr/lib64/libdcgmmodulehealth.so none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmodulediag.so.3.3.6 /usr/lib64/libdcgmmodulediag.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmodulediag.so.3 /usr/lib64/libdcgmmodulediag.so.3 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmodulediag.so /usr/lib64/libdcgmmodulediag.so none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmoduleconfig.so.3.3.6 /usr/lib64/libdcgmmoduleconfig.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmoduleconfig.so.3 /usr/lib64/libdcgmmoduleconfig.so.3 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgmmoduleconfig.so /usr/lib64/libdcgmmoduleconfig.so none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgm_stub.a /usr/lib64/libdcgm_stub.a none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgm_cublas_proxy12.so /usr/lib64/libdcgm_cublas_proxy12.so none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgm_cublas_proxy11.so /usr/lib64/libdcgm_cublas_proxy11.so none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgm.so.3.3.6 /usr/lib64/libdcgm.so.3.3.6 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgm.so.3 /usr/lib64/libdcgm.so.3 none x-create=file,bind,ro,nosuid,nodev,private -/usr/lib64/libdcgm.so /usr/lib64/libdcgm.so none x-create=file,bind,ro,nosuid,nodev,private -EOF - -# Refresh the dynamic linker cache to include newly mounted libs -cat << EOF > \"${ENROOT_ROOTFS}/etc/ld.so.conf.d/enroot-dcgm-hook.conf\" -/lib64 -/usr/lib64 -EOF - -if ! ${ldconfig:-ldconfig} -r \"${ENROOT_ROOTFS}\" >> \"${ENROOT_ROOTFS}/dcgm-hook.log\" 2>&1; then - common::err \"Failed to refresh the dynamic linker cache\" -fi diff --git a/coman/src/cscs/handlers.rs b/coman/src/cscs/handlers.rs index a1e9906..68d74b8 100644 --- a/coman/src/cscs/handlers.rs +++ b/coman/src/cscs/handlers.rs @@ -52,7 +52,6 @@ use crate::{ }; const CSCS_MAX_DIRECT_SIZE: usize = 5242880; -const DCGM_ENROOT_HOOK: &str = include_str!("./dcgm_enroot_hook.sh"); async fn get_access_token() -> Result { let client_id = match get_secret(CLIENT_ID_SECRET_NAME).await { @@ -751,30 +750,6 @@ async fn handle_script( Ok(script_path) } -async fn setup_dcgm_hook(api_client: &CscsApi, current_system: &str) -> Result<()> { - let user_dirs = file_system_roots(Some(FileSystemType::Users)).await?; - let user_dir = user_dirs - .first() - .ok_or(eyre!("couldn't find user root directory on remote"))?; - let path = PathBuf::from(user_dir.name.clone()) - .join(".config") - .join("enroot") - .join("hooks.d") - .join("cscs_jobreport_dcgm_hook.sh"); - - let response = api_client.checksum(current_system, path.clone()).await; - if let Ok(Some(_)) = response { - // file exists - return Ok(()); - } - api_client - .mkdir(current_system, path.parent().unwrap().to_path_buf()) - .await?; - api_client - .upload(current_system, path, DCGM_ENROOT_HOOK.as_bytes().to_vec()) - .await - .wrap_err("couldn't upload dcgm enroot hook ".to_string()) -} pub async fn cscs_job_start( name: Option, @@ -832,9 +807,6 @@ pub async fn cscs_job_start( if coman_squash.is_none() { println!("Warning: coman squash wasn't templated and is needed for ssh through coman to work"); } - if let Err(e) = setup_dcgm_hook(&api_client, current_system).await { - println!("Warning: couldn't set up dcgm hook: {e:?}"); - } let environment_path = handle_edf( &api_client, &base_path, @@ -890,7 +862,7 @@ pub async fn cscs_file_list( let api_client = CscsApi::new(access_token.0, platform).unwrap(); let config = Config::new().unwrap(); api_client - .list_path(&system.unwrap_or(config.values.cscs.current_system), path) + .list_path(&system.unwrap_or(config.values.cscs.current_system), path, false) .await } Err(e) => Err(e), @@ -946,7 +918,7 @@ pub async fn cscs_file_delete( let api_client = CscsApi::new(access_token.0, platform).unwrap(); let config = Config::new().unwrap(); let current_system = &system.unwrap_or(config.values.cscs.current_system); - let paths = api_client.list_path(current_system, remote.clone()).await?; + let paths = api_client.list_path(current_system, remote.clone(), false).await?; let path = paths.first().ok_or(eyre!("remote path doesn't exist"))?; if let PathType::Directory = path.path_type { return Err(eyre!("remote path must be a file, not directory")); @@ -975,7 +947,7 @@ pub async fn cscs_file_download( let api_client = CscsApi::new(access_token.0, platform).unwrap(); let config = Config::new().unwrap(); let current_system = &system.unwrap_or(config.values.cscs.current_system); - let paths = api_client.list_path(current_system, remote.clone()).await?; + let paths = api_client.list_path(current_system, remote.clone(), false).await?; let path = paths.first().ok_or(eyre!("remote path doesn't exist"))?; if let PathType::Directory = path.path_type { return Err(eyre!("remote path must be a file, not directory")); @@ -1008,7 +980,7 @@ pub async fn cscs_file_upload( let api_client = CscsApi::new(access_token.0, platform).unwrap(); let config = Config::new().unwrap(); let current_system = &system.unwrap_or(config.values.cscs.current_system); - let existing = api_client.list_path(current_system, remote.clone()).await?; + let existing = api_client.list_path(current_system, remote.clone(), false).await?; let remote = if !existing.is_empty() { if existing.len() == 1 && existing[0].path_type == PathType::File { return Err(eyre!("remote file already exists")); diff --git a/firecrest_client/src/filesystem_api.rs b/firecrest_client/src/filesystem_api.rs index 0a32d46..15b50a4 100644 --- a/firecrest_client/src/filesystem_api.rs +++ b/firecrest_client/src/filesystem_api.rs @@ -16,12 +16,16 @@ pub async fn get_filesystem_ops_ls( client: &FirecrestClient, system_name: &str, path: PathBuf, + show_hidden: bool, ) -> Result { let path = path.as_os_str().to_str().ok_or(eyre!("couldn't cast path to string"))?; let response = client .get( format!("filesystem/{system_name}/ops/ls").as_str(), - Some(vec![("path", path), ("showHidden", "true")]), + Some(vec![ + ("path", path), + ("showHidden", if show_hidden { "true" } else { "false" }), + ]), ) .await?; let model: GetDirectoryLsResponse = serde_json::from_str(response.as_str())?; From 68942d2d1fee65020a81823555c2576aa762ae1c Mon Sep 17 00:00:00 2001 From: Ralf Grubenmann Date: Thu, 29 Jan 2026 15:10:37 +0100 Subject: [PATCH 5/5] add tui resource usage view --- Cargo.lock | 2 +- coman/Cargo.toml | 2 +- coman/src/app/ids.rs | 1 + coman/src/app/messages.rs | 1 + coman/src/app/model.rs | 62 ++- coman/src/app/user_events.rs | 2 + coman/src/cli/exec.rs | 4 +- coman/src/cli/rpc.rs | 24 +- coman/src/components/mod.rs | 1 + coman/src/components/resource_usage.rs | 542 +++++++++++++++++++++++++ coman/src/components/workload_list.rs | 19 +- coman/src/cscs/api_client/types.rs | 2 + coman/src/cscs/cli.rs | 10 +- coman/src/cscs/handlers.rs | 6 +- coman/src/cscs/ports.rs | 53 ++- coman/src/main.rs | 11 +- 16 files changed, 720 insertions(+), 22 deletions(-) create mode 100644 coman/src/components/resource_usage.rs diff --git a/Cargo.lock b/Cargo.lock index 2b3e004..d838a89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1216,7 +1216,7 @@ checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" [[package]] name = "coman" -version = "0.7.0" +version = "0.8.0" dependencies = [ "anyhow", "aws-sdk-s3", diff --git a/coman/Cargo.toml b/coman/Cargo.toml index cfbdf4b..efaeaad 100644 --- a/coman/Cargo.toml +++ b/coman/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "coman" -version = "0.7.0" +version = "0.8.0" edition = "2024" description = "Compute Manager for managing HPC compute" authors = ["Ralf Grubenmann "] diff --git a/coman/src/app/ids.rs b/coman/src/app/ids.rs index c488620..6f69f30 100644 --- a/coman/src/app/ids.rs +++ b/coman/src/app/ids.rs @@ -5,6 +5,7 @@ pub enum Id { WorkloadList, WorkloadLogs, WorkloadDetails, + WorkloadResourceUsage, GlobalListener, Menu, InfoPopup, diff --git a/coman/src/app/messages.rs b/coman/src/app/messages.rs index e03bc7e..6540c07 100644 --- a/coman/src/app/messages.rs +++ b/coman/src/app/messages.rs @@ -53,6 +53,7 @@ pub enum CscsMsg { #[derive(Debug, PartialEq)] pub enum JobMsg { Log(usize), + ResourceUsage(usize), Details(JobDetail), GetDetails(usize), Cancel(usize), diff --git a/coman/src/app/model.rs b/coman/src/app/model.rs index 6fd3b02..1509178 100644 --- a/coman/src/app/model.rs +++ b/coman/src/app/model.rs @@ -21,12 +21,12 @@ use crate::{ }, components::{ context_menu::ContextMenu, download_popup::DownloadTargetInput, error_popup::ErrorPopup, info_popup::InfoPopup, - login_popup::LoginPopup, system_select_popup::SystemSelectPopup, workload_details::WorkloadDetails, - workload_list::WorkloadList, workload_log::WorkloadLog, + login_popup::LoginPopup, resource_usage::ResourceUsage, system_select_popup::SystemSelectPopup, + workload_details::WorkloadDetails, workload_list::WorkloadList, workload_log::WorkloadLog, }, cscs::{ handlers::{cscs_login, cscs_system_set}, - ports::{BackgroundTask, JobLogAction}, + ports::{BackgroundTask, JobLogAction, JobResourceUsageAction}, }, trace_dbg, util::ui::{draw_area_in_absolute, draw_area_in_absolute_fixed_height}, @@ -58,6 +58,10 @@ where /// sending None stops watching pub job_log_tx: mpsc::Sender, + /// Triggers watching job logs + /// sending None stops watching + pub job_resource_usage_tx: mpsc::Sender, + /// Allows creating user events based on messages pub user_event_tx: mpsc::Sender, @@ -69,12 +73,14 @@ impl Model where T: TerminalAdapter, { + #[allow(clippy::too_many_arguments)] pub fn new( app: Application, bridge: TerminalBridge, error_tx: mpsc::Sender, select_system_tx: mpsc::Sender<()>, job_log_tx: mpsc::Sender, + job_resource_usage_tx: mpsc::Sender, user_event_tx: mpsc::Sender, background_task_tx: mpsc::Sender, ) -> Self { @@ -87,6 +93,7 @@ where error_tx, select_system_tx, job_log_tx, + job_resource_usage_tx, user_event_tx, background_task_tx, } @@ -151,6 +158,7 @@ where app.view(&Id::WorkloadList, frame, area); app.view(&Id::WorkloadLogs, frame, area); app.view(&Id::WorkloadDetails, frame, area); + app.view(&Id::WorkloadResourceUsage, frame, area); } fn view_files(app: &mut Application, frame: &mut Frame, area: Rect) { if app.mounted(&Id::FileView) { @@ -314,6 +322,9 @@ where if self.app.mounted(&Id::WorkloadDetails) { assert!(self.app.umount(&Id::WorkloadDetails).is_ok()); } + if self.app.mounted(&Id::WorkloadResourceUsage) { + assert!(self.app.umount(&Id::WorkloadResourceUsage).is_ok()); + } if !self.app.mounted(&Id::WorkloadLogs) { assert!( self.app @@ -362,6 +373,12 @@ where .is_ok() ); } + if self.app.mounted(&Id::WorkloadResourceUsage) { + assert!(self.app.umount(&Id::WorkloadResourceUsage).is_ok()); + } + if self.app.mounted(&Id::WorkloadLogs) { + assert!(self.app.umount(&Id::WorkloadLogs).is_ok()); + } if !self.app.mounted(&Id::WorkloadDetails) { assert!( self.app @@ -372,6 +389,37 @@ where assert!(self.app.active(&Id::WorkloadDetails).is_ok()); None } + JobMsg::ResourceUsage(jobid) => { + if self.app.mounted(&Id::WorkloadList) { + assert!( + self.app + .attr(&Id::WorkloadList, Attribute::Display, AttrValue::Flag(false)) + .is_ok() + ); + } + if self.app.mounted(&Id::WorkloadDetails) { + assert!(self.app.umount(&Id::WorkloadDetails).is_ok()); + } + if self.app.mounted(&Id::WorkloadLogs) { + assert!(self.app.umount(&Id::WorkloadLogs).is_ok()); + } + if !self.app.mounted(&Id::WorkloadResourceUsage) { + assert!( + self.app + .mount(Id::WorkloadResourceUsage, Box::new(ResourceUsage::default()), vec![]) + .is_ok() + ); + } + assert!(self.app.active(&Id::WorkloadResourceUsage).is_ok()); + let job_resource_usage_tx = self.job_resource_usage_tx.clone(); + tokio::spawn(async move { + job_resource_usage_tx + .send(JobResourceUsageAction::Job(jobid)) + .await + .unwrap(); + }); + None + } JobMsg::Switch => { let job_log_tx = self.job_log_tx.clone(); tokio::spawn(async move { @@ -386,6 +434,9 @@ where if self.app.mounted(&Id::WorkloadDetails) { assert!(self.app.umount(&Id::WorkloadDetails).is_ok()); } + if self.app.mounted(&Id::WorkloadResourceUsage) { + assert!(self.app.umount(&Id::WorkloadResourceUsage).is_ok()); + } if !self.app.mounted(&Id::WorkloadList) { assert!( self.app @@ -404,6 +455,11 @@ where // stopp polling for logs job_log_tx.send(JobLogAction::Stop).await.unwrap(); }); + let job_resource_usage_tx = self.job_resource_usage_tx.clone(); + tokio::spawn(async move { + // stopp polling for resource_usages + job_resource_usage_tx.send(JobResourceUsageAction::Stop).await.unwrap(); + }); None } } diff --git a/coman/src/app/user_events.rs b/coman/src/app/user_events.rs index 0b1d4be..864dff0 100644 --- a/coman/src/app/user_events.rs +++ b/coman/src/app/user_events.rs @@ -1,5 +1,6 @@ use crate::{ app::messages::View, + cli::rpc::ResourceUsage, cscs::api_client::types::{Job, JobDetail, PathEntry, System}, }; @@ -9,6 +10,7 @@ pub enum CscsEvent { GotWorkloadData(Vec), GotJobLog(String), GotJobDetails(JobDetail), + GotJobResourceUsage(ResourceUsage), SelectSystemList(Vec), SystemSelected(String), } diff --git a/coman/src/cli/exec.rs b/coman/src/cli/exec.rs index 1a870cf..d7dda74 100644 --- a/coman/src/cli/exec.rs +++ b/coman/src/cli/exec.rs @@ -11,7 +11,7 @@ use pid1::Pid1Settings; use rust_supervisor::{ChildType, Supervisor, SupervisorConfig}; use tokio::{io::AsyncWriteExt, net::TcpStream}; -use crate::cli::rpc::RpcHandler; +use crate::cli::rpc::{COMAN_RPC_ALPN, RpcHandler}; const SECRET_KEY_ENV: &str = "COMAN_IROH_SECRET"; const PORT_FORWARD_ENV: &str = "COMAN_FORWARDED_PORTS"; @@ -126,7 +126,7 @@ async fn port_forward() -> Result<()> { // add rpc server let rpc_handler = RpcHandler; - builder = builder.accept(b"/coman/rpc", rpc_handler); + builder = builder.accept(COMAN_RPC_ALPN, rpc_handler); let _router = builder.spawn(); println!("port forwarding started"); diff --git a/coman/src/cli/rpc.rs b/coman/src/cli/rpc.rs index 3426a3d..43b380d 100644 --- a/coman/src/cli/rpc.rs +++ b/coman/src/cli/rpc.rs @@ -11,14 +11,32 @@ use tokio_duplex::Duplex; use crate::cli::app::COMAN_VERSION; -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub const COMAN_RPC_ALPN: &[u8; 10] = b"/coman/rpc"; + +#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)] pub struct ResourceUsage { pub cpu: f32, pub rss: u64, - pub vss: u64, + pub vsz: u64, pub gpu: Option>, } +impl Eq for ResourceUsage {} +impl Ord for ResourceUsage { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.cpu.total_cmp(&other.cpu).then( + self.rss + .cmp(&other.rss) + .then(self.vsz.cmp(&other.vsz).then(self.gpu.cmp(&other.gpu))), + ) + } +} +impl PartialOrd for ResourceUsage { + fn partial_cmp(&self, other: &ResourceUsage) -> Option { + Some(self.cmp(other)) + } +} + #[tarpc::service] pub trait ComanRPC { async fn version() -> String; @@ -70,7 +88,7 @@ impl ComanRPC for RpcServer { ResourceUsage { cpu: process.cpu_usage() / sys.cpus().len() as f32, rss: process.memory(), - vss: process.virtual_memory(), + vsz: process.virtual_memory(), gpu: gpu_usage, } } diff --git a/coman/src/components/mod.rs b/coman/src/components/mod.rs index ad9ceab..e0cec75 100644 --- a/coman/src/components/mod.rs +++ b/coman/src/components/mod.rs @@ -5,6 +5,7 @@ pub(crate) mod file_tree; pub(crate) mod global_listener; pub(crate) mod info_popup; pub(crate) mod login_popup; +pub(crate) mod resource_usage; pub(crate) mod status_bar; pub(crate) mod system_select_popup; pub(crate) mod toolbar; diff --git a/coman/src/components/resource_usage.rs b/coman/src/components/resource_usage.rs new file mode 100644 index 0000000..b6a8e3f --- /dev/null +++ b/coman/src/components/resource_usage.rs @@ -0,0 +1,542 @@ +use bytesize::ByteSize; +use chrono::{Local, TimeDelta}; +use ratatui::{ + Frame, + layout::{Alignment, Constraint, Direction as LayoutDirection, Rect}, + symbols::Marker, + widgets::GraphType, +}; +use tui_realm_stdlib::{ + Chart, Container, + props::{CHART_X_BOUNDS, CHART_X_LABELS, CHART_Y_BOUNDS, CHART_Y_LABELS}, +}; +use tuirealm::{ + AttrValue, Attribute, Component, Event, MockComponent, State, + command::{Cmd, CmdResult, Direction, Position}, + event::{Key, KeyEvent}, + props::{Borders, Color, Dataset, Layout, PropPayload, PropValue, Style}, +}; + +use crate::app::{ + messages::{JobMsg, Msg}, + user_events::{CscsEvent, UserEvent}, +}; +pub const UPDATE_CPU_DATA: &str = "update-cpu-data"; +pub const UPDATE_MEMORY_DATA: &str = "update-memory-data"; +pub const UPDATE_GPU_DATA: &str = "update-gpu-data"; + +#[derive(MockComponent)] +pub struct ResourceUsage { + component: Container, +} + +impl Default for ResourceUsage { + fn default() -> Self { + Self { + component: Container::default() + .title("Resource Usage", Alignment::Left) + .layout( + Layout::default() + .constraints(&[ + Constraint::Percentage(33), + Constraint::Percentage(33), + Constraint::Percentage(34), + ]) + .direction(LayoutDirection::Vertical) + .margin(2), + ) + .children(vec![ + Box::new(CpuUsage::default()), + Box::new(MemoryUsage::default()), + Box::new(GpuUsage::default()), + ]), + } + } +} + +impl Component for ResourceUsage { + fn on(&mut self, ev: Event) -> Option { + let _ = match ev { + Event::Keyboard(KeyEvent { code: Key::Left, .. }) => self.perform(Cmd::Move(Direction::Left)), + Event::Keyboard(KeyEvent { code: Key::Right, .. }) => self.perform(Cmd::Move(Direction::Right)), + Event::Keyboard(KeyEvent { code: Key::Home, .. }) => self.perform(Cmd::GoTo(Position::Begin)), + Event::Keyboard(KeyEvent { code: Key::End, .. }) => self.perform(Cmd::GoTo(Position::End)), + Event::Keyboard(KeyEvent { code: Key::Esc, .. }) => { + return Some(Msg::Job(JobMsg::Close)); + } + Event::User(UserEvent::Cscs(CscsEvent::GotJobResourceUsage(ru))) => { + self.attr( + Attribute::Custom(UPDATE_CPU_DATA), + AttrValue::Payload(PropPayload::One(PropValue::F64(ru.cpu as f64))), + ); + self.attr( + Attribute::Custom(UPDATE_MEMORY_DATA), + AttrValue::Payload(PropPayload::Tup2((PropValue::U64(ru.rss), PropValue::U64(ru.vsz)))), + ); + if let Some(gpu) = ru.gpu { + self.attr( + Attribute::Custom(UPDATE_GPU_DATA), + AttrValue::Payload(PropPayload::Vec(gpu.iter().map(|g| PropValue::U64(g.1)).collect())), + ); + } + CmdResult::None + } + _ => CmdResult::None, + }; + Some(Msg::None) + } +} + +// START CPU +struct CpuUsage { + component: Chart, + dataset: Dataset, + max_y: f64, +} + +impl Default for CpuUsage { + fn default() -> Self { + let current_time = Local::now(); + let cur_time_str = current_time.format("%H:%M:%S").to_string(); + let start = current_time.checked_sub_signed(TimeDelta::minutes(5)).unwrap(); + let start_str = start.format("%H:%M:%S").to_string(); + Self { + component: Chart::default() + .disabled(false) + .title("CPU", Alignment::Left) + .borders(Borders::default()) + .x_style(Style::default().fg(Color::LightBlue)) + .x_title("") + .x_labels(&[&start_str, &cur_time_str]) + .x_bounds((start.timestamp() as f64, current_time.timestamp() as f64)) + .y_style(Style::default().fg(Color::Yellow)) + .y_title("") + .y_bounds((0.0, 1.0)) + .y_labels(&["0%", "100%"]), + dataset: Dataset::default() + .name("CPU") + .graph_type(GraphType::Line) + .marker(Marker::Braille) + .style(Style::default().fg(Color::Cyan)) + .data(Vec::new()), + max_y: 1.0, + } + } +} +impl MockComponent for CpuUsage { + fn view(&mut self, frame: &mut Frame, area: Rect) { + self.component.view(frame, area); + } + + fn query(&self, attr: Attribute) -> Option { + self.component.query(attr) + } + + fn attr(&mut self, query: Attribute, attr: AttrValue) { + match query { + Attribute::Custom(UPDATE_CPU_DATA) => { + // Update data + let mut current_data = self.dataset.get_data().to_vec(); + // update data + let current_time = Local::now(); + let since_epoch = current_time.timestamp() as f64; + let cpu_usage = attr.unwrap_payload().unwrap_one().unwrap_f64(); + if cpu_usage > self.max_y { + self.max_y = cpu_usage; + self.attr( + Attribute::Custom(CHART_Y_BOUNDS), + AttrValue::Payload(PropPayload::Tup2((PropValue::F64(0.0), PropValue::F64(self.max_y)))), + ); + } + current_data.push((since_epoch, cpu_usage)); + self.dataset = self.dataset.clone().data(current_data); + // update bounds + let start_time = self + .query(Attribute::Custom(CHART_X_BOUNDS)) + .unwrap() + .unwrap_payload() + .unwrap_tup2() + .0 + .unwrap_f64(); + self.attr( + Attribute::Custom(CHART_X_BOUNDS), + AttrValue::Payload(PropPayload::Tup2(( + PropValue::F64(start_time), + PropValue::F64(since_epoch), + ))), + ); + //update labels + let labels = self + .query(Attribute::Custom(CHART_X_LABELS)) + .unwrap() + .unwrap_payload() + .unwrap_vec(); + let start_label = labels[0].clone(); + self.attr( + Attribute::Custom(CHART_X_LABELS), + AttrValue::Payload(PropPayload::Vec(vec![ + start_label, + PropValue::Str(current_time.format("%H:%M:%S").to_string()), + ])), + ); + + let labels = self + .query(Attribute::Custom(CHART_Y_LABELS)) + .unwrap() + .unwrap_payload() + .unwrap_vec(); + let start_label = labels[0].clone(); + self.attr( + Attribute::Custom(CHART_Y_LABELS), + AttrValue::Payload(PropPayload::Vec(vec![ + start_label, + PropValue::Str(format!("{:.0}%", self.max_y.ceil())), + ])), + ); + + self.attr( + Attribute::Dataset, + AttrValue::Payload(PropPayload::Vec(vec![PropValue::Dataset(self.dataset.clone())])), + ); + } + _ => self.component.attr(query, attr), + } + } + + fn state(&self) -> State { + self.component.state() + } + + fn perform(&mut self, cmd: Cmd) -> CmdResult { + self.component.perform(cmd) + } +} + +impl Component for CpuUsage { + fn on(&mut self, _ev: Event) -> Option { + Some(Msg::None) + } +} + +// START MEMORY + +struct MemoryUsage { + component: Chart, + rss_dataset: Dataset, + vsz_dataset: Dataset, + max_y: u64, +} + +impl Default for MemoryUsage { + fn default() -> Self { + let current_time = Local::now(); + let cur_time_str = current_time.format("%H:%M:%S").to_string(); + let start = current_time.checked_sub_signed(TimeDelta::minutes(5)).unwrap(); + let start_str = start.format("%H:%M:%S").to_string(); + Self { + component: Chart::default() + .disabled(false) + .title("Memory", Alignment::Left) + .borders(Borders::default()) + .x_style(Style::default().fg(Color::LightBlue)) + .x_title("") + .x_labels(&[&start_str, &cur_time_str]) + .x_bounds((start.timestamp() as f64, current_time.timestamp() as f64)) + .y_style(Style::default().fg(Color::Yellow)) + .y_title("") + .y_bounds((0.0, 1.0)) + .y_labels(&["0", "1"]), + rss_dataset: Dataset::default() + .name("RSS") + .graph_type(GraphType::Line) + .marker(Marker::Braille) + .style(Style::default().fg(Color::Cyan)) + .data(Vec::new()), + vsz_dataset: Dataset::default() + .name("VSZ") + .graph_type(GraphType::Line) + .marker(Marker::Braille) + .style(Style::default().fg(Color::Green)) + .data(Vec::new()), + max_y: 1, + } + } +} +impl MockComponent for MemoryUsage { + fn view(&mut self, frame: &mut Frame, area: Rect) { + self.component.view(frame, area); + } + + fn query(&self, attr: Attribute) -> Option { + self.component.query(attr) + } + + fn attr(&mut self, query: Attribute, attr: AttrValue) { + match query { + Attribute::Custom(UPDATE_MEMORY_DATA) => { + // Update data + let mut current_rss_data = self.rss_dataset.get_data().to_vec(); + // update data + let current_time = Local::now(); + let since_epoch = current_time.timestamp() as f64; + let usage = attr.unwrap_payload().unwrap_tup2(); + + let rss_usage = usage.0.unwrap_u64(); + if rss_usage > self.max_y { + self.max_y = rss_usage; + self.attr( + Attribute::Custom(CHART_Y_BOUNDS), + AttrValue::Payload(PropPayload::Tup2(( + PropValue::F64(0.0), + PropValue::F64(self.max_y as f64), + ))), + ); + } + current_rss_data.push((since_epoch, rss_usage as f64)); + self.rss_dataset = self.rss_dataset.clone().data(current_rss_data); + + let mut current_vsz_data = self.vsz_dataset.get_data().to_vec(); + let vsz_usage = usage.1.unwrap_u64(); + if vsz_usage > self.max_y { + self.max_y = vsz_usage; + self.attr( + Attribute::Custom(CHART_Y_BOUNDS), + AttrValue::Payload(PropPayload::Tup2(( + PropValue::F64(0.0), + PropValue::F64(self.max_y as f64), + ))), + ); + } + current_vsz_data.push((since_epoch, vsz_usage as f64)); + self.vsz_dataset = self.vsz_dataset.clone().data(current_vsz_data); + // update bounds + let start_time = self + .query(Attribute::Custom(CHART_X_BOUNDS)) + .unwrap() + .unwrap_payload() + .unwrap_tup2() + .0 + .unwrap_f64(); + self.attr( + Attribute::Custom(CHART_X_BOUNDS), + AttrValue::Payload(PropPayload::Tup2(( + PropValue::F64(start_time), + PropValue::F64(since_epoch), + ))), + ); + //update labels + let labels = self + .query(Attribute::Custom(CHART_X_LABELS)) + .unwrap() + .unwrap_payload() + .unwrap_vec(); + let start_label = labels[0].clone(); + self.attr( + Attribute::Custom(CHART_X_LABELS), + AttrValue::Payload(PropPayload::Vec(vec![ + start_label, + PropValue::Str(current_time.format("%H:%M:%S").to_string()), + ])), + ); + + let labels = self + .query(Attribute::Custom(CHART_Y_LABELS)) + .unwrap() + .unwrap_payload() + .unwrap_vec(); + let start_label = labels[0].clone(); + self.attr( + Attribute::Custom(CHART_Y_LABELS), + AttrValue::Payload(PropPayload::Vec(vec![ + start_label, + PropValue::Str(ByteSize::b(self.max_y).display().iec().to_string()), + ])), + ); + + self.attr( + Attribute::Dataset, + AttrValue::Payload(PropPayload::Vec(vec![ + PropValue::Dataset(self.rss_dataset.clone()), + PropValue::Dataset(self.vsz_dataset.clone()), + ])), + ); + } + _ => self.component.attr(query, attr), + } + } + + fn state(&self) -> State { + self.component.state() + } + + fn perform(&mut self, cmd: Cmd) -> CmdResult { + self.component.perform(cmd) + } +} + +impl Component for MemoryUsage { + fn on(&mut self, _ev: Event) -> Option { + Some(Msg::None) + } +} + +// START GPU + +const PALETTE: [Color; 8] = [ + Color::Cyan, + Color::Green, + Color::Red, + Color::Blue, + Color::Magenta, + Color::Yellow, + Color::Gray, + Color::LightYellow, +]; +struct GpuUsage { + component: Chart, + datasets: Vec, + max_y: u64, +} + +impl Default for GpuUsage { + fn default() -> Self { + let current_time = Local::now(); + let cur_time_str = current_time.format("%H:%M:%S").to_string(); + let start = current_time.checked_sub_signed(TimeDelta::minutes(5)).unwrap(); + let start_str = start.format("%H:%M:%S").to_string(); + Self { + component: Chart::default() + .disabled(false) + .title("GPU", Alignment::Left) + .borders(Borders::default()) + .x_style(Style::default().fg(Color::LightBlue)) + .x_title("") + .x_labels(&[&start_str, &cur_time_str]) + .x_bounds((start.timestamp() as f64, current_time.timestamp() as f64)) + .y_style(Style::default().fg(Color::Yellow)) + .y_title("") + .y_bounds((0.0, 1.0)) + .y_labels(&["0", "1"]), + datasets: Vec::new(), + max_y: 1, + } + } +} +impl MockComponent for GpuUsage { + fn view(&mut self, frame: &mut Frame, area: Rect) { + self.component.view(frame, area); + } + + fn query(&self, attr: Attribute) -> Option { + self.component.query(attr) + } + + fn attr(&mut self, query: Attribute, attr: AttrValue) { + match query { + Attribute::Custom(UPDATE_GPU_DATA) => { + let payload = attr.unwrap_payload().unwrap_vec(); + let current_time = Local::now(); + let since_epoch = current_time.timestamp() as f64; + for (i, gpu_payload) in payload.iter().enumerate() { + // Update data + let mut current_data = self + .datasets + .get(i) + .map(|d| d.get_data().to_vec()) + .unwrap_or(Vec::new()); + // update data + let gpu_usage = gpu_payload.clone().unwrap_u64(); + if gpu_usage > self.max_y { + self.max_y = gpu_usage; + self.attr( + Attribute::Custom(CHART_Y_BOUNDS), + AttrValue::Payload(PropPayload::Tup2(( + PropValue::F64(0.0), + PropValue::F64(self.max_y as f64), + ))), + ); + } + current_data.push((since_epoch, gpu_usage as f64)); + match self.datasets.get_mut(i) { + Some(ds) => { + self.datasets[i] = ds.clone().data(current_data); + } + None => self.datasets.push( + Dataset::default() + .name(format!("GPU {i}")) + .graph_type(GraphType::Line) + .marker(Marker::Braille) + .style(Style::default().fg(PALETTE[i % 8])) + .data(current_data), + ), + } + } + // update bounds + let start_time = self + .query(Attribute::Custom(CHART_X_BOUNDS)) + .unwrap() + .unwrap_payload() + .unwrap_tup2() + .0 + .unwrap_f64(); + self.attr( + Attribute::Custom(CHART_X_BOUNDS), + AttrValue::Payload(PropPayload::Tup2(( + PropValue::F64(start_time), + PropValue::F64(since_epoch), + ))), + ); + //update labels + let labels = self + .query(Attribute::Custom(CHART_X_LABELS)) + .unwrap() + .unwrap_payload() + .unwrap_vec(); + let start_label = labels[0].clone(); + self.attr( + Attribute::Custom(CHART_X_LABELS), + AttrValue::Payload(PropPayload::Vec(vec![ + start_label, + PropValue::Str(current_time.format("%H:%M:%S").to_string()), + ])), + ); + + let labels = self + .query(Attribute::Custom(CHART_Y_LABELS)) + .unwrap() + .unwrap_payload() + .unwrap_vec(); + let start_label = labels[0].clone(); + self.attr( + Attribute::Custom(CHART_Y_LABELS), + AttrValue::Payload(PropPayload::Vec(vec![ + start_label, + PropValue::Str(ByteSize::b(self.max_y).display().iec().to_string()), + ])), + ); + + self.attr( + Attribute::Dataset, + AttrValue::Payload(PropPayload::Vec( + self.datasets.iter().map(|d| PropValue::Dataset(d.clone())).collect(), + )), + ); + } + _ => self.component.attr(query, attr), + } + } + + fn state(&self) -> State { + self.component.state() + } + + fn perform(&mut self, cmd: Cmd) -> CmdResult { + self.component.perform(cmd) + } +} + +impl Component for GpuUsage { + fn on(&mut self, _ev: Event) -> Option { + Some(Msg::None) + } +} diff --git a/coman/src/components/workload_list.rs b/coman/src/components/workload_list.rs index 1e609ac..3e83fa7 100644 --- a/coman/src/components/workload_list.rs +++ b/coman/src/components/workload_list.rs @@ -13,7 +13,7 @@ use crate::{ messages::{JobMsg, Msg}, user_events::{CscsEvent, JobEvent, UserEvent}, }, - cscs::api_client::types::Job, + cscs::api_client::types::{Job, JobStatus}, }; #[derive(MockComponent)] @@ -111,6 +111,23 @@ impl Component for WorkloadList { } CmdResult::None } + Event::Keyboard(KeyEvent { + code: Key::Char('r'), + modifiers: KeyModifiers::NONE, + }) => { + if let State::One(StateValue::Usize(index)) = self.state() + && !self.jobs.is_empty() + { + let job = self.jobs[index].clone(); + if job.status != JobStatus::Running { + return Some(Msg::Error( + "Can only get resource usage for jobs in 'Running' state".to_string(), + )); + } + return Some(Msg::Job(JobMsg::ResourceUsage(job.id))); + } + CmdResult::None + } _ => CmdResult::None, }; Some(Msg::None) diff --git a/coman/src/cscs/api_client/types.rs b/coman/src/cscs/api_client/types.rs index 234347d..843c286 100644 --- a/coman/src/cscs/api_client/types.rs +++ b/coman/src/cscs/api_client/types.rs @@ -141,6 +141,7 @@ pub enum JobStatus { Cancelled, Failed, Timeout, + Requeued, } impl From for JobStatus { fn from(value: String) -> Self { @@ -151,6 +152,7 @@ impl From for JobStatus { "CANCELLED" => JobStatus::Cancelled, "PENDING" => JobStatus::Pending, "TIMEOUT" => JobStatus::Timeout, + "REQUEUED" => JobStatus::Requeued, other => panic!("got job status: {}", other), } } diff --git a/coman/src/cscs/cli.rs b/coman/src/cscs/cli.rs index d5fa986..17f65b7 100644 --- a/coman/src/cscs/cli.rs +++ b/coman/src/cscs/cli.rs @@ -143,12 +143,14 @@ pub(crate) async fn cli_cscs_job_resource_usage( platform: Option, ) -> Result<()> { let job_id = maybe_job_id_from_name(job, system.clone(), platform.clone()).await?; - let result = cscs_resource_usage(job_id, system).await?; + let result = cscs_resource_usage(job_id, system) + .await + .wrap_err("failed to fetch resource usage")?; println!("CPU: {:.1}%", result.cpu); println!( - "Memory: RSS {:.1}, VSS: {:.1}", + "Memory: RSS {:.1}, VSZ: {:.1}", ByteSize::b(result.rss).display().iec(), - ByteSize::b(result.vss).display().iec(), + ByteSize::b(result.vsz).display().iec(), ); println!( "GPU: {}", @@ -264,7 +266,7 @@ pub(crate) async fn cli_cscs_file_download( while !transfer_done { if let Some(job) = cscs_job_details(job_data.0, system.clone(), platform.clone()).await? { match job.status { - JobStatus::Pending | JobStatus::Running => {} + JobStatus::Pending | JobStatus::Requeued | JobStatus::Running => {} JobStatus::Finished => transfer_done = true, JobStatus::Cancelled | JobStatus::Failed => return Err(eyre!("transfer job failed")), JobStatus::Timeout => return Err(eyre!("transfer job timed out")), diff --git a/coman/src/cscs/handlers.rs b/coman/src/cscs/handlers.rs index 68d74b8..bd90313 100644 --- a/coman/src/cscs/handlers.rs +++ b/coman/src/cscs/handlers.rs @@ -32,7 +32,7 @@ use super::api_client::client::{EdfSpec, ScriptSpec}; use crate::{ cli::{ app::COMAN_VERSION, - rpc::{ComanRPCClient, ResourceUsage}, + rpc::{COMAN_RPC_ALPN, ComanRPCClient, ResourceUsage}, }, config::{ComputePlatform, Config, get_data_dir}, cscs::{ @@ -174,7 +174,7 @@ pub async fn cscs_job_log( pub async fn cscs_resource_usage(job_id: i64, system: Option) -> Result { let endpoint_id = get_endpoint_id(job_id, system).await?; - let alpn: Vec = b"/coman/rpc".to_vec(); + let alpn: Vec = COMAN_RPC_ALPN.to_vec(); let secret_key = SecretKey::generate(&mut rand::rng()); let endpoint = Endpoint::builder().secret_key(secret_key).bind().await?; @@ -339,7 +339,7 @@ async fn garbage_collect_ssh(api_client: &CscsApi, current_system: &str) -> Resu let jobs = api_client.list_jobs(current_system, None).await?; let job_entries: HashSet<_> = jobs .iter() - .filter(|j| j.status == JobStatus::Pending || j.status == JobStatus::Running) + .filter(|j| j.status == JobStatus::Pending || j.status == JobStatus::Running || j.status == JobStatus::Requeued) .map(|j| format!("{}_{}", current_system, j.id)) .collect(); let outdated_endpoints: Vec<_> = std::fs::read_dir(&data_dir)? diff --git a/coman/src/cscs/ports.rs b/coman/src/cscs/ports.rs index 6b3a59e..ecb1486 100644 --- a/coman/src/cscs/ports.rs +++ b/coman/src/cscs/ports.rs @@ -18,7 +18,7 @@ use crate::{ api_client::types::JobStatus, handlers::{ cscs_file_delete, cscs_file_download, cscs_file_list, cscs_job_cancel, cscs_job_details, cscs_job_list, - cscs_job_log, cscs_system_list, file_system_roots, + cscs_job_log, cscs_resource_usage, cscs_system_list, file_system_roots, }, oauth2::{ACCESS_TOKEN_SECRET_NAME, REFRESH_TOKEN_SECRET_NAME, finish_cscs_device_login}, }, @@ -220,6 +220,55 @@ impl PollAsync for AsyncJobLogPort { } } +pub enum JobResourceUsageAction { + Job(usize), + Stop, +} + +/// This port handles polling the logs of a CSCS job +pub(crate) struct AsyncJobResourceUsagePort { + receiver: mpsc::Receiver, + current_job: Option, +} + +impl AsyncJobResourceUsagePort { + pub fn new(receiver: mpsc::Receiver) -> Self { + Self { + receiver, + current_job: None, + } + } +} +#[tuirealm::async_trait] +impl PollAsync for AsyncJobResourceUsagePort { + async fn poll(&mut self) -> ListenerResult>> { + if self.receiver.is_closed() { + return Ok(Some(Event::None)); + } + if !self.receiver.is_empty() + && let Some(val) = self.receiver.recv().await + { + match val { + JobResourceUsageAction::Job(jobid) => { + self.current_job = Some(jobid); + } + JobResourceUsageAction::Stop => { + self.current_job = None; + } + } + } + if let Some(job_id) = self.current_job { + match cscs_resource_usage(job_id as i64, None).await { + Ok(ru) => Ok(Some(Event::User(UserEvent::Cscs(CscsEvent::GotJobResourceUsage(ru))))), + Err(e) => Ok(Some(Event::User(UserEvent::Status(StatusEvent::Warning(format!( + "couldn't get resource usage: {e:?}" + )))))), + } + } else { + Ok(Some(Event::None)) + } + } +} #[derive(Debug)] pub enum BackgroundTask { ListPaths(PathBuf), @@ -271,7 +320,7 @@ async fn download_file( while !transfer_done { if let Some(job) = cscs_job_details(job_data.0, None, None).await? { match job.status { - JobStatus::Pending | JobStatus::Running => { + JobStatus::Pending | JobStatus::Running | JobStatus::Requeued => { event_tx .send(UserEvent::Status(StatusEvent::Info( "waiting for transfer job".to_owned(), diff --git a/coman/src/main.rs b/coman/src/main.rs index 9e9ecf1..08ba62e 100644 --- a/coman/src/main.rs +++ b/coman/src/main.rs @@ -39,8 +39,8 @@ use crate::{ cli_cscs_job_start, cli_cscs_login, cli_cscs_port_forward, cli_cscs_set_system, cli_cscs_system_list, }, ports::{ - AsyncBackgroundTaskPort, AsyncFetchWorkloadsPort, AsyncJobLogPort, AsyncSelectSystemPort, - AsyncUserEventPort, + AsyncBackgroundTaskPort, AsyncFetchWorkloadsPort, AsyncJobLogPort, AsyncJobResourceUsagePort, + AsyncSelectSystemPort, AsyncUserEventPort, }, }, errors::AsyncErrorPort, @@ -180,6 +180,7 @@ fn run_tui(tick_rate: f64) -> Result<()> { let (select_system_tx, select_system_rx) = mpsc::channel(100); let (job_log_tx, job_log_rx) = mpsc::channel(100); + let (job_resource_usage_tx, job_resource_usage_rx) = mpsc::channel(100); let (background_task_tx, background_task_rx) = mpsc::channel(100); let (user_event_tx, user_event_rx) = mpsc::channel(100); let (error_tx, error_rx) = mpsc::channel(100); @@ -200,6 +201,11 @@ fn run_tui(tick_rate: f64) -> Result<()> { 1, ) .add_async_port(Box::new(AsyncJobLogPort::new(job_log_rx)), Duration::from_secs(3), 1) + .add_async_port( + Box::new(AsyncJobResourceUsagePort::new(job_resource_usage_rx)), + Duration::from_secs(1), + 1, + ) .add_async_port( Box::new(AsyncBackgroundTaskPort::new(background_task_rx, user_event_tx.clone())), Duration::default(), @@ -321,6 +327,7 @@ fn run_tui(tick_rate: f64) -> Result<()> { error_tx, select_system_tx, job_log_tx, + job_resource_usage_tx, user_event_tx, background_task_tx, );