Skip to content

Single banking stage thread for votes #5813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 25 additions & 36 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,9 @@
use qualifier_attr::qualifiers;
use {
self::{
committer::Committer,
consumer::Consumer,
decision_maker::DecisionMaker,
latest_unprocessed_votes::{LatestUnprocessedVotes, VoteSource},
packet_receiver::PacketReceiver,
qos_service::QosService,
vote_storage::VoteStorage,
committer::Committer, consumer::Consumer, decision_maker::DecisionMaker,
latest_unprocessed_votes::LatestUnprocessedVotes, packet_receiver::PacketReceiver,
qos_service::QosService, vote_storage::VoteStorage,
},
crate::{
banking_stage::{
Expand Down Expand Up @@ -98,7 +94,6 @@ const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10);
#[derive(Debug, Default)]
pub struct BankingStageStats {
last_report: AtomicInterval,
id: String,
receive_and_buffer_packets_count: AtomicUsize,
dropped_packets_count: AtomicUsize,
pub(crate) dropped_duplicated_packets_count: AtomicUsize,
Expand All @@ -119,9 +114,8 @@ pub struct BankingStageStats {
}

impl BankingStageStats {
pub fn new(id: u32) -> Self {
pub fn new() -> Self {
BankingStageStats {
id: id.to_string(),
batch_packet_indexes_len: Histogram::configure()
.max_value(PACKETS_PER_BATCH as u64)
.build()
Expand Down Expand Up @@ -162,8 +156,7 @@ impl BankingStageStats {
}
if self.last_report.should_update(report_interval_ms) {
datapoint_info!(
"banking_stage-loop-stats",
"id" => self.id,
"banking_stage-vote_loop_stats",
(
"receive_and_buffer_packets_count",
self.receive_and_buffer_packets_count
Expand Down Expand Up @@ -416,7 +409,7 @@ impl BankingStage {
// Keeps track of extraneous vote transactions for the vote threads
let latest_unprocessed_votes = {
let bank = bank_forks.read().unwrap().working_bank();
Arc::new(LatestUnprocessedVotes::new(&bank))
LatestUnprocessedVotes::new(&bank)
};

let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
Expand All @@ -429,22 +422,17 @@ impl BankingStage {
// + 1 for the central scheduler thread
let mut bank_thread_hdls = Vec::with_capacity(num_threads as usize + 1);

// Spawn legacy voting threads first: 1 gossip, 1 tpu
for (id, packet_receiver, vote_source) in [
(0, gossip_vote_receiver, VoteSource::Gossip),
(1, tpu_vote_receiver, VoteSource::Tpu),
] {
bank_thread_hdls.push(Self::spawn_vote_worker(
id,
packet_receiver,
decision_maker.clone(),
bank_forks.clone(),
committer.clone(),
transaction_recorder.clone(),
log_messages_bytes_limit,
VoteStorage::new(latest_unprocessed_votes.clone(), vote_source),
));
}
// Spawn legacy voting thread
bank_thread_hdls.push(Self::spawn_vote_worker(
tpu_vote_receiver,
gossip_vote_receiver,
decision_maker.clone(),
bank_forks.clone(),
committer.clone(),
transaction_recorder.clone(),
log_messages_bytes_limit,
VoteStorage::new(latest_unprocessed_votes),
));

match transaction_struct {
TransactionStructure::Sdk => {
Expand Down Expand Up @@ -584,33 +572,34 @@ impl BankingStage {
}

fn spawn_vote_worker(
id: u32,
packet_receiver: BankingPacketReceiver,
tpu_receiver: BankingPacketReceiver,
gossip_receiver: BankingPacketReceiver,
decision_maker: DecisionMaker,
bank_forks: Arc<RwLock<BankForks>>,
committer: Committer,
transaction_recorder: TransactionRecorder,
log_messages_bytes_limit: Option<usize>,
vote_storage: VoteStorage,
) -> JoinHandle<()> {
let packet_receiver = PacketReceiver::new(id, packet_receiver);
let tpu_receiver = PacketReceiver::new(tpu_receiver);
let gossip_receiver = PacketReceiver::new(gossip_receiver);
let consumer = Consumer::new(
committer,
transaction_recorder,
QosService::new(id),
QosService::new(0),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be good to also remove `id` from `QosService`?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot remove that one, because the non-vote threads also have a `QosService`.

log_messages_bytes_limit,
);

Builder::new()
.name(format!("solBanknStgTx{id:02}"))
.name("solBanknStgVote".to_string())
.spawn(move || {
VoteWorker::new(
decision_maker,
packet_receiver,
tpu_receiver,
gossip_receiver,
vote_storage,
bank_forks,
consumer,
id,
)
.run()
})
Expand Down
80 changes: 30 additions & 50 deletions core/src/banking_stage/leader_slot_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,11 @@ impl LeaderSlotPacketCountMetrics {
Self::default()
}

fn report(&self, id: &str, slot: Slot) {
fn report(&self, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_packet_counts",
"id" => id,
"banking_stage-vote_slot_packet_counts",
("slot", slot, i64),
(
"total_new_valid_packets",
self.total_new_valid_packets,
i64
),
("total_new_valid_packets", self.total_new_valid_packets, i64),
(
"newly_failed_sigverify_count",
self.newly_failed_sigverify_count,
Expand All @@ -213,11 +208,7 @@ impl LeaderSlotPacketCountMetrics {
self.excessive_precompile_count,
i64
),
(
"invalid_votes_count",
self.invalid_votes_count,
i64
),
("invalid_votes_count", self.invalid_votes_count, i64),
(
"exceeded_buffer_limit_dropped_packets_count",
self.exceeded_buffer_limit_dropped_packets_count,
Expand Down Expand Up @@ -253,11 +244,7 @@ impl LeaderSlotPacketCountMetrics {
self.retryable_errored_transaction_count,
i64
),
(
"retryable_packets_count",
self.retryable_packets_count,
i64
),
("retryable_packets_count", self.retryable_packets_count, i64),
(
"nonretryable_errored_transactions_count",
self.nonretryable_errored_transactions_count,
Expand Down Expand Up @@ -292,10 +279,9 @@ impl LeaderSlotPacketCountMetrics {
}
}

fn report_transaction_error_metrics(errors: &TransactionErrorMetrics, id: &str, slot: Slot) {
fn report_transaction_error_metrics(errors: &TransactionErrorMetrics, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_transaction_errors",
"id" => id,
"banking_stage-vote_slot_transaction_errors",
("slot", slot as i64, i64),
("total", errors.total.0 as i64, i64),
("account_in_use", errors.account_in_use.0 as i64, i64),
Expand All @@ -310,12 +296,24 @@ fn report_transaction_error_metrics(errors: &TransactionErrorMetrics, id: &str,
i64
),
("account_not_found", errors.account_not_found.0 as i64, i64),
("blockhash_not_found", errors.blockhash_not_found.0 as i64, i64),
(
"blockhash_not_found",
errors.blockhash_not_found.0 as i64,
i64
),
("blockhash_too_old", errors.blockhash_too_old.0 as i64, i64),
("call_chain_too_deep", errors.call_chain_too_deep.0 as i64, i64),
(
"call_chain_too_deep",
errors.call_chain_too_deep.0 as i64,
i64
),
("already_processed", errors.already_processed.0 as i64, i64),
("instruction_error", errors.instruction_error.0 as i64, i64),
("insufficient_funds", errors.insufficient_funds.0 as i64, i64),
(
"insufficient_funds",
errors.insufficient_funds.0 as i64,
i64
),
(
"invalid_account_for_fee",
errors.invalid_account_for_fee.0 as i64,
Expand Down Expand Up @@ -386,11 +384,6 @@ fn report_transaction_error_metrics(errors: &TransactionErrorMetrics, id: &str,

#[derive(Debug)]
pub(crate) struct LeaderSlotMetrics {
// banking_stage creates one QosService instance per worker thread, each uniquely
// identified by id. This field allows metrics to be categorized for gossip votes,
// TPU votes and other transactions.
id: String,

// aggregate metrics per slot
slot: Slot,

Expand All @@ -407,9 +400,8 @@ pub(crate) struct LeaderSlotMetrics {
}

impl LeaderSlotMetrics {
pub(crate) fn new(id: u32, slot: Slot, bank_creation_time: &Instant) -> Self {
pub(crate) fn new(slot: Slot, bank_creation_time: &Instant) -> Self {
Self {
id: id.to_string(),
slot,
packet_count_metrics: LeaderSlotPacketCountMetrics::new(),
transaction_error_metrics: TransactionErrorMetrics::new(),
Expand All @@ -422,10 +414,10 @@ impl LeaderSlotMetrics {
pub(crate) fn report(&mut self) {
self.is_reported = true;

self.timing_metrics.report(&self.id, self.slot);
report_transaction_error_metrics(&self.transaction_error_metrics, &self.id, self.slot);
self.packet_count_metrics.report(&self.id, self.slot);
self.vote_packet_count_metrics.report(&self.id, self.slot);
self.timing_metrics.report(self.slot);
report_transaction_error_metrics(&self.transaction_error_metrics, self.slot);
self.packet_count_metrics.report(self.slot);
self.vote_packet_count_metrics.report(self.slot);
}

/// Returns `Some(self.slot)` if the metrics have been reported, otherwise returns None
Expand Down Expand Up @@ -458,10 +450,9 @@ impl VotePacketCountMetrics {
Self::default()
}

fn report(&self, id: &str, slot: Slot) {
fn report(&self, slot: Slot) {
datapoint_info!(
"banking_stage-vote_packet_counts",
"id" => id,
("slot", slot, i64),
("dropped_gossip_votes", self.dropped_gossip_votes, i64),
("dropped_tpu_votes", self.dropped_tpu_votes, i64)
Expand All @@ -477,22 +468,14 @@ pub(crate) enum MetricsTrackerAction {
ReportAndNewTracker(Option<LeaderSlotMetrics>),
}

#[derive(Debug)]
#[derive(Debug, Default)]
pub struct LeaderSlotMetricsTracker {
// Only `Some` if BankingStage detects it's time to construct our leader slot,
// otherwise `None`
leader_slot_metrics: Option<LeaderSlotMetrics>,
id: u32,
}

impl LeaderSlotMetricsTracker {
pub fn new(id: u32) -> Self {
Self {
leader_slot_metrics: None,
id,
}
}

// Check leader slot, return MetricsTrackerAction to be applied by apply_action()
pub(crate) fn check_leader_slot_boundary(
&mut self,
Expand All @@ -509,7 +492,6 @@ impl LeaderSlotMetricsTracker {
// Our leader slot has begun, time to create a new slot tracker
(None, Some(bank_start)) => {
MetricsTrackerAction::NewTracker(Some(LeaderSlotMetrics::new(
self.id,
bank_start.working_bank.slot(),
&bank_start.bank_creation_time,
)))
Expand All @@ -520,7 +502,6 @@ impl LeaderSlotMetricsTracker {
// Last slot has ended, new slot has begun
leader_slot_metrics.mark_slot_end_detected();
MetricsTrackerAction::ReportAndNewTracker(Some(LeaderSlotMetrics::new(
self.id,
bank_start.working_bank.slot(),
&bank_start.bank_creation_time,
)))
Expand Down Expand Up @@ -910,8 +891,7 @@ mod tests {
bank_creation_time: Arc::new(Instant::now()),
};

let banking_stage_thread_id = 0;
let leader_slot_metrics_tracker = LeaderSlotMetricsTracker::new(banking_stage_thread_id);
let leader_slot_metrics_tracker = LeaderSlotMetricsTracker::default();

TestSlotBoundaryComponents {
first_bank,
Expand Down
Loading
Loading