Skip to content

Commit 72a1a37

Browse files
committed
Wait a while for better selection
1 parent 067c4a7 commit 72a1a37

File tree

4 files changed

+96
-10
lines changed

4 files changed

+96
-10
lines changed

bindings/python/py_src/lattica/client.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,9 @@ def configure_bitswap_peer_selection(
258258
top_n: int = 3,
259259
enabled: bool = True,
260260
min_peers: int = 2,
261-
enable_randomness: bool = True
261+
enable_randomness: bool = True,
262+
have_wait_window_ms: int = 100,
263+
min_candidate_ratio: float = 0.3
262264
) -> None:
263265
"""Configure Bitswap peer selection strategy.
264266
@@ -267,9 +269,16 @@ def configure_bitswap_peer_selection(
267269
enabled: Enable smart selection
268270
min_peers: Minimum peers threshold
269271
enable_randomness: Enable randomness in selection
272+
have_wait_window_ms: Wait window in ms after first Have response before selecting peers.
273+
This allows more peers to respond, ensuring better selection. Default: 100ms
274+
min_candidate_ratio: Minimum candidate ratio (0.0-1.0) before starting selection.
275+
Selection starts when candidates >= total_peers * min_candidate_ratio. Default: 0.3
270276
"""
271277
try:
272-
self._lattica_instance.configure_bitswap_peer_selection(top_n, enabled, min_peers, enable_randomness)
278+
self._lattica_instance.configure_bitswap_peer_selection(
279+
top_n, enabled, min_peers, enable_randomness,
280+
have_wait_window_ms, min_candidate_ratio
281+
)
273282
except Exception as e:
274283
raise RuntimeError(f"Failed to configure bitswap peer selection: {e}")
275284

bindings/python/src/core.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -560,12 +560,35 @@ impl LatticaSDK {
560560
}
561561

562562
/// Configure Bitswap peer selection strategy
563-
#[pyo3(signature = (top_n = 3, enabled = true, min_peers = 2, enable_randomness = true))]
564-
fn configure_bitswap_peer_selection(&self, top_n: usize, enabled: bool, min_peers: usize, enable_randomness: bool) -> PyResult<()> {
563+
///
564+
/// Args:
565+
/// top_n: Number of top peers to select (default: 3)
566+
/// enabled: Enable smart selection (default: true)
567+
/// min_peers: Minimum peers threshold (default: 2)
568+
/// enable_randomness: Enable randomness in selection (default: true)
569+
/// have_wait_window_ms: Wait window in ms after first Have response (default: 100)
570+
/// min_candidate_ratio: Minimum candidate ratio before selection (default: 0.3)
571+
#[pyo3(signature = (top_n = 3, enabled = true, min_peers = 2, enable_randomness = true, have_wait_window_ms = 100, min_candidate_ratio = 0.3))]
572+
fn configure_bitswap_peer_selection(
573+
&self,
574+
top_n: usize,
575+
enabled: bool,
576+
min_peers: usize,
577+
enable_randomness: bool,
578+
have_wait_window_ms: u64,
579+
min_candidate_ratio: f64,
580+
) -> PyResult<()> {
565581
Python::with_gil(|py| {
566582
py.allow_threads(|| {
567583
self.runtime.block_on(async move {
568-
let config = beetswap::PeerSelectionConfig { top_n, enabled, min_peers, enable_randomness };
584+
let config = beetswap::PeerSelectionConfig {
585+
top_n,
586+
enabled,
587+
min_peers,
588+
enable_randomness,
589+
have_wait_window_ms,
590+
min_candidate_ratio,
591+
};
569592
self.lattica.configure_bitswap_peer_selection(config).await
570593
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{:?}", e)))
571594
})

third_party/beetswap/src/client.rs

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ where
8383
peer_selection_config: PeerSelectionConfig,
8484
/// Global stats
8585
global_stats: GlobalStats,
86+
/// Track the first Have response time for each CID (for wait window)
87+
cid_first_have_time: FnvHashMap<CidGeneric<S>, Instant>,
8688
}
8789

8890
#[derive(Debug)]
@@ -158,6 +160,7 @@ where
158160
new_blocks: Vec::new(),
159161
peer_selection_config: PeerSelectionConfig::default(),
160162
global_stats: GlobalStats::default(),
163+
cid_first_have_time: FnvHashMap::default(),
161164
}
162165
}
163166

@@ -276,6 +279,7 @@ where
276279
let cid = cid.to_owned();
277280
self.cid_to_queries.remove(&cid);
278281
self.wantlist.remove(&cid);
282+
self.cid_first_have_time.remove(&cid);
279283
}
280284

281285
break;
@@ -290,10 +294,14 @@ where
290294

291295
let mut new_blocks = Vec::new();
292296

293-
// Update presence
297+
// Update presence and track first Have time for wait window
294298
for (cid, block_presence) in msg.block_presences {
295299
match block_presence {
296-
BlockPresenceType::Have => peer_state.wantlist.got_have(&cid),
300+
BlockPresenceType::Have => {
301+
peer_state.wantlist.got_have(&cid);
302+
// Record first Have time for this CID (for wait window mechanism)
303+
self.cid_first_have_time.entry(cid).or_insert_with(Instant::now);
304+
}
297305
BlockPresenceType::DontHave => peer_state.wantlist.got_dont_have(&cid),
298306
}
299307
}
@@ -340,6 +348,9 @@ where
340348
continue;
341349
}
342350

351+
// Clean up wait window tracking
352+
self.cid_first_have_time.remove(&cid);
353+
343354
peer_state.wantlist.got_block(&cid);
344355
new_blocks.push((cid, block.clone()));
345356

@@ -392,8 +403,11 @@ where
392403
}
393404
}
394405

