Skip to content

Commit 6fe27f8

Browse files
committed
handle missing blocks from send_blocks_proof.rs
Signed-off-by: Eval EXEC <execvy@gmail.com>
1 parent 2c1f6fe commit 6fe27f8

File tree

7 files changed

+145
-20
lines changed

7 files changed

+145
-20
lines changed

light-client-lib/src/protocols/filter/block_filter.rs

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -151,18 +151,71 @@ impl FilterProtocol {
151151
db_matched_blocks.blocks_count,
152152
matched_blocks.len(),
153153
);
154+
db_matched_blocks.blocks.iter().for_each(|b| {
155+
debug!("db matched block: {}, proved={}", b.hash, b.proved);
156+
});
154157
let option = matched_blocks.is_empty();
155158

156159
if option {
157-
// recover matched blocks from storage
158-
self.peers.add_matched_blocks(
159-
&mut matched_blocks,
160-
db_matched_blocks
161-
.blocks
162-
.into_iter()
163-
.map(|b| (b.hash, b.proved))
164-
.collect(),
165-
);
160+
debug!("matched_blocks is empty");
161+
162+
// Filter out blocks already marked as missing (e.g., uncle blocks)
163+
let filtered_blocks: Vec<_> = db_matched_blocks
164+
.blocks
165+
.iter()
166+
.filter(|b| {
167+
if let Some((_added_ts, _first_sent, missing)) =
168+
self.peers.get_header_fetch_info(&b.hash)
169+
{
170+
debug!(
171+
"header fetch info for block {:#x}:added_ts={}, first_sent={}, missing={}",
172+
b.hash,
173+
_added_ts,
174+
_first_sent,
175+
missing
176+
);
177+
if missing {
178+
debug!(
179+
"Skipping block {:#x} from DB - marked as missing",
180+
b.hash
181+
);
182+
return false;
183+
}
184+
} else {
185+
trace!("not found header fetch info for block {:#x}", b.hash);
186+
}
187+
true
188+
})
189+
.map(|b| (b.hash.clone(), b.proved))
190+
.collect();
191+
192+
if filtered_blocks.is_empty() {
193+
// All blocks in this DB range are missing - remove it and load next
194+
info!(
195+
"All {} blocks in DB range {} are missing (uncle blocks), removing",
196+
db_matched_blocks.blocks.len(),
197+
db_matched_blocks.start_number
198+
);
199+
self.storage
200+
.remove_matched_blocks(db_matched_blocks.start_number);
201+
202+
// Also remove these blocks from fetching_headers to prevent memory leak
203+
for block in &db_matched_blocks.blocks {
204+
self.peers.remove_fetching_header(&block.hash);
205+
}
206+
207+
// Will load next range on next iteration
208+
return;
209+
} else {
210+
debug!(
211+
"add filtered blocks into matched_blocks, count={}",
212+
filtered_blocks.len()
213+
);
214+
}
215+
216+
// recover matched blocks from storage (only non-missing ones)
217+
self.peers
218+
.add_matched_blocks(&mut matched_blocks, filtered_blocks);
166219
let tip_header = self.storage.get_tip_header();
167220
prove_or_download_matched_blocks(
168221
Arc::clone(&self.peers),
@@ -178,6 +231,11 @@ impl FilterProtocol {
178231
);
179232
self.send_get_block_filters(nc, *peer, start_number);
180233
}
234+
} else {
235+
debug!("matched blocks are not empty:");
236+
matched_blocks.iter().for_each(|matched_block| {
237+
debug!("matched_block: {}, {:?}", matched_block.0, matched_block.1);
238+
});
181239
}
182240
} else if self.should_ask(immediately).await && could_ask_more {
183241
debug!(
@@ -266,10 +324,9 @@ impl FilterProtocol {
266324
peer: PeerIndex,
267325
start_number: BlockNumber,
268326
) {
269-
trace!(
327+
debug!(
270328
"request block filter from peer {}, starts at {}",
271-
peer,
272-
start_number
329+
peer, start_number
273330
);
274331
let content = packed::GetBlockFilters::new_builder()
275332
.start_number(start_number)

light-client-lib/src/protocols/filter/components/block_filter_hashes_process.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use ckb_network::{BoxedCKBProtocolContext, PeerIndex};
22
use ckb_types::{core::BlockNumber, packed, prelude::*};
3-
use log::trace;
3+
use log::{debug, trace};
44
use rand::seq::SliceRandom as _;
55

66
use crate::protocols::{FilterProtocol, Status, StatusCode};
@@ -28,6 +28,7 @@ impl<'a> BlockFilterHashesProcess<'a> {
2828
}
2929

3030
pub async fn execute(self) -> Status {
31+
debug!("process BlockFilterHashes message");
3132
let peer_state = if let Some(peer_state) = self.protocol.peers.get_state(&self.peer_index) {
3233
peer_state
3334
} else {
@@ -51,7 +52,7 @@ impl<'a> BlockFilterHashesProcess<'a> {
5152
.map(|item| item.to_entity())
5253
.collect::<Vec<_>>();
5354

54-
trace!(
55+
debug!(
5556
"peer {}: last-state: {}, add block filter hashes (start: {}, len: {}) \
5657
and parent block filter hash is {:#x}",
5758
self.peer_index,

light-client-lib/src/protocols/filter/components/block_filters_process.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ impl<'a> BlockFiltersProcess<'a> {
3333
}
3434

3535
pub async fn execute(self) -> Status {
36+
debug!("block filters process execute");
37+
3638
if self.filter.storage.is_filter_scripts_empty() {
3739
info!("ignoring, filter scripts may have been cleared during syncing");
3840
return Status::ok();

light-client-lib/src/protocols/light_client/components/send_blocks_proof.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ impl<'a> SendBlocksProofProcess<'a> {
3939

4040
pub(crate) async fn execute(self) -> Status {
4141
let status = self.execute_internally().await;
42+
debug!("block proof status: {}", status);
4243
self.protocol
4344
.peers()
4445
.update_blocks_proof_request(self.peer_index, None, false);
@@ -98,6 +99,9 @@ impl<'a> SendBlocksProofProcess<'a> {
9899
.to_entity()
99100
.into_iter()
100101
.collect::<Vec<_>>();
102+
103+
debug!("got block proof: missing {:?}", &missing_block_hashes);
104+
101105
if !original_request.check_block_hashes(&received_block_hashes, &missing_block_hashes) {
102106
error!("peer {} send an unknown proof", self.peer_index);
103107
return StatusCode::UnexpectedResponse.into();
@@ -224,6 +228,52 @@ impl<'a> SendBlocksProofProcess<'a> {
224228
self.protocol
225229
.peers()
226230
.mark_fetching_headers_missing(&missing_block_hashes);
231+
232+
// Remove missing blocks from matched_blocks to prevent batch stall
233+
// This is safe because:
234+
// 1. If these are uncle blocks from an old fork, they've already been re-filtered
235+
// 2. If from a recent reorg, SendLastStateProof will detect it and trigger
236+
// rollback_to_block(), which resets min_filtered_block_number and re-runs filters
237+
// 3. New main chain blocks at the same heights will be checked during re-filtering
238+
if original_request.should_get_blocks() && !missing_block_hashes.is_empty() {
239+
let mut matched_blocks = self.protocol.peers().matched_blocks().write().await;
240+
let mut removed_count = 0;
241+
242+
for missing_hash in &missing_block_hashes {
243+
if matched_blocks.remove(&missing_hash.unpack()).is_some() {
244+
removed_count += 1;
245+
debug!(
246+
"Removed missing block {:#x} from matched_blocks \
247+
(likely uncle block or peer doesn't have it)",
248+
missing_hash
249+
);
250+
}
251+
}
252+
253+
if removed_count > 0 {
254+
info!(
255+
"Removed {} missing block(s) from matched_blocks. \
256+
If due to reorg, filters will re-run from fork point to check new blocks.",
257+
removed_count
258+
);
259+
260+
// Check if batch is now complete (all remaining blocks have been downloaded)
261+
let all_downloaded = self
262+
.protocol
263+
.peers()
264+
.all_matched_blocks_downloaded(&matched_blocks);
265+
266+
if all_downloaded && !matched_blocks.is_empty() {
267+
info!(
268+
"Batch complete after removing missing blocks, {} blocks ready",
269+
matched_blocks.len()
270+
);
271+
} else if matched_blocks.is_empty() {
272+
debug!("matched_blocks now empty after removing missing blocks");
273+
}
274+
}
275+
}
276+
227277
Status::ok()
228278
}
229279
}

light-client-lib/src/protocols/light_client/peers.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1216,9 +1216,23 @@ impl Peers {
12161216
}
12171217
pub(crate) fn mark_fetching_headers_missing(&self, block_hashes: &[Byte32]) {
12181218
for block_hash in block_hashes {
1219-
if let Some(mut value) = self.fetching_headers.get_mut(block_hash) {
1220-
value.missing = true;
1221-
}
1219+
self.fetching_headers
1220+
.entry(block_hash.clone())
1221+
.and_modify(|info| {
1222+
// Block was already in fetching_headers, mark as missing
1223+
info.missing = true;
1224+
})
1225+
.or_insert_with(|| {
1226+
// Block not in fetching_headers (e.g., loaded from matched_blocks DB)
1227+
// Insert with missing=true to prevent infinite retry loop
1228+
// This will be cleaned up when the DB range is removed
1229+
FetchInfo {
1230+
added_ts: unix_time_as_millis(),
1231+
first_sent: 0,
1232+
timeout: false,
1233+
missing: true,
1234+
}
1235+
});
12221236
}
12231237
}
12241238
pub(crate) fn mark_fetching_txs_missing(&self, tx_hashes: &[Byte32]) {

light-client-lib/src/storage/db/native.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -735,7 +735,6 @@ impl Storage {
735735
if entry.is_none() {
736736
break;
737737
}
738-
dbg!(&entry);
739738

740739
let matched_blocks = entry.unwrap();
741740
let start_number = matched_blocks.start_number;

light-client-lib/src/utils/network.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub(crate) fn prove_or_download_matched_blocks(
1313
nc: &dyn CKBProtocolContext,
1414
init_blocks_in_transit_per_peer: usize,
1515
) {
16+
info!("in prove_or_download-matched blocks");
1617
let best_peers: Vec<_> = peers.get_best_proved_peers(best_tip);
1718
let last_hash = best_tip.calc_header_hash();
1819

@@ -33,9 +34,10 @@ pub(crate) fn prove_or_download_matched_blocks(
3334
peers.get_matched_blocks_to_prove(matched_blocks, GET_BLOCKS_PROOF_LIMIT);
3435
if !blocks_to_prove.is_empty() {
3536
debug!(
36-
"send get blocks proof request to peer: {}, count={}",
37+
"send get blocks proof request to peer: {}, count={}, {:?}",
3738
peer_index,
38-
blocks_to_prove.len()
39+
blocks_to_prove.len(),
40+
blocks_to_prove,
3941
);
4042
let content = packed::GetBlocksProof::new_builder()
4143
.block_hashes(blocks_to_prove.pack())

0 commit comments

Comments
 (0)