Skip to content

Commit 7249e8c

Browse files
authored
Merge pull request #257 from eval-exec/exec/fix-clean-up-matched-blocks-at-startup
fix: Handle missing blocks to prevent sync stall
2 parents e6eaa40 + 7835653 commit 7249e8c

File tree

12 files changed

+295
-84
lines changed

12 files changed

+295
-84
lines changed

light-client-lib/src/protocols/filter/block_filter.rs

Lines changed: 68 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -143,22 +143,76 @@ impl FilterProtocol {
143143
debug!("found best proved peer {}", peer);
144144

145145
let mut matched_blocks = self.peers.matched_blocks().write().await;
146-
if let Some((db_start_number, blocks_count, db_blocks)) =
147-
self.storage.get_earliest_matched_blocks()
148-
{
146+
if let Some(db_matched_blocks) = self.storage.get_earliest_matched_blocks() {
149147
debug!(
150148
"try recover matched blocks from storage, start_number={}, \
151149
blocks_count={}, matched_count: {}",
152-
db_start_number,
153-
blocks_count,
150+
db_matched_blocks.start_number,
151+
db_matched_blocks.blocks_count,
154152
matched_blocks.len(),
155153
);
156154
let option = matched_blocks.is_empty();
157155

158156
if option {
159-
// recover matched blocks from storage
157+
debug!("matched_blocks is empty");
158+
159+
// Filter out blocks already marked as missing (e.g., uncle blocks)
160+
let filtered_blocks: Vec<_> = db_matched_blocks
161+
.blocks
162+
.iter()
163+
.filter(|b| {
164+
if let Some((_added_ts, _first_sent, missing)) =
165+
self.peers.get_header_fetch_info(&b.hash)
166+
{
167+
debug!(
168+
"header fetch info for block {:#x}:added_ts={}, first_sent={}, missing={}",
169+
b.hash,
170+
_added_ts,
171+
_first_sent,
172+
missing
173+
);
174+
if missing {
175+
debug!(
176+
"Skipping block {:#x} from DB - marked as missing",
177+
b.hash
178+
);
179+
return false;
180+
}
181+
} else {
182+
trace!("not found header fetch info for block {:#x}", b.hash);
183+
}
184+
true
185+
})
186+
.map(|b| (b.hash.clone(), b.proved))
187+
.collect();
188+
189+
if filtered_blocks.is_empty() {
190+
// All blocks in this DB range are missing - remove it and load next
191+
info!(
192+
"All {} blocks in DB range {} are missing (uncle blocks), removing",
193+
db_matched_blocks.blocks.len(),
194+
db_matched_blocks.start_number
195+
);
196+
self.storage
197+
.remove_matched_blocks(db_matched_blocks.start_number);
198+
199+
// Also remove these blocks from fetching_headers to prevent memory leak
200+
for block in &db_matched_blocks.blocks {
201+
self.peers.remove_fetching_header(&block.hash);
202+
}
203+
204+
// Will load next range on next iteration
205+
return;
206+
} else {
207+
debug!(
208+
"add filtered blocks into matched_blocks, count={}",
209+
filtered_blocks.len()
210+
);
211+
}
212+
213+
// recover matched blocks from storage (only non-missing ones)
160214
self.peers
161-
.add_matched_blocks(&mut matched_blocks, db_blocks);
215+
.add_matched_blocks(&mut matched_blocks, filtered_blocks);
162216
let tip_header = self.storage.get_tip_header();
163217
prove_or_download_matched_blocks(
164218
Arc::clone(&self.peers),
@@ -174,6 +228,11 @@ impl FilterProtocol {
174228
);
175229
self.send_get_block_filters(nc, *peer, start_number);
176230
}
231+
} else {
232+
debug!("matched blocks are not empty:");
233+
matched_blocks.iter().for_each(|matched_block| {
234+
debug!("matched_block: {}, {:?}", matched_block.0, matched_block.1);
235+
});
177236
}
178237
} else if self.should_ask(immediately).await && could_ask_more {
179238
debug!(
@@ -262,10 +321,9 @@ impl FilterProtocol {
262321
peer: PeerIndex,
263322
start_number: BlockNumber,
264323
) {
265-
trace!(
324+
debug!(
266325
"request block filter from peer {}, starts at {}",
267-
peer,
268-
start_number
326+
peer, start_number
269327
);
270328
let content = packed::GetBlockFilters::new_builder()
271329
.start_number(start_number)

light-client-lib/src/protocols/filter/components/block_filter_hashes_process.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use ckb_network::{BoxedCKBProtocolContext, PeerIndex};
22
use ckb_types::{core::BlockNumber, packed, prelude::*};
3-
use log::trace;
3+
use log::{debug, trace};
44
use rand::seq::SliceRandom as _;
55

66
use crate::protocols::{FilterProtocol, Status, StatusCode};
@@ -28,6 +28,7 @@ impl<'a> BlockFilterHashesProcess<'a> {
2828
}
2929

3030
pub async fn execute(self) -> Status {
31+
debug!("process BlockFilterHashes message");
3132
let peer_state = if let Some(peer_state) = self.protocol.peers.get_state(&self.peer_index) {
3233
peer_state
3334
} else {
@@ -51,7 +52,7 @@ impl<'a> BlockFilterHashesProcess<'a> {
5152
.map(|item| item.to_entity())
5253
.collect::<Vec<_>>();
5354

54-
trace!(
55+
debug!(
5556
"peer {}: last-state: {}, add block filter hashes (start: {}, len: {}) \
5657
and parent block filter hash is {:#x}",
5758
self.peer_index,

light-client-lib/src/protocols/filter/components/block_filters_process.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ impl<'a> BlockFiltersProcess<'a> {
3333
}
3434

3535
pub async fn execute(self) -> Status {
36+
debug!("block filters process execute");
37+
3638
if self.filter.storage.is_filter_scripts_empty() {
3739
info!("ignoring, filter scripts may have been cleared during syncing");
3840
return Status::ok();
@@ -226,12 +228,15 @@ impl<'a> BlockFiltersProcess<'a> {
226228
);
227229
let option = matched_blocks.is_empty();
228230
if option {
229-
if let Some((_start_number, _blocks_count, db_blocks)) =
230-
self.filter.storage.get_earliest_matched_blocks()
231-
{
232-
self.filter
233-
.peers
234-
.add_matched_blocks(&mut matched_blocks, db_blocks);
231+
if let Some(db_matched_blocks) = self.filter.storage.get_earliest_matched_blocks() {
232+
self.filter.peers.add_matched_blocks(
233+
&mut matched_blocks,
234+
db_matched_blocks
235+
.blocks
236+
.into_iter()
237+
.map(|b| (b.hash, b.proved))
238+
.collect(),
239+
);
235240
prove_or_download_matched_blocks(
236241
Arc::clone(&self.filter.peers),
237242
&tip_header,

light-client-lib/src/protocols/light_client/components/send_blocks_proof.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ impl<'a> SendBlocksProofProcess<'a> {
3939

4040
pub(crate) async fn execute(self) -> Status {
4141
let status = self.execute_internally().await;
42+
debug!("block proof status: {}", status);
4243
self.protocol
4344
.peers()
4445
.update_blocks_proof_request(self.peer_index, None, false);
@@ -98,6 +99,9 @@ impl<'a> SendBlocksProofProcess<'a> {
9899
.to_entity()
99100
.into_iter()
100101
.collect::<Vec<_>>();
102+
103+
debug!("got block proof: missing {:?}", &missing_block_hashes);
104+
101105
if !original_request.check_block_hashes(&received_block_hashes, &missing_block_hashes) {
102106
error!("peer {} send an unknown proof", self.peer_index);
103107
return StatusCode::UnexpectedResponse.into();
@@ -224,6 +228,52 @@ impl<'a> SendBlocksProofProcess<'a> {
224228
self.protocol
225229
.peers()
226230
.mark_fetching_headers_missing(&missing_block_hashes);
231+
232+
// Remove missing blocks from matched_blocks to prevent batch stall
233+
// This is safe because:
234+
// 1. If these are uncle blocks from an old fork, they've already been re-filtered
235+
// 2. If from a recent reorg, SendLastStateProof will detect it and trigger
236+
// rollback_to_block(), which resets min_filtered_block_number and re-runs filters
237+
// 3. New main chain blocks at the same heights will be checked during re-filtering
238+
if original_request.should_get_blocks() && !missing_block_hashes.is_empty() {
239+
let mut matched_blocks = self.protocol.peers().matched_blocks().write().await;
240+
let mut removed_count = 0;
241+
242+
for missing_hash in &missing_block_hashes {
243+
if matched_blocks.remove(&missing_hash.unpack()).is_some() {
244+
removed_count += 1;
245+
debug!(
246+
"Removed missing block {:#x} from matched_blocks \
247+
(likely uncle block or peer doesn't have it)",
248+
missing_hash
249+
);
250+
}
251+
}
252+
253+
if removed_count > 0 {
254+
info!(
255+
"Removed {} missing block(s) from matched_blocks. \
256+
If due to reorg, filters will re-run from fork point to check new blocks.",
257+
removed_count
258+
);
259+
260+
// Check if batch is now complete (all remaining blocks have been downloaded)
261+
let all_downloaded = self
262+
.protocol
263+
.peers()
264+
.all_matched_blocks_downloaded(&matched_blocks);
265+
266+
if all_downloaded && !matched_blocks.is_empty() {
267+
info!(
268+
"Batch complete after removing missing blocks, {} blocks ready",
269+
matched_blocks.len()
270+
);
271+
} else if matched_blocks.is_empty() {
272+
debug!("matched_blocks now empty after removing missing blocks");
273+
}
274+
}
275+
}
276+
227277
Status::ok()
228278
}
229279
}

light-client-lib/src/protocols/light_client/mod.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -355,10 +355,10 @@ impl LightClientProtocol {
355355
info!("rollback to block#1 since previous last header number is 1");
356356
let mut matched_blocks = self.peers.matched_blocks().write().await;
357357

358-
while let Some((start_number, _, _)) = self.storage.get_latest_matched_blocks()
359-
{
360-
if start_number > 0 {
361-
self.storage.remove_matched_blocks(start_number);
358+
while let Some(matched_blocks_data) = self.storage.get_latest_matched_blocks() {
359+
if matched_blocks_data.start_number > 0 {
360+
self.storage
361+
.remove_matched_blocks(matched_blocks_data.start_number);
362362
}
363363
}
364364
self.storage.rollback_to_block(1);
@@ -385,18 +385,19 @@ impl LightClientProtocol {
385385
debug!("fork to number: {}", to_number);
386386
let mut matched_blocks = self.peers.matched_blocks().write().await;
387387
let mut start_number_opt = None;
388-
while let Some((start_number, blocks_count, _)) =
389-
self.storage.get_latest_matched_blocks()
390-
{
388+
while let Some(matched_blocks_data) = self.storage.get_latest_matched_blocks() {
391389
// Remove matched blocks if the range contains blocks after the fork point
392390
// The range is [start_number, start_number + blocks_count - 1]
393391
// Fork point (to_number) is the last valid block, so remove ranges containing blocks > to_number
394-
if start_number + blocks_count > to_number + 1 {
392+
if matched_blocks_data.start_number + matched_blocks_data.blocks_count
393+
> to_number + 1
394+
{
395395
debug!("remove matched blocks start from: {} (range covers {} blocks, contains blocks after fork at {})",
396-
start_number, blocks_count, to_number);
397-
self.storage.remove_matched_blocks(start_number);
396+
matched_blocks_data.start_number, matched_blocks_data.blocks_count, to_number);
397+
self.storage
398+
.remove_matched_blocks(matched_blocks_data.start_number);
398399
} else {
399-
start_number_opt = Some(start_number);
400+
start_number_opt = Some(matched_blocks_data.start_number);
400401
break;
401402
}
402403
}
@@ -704,6 +705,11 @@ impl LightClientProtocol {
704705
}
705706

706707
async fn get_idle_blocks(&mut self, nc: &BoxedCKBProtocolContext) {
708+
// Clean up old missing headers (uncle blocks) older than 1 hour
709+
// to prevent unbounded memory growth in fetching_headers
710+
const MAX_MISSING_AGE_MS: u64 = 3_600_000; // 1 hour
711+
self.peers.cleanup_old_missing_headers(MAX_MISSING_AGE_MS);
712+
707713
let tip_header = self.storage.get_tip_header();
708714
let matched_blocks = self.peers.matched_blocks().read().await;
709715

light-client-lib/src/protocols/light_client/peers.rs

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1216,9 +1216,23 @@ impl Peers {
12161216
}
12171217
pub(crate) fn mark_fetching_headers_missing(&self, block_hashes: &[Byte32]) {
12181218
for block_hash in block_hashes {
1219-
if let Some(mut value) = self.fetching_headers.get_mut(block_hash) {
1220-
value.missing = true;
1221-
}
1219+
self.fetching_headers
1220+
.entry(block_hash.clone())
1221+
.and_modify(|info| {
1222+
// Block was already in fetching_headers, mark as missing
1223+
info.missing = true;
1224+
})
1225+
.or_insert_with(|| {
1226+
// Block not in fetching_headers (e.g., loaded from matched_blocks DB)
1227+
// Insert with missing=true to prevent infinite retry loop
1228+
// This will be cleaned up when the DB range is removed
1229+
FetchInfo {
1230+
added_ts: unix_time_as_millis(),
1231+
first_sent: 0,
1232+
timeout: false,
1233+
missing: true,
1234+
}
1235+
});
12221236
}
12231237
}
12241238
pub(crate) fn mark_fetching_txs_missing(&self, tx_hashes: &[Byte32]) {
@@ -1228,6 +1242,32 @@ impl Peers {
12281242
}
12291243
}
12301244
}
1245+
1246+
/// Clean up old missing entries from fetching_headers to prevent unbounded memory growth
1247+
/// This is called periodically to remove uncle blocks that were marked as missing
1248+
/// but are no longer needed (older than max_age_ms)
1249+
pub(crate) fn cleanup_old_missing_headers(&self, max_age_ms: u64) {
1250+
let now = unix_time_as_millis();
1251+
let mut removed_count = 0;
1252+
1253+
self.fetching_headers.retain(|_hash, info| {
1254+
if info.missing && now.saturating_sub(info.added_ts) > max_age_ms {
1255+
removed_count += 1;
1256+
false // Remove this entry
1257+
} else {
1258+
true // Keep this entry
1259+
}
1260+
});
1261+
1262+
if removed_count > 0 {
1263+
log::debug!(
1264+
"Cleaned up {} old missing headers (age > {}ms)",
1265+
removed_count,
1266+
max_age_ms
1267+
);
1268+
}
1269+
}
1270+
12311271
// mark all fetching hashes (headers/txs) as timeout
12321272
pub(crate) fn mark_fetching_headers_timeout(&self, peer_index: PeerIndex) {
12331273
if let Some(peer) = self.get_peer(&peer_index) {

light-client-lib/src/protocols/synchronizer.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,17 @@ impl CKBProtocolHandler for SyncProtocol {
6565
if !matched_blocks.is_empty()
6666
&& self.peers.all_matched_blocks_downloaded(&matched_blocks)
6767
{
68-
let (start_number, blocks_count, db_blocks) = self
68+
let matched_blocks_data = self
6969
.storage
7070
.get_earliest_matched_blocks()
7171
.expect("get matched blocks from storage");
72-
let db_blocks: HashSet<_> =
73-
db_blocks.into_iter().map(|(hash, _)| hash).collect();
72+
let start_number = matched_blocks_data.start_number;
73+
let blocks_count = matched_blocks_data.blocks_count;
74+
let db_blocks: HashSet<_> = matched_blocks_data
75+
.blocks
76+
.into_iter()
77+
.map(|b| b.hash)
78+
.collect();
7479

7580
self.storage.remove_matched_blocks(start_number);
7681
let blocks = self.peers.clear_matched_blocks(&mut matched_blocks);
@@ -90,11 +95,15 @@ impl CKBProtocolHandler for SyncProtocol {
9095
.update_block_number(start_number + blocks_count - 1);
9196

9297
// send more GetBlocksProof/GetBlocks requests
93-
if let Some((_start_number, _blocks_count, db_blocks)) =
94-
self.storage.get_earliest_matched_blocks()
95-
{
96-
self.peers
97-
.add_matched_blocks(&mut matched_blocks, db_blocks);
98+
if let Some(db_matched_blocks) = self.storage.get_earliest_matched_blocks() {
99+
self.peers.add_matched_blocks(
100+
&mut matched_blocks,
101+
db_matched_blocks
102+
.blocks
103+
.into_iter()
104+
.map(|b| (b.hash, b.proved))
105+
.collect(),
106+
);
98107
let tip_header = self.storage.get_tip_header();
99108
prove_or_download_matched_blocks(
100109
Arc::clone(&self.peers),

0 commit comments

Comments
 (0)