Skip to content

Commit c2850f6

Browse files
zhk101chee-chyuan
authored and committed
perf(metrics): prevent multi-second engine stalls from scrape hooks (#189)
* metrics: gate heavy scrape hooks behind RETH_DISABLE_HEAVY_METRICS

  The two custom hooks registered by `metrics_hooks` walk MDBX page metadata + the freelist DB and iterate every static-file jar header on every Prometheus scrape. On large databases this is expensive enough to stall the metrics endpoint and starve the runtime.

  Skip registering both hooks when `RETH_DISABLE_HEAVY_METRICS` is set in the environment; the rest of the registry (process, jemalloc, io, chain spec, version) is unaffected and the endpoint still responds normally. (The RocksDB reporting hook that `metrics_hooks` registers alongside them is gated by the same check.)

  The env var is documented on the function so the escape hatch is discoverable without grepping the source.

* metrics: run scrape handler and push-gateway render on spawn_blocking

  The Prometheus metrics handler is fundamentally synchronous: it invokes every registered hook and then runs the prometheus exporter's `render()`, all on the tokio worker that accepted the HTTP request (or on the runtime worker driving the push-gateway loop). The default hooks are cheap (procfs, jemalloc atomic reads), but the two `report_metrics` hooks (DB stat walk, static-file jar enumeration) can take seconds on large archives.

  Even with those gated out (see preceding patch), `render()` itself is O(total time-series) and will grow over time. A multi-millisecond synchronous block on a runtime worker is not ideal and can become a real engine latency source if hook cost ever regresses.

  Move the synchronous work off the runtime worker:

  - Endpoint handler now offloads `handle_request` (which calls the hook + render or the pprof dump) to `spawn_blocking`. On join error, return a 500 instead of letting the connection task panic.
  - Push-gateway loop offloads the hook + render to `spawn_blocking`; on join error, log a warning and skip this tick rather than killing the loop. The HTTP put itself was already async so it stays inline.

  No behavioral change to what the endpoint or push-gateway returns for metrics; only the thread on which the rendering happens. Note that because `handle_request` is no longer `async`, the `/debug/tokio/dump` route and its `handle_tokio_dump` handler are removed; that path now falls through to the default metrics response.
1 parent 3baee78 commit c2850f6

2 files changed

Lines changed: 116 additions & 100 deletions

File tree

crates/node/builder/src/launch/common.rs

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1584,27 +1584,36 @@ where
15841584
}
15851585

15861586
/// Returns the metrics hooks for the node.
1587+
///
1588+
/// The DB and static-file metric-reporting hooks walk all tables/segments and
1589+
/// can be expensive on large databases. Set `RETH_DISABLE_HEAVY_METRICS` to
1590+
/// any value to skip registering them; the metrics server still serves the
1591+
/// rest of the registry. The RocksDB reporting hook is skipped as well.
15871592
pub fn metrics_hooks<N: NodeTypesWithDB>(provider_factory: &ProviderFactory<N>) -> Hooks {
1588-
Hooks::builder()
1589-
.with_hook({
1590-
let db = provider_factory.db_ref().clone();
1591-
move || throttle!(Duration::from_secs(5 * 60), || db.report_metrics())
1592-
})
1593-
.with_hook({
1594-
let sfp = provider_factory.static_file_provider();
1595-
move || {
1596-
throttle!(Duration::from_secs(5 * 60), || {
1597-
if let Err(error) = sfp.report_metrics() {
1598-
error!(%error, "Failed to report metrics from static file provider");
1599-
}
1600-
})
1601-
}
1602-
})
1603-
.with_hook({
1604-
let rocksdb = provider_factory.rocksdb_provider();
1605-
move || throttle!(Duration::from_secs(5 * 60), || rocksdb.report_metrics())
1606-
})
1607-
.build()
1593+
let mut builder = Hooks::builder();
1594+
// Heavy hooks: opt out via env var when their cost is unacceptable.
1595+
if std::env::var_os("RETH_DISABLE_HEAVY_METRICS").is_none() {
1596+
builder = builder
1597+
.with_hook({
1598+
let db = provider_factory.db_ref().clone();
1599+
move || throttle!(Duration::from_secs(5 * 60), || db.report_metrics())
1600+
})
1601+
.with_hook({
1602+
let sfp = provider_factory.static_file_provider();
1603+
move || {
1604+
throttle!(Duration::from_secs(5 * 60), || {
1605+
if let Err(error) = sfp.report_metrics() {
1606+
error!(%error, "Failed to report metrics from static file provider");
1607+
}
1608+
})
1609+
}
1610+
})
1611+
.with_hook({
1612+
let rocksdb = provider_factory.rocksdb_provider();
1613+
move || throttle!(Duration::from_secs(5 * 60), || rocksdb.report_metrics())
1614+
});
1615+
}
1616+
builder.build()
16081617
}
16091618

16101619
#[cfg(test)]

crates/node/metrics/src/server.rs

Lines changed: 87 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -132,39 +132,58 @@ impl MetricServer {
132132

133133
tracing::info!(target: "reth::cli", "Starting metrics endpoint at {}", listener.local_addr().unwrap());
134134

135-
task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| loop {
136-
let io = tokio::select! {
137-
_ = &mut signal => break,
138-
io = listener.accept() => {
139-
match io {
140-
Ok((stream, _remote_addr)) => stream,
141-
Err(err) => {
142-
tracing::error!(%err, "failed to accept connection");
143-
continue;
135+
task_executor.spawn_with_graceful_shutdown_signal(|mut signal| {
136+
Box::pin(async move {
137+
loop {
138+
let io = tokio::select! {
139+
_ = &mut signal => break,
140+
io = listener.accept() => {
141+
match io {
142+
Ok((stream, _remote_addr)) => stream,
143+
Err(err) => {
144+
tracing::error!(%err, "failed to accept connection");
145+
continue;
146+
}
147+
}
144148
}
145-
}
146-
}
147-
};
148-
149-
let handle = install_prometheus_recorder();
150-
let hook = hook.clone();
151-
let pprof_dump_dir = pprof_dump_dir.clone();
152-
let service = tower::service_fn(move |req: Request<_>| {
153-
let hook = hook.clone();
154-
let pprof_dump_dir = pprof_dump_dir.clone();
155-
async move {
156-
let response =
157-
handle_request(req.uri().path(), &*hook, handle, &pprof_dump_dir).await;
158-
Ok::<_, Infallible>(response)
149+
};
150+
151+
let handle = install_prometheus_recorder();
152+
let hook = hook.clone();
153+
let pprof_dump_dir = pprof_dump_dir.clone();
154+
let service = tower::service_fn(move |req: Request<_>| {
155+
let path = req.uri().path().to_owned();
156+
let hook = hook.clone();
157+
let pprof_dump_dir = pprof_dump_dir.clone();
158+
async move {
159+
let response = tokio::task::spawn_blocking(move || {
160+
handle_request(&path, &*hook, handle, &pprof_dump_dir)
161+
})
162+
.await
163+
.unwrap_or_else(|err| {
164+
tracing::error!(%err, "metrics handler task failed");
165+
let mut response = Response::new(Full::new(Bytes::from_static(
166+
b"metrics handler error",
167+
)));
168+
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
169+
response
170+
});
171+
Ok::<_, Infallible>(response)
172+
}
173+
});
174+
175+
let mut shutdown = signal.clone().ignore_guard();
176+
tokio::task::spawn(async move {
177+
let _ = jsonrpsee_server::serve_with_graceful_shutdown(
178+
io,
179+
service,
180+
&mut shutdown,
181+
)
182+
.await
183+
.inspect_err(|error| tracing::debug!(%error, "failed to serve request"));
184+
});
159185
}
160-
});
161-
162-
let mut shutdown = signal.clone().ignore_guard();
163-
tokio::task::spawn(async move {
164-
let _ = jsonrpsee_server::serve_with_graceful_shutdown(io, service, &mut shutdown)
165-
.await
166-
.inspect_err(|error| tracing::debug!(%error, "failed to serve request"));
167-
});
186+
})
168187
});
169188

170189
Ok(())
@@ -181,34 +200,47 @@ impl MetricServer {
181200
let client = Client::builder()
182201
.build()
183202
.wrap_err("Could not create HTTP client to push metrics to gateway")?;
184-
task_executor.spawn_with_graceful_shutdown_signal(async move |mut signal| {
185-
tracing::info!(url = %url, interval = ?interval, "Starting task to push metrics to gateway");
186-
let handle = install_prometheus_recorder();
187-
loop {
188-
tokio::select! {
189-
_ = &mut signal => {
190-
tracing::info!("Shutting down task to push metrics to gateway");
191-
break;
192-
}
193-
_ = tokio::time::sleep(interval) => {
194-
hooks.iter().for_each(|hook| hook());
195-
let metrics = handle.handle().render();
196-
match client.put(&url).header("Content-Type", "text/plain").body(metrics).send().await {
197-
Ok(response) => {
198-
if !response.status().is_success() {
199-
tracing::warn!(
200-
status = %response.status(),
201-
"Failed to push metrics to gateway"
202-
);
203+
task_executor.spawn_with_graceful_shutdown_signal(move |mut signal| {
204+
Box::pin(async move {
205+
tracing::info!(url = %url, interval = ?interval, "Starting task to push metrics to gateway");
206+
let handle = install_prometheus_recorder();
207+
loop {
208+
tokio::select! {
209+
_ = &mut signal => {
210+
tracing::info!("Shutting down task to push metrics to gateway");
211+
break;
212+
}
213+
_ = tokio::time::sleep(interval) => {
214+
let hooks_clone = hooks.clone();
215+
let metrics = match tokio::task::spawn_blocking(move || {
216+
hooks_clone.iter().for_each(|hook| hook());
217+
handle.handle().render()
218+
})
219+
.await
220+
{
221+
Ok(m) => m,
222+
Err(err) => {
223+
tracing::warn!(%err, "metrics gather failed; skipping push");
224+
continue;
225+
}
226+
};
227+
match client.put(&url).header("Content-Type", "text/plain").body(metrics).send().await {
228+
Ok(response) => {
229+
if !response.status().is_success() {
230+
tracing::warn!(
231+
status = %response.status(),
232+
"Failed to push metrics to gateway"
233+
);
234+
}
235+
}
236+
Err(err) => {
237+
tracing::warn!(%err, "Failed to push metrics to gateway");
203238
}
204-
}
205-
Err(err) => {
206-
tracing::warn!(%err, "Failed to push metrics to gateway");
207239
}
208240
}
209241
}
210242
}
211-
}
243+
})
212244
});
213245
Ok(())
214246
}
@@ -314,15 +346,14 @@ fn describe_io_stats() {
314346
#[cfg(not(target_os = "linux"))]
315347
const fn describe_io_stats() {}
316348

317-
async fn handle_request(
349+
fn handle_request(
318350
path: &str,
319351
hook: impl Fn(),
320352
handle: &crate::recorder::PrometheusRecorder,
321353
pprof_dump_dir: &PathBuf,
322354
) -> Response<Full<Bytes>> {
323355
match path {
324356
"/debug/pprof/heap" => handle_pprof_heap(pprof_dump_dir),
325-
"/debug/tokio/dump" => handle_tokio_dump().await,
326357
_ => {
327358
hook();
328359
let metrics = handle.handle().render();
@@ -412,30 +443,6 @@ fn handle_pprof_heap(_pprof_dump_dir: &PathBuf) -> Response<Full<Bytes>> {
412443
response
413444
}
414445

415-
#[cfg(tokio_unstable)]
416-
async fn handle_tokio_dump() -> Response<Full<Bytes>> {
417-
let handle = tokio::runtime::Handle::current();
418-
let dump = handle.dump().await;
419-
420-
let mut output = String::new();
421-
for (i, task) in dump.tasks().iter().enumerate() {
422-
let trace = task.trace();
423-
output.push_str(&format!("task {i}:\n{trace}\n\n"));
424-
}
425-
426-
let mut response = Response::new(Full::new(Bytes::from(output)));
427-
response.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
428-
response
429-
}
430-
431-
#[cfg(not(tokio_unstable))]
432-
async fn handle_tokio_dump() -> Response<Full<Bytes>> {
433-
let mut response = Response::new(Full::new(Bytes::from_static(
434-
b"tokio task dump not available. Rebuild with RUSTFLAGS=\"--cfg tokio_unstable\" and tokio's `taskdump` feature.",
435-
)));
436-
*response.status_mut() = StatusCode::NOT_IMPLEMENTED;
437-
response
438-
}
439446

440447
#[cfg(test)]
441448
mod tests {

0 commit comments

Comments
 (0)