Skip to content

Commit a7bf076

Browse files
viquezclaudiojsdanielh
authored andcommitted
Fix DHT GetRecord query wedge on untrusted peer responses
1 parent 0403cb3 commit a7bf076

4 files changed

Lines changed: 115 additions & 4 deletions

File tree

network-libp2p/src/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,15 @@ pub enum NetworkError {
2828
#[error("DHT query finished without a verified record")]
2929
DhtNoValidRecord,
3030

31+
#[error("DHT get query entered an inconsistent state")]
32+
DhtGetInconsistentState,
33+
34+
#[error("DHT get record verification failed")]
35+
DhtGetVerificationFailed,
36+
37+
#[error("DHT get query timed out")]
38+
DhtGetTimeout,
39+
3140
#[error("DHT PutRecord error: {0:?}")]
3241
DhtPutRecord(libp2p::kad::PutRecordError),
3342

network-libp2p/src/network.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,10 @@ impl NetworkInterface for Network {
644644
})
645645
.await?;
646646

647-
let data = output_rx.await??;
647+
let Ok(data) = timeout(REQUEST_TIMEOUT, output_rx).await else {
648+
return Err(NetworkError::DhtGetTimeout);
649+
};
650+
let data = data??;
648651
// Now decode the signed record and returned the tagged signable record
649652
let signed_record: TaggedSigned<V, T> = Deserialize::deserialize_from_vec(&data)?;
650653
Ok(Some(signed_record.record))

network-libp2p/src/swarm.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -539,27 +539,63 @@ fn handle_dht_event(event: kad::Event, event_info: EventInfo) {
539539
}
540540
}
541541

542+
#[cfg(feature = "kad")]
543+
fn abort_dht_get(
544+
id: QueryId,
545+
step: ProgressStep,
546+
error: NetworkError,
547+
reason: &'static str,
548+
event_info: &mut EventInfo,
549+
) {
550+
if let Some(mut query) = event_info.swarm.behaviour_mut().dht.query_mut(&id) {
551+
query.finish();
552+
}
553+
554+
event_info.state.dht_get_results.remove(&id);
555+
556+
if let Some(output) = event_info.state.dht_gets.remove(&id) {
557+
if output.send(Err(error)).is_err() {
558+
error!(query_id = ?id, ?step, "could not send aborted get record query result to channel");
559+
}
560+
} else {
561+
warn!(query_id = ?id, ?step, %reason, "GetRecord query abort for unknown query ID");
562+
}
563+
}
564+
542565
#[cfg(feature = "kad")]
543566
fn handle_dht_get(
544567
id: QueryId,
545568
result: Result<GetRecordOk, GetRecordError>,
546569
_stats: QueryStats,
547570
step: ProgressStep,
548-
event_info: EventInfo,
571+
mut event_info: EventInfo,
549572
) {
573+
if !event_info.state.dht_gets.contains_key(&id)
574+
&& !event_info.state.dht_get_results.contains_key(&id)
575+
{
576+
debug!(query_id = ?id, ?step, "Ignoring stale GetRecord query progress");
577+
return;
578+
}
579+
550580
match result {
551581
Ok(GetRecordOk::FoundRecord(record)) => {
552582
// Verify incoming record
553583
let dht_record = match event_info.dht_verifier.verify(&record.record) {
554584
Ok(record) => record,
555585
Err(error) => {
556586
warn!(?error, "DHT record verification failed");
587+
abort_dht_get(
588+
id,
589+
step,
590+
NetworkError::DhtGetVerificationFailed,
591+
"DHT get record verification failed",
592+
&mut event_info,
593+
);
557594
return;
558595
}
559596
};
560597

561598
let count = store_dht_record(&mut event_info.state.dht_get_results, id, dht_record);
562-
563599
// Check if we already have a quorum
564600
if count == event_info.state.dht_quorum {
565601
event_info

network-libp2p/tests/network.rs

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use nimiq_network_interface::{
1616
use nimiq_network_libp2p::{
1717
dht,
1818
discovery::{self, peer_contacts::PeerContact},
19-
Config, Network,
19+
Config, Network, NetworkError,
2020
};
2121
use nimiq_serde::{Deserialize, Serialize};
2222
use nimiq_test_log::test;
@@ -245,6 +245,17 @@ impl dht::Verifier for Verifier {
245245
}
246246
}
247247

248+
struct RejectingVerifier;
249+
250+
impl dht::Verifier for RejectingVerifier {
251+
fn verify(
252+
&self,
253+
_record: &libp2p::kad::Record,
254+
) -> Result<dht::DhtRecord, dht::DhtVerifierError> {
255+
Err(dht::DhtVerifierError::InvalidSignature)
256+
}
257+
}
258+
248259
async fn create_network_with_n_peers(
249260
n_peers: usize,
250261
) -> (
@@ -515,6 +526,58 @@ async fn dht_put_and_get() {
515526
assert_eq!(fetched_record, Some(put_record));
516527
}
517528

529+
#[test(tokio::test)]
530+
#[cfg(feature = "kad")]
531+
async fn dht_get_with_verifier_failure_returns_error() {
532+
let addr1 = multiaddr![Memory(rand::random::<u64>())];
533+
let addr2 = multiaddr![Memory(rand::random::<u64>())];
534+
let keys = Arc::new(RwLock::new(BTreeMap::default()));
535+
536+
let net1 = Network::new(network_config(addr1.clone()), Verifier::new(&keys)).await;
537+
net1.listen_on(vec![addr1.clone()]).await;
538+
539+
let net2 = Network::new(network_config(addr2.clone()), RejectingVerifier).await;
540+
net2.listen_on(vec![addr2.clone()]).await;
541+
542+
let mut events1 = net1.subscribe_events();
543+
let mut events2 = net2.subscribe_events();
544+
545+
net2.dial_address(addr1).await.unwrap();
546+
547+
let event1 = helper::get_next_peer_event(&mut events1).await;
548+
helper::assert_peer_joined(&event1, &net2.get_local_peer_id());
549+
550+
let event2 = helper::get_next_peer_event(&mut events2).await;
551+
helper::assert_peer_joined(&event2, &net1.get_local_peer_id());
552+
553+
sleep(Duration::from_secs(10)).await;
554+
555+
let mut rng = test_rng(false);
556+
let keypair = KeyPair::generate(&mut rng);
557+
let key: Address = (&keypair.public).into();
558+
559+
let put_record = ValidatorRecord {
560+
peer_id: net1.get_local_peer_id(),
561+
validator_address: key.clone(),
562+
timestamp: 0x42u64,
563+
};
564+
565+
assert!(keys.write().insert(key.clone(), keypair.public).is_none());
566+
net1.dht_put(&key, &put_record, &keypair).await.unwrap();
567+
568+
let result = timeout(
569+
Duration::from_secs(2),
570+
net2.dht_get::<_, ValidatorRecord<PeerId>, KeyPair>(&key),
571+
)
572+
.await;
573+
574+
assert!(result.is_ok(), "dht_get future hung");
575+
assert!(matches!(
576+
result.unwrap(),
577+
Err(NetworkError::DhtGetVerificationFailed)
578+
));
579+
}
580+
518581
#[test(tokio::test)]
519582
async fn ban_peer() {
520583
let (net1, net2) = create_connected_networks().await;

0 commit comments

Comments
 (0)