395-
// Select optimal peers for each CID
406+
// Select optimal peers for each CID (with wait window mechanism)
396407
let mut selected_peers_for_cid: FnvHashMap<CidGeneric<S>, Vec<PeerId>> = FnvHashMap::default();
408+
let total_peers = self.peers.len();
409+
let wait_window = Duration::from_millis(self.peer_selection_config.have_wait_window_ms);
410+
let min_candidate_ratio = self.peer_selection_config.min_candidate_ratio;
397411

398412
for (cid, candidate_peers) in &cid_to_candidates {
399413
let already_sent = *cid_already_sent_count.get(cid).unwrap_or(&0);
@@ -404,6 +418,37 @@ where
404418
continue;
405419
}
406420

421+
// Wait window check: ensure we have enough candidates before selecting
422+
let should_select = if let Some(first_have_time) = self.cid_first_have_time.get(cid) {
423+
let elapsed = first_have_time.elapsed();
424+
let min_candidates = ((total_peers as f64) * min_candidate_ratio).ceil() as usize;
425+
426+
// Start selection if:
427+
// 1. Wait window has elapsed, OR
428+
// 2. We have enough candidates (>= min_candidate_ratio), OR
429+
// 3. We have enough candidates to fill remaining slots
430+
let window_elapsed = elapsed >= wait_window;
431+
let enough_candidates = candidate_peers.len() >= min_candidates;
432+
let can_fill_slots = candidate_peers.len() >= (top_n - already_sent);
433+
434+
if !window_elapsed && !enough_candidates && !can_fill_slots {
435+
debug!(
436+
"CID {} - waiting for more candidates: elapsed={:?}, candidates={}, min={}, window={:?}",
437+
cid, elapsed, candidate_peers.len(), min_candidates, wait_window
438+
);
439+
false
440+
} else {
441+
true
442+
}
443+
} else {
444+
// No first Have time recorded yet (shouldn't happen, but be safe)
445+
true
446+
};
447+
448+
if !should_select {
449+
continue;
450+
}
451+
407452
let need_to_select = top_n - already_sent;
408453

409454
let candidates_with_metrics: FnvHashMap<PeerId, &PeerMetrics> = candidate_peers
@@ -422,8 +467,9 @@ where
422467
let selected = PeerSelector::select_top_peers(&candidates_with_metrics, &temp_config);
423468

424469
debug!(
425-
"CID {} - sent: {}, candidates: {}, selected: {} (top_n={})",
426-
cid, already_sent, candidates_with_metrics.len(), selected.len(), top_n
470+
"CID {} - sent: {}, candidates: {}, selected: {} (top_n={}, waited: {:?})",
471+
cid, already_sent, candidates_with_metrics.len(), selected.len(), top_n,
472+
self.cid_first_have_time.get(cid).map(|t| t.elapsed())
427473
);
428474

429475
selected_peers_for_cid.insert(*cid, selected);

third_party/beetswap/src/peer_selection.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,12 @@ pub struct PeerSelectionConfig {
147147
pub min_peers: usize,
148148
/// Enable randomness in selection
149149
pub enable_randomness: bool,
150+
/// Wait window in milliseconds after first Have response before selecting peers
151+
/// This allows more peers to respond with Have, ensuring better selection
152+
pub have_wait_window_ms: u64,
153+
/// Minimum candidate ratio before starting selection (0.0 - 1.0)
154+
/// Selection starts when: candidates >= total_peers * min_candidate_ratio
155+
pub min_candidate_ratio: f64,
150156
}
151157

152158
impl Default for PeerSelectionConfig {
@@ -156,6 +162,8 @@ impl Default for PeerSelectionConfig {
156162
enabled: true,
157163
min_peers: 2,
158164
enable_randomness: true,
165+
have_wait_window_ms: 100, // Wait 100ms for more Have responses
166+
min_candidate_ratio: 0.3, // Or start when 30% of peers responded
159167
}
160168
}
161169
}

0 commit comments

Comments (0)