Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
365 changes: 365 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

1,010 changes: 1,002 additions & 8 deletions Cargo.nix

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions ofborg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@ separator = "0.4.1"

async-std = "=1.5.0"
lapin = "1.0.0-rc6"
lazy_static = "1.4.0"
prometheus = "0.8"
tide = "0.8.0"
5 changes: 3 additions & 2 deletions ofborg/src/bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use async_std::task;
use tracing::{info, warn};

use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::easylapin;
use ofborg::{checkout, config, tasks};
use ofborg::{checkout, config, easylapin, metrics, tasks};

// FIXME: remove with rust/cargo update
#[allow(clippy::cognitive_complexity)]
Expand Down Expand Up @@ -93,6 +92,8 @@ fn main() -> Result<(), Box<dyn Error>> {
},
)?;

metrics::spawn_server();

info!("Fetching jobs from {}", &queue_name);
task::block_on(handle);

Expand Down
21 changes: 12 additions & 9 deletions ofborg/src/easylapin.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,5 @@
use std::pin::Pin;

use crate::config::RabbitMQConfig;
use crate::easyamqp::{
BindQueueConfig, ChannelExt, ConsumeConfig, ConsumerExt, ExchangeConfig, ExchangeType,
QueueConfig,
};
use crate::notifyworker::{NotificationReceiver, SimpleNotifyWorker};
use crate::ofborg;
use crate::worker::{Action, SimpleWorker};

use async_std::future::Future;
use async_std::stream::StreamExt;
use async_std::task;
Expand All @@ -21,6 +12,16 @@ use lapin::types::{AMQPValue, FieldTable};
use lapin::{BasicProperties, Channel, Connection, ConnectionProperties, ExchangeKind};
use tracing::{debug, trace};

use crate::config::RabbitMQConfig;
use crate::easyamqp::{
BindQueueConfig, ChannelExt, ConsumeConfig, ConsumerExt, ExchangeConfig, ExchangeType,
QueueConfig,
};
use crate::metrics;
use crate::notifyworker::{NotificationReceiver, SimpleNotifyWorker};
use crate::ofborg;
use crate::worker::{Action, SimpleWorker};

