Skip to content
6 changes: 5 additions & 1 deletion packages/kad-dht/src/content-fetching/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ export class ContentFetching {
}

// we have peers, lets send the actual query to them
yield * this.queryManager.run(key, getValueQuery, options)
// disable early termination so all close peers respond before we select the best record
yield * this.queryManager.run(key, getValueQuery, {
...options,
disableEarlyTermination: true
})
}
}
3 changes: 2 additions & 1 deletion packages/kad-dht/src/kad-dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
this.network = new Network(components, {
protocol: this.protocol,
logPrefix,
metricsPrefix
metricsPrefix,
timeout: init.networkDialTimeout
})

this.routingTable = new RoutingTable(components, {
Expand Down
3 changes: 3 additions & 0 deletions packages/kad-dht/src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ export class Network extends TypedEventEmitter<NetworkEvents> implements Startab
this.running = false
this.protocol = init.protocol
this.timeout = new AdaptiveTimeout({
maxTimeout: 5_000,
minTimeout: 2_000,
failureMultiplier: 1.5,
...(init.timeout ?? {}),
metrics: components.metrics,
metricName: `${init.metricsPrefix}_network_message_send_times_milliseconds`
Expand Down
75 changes: 46 additions & 29 deletions packages/kad-dht/src/peer-routing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,55 +240,72 @@ export class PeerRouting {
*/
async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncGenerator<QueryEvent> {
this.log('getClosestPeers to %b', key)
const kadId = await convertBuffer(key, options)
const peers = new PeerDistanceList(kadId, this.routingTable.kBucketSize)
const self = this

const getCloserPeersQuery: QueryFunc = async function * ({ peer, path, peerKadId, signal }) {
// Accumulate the K closest peers that respond during the traversal.
// FINAL_PEER events are only emitted after the query fully converges -
// partial results from a timed-out query are not emitted because the DHT
// requires crossover between independent nodes resolving the same key.
const keyKadId = await convertBuffer(key, options)
const closestPeers = new PeerDistanceList(keyKadId, this.routingTable.kBucketSize)

const getCloserPeersQuery: QueryFunc = async function * ({ peer, path, signal }) {
self.log('getClosestPeers asking %p', peer.id)
const request: Partial<Message> = {
type: MessageType.FIND_NODE,
key
}

yield * self.network.sendRequest(peer.id, request, {
let contacted = false

for await (const event of self.network.sendRequest(peer.id, request, {
...options,
signal,
path
})

// add the peer to the list if we've managed to contact it successfully
peers.addWithKadId(peer, peerKadId, path)
}
})) {
if (event.name === 'PEER_RESPONSE') {
contacted = true
}

yield * this.queryManager.run(key, getCloserPeersQuery, options)
yield event
}

this.log('found %d peers close to %b', peers.length, key)
if (!contacted) {
return
}

for (let { peer, path } of peers.peers) {
try {
if (peer.multiaddrs.length === 0) {
peer = await self.components.peerStore.getInfo(peer.id, options)
}
let peerInfo = peer

if (peer.multiaddrs.length === 0) {
continue
if (peerInfo.multiaddrs.length === 0) {
peerInfo = await self.components.peerStore.getInfo(peer.id)
}

yield finalPeerEvent({
from: this.components.peerId,
peer: await self.components.peerStore.getInfo(peer.id, options),
path: {
index: path.index,
queued: 0,
running: 0,
total: 0
}
}, options)
if (peerInfo.multiaddrs.length > 0) {
// omit signal - peer ID hashing is fast and we don't want
// an aborted signal to prevent recording a successful contact
await closestPeers.add(peerInfo, path)
}
} catch {
continue
// peer info may not be in the peer store
}
}

yield * this.queryManager.run(key, getCloserPeersQuery, options)

// only reached on successful convergence - emit the K closest peers found
for (const { peer, path } of closestPeers.peers) {
yield finalPeerEvent({
from: this.components.peerId,
peer,
path: {
index: path.index,
queued: 0,
running: 0,
total: 0
}
}, options)
}
}

/**
Expand Down Expand Up @@ -351,7 +368,7 @@ export class PeerRouting {
id: peer.id,
multiaddrs: peer.addresses.map(({ multiaddr }) => multiaddr)
})
} catch {}
} catch { /* key may not be a valid peer multihash */ }

const keyKadId = await convertBuffer(key, options)
const ids = this.routingTable.closestPeers(keyKadId, options)
Expand Down
41 changes: 31 additions & 10 deletions packages/kad-dht/src/query/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ export interface QueryManagerComponents {

export interface QueryOptions extends RoutingOptions {
isSelfQuery?: boolean
/**
* Disable early termination of the query when a majority of paths complete.
* Use this for value-retrieval queries that need responses from all close peers
* to select the best record.
*/
disableEarlyTermination?: boolean
}

/**
Expand Down Expand Up @@ -141,9 +147,6 @@ export class QueryManager implements Startable {

const log = this.logger.forComponent(`${this.logPrefix}:query:` + uint8ArrayToString(key, 'base58btc'))

// query a subset of peers up to `kBucketSize / 2` in length
let queryFinished = false

try {
if (this.routingTable.size === 0 && !this.allowQueryWithZeroPeers) {
log('routing table was empty, waiting for some peers before running%s query', options.isSelfQuery === true ? ' self' : '')
Expand Down Expand Up @@ -195,6 +198,23 @@ export class QueryManager implements Startable {
// make sure we don't get trapped in a loop
const peersSeen = createScalableCuckooFilter(1024)

const totalPaths = peersToQuery.length
const minCompletedPaths = Math.max(1, Math.ceil(totalPaths * 0.6))
let completedPaths = 0
const earlyTerminationController = new AbortController()
setMaxListeners(Infinity, earlyTerminationController.signal)

const onPathComplete = options.disableEarlyTermination === true
? undefined
: (pathIndex: number): void => {
completedPaths++

if (completedPaths >= minCompletedPaths && completedPaths < totalPaths) {
log('path %d completed, %d/%d paths done, triggering early termination', pathIndex, completedPaths, totalPaths)
earlyTerminationController.abort()
}
}

// Create query paths from the starting peers
const paths = peersToQuery.map((peer, index) => {
return queryPath({
Expand All @@ -210,12 +230,18 @@ export class QueryManager implements Startable {
log,
peersSeen,
onProgress: options.onProgress,
connectionManager: this.connectionManager
connectionManager: this.connectionManager,
onPathComplete
})
})

// Execute the query along each disjoint path and yield their results as they become available
for await (const event of merge(...paths)) {
if (earlyTerminationController.signal.aborted) {
log('early termination: %d/%d paths completed', completedPaths, totalPaths)
break
}

if (event.name === 'QUERY_ERROR') {
log.error('query error - %e', event.error)
}
Expand All @@ -238,18 +264,13 @@ export class QueryManager implements Startable {
signal.throwIfAborted()
yield event
}

queryFinished = true
} catch (err) {
if (this.running) {
// ignore errors thrown during shut down
throw err
}
} finally {
if (!queryFinished) {
log('query exited early')
queryEarlyExitController.abort()
}
queryEarlyExitController.abort()

signal.clear()

Expand Down
11 changes: 10 additions & 1 deletion packages/kad-dht/src/query/query-path.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ export interface QueryPathOptions extends RoutingOptions {
* The overall query abort signal
*/
signal: AbortSignal

/**
* Called when this path's queue goes idle, before PATH_ENDED is pushed.
* Fires out-of-band so the manager can detect completion without waiting
* for the event to flow through the merge iterator.
*/
onPathComplete?(pathIndex: number): void
}

interface QueryQueueOptions extends AbortOptions {
Expand All @@ -77,7 +84,7 @@ interface QueryQueueOptions extends AbortOptions {
* every peer encountered that we have not seen before
*/
export async function * queryPath (options: QueryPathOptions): AsyncGenerator<QueryEvent, void, undefined> {
const { key, startingPeers, ourPeerId, query, alpha, path, numPaths, log, peersSeen, connectionManager, signal } = options
const { key, startingPeers, ourPeerId, query, alpha, path, numPaths, log, peersSeen, connectionManager, signal, onPathComplete } = options
const events = pushable<QueryEvent>({
objectMode: true
})
Expand All @@ -89,6 +96,8 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu
sort: (a, b) => uint8ArrayXorCompare(a.options.distance, b.options.distance)
})
queue.addEventListener('idle', () => {
onPathComplete?.(path)

events.push(pathEndedEvent({
path: {
index: path,
Expand Down
Loading
Loading