Skip to content

Commit 0bad874

Browse files
committed
Add more tasks to handle propagated statements
1 parent 694679e commit 0bad874

File tree

4 files changed

+29
-25
lines changed

4 files changed

+29
-25
lines changed

cumulus/polkadot-omni-node/lib/src/common/statement_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ pub(crate) fn build_statement_store<
7676
let statement_protocol_executor = {
7777
let spawn_handle = task_manager.spawn_handle();
7878
Box::new(move |fut| {
79-
spawn_handle.spawn("network-statement-validator", Some("networking"), fut);
79+
spawn_handle.spawn_blocking("network-statement-validator", Some("networking"), fut);
8080
})
8181
};
8282
let statement_handler = statement_handler_proto.build(

substrate/bin/node/cli/src/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -777,7 +777,7 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
777777
let statement_protocol_executor = {
778778
let spawn_handle = task_manager.spawn_handle();
779779
Box::new(move |fut| {
780-
spawn_handle.spawn("network-statement-validator", Some("networking"), fut);
780+
spawn_handle.spawn_blocking("network-statement-validator", Some("networking"), fut);
781781
})
782782
};
783783
let statement_handler = statement_handler_proto.build(

substrate/client/network/statement/src/lib.rs

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -205,30 +205,34 @@ impl StatementHandlerPrototype {
205205
executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
206206
) -> error::Result<StatementHandler<N, S>> {
207207
let sync_event_stream = sync.event_stream("statement-handler-sync");
208-
let (queue_sender, mut queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS);
209-
210-
let store = statement_store.clone();
211-
executor(
212-
async move {
213-
loop {
214-
let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
215-
queue_receiver.next().await;
216-
match task {
217-
None => return,
218-
Some((statement, completion)) => {
219-
let result = store.submit(statement, StatementSource::Network);
220-
if completion.send(result).is_err() {
221-
log::debug!(
222-
target: LOG_TARGET,
223-
"Error sending validation completion"
224-
);
225-
}
226-
},
208+
let (queue_sender, queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS);
209+
210+
const NUM_VALIDATION_WORKERS: usize = 4;
211+
for _ in 0..NUM_VALIDATION_WORKERS {
212+
let store = statement_store.clone();
213+
let mut queue_receiver = queue_receiver.clone();
214+
executor(
215+
async move {
216+
loop {
217+
let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
218+
queue_receiver.next().await;
219+
match task {
220+
None => return,
221+
Some((statement, completion)) => {
222+
let result = store.submit(statement, StatementSource::Network);
223+
if completion.send(result).is_err() {
224+
log::debug!(
225+
target: LOG_TARGET,
226+
"Error sending validation completion"
227+
);
228+
}
229+
},
230+
}
227231
}
228232
}
229-
}
230-
.boxed(),
231-
);
233+
.boxed(),
234+
);
235+
}
232236

233237
let handler = StatementHandler {
234238
protocol_name: self.protocol_name,

substrate/client/statement-store/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1056,7 +1056,7 @@ fn shard_index(account_id: &AccountId) -> usize {
10561056
(hash[0] as usize) % NUM_SHARDS
10571057
}
10581058

1059-
// Sharded statement store wrapper
1059+
/// Sharded statement store wrapper
10601060
pub struct Store {
10611061
shards: Vec<Arc<SingleStore>>,
10621062
}

0 commit comments

Comments
 (0)