Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions crates/node/builder/src/launch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1628,23 +1628,32 @@ where
}

/// Returns the metrics hooks for the node.
///
/// The DB and static-file metric-reporting hooks walk all tables/segments and
/// can be expensive on large databases. Set `RETH_DISABLE_HEAVY_METRICS` to
/// any value to skip registering them; the metrics server still serves the
/// rest of the registry.
///
/// When registered, each hook body is wrapped in `throttle!` with a
/// five-minute duration (`5 * 60` seconds).
pub fn metrics_hooks<N: NodeTypesWithDB>(provider_factory: &ProviderFactory<N>) -> Hooks {
// NOTE(review): this listing is a flattened diff view. The chain from
// `Hooks::builder()` through the first `.build()` appears to be the removed
// body, which registered both hooks unconditionally; the replacement body
// starts at `let mut builder` below.
Hooks::builder()
.with_hook({
let db = provider_factory.db_ref().clone();
move || throttle!(Duration::from_secs(5 * 60), || db.report_metrics())
})
.with_hook({
let sfp = provider_factory.static_file_provider();
move || {
throttle!(Duration::from_secs(5 * 60), || {
if let Err(error) = sfp.report_metrics() {
error!(%error, "Failed to report metrics from static file provider");
}
})
}
})
.build()
let mut builder = Hooks::builder();
// Heavy hooks: opt out via env var when their cost is unacceptable.
// Only the presence of the variable is checked, never its contents, so the
// hooks are registered solely when `RETH_DISABLE_HEAVY_METRICS` is unset.
if std::env::var_os("RETH_DISABLE_HEAVY_METRICS").is_none() {
builder = builder
.with_hook({
// Clone the DB handle so the `move` closure owns its own copy.
let db = provider_factory.db_ref().clone();
move || throttle!(Duration::from_secs(5 * 60), || db.report_metrics())
})
.with_hook({
let sfp = provider_factory.static_file_provider();
move || {
throttle!(Duration::from_secs(5 * 60), || {
// A failed static-file report is logged and otherwise ignored;
// the hook itself never propagates the error.
if let Err(error) = sfp.report_metrics() {
error!(%error, "Failed to report metrics from static file provider");
}
})
}
});
}
// With the opt-out set, this builds from a builder that had no hooks added
// in this function.
builder.build()
}

#[cfg(test)]
Expand Down
36 changes: 31 additions & 5 deletions crates/node/metrics/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,24 @@ impl MetricServer {
let hook = hook.clone();
let pprof_dump_dir = pprof_dump_dir.clone();
let service = tower::service_fn(move |req: Request<_>| {
let response =
handle_request(req.uri().path(), &*hook, handle, &pprof_dump_dir);
async move { Ok::<_, Infallible>(response) }
let path = req.uri().path().to_owned();
let hook = hook.clone();
let pprof_dump_dir = pprof_dump_dir.clone();
async move {
let response = tokio::task::spawn_blocking(move || {
handle_request(&path, &*hook, handle, &pprof_dump_dir)
})
.await
.unwrap_or_else(|err| {
tracing::error!(%err, "metrics handler task failed");
let mut response = Response::new(Full::new(Bytes::from_static(
b"metrics handler error",
)));
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
response
});
Ok::<_, Infallible>(response)
}
});

let mut shutdown = signal.clone().ignore_guard();
Expand Down Expand Up @@ -193,8 +208,19 @@ impl MetricServer {
break;
}
_ = tokio::time::sleep(interval) => {
hooks.iter().for_each(|hook| hook());
let metrics = handle.handle().render();
let hooks_clone = hooks.clone();
let metrics = match tokio::task::spawn_blocking(move || {
hooks_clone.iter().for_each(|hook| hook());
handle.handle().render()
})
.await
{
Ok(m) => m,
Err(err) => {
tracing::warn!(%err, "metrics gather failed; skipping push");
continue;
}
};
match client.put(&url).header("Content-Type", "text/plain").body(metrics).send().await {
Ok(response) => {
if !response.status().is_success() {
Expand Down
Loading