Skip to content

Commit 88b5c7c

Browse files
authored
Merge branch 'master' into biome
2 parents 9749f94 + 572c047 commit 88b5c7c

1 file changed

Lines changed: 50 additions & 26 deletions

File tree

packages/portalnetwork/src/networks/contentLookup.ts

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export class ContentLookup {
2525
private finished: boolean
2626
private content: ContentLookupResponse
2727
private pending: Set<NodeId>
28+
private queuedPeers: Set<NodeId>
2829
private completedRequests?: Map<NodeId, NodeId[]>
2930
private contentTrace?: ContentTrace
3031
constructor(network: BaseNetwork, contentKey: Uint8Array, trace = false) {
@@ -37,6 +38,7 @@ export class ContentLookup {
3738
this.finished = false
3839
this.meta = new Map()
3940
this.pending = new Set()
41+
this.queuedPeers = new Set()
4042
this.completedRequests = trace ? new Map() : undefined
4143
this.contentTrace = trace
4244
? {
@@ -47,6 +49,23 @@ export class ContentLookup {
4749
: undefined
4850
}
4951

52+
private addPeerToQueue = (enr: ENR) => {
53+
if (this.queuedPeers.has(enr.nodeId) || this.network.portal.uTP.hasRequests(enr.nodeId)) {
54+
return
55+
}
56+
57+
const dist = distance(enr.nodeId, this.contentId)
58+
this.lookupPeers.push({ enr, distance: Number(dist) })
59+
this.queuedPeers.add(enr.nodeId)
60+
this.meta.set('0x' + enr.nodeId, {
61+
enr: enr.encodeTxt(),
62+
distance: bigIntToHex(dist),
63+
})
64+
this.logger(
65+
`Adding ${shortId(enr.nodeId)} to lookup queue (${this.lookupPeers.size()})`,
66+
)
67+
}
68+
5069
/**
5170
* Queries the 5 nearest nodes in the history network routing table and recursively
5271
* requests peers closer to the content until either the content is found or there are no more peers to query
@@ -73,26 +92,28 @@ export class ContentLookup {
7392
// Sort known peers by distance to the content
7493
const nearest = this.network.routingTable.values()
7594
for (const enr of nearest) {
76-
// // Skip if the node has an active uTP request
77-
if (this.network.portal.uTP.hasRequests(enr.nodeId) === true) {
78-
continue
79-
}
80-
const dist = distance(enr.nodeId, this.contentId)
81-
this.lookupPeers.push({ enr, distance: Number(dist) })
82-
this.meta.set(enr.nodeId, { enr: enr.encodeTxt(), distance: bigIntToHex(dist) })
95+
this.addPeerToQueue(enr)
8396
}
8497

8598
while (!this.finished && (this.lookupPeers.length > 0 || this.pending.size > 0)) {
8699
if (this.lookupPeers.length > 0) {
87100
// Ask more peers (up to 5) for content
88101
const peerBatch: LookupPeer[] = []
89-
while (this.lookupPeers.peek() && peerBatch.length < 5) {
102+
const availableSlots = 5 - this.pending.size
103+
while (this.lookupPeers.peek() && peerBatch.length < availableSlots) {
90104
const next = this.lookupPeers.pop()!
91105
peerBatch.push(next)
92106
}
93107
const promises = peerBatch.map((peer) => {
108+
const controller = new AbortController()
109+
const timeoutId = setTimeout(() => {
110+
controller.abort()
111+
}, this.timeout)
112+
94113
return Promise.race([
95-
this.processPeer(peer),
114+
this.processPeer(peer, controller.signal).finally(() => {
115+
clearTimeout(timeoutId)
116+
}),
96117
new Promise((resolve) =>
97118
setTimeout(() => {
98119
resolve(undefined)
@@ -155,7 +176,7 @@ export class ContentLookup {
155176
return this.content
156177
}
157178

158-
private processPeer = async (peer: LookupPeer): Promise<ContentLookupResponse | void> => {
179+
private processPeer = async (peer: LookupPeer, signal?: AbortSignal): Promise<ContentLookupResponse | void> => {
159180
if (this.finished) return
160181
if (this.network.routingTable.isIgnored(peer.enr.nodeId)) {
161182
this.logger(`peer ${shortId(peer.enr.nodeId)} is ignored`)
@@ -165,7 +186,21 @@ export class ContentLookup {
165186
this.pending.add(peer.enr.encodeTxt())
166187
this.logger(`Requesting content from ${shortId(peer.enr.nodeId)}`)
167188
try {
168-
const res = await this.network.sendFindContent!(peer.enr, this.contentKey)
189+
// Create a promise that rejects when the signal is aborted
190+
const abortPromise = new Promise((_, reject) => {
191+
if (signal) {
192+
signal.addEventListener('abort', () => {
193+
reject(new Error('Request cancelled'))
194+
})
195+
}
196+
})
197+
198+
// Race between the actual request and the abort signal
199+
const res = await Promise.race([
200+
this.network.sendFindContent!(peer.enr, this.contentKey),
201+
abortPromise
202+
]) as ContentLookupResponse | undefined
203+
169204
this.pending.delete(peer.enr.encodeTxt())
170205
if (this.finished) {
171206
this.logger(`Response from ${shortId(peer.enr.nodeId)} arrived after lookup finished`)
@@ -193,21 +228,7 @@ export class ContentLookup {
193228
this.logger(`received ${res.enrs.length} ENRs for closer nodes`)
194229
for (const enr of res.enrs) {
195230
const decodedEnr = ENR.decode(enr)
196-
// // Skip if the node has an active uTP request
197-
if (this.network.portal.uTP.hasRequests(decodedEnr.nodeId) === true) {
198-
continue
199-
}
200-
if (!this.meta.has(decodedEnr.nodeId)) {
201-
const dist = distance(decodedEnr.nodeId, this.contentId)
202-
this.lookupPeers.push({ enr: decodedEnr, distance: Number(dist) })
203-
this.meta.set('0x' + decodedEnr.nodeId, {
204-
enr: decodedEnr.encodeTxt(),
205-
distance: bigIntToHex(dist),
206-
})
207-
this.logger(
208-
`Adding ${shortId(decodedEnr.nodeId)} to lookup queue (${this.lookupPeers.size()})`,
209-
)
210-
}
231+
this.addPeerToQueue(decodedEnr)
211232
}
212233
this.completedRequests &&
213234
this.completedRequests.set(
@@ -218,6 +239,9 @@ export class ContentLookup {
218239
}
219240
} catch (err) {
220241
this.pending.delete(peer.enr.encodeTxt())
242+
if (signal?.aborted) {
243+
this.logger(`Request to ${shortId(peer.enr.nodeId)} was cancelled`)
244+
}
221245
throw err
222246
}
223247
}

0 commit comments

Comments
 (0)