Skip to content

Commit af27402

Browse files
authored
Run reconstruction inside a scoped rayon pool (#8075)
Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com> Co-Authored-By: Eitan Seri-Levi <eserilev@gmail.com> Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>
1 parent d80c0ff commit af27402

File tree

11 files changed

+123
-60
lines changed

11 files changed

+123
-60
lines changed

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ use store::{
124124
BlobSidecarListFromRoot, DBColumn, DatabaseBlock, Error as DBError, HotColdDB, HotStateSummary,
125125
KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
126126
};
127-
use task_executor::{ShutdownReason, TaskExecutor};
127+
use task_executor::{RayonPoolType, ShutdownReason, TaskExecutor};
128128
use tokio_stream::Stream;
129129
use tracing::{Span, debug, debug_span, error, info, info_span, instrument, trace, warn};
130130
use tree_hash::TreeHash;
@@ -3274,16 +3274,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
32743274
let current_span = Span::current();
32753275
let result = self
32763276
.task_executor
3277-
.spawn_blocking_handle(
3278-
move || {
3279-
let _guard = current_span.enter();
3280-
data_availability_checker.reconstruct_data_columns(&block_root)
3281-
},
3282-
"reconstruct_data_columns",
3283-
)
3284-
.ok_or(BeaconChainError::RuntimeShutdown)?
3277+
.spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || {
3278+
let _guard = current_span.enter();
3279+
data_availability_checker.reconstruct_data_columns(&block_root)
3280+
})
32853281
.await
3286-
.map_err(BeaconChainError::TokioJoin)??;
3282+
.map_err(|_| BeaconChainError::RuntimeShutdown)??;
32873283

32883284
match result {
32893285
DataColumnReconstructionResult::Success((availability, data_columns_to_publish)) => {

beacon_node/beacon_processor/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ logging = { workspace = true }
1212
metrics = { workspace = true }
1313
num_cpus = { workspace = true }
1414
parking_lot = { workspace = true }
15-
rayon = { workspace = true }
1615
serde = { workspace = true }
1716
slot_clock = { workspace = true }
1817
strum = { workspace = true }

beacon_node/beacon_processor/src/lib.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
//! checks the queues to see if there are more parcels of work that can be spawned in a new worker
3939
//! task.
4040
41-
use crate::rayon_manager::RayonManager;
4241
use crate::work_reprocessing_queue::{
4342
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
4443
};
@@ -48,7 +47,6 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
4847
use logging::TimeLatch;
4948
use logging::crit;
5049
use parking_lot::Mutex;
51-
use rayon::ThreadPool;
5250
pub use scheduler::work_reprocessing_queue;
5351
use serde::{Deserialize, Serialize};
5452
use slot_clock::SlotClock;
@@ -61,7 +59,7 @@ use std::sync::Arc;
6159
use std::task::Context;
6260
use std::time::{Duration, Instant};
6361
use strum::IntoStaticStr;
64-
use task_executor::TaskExecutor;
62+
use task_executor::{RayonPoolType, TaskExecutor};
6563
use tokio::sync::mpsc;
6664
use tokio::sync::mpsc::error::TrySendError;
6765
use tracing::{debug, error, trace, warn};
@@ -76,7 +74,6 @@ use work_reprocessing_queue::{
7674
};
7775

7876
mod metrics;
79-
pub mod rayon_manager;
8077
pub mod scheduler;
8178

8279
/// The maximum size of the channel for work events to the `BeaconProcessor`.
@@ -810,7 +807,6 @@ pub struct BeaconProcessor<E: EthSpec> {
810807
pub network_globals: Arc<NetworkGlobals<E>>,
811808
pub executor: TaskExecutor,
812809
pub current_workers: usize,
813-
pub rayon_manager: RayonManager,
814810
pub config: BeaconProcessorConfig,
815811
}
816812

@@ -1609,10 +1605,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
16091605
}
16101606
Work::ChainSegmentBackfill(process_fn) => {
16111607
if self.config.enable_backfill_rate_limiting {
1612-
task_spawner.spawn_blocking_with_rayon(
1613-
self.rayon_manager.low_priority_threadpool.clone(),
1614-
process_fn,
1615-
)
1608+
task_spawner.spawn_blocking_with_rayon(RayonPoolType::LowPriority, process_fn)
16161609
} else {
16171610
// use the global rayon thread pool if backfill rate limiting is disabled.
16181611
task_spawner.spawn_blocking(process_fn)
@@ -1681,17 +1674,16 @@ impl TaskSpawner {
16811674
}
16821675

16831676
/// Spawns a blocking task on a rayon thread pool, dropping the `SendOnDrop` after task completion.
1684-
fn spawn_blocking_with_rayon<F>(self, thread_pool: Arc<ThreadPool>, task: F)
1677+
fn spawn_blocking_with_rayon<F>(self, rayon_pool_type: RayonPoolType, task: F)
16851678
where
16861679
F: FnOnce() + Send + 'static,
16871680
{
1688-
self.executor.spawn_blocking(
1681+
self.executor.spawn_blocking_with_rayon(
16891682
move || {
1690-
thread_pool.install(|| {
1691-
task();
1692-
});
1683+
task();
16931684
drop(self.send_idle_on_drop)
16941685
},
1686+
rayon_pool_type,
16951687
WORKER_TASK_NAME,
16961688
)
16971689
}

beacon_node/beacon_processor/src/rayon_manager.rs

Lines changed: 0 additions & 27 deletions
This file was deleted.

beacon_node/client/src/builder.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use beacon_chain::{
1717
store::{HotColdDB, ItemStore, StoreConfig},
1818
};
1919
use beacon_chain::{Kzg, LightClientProducerEvent};
20-
use beacon_processor::rayon_manager::RayonManager;
2120
use beacon_processor::{BeaconProcessor, BeaconProcessorChannels};
2221
use beacon_processor::{BeaconProcessorConfig, BeaconProcessorQueueLengths};
2322
use environment::RuntimeContext;
@@ -681,7 +680,6 @@ where
681680
executor: beacon_processor_context.executor.clone(),
682681
current_workers: 0,
683682
config: beacon_processor_config,
684-
rayon_manager: RayonManager::default(),
685683
}
686684
.spawn_manager(
687685
beacon_processor_channels.beacon_processor_rx,

beacon_node/http_api/src/test_utils.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use beacon_chain::{
55
};
66
use beacon_processor::{
77
BeaconProcessor, BeaconProcessorChannels, BeaconProcessorConfig, BeaconProcessorQueueLengths,
8-
rayon_manager::RayonManager,
98
};
109
use directory::DEFAULT_ROOT_DIR;
1110
use eth2::{BeaconNodeHttpClient, Timeouts};
@@ -248,7 +247,6 @@ pub async fn create_api_server_with_config<T: BeaconChainTypes>(
248247
executor: test_runtime.task_executor.clone(),
249248
current_workers: 0,
250249
config: beacon_processor_config,
251-
rayon_manager: RayonManager::default(),
252250
}
253251
.spawn_manager(
254252
beacon_processor_rx,

beacon_node/network/src/network_beacon_processor/tests.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use beacon_chain::test_utils::{
1717
test_spec,
1818
};
1919
use beacon_chain::{BeaconChain, WhenSlotSkipped};
20-
use beacon_processor::rayon_manager::RayonManager;
2120
use beacon_processor::{work_reprocessing_queue::*, *};
2221
use gossipsub::MessageAcceptance;
2322
use itertools::Itertools;
@@ -267,7 +266,6 @@ impl TestRig {
267266
executor,
268267
current_workers: 0,
269268
config: beacon_processor_config,
270-
rayon_manager: RayonManager::default(),
271269
}
272270
.spawn_manager(
273271
beacon_processor_rx,

common/task_executor/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ edition = { workspace = true }
88
async-channel = { workspace = true }
99
futures = { workspace = true }
1010
metrics = { workspace = true }
11+
num_cpus = { workspace = true }
12+
rayon = { workspace = true }
1113
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
1214
tracing = { workspace = true }
1315

common/task_executor/src/lib.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
mod metrics;
2+
mod rayon_pool_provider;
23
pub mod test_utils;
34

45
use futures::channel::mpsc::Sender;
56
use futures::prelude::*;
6-
use std::sync::Weak;
7+
use std::sync::{Arc, Weak};
78
use tokio::runtime::{Handle, Runtime};
89
use tracing::debug;
910

11+
use crate::rayon_pool_provider::RayonPoolProvider;
12+
pub use crate::rayon_pool_provider::RayonPoolType;
1013
pub use tokio::task::JoinHandle;
1114

1215
/// Provides a reason when Lighthouse is shut down.
@@ -84,6 +87,8 @@ pub struct TaskExecutor {
8487
// FIXME(sproul): delete?
8588
#[allow(dead_code)]
8689
service_name: String,
90+
91+
rayon_pool_provider: Arc<RayonPoolProvider>,
8792
}
8893

8994
impl TaskExecutor {
@@ -105,6 +110,7 @@ impl TaskExecutor {
105110
exit,
106111
signal_tx,
107112
service_name,
113+
rayon_pool_provider: Arc::new(RayonPoolProvider::default()),
108114
}
109115
}
110116

@@ -115,6 +121,7 @@ impl TaskExecutor {
115121
exit: self.exit.clone(),
116122
signal_tx: self.signal_tx.clone(),
117123
service_name,
124+
rayon_pool_provider: self.rayon_pool_provider.clone(),
118125
}
119126
}
120127

@@ -226,6 +233,47 @@ impl TaskExecutor {
226233
}
227234
}
228235

236+
/// Spawns a blocking task on a dedicated tokio thread pool and runs it inside the rayon thread pool selected by `rayon_pool_type` (via `ThreadPool::install`).
237+
pub fn spawn_blocking_with_rayon<F>(
238+
self,
239+
task: F,
240+
rayon_pool_type: RayonPoolType,
241+
name: &'static str,
242+
) where
243+
F: FnOnce() + Send + 'static,
244+
{
245+
let thread_pool = self.rayon_pool_provider.get_thread_pool(rayon_pool_type);
246+
self.spawn_blocking(
247+
move || {
248+
thread_pool.install(|| {
249+
task();
250+
});
251+
},
252+
name,
253+
)
254+
}
255+
256+
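/// Spawns a blocking computation on the rayon thread pool selected by `rayon_pool_type` and awaits the result.
/// The closure is queued with `ThreadPool::spawn` and its return value is sent back over a oneshot channel; `Err(RecvError)` is returned if the sender is dropped before a result is sent.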
257+
pub async fn spawn_blocking_with_rayon_async<F, R>(
258+
&self,
259+
rayon_pool_type: RayonPoolType,
260+
task: F,
261+
) -> Result<R, tokio::sync::oneshot::error::RecvError>
262+
where
263+
F: FnOnce() -> R + Send + 'static,
264+
R: Send + 'static,
265+
{
266+
let thread_pool = self.rayon_pool_provider.get_thread_pool(rayon_pool_type);
267+
let (tx, rx) = tokio::sync::oneshot::channel();
268+
269+
thread_pool.spawn(move || {
270+
let result = task();
271+
let _ = tx.send(result);
272+
});
273+
274+
rx.await
275+
}
276+
229277
/// Spawn a future on the tokio runtime wrapped in an `async-channel::Receiver` returning an optional
230278
/// join handle to the future.
231279
/// The task is cancelled when the corresponding async-channel is dropped.

0 commit comments

Comments
 (0)