Skip to content

Add API for monitoring exited tasks #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ required-features = ["executable"]

[dependencies]
libc = "0.2.139"
netlink-sys = "0.8.3"
netlink-sys = "0.8.6"
thiserror = "1.0.38"
log = "0.4.17"
env_logger = { version = "0.10.0", optional = true }
Expand Down
121 changes: 106 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@

pub use c_headers::taskstats as __bindgen_taskstats;
use c_headers::{
__u16, __u32, __u64, __u8, TASKSTATS_CMD_ATTR_PID, TASKSTATS_CMD_ATTR_TGID, TASKSTATS_CMD_GET,
__u16, __u32, __u64, __u8, TASKSTATS_CMD_ATTR_DEREGISTER_CPUMASK, TASKSTATS_CMD_ATTR_PID,
TASKSTATS_CMD_ATTR_REGISTER_CPUMASK, TASKSTATS_CMD_ATTR_TGID, TASKSTATS_CMD_GET,
TASKSTATS_GENL_NAME, TASKSTATS_TYPE_AGGR_PID, TASKSTATS_TYPE_AGGR_TGID, TASKSTATS_TYPE_NULL,
TASKSTATS_TYPE_PID, TASKSTATS_TYPE_STATS, TASKSTATS_TYPE_TGID,
};
use log::{debug, warn};
use netlink::Netlink;
use netlink::NlPayload;
use std::mem;
use std::slice;
use std::{mem, slice};
use thiserror::Error;

/// Errors possibly returned by `Client`
Expand Down Expand Up @@ -93,12 +93,7 @@
/// * when kernel responded error
/// * when the returned data couldn't be interpreted
pub fn pid_stats(&self, tid: u32) -> Result<TaskStats> {
self.netlink.send_cmd(
self.ts_family_id,
TASKSTATS_CMD_GET as u8,
TASKSTATS_CMD_ATTR_PID as u16,
tid.as_buf(),
)?;
self.send(TASKSTATS_CMD_ATTR_PID as u16, tid.as_buf())?;

let resp = self.netlink.recv_response()?;
for na in resp.payload_as_nlattrs() {
Expand Down Expand Up @@ -138,12 +133,7 @@
/// * when kernel responded error
/// * when the returned data couldn't be interpreted
pub fn tgid_stats(&self, tgid: u32) -> Result<TaskStats> {
self.netlink.send_cmd(
self.ts_family_id,
TASKSTATS_CMD_GET as u8,
TASKSTATS_CMD_ATTR_TGID as u16,
tgid.as_buf(),
)?;
self.send(TASKSTATS_CMD_ATTR_TGID as u16, tgid.as_buf())?;

let resp = self.netlink.recv_response()?;
for na in resp.payload_as_nlattrs() {
Expand All @@ -168,6 +158,107 @@
"no TASKSTATS_TYPE_STATS found in response".to_string(),
))
}

/// Register this socket as a listener for the given cpumask.
///
/// The mask is forwarded as a `TASKSTATS_CMD_ATTR_REGISTER_CPUMASK` attribute
/// via [`Self::send`]; no response is read here.
///
/// # Arguments
/// * `cpu_mask` - cpumask is specified as an ascii string of comma-separated cpu ranges e.g.
///   to listen to exit data from cpus 1,2,3,5,7,8 the cpumask would be "1-3,5,7-8".
///
/// # Errors
/// Returns whatever error [`Self::send`] reports.
///
/// NOTE(review): the bytes sent are exactly `cpu_mask.as_bytes()`, i.e. without a
/// trailing NUL. The kernel's reference client (`getdelays.c`) includes the
/// terminating NUL in the attribute payload -- confirm whether callers must append
/// `\0` themselves.
pub fn register_cpumask(&self, cpu_mask: &str) -> Result<()> {
    let attr_type = TASKSTATS_CMD_ATTR_REGISTER_CPUMASK as u16;
    self.send(attr_type, cpu_mask.as_bytes())
}

/// Deregister this socket as a listener for the given cpumask.
///
/// If userspace forgets to deregister interest in cpus before closing the listening socket,
/// the kernel cleans up its interest set over time. However, for the sake of efficiency,
/// an explicit deregistration is advisable.
///
/// The mask is forwarded as a `TASKSTATS_CMD_ATTR_DEREGISTER_CPUMASK` attribute
/// via [`Self::send`]; no response is read here.
///
/// # Arguments
/// * `cpu_mask` - cpumask is specified as an ascii string of comma-separated cpu ranges e.g.
///   to stop listening to exit data from cpus 1,2,3,5,7,8 the cpumask would be "1-3,5,7-8".
///
/// # Errors
/// Returns whatever error [`Self::send`] reports.
///
/// NOTE(review): the bytes sent are exactly `cpu_mask.as_bytes()`, i.e. without a
/// trailing NUL -- confirm whether the kernel expects a NUL-terminated mask.
pub fn deregister_cpumask(&self, cpu_mask: &str) -> Result<()> {
    let attr_type = TASKSTATS_CMD_ATTR_DEREGISTER_CPUMASK as u16;
    self.send(attr_type, cpu_mask.as_bytes())
}

/// Listen registered cpumask's.
/// If no messages are available at the socket, the receive call
/// wait for a message to arrive, unless the socket is nonblocking.
///
/// # Return
/// * `Ok(Vec<TaskStats>)`: vector with stats messages. If the current task is NOT the last
/// one in its thread group, only one message is returned in the vector.
/// However, if it is the last task, an additional element containing the per-thread
/// group ID (tgid) statistics is also included. This additional element sums up
/// the statistics for all threads within the thread group, both past and present
pub fn listen_registered(&self) -> Result<Vec<TaskStats>> {
let resp = self.netlink.recv_response()?;
let mut stats_vec = Vec::new();

for na in resp.payload_as_nlattrs() {
match na.header.nla_type as u32 {
TASKSTATS_TYPE_NULL => break,
TASKSTATS_TYPE_AGGR_PID | TASKSTATS_TYPE_AGGR_TGID => {
for inner in na.payload_as_nlattrs() {
match inner.header.nla_type as u32 {
TASKSTATS_TYPE_PID => debug!("Received TASKSTATS_TYPE_PID"),
TASKSTATS_TYPE_TGID => debug!("Received TASKSTATS_TYPE_TGID"),
TASKSTATS_TYPE_STATS => {
stats_vec.push(TaskStats::from(inner.payload()));
}
unknown => println!("Skipping unknown nla_type: {}", unknown),
}
}
}
unknown => println!("Skipping unknown nla_type: {}", unknown),
}
}
if !stats_vec.is_empty() {
return Ok(stats_vec);
}
Err(Error::Unknown(
"no TASKSTATS_TYPE_STATS found in response".to_string(),
))
}

/// Set the receive buffer size of the underlying netlink socket
/// (SO_RCVBUF socket option, see socket(7)).
///
/// # Arguments
/// * `payload` - buffer size in bytes. The kernel doubles this value
///   (to allow space for bookkeeping overhead). The default value is set by the
///   /proc/sys/net/core/rmem_default file, and the maximum allowed value is set by the
///   /proc/sys/net/core/rmem_max file. The minimum (doubled) value for this option is 256.
///   The type `T` is not constrained here; the value is handed unchanged to the
///   netlink layer.
///
/// # Errors
/// Returns the netlink layer's error, converted into this crate's error type.
pub fn set_rx_buf_sz<T>(&self, payload: T) -> Result<()> {
    let outcome = self.netlink.set_rx_buf_sz(payload);
    outcome.map_err(Into::into)
}

/// Get the receive buffer size of the underlying netlink socket in bytes
/// (SO_RCVBUF socket option, see socket(7)).
///
/// # Return
/// * `usize` buffer size in bytes.
///   The kernel returns the doubled value of what was set using
///   [`Self::set_rx_buf_sz`].
///
/// # Errors
/// Returns the netlink layer's error, converted into this crate's error type.
pub fn get_rx_buf_sz(&self) -> Result<usize> {
    let outcome = self.netlink.get_rx_buf_sz();
    outcome.map_err(Into::into)
}

/// Send a `TASKSTATS_CMD_GET` command carrying a single attribute.
///
/// The message is addressed to the family id stored in this client. This
/// method only sends; it does not read any response from the socket.
///
/// # Arguments
/// * `taskstats_cmd` - attribute type placed in the message; callers in this
///   file pass one of the `TASKSTATS_CMD_ATTR_*` constants.
/// * `data` - raw attribute payload.
///
/// # Errors
/// Returns the error reported by the netlink layer's `send_cmd`, converted
/// into this crate's error type.
pub fn send(&self, taskstats_cmd: u16, data: &[u8]) -> Result<()> {
    let genl_cmd = TASKSTATS_CMD_GET as u8;
    self.netlink
        .send_cmd(self.ts_family_id, genl_cmd, taskstats_cmd, data)?;
    Ok(())
}
}

trait AsBuf<T> {
Expand Down Expand Up @@ -282,34 +373,34 @@
// Automatically generated by tools/gen_layout_test.sh
assert_eq!(328, std::mem::size_of::<taskstats>());
assert_eq!(0, unsafe {
&(*(std::ptr::null::<taskstats>())).version as *const _ as usize

Check warning on line 376 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build

dereferencing a null pointer
});
assert_eq!(4, unsafe {
&(*(std::ptr::null::<taskstats>())).ac_exitcode as *const _ as usize

Check warning on line 379 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build

dereferencing a null pointer
});
assert_eq!(8, unsafe {
&(*(std::ptr::null::<taskstats>())).ac_flag as *const _ as usize

Check warning on line 382 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build

dereferencing a null pointer
});
assert_eq!(9, unsafe {
&(*(std::ptr::null::<taskstats>())).ac_nice as *const _ as usize

Check warning on line 385 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build

dereferencing a null pointer
});
assert_eq!(16, unsafe {
&(*(std::ptr::null::<taskstats>())).cpu_count as *const _ as usize

Check warning on line 388 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build

dereferencing a null pointer
});
assert_eq!(24, unsafe {
&(*(std::ptr::null::<taskstats>())).cpu_delay_total as *const _ as usize

Check warning on line 391 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build

dereferencing a null pointer
});
assert_eq!(32, unsafe {
&(*(std::ptr::null::<taskstats>())).blkio_count as *const _ as usize

Check warning on line 394 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build

dereferencing a null pointer
});
assert_eq!(40, unsafe {
&(*(std::ptr::null::<taskstats>())).blkio_delay_total as *const _ as usize

Check warning on line 397 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build

dereferencing a null pointer
});
assert_eq!(48, unsafe {
&(*(std::ptr::null::<taskstats>())).swapin_count as *const _ as usize

Check warning on line 400 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build

dereferencing a null pointer
});
assert_eq!(56, unsafe {
&(*(std::ptr::null::<taskstats>())).swapin_delay_total as *const _ as usize

Check warning on line 403 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build

dereferencing a null pointer
});
assert_eq!(64, unsafe {
&(*(std::ptr::null::<taskstats>())).cpu_run_real_total as *const _ as usize
Expand Down
8 changes: 8 additions & 0 deletions src/netlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ impl Netlink<nl::Socket> {
mypid: process::id(),
})
}

/// Forward `payload` to the wrapped socket's `set_rx_buf_sz`, converting any
/// error it reports into this module's error type.
pub fn set_rx_buf_sz<T>(&self, payload: T) -> Result<()> {
    let outcome = self.sock.set_rx_buf_sz(payload);
    outcome.map_err(Into::into)
}

/// Return the value reported by the wrapped socket's `get_rx_buf_sz`,
/// converting any error it reports into this module's error type.
pub fn get_rx_buf_sz(&self) -> Result<usize> {
    let outcome = self.sock.get_rx_buf_sz();
    outcome.map_err(Into::into)
}
}

impl<S: NlSocket> Netlink<S> {
Expand Down
Loading