@@ -68,6 +68,33 @@ struct EventInfo<'a> {
6868 metrics : & ' a Arc < NetworkMetrics > ,
6969}
7070
71+ #[ cfg( feature = "kad" ) ]
72+ fn store_dht_record < K > (
73+ dht_get_results : & mut HashMap < K , DhtResults > ,
74+ id : K ,
75+ dht_record : DhtRecord ,
76+ ) -> u8
77+ where
78+ K : Eq + std:: hash:: Hash ,
79+ {
80+ let results = dht_get_results. entry ( id) . or_insert_with ( || DhtResults {
81+ count : 0 ,
82+ best_value : dht_record. clone ( ) ,
83+ outdated_values : vec ! [ ] ,
84+ } ) ;
85+
86+ results. count += 1 ;
87+
88+ if dht_record > results. best_value {
89+ results. outdated_values . push ( results. best_value . clone ( ) ) ;
90+ results. best_value = dht_record;
91+ } else if dht_record < results. best_value {
92+ results. outdated_values . push ( dht_record) ;
93+ }
94+
95+ results. count
96+ }
97+
7198pub ( crate ) fn new_swarm (
7299 config : Config ,
73100 contacts : Arc < RwLock < PeerContactBook > > ,
@@ -531,32 +558,10 @@ fn handle_dht_get(
531558 }
532559 } ;
533560
534- if step. count . get ( ) == 1_usize {
535- // This is our first record
536- let results = DhtResults {
537- count : 0 , // Will be increased in the next step
538- best_value : dht_record. clone ( ) ,
539- outdated_values : vec ! [ ] ,
540- } ;
541- event_info. state . dht_get_results . insert ( id, results) ;
542- }
561+ let count = store_dht_record ( & mut event_info. state . dht_get_results , id, dht_record) ;
543562
544- // We should always have a stored result
545- let Some ( results) = event_info. state . dht_get_results . get_mut ( & id) else {
546- log:: error!( query_id = ?id, "DHT inconsistent state" ) ;
547- return ;
548- } ;
549-
550- results. count += 1 ;
551- // Replace best value if needed and update the outdated values
552- if dht_record > results. best_value {
553- results. outdated_values . push ( results. best_value . clone ( ) ) ;
554- results. best_value = dht_record;
555- } else if dht_record < results. best_value {
556- results. outdated_values . push ( dht_record)
557- }
558563 // Check if we already have a quorum
559- if results . count == event_info. state . dht_quorum {
564+ if count == event_info. state . dht_quorum {
560565 event_info
561566 . swarm
562567 . behaviour_mut ( )
@@ -571,7 +576,18 @@ fn handle_dht_get(
571576 // and push the best result to the cache candidates
572577
573578 let Some ( results) = event_info. state . dht_get_results . remove ( & id) else {
574- log:: error!( query_id = ?id, "DHT inconsistent state" ) ;
579+ warn ! (
580+ query_id = ?id,
581+ "DHT query finished without any verified record"
582+ ) ;
583+
584+ if let Some ( output) = event_info. state . dht_gets . remove ( & id) {
585+ if output. send ( Err ( NetworkError :: DhtNoValidRecord ) ) . is_err ( ) {
586+ error ! ( query_id = ?id, error = "receiver hung up" , "could not send get record query result to channel" ) ;
587+ }
588+ } else {
589+ warn ! ( query_id = ?id, ?step, "GetRecord query result for unknown query ID" ) ;
590+ }
575591 return ;
576592 } ;
577593
@@ -621,6 +637,62 @@ fn handle_dht_get(
621637 }
622638}
623639
640+ #[ cfg( all( test, feature = "kad" ) ) ]
641+ mod tests {
642+ use std:: collections:: HashMap ;
643+
644+ use libp2p:: kad:: { Record , RecordKey } ;
645+ use nimiq_keys:: Address ;
646+ use nimiq_validator_network:: validator_record:: ValidatorRecord ;
647+
648+ use super :: { store_dht_record, DhtRecord , DhtResults } ;
649+
650+ fn test_dht_record ( timestamp : u64 , key_suffix : u8 ) -> DhtRecord {
651+ let peer_id = libp2p:: PeerId :: random ( ) ;
652+ let validator_record = ValidatorRecord :: new ( peer_id, Address :: default ( ) , timestamp) ;
653+ let record = Record {
654+ key : RecordKey :: new ( & [ key_suffix] ) ,
655+ value : vec ! [ key_suffix] ,
656+ publisher : Some ( peer_id) ,
657+ expires : None ,
658+ } ;
659+
660+ DhtRecord :: Validator ( peer_id, validator_record, record)
661+ }
662+
663+ #[ test]
664+ fn stores_first_verified_record_even_if_query_already_progressed ( ) {
665+ let mut results = HashMap :: < u8 , DhtResults > :: new ( ) ;
666+ let record = test_dht_record ( 10 , 1 ) ;
667+
668+ let count = store_dht_record ( & mut results, 7 , record. clone ( ) ) ;
669+
670+ assert_eq ! ( count, 1 ) ;
671+
672+ let stored = results. get ( & 7 ) . expect ( "record should initialize results" ) ;
673+ assert_eq ! ( stored. count, 1 ) ;
674+ assert ! ( stored. best_value == record) ;
675+ assert ! ( stored. outdated_values. is_empty( ) ) ;
676+ }
677+
678+ #[ test]
679+ fn keeps_newest_record_and_tracks_outdated_ones ( ) {
680+ let mut results = HashMap :: < u8 , DhtResults > :: new ( ) ;
681+ let older = test_dht_record ( 10 , 1 ) ;
682+ let newer = test_dht_record ( 20 , 2 ) ;
683+
684+ store_dht_record ( & mut results, 3 , older. clone ( ) ) ;
685+ let count = store_dht_record ( & mut results, 3 , newer. clone ( ) ) ;
686+
687+ assert_eq ! ( count, 2 ) ;
688+
689+ let stored = results. get ( & 3 ) . expect ( "query results should exist" ) ;
690+ assert_eq ! ( stored. count, 2 ) ;
691+ assert ! ( stored. best_value == newer) ;
692+ assert ! ( stored. outdated_values == vec![ older] ) ;
693+ }
694+ }
695+
624696#[ cfg( feature = "kad" ) ]
625697fn handle_dht_put_record (
626698 id : QueryId ,
0 commit comments