diff --git a/packages/kad-dht/src/content-fetching/index.ts b/packages/kad-dht/src/content-fetching/index.ts index c858c2dda0..3147373899 100644 --- a/packages/kad-dht/src/content-fetching/index.ts +++ b/packages/kad-dht/src/content-fetching/index.ts @@ -302,6 +302,10 @@ export class ContentFetching { } // we have peers, lets send the actual query to them - yield * this.queryManager.run(key, getValueQuery, options) + // disable early termination so all close peers respond before we select the best record + yield * this.queryManager.run(key, getValueQuery, { + ...options, + disableEarlyTermination: true + }) } } diff --git a/packages/kad-dht/src/kad-dht.ts b/packages/kad-dht/src/kad-dht.ts index 5fb8b0d89b..0ff41c3423 100644 --- a/packages/kad-dht/src/kad-dht.ts +++ b/packages/kad-dht/src/kad-dht.ts @@ -196,7 +196,8 @@ export class KadDHT extends TypedEventEmitter implements Ka this.network = new Network(components, { protocol: this.protocol, logPrefix, - metricsPrefix + metricsPrefix, + timeout: init.networkDialTimeout }) this.routingTable = new RoutingTable(components, { diff --git a/packages/kad-dht/src/network.ts b/packages/kad-dht/src/network.ts index 5ea7c9f349..6dd89ecdb5 100644 --- a/packages/kad-dht/src/network.ts +++ b/packages/kad-dht/src/network.ts @@ -59,6 +59,9 @@ export class Network extends TypedEventEmitter implements Startab this.running = false this.protocol = init.protocol this.timeout = new AdaptiveTimeout({ + maxTimeout: 5_000, + minTimeout: 2_000, + failureMultiplier: 1.5, ...(init.timeout ?? {}), metrics: components.metrics, metricName: `${init.metricsPrefix}_network_message_send_times_milliseconds` diff --git a/packages/kad-dht/src/peer-routing/index.ts b/packages/kad-dht/src/peer-routing/index.ts index 76e0196165..893cdfe6fc 100644 --- a/packages/kad-dht/src/peer-routing/index.ts +++ b/packages/kad-dht/src/peer-routing/index.ts @@ -240,55 +240,72 @@ export class PeerRouting { */ async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncGenerator { this.log('getClosestPeers to %b', key) - const kadId = await convertBuffer(key, options) - const peers = new PeerDistanceList(kadId, this.routingTable.kBucketSize) const self = this - const getCloserPeersQuery: QueryFunc = async function * ({ peer, path, peerKadId, signal }) { + // Accumulate the K closest peers that respond during the traversal. + // FINAL_PEER events are only emitted after the query fully converges - + // partial results from a timed-out query are not emitted because the DHT + // requires crossover between independent nodes resolving the same key. + const keyKadId = await convertBuffer(key, options) + const closestPeers = new PeerDistanceList(keyKadId, this.routingTable.kBucketSize) + + const getCloserPeersQuery: QueryFunc = async function * ({ peer, path, signal }) { self.log('getClosestPeers asking %p', peer.id) const request: Partial = { type: MessageType.FIND_NODE, key } - yield * self.network.sendRequest(peer.id, request, { + let contacted = false + + for await (const event of self.network.sendRequest(peer.id, request, { ...options, signal, path - }) - - // add the peer to the list if we've managed to contact it successfully - peers.addWithKadId(peer, peerKadId, path) - } + })) { + if (event.name === 'PEER_RESPONSE') { + contacted = true + } - yield * this.queryManager.run(key, getCloserPeersQuery, options) + yield event + } - this.log('found %d peers close to %b', peers.length, key) + if (!contacted) { + return + } - for (let { peer, path } of peers.peers) { try { - if (peer.multiaddrs.length === 0) { - peer = await self.components.peerStore.getInfo(peer.id, options) - } + let peerInfo = peer - if (peer.multiaddrs.length === 0) { - continue + if (peerInfo.multiaddrs.length === 0) { + peerInfo = await self.components.peerStore.getInfo(peer.id) } - yield finalPeerEvent({ - from: this.components.peerId, - peer: await self.components.peerStore.getInfo(peer.id, options), - path: { - index: path.index, - queued: 0, - running: 0, - total: 0 - } - }, options) + if (peerInfo.multiaddrs.length > 0) { + // omit signal - peer ID hashing is fast and we don't want + // an aborted signal to prevent recording a successful contact + await closestPeers.add(peerInfo, path) + } } catch { - continue + // peer info may not be in the peer store } } + + yield * this.queryManager.run(key, getCloserPeersQuery, options) + + // only reached on successful convergence - emit the K closest peers found + for (const { peer, path } of closestPeers.peers) { + yield finalPeerEvent({ + from: this.components.peerId, + peer, + path: { + index: path.index, + queued: 0, + running: 0, + total: 0 + } + }, options) + } } /** @@ -351,7 +368,7 @@ export class PeerRouting { id: peer.id, multiaddrs: peer.addresses.map(({ multiaddr }) => multiaddr) }) - } catch {} + } catch { /* key may not be a valid peer multihash */ } const keyKadId = await convertBuffer(key, options) const ids = this.routingTable.closestPeers(keyKadId, options) diff --git a/packages/kad-dht/src/query/manager.ts b/packages/kad-dht/src/query/manager.ts index 681c682fcf..2e226d76c0 100644 --- a/packages/kad-dht/src/query/manager.ts +++ b/packages/kad-dht/src/query/manager.ts @@ -40,6 +40,12 @@ export interface QueryManagerComponents { export interface QueryOptions extends RoutingOptions { isSelfQuery?: boolean + /** + * Disable early termination of the query when a majority of paths complete. + * Use this for value-retrieval queries that need responses from all close peers + * to select the best record. + */ + disableEarlyTermination?: boolean } /** @@ -141,9 +147,6 @@ export class QueryManager implements Startable { const log = this.logger.forComponent(`${this.logPrefix}:query:` + uint8ArrayToString(key, 'base58btc')) - // query a subset of peers up to `kBucketSize / 2` in length - let queryFinished = false - try { if (this.routingTable.size === 0 && !this.allowQueryWithZeroPeers) { log('routing table was empty, waiting for some peers before running%s query', options.isSelfQuery === true ? ' self' : '') @@ -195,6 +198,23 @@ export class QueryManager implements Startable { // make sure we don't get trapped in a loop const peersSeen = createScalableCuckooFilter(1024) + const totalPaths = peersToQuery.length + const minCompletedPaths = Math.max(1, Math.ceil(totalPaths * 0.6)) + let completedPaths = 0 + const earlyTerminationController = new AbortController() + setMaxListeners(Infinity, earlyTerminationController.signal) + + const onPathComplete = options.disableEarlyTermination === true + ? undefined + : (pathIndex: number): void => { + completedPaths++ + + if (completedPaths >= minCompletedPaths && completedPaths < totalPaths) { + log('path %d completed, %d/%d paths done, triggering early termination', pathIndex, completedPaths, totalPaths) + earlyTerminationController.abort() + } + } + // Create query paths from the starting peers const paths = peersToQuery.map((peer, index) => { return queryPath({ @@ -210,12 +230,18 @@ export class QueryManager implements Startable { log, peersSeen, onProgress: options.onProgress, - connectionManager: this.connectionManager + connectionManager: this.connectionManager, + onPathComplete }) }) // Execute the query along each disjoint path and yield their results as they become available for await (const event of merge(...paths)) { + if (earlyTerminationController.signal.aborted) { + log('early termination: %d/%d paths completed', completedPaths, totalPaths) + break + } + if (event.name === 'QUERY_ERROR') { log.error('query error - %e', event.error) } @@ -238,18 +264,13 @@ export class QueryManager implements Startable { signal.throwIfAborted() yield event } - - queryFinished = true } catch (err) { if (this.running) { // ignore errors thrown during shut down throw err } } finally { - if (!queryFinished) { - log('query exited early') - queryEarlyExitController.abort() - } + queryEarlyExitController.abort() signal.clear() diff --git a/packages/kad-dht/src/query/query-path.ts b/packages/kad-dht/src/query/query-path.ts index 978d173bdc..dff24090aa 100644 --- a/packages/kad-dht/src/query/query-path.ts +++ b/packages/kad-dht/src/query/query-path.ts @@ -66,6 +66,13 @@ export interface QueryPathOptions extends RoutingOptions { * The overall query abort signal */ signal: AbortSignal + + /** + * Called when this path's queue goes idle, before PATH_ENDED is pushed. + * Fires out-of-band so the manager can detect completion without waiting + * for the event to flow through the merge iterator. + */ + onPathComplete?(pathIndex: number): void } interface QueryQueueOptions extends AbortOptions { @@ -77,7 +84,7 @@ interface QueryQueueOptions extends AbortOptions { * every peer encountered that we have not seen before */ export async function * queryPath (options: QueryPathOptions): AsyncGenerator { - const { key, startingPeers, ourPeerId, query, alpha, path, numPaths, log, peersSeen, connectionManager, signal } = options + const { key, startingPeers, ourPeerId, query, alpha, path, numPaths, log, peersSeen, connectionManager, signal, onPathComplete } = options const events = pushable({ objectMode: true }) @@ -89,6 +96,8 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator uint8ArrayXorCompare(a.options.distance, b.options.distance) }) queue.addEventListener('idle', () => { + onPathComplete?.(path) + events.push(pathEndedEvent({ path: { index: path, diff --git a/packages/kad-dht/test/peer-routing.spec.ts b/packages/kad-dht/test/peer-routing.spec.ts index 8664cd5f12..1250f065b6 100644 --- a/packages/kad-dht/test/peer-routing.spec.ts +++ b/packages/kad-dht/test/peer-routing.spec.ts @@ -1,12 +1,16 @@ import { generateKeyPair } from '@libp2p/crypto/keys' +import { AbortError } from '@libp2p/interface' import { defaultLogger } from '@libp2p/logger' import { peerIdFromPrivateKey } from '@libp2p/peer-id' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' +import drain from 'it-drain' import { stubInterface } from 'sinon-ts' import { K } from '../src/constants.js' +import { MessageType } from '../src/message/dht.js' import { PeerRouting } from '../src/peer-routing/index.js' -import { convertBuffer } from '../src/utils.js' +import { peerResponseEvent } from '../src/query/events.js' +import { convertBuffer, convertPeerId } from '../src/utils.js' import { createPeerIdsWithPrivateKey } from './utils/create-peer-id.js' import { sortClosestPeers } from './utils/sort-closest-peers.js' import type { PeerAndKey } from './utils/create-peer-id.js' @@ -61,6 +65,145 @@ describe('peer-routing', () => { peerRouting = new PeerRouting(components, init) }) + describe('getClosestPeers', () => { + it('should emit FINAL_PEER events for peers successfully contacted during a query', async () => { + const key = Uint8Array.from([0, 1, 2, 3, 4]) + const [peer1] = await getSortedPeers(key) + const peer1Multiaddr = multiaddr('/ip4/127.0.0.1/tcp/4001') + const path = { index: 0, queued: 0, running: 1, total: 1 } + + // queryManager.run calls queryFunc for peer1, then completes normally + init.queryManager.run.callsFake(async function * (k, queryFunc) { + const peer1KadId = await convertPeerId(peer1.peerId) + + yield * queryFunc({ + key: k, + peer: { id: peer1.peerId, multiaddrs: [peer1Multiaddr] }, + path, + peerKadId: peer1KadId, + numPaths: 1, + signal: new AbortController().signal + }) + }) + + // network.sendRequest returns a PEER_RESPONSE indicating successful contact + init.network.sendRequest.callsFake(async function * () { + yield peerResponseEvent({ + from: peer1.peerId, + messageType: MessageType.FIND_NODE, + closer: [], + providers: [], + record: undefined, + path + }, {}) + }) + + const events = [] + for await (const event of peerRouting.getClosestPeers(key)) { + events.push(event) + } + + const finalPeerEvents = events.filter(e => e.name === 'FINAL_PEER') + expect(finalPeerEvents).to.have.lengthOf(1) + expect(finalPeerEvents[0]).to.have.nested.property('peer.id', peer1.peerId) + }) + + it('should propagate AbortError from queryManager without emitting FINAL_PEER', async () => { + const key = Uint8Array.from([0, 1, 2, 3, 4]) + const [peer1] = await getSortedPeers(key) + const peer1Multiaddr = multiaddr('/ip4/127.0.0.1/tcp/4001') + const path = { index: 0, queued: 0, running: 1, total: 1 } + + // queryManager.run contacts peer1 successfully then times out — + // partial results must not be emitted since the query did not converge + init.queryManager.run.callsFake(async function * (k, queryFunc) { + const peer1KadId = await convertPeerId(peer1.peerId) + + yield * queryFunc({ + key: k, + peer: { id: peer1.peerId, multiaddrs: [peer1Multiaddr] }, + path, + peerKadId: peer1KadId, + numPaths: 1, + signal: new AbortController().signal + }) + + throw new AbortError('Query timed out') + }) + + init.network.sendRequest.callsFake(async function * () { + yield peerResponseEvent({ + from: peer1.peerId, + messageType: MessageType.FIND_NODE, + closer: [], + providers: [], + record: undefined, + path + }, {}) + }) + + let threw = false + const events = [] + try { + for await (const event of peerRouting.getClosestPeers(key)) { + events.push(event) + } + } catch (err: any) { + expect(err).to.have.property('name', 'AbortError') + threw = true + } + + expect(threw).to.be.true('getClosestPeers should have propagated the AbortError') + const finalPeerEvents = events.filter(e => e.name === 'FINAL_PEER') + expect(finalPeerEvents).to.have.lengthOf(0) + }) + + it('should propagate non-AbortError from queryManager', async () => { + const key = Uint8Array.from([0, 1, 2, 3, 4]) + const testError = new Error('test error') + + init.queryManager.run.callsFake(async function * () { + yield * ([] as any[]) + throw testError + }) + + await expect(drain(peerRouting.getClosestPeers(key))).to.eventually.be.rejectedWith(testError) + }) + + it('should not emit FINAL_PEER for peers that returned a query error', async () => { + const key = Uint8Array.from([0, 1, 2, 3, 4]) + const [peer1] = await getSortedPeers(key) + const peer1Multiaddr = multiaddr('/ip4/127.0.0.1/tcp/4001') + const path = { index: 0, queued: 0, running: 1, total: 1 } + + init.queryManager.run.callsFake(async function * (k, queryFunc) { + const peer1KadId = await convertPeerId(peer1.peerId) + + yield * queryFunc({ + key: k, + peer: { id: peer1.peerId, multiaddrs: [peer1Multiaddr] }, + path, + peerKadId: peer1KadId, + numPaths: 1, + signal: new AbortController().signal + }) + }) + + // network.sendRequest yields no PEER_RESPONSE — simulates a failed contact + init.network.sendRequest.callsFake(async function * () { + // yields nothing (peer was not contactable) + }) + + const events = [] + for await (const event of peerRouting.getClosestPeers(key)) { + events.push(event) + } + + const finalPeerEvents = events.filter(e => e.name === 'FINAL_PEER') + expect(finalPeerEvents).to.have.lengthOf(0) + }) + }) + describe('getClosestPeersOffline', () => { it('should only return DHT servers', async () => { const key = Uint8Array.from([0, 1, 2, 3, 4])