
Commit e1864af

feature(collator): remove cumulative stats recalculation
1 parent b02b785 commit e1864af

File tree

3 files changed, +27 -62 lines changed

collator/src/collator/messages_reader/internals_reader.rs
collator/src/collator/messages_reader/mod.rs
collator/src/collator/types.rs


collator/src/collator/messages_reader/internals_reader.rs

Lines changed: 0 additions & 1 deletion

@@ -356,7 +356,6 @@ impl<V: InternalMessageValue> InternalsPartitionReader<V> {
 
             ranges.push(QueueShardBoundedRange {
                 shard_ident: *shard_id,
-                // TODO: may be we should pass `from` here because we should load stats for the full range
                 from: Bound::Excluded(shard_reader_state.current_position),
                 to: Bound::Included(shard_range_to),
             });

collator/src/collator/messages_reader/mod.rs

Lines changed: 5 additions & 4 deletions

@@ -217,7 +217,6 @@ impl<V: InternalMessageValue> MessagesReader<V> {
             .copied()
             .collect();
 
-        cumulative_stats_just_loaded = true;
         match (previous_cumulative_statistics, cx.part_stat_ranges) {
             (Some(mut previous_cumulative_statistics), Some(part_stat_ranges)) => {
                 // update all_shards_processed_to_by_partitions and truncate processed data
@@ -235,11 +234,13 @@ impl<V: InternalMessageValue> MessagesReader<V> {
                 previous_cumulative_statistics
             }
             _ => {
-                let mut inner_cumulative_statistics =
-                    CumulativeStatistics::new(params.all_shards_processed_to_by_partitions);
+                cumulative_stats_just_loaded = true;
+                let mut inner_cumulative_statistics = CumulativeStatistics::new(
+                    cx.for_shard_id,
+                    params.all_shards_processed_to_by_partitions,
+                );
                 inner_cumulative_statistics.load(
                     mq_adapter.clone(),
-                    &cx.for_shard_id,
                     &partitions,
                     cx.prev_state_gen_lt,
                     cx.mc_state_gen_lt,
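
Note: with this hunk the collator's shard identity is passed once to `CumulativeStatistics::new` rather than threaded through `load`, and `cumulative_stats_just_loaded` is now set only on the branch that actually builds fresh statistics. A minimal sketch of the design choice, using simplified stand-in types rather than the crate's real `ShardIdent` and `CumulativeStatistics`:

struct ShardId(u32);

struct CumulativeStats {
    for_shard: ShardId, // fixed once at construction
}

impl CumulativeStats {
    fn new(for_shard: ShardId) -> Self {
        Self { for_shard }
    }

    // Callers no longer repeat which shard they are running for on every call;
    // later methods simply read `self.for_shard`.
    fn load(&self) {
        println!("loading stats ranges for shard {}", self.for_shard.0);
    }
}

fn main() {
    let stats = CumulativeStats::new(ShardId(0));
    stats.load();
}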

collator/src/collator/types.rs

Lines changed: 22 additions & 57 deletions
@@ -24,7 +24,6 @@ use tycho_block_util::state::{RefMcStateHandle, ShardStateStuff};
 use tycho_core::global_config::MempoolGlobalConfig;
 use tycho_executor::{AccountMeta, PublicLibraryChange, TransactionMeta};
 use tycho_network::PeerId;
-use tycho_util::metrics::HistogramGuard;
 use tycho_util::{DashMapEntry, FastDashMap, FastHashMap, FastHashSet};
 
 use super::do_collate::work_units::PrepareMsgGroupsWu;
@@ -1281,6 +1280,9 @@ pub struct QueueStatisticsWithRemaning {
 }
 
 pub struct CumulativeStatistics {
+    /// Cumulative statistics created for this shard. When this shard reads statistics, it decrements the remaining messages.
+    /// Another shard can decrement the remaining statistics only via `update_processed_to_by_partitions`.
+    for_shard: ShardIdent,
     /// Actual processed to info for master and all shards
     all_shards_processed_to_by_partitions: FastHashMap<ShardIdent, (bool, ProcessedToByPartitions)>,
 
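Note: the new doc comment states an ownership rule for the remaining-message counters: the shard that owns this `CumulativeStatistics` decrements `remaning_stats` itself as it reads its own messages, while progress made by other shards reaches it only via `update_processed_to_by_partitions`. A small self-contained sketch of that rule with illustrative stand-in types and names (not the crate's API); the skip-own-shard check mirrors the `self.for_shard != dst_shard` guard added further down in this file, presumably so the same message is never decremented twice:

use std::collections::HashMap;

/// Remaining undelivered messages, keyed by destination account (stand-in type).
#[derive(Default, Debug)]
struct Remaining {
    by_account: HashMap<String, u64>,
}

impl Remaining {
    fn add(&mut self, acc: &str, count: u64) {
        *self.by_account.entry(acc.to_string()).or_default() += count;
    }

    /// The owning shard decrements directly while reading its own messages.
    fn on_own_read(&mut self, acc: &str, count: u64) {
        if let Some(v) = self.by_account.get_mut(acc) {
            *v = v.saturating_sub(count);
        }
    }

    /// Progress reported through processed_to updates: destinations owned by
    /// `for_shard` are skipped, because the read path above already covers them.
    fn on_processed_to(&mut self, for_shard: u32, dst_shard: u32, acc: &str, count: u64) {
        if for_shard != dst_shard {
            if let Some(v) = self.by_account.get_mut(acc) {
                *v = v.saturating_sub(count);
            }
        }
    }
}

fn main() {
    let mut rem = Remaining::default();
    rem.add("acc", 3);
    rem.on_own_read("acc", 1); // own read: 3 -> 2
    rem.on_processed_to(0, 0, "acc", 1); // destination is our own shard: skipped
    rem.on_processed_to(0, 1, "acc", 1); // destination on another shard: 2 -> 1
    println!("{:?}", rem.by_account);
}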
@@ -1290,38 +1292,35 @@ pub struct CumulativeStatistics {
 
     /// The final aggregated statistics (across all shards) by partitions.
     result: FastHashMap<QueuePartitionIdx, QueueStatisticsWithRemaning>,
-
-    /// A flag indicating that data has changed, and we need to recalculate before returning `result`.
-    dirty: bool,
 }
 
 impl CumulativeStatistics {
     pub fn new(
+        for_shard: ShardIdent,
         all_shards_processed_to_by_partitions: FastHashMap<
             ShardIdent,
             (bool, ProcessedToByPartitions),
         >,
     ) -> Self {
         Self {
+            for_shard,
             all_shards_processed_to_by_partitions,
             shards_stats_by_partitions: Default::default(),
             result: Default::default(),
-            dirty: false,
         }
     }
 
     /// Create range and full load statistics and store it
     pub fn load<V: InternalMessageValue>(
         &mut self,
         mq_adapter: Arc<dyn MessageQueueAdapter<V>>,
-        current_shard: &ShardIdent,
         partitions: &FastHashSet<QueuePartitionIdx>,
         prev_state_gen_lt: Lt,
         mc_state_gen_lt: Lt,
         mc_top_shards_end_lts: &FastHashMap<ShardIdent, Lt>,
     ) -> Result<()> {
         let ranges = Self::compute_cumulative_stats_ranges(
-            current_shard,
+            &self.for_shard,
             &self.all_shards_processed_to_by_partitions,
             prev_state_gen_lt,
             mc_state_gen_lt,
@@ -1345,7 +1344,6 @@ impl CumulativeStatistics {
         partitions: &FastHashSet<QueuePartitionIdx>,
         ranges: Vec<QueueShardBoundedRange>,
     ) -> Result<()> {
-        self.dirty = true;
         tracing::trace!(
             target: tracing_targets::COLLATOR,
             "cumulative_stats_partial_ranges: {:?}",
@@ -1407,7 +1405,6 @@ impl CumulativeStatistics {
         });
 
         self.all_shards_processed_to_by_partitions = new_pt;
-        self.dirty = true;
     }
 
     fn compute_cumulative_stats_ranges(
@@ -1469,15 +1466,23 @@ impl CumulativeStatistics {
             }
         }
 
+        let entry = self
+            .result
+            .entry(partition)
+            .or_insert_with(|| QueueStatisticsWithRemaning {
+                initial_stats: QueueStatistics::default(),
+                remaning_stats: ConcurrentQueueStatistics::default(),
+            });
+        entry.initial_stats.append(&diff_partition_stats);
+        entry.remaning_stats.append(&diff_partition_stats);
+
         // finally add weeded stats
         self.add_diff_partition_stats(
             partition,
             diff_shard,
             diff_max_message,
             diff_partition_stats,
         );
-
-        self.dirty = true;
     }
 
     fn add_diff_partition_stats(
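
Note: this hunk folds each loaded diff into the per-partition `result` at load time (`entry(partition).or_insert_with(..)` plus `append`), so the aggregate is always current and the `dirty` flag, together with the `recalculate()` pass removed further below, is no longer needed. A small self-contained sketch of the eager-aggregation pattern with simplified stand-in types (not the crate's API):

use std::collections::HashMap;

type AccountStats = HashMap<String, u64>;

#[derive(Default)]
struct Aggregate {
    /// partition index -> aggregated per-account message counts
    result: HashMap<u16, AccountStats>,
}

impl Aggregate {
    /// Fold a freshly loaded diff into the running aggregate immediately,
    /// instead of storing it and rebuilding `result` lazily later.
    fn append_diff(&mut self, partition: u16, diff: &AccountStats) {
        let entry = self.result.entry(partition).or_default();
        for (account, count) in diff {
            *entry.entry(account.clone()).or_default() += count;
        }
    }
}

fn main() {
    let mut agg = Aggregate::default();
    agg.append_diff(0, &HashMap::from([("acc1".to_string(), 2)]));
    agg.append_diff(0, &HashMap::from([("acc1".to_string(), 3)]));
    // Always up to date: no dirty flag, no full recalculation pass.
    assert_eq!(agg.result[&0]["acc1"], 5);
    println!("{:?}", agg.result);
}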
@@ -1541,6 +1546,12 @@ impl CumulativeStatistics {
                     cumulative_stats
                         .initial_stats
                         .decrement_for_account(dst_acc.clone(), *count);
+
+                    if self.for_shard != dst_shard {
+                        cumulative_stats
+                            .remaning_stats
+                            .decrement_for_account(dst_acc.clone(), *count);
+                    }
                     false
                 } else {
                     true
@@ -1571,15 +1582,12 @@ impl CumulativeStatistics {
     /// Returns a reference to the aggregated stats by partitions.
     /// If the data is marked as dirty, it triggers a lazy recalculation first.
     pub fn result(&mut self) -> &FastHashMap<QueuePartitionIdx, QueueStatisticsWithRemaning> {
-        self.ensure_finalized();
         &self.result
     }
 
     /// Calc aggregated stats among all partitions.
     /// If the data is marked as dirty, it triggers a lazy recalculation first.
     pub fn get_aggregated_result(&mut self) -> QueueStatistics {
-        self.ensure_finalized();
-
         let mut res: Option<QueueStatistics> = None;
         for stats in self.result.values() {
             if let Some(aggregated) = res.as_mut() {
@@ -1590,42 +1598,6 @@ impl CumulativeStatistics {
         }
         res.unwrap_or_default()
     }
-
-    /// A helper function to trigger a recalculation if `dirty` is set.
-    fn ensure_finalized(&mut self) {
-        if self.dirty {
-            self.recalculate();
-        }
-    }
-
-    /// Clears the existing result and aggregates all data from `shards_statistics`.
-    fn recalculate(&mut self) {
-        let _histogram = HistogramGuard::begin("tycho_do_collate_recalculate_statistics_time");
-        self.result.clear();
-
-        for shard_stats_by_partitions in self.shards_stats_by_partitions.values() {
-            for (&partition, diffs) in shard_stats_by_partitions {
-                let mut partition_stats = AccountStatistics::new();
-                for diff_stats in diffs.values() {
-                    for (account, &count) in diff_stats {
-                        *partition_stats.entry(account.clone()).or_default() += count;
-                    }
-                }
-                self.result
-                    .entry(partition)
-                    .and_modify(|stats| {
-                        stats.initial_stats.append(&partition_stats);
-                        stats.remaning_stats.append(&partition_stats);
-                    })
-                    .or_insert(QueueStatisticsWithRemaning {
-                        initial_stats: QueueStatistics::with_statistics(partition_stats.clone()),
-                        remaning_stats: ConcurrentQueueStatistics::with_statistics(partition_stats),
-                    });
-            }
-        }
-
-        self.dirty = false;
-    }
 }
 
 #[derive(Debug, Default, Clone)]
@@ -1634,12 +1606,6 @@ pub struct ConcurrentQueueStatistics {
 }
 
 impl ConcurrentQueueStatistics {
-    pub fn with_statistics(statistics: AccountStatistics) -> Self {
-        Self {
-            statistics: Arc::new(statistics.into_iter().collect()),
-        }
-    }
-
     pub fn statistics(&self) -> &FastDashMap<IntAddr, u64> {
         &self.statistics
     }
@@ -1694,7 +1660,6 @@ impl fmt::Debug for CumulativeStatistics {
             )
             .field("shards_stats_by_partitions", &shards_summary)
             .field("result", &format!("{} partitions", self.result.len()))
-            .field("dirty", &self.dirty)
             .finish()
     }
 }
