Skip to content

Commit b9c0669

Browse files
committed
add getting system metrics to rpc
1 parent 5d5822a commit b9c0669

5 files changed

Lines changed: 139 additions & 13 deletions

File tree

Cargo.lock

Lines changed: 89 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coman/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ tarpc = { version = "0.37.0", features = [
9191
"tokio1",
9292
] }
9393
tokio-duplex = "1.0.1"
94+
sysinfo = "0.38.0"
95+
nvml-wrapper = "0.11.0"
9496

9597
[build-dependencies]
9698
anyhow = "1.0.90"

coman/src/cli/rpc.rs

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,27 @@
11
use futures::StreamExt;
22
use iroh::protocol::ProtocolHandler;
3-
use tarpc::{
4-
serde_transport as transport, server, server::Channel, tokio_serde::formats::Bincode,
5-
tokio_util::codec::LengthDelimitedCodec,
6-
};
3+
use nvml_wrapper::Nvml;
4+
use serde::{Deserialize, Serialize};
5+
use sysinfo::System;
6+
use tarpc::server::Channel;
7+
use tarpc::tokio_serde::formats::Bincode;
8+
use tarpc::tokio_util::codec::LengthDelimitedCodec;
9+
use tarpc::{serde_transport as transport, server};
710
use tokio_duplex::Duplex;
811

912
use crate::cli::app::COMAN_VERSION;
1013

14+
#[derive(Debug, Clone, Serialize, Deserialize)]
15+
pub struct ResourceUsage {
16+
pub cpu: f32,
17+
pub rss: u64,
18+
pub gpu: u64,
19+
}
20+
1121
#[tarpc::service]
1222
pub trait ComanRPC {
1323
async fn version() -> String;
24+
async fn resource_usage() -> ResourceUsage;
1425
}
1526
#[derive(Debug, Clone)]
1627
struct RpcServer;
@@ -19,6 +30,24 @@ impl ComanRPC for RpcServer {
1930
async fn version(self, _: tarpc::context::Context) -> String {
2031
COMAN_VERSION.to_string()
2132
}
33+
34+
async fn resource_usage(self, _context: ::tarpc::context::Context) -> ResourceUsage {
35+
let mut sys = System::new_all();
36+
sys.refresh_all();
37+
let mut cpu_usage = 0.0;
38+
for cpu in sys.cpus() {
39+
cpu_usage += cpu.cpu_usage();
40+
}
41+
cpu_usage /= sys.cpus().len() as f32;
42+
let nvml = Nvml::init().unwrap();
43+
let device = nvml.device_by_index(0).unwrap();
44+
let memory_info = device.memory_info().unwrap();
45+
ResourceUsage {
46+
cpu: cpu_usage,
47+
rss: sys.used_memory(),
48+
gpu: memory_info.used,
49+
}
50+
}
2251
}
2352

2453
#[derive(Debug, Default)]

coman/src/cscs/cli.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,10 @@ pub(crate) async fn cli_cscs_job_resource_usage(
142142
platform: Option<ComputePlatform>,
143143
) -> Result<()> {
144144
let job_id = maybe_job_id_from_name(job, system.clone(), platform.clone()).await?;
145-
println!("running port forward for job {job_id}");
146-
cscs_resource_usage(job_id, system).await
145+
let result = cscs_resource_usage(job_id, system).await?;
146+
println!("CPU: {}, RSS: {}, GPU: {}", result.cpu, result.rss, result.gpu);
147+
148+
Ok(())
147149
}
148150

149151
#[allow(clippy::too_many_arguments)]

coman/src/cscs/handlers.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ use tokio_util::codec::LengthDelimitedCodec;
3030

3131
use super::api_client::client::{EdfSpec, ScriptSpec};
3232
use crate::{
33-
cli::{app::COMAN_VERSION, rpc::ComanRPCClient},
33+
cli::{
34+
app::COMAN_VERSION,
35+
rpc::{ComanRPCClient, ResourceUsage},
36+
},
3437
config::{ComputePlatform, Config, get_data_dir},
3538
cscs::{
3639
api_client::{
@@ -168,10 +171,10 @@ pub async fn cscs_job_log(
168171
}
169172
}
170173

171-
pub async fn cscs_resource_usage(job_id: i64, system: Option<String>) -> Result<()> {
174+
pub async fn cscs_resource_usage(job_id: i64, system: Option<String>) -> Result<ResourceUsage> {
172175
let endpoint_id = get_endpoint_id(job_id, system).await?;
173176

174-
let alpn: Vec<u8> = "/coman/rpc".to_string().into_bytes();
177+
let alpn: Vec<u8> = b"/coman/rpc".to_vec();
175178
let secret_key = SecretKey::generate(&mut rand::rng());
176179
let endpoint = Endpoint::builder().secret_key(secret_key).bind().await?;
177180

@@ -183,10 +186,11 @@ pub async fn cscs_resource_usage(job_id: i64, system: Option<String>) -> Result<
183186
let framed = codec_builder.new_framed(combined);
184187
let transport = serde_transport::new(framed, Bincode::default());
185188
let client = ComanRPCClient::new(client::Config::default(), transport);
186-
let result = client.spawn().version(context::current()).await?;
187-
let _ = dbg!(result);
188-
189-
Ok(())
189+
client
190+
.spawn()
191+
.resource_usage(context::current())
192+
.await
193+
.wrap_err("couldn't get resource usage from remote")
190194
}
191195
Err(e) => Err(e).wrap_err("couldn't establish tunnel to remote"),
192196
}

0 commit comments

Comments
 (0)