pub fn from_config(cfg: &RabbitMQConfig) -> Result<Connection, lapin::Error> {
let mut props = FieldTable::default();
props.insert(
Expand Down Expand Up @@ -166,6 +167,8 @@ impl<'a, W: SimpleNotifyWorker + 'a> ConsumerExt<'a, W> for NotifyChannel {
Ok(Box::pin(async move {
while let Some(Ok(deliver)) = consumer.next().await {
debug!(?deliver.delivery_tag, "consumed delivery");
metrics::JOBS_RECEIVED.inc();

let mut receiver = ChannelNotificationReceiver {
channel: &mut chan,
deliver: &deliver,
Expand Down
1 change: 1 addition & 0 deletions ofborg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub mod ghevent;
pub mod locks;
pub mod maintainers;
pub mod message;
pub mod metrics;
pub mod nix;
pub mod nixenv;
pub mod nixstats;
Expand Down
12 changes: 12 additions & 0 deletions ofborg/src/message/buildresult.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ pub enum BuildStatus {
UnexpectedError { err: String },
}

impl BuildStatus {
pub fn as_label(&self) -> &'static str {
match self {
BuildStatus::Skipped => "skipped",
BuildStatus::Success => "success",
BuildStatus::Failure => "failure",
BuildStatus::TimedOut => "timeout",
BuildStatus::UnexpectedError { err: _ } => "unexpected",
}
}
}

impl From<BuildStatus> for String {
fn from(status: BuildStatus) -> String {
match status {
Expand Down
77 changes: 77 additions & 0 deletions ofborg/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::env;

use async_std::task;
use lazy_static::lazy_static;
use prometheus::{
register_counter_vec, register_gauge_vec, register_int_counter, CounterVec, Encoder, GaugeVec,
IntCounter, TextEncoder,
};
use tide::http::StatusCode;
use tide::{Body, Request, Response};
use tracing::info;

use crate::ofborg;

lazy_static! {
pub static ref JOBS_RECEIVED: IntCounter = register_int_counter!(
"ofborg_jobs_received_total",
"Total number of jobs received."
)
.unwrap();
pub static ref BUILDS_RECEIVED: CounterVec = register_counter_vec!(
"ofborg_builds_received_total",
"Total number of builds received.",
&["system"]
)
.unwrap();
pub static ref BUILDS_FINISHED: CounterVec = register_counter_vec!(
"ofborg_builds_finished_total",
"Total number of builds finished.",
&["system", "status"]
)
.unwrap();
pub static ref BUILDS_ATTRIBUTES_ATTEMPTED: CounterVec = register_counter_vec!(
"ofborg_builds_attributes_attempted_total",
"Total number of attributes attempted to build.",
&["system"]
)
.unwrap();
pub static ref BUILDS_ATTRIBUTES_NOT_ATTEMPTED: CounterVec = register_counter_vec!(
"ofborg_builds_attributes_not_attempted_total",
"Total number of attributes not attempted to build.",
&["system"]
)
.unwrap();
}

lazy_static! {
static ref VERSION: GaugeVec = register_gauge_vec!(
"ofborg_version",
"Labeled OfBorg version information.",
&["version"]
)
.unwrap();
}

pub fn spawn_server() {
let port = env::var("METRICS_PORT").unwrap_or_else(|_err| String::from("9128"));

let mut server = tide::new();
server.at("/metrics").get(metrics);

// Initialize version metric.
VERSION.with_label_values(&[ofborg::VERSION]).set(1_f64);

info!("Listening on http://127.0.0.1:{}/metrics", port);
task::spawn(server.listen(format!("0.0.0.0:{}", port)));
}

async fn metrics(mut _req: Request<()>) -> Result<Response, tide::Error> {
let encoder = TextEncoder::new();
let metric_families = prometheus::gather();
let mut buffer = vec![];
encoder.encode(&metric_families, &mut buffer)?;

let resp = Response::new(StatusCode::Ok).body(Body::from(buffer));
Ok(resp)
}
18 changes: 18 additions & 0 deletions ofborg/src/tasks/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::checkout;
use crate::commentparser;
use crate::message::buildresult::{BuildResult, BuildStatus, V1Tag};
use crate::message::{buildjob, buildlogmsg};
use crate::metrics;
use crate::nix;
use crate::notifyworker;
use crate::worker;
Expand Down Expand Up @@ -280,6 +281,9 @@ impl notifyworker::SimpleNotifyWorker for BuildWorker {
) {
let span = debug_span!("job", pr = ?job.pr.number);
let _enter = span.enter();
metrics::BUILDS_RECEIVED
.with_label_values(&[&self.system])
.inc();

let mut actions = self.actions(&job, notifier);

Expand Down Expand Up @@ -347,6 +351,14 @@ impl notifyworker::SimpleNotifyWorker for BuildWorker {
cannot_build_attrs.join(", ")
);

metrics::BUILDS_ATTRIBUTES_ATTEMPTED
.with_label_values(&[&self.system])
.inc_by(can_build.len() as f64);

metrics::BUILDS_ATTRIBUTES_NOT_ATTEMPTED
.with_label_values(&[&self.system])
.inc_by(cannot_build_attrs.len() as f64);

actions.log_started(can_build.clone(), cannot_build_attrs.clone());
actions.log_instantiation_errors(cannot_build);

Expand Down Expand Up @@ -375,7 +387,13 @@ impl notifyworker::SimpleNotifyWorker for BuildWorker {
.last();
info!("----->8-----");

let status_label = status.as_label();
actions.build_finished(status, can_build, cannot_build_attrs);

metrics::BUILDS_FINISHED
.with_label_values(&[&self.system, status_label])
.inc();

info!("Build done!");
}
}
Expand Down