@@ -24,7 +24,6 @@ use tycho_block_util::state::{RefMcStateHandle, ShardStateStuff};
24
24
use tycho_core:: global_config:: MempoolGlobalConfig ;
25
25
use tycho_executor:: { AccountMeta , PublicLibraryChange , TransactionMeta } ;
26
26
use tycho_network:: PeerId ;
27
- use tycho_util:: metrics:: HistogramGuard ;
28
27
use tycho_util:: { DashMapEntry , FastDashMap , FastHashMap , FastHashSet } ;
29
28
30
29
use super :: do_collate:: work_units:: PrepareMsgGroupsWu ;
@@ -1290,9 +1289,6 @@ pub struct CumulativeStatistics {
1290
1289
1291
1290
/// The final aggregated statistics (across all shards) by partitions.
1292
1291
result : FastHashMap < QueuePartitionIdx , QueueStatisticsWithRemaning > ,
1293
-
1294
- /// A flag indicating that data has changed, and we need to recalculate before returning `result`.
1295
- dirty : bool ,
1296
1292
}
1297
1293
1298
1294
impl CumulativeStatistics {
@@ -1306,7 +1302,6 @@ impl CumulativeStatistics {
1306
1302
all_shards_processed_to_by_partitions,
1307
1303
shards_stats_by_partitions : Default :: default ( ) ,
1308
1304
result : Default :: default ( ) ,
1309
- dirty : false ,
1310
1305
}
1311
1306
}
1312
1307
@@ -1345,7 +1340,6 @@ impl CumulativeStatistics {
1345
1340
partitions : & FastHashSet < QueuePartitionIdx > ,
1346
1341
ranges : Vec < QueueShardBoundedRange > ,
1347
1342
) -> Result < ( ) > {
1348
- self . dirty = true ;
1349
1343
tracing:: trace!(
1350
1344
target: tracing_targets:: COLLATOR ,
1351
1345
"cumulative_stats_partial_ranges: {:?}" ,
@@ -1405,9 +1399,6 @@ impl CumulativeStatistics {
1405
1399
by_partitions. retain ( |_part, diffs| !diffs. is_empty ( ) ) ;
1406
1400
!by_partitions. is_empty ( )
1407
1401
} ) ;
1408
-
1409
- self . all_shards_processed_to_by_partitions = new_pt;
1410
- self . dirty = true ;
1411
1402
}
1412
1403
1413
1404
fn compute_cumulative_stats_ranges (
@@ -1469,15 +1460,19 @@ impl CumulativeStatistics {
1469
1460
}
1470
1461
}
1471
1462
1463
+ if !diff_partition_stats. is_empty ( ) {
1464
+ let part_stats = self . result . entry ( partition) . or_default ( ) ;
1465
+ part_stats. initial_stats . append ( & diff_partition_stats) ;
1466
+ part_stats. remaning_stats . append ( & diff_partition_stats) ;
1467
+ }
1468
+
1472
1469
// finally add weeded stats
1473
1470
self . add_diff_partition_stats (
1474
1471
partition,
1475
1472
diff_shard,
1476
1473
diff_max_message,
1477
1474
diff_partition_stats,
1478
1475
) ;
1479
-
1480
- self . dirty = true ;
1481
1476
}
1482
1477
1483
1478
fn add_diff_partition_stats (
@@ -1541,6 +1536,9 @@ impl CumulativeStatistics {
1541
1536
cumulative_stats
1542
1537
. initial_stats
1543
1538
. decrement_for_account ( dst_acc. clone ( ) , * count) ;
1539
+ cumulative_stats
1540
+ . remaning_stats
1541
+ . decrement_for_account ( dst_acc. clone ( ) , * count) ;
1544
1542
false
1545
1543
} else {
1546
1544
true
@@ -1571,15 +1569,12 @@ impl CumulativeStatistics {
1571
1569
/// Returns a reference to the aggregated stats by partitions.
1572
1570
/// If the data is marked as dirty, it triggers a lazy recalculation first.
1573
1571
pub fn result ( & mut self ) -> & FastHashMap < QueuePartitionIdx , QueueStatisticsWithRemaning > {
1574
- self . ensure_finalized ( ) ;
1575
1572
& self . result
1576
1573
}
1577
1574
1578
1575
/// Calc aggregated stats among all partitions.
1579
1576
/// If the data is marked as dirty, it triggers a lazy recalculation first.
1580
1577
pub fn get_aggregated_result ( & mut self ) -> QueueStatistics {
1581
- self . ensure_finalized ( ) ;
1582
-
1583
1578
let mut res: Option < QueueStatistics > = None ;
1584
1579
for stats in self . result . values ( ) {
1585
1580
if let Some ( aggregated) = res. as_mut ( ) {
@@ -1590,42 +1585,6 @@ impl CumulativeStatistics {
1590
1585
}
1591
1586
res. unwrap_or_default ( )
1592
1587
}
1593
-
1594
- /// A helper function to trigger a recalculation if `dirty` is set.
1595
- fn ensure_finalized ( & mut self ) {
1596
- if self . dirty {
1597
- self . recalculate ( ) ;
1598
- }
1599
- }
1600
-
1601
- /// Clears the existing result and aggregates all data from `shards_statistics`.
1602
- fn recalculate ( & mut self ) {
1603
- let _histogram = HistogramGuard :: begin ( "tycho_do_collate_recalculate_statistics_time" ) ;
1604
- self . result . clear ( ) ;
1605
-
1606
- for shard_stats_by_partitions in self . shards_stats_by_partitions . values ( ) {
1607
- for ( & partition, diffs) in shard_stats_by_partitions {
1608
- let mut partition_stats = AccountStatistics :: new ( ) ;
1609
- for diff_stats in diffs. values ( ) {
1610
- for ( account, & count) in diff_stats {
1611
- * partition_stats. entry ( account. clone ( ) ) . or_default ( ) += count;
1612
- }
1613
- }
1614
- self . result
1615
- . entry ( partition)
1616
- . and_modify ( |stats| {
1617
- stats. initial_stats . append ( & partition_stats) ;
1618
- stats. remaning_stats . append ( & partition_stats) ;
1619
- } )
1620
- . or_insert ( QueueStatisticsWithRemaning {
1621
- initial_stats : QueueStatistics :: with_statistics ( partition_stats. clone ( ) ) ,
1622
- remaning_stats : ConcurrentQueueStatistics :: with_statistics ( partition_stats) ,
1623
- } ) ;
1624
- }
1625
- }
1626
-
1627
- self . dirty = false ;
1628
- }
1629
1588
}
1630
1589
1631
1590
#[ derive( Debug , Default , Clone ) ]
@@ -1634,12 +1593,6 @@ pub struct ConcurrentQueueStatistics {
1634
1593
}
1635
1594
1636
1595
impl ConcurrentQueueStatistics {
1637
- pub fn with_statistics ( statistics : AccountStatistics ) -> Self {
1638
- Self {
1639
- statistics : Arc :: new ( statistics. into_iter ( ) . collect ( ) ) ,
1640
- }
1641
- }
1642
-
1643
1596
pub fn statistics ( & self ) -> & FastDashMap < IntAddr , u64 > {
1644
1597
& self . statistics
1645
1598
}
@@ -1694,7 +1647,6 @@ impl fmt::Debug for CumulativeStatistics {
1694
1647
)
1695
1648
. field ( "shards_stats_by_partitions" , & shards_summary)
1696
1649
. field ( "result" , & format ! ( "{} partitions" , self . result. len( ) ) )
1697
- . field ( "dirty" , & self . dirty )
1698
1650
. finish ( )
1699
1651
}
1700
1652
}
0 commit comments