Skip to content

Commit f2f9064

Browse files
qpmrIlya Byckevich
and
Ilya Byckevich
authored
Add API for monitoring exited tasks (#11)
To be compliant with the taskstats docs, also added methods to adjust the RX socket buffer Co-authored-by: Ilya Byckevich <[email protected]>
1 parent b413461 commit f2f9064

File tree

3 files changed

+115
-16
lines changed

3 files changed

+115
-16
lines changed

Diff for: Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ required-features = ["executable"]
2222

2323
[dependencies]
2424
libc = "0.2.139"
25-
netlink-sys = "0.8.3"
25+
netlink-sys = "0.8.6"
2626
thiserror = "1.0.38"
2727
log = "0.4.17"
2828
env_logger = { version = "0.10.0", optional = true }

Diff for: src/lib.rs

+106-15
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ pub use model::*;
1212

1313
pub use c_headers::taskstats as __bindgen_taskstats;
1414
use c_headers::{
15-
__u16, __u32, __u64, __u8, TASKSTATS_CMD_ATTR_PID, TASKSTATS_CMD_ATTR_TGID, TASKSTATS_CMD_GET,
15+
__u16, __u32, __u64, __u8, TASKSTATS_CMD_ATTR_DEREGISTER_CPUMASK, TASKSTATS_CMD_ATTR_PID,
16+
TASKSTATS_CMD_ATTR_REGISTER_CPUMASK, TASKSTATS_CMD_ATTR_TGID, TASKSTATS_CMD_GET,
1617
TASKSTATS_GENL_NAME, TASKSTATS_TYPE_AGGR_PID, TASKSTATS_TYPE_AGGR_TGID, TASKSTATS_TYPE_NULL,
1718
TASKSTATS_TYPE_PID, TASKSTATS_TYPE_STATS, TASKSTATS_TYPE_TGID,
1819
};
1920
use log::{debug, warn};
2021
use netlink::Netlink;
2122
use netlink::NlPayload;
22-
use std::mem;
23-
use std::slice;
23+
use std::{mem, slice};
2424
use thiserror::Error;
2525

2626
/// Errors possibly returned by `Client`
@@ -93,12 +93,7 @@ impl Client {
9393
/// * when kernel responded error
9494
/// * when the returned data couldn't be interpreted
9595
pub fn pid_stats(&self, tid: u32) -> Result<TaskStats> {
96-
self.netlink.send_cmd(
97-
self.ts_family_id,
98-
TASKSTATS_CMD_GET as u8,
99-
TASKSTATS_CMD_ATTR_PID as u16,
100-
tid.as_buf(),
101-
)?;
96+
self.send(TASKSTATS_CMD_ATTR_PID as u16, tid.as_buf())?;
10297

10398
let resp = self.netlink.recv_response()?;
10499
for na in resp.payload_as_nlattrs() {
@@ -138,12 +133,7 @@ impl Client {
138133
/// * when kernel responded error
139134
/// * when the returned data couldn't be interpreted
140135
pub fn tgid_stats(&self, tgid: u32) -> Result<TaskStats> {
141-
self.netlink.send_cmd(
142-
self.ts_family_id,
143-
TASKSTATS_CMD_GET as u8,
144-
TASKSTATS_CMD_ATTR_TGID as u16,
145-
tgid.as_buf(),
146-
)?;
136+
self.send(TASKSTATS_CMD_ATTR_TGID as u16, tgid.as_buf())?;
147137

148138
let resp = self.netlink.recv_response()?;
149139
for na in resp.payload_as_nlattrs() {
@@ -168,6 +158,107 @@ impl Client {
168158
"no TASKSTATS_TYPE_STATS found in response".to_string(),
169159
))
170160
}
161+
162+
/// Register listener with the specific cpumask
163+
///
164+
/// # Arguments
165+
/// * `cpu_mask` - cpumask is specified as an ascii string of comma-separated cpu ranges e.g.
166+
/// to listen to exit data from cpus 1,2,3,5,7,8 the cpumask would be "1-3,5,7-8".
167+
pub fn register_cpumask(&self, cpu_mask: &str) -> Result<()> {
168+
self.send(
169+
TASKSTATS_CMD_ATTR_REGISTER_CPUMASK as u16,
170+
cpu_mask.as_bytes(),
171+
)?;
172+
Ok(())
173+
}
174+
175+
/// Deregister listener with the specific cpumask
176+
/// If userspace forgets to deregister interest in cpus before closing the listening socket,
177+
/// the kernel cleans up its interest set over time. However, for the sake of efficiency,
178+
/// an explicit deregistration is advisable.
179+
///
180+
/// # Arguments
181+
/// * `cpu_mask` - cpumask is specified as an ascii string of comma-separated cpu ranges e.g.
182+
/// to listen to exit data from cpus 1,2,3,5,7,8 the cpumask would be "1-3,5,7-8".
183+
pub fn deregister_cpumask(&self, cpu_mask: &str) -> Result<()> {
184+
self.send(
185+
TASKSTATS_CMD_ATTR_DEREGISTER_CPUMASK as u16,
186+
cpu_mask.as_bytes(),
187+
)?;
188+
Ok(())
189+
}
190+
191+
/// Listen registered cpumask's.
192+
/// If no messages are available at the socket, the receive call
193+
/// wait for a message to arrive, unless the socket is nonblocking.
194+
///
195+
/// # Return
196+
/// * `Ok(Vec<TaskStats>)`: vector with stats messages. If the current task is NOT the last
197+
/// one in its thread group, only one message is returned in the vector.
198+
/// However, if it is the last task, an additional element containing the per-thread
199+
/// group ID (tgid) statistics is also included. This additional element sums up
200+
/// the statistics for all threads within the thread group, both past and present
201+
pub fn listen_registered(&self) -> Result<Vec<TaskStats>> {
202+
let resp = self.netlink.recv_response()?;
203+
let mut stats_vec = Vec::new();
204+
205+
for na in resp.payload_as_nlattrs() {
206+
match na.header.nla_type as u32 {
207+
TASKSTATS_TYPE_NULL => break,
208+
TASKSTATS_TYPE_AGGR_PID | TASKSTATS_TYPE_AGGR_TGID => {
209+
for inner in na.payload_as_nlattrs() {
210+
match inner.header.nla_type as u32 {
211+
TASKSTATS_TYPE_PID => debug!("Received TASKSTATS_TYPE_PID"),
212+
TASKSTATS_TYPE_TGID => debug!("Received TASKSTATS_TYPE_TGID"),
213+
TASKSTATS_TYPE_STATS => {
214+
stats_vec.push(TaskStats::from(inner.payload()));
215+
}
216+
unknown => println!("Skipping unknown nla_type: {}", unknown),
217+
}
218+
}
219+
}
220+
unknown => println!("Skipping unknown nla_type: {}", unknown),
221+
}
222+
}
223+
if !stats_vec.is_empty() {
224+
return Ok(stats_vec);
225+
}
226+
Err(Error::Unknown(
227+
"no TASKSTATS_TYPE_STATS found in response".to_string(),
228+
))
229+
}
230+
231+
/// Set receiver buffer size in bytes (SO_RCVBUF socket option, see socket(7))
232+
///
233+
/// # Arguments
234+
/// * `payload` - buffer size in bytes. The kernel doubles this value
235+
/// (to allow space for bookkeeping overhead). The default value is set by the
236+
/// /proc/sys/net/core/rmem_default file, and the maximum allowed value is set by the
237+
/// /proc/sys/net/core/rmem_max file. The minimum (doubled) value for this option is 256.
238+
pub fn set_rx_buf_sz<T>(&self, payload: T) -> Result<()> {
239+
self.netlink
240+
.set_rx_buf_sz(payload)
241+
.map_err(|err| err.into())
242+
}
243+
244+
/// Get receiver buffer size in bytes (SO_RCVBUF socket option, see socket(7))
245+
///
246+
/// # Return
247+
/// * `usize` buffer size in bytes.
248+
/// Kernel returns doubled value, that have been set using [set_rx_buf_sz]
249+
pub fn get_rx_buf_sz(&self) -> Result<usize> {
250+
self.netlink.get_rx_buf_sz().map_err(|err| err.into())
251+
}
252+
253+
pub fn send(&self, taskstats_cmd: u16, data: &[u8]) -> Result<()> {
254+
self.netlink.send_cmd(
255+
self.ts_family_id,
256+
TASKSTATS_CMD_GET as u8,
257+
taskstats_cmd,
258+
data,
259+
)?;
260+
Ok(())
261+
}
171262
}
172263

173264
trait AsBuf<T> {

Diff for: src/netlink.rs

+8
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ impl Netlink<nl::Socket> {
105105
mypid: process::id(),
106106
})
107107
}
108+
109+
pub fn set_rx_buf_sz<T>(&self, payload: T) -> Result<()> {
110+
self.sock.set_rx_buf_sz(payload).map_err(|err| err.into())
111+
}
112+
113+
pub fn get_rx_buf_sz(&self) -> Result<usize> {
114+
self.sock.get_rx_buf_sz().map_err(|err| err.into())
115+
}
108116
}
109117

110118
impl<S: NlSocket> Netlink<S> {

0 commit comments

Comments
 (0)