Skip to content

Commit 0a329a7

Browse files
committed
refactor(collator): cumulative statistics code optimization
1 parent 19f5a19 commit 0a329a7

File tree

4 files changed

+138
-156
lines changed

4 files changed

+138
-156
lines changed

collator/src/collator/do_collate/mod.rs

Lines changed: 57 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1222,73 +1222,27 @@ impl CollatorStdImpl {
12221222
mc_data_stuff: &McDataStuff,
12231223
block_id_short: &BlockIdShort,
12241224
) -> Result<Option<Vec<QueueShardBoundedRange>>> {
1225-
let mc_block_id = mc_data_stuff.current.block_id;
1226-
1227-
let prov_mc_state = if let Some(ref prev_mc_state) = mc_data_stuff.previous {
1228-
prev_mc_state
1229-
} else {
1225+
let Some(prev_state) = &mc_data_stuff.previous else {
12301226
return Ok(None);
12311227
};
12321228

1233-
let prev_mc_block_shards: FastHashMap<ShardIdent, ShardDescriptionShort> =
1234-
prov_mc_state.shards.iter().cloned().collect();
1235-
1236-
let mut ranges = Vec::<QueueShardBoundedRange>::new();
1237-
1238-
// Load max_message from masterchain block diff
1239-
let Some(master_max_msg) = self
1240-
.get_max_message(&mc_block_id)
1241-
.await
1242-
.context("loading diff for mc block")?
1243-
else {
1244-
return Ok(None);
1245-
};
1246-
1247-
ranges.push(QueueShardBoundedRange {
1248-
shard_ident: mc_block_id.shard,
1249-
from: Bound::Included(master_max_msg),
1250-
to: Bound::Included(master_max_msg),
1251-
});
1252-
1253-
// Iterate over all updated shard blocks and add their diff ranges
1254-
for (shard, current_descr) in mc_data_stuff
1255-
.current
1256-
.shards
1257-
.iter()
1258-
.filter(|(shard, d)| d.top_sc_block_updated && shard != &block_id_short.shard)
1259-
{
1260-
let Some(prev_descr) = prev_mc_block_shards.get(shard) else {
1261-
tracing::warn!(target: tracing_targets::COLLATOR, "prev_mc_block_shards not found: {shard:?}");
1262-
return Ok(None);
1263-
};
1264-
1265-
if prev_descr.seqno == 0 {
1266-
return Ok(None);
1229+
let prev_shards: FastHashMap<_, _> = prev_state.shards.iter().cloned().collect();
1230+
1231+
let mut shard_pairs = Vec::new();
1232+
for (shard, curr) in mc_data_stuff.current.shards.iter() {
1233+
if curr.top_sc_block_updated && *shard != block_id_short.shard {
1234+
if let Some(prev) = prev_shards.get(shard) {
1235+
shard_pairs.push((
1236+
*shard,
1237+
prev.get_block_id(*shard),
1238+
curr.get_block_id(*shard),
1239+
));
1240+
}
12671241
}
1268-
1269-
let Some(first_diff_msg) = self
1270-
.get_max_message(&prev_descr.get_block_id(*shard))
1271-
.await
1272-
.context("loading first diff msg")?
1273-
else {
1274-
return Ok(None);
1275-
};
1276-
let Some(last_diff_msg) = self
1277-
.get_max_message(&current_descr.get_block_id(*shard))
1278-
.await
1279-
.context("loading last diff msg")?
1280-
else {
1281-
return Ok(None);
1282-
};
1283-
1284-
ranges.push(QueueShardBoundedRange {
1285-
shard_ident: *shard,
1286-
from: Bound::Excluded(first_diff_msg),
1287-
to: Bound::Included(last_diff_msg),
1288-
});
12891242
}
12901243

1291-
Ok(Some(ranges))
1244+
self.build_ranges(&mc_data_stuff.current.block_id, shard_pairs)
1245+
.await
12921246
}
12931247

12941248
/// Collect ranges for loading cumulative statistics if collation block is master
@@ -1299,67 +1253,69 @@ impl CollatorStdImpl {
12991253
mc_data_stuff: &McDataStuff,
13001254
top_shard_blocks_info: Vec<TopBlockDescription>,
13011255
) -> Result<Option<Vec<QueueShardBoundedRange>>> {
1302-
let prev_mc_block_id = mc_data_stuff.current.block_id;
1303-
1304-
let prev_mc_block_shards: FastHashMap<ShardIdent, ShardDescriptionShort> =
1305-
mc_data_stuff.current.shards.iter().cloned().collect();
1256+
let prev_shards: FastHashMap<_, _> = mc_data_stuff.current.shards.iter().cloned().collect();
1257+
1258+
let mut shard_pairs = Vec::new();
1259+
for top in top_shard_blocks_info {
1260+
if let Some(prev) = prev_shards.get(&top.block_id.shard) {
1261+
shard_pairs.push((
1262+
top.block_id.shard,
1263+
prev.get_block_id(top.block_id.shard),
1264+
top.block_id,
1265+
));
1266+
}
1267+
}
13061268

1307-
let mut ranges = Vec::<QueueShardBoundedRange>::new();
1269+
self.build_ranges(&mc_data_stuff.current.block_id, shard_pairs)
1270+
.await
1271+
}
13081272

1309-
// Load max_message from masterchain block diff
1273+
async fn build_ranges(
1274+
&self,
1275+
master_block_id: &BlockId,
1276+
shard_pairs: Vec<(ShardIdent, BlockId, BlockId)>,
1277+
) -> Result<Option<Vec<QueueShardBoundedRange>>> {
13101278
let Some(master_max_msg) = self
1311-
.get_max_message(&prev_mc_block_id)
1279+
.get_diff_max_message(master_block_id)
13121280
.await
13131281
.context("loading diff for mc block")?
13141282
else {
13151283
return Ok(None);
13161284
};
13171285

1318-
ranges.push(QueueShardBoundedRange {
1319-
shard_ident: prev_mc_block_id.shard,
1286+
// add mc block diff range
1287+
let mut ranges = vec![QueueShardBoundedRange {
1288+
shard_ident: master_block_id.shard,
13201289
from: Bound::Included(master_max_msg),
13211290
to: Bound::Included(master_max_msg),
1322-
});
1323-
1324-
// Iterate over all updated shard blocks and add their diff ranges
1325-
for top_block_description in top_shard_blocks_info {
1326-
let Some(prev_descr) = prev_mc_block_shards.get(&top_block_description.block_id.shard)
1327-
else {
1328-
tracing::warn!(target: tracing_targets::COLLATOR, "prev_mc_block_shards not found: {:?}", top_block_description.block_id.shard);
1329-
return Ok(None);
1330-
};
1291+
}];
13311292

1332-
if prev_descr.seqno == 0 {
1293+
for (shard, prev_id, curr_id) in shard_pairs {
1294+
if prev_id.seqno == 0 {
13331295
return Ok(None);
13341296
}
13351297

1336-
let Some(first_diff_msg) = self
1337-
.get_max_message(&prev_descr.get_block_id(top_block_description.block_id.shard))
1338-
.await
1339-
.context("loading first diff msg")?
1340-
else {
1341-
return Ok(None);
1342-
};
1343-
let Some(last_diff_msg) = self
1344-
.get_max_message(&top_block_description.block_id)
1345-
.await
1346-
.context("loading last diff msg")?
1347-
else {
1348-
return Ok(None);
1349-
};
1350-
1351-
ranges.push(QueueShardBoundedRange {
1352-
shard_ident: top_block_description.block_id.shard,
1353-
from: Bound::Excluded(first_diff_msg),
1354-
to: Bound::Included(last_diff_msg),
1355-
});
1298+
let (first_msg, last_msg) = tokio::try_join!(
1299+
self.get_diff_max_message(&prev_id),
1300+
self.get_diff_max_message(&curr_id)
1301+
)
1302+
.context("loading shard diffs")?;
1303+
1304+
match (first_msg, last_msg) {
1305+
(Some(first), Some(last)) => ranges.push(QueueShardBoundedRange {
1306+
shard_ident: shard,
1307+
from: Bound::Excluded(first),
1308+
to: Bound::Included(last),
1309+
}),
1310+
_ => return Ok(None),
1311+
}
13561312
}
13571313

13581314
Ok(Some(ranges))
13591315
}
13601316

13611317
/// Helper function that retrieves `max_message` from either MQ or state diff
1362-
async fn get_max_message(&self, block_id: &BlockId) -> Result<Option<QueueKey>> {
1318+
async fn get_diff_max_message(&self, block_id: &BlockId) -> Result<Option<QueueKey>> {
13631319
if let Some(diff) =
13641320
self.mq_adapter
13651321
.get_diff_info(&block_id.shard, block_id.seqno, DiffZone::Both)?

collator/src/collator/messages_reader/mod.rs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -226,12 +226,12 @@ impl<V: InternalMessageValue> MessagesReader<V> {
226226
);
227227

228228
// truncate data before range
229-
previous_cumulative_statistics.truncate_before(
230-
&cx.for_shard_id,
231-
cx.prev_state_gen_lt,
232-
cx.mc_state_gen_lt,
233-
&cx.mc_top_shards_end_lts.iter().copied().collect(),
234-
);
229+
// previous_cumulative_statistics.truncate_before(
230+
// &cx.for_shard_id,
231+
// cx.prev_state_gen_lt,
232+
// cx.mc_state_gen_lt,
233+
// &cx.mc_top_shards_end_lts.iter().copied().collect(),
234+
// );
235235

236236
// partial load statistics and enrich current value
237237
previous_cumulative_statistics.load_partial(
@@ -240,6 +240,25 @@ impl<V: InternalMessageValue> MessagesReader<V> {
240240
part_stat_ranges,
241241
)?;
242242

243+
let mut inner_cumulative_statistics =
244+
CumulativeStatistics::new(params.all_shards_processed_to_by_partitions);
245+
inner_cumulative_statistics.load(
246+
mq_adapter.clone(),
247+
&cx.for_shard_id,
248+
&partitions,
249+
cx.prev_state_gen_lt,
250+
cx.mc_state_gen_lt,
251+
&cx.mc_top_shards_end_lts.iter().copied().collect(),
252+
)?;
253+
254+
tracing::info!(target: "local_debug", "previous_cumulative_statistics {previous_cumulative_statistics:?}");
255+
256+
if !previous_cumulative_statistics
257+
.shards_stats_eq(&inner_cumulative_statistics)
258+
{
259+
panic!("111")
260+
}
261+
243262
previous_cumulative_statistics
244263
}
245264
_ => {

0 commit comments

Comments
 (0)