-
Notifications
You must be signed in to change notification settings - Fork 521
Expand file tree
/
Copy pathquery-path.ts
More file actions
236 lines (204 loc) · 6.53 KB
/
query-path.ts
File metadata and controls
236 lines (204 loc) · 6.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import { AbortError } from '@libp2p/interface'
import { Queue } from '@libp2p/utils'
import { pushable } from 'it-pushable'
import { xor as uint8ArrayXor } from 'uint8arrays/xor'
import { xorCompare as uint8ArrayXorCompare } from 'uint8arrays/xor-compare'
import { convertPeerId, convertBuffer } from '../utils.js'
import { pathEndedEvent, queryErrorEvent } from './events.js'
import type { QueryEvent } from '../index.js'
import type { QueryFunc } from '../query/types.js'
import type { Logger, PeerId, RoutingOptions, AbortOptions, PeerInfo } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal'
import type { Filter } from '@libp2p/utils'
export interface QueryPathOptions extends RoutingOptions {
/**
* What are we trying to find
*/
key: Uint8Array
/**
* Where we start our query
*/
startingPeers: PeerId[]
/**
* Who we are
*/
ourPeerId: PeerId
/**
* The query function to run with each peer
*/
query: QueryFunc
/**
* How many concurrent node/value lookups to run
*/
alpha: number
/**
* The index within `k` this path represents
*/
path: number
/**
* How many disjoint paths are in this query
*/
numPaths: number
/**
* Query log
*/
log: Logger
/**
* Set of peers seen by this and other paths
*/
peersSeen: Filter
/**
* The libp2p connection manager
*/
connectionManager: ConnectionManager
/**
* The overall query abort signal
*/
signal: AbortSignal
/**
* Called when this path's queue goes idle, before PATH_ENDED is pushed.
* Fires out-of-band so the manager can detect completion without waiting
* for the event to flow through the merge iterator.
*/
onPathComplete?(pathIndex: number): void
}
interface QueryQueueOptions extends AbortOptions {
distance: Uint8Array
}
/**
* Walks a path through the DHT, calling the passed query function for
* every peer encountered that we have not seen before
*/
export async function * queryPath (options: QueryPathOptions): AsyncGenerator<QueryEvent, void, undefined> {
const { key, startingPeers, ourPeerId, query, alpha, path, numPaths, log, peersSeen, connectionManager, signal, onPathComplete } = options
const events = pushable<QueryEvent>({
objectMode: true
})
// Only ALPHA node/value lookups are allowed at any given time for each process
// https://github.com/libp2p/specs/tree/master/kad-dht#alpha-concurrency-parameter-%CE%B1
const queue = new Queue<undefined, QueryQueueOptions>({
concurrency: alpha,
sort: (a, b) => uint8ArrayXorCompare(a.options.distance, b.options.distance)
})
queue.addEventListener('idle', () => {
onPathComplete?.(path)
events.push(pathEndedEvent({
path: {
index: path,
queued: queue.queued,
running: queue.running,
total: queue.size
}
}, options))
events.end()
})
queue.addEventListener('failure', (evt) => {
log.error('error during query - %e', evt.detail.error)
})
const onAbort = (): void => {
queue.abort()
events.end(new AbortError())
}
signal.addEventListener('abort', onAbort)
try {
// perform lookups on kadId, not the actual value
const kadId = await convertBuffer(key, {
signal
})
/**
* Adds the passed peer to the query queue if it's not us and no other path
* has passed through this peer
*/
function queryPeer (peer: PeerInfo, peerKadId: Uint8Array): void {
if (peer == null) {
return
}
peersSeen.add(peer.id.toMultihash().bytes)
const peerXor = uint8ArrayXor(peerKadId, kadId)
queue.add(async () => {
try {
for await (const event of query({
...options,
key,
peer,
path: {
index: path,
queued: queue.queued,
running: queue.running,
total: queue.size
},
numPaths,
peerKadId,
signal
})) {
// if there are closer peers and the query has not completed, continue the query
if (event.name === 'PEER_RESPONSE') {
for (const closerPeer of event.closer) {
if (peersSeen.has(closerPeer.id.toMultihash().bytes)) { // eslint-disable-line max-depth
log('already seen %p in query', closerPeer.id)
continue
}
if (ourPeerId.equals(closerPeer.id)) { // eslint-disable-line max-depth
log('not querying ourselves')
continue
}
if (!(await connectionManager.isDialable(closerPeer.multiaddrs))) { // eslint-disable-line max-depth
log('not querying undialable peer')
continue
}
const closerPeerKadId = await convertPeerId(closerPeer.id, {
signal
})
const closerPeerXor = uint8ArrayXor(closerPeerKadId, kadId)
// only continue query if closer peer is actually closer
if (uint8ArrayXorCompare(closerPeerXor, peerXor) !== -1) { // eslint-disable-line max-depth
log('skipping %p as they are not closer to %b than %p', closerPeer.id, key, peer.id)
continue
}
log('querying closer peer %p', closerPeer.id)
queryPeer(closerPeer, closerPeerKadId)
}
}
events.push({
...event,
path: {
index: path,
queued: queue.queued,
running: queue.running,
total: queue.size
}
})
}
} catch (err: any) {
// yield error event if query is continuing
events.push(queryErrorEvent({
from: peer.id,
error: err,
path: {
index: path,
queued: queue.queued,
running: queue.running - 1,
total: queue.size - 1
}
}, options))
}
}, {
distance: peerXor
}).catch(err => {
log.error('error during query - %e', err)
})
}
// begin the query with the starting peers
await Promise.all(
startingPeers.map(async startingPeer => {
queryPeer({ id: startingPeer, multiaddrs: [] }, await convertPeerId(startingPeer, {
signal
}))
})
)
// yield results as they come in
yield * events
} finally {
signal.removeEventListener('abort', onAbort)
}
}