Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lustre-collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ pub mod types;

pub use crate::error::LustreCollectorError;
use combine::parser::EasyParser;
pub use lnetctl_parser::{parse as parse_lnetctl_output, parse_lnetctl_stats};
pub use lnetctl_parser::{
parse as parse_lnetctl_output, parse_lnetctl_global, parse_lnetctl_stats,
};
pub use node_stats_parsers::{parse_cpustats_output, parse_meminfo_output};
use std::{io, str};
pub use types::*;
Expand Down
30 changes: 29 additions & 1 deletion lustre-collector/src/lnetctl_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::{
LNetStatGlobal, LustreCollectorError,
lnet_exports::LNetStatsStatistics,
types::{LNetStat, LNetStats, Param, Record, lnet_exports::Net},
types::{LNetGlobal, LNetGlobalConfig, LNetStat, LNetStats, Param, Record, lnet_exports::Net},
};

#[derive(serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -33,6 +33,11 @@ pub(crate) fn build_lnet_stats(x: &Net) -> Vec<Record> {
param: Param("drop_count".to_string()),
value: y.statistics.drop_count,
}),
LNetStats::HealthValue(LNetStat {
nid: y.nid.to_string(),
param: Param("health_value".to_string()),
value: y.health_stats.health_value,
}),
]
})
.map(Record::LNetStat)
Expand Down Expand Up @@ -96,6 +101,29 @@ pub fn parse_lnetctl_stats(xs: &[u8]) -> Result<Vec<Record>, LustreCollectorErro
.unwrap_or_default())
}

/// Builds the collector records derived from an [`LNetGlobalConfig`].
///
/// Currently this yields exactly one record: the `health_sensitivity`
/// value wrapped as `LNetStats::HealthSensitivity`.
pub(crate) fn build_lnetctl_global(x: &LNetGlobalConfig) -> Vec<Record> {
    let health_sensitivity = LNetStatGlobal {
        param: Param("health_sensitivity".to_string()),
        value: x.health_sensitivity,
    };

    vec![Record::LNetStat(LNetStats::HealthSensitivity(
        health_sensitivity,
    ))]
}

/// Parses YAML bytes into records describing the LNet global configuration.
///
/// Leading and trailing ASCII whitespace is ignored. Input that is empty
/// after trimming, or a document without a `global` section, produces an
/// empty vector.
///
/// # Errors
///
/// Returns an error when the trimmed input cannot be deserialized into
/// [`LNetGlobal`].
pub fn parse_lnetctl_global(xs: &[u8]) -> Result<Vec<Record>, LustreCollectorError> {
    let trimmed = xs.trim_ascii();

    // Nothing to parse: report "no records" rather than a YAML error.
    if trimmed.is_empty() {
        return Ok(Vec::new());
    }

    let parsed: LNetGlobal = serde_yaml::from_slice(trimmed)?;

    let records = match parsed.global {
        Some(config) => build_lnetctl_global(&config),
        None => Vec::new(),
    };

    Ok(records)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
26 changes: 23 additions & 3 deletions lustre-collector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

use clap::{Arg, ValueEnum, value_parser};
use lustre_collector::{
error::LustreCollectorError, mgs::mgs_fs_parser, parse_lctl_output, parse_lnetctl_output,
parse_lnetctl_stats, parse_mgs_fs_output, parse_recovery_status_output, parser,
recovery_status_parser, types::Record,
error::LustreCollectorError, mgs::mgs_fs_parser, parse_lctl_output, parse_lnetctl_global,
parse_lnetctl_output, parse_lnetctl_stats, parse_mgs_fs_output, parse_recovery_status_output,
parser, recovery_status_parser, types::Record,
};
use std::{
fmt, panic,
Expand Down Expand Up @@ -81,6 +81,12 @@ fn get_lnetctl_stats_output() -> Result<Vec<u8>, LustreCollectorError> {
Ok(r.stdout)
}

/// Runs `lnetctl global show` and returns the bytes it wrote to stdout.
///
/// Only a failure to run the command is reported as an error; the exit
/// status and stderr of the child process are not inspected.
fn get_lnetctl_global_output() -> Result<Vec<u8>, LustreCollectorError> {
    let output = Command::new("lnetctl").args(["global", "show"]).output()?;

    Ok(output.stdout)
}

fn main() -> ExitCode {
match run() {
Ok(()) => ExitCode::SUCCESS,
Expand Down Expand Up @@ -136,6 +142,14 @@ fn run() -> Result<(), LustreCollectorError> {
Ok(lnetctl_stats_record)
});

let lnetctl_global_handle =
thread::spawn(move || -> Result<Vec<Record>, LustreCollectorError> {
let lnetctl_global_output = get_lnetctl_global_output()?;
let lnetctl_global_record = parse_lnetctl_global(&lnetctl_global_output)?;

Ok(lnetctl_global_record)
});

let recovery_status_handle =
thread::spawn(move || -> Result<Vec<Record>, LustreCollectorError> {
let recovery_status_output = get_recovery_status_output()?;
Expand Down Expand Up @@ -172,10 +186,16 @@ fn run() -> Result<(), LustreCollectorError> {
Err(e) => panic::resume_unwind(e),
};

let mut lnetctl_global_record = match lnetctl_global_handle.join() {
Ok(r) => r.unwrap_or_default(),
Err(e) => panic::resume_unwind(e),
};

lctl_record.append(&mut lnet_record);
lctl_record.append(&mut mgs_fs_record);
lctl_record.append(&mut recovery_status_records);
lctl_record.append(&mut lnetctl_stats_record);
lctl_record.append(&mut lnetctl_global_record);

let x = match format {
Format::Json => serde_json::to_string(&lctl_record)?,
Expand Down
14 changes: 13 additions & 1 deletion lustre-collector/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ pub mod lnet_exports {
#[derive(serde::Serialize, serde::Deserialize)]
pub struct HealthStats {
#[serde(rename = "health value")]
health_value: i64,
pub health_value: i64,
interrupts: i64,
dropped: i64,
aborted: i64,
Expand Down Expand Up @@ -380,6 +380,16 @@ pub struct LNetStatGlobal<T> {
pub value: T,
}

#[derive(serde::Serialize, serde::Deserialize)]
/// Top-level document deserialized from YAML by `parse_lnetctl_global`.
pub struct LNetGlobal {
    /// The `global` section; `None` results in no records being produced.
    pub global: Option<LNetGlobalConfig>,
}

#[derive(serde::Serialize, serde::Deserialize)]
/// Contents of the `global` section of an [`LNetGlobal`] document.
pub struct LNetGlobalConfig {
    /// Value reported as the `health_sensitivity` LNet stat.
    // NOTE(review): this field is required, so a `global` section without it
    // will presumably fail to deserialize — confirm that is acceptable for
    // every `lnetctl` version this collector targets.
    pub health_sensitivity: i64,
}

#[derive(PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
/// Changelog stats from parsing `mdd.*.changelog_users`.
pub struct ChangelogStat {
Expand Down Expand Up @@ -540,6 +550,8 @@ pub enum LNetStats {
SendLength(LNetStatGlobal<i64>),
RecvLength(LNetStatGlobal<i64>),
DropLength(LNetStatGlobal<i64>),
HealthValue(LNetStat<i64>),
HealthSensitivity(LNetStatGlobal<i64>),
}

#[derive(PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
Expand Down
30 changes: 29 additions & 1 deletion lustrefs-exporter/src/lnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

use std::sync::atomic::AtomicU64;

use crate::Family;
use lustre_collector::{LNetStat, LNetStatGlobal, LNetStats};
use prometheus_client::{metrics::counter::Counter, registry::Registry};
use prometheus_client::{metrics::counter::Counter, metrics::gauge::Gauge, registry::Registry};

#[derive(Debug, Default)]
pub struct LNetMetrics {
Expand All @@ -14,6 +16,8 @@ pub struct LNetMetrics {
send_bytes_total: Family<Counter<u64>>,
receive_bytes_total: Family<Counter<u64>>,
drop_bytes_total: Family<Counter<u64>>,
health_value: Family<Gauge<u64, AtomicU64>>,
health_sensitivity: Family<Gauge<u64, AtomicU64>>,
}

impl LNetMetrics {
Expand Down Expand Up @@ -53,6 +57,18 @@ impl LNetMetrics {
"Total number of bytes that have been dropped",
self.drop_bytes_total.clone(),
);

registry.register_without_auto_suffix(
"lustre_health_value",
"Health value of the LNet network",
self.health_value.clone(),
);

registry.register_without_auto_suffix(
"lustre_health_sensitivity",
"Health sensitivity of the LNet network",
self.health_sensitivity.clone(),
);
}
}

Expand Down Expand Up @@ -92,5 +108,17 @@ pub fn build_lnet_stats(x: &LNetStats, lnet: &mut LNetMetrics) {
LNetStats::DropLength(stat) => {
record_lnet_stat_global(stat, &mut lnet.drop_bytes_total);
}
LNetStats::HealthValue(stat) => {
let labels = vec![("nid", stat.nid.to_string())];
lnet.health_value
.get_or_create(&labels)
.set(stat.value.try_into().unwrap_or(0));
}
LNetStats::HealthSensitivity(stat) => {
let labels = vec![];
lnet.health_sensitivity
.get_or_create(&labels)
.set(stat.value.try_into().unwrap_or(0));
}
}
}
22 changes: 15 additions & 7 deletions lustrefs-exporter/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use axum::{
response::{IntoResponse, Response},
routing::get,
};
use lustre_collector::{parse_lctl_output, parse_lnetctl_output, parse_lnetctl_stats, parser};
use lustre_collector::{
parse_lctl_output, parse_lnetctl_global, parse_lnetctl_output, parse_lnetctl_stats, parser,
};
use prometheus_client::{encoding::text::encode, registry::Registry};
use serde::Deserialize;
use std::{
Expand Down Expand Up @@ -101,6 +103,14 @@ pub fn net_show_output() -> Command {
cmd
}

/// Builds (but does not run) the `lnetctl global show` command.
///
/// The returned [`Command`] has `kill_on_drop(true)` set; the caller is
/// responsible for executing it and collecting its output.
pub fn global_show_output() -> Command {
    let mut command = Command::new("lnetctl");

    command.arg("global").arg("show");
    command.kill_on_drop(true);

    command
}

pub fn lnet_stats_output() -> Command {
let mut cmd = Command::new("lnetctl");

Expand Down Expand Up @@ -202,23 +212,21 @@ pub async fn scrape(Query(params): Query<Params>) -> Result<Response<Body>, Erro
let mut output = vec![];

let lctl = lustre_metrics_output().output().await?;

let mut lctl_output = parse_lctl_output(&lctl.stdout)?;

output.append(&mut lctl_output);

let lnetctl = net_show_output().output().await?;

let mut lnetctl_output = parse_lnetctl_output(&lnetctl.stdout)?;

output.append(&mut lnetctl_output);

let lnetctl_stats_output = lnet_stats_output().output().await?;

let mut lnetctl_stats_record = parse_lnetctl_stats(&lnetctl_stats_output.stdout)?;

output.append(&mut lnetctl_stats_record);

let lnetctl_global_output = global_show_output().output().await?;
let mut lnetctl_global_record = parse_lnetctl_global(&lnetctl_global_output.stdout)?;
output.append(&mut lnetctl_global_record);

// Build and register Lustre metrics
metrics::build_lustre_stats(&output, &mut opentelemetry_metrics);
opentelemetry_metrics.register_metric(&mut registry);
Expand Down
Loading