
Commit 7528aa9

feature(collator): remove processed data from reused statistics
1 parent cbd1926 commit 7528aa9

File tree

3 files changed: +219 -21 lines changed

collator/src/collator/do_collate/mod.rs

Lines changed: 116 additions & 14 deletions
@@ -105,6 +105,26 @@ impl CollatorStdImpl {
         };
         let created_by = author.to_bytes().into();

+        let is_first_block_after_prev_master = is_first_block_after_prev_master(
+            prev_shard_data.blocks_ids()[0], // TODO: consider split/merge
+            &mc_data.shards,
+        );
+        let part_stat_ranges = if is_first_block_after_prev_master && mc_data.block_id.seqno > 0 {
+            if next_block_id_short.is_masterchain() {
+                self.mc_compute_part_stat_ranges(
+                    &mc_data,
+                    &next_block_id_short,
+                    top_shard_blocks_info.clone().unwrap(),
+                )
+                .await?
+            } else {
+                self.compute_part_stat_ranges(&mc_data, &next_block_id_short)
+                    .await?
+            }
+        } else {
+            None
+        };
+
         let collation_data = self.create_collation_data(
             next_block_id_short,
             next_chain_time,
@@ -117,18 +137,6 @@ impl CollatorStdImpl {
         let anchors_cache = std::mem::take(&mut self.anchors_cache);
         let block_serializer_cache = self.block_serializer_cache.clone();

-        let is_first_block_after_prev_master = is_first_block_after_prev_master(
-            prev_shard_data.blocks_ids()[0], // TODO: consider split/merge
-            &mc_data.shards,
-        );
-
-        let part_stat_ranges = if is_first_block_after_prev_master && mc_data.block_id.seqno > 0 {
-            self.compute_part_stat_ranges(&mc_data, &collation_data.block_id_short)
-                .await?
-        } else {
-            None
-        };
-
         let state = Box::new(ActualState {
             collation_config,
             collation_data,
@@ -1205,21 +1213,25 @@ impl CollatorStdImpl {
         block_id_short: &BlockIdShort,
     ) -> Result<Option<Vec<QueueShardBoundedRange>>> {
         let mc_block_id = mc_data.block_id;
+
         let prev_mc_block_id = mc_data
             .prev_mc_block_id
             .context("Prev MC block must be present")?;

         if prev_mc_block_id.seqno + 1 != mc_block_id.seqno {
-            bail!(
+            tracing::error!(
+                target: tracing_targets::COLLATOR,
                 "Prev MC block ID has an incorrect sequence. Prev: {prev_mc_block_id:?}. \
                 Current: {mc_block_id:?}"
             );
+            return Ok(None);
         }

         let prev_mc_state = self
             .state_node_adapter
             .load_state(&prev_mc_block_id)
             .await?;
+
         let prev_mc_block_shards: FastHashMap<ShardIdent, ShardDescriptionShort> = prev_mc_state
             .state_extra()
             .cloned()?
@@ -1257,6 +1269,10 @@ impl CollatorStdImpl {
             return Ok(None);
         };

+        if prev_descr.seqno == 0 {
+            return Ok(None);
+        }
+
         let Some(first_diff_msg) = self
             .get_max_message(&prev_descr.get_block_id(*shard))
             .await
@@ -1282,7 +1298,93 @@ impl CollatorStdImpl {
         Ok(Some(ranges))
     }

-    // Async helper function that retrieves max_message from either MQ or state diff
+    async fn mc_compute_part_stat_ranges(
+        &self,
+        mc_data: &McData,
+        block_id_short: &BlockIdShort,
+        top_shard_blocks_info: Vec<TopBlockDescription>,
+    ) -> Result<Option<Vec<QueueShardBoundedRange>>> {
+        let prev_mc_block_id = mc_data.block_id;
+
+        if prev_mc_block_id.seqno + 1 != block_id_short.seqno {
+            tracing::error!(
+                target: tracing_targets::COLLATOR,
+                "Prev MC block ID has an incorrect sequence. Prev: {prev_mc_block_id:?}.
+                Current: {:?}",
+                block_id_short.seqno
+            );
+            return Ok(None);
+        }
+
+        let prev_mc_state = self
+            .state_node_adapter
+            .load_state(&prev_mc_block_id)
+            .await?;
+
+        let prev_mc_block_shards: FastHashMap<ShardIdent, ShardDescriptionShort> = prev_mc_state
+            .state_extra()
+            .cloned()?
+            .shards
+            .as_vec()?
+            .iter()
+            .cloned()
+            .collect();
+
+        let mut ranges = Vec::<QueueShardBoundedRange>::new();
+
+        // Load max_message from masterchain block diff
+        let Some(master_max_msg) = self
+            .get_max_message(&prev_mc_block_id)
+            .await
+            .context("loading diff for mc block")?
+        else {
+            return Ok(None);
+        };
+
+        ranges.push(QueueShardBoundedRange {
+            shard_ident: prev_mc_block_id.shard,
+            from: Bound::Included(master_max_msg),
+            to: Bound::Included(master_max_msg),
+        });
+
+        // Iterate over all updated shard blocks and add their diff ranges
+        for top_block_description in top_shard_blocks_info {
+            let Some(prev_descr) = prev_mc_block_shards.get(&top_block_description.block_id.shard)
+            else {
+                tracing::warn!(target: tracing_targets::COLLATOR, "prev_mc_block_shards not found: {:?}", top_block_description.block_id.shard);
+                return Ok(None);
+            };
+
+            if prev_descr.seqno == 0 {
+                return Ok(None);
+            }
+
+            let Some(first_diff_msg) = self
+                .get_max_message(&prev_descr.get_block_id(top_block_description.block_id.shard))
+                .await
+                .context("loading first diff msg")?
+            else {
+                return Ok(None);
+            };
+            let Some(last_diff_msg) = self
+                .get_max_message(&top_block_description.block_id)
+                .await
+                .context("loading last diff msg")?
+            else {
+                return Ok(None);
+            };
+
+            ranges.push(QueueShardBoundedRange {
+                shard_ident: top_block_description.block_id.shard,
+                from: Bound::Excluded(first_diff_msg),
+                to: Bound::Included(last_diff_msg),
+            });
+        }
+
+        Ok(Some(ranges))
+    }
+
+    // helper function that retrieves max_message from either MQ or state diff
     async fn get_max_message(&self, block_id: &BlockId) -> Result<Option<QueueKey>> {
         if let Some(diff) =
             self.mq_adapter
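For orientation, below is a minimal, self-contained sketch of the range semantics that the new mc_compute_part_stat_ranges builds: the masterchain contributes a degenerate inclusive range [max_msg, max_msg], while each updated shard contributes a half-open range (first_diff_msg, last_diff_msg]. ShardId, QueueKey, and BoundedRange are simplified stand-ins for the crate's ShardIdent, QueueKey, and QueueShardBoundedRange, and the exclusion comment is my reading of the Bound::Excluded choice, not a statement of the real API.

use std::ops::Bound;

type ShardId = u8;
type QueueKey = u64;

#[derive(Debug)]
struct BoundedRange {
    shard: ShardId,
    from: Bound<QueueKey>,
    to: Bound<QueueKey>,
}

fn build_ranges(
    mc_shard: ShardId,
    mc_max_msg: QueueKey,
    shard_diffs: &[(ShardId, QueueKey, QueueKey)], // (shard, first_diff_msg, last_diff_msg)
) -> Vec<BoundedRange> {
    // The masterchain range pins exactly one key: [max_msg, max_msg].
    let mut ranges = vec![BoundedRange {
        shard: mc_shard,
        from: Bound::Included(mc_max_msg),
        to: Bound::Included(mc_max_msg),
    }];
    for &(shard, first, last) in shard_diffs {
        ranges.push(BoundedRange {
            shard,
            // the first diff's max message is presumably already accounted for, so it is excluded
            from: Bound::Excluded(first),
            to: Bound::Included(last),
        });
    }
    ranges
}

fn main() {
    let ranges = build_ranges(0, 42, &[(1, 10, 25), (2, 7, 7)]);
    for r in &ranges {
        println!("{r:?}");
    }
}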

collator/src/collator/messages_reader/mod.rs

Lines changed: 14 additions & 2 deletions
@@ -217,16 +217,29 @@ impl<V: InternalMessageValue> MessagesReader<V> {
             .copied()
             .collect();

+        cumulative_stats_just_loaded = true;
         match (previous_cumulative_statistics, cx.part_stat_ranges) {
             (Some(mut previous_cumulative_statistics), Some(part_stat_ranges)) => {
+                // update all_shards_processed_to_by_partitions and truncate processed data
                 previous_cumulative_statistics.update_processed_to_by_partitions(
-                    params.all_shards_processed_to_by_partitions,
+                    params.all_shards_processed_to_by_partitions.clone(),
                 );
+
+                // truncate data before the range
+                previous_cumulative_statistics.truncate_before(
+                    &cx.for_shard_id,
+                    cx.prev_state_gen_lt,
+                    cx.mc_state_gen_lt,
+                    &cx.mc_top_shards_end_lts.iter().copied().collect(),
+                );
+
+                // partially load statistics and enrich the current value
                 previous_cumulative_statistics.load_partial(
                     mq_adapter.clone(),
                     &partitions,
                     part_stat_ranges,
                 )?;
+
                 previous_cumulative_statistics
             }
             _ => {
@@ -240,7 +253,6 @@ impl<V: InternalMessageValue> MessagesReader<V> {
                     cx.mc_state_gen_lt,
                     &cx.mc_top_shards_end_lts.iter().copied().collect(),
                 )?;
-                cumulative_stats_just_loaded = true;
                 inner_cumulative_statistics
             }
         }
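As a rough illustration of the control flow above (stand-in types, not the crate's real API: CumulativeStats and its empty method bodies are hypothetical), the reuse path updates processed_to, truncates data the shard has already processed, and then loads only the missing ranges; any other combination falls back to a full load.

// Hypothetical stand-in for CumulativeStatistics; only the call order matters here.
#[derive(Default)]
struct CumulativeStats {
    reused: bool,
}

impl CumulativeStats {
    fn full_load() -> Self {
        Self::default()
    }
    fn update_processed_to(&mut self) { /* sync processed_to with the new master state */ }
    fn truncate_before(&mut self) { /* drop data this shard has already processed */ }
    fn load_partial(&mut self, _ranges: &[u64]) { /* fetch only the missing diff ranges */ }
}

fn build_stats(previous: Option<CumulativeStats>, part_stat_ranges: Option<Vec<u64>>) -> CumulativeStats {
    match (previous, part_stat_ranges) {
        (Some(mut prev), Some(ranges)) => {
            prev.update_processed_to();
            prev.truncate_before();
            prev.load_partial(&ranges);
            prev.reused = true;
            prev
        }
        // no reusable statistics (or no ranges): rebuild from scratch
        _ => CumulativeStats::full_load(),
    }
}

fn main() {
    let stats = build_stats(Some(CumulativeStats::default()), Some(vec![1, 2, 3]));
    assert!(stats.reused);
}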

collator/src/collator/types.rs

Lines changed: 89 additions & 5 deletions
@@ -1,4 +1,5 @@
 use std::collections::{BTreeMap, VecDeque};
+use std::fmt;
 use std::sync::{Arc, OnceLock};
 use std::time::Duration;

@@ -1342,6 +1343,7 @@ impl CumulativeStatistics {
         partitions: &FastHashSet<QueuePartitionIdx>,
         ranges: Vec<QueueShardBoundedRange>,
     ) -> Result<()> {
+        self.dirty = true;
         tracing::trace!(
             target: tracing_targets::COLLATOR,
             "cumulative_stats_partial_ranges: {:?}",
@@ -1376,14 +1378,64 @@ impl CumulativeStatistics {
         Ok(())
     }

+    pub fn truncate_before(
+        &mut self,
+        current_shard: &ShardIdent,
+        prev_state_gen_lt: Lt,
+        mc_state_gen_lt: Lt,
+        mc_top_shards_end_lts: &FastHashMap<ShardIdent, Lt>,
+    ) {
+        let ranges = Self::compute_cumulative_stats_ranges(
+            current_shard,
+            &self.all_shards_processed_to_by_partitions,
+            prev_state_gen_lt,
+            mc_state_gen_lt,
+            mc_top_shards_end_lts,
+        );
+
+        for r in &ranges {
+            self.truncate_range(r);
+        }
+
+        self.dirty = true;
+    }
+
+    fn truncate_range(&mut self, range: &QueueShardBoundedRange) {
+        let cut_key = match &range.from {
+            Bound::Included(k) | Bound::Excluded(k) => *k,
+        };
+
+        if let Some(by_partitions) = self.shards_stats_by_partitions.get_mut(&range.shard_ident) {
+            by_partitions.retain(|_part, diffs| {
+                diffs.retain(|k, _| k > &cut_key);
+                !diffs.is_empty()
+            });
+
+            if by_partitions.is_empty() {
+                self.shards_stats_by_partitions.remove(&range.shard_ident);
+            }
+        }
+    }
+
     pub fn update_processed_to_by_partitions(
         &mut self,
-        all_shards_processed_to_by_partitions: FastHashMap<
-            ShardIdent,
-            (bool, ProcessedToByPartitions),
-        >,
+        new_pt: FastHashMap<ShardIdent, (bool, ProcessedToByPartitions)>,
     ) {
-        self.all_shards_processed_to_by_partitions = all_shards_processed_to_by_partitions;
+        for (&dst_shard, (_, ref processed_to)) in &new_pt {
+            let changed = match self.all_shards_processed_to_by_partitions.get(&dst_shard) {
+                Some((_, old_pt)) => old_pt != processed_to,
+                None => true,
+            };
+
+            if changed {
+                self.handle_processed_to_update(dst_shard, processed_to.clone());
+            }
+        }
+
+        // TODO: should we remove the full stat for a shard if it doesn't exist in new_pt?
+
+        self.all_shards_processed_to_by_partitions = new_pt;
+        self.dirty = true;
     }

     fn compute_cumulative_stats_ranges(
@@ -1444,6 +1496,9 @@ impl CumulativeStatistics {
                 }
             }
         }
+        if diff_partition_stats.is_empty() {
+            return;
+        }

         // finally add weeded stats
         self.add_diff_partition_stats(
@@ -1647,3 +1702,32 @@ impl ConcurrentQueueStatistics {
         }
     }
 }
+
+impl fmt::Debug for CumulativeStatistics {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let shards_summary: Vec<String> = self
+            .shards_stats_by_partitions
+            .iter()
+            .map(|(shard_id, by_partitions)| {
+                let parts: Vec<String> = by_partitions
+                    .iter()
+                    .map(|(part, diffs)| format!("p{}:{}", part, diffs.len()))
+                    .collect();
+                format!("{} -> {}", shard_id, parts.join(", "))
+            })
+            .collect();
+
+        f.debug_struct("CumulativeStatistics")
+            .field(
+                "processed_to",
+                &format!(
+                    "{} shards",
+                    self.all_shards_processed_to_by_partitions.len()
+                )
+            )
+            .field("shards_stats_by_partitions", &shards_summary)
+            .field("result", &format!("{} partitions", self.result.len()))
+            .field("dirty", &self.dirty)
+            .finish()
+    }
+}
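The truncation added in this file can be hard to picture from the diff alone, so here is a self-contained sketch of the same idea on plain std collections (HashMap/BTreeMap with u64 keys stand in for the crate's FastHashMap, per-diff statistics, and QueueKey; an illustration, not the real types): for the range's shard, drop every per-partition diff whose key is not strictly greater than the range's lower bound, then drop empty partitions and finally the empty shard entry. Note that, as in the diff, the key equal to the bound is dropped whether the bound is Included or Excluded.

use std::collections::{BTreeMap, HashMap};
use std::ops::Bound;

type ShardId = u8;
type PartitionIdx = u8;
type QueueKey = u64;
type DiffStats = u32; // stand-in for per-diff statistics

fn truncate_range(
    stats: &mut HashMap<ShardId, HashMap<PartitionIdx, BTreeMap<QueueKey, DiffStats>>>,
    shard: ShardId,
    from: Bound<QueueKey>,
) {
    let cut_key = match from {
        Bound::Included(k) | Bound::Excluded(k) => k,
        Bound::Unbounded => return, // not used by the diff; handled only for exhaustiveness
    };

    if let Some(by_partitions) = stats.get_mut(&shard) {
        by_partitions.retain(|_part, diffs| {
            // keep only diffs strictly newer than the cut key
            diffs.retain(|k, _| *k > cut_key);
            !diffs.is_empty()
        });
        if by_partitions.is_empty() {
            stats.remove(&shard);
        }
    }
}

fn main() {
    let mut stats = HashMap::new();
    stats.insert(1, HashMap::from([(0, BTreeMap::from([(5, 1), (10, 2), (20, 3)]))]));
    truncate_range(&mut stats, 1, Bound::Excluded(10));
    // only the diff at key 20 survives
    assert_eq!(stats[&1][&0].len(), 1);
    println!("{stats:?}");
}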
