Commit e32b0c8

feature(collator): remove processed data from reused statistics
1 parent: cbd1926

File tree: 4 files changed (+230, -22 lines)

collator/src/collator/do_collate/finalize.rs

Lines changed: 0 additions & 1 deletion
@@ -744,7 +744,6 @@ impl Phase<FinalizeState> {
                 .collation_data
                 .mc_shards_processed_to_by_partitions
                 .clone(),
-
             prev_mc_block_id: Some(self.state.mc_data.block_id),
         }))
     }

collator/src/collator/do_collate/mod.rs

Lines changed: 122 additions & 14 deletions
@@ -105,6 +105,26 @@ impl CollatorStdImpl {
         };
         let created_by = author.to_bytes().into();
 
+        let is_first_block_after_prev_master = is_first_block_after_prev_master(
+            prev_shard_data.blocks_ids()[0], // TODO: consider split/merge
+            &mc_data.shards,
+        );
+        let part_stat_ranges = if is_first_block_after_prev_master && mc_data.block_id.seqno > 0 {
+            if next_block_id_short.is_masterchain() {
+                self.mc_compute_part_stat_ranges(
+                    &mc_data,
+                    &next_block_id_short,
+                    top_shard_blocks_info.clone().unwrap(),
+                )
+                .await?
+            } else {
+                self.compute_part_stat_ranges(&mc_data, &next_block_id_short)
+                    .await?
+            }
+        } else {
+            None
+        };
+
         let collation_data = self.create_collation_data(
             next_block_id_short,
             next_chain_time,
@@ -117,18 +137,6 @@ impl CollatorStdImpl {
         let anchors_cache = std::mem::take(&mut self.anchors_cache);
         let block_serializer_cache = self.block_serializer_cache.clone();
 
-        let is_first_block_after_prev_master = is_first_block_after_prev_master(
-            prev_shard_data.blocks_ids()[0], // TODO: consider split/merge
-            &mc_data.shards,
-        );
-
-        let part_stat_ranges = if is_first_block_after_prev_master && mc_data.block_id.seqno > 0 {
-            self.compute_part_stat_ranges(&mc_data, &collation_data.block_id_short)
-                .await?
-        } else {
-            None
-        };
-
         let state = Box::new(ActualState {
             collation_config,
             collation_data,
@@ -1199,27 +1207,34 @@ impl CollatorStdImpl {
         );
     }
 
+    /// Collect ranges for loading statistics if the current block is the first one after a master block.
+    /// Uses the previous masterchain block diff for the range,
+    /// and the other shards' diffs between the previous masterchain block - 1 and the previous masterchain block.
     async fn compute_part_stat_ranges(
         &self,
         mc_data: &McData,
         block_id_short: &BlockIdShort,
     ) -> Result<Option<Vec<QueueShardBoundedRange>>> {
         let mc_block_id = mc_data.block_id;
+
         let prev_mc_block_id = mc_data
             .prev_mc_block_id
             .context("Prev MC block must be present")?;
 
         if prev_mc_block_id.seqno + 1 != mc_block_id.seqno {
-            bail!(
+            tracing::error!(
+                target: tracing_targets::COLLATOR,
                 "Prev MC block ID has an incorrect sequence. Prev: {prev_mc_block_id:?}. \
                 Current: {mc_block_id:?}"
             );
+            return Ok(None);
         }
 
         let prev_mc_state = self
             .state_node_adapter
             .load_state(&prev_mc_block_id)
             .await?;
+
         let prev_mc_block_shards: FastHashMap<ShardIdent, ShardDescriptionShort> = prev_mc_state
             .state_extra()
             .cloned()?
@@ -1257,6 +1272,10 @@ impl CollatorStdImpl {
             return Ok(None);
         };
 
+        if prev_descr.seqno == 0 {
+            return Ok(None);
+        }
+
         let Some(first_diff_msg) = self
             .get_max_message(&prev_descr.get_block_id(*shard))
             .await
@@ -1282,7 +1301,96 @@ impl CollatorStdImpl {
         Ok(Some(ranges))
     }
 
-    // Async helper function that retrieves max_message from either MQ or state diff
+    /// Collect ranges for loading cumulative statistics if the collation block is a master block.
+    /// Uses the range from the previous masterchain block diff,
+    /// and the diffs between the previous masterchain top blocks and the current top blocks.
+    async fn mc_compute_part_stat_ranges(
+        &self,
+        mc_data: &McData,
+        block_id_short: &BlockIdShort,
+        top_shard_blocks_info: Vec<TopBlockDescription>,
+    ) -> Result<Option<Vec<QueueShardBoundedRange>>> {
+        let prev_mc_block_id = mc_data.block_id;
+
+        if prev_mc_block_id.seqno + 1 != block_id_short.seqno {
+            tracing::error!(
+                target: tracing_targets::COLLATOR,
+                "Prev MC block ID has an incorrect sequence. Prev: {prev_mc_block_id:?}. \
+                Current: {:?}",
+                block_id_short.seqno
+            );
+            return Ok(None);
+        }
+
+        let prev_mc_state = self
+            .state_node_adapter
+            .load_state(&prev_mc_block_id)
+            .await?;
+
+        let prev_mc_block_shards: FastHashMap<ShardIdent, ShardDescriptionShort> = prev_mc_state
+            .state_extra()
+            .cloned()?
+            .shards
+            .as_vec()?
+            .iter()
+            .cloned()
+            .collect();
+
+        let mut ranges = Vec::<QueueShardBoundedRange>::new();
+
+        // Load max_message from the masterchain block diff
+        let Some(master_max_msg) = self
+            .get_max_message(&prev_mc_block_id)
+            .await
+            .context("loading diff for mc block")?
+        else {
+            return Ok(None);
+        };
+
+        ranges.push(QueueShardBoundedRange {
+            shard_ident: prev_mc_block_id.shard,
+            from: Bound::Included(master_max_msg),
+            to: Bound::Included(master_max_msg),
+        });
+
+        // Iterate over all updated shard blocks and add their diff ranges
+        for top_block_description in top_shard_blocks_info {
+            let Some(prev_descr) = prev_mc_block_shards.get(&top_block_description.block_id.shard)
+            else {
+                tracing::warn!(target: tracing_targets::COLLATOR, "prev_mc_block_shards not found: {:?}", top_block_description.block_id.shard);
+                return Ok(None);
+            };
+
+            if prev_descr.seqno == 0 {
+                return Ok(None);
+            }
+
+            let Some(first_diff_msg) = self
+                .get_max_message(&prev_descr.get_block_id(top_block_description.block_id.shard))
+                .await
+                .context("loading first diff msg")?
+            else {
+                return Ok(None);
+            };
+            let Some(last_diff_msg) = self
+                .get_max_message(&top_block_description.block_id)
+                .await
+                .context("loading last diff msg")?
+            else {
+                return Ok(None);
+            };
+
+            ranges.push(QueueShardBoundedRange {
+                shard_ident: top_block_description.block_id.shard,
+                from: Bound::Excluded(first_diff_msg),
+                to: Bound::Included(last_diff_msg),
+            });
+        }
+
+        Ok(Some(ranges))
+    }
+
+    /// Helper function that retrieves `max_message` from either MQ or state diff
     async fn get_max_message(&self, block_id: &BlockId) -> Result<Option<QueueKey>> {
         if let Some(diff) =
             self.mq_adapter
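
Note: both helpers produce the same two bound shapes. The masterchain diff contributes a degenerate point range over its own max message (inclusive on both ends), while each updated shard contributes a half-open range that excludes the first diff's max message and includes the last one. Below is a minimal, self-contained sketch of that pattern; std::ops::Bound plus plain u64 keys and string shard idents stand in for the crate's Bound, QueueKey, and ShardIdent, so the types here are assumptions, not the real definitions.

use std::ops::Bound;

// Simplified stand-ins for the crate's types.
type QueueKey = u64;
type ShardIdent = &'static str;

#[derive(Debug)]
struct QueueShardBoundedRange {
    shard_ident: ShardIdent,
    from: Bound<QueueKey>,
    to: Bound<QueueKey>,
}

fn main() {
    let mut ranges = Vec::new();

    // The masterchain diff contributes a single point: its own max_message,
    // so both bounds are inclusive and equal.
    let master_max_msg: QueueKey = 1_000;
    ranges.push(QueueShardBoundedRange {
        shard_ident: "mc",
        from: Bound::Included(master_max_msg),
        to: Bound::Included(master_max_msg),
    });

    // Each updated shard contributes the half-open interval
    // (first_diff_msg, last_diff_msg]: the first diff is already covered by
    // the previous master's view and is excluded, the last one is included.
    let (first_diff_msg, last_diff_msg): (QueueKey, QueueKey) = (500, 900);
    ranges.push(QueueShardBoundedRange {
        shard_ident: "shard:0",
        from: Bound::Excluded(first_diff_msg),
        to: Bound::Included(last_diff_msg),
    });

    println!("{ranges:?}");
}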

collator/src/collator/messages_reader/mod.rs

Lines changed: 14 additions & 2 deletions
@@ -217,16 +217,29 @@ impl<V: InternalMessageValue> MessagesReader<V> {
             .copied()
             .collect();
 
+        cumulative_stats_just_loaded = true;
         match (previous_cumulative_statistics, cx.part_stat_ranges) {
             (Some(mut previous_cumulative_statistics), Some(part_stat_ranges)) => {
+                // update all_shards_processed_to_by_partitions and truncate processed data
                 previous_cumulative_statistics.update_processed_to_by_partitions(
-                    params.all_shards_processed_to_by_partitions,
+                    params.all_shards_processed_to_by_partitions.clone(),
                 );
+
+                // truncate data before the range
+                previous_cumulative_statistics.truncate_before(
+                    &cx.for_shard_id,
+                    cx.prev_state_gen_lt,
+                    cx.mc_state_gen_lt,
+                    &cx.mc_top_shards_end_lts.iter().copied().collect(),
+                );
+
+                // partially load statistics and enrich the current value
                 previous_cumulative_statistics.load_partial(
                     mq_adapter.clone(),
                     &partitions,
                     part_stat_ranges,
                 )?;
+
                 previous_cumulative_statistics
             }
             _ => {
@@ -240,7 +253,6 @@ impl<V: InternalMessageValue> MessagesReader<V> {
                     cx.mc_state_gen_lt,
                     &cx.mc_top_shards_end_lts.iter().copied().collect(),
                 )?;
-                cumulative_stats_just_loaded = true;
                 inner_cumulative_statistics
             }
         }
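
Note: reuse happens only when both a previous statistics object and the partial ranges are present; every other combination rebuilds from scratch, and in both branches the statistics count as freshly loaded, which is why cumulative_stats_just_loaded moved above the match. A compile-checked sketch of just this control flow; the stub Stats type, its empty bodies, and load_full are illustrative stand-ins, while the three method names on the reuse path mirror the diff.

/// Stub standing in for `CumulativeStatistics`; only the call order matters.
struct Stats;

impl Stats {
    fn update_processed_to_by_partitions(&mut self) { /* sync processed_to, drop consumed data */ }
    fn truncate_before(&mut self) { /* drop diffs older than the recomputed range */ }
    fn load_partial(&mut self) { /* fetch stats for the new ranges only */ }
    fn load_full() -> Self { /* rebuild over the full range */ Stats }
}

fn resolve_stats(previous: Option<Stats>, part_stat_ranges: Option<Vec<u64>>) -> Stats {
    match (previous, part_stat_ranges) {
        // Reuse: update, truncate, then top up with only the missing ranges.
        (Some(mut prev), Some(_ranges)) => {
            prev.update_processed_to_by_partitions();
            prev.truncate_before();
            prev.load_partial();
            prev
        }
        // Anything else: rebuild the statistics from scratch.
        _ => Stats::load_full(),
    }
}

fn main() {
    let _reused = resolve_stats(Some(Stats), Some(vec![1]));
    let _rebuilt = resolve_stats(None, None);
}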

collator/src/collator/types.rs

Lines changed: 94 additions & 5 deletions
@@ -1,4 +1,5 @@
 use std::collections::{BTreeMap, VecDeque};
+use std::fmt;
 use std::sync::{Arc, OnceLock};
 use std::time::Duration;
 
@@ -1308,6 +1309,7 @@ impl CumulativeStatistics {
         }
     }
 
+    /// Create the range, fully load the statistics, and store them
     pub fn load<V: InternalMessageValue>(
         &mut self,
         mq_adapter: Arc<dyn MessageQueueAdapter<V>>,
@@ -1342,6 +1344,7 @@ impl CumulativeStatistics {
         partitions: &FastHashSet<QueuePartitionIdx>,
         ranges: Vec<QueueShardBoundedRange>,
     ) -> Result<()> {
+        self.dirty = true;
         tracing::trace!(
             target: tracing_targets::COLLATOR,
             "cumulative_stats_partial_ranges: {:?}",
@@ -1376,14 +1379,68 @@ impl CumulativeStatistics {
         Ok(())
     }
 
+    /// Create the ranges and remove data that precedes them
+    pub fn truncate_before(
+        &mut self,
+        current_shard: &ShardIdent,
+        prev_state_gen_lt: Lt,
+        mc_state_gen_lt: Lt,
+        mc_top_shards_end_lts: &FastHashMap<ShardIdent, Lt>,
+    ) {
+        let ranges = Self::compute_cumulative_stats_ranges(
+            current_shard,
+            &self.all_shards_processed_to_by_partitions,
+            prev_state_gen_lt,
+            mc_state_gen_lt,
+            mc_top_shards_end_lts,
+        );
+
+        for r in &ranges {
+            self.truncate_range(r);
+        }
+
+        self.dirty = true;
+    }
+
+    /// Remove data before the range
+    fn truncate_range(&mut self, range: &QueueShardBoundedRange) {
+        let cut_key = match &range.from {
+            Bound::Included(k) | Bound::Excluded(k) => *k,
+        };
+
+        if let Some(by_partitions) = self.shards_stats_by_partitions.get_mut(&range.shard_ident) {
+            by_partitions.retain(|_part, diffs| {
+                diffs.retain(|k, _| k > &cut_key);
+                !diffs.is_empty()
+            });
+
+            if by_partitions.is_empty() {
+                self.shards_stats_by_partitions.remove(&range.shard_ident);
+            }
+        }
+    }
+
+    /// Update `all_shards_processed_to_by_partitions` and remove
+    /// processed data
     pub fn update_processed_to_by_partitions(
         &mut self,
-        all_shards_processed_to_by_partitions: FastHashMap<
-            ShardIdent,
-            (bool, ProcessedToByPartitions),
-        >,
+        new_pt: FastHashMap<ShardIdent, (bool, ProcessedToByPartitions)>,
     ) {
-        self.all_shards_processed_to_by_partitions = all_shards_processed_to_by_partitions;
+        for (&dst_shard, (_, ref processed_to)) in &new_pt {
+            let changed = match self.all_shards_processed_to_by_partitions.get(&dst_shard) {
+                Some((_, old_pt)) => old_pt != processed_to,
+                None => true,
+            };
+
+            if changed {
+                self.handle_processed_to_update(dst_shard, processed_to.clone());
+            }
+        }
+
+        // TODO: should full stats for a shard be removed if it no longer exists in new_pt?
+
+        self.all_shards_processed_to_by_partitions = new_pt;
+        self.dirty = true;
    }
 
     fn compute_cumulative_stats_ranges(
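
Note: truncate_range keeps only diff entries whose key lies strictly after the range start, then drops any partition, and finally any shard, that ends up empty. A runnable sketch of that retain cascade; HashMap/BTreeMap with u64 keys and string shard idents stand in for the crate's FastHashMap, QueueKey, and statistics types.

use std::collections::{BTreeMap, HashMap};

// shard -> partition -> (queue key -> diff stats); &str and u64 stand in
// for the crate's ShardIdent and QueueKey types.
type ShardStats = HashMap<&'static str, HashMap<u8, BTreeMap<u64, &'static str>>>;

/// Drop every diff at or before `cut_key` for one shard, then drop emptied containers.
fn truncate_range(stats: &mut ShardStats, shard: &'static str, cut_key: u64) {
    if let Some(by_partitions) = stats.get_mut(shard) {
        by_partitions.retain(|_part, diffs| {
            // Keep only entries strictly after the cut key, mirroring `k > &cut_key`.
            diffs.retain(|k, _| *k > cut_key);
            !diffs.is_empty()
        });
        if by_partitions.is_empty() {
            stats.remove(shard);
        }
    }
}

fn main() {
    let mut stats: ShardStats = HashMap::new();
    stats
        .entry("shard:0")
        .or_default()
        .entry(0)
        .or_default()
        .extend([(10, "old"), (20, "old"), (30, "new")]);

    truncate_range(&mut stats, "shard:0", 20);
    // Keys 10 and 20 fall at or before the range start; only 30 survives.
    assert_eq!(stats["shard:0"][&0].len(), 1);

    truncate_range(&mut stats, "shard:0", 99);
    // The partition drained completely, so the whole shard entry is gone.
    assert!(!stats.contains_key("shard:0"));
}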
@@ -1444,6 +1501,9 @@ impl CumulativeStatistics {
                 }
             }
         }
+        if diff_partition_stats.is_empty() {
+            return;
+        }
 
         // finally add weeded stats
         self.add_diff_partition_stats(
@@ -1647,3 +1707,32 @@ impl ConcurrentQueueStatistics {
         }
     }
 }
+
+impl fmt::Debug for CumulativeStatistics {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let shards_summary: Vec<String> = self
+            .shards_stats_by_partitions
+            .iter()
+            .map(|(shard_id, by_partitions)| {
+                let parts: Vec<String> = by_partitions
+                    .iter()
+                    .map(|(part, diffs)| format!("p{}:{}", part, diffs.len()))
+                    .collect();
+                format!("{} -> {}", shard_id, parts.join(", "))
+            })
+            .collect();
+
+        f.debug_struct("CumulativeStatistics")
+            .field(
+                "processed_to",
+                &format!(
+                    "{} shards",
+                    self.all_shards_processed_to_by_partitions.len()
+                ),
+            )
+            .field("shards_stats_by_partitions", &shards_summary)
+            .field("result", &format!("{} partitions", self.result.len()))
+            .field("dirty", &self.dirty)
+            .finish()
+    }
+}
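
Note: the hand-written Debug impl prints per-shard and per-partition counters instead of dumping the nested maps, so a trace line stays one readable row no matter how many diffs are buffered. A toy reproduction of the same debug_struct summarization pattern; the type and fields below are illustrative, not from the crate.

use std::fmt;

struct Summary {
    diffs_by_shard: Vec<(&'static str, usize)>,
    dirty: bool,
}

impl fmt::Debug for Summary {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Same trick as the diff: render each shard as "shard -> N" instead
        // of formatting every buffered diff.
        let shards: Vec<String> = self
            .diffs_by_shard
            .iter()
            .map(|(shard, n)| format!("{shard} -> {n}"))
            .collect();

        f.debug_struct("Summary")
            .field("shards", &shards)
            .field("dirty", &self.dirty)
            .finish()
    }
}

fn main() {
    let s = Summary {
        diffs_by_shard: vec![("mc", 1), ("shard:0", 42)],
        dirty: true,
    };
    // Prints: Summary { shards: ["mc -> 1", "shard:0 -> 42"], dirty: true }
    println!("{s:?}");
}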
