Skip to content

Commit c74444a

Browse files
authored
Move block_cost_limit tracking to BankingStage in preparation for SIMD-0207 (#753)
1 parent a1a3779 commit c74444a

File tree

18 files changed

+238
-425
lines changed

18 files changed

+238
-425
lines changed

banking-bench/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,7 @@ fn main() {
493493
false,
494494
HashSet::default(),
495495
BundleAccountLocker::default(),
496+
|_| 0,
496497
);
497498

498499
// This is so that the signal_receiver does not go out of scope after the closure.

core/benches/banking_stage.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
135135
&mut transaction_buffer,
136136
&BankingStageStats::default(),
137137
&mut LeaderSlotMetricsTracker::new(0),
138+
&|_| 0,
138139
);
139140
});
140141

@@ -327,6 +328,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
327328
false,
328329
HashSet::default(),
329330
BundleAccountLocker::default(),
331+
|_| 0,
330332
);
331333

332334
let chunk_len = verified.len() / CHUNKS;

core/benches/consumer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usiz
176176
&bank,
177177
transaction_iter.next().unwrap(),
178178
0,
179+
&|_| 0,
179180
);
180181
assert!(summary
181182
.execute_and_commit_transactions_output

core/src/banking_simulation.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,6 +821,7 @@ impl BankingSimulator {
821821
false,
822822
collections::HashSet::default(),
823823
BundleAccountLocker::default(),
824+
|_| 0,
824825
);
825826

826827
let (&_slot, &raw_base_event_time) = freeze_time_by_slot

core/src/banking_stage.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use {
3636
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
3737
solana_poh::poh_recorder::{PohRecorder, TransactionRecorder},
3838
solana_runtime::{
39-
bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
39+
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
4040
vote_sender_types::ReplayVoteSender,
4141
},
4242
solana_sdk::{pubkey::Pubkey, timing::AtomicInterval},
@@ -366,6 +366,8 @@ impl BankingStage {
366366
enable_forwarding: bool,
367367
blacklisted_accounts: HashSet<Pubkey>,
368368
bundle_account_locker: BundleAccountLocker,
369+
// callback function for compute space reservation for BundleStage
370+
block_cost_limit_block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
369371
) -> Self {
370372
Self::new_num_threads(
371373
block_production_method,
@@ -384,6 +386,7 @@ impl BankingStage {
384386
enable_forwarding,
385387
blacklisted_accounts,
386388
bundle_account_locker,
389+
block_cost_limit_block_cost_limit_reservation_cb,
387390
)
388391
}
389392

@@ -405,6 +408,7 @@ impl BankingStage {
405408
enable_forwarding: bool,
406409
blacklisted_accounts: HashSet<Pubkey>,
407410
bundle_account_locker: BundleAccountLocker,
411+
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
408412
) -> Self {
409413
match block_production_method {
410414
BlockProductionMethod::CentralScheduler => Self::new_central_scheduler(
@@ -423,6 +427,7 @@ impl BankingStage {
423427
enable_forwarding,
424428
blacklisted_accounts,
425429
bundle_account_locker,
430+
block_cost_limit_reservation_cb,
426431
),
427432
}
428433
}
@@ -444,6 +449,7 @@ impl BankingStage {
444449
enable_forwarding: bool,
445450
blacklisted_accounts: HashSet<Pubkey>,
446451
bundle_account_locker: BundleAccountLocker,
452+
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
447453
) -> Self {
448454
assert!(num_threads >= MIN_TOTAL_THREADS);
449455
// Single thread to generate entries from many banks.
@@ -492,6 +498,7 @@ impl BankingStage {
492498
),
493499
blacklisted_accounts.clone(),
494500
bundle_account_locker.clone(),
501+
block_cost_limit_reservation_cb.clone(),
495502
));
496503
}
497504

@@ -521,11 +528,12 @@ impl BankingStage {
521528
);
522529

523530
worker_metrics.push(consume_worker.metrics_handle());
531+
let cb = block_cost_limit_reservation_cb.clone();
524532
bank_thread_hdls.push(
525533
Builder::new()
526534
.name(format!("solCoWorker{id:02}"))
527535
.spawn(move || {
528-
let _ = consume_worker.run();
536+
let _ = consume_worker.run(cb);
529537
})
530538
.unwrap(),
531539
)
@@ -589,6 +597,7 @@ impl BankingStage {
589597
unprocessed_transaction_storage: UnprocessedTransactionStorage,
590598
blacklisted_accounts: HashSet<Pubkey>,
591599
bundle_account_locker: BundleAccountLocker,
600+
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
592601
) -> JoinHandle<()> {
593602
let mut packet_receiver = PacketReceiver::new(id, packet_receiver);
594603
let consumer = Consumer::new(
@@ -610,6 +619,7 @@ impl BankingStage {
610619
&consumer,
611620
id,
612621
unprocessed_transaction_storage,
622+
block_cost_limit_reservation_cb,
613623
)
614624
})
615625
.unwrap()
@@ -623,6 +633,7 @@ impl BankingStage {
623633
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
624634
banking_stage_stats: &BankingStageStats,
625635
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
636+
block_cost_limit_reservation_cb: &impl Fn(&Bank) -> u64,
626637
) {
627638
if unprocessed_transaction_storage.should_not_process() {
628639
return;
@@ -648,6 +659,7 @@ impl BankingStage {
648659
unprocessed_transaction_storage,
649660
banking_stage_stats,
650661
slot_metrics_tracker,
662+
block_cost_limit_reservation_cb
651663
));
652664
slot_metrics_tracker
653665
.increment_consume_buffered_packets_us(consume_buffered_packets_us);
@@ -686,6 +698,7 @@ impl BankingStage {
686698
consumer: &Consumer,
687699
id: u32,
688700
mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
701+
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64,
689702
) {
690703
let mut banking_stage_stats = BankingStageStats::new(id);
691704

@@ -703,6 +716,7 @@ impl BankingStage {
703716
&mut unprocessed_transaction_storage,
704717
&banking_stage_stats,
705718
&mut slot_metrics_tracker,
719+
&block_cost_limit_reservation_cb
706720
));
707721
slot_metrics_tracker
708722
.increment_process_buffered_packets_us(process_buffered_packets_us);
@@ -840,6 +854,7 @@ mod tests {
840854
false,
841855
HashSet::default(),
842856
BundleAccountLocker::default(),
857+
|_| 0,
843858
);
844859
drop(non_vote_sender);
845860
drop(tpu_vote_sender);
@@ -902,6 +917,7 @@ mod tests {
902917
false,
903918
HashSet::default(),
904919
BundleAccountLocker::default(),
920+
|_| 0,
905921
);
906922
trace!("sending bank");
907923
drop(non_vote_sender);
@@ -993,6 +1009,7 @@ mod tests {
9931009
false,
9941010
HashSet::default(),
9951011
BundleAccountLocker::default(),
1012+
|_| 0,
9961013
);
9971014

9981015
// fund another account so we can send 2 good transactions in a single batch.
@@ -1170,6 +1187,7 @@ mod tests {
11701187
false,
11711188
HashSet::default(),
11721189
BundleAccountLocker::default(),
1190+
|_| 0,
11731191
);
11741192

11751193
// wait for banking_stage to eat the packets
@@ -1377,6 +1395,7 @@ mod tests {
13771395
false,
13781396
HashSet::default(),
13791397
BundleAccountLocker::default(),
1398+
|_| 0,
13801399
);
13811400

13821401
let keypairs = (0..100).map(|_| Keypair::new()).collect_vec();

core/src/banking_stage/consume_worker.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,18 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
5959
self.metrics.clone()
6060
}
6161

62-
pub fn run(self) -> Result<(), ConsumeWorkerError<Tx>> {
62+
pub fn run(self, reservation_cb: impl Fn(&Bank) -> u64) -> Result<(), ConsumeWorkerError<Tx>> {
6363
loop {
6464
let work = self.consume_receiver.recv()?;
65-
self.consume_loop(work)?;
65+
self.consume_loop(work, &reservation_cb)?;
6666
}
6767
}
6868

69-
fn consume_loop(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
69+
fn consume_loop(
70+
&self,
71+
work: ConsumeWork<Tx>,
72+
reservation_cb: &impl Fn(&Bank) -> u64,
73+
) -> Result<(), ConsumeWorkerError<Tx>> {
7074
let (maybe_consume_bank, get_bank_us) = measure_us!(self.get_consume_bank());
7175
let Some(mut bank) = maybe_consume_bank else {
7276
self.metrics
@@ -97,7 +101,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
97101
return self.retry_drain(work);
98102
}
99103
}
100-
self.consume(&bank, work)?;
104+
self.consume(&bank, work, reservation_cb)?;
101105
}
102106

103107
Ok(())
@@ -108,11 +112,13 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
108112
&self,
109113
bank: &Arc<Bank>,
110114
work: ConsumeWork<Tx>,
115+
reservation_cb: &impl Fn(&Bank) -> u64,
111116
) -> Result<(), ConsumeWorkerError<Tx>> {
112117
let output = self.consumer.process_and_record_aged_transactions(
113118
bank,
114119
&work.transactions,
115120
&work.max_ages,
121+
reservation_cb,
116122
);
117123

118124
self.metrics.update_for_consume(&output);
@@ -904,7 +910,7 @@ mod tests {
904910
consumed_receiver,
905911
..
906912
} = &test_frame;
907-
let worker_thread = std::thread::spawn(move || worker.run());
913+
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
908914

909915
let pubkey1 = Pubkey::new_unique();
910916

@@ -949,7 +955,7 @@ mod tests {
949955
consumed_receiver,
950956
..
951957
} = &test_frame;
952-
let worker_thread = std::thread::spawn(move || worker.run());
958+
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
953959
poh_recorder
954960
.write()
955961
.unwrap()
@@ -998,7 +1004,7 @@ mod tests {
9981004
consumed_receiver,
9991005
..
10001006
} = &test_frame;
1001-
let worker_thread = std::thread::spawn(move || worker.run());
1007+
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
10021008
poh_recorder
10031009
.write()
10041010
.unwrap()
@@ -1050,7 +1056,7 @@ mod tests {
10501056
consumed_receiver,
10511057
..
10521058
} = &test_frame;
1053-
let worker_thread = std::thread::spawn(move || worker.run());
1059+
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
10541060
poh_recorder
10551061
.write()
10561062
.unwrap()
@@ -1125,7 +1131,7 @@ mod tests {
11251131
consumed_receiver,
11261132
..
11271133
} = &test_frame;
1128-
let worker_thread = std::thread::spawn(move || worker.run());
1134+
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
11291135
poh_recorder
11301136
.write()
11311137
.unwrap()

0 commit comments

Comments
 (0)