Skip to content

Commit dca7872

Browse files
committed
add getting system metrics to rpc
1 parent 5d5822a commit dca7872

7 files changed

Lines changed: 159 additions & 12 deletions

File tree

.github/workflows/pr-review.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,4 @@ jobs:
6666
VCS__PIPELINE__PULL_NUMBER: ${{ github.event.number}}
6767
VCS__HTTP_CLIENT__API_URL: "https://api.github.com"
6868
VCS__HTTP_CLIENT__API_TOKEN: ${{ secrets.GITHUB_TOKEN }}
69+
REVIEW__IGNORE_CHANGES: '["Cargo.lock"]'

Cargo.lock

Lines changed: 89 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coman/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ tarpc = { version = "0.37.0", features = [
9191
"tokio1",
9292
] }
9393
tokio-duplex = "1.0.1"
94+
sysinfo = "0.38.0"
95+
nvml-wrapper = "0.11.0"
9496

9597
[build-dependencies]
9698
anyhow = "1.0.90"

coman/src/cli/exec.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use iroh::{
77
endpoint::ConnectionError,
88
protocol::{ProtocolHandler, Router},
99
};
10-
use nom::AsBytes;
1110
use pid1::Pid1Settings;
1211
use rust_supervisor::{ChildType, Supervisor, SupervisorConfig};
1312
use tokio::{io::AsyncWriteExt, net::TcpStream};
@@ -127,7 +126,7 @@ async fn port_forward() -> Result<()> {
127126

128127
// add rpc server
129128
let rpc_handler = RpcHandler;
130-
builder = builder.accept(b"/coman/rpc/".as_bytes(), rpc_handler);
129+
builder = builder.accept(b"/coman/rpc", rpc_handler);
131130
let _router = builder.spawn();
132131
println!("port forwarding started");
133132

coman/src/cli/rpc.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use futures::StreamExt;
22
use iroh::protocol::ProtocolHandler;
3+
use nvml_wrapper::Nvml;
4+
use serde::{Deserialize, Serialize};
5+
use sysinfo::System;
36
use tarpc::{
47
serde_transport as transport, server, server::Channel, tokio_serde::formats::Bincode,
58
tokio_util::codec::LengthDelimitedCodec,
@@ -8,9 +11,17 @@ use tokio_duplex::Duplex;
811

912
use crate::cli::app::COMAN_VERSION;
1013

14+
#[derive(Debug, Clone, Serialize, Deserialize)]
15+
pub struct ResourceUsage {
16+
pub cpu: f32,
17+
pub rss: u64,
18+
pub gpu: Option<u64>,
19+
}
20+
1121
#[tarpc::service]
1222
pub trait ComanRPC {
1323
async fn version() -> String;
24+
async fn resource_usage() -> ResourceUsage;
1425
}
1526
/// Stateless server-side implementation of the [`ComanRPC`] service.
#[derive(Debug, Clone)]
struct RpcServer;
@@ -19,6 +30,40 @@ impl ComanRPC for RpcServer {
1930
async fn version(self, _: tarpc::context::Context) -> String {
2031
COMAN_VERSION.to_string()
2132
}
33+
34+
async fn resource_usage(self, _context: ::tarpc::context::Context) -> ResourceUsage {
35+
let mut sys = System::new_all();
36+
sys.refresh_all();
37+
let mut cpu_usage = 0.0;
38+
for cpu in sys.cpus() {
39+
cpu_usage += cpu.cpu_usage();
40+
}
41+
cpu_usage /= sys.cpus().len() as f32;
42+
let gpu_usage = match Nvml::init() {
43+
Ok(nvml) => match nvml.device_by_index(0) {
44+
Ok(device) => match device.memory_info() {
45+
Ok(memory_info) => Some(memory_info.used),
46+
Err(e) => {
47+
println!("Couldn't get GPU memory info: {e:?}");
48+
None
49+
}
50+
},
51+
Err(e) => {
52+
println!("couldn't load nvidia device 0: {e:?}");
53+
None
54+
}
55+
},
56+
Err(e) => {
57+
println!("Nvidia Device Info not available: {e:?}");
58+
None
59+
}
60+
};
61+
ResourceUsage {
62+
cpu: cpu_usage,
63+
rss: sys.used_memory(),
64+
gpu: gpu_usage,
65+
}
66+
}
2267
}
2368

2469
#[derive(Debug, Default)]

coman/src/cscs/cli.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,15 @@ pub(crate) async fn cli_cscs_job_resource_usage(
142142
platform: Option<ComputePlatform>,
143143
) -> Result<()> {
144144
let job_id = maybe_job_id_from_name(job, system.clone(), platform.clone()).await?;
145-
println!("running port forward for job {job_id}");
146-
cscs_resource_usage(job_id, system).await
145+
let result = cscs_resource_usage(job_id, system).await?;
146+
println!(
147+
"CPU: {}, RSS: {}, GPU: {}",
148+
result.cpu,
149+
result.rss,
150+
result.gpu.map(|g| g.to_string()).unwrap_or("N/A".to_string())
151+
);
152+
153+
Ok(())
147154
}
148155

149156
#[allow(clippy::too_many_arguments)]

coman/src/cscs/handlers.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ use tokio_util::codec::LengthDelimitedCodec;
3030

3131
use super::api_client::client::{EdfSpec, ScriptSpec};
3232
use crate::{
33-
cli::{app::COMAN_VERSION, rpc::ComanRPCClient},
33+
cli::{
34+
app::COMAN_VERSION,
35+
rpc::{ComanRPCClient, ResourceUsage},
36+
},
3437
config::{ComputePlatform, Config, get_data_dir},
3538
cscs::{
3639
api_client::{
@@ -168,10 +171,10 @@ pub async fn cscs_job_log(
168171
}
169172
}
170173

171-
pub async fn cscs_resource_usage(job_id: i64, system: Option<String>) -> Result<()> {
174+
pub async fn cscs_resource_usage(job_id: i64, system: Option<String>) -> Result<ResourceUsage> {
172175
let endpoint_id = get_endpoint_id(job_id, system).await?;
173176

174-
let alpn: Vec<u8> = "/coman/rpc".to_string().into_bytes();
177+
let alpn: Vec<u8> = b"/coman/rpc".to_vec();
175178
let secret_key = SecretKey::generate(&mut rand::rng());
176179
let endpoint = Endpoint::builder().secret_key(secret_key).bind().await?;
177180

@@ -183,10 +186,11 @@ pub async fn cscs_resource_usage(job_id: i64, system: Option<String>) -> Result<
183186
let framed = codec_builder.new_framed(combined);
184187
let transport = serde_transport::new(framed, Bincode::default());
185188
let client = ComanRPCClient::new(client::Config::default(), transport);
186-
let result = client.spawn().version(context::current()).await?;
187-
let _ = dbg!(result);
188-
189-
Ok(())
189+
client
190+
.spawn()
191+
.resource_usage(context::current())
192+
.await
193+
.wrap_err("couldn't get resource usage from remote")
190194
}
191195
Err(e) => Err(e).wrap_err("couldn't establish tunnel to remote"),
192196
}
@@ -517,7 +521,7 @@ async fn inject_coman_squash(
517521
let config = Config::new().unwrap();
518522
let local_squash_path = maybe_download_latest_squash(current_system, &config).await?;
519523
let target = base_path.join("coman.sqsh");
520-
let file_meta = std::fs::metadata(local_squash_path.clone())?;
524+
let file_meta = std::fs::metadata(local_squash_path.clone()).wrap_err("couldn't load coman squash file")?;
521525

522526
#[cfg(target_family = "unix")]
523527
let size = file_meta.size() as usize;

0 commit comments

Comments
 (0)