-
Notifications
You must be signed in to change notification settings - Fork 520
Expand file tree
/
Copy pathkad-dht.spec.ts
More file actions
582 lines (443 loc) · 17 KB
/
kad-dht.spec.ts
File metadata and controls
582 lines (443 loc) · 17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 8] */
import { Libp2pRecord } from '@libp2p/record'
import { expect } from 'aegir/chai'
import all from 'it-all'
import drain from 'it-drain'
import filter from 'it-filter'
import last from 'it-last'
import sinon from 'sinon'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { MessageType } from '../src/index.js'
import { peerResponseEvent } from '../src/query/events.js'
import * as kadUtils from '../src/utils.js'
import { createPeerIdsWithPrivateKey } from './utils/create-peer-id.js'
import { sortDHTs } from './utils/sort-closest-peers.js'
import { TestDHT } from './utils/test-dht.js'
import type { PeerAndKey } from './utils/create-peer-id.js'
import type { FinalPeerEvent, QueryEvent, ValueEvent } from '../src/index.js'
async function findEvent (events: AsyncIterable<QueryEvent>, name: 'FINAL_PEER'): Promise<FinalPeerEvent>
async function findEvent (events: AsyncIterable<QueryEvent>, name: 'VALUE'): Promise<ValueEvent>
async function findEvent (events: AsyncIterable<QueryEvent>, name: string): Promise<QueryEvent> {
const eventTypes = new Set<string>()
const event = await last(
filter(events, event => {
eventTypes.add(event.name)
return event.name === name
})
)
if (event == null) {
throw new Error(`No ${name} event found, saw ${Array.from(eventTypes).join()}`)
}
return event
}
describe('KadDHT', () => {
let peerIds: PeerAndKey[]
let testDHT: TestDHT
beforeEach(() => {
testDHT = new TestDHT()
})
afterEach(async () => {
await testDHT.teardown()
})
before(async function () {
this.timeout(10 * 1000)
peerIds = await createPeerIdsWithPrivateKey(3)
})
describe('start and stop', () => {
it('default mode', async () => {
// off by default
const dht = await testDHT.spawn({ clientMode: undefined }, false)
await dht.dht.start()
// by default we start in client mode
expect(dht.components.registrar.handle).to.have.property('callCount', 0)
await dht.dht.setMode('server')
// now we should be in server mode
expect(dht.components.registrar.handle).to.have.property('callCount', 1)
await dht.dht.stop()
})
it('server mode', async () => {
// turn client mode off explicitly
const dht = await testDHT.spawn({ clientMode: false }, false)
await dht.dht.start()
// should have started in server mode
expect(dht.components.registrar.handle).to.have.property('callCount', 1)
await dht.dht.setMode('server')
// we were already in server mode, should have been a no-op
expect(dht.components.registrar.handle).to.have.property('callCount', 1)
await dht.dht.stop()
})
it('client mode', async () => {
// turn client mode on explicitly
const dht = await testDHT.spawn({ clientMode: true }, false)
await dht.dht.start()
await dht.dht.stop()
// should not have registered handler in client mode
expect(dht.components.registrar.handle).to.have.property('callCount', 0)
})
it('should not fail when already started', async () => {
const dht = await testDHT.spawn(undefined, false)
await dht.dht.start()
await dht.dht.start()
await dht.dht.start()
await dht.dht.stop()
})
it('should not fail to stop when was not started', async () => {
const dht = await testDHT.spawn(undefined, false)
await dht.dht.stop()
})
})
describe('content fetching', () => {
it('put - get same node', async function () {
this.timeout(10 * 1000)
const key = uint8ArrayFromString('/v/hello')
const value = uint8ArrayFromString('world')
const dht = await testDHT.spawn()
// Exchange data through the dht
await drain(dht.dht.put(key, value))
const res = await last(dht.dht.get(key))
expect(res).to.have.property('value').that.equalBytes(value)
})
it('put - get', async function () {
this.timeout(10 * 1000)
const key = uint8ArrayFromString('/v/hello')
const value = uint8ArrayFromString('world')
const [dhtA, dhtB] = await Promise.all([
testDHT.spawn(),
testDHT.spawn()
])
// Connect nodes
await testDHT.connect(dhtA, dhtB)
// Exchange data through the dht
await drain(dhtA.dht.put(key, value))
const res = await findEvent(dhtB.dht.get(key), 'VALUE')
expect(res).to.have.property('value').that.equalBytes(value)
})
it('put - get calls progress handler', async function () {
this.timeout(10 * 1000)
const key = uint8ArrayFromString('/v/hello')
const value = uint8ArrayFromString('world')
const [dhtA, dhtB] = await Promise.all([
testDHT.spawn(),
testDHT.spawn()
])
// Connect nodes
await testDHT.connect(dhtA, dhtB)
const putProgress = sinon.stub()
// Exchange data through the dht
await drain(dhtA.dht.put(key, value, {
onProgress: putProgress
}))
expect(putProgress).to.have.property('called', true)
const getProgress = sinon.stub()
await drain(dhtB.dht.get(key, {
onProgress: getProgress
}))
expect(getProgress).to.have.property('called', true)
})
it('put - should require a minimum number of peers to have successful puts', async function () {
this.timeout(10 * 1000)
const error = new Error('fake error')
const key = uint8ArrayFromString('/v/hello')
const value = uint8ArrayFromString('world')
const [dhtA, dhtB, dhtC, dhtD] = await Promise.all([
testDHT.spawn(),
testDHT.spawn(),
testDHT.spawn(),
testDHT.spawn({
// Stub verify record
validators: {
v: sinon.stub().rejects(error)
}
})
])
await Promise.all([
testDHT.connect(dhtA, dhtB),
testDHT.connect(dhtA, dhtC),
testDHT.connect(dhtA, dhtD)
])
// DHT operations
await drain(dhtA.dht.put(key, value))
const res = await last(dhtB.dht.get(key))
expect(res).to.have.property('value').that.equalBytes(value)
})
it('put - get using key with no prefix (no selector available)', async function () {
this.timeout(10 * 1000)
const key = uint8ArrayFromString('hello')
const value = uint8ArrayFromString('world')
const [dhtA, dhtB] = await Promise.all([
testDHT.spawn(),
testDHT.spawn()
])
await testDHT.connect(dhtA, dhtB)
// DHT operations
await drain(dhtA.dht.put(key, value))
const res = await last(dhtB.dht.get(key))
expect(res).to.have.property('value').that.equalBytes(value)
})
it('put - get using key from provided validator and selector', async function () {
this.timeout(10 * 1000)
const key = uint8ArrayFromString('/ipns/hello')
const value = uint8ArrayFromString('world')
const [dhtA, dhtB] = await Promise.all([
testDHT.spawn({
validators: {
ipns: sinon.stub().resolves()
},
selectors: {
ipns: sinon.stub().returns(0)
}
}),
testDHT.spawn({
validators: {
ipns: sinon.stub().resolves()
},
selectors: {
ipns: sinon.stub().returns(0)
}
})
])
await testDHT.connect(dhtA, dhtB)
// DHT operations
await drain(dhtA.dht.put(key, value))
const res = await last(dhtB.dht.get(key))
expect(res).to.have.property('value').that.equalBytes(value)
})
it('put - get with custom namespace selector keeps highest seq across 3 peers', async function () {
this.timeout(20 * 1000)
const key = uint8ArrayFromString('/ns/hello')
const encode = (seq: number, value: string): Uint8Array => uint8ArrayFromString(JSON.stringify({ seq, value }))
const getSeq = (buf: Uint8Array): number => JSON.parse(uint8ArrayToString(buf)).seq
const testOptions = {
validators: {
ns: sinon.stub().resolves()
},
selectors: {
ns: (_key: Uint8Array, records: Uint8Array[]) => {
let bestIndex = 0
let bestSeq = -1
for (let i = 0; i < records.length; i++) {
const seq = getSeq(records[i])
if (seq > bestSeq) {
bestIndex = i
bestSeq = seq
}
}
return bestIndex
}
}
}
const [dhtA, dhtB, dhtC] = await Promise.all([
testDHT.spawn(testOptions),
testDHT.spawn(testOptions),
testDHT.spawn(testOptions)
])
await Promise.all([
testDHT.connect(dhtA, dhtB),
testDHT.connect(dhtB, dhtC),
testDHT.connect(dhtA, dhtC)
])
await drain(dhtA.dht.put(key, encode(1, 'old-v1')))
await drain(dhtB.dht.put(key, encode(2, 'new-v2')))
await drain(dhtC.dht.put(key, encode(1, 'stale-v1-late')))
const [resA, resB, resC] = await Promise.all([
findEvent(dhtA.dht.get(key), 'VALUE'),
findEvent(dhtB.dht.get(key), 'VALUE'),
findEvent(dhtC.dht.get(key), 'VALUE')
])
expect(getSeq(resA.value)).to.equal(2)
expect(getSeq(resB.value)).to.equal(2)
expect(getSeq(resC.value)).to.equal(2)
})
it('put - get should fail if unrecognized key prefix in get', async function () {
this.timeout(10 * 1000)
const key = uint8ArrayFromString('/v2/hello')
const value = uint8ArrayFromString('world')
const [dhtA, dhtB] = await Promise.all([
testDHT.spawn(),
testDHT.spawn()
])
await testDHT.connect(dhtA, dhtB)
await drain(dhtA.dht.put(key, value))
await expect(last(dhtA.dht.get(key))).to.eventually.be.rejected
.with.property('name', 'MissingSelectorError')
})
it('put - same node should ignore stale updates when selector exists', async function () {
this.timeout(10 * 1000)
const key = uint8ArrayFromString('/v/hello')
const newerValue = uint8ArrayFromString('world2')
const olderValue = uint8ArrayFromString('world1')
const dht = await testDHT.spawn()
await drain(dht.dht.put(key, newerValue))
await drain(dht.dht.put(key, olderValue))
const res = await last(dht.dht.get(key))
expect(res).to.have.property('value').that.equalBytes(newerValue)
})
it('put - same node should allow overwrites when selector is unavailable', async function () {
this.timeout(10 * 1000)
const key = uint8ArrayFromString('hello')
const firstValue = uint8ArrayFromString('world1')
const secondValue = uint8ArrayFromString('world2')
const dht = await testDHT.spawn()
await drain(dht.dht.put(key, firstValue))
await drain(dht.dht.put(key, secondValue))
const res = await last(dht.dht.get(key))
expect(res).to.have.property('value').that.equalBytes(secondValue)
})
it('put - get with update', async function () {
this.timeout(20 * 1000)
const key = uint8ArrayFromString('/v/hello')
const valueA = uint8ArrayFromString('world2')
const valueB = uint8ArrayFromString('world1')
const [dhtA, dhtB] = await Promise.all([
testDHT.spawn(),
testDHT.spawn()
])
const dhtASpy = sinon.spy(dhtA.dht.network, 'sendRequest')
// put before peers connected
await drain(dhtA.dht.put(key, valueA))
await drain(dhtB.dht.put(key, valueB))
// connect peers
await testDHT.connect(dhtA, dhtB)
// get values
const resA = await last(dhtA.dht.get(key))
const resB = await last(dhtB.dht.get(key))
// first is selected because the selector sorts alphabetically and chooses
// the last value
expect(resA).to.have.property('value').that.equalBytes(valueA)
expect(resB).to.have.property('value').that.equalBytes(valueA)
let foundGetValue = false
let foundPutValue = false
for (const call of dhtASpy.getCalls()) {
if (call.args[0].equals(dhtB.components.peerId) && call.args[1].type === 'GET_VALUE') {
// query B
foundGetValue = true
}
if (call.args[0].equals(dhtB.components.peerId) && call.args[1].type === 'PUT_VALUE') {
// update B
foundPutValue = true
}
}
expect(foundGetValue).to.be.true('did not get value from dhtB')
expect(foundPutValue).to.be.true('did not update value on dhtB')
})
it('layered get', async function () {
this.timeout(40 * 1000)
const key = uint8ArrayFromString('/v/hello')
const value = uint8ArrayFromString('world')
const dhts = await sortDHTs(await Promise.all([
testDHT.spawn(),
testDHT.spawn(),
testDHT.spawn(),
testDHT.spawn()
]), await kadUtils.convertBuffer(key))
// Connect all
await Promise.all([
testDHT.connect(dhts[0], dhts[1]),
testDHT.connect(dhts[1], dhts[2]),
testDHT.connect(dhts[2], dhts[3])
])
// DHT operations
await drain(dhts[3].dht.put(key, value))
const res = await last(dhts[0].dht.get(key))
expect(res).to.have.property('value').that.equalBytes(value)
})
it('getMany with number of values = 1 goes out to swarm if there is no local value', async () => {
const key = uint8ArrayFromString('/v/hello')
const value = uint8ArrayFromString('world')
const rec = new Libp2pRecord(key, value, new Date())
const dht = await testDHT.spawn()
// Simulate returning a peer id to query
sinon.stub(dht.dht.routingTable, 'closestPeers').returns([peerIds[1].peerId])
// Simulate going out to the network and returning the record
sinon.stub(dht.dht.peerRouting, 'getValueOrPeers').callsFake(async function * (peer) {
yield peerResponseEvent({
messageType: MessageType.GET_VALUE,
from: peer,
record: rec,
path: {
index: -1,
queued: 0,
running: 0,
total: 0
}
})
})
const res = await last(dht.dht.get(key))
expect(res).to.have.property('value').that.equalBytes(value)
})
})
describe('peer routing', () => {
it('findPeer', async function () {
this.timeout(240 * 1000)
const dhts = await Promise.all([
testDHT.spawn(),
testDHT.spawn(),
testDHT.spawn(),
testDHT.spawn()
])
await Promise.all([
testDHT.connect(dhts[0], dhts[1]),
testDHT.connect(dhts[0], dhts[2]),
testDHT.connect(dhts[0], dhts[3])
])
const ids = dhts.map((d) => d.components.peerId)
const finalPeer = await findEvent(dhts[0].dht.findPeer(ids[ids.length - 1]), 'FINAL_PEER')
expect(finalPeer.peer.id.equals(ids[ids.length - 1])).to.eql(true)
})
it('getClosestPeers', async function () {
this.timeout(240 * 1000)
const nDHTs = 30
const dhts = await Promise.all(
new Array(nDHTs).fill(0).map(async () => testDHT.spawn())
)
const connected: Array<Promise<void>> = []
for (let i = 0; i < dhts.length - 1; i++) {
connected.push(testDHT.connect(dhts[i], dhts[(i + 1) % dhts.length]))
}
await Promise.all(connected)
const res = await all(filter(dhts[1].dht.getClosestPeers(uint8ArrayFromString('foo')), event => event.name === 'FINAL_PEER'))
expect(res).to.not.be.empty()
})
it('should not include requester in getClosestPeers PEER_RESPONSE', async function () {
this.timeout(240 * 1000)
const nDHTs = 30
const dhts = await Promise.all(
new Array(nDHTs).fill(0).map(async () => testDHT.spawn())
)
const connected: Array<Promise<void>> = []
for (let i = 0; i < dhts.length - 1; i++) {
connected.push(testDHT.connect(dhts[i], dhts[(i + 1) % dhts.length]))
}
await Promise.all(connected)
const res = await all(dhts[1].dht.getClosestPeers(dhts[2].components.peerId.toMultihash().bytes))
expect(res).to.not.be.empty()
// should not include requester in the response, only other peers that it
// knows who are closer
for (const event of res) {
if (event.name !== 'PEER_RESPONSE') {
continue
}
expect(event.closer.map(peer => peer.id.toString()))
.to.not.include(dhts[1].components.peerId.toString())
}
})
})
describe('errors', () => {
it('get should correctly handle an unexpected error', async function () {
this.timeout(240 * 1000)
const error = new Error('fake error')
const [dhtA, dhtB] = await Promise.all([
testDHT.spawn(),
testDHT.spawn()
])
await testDHT.connect(dhtA, dhtB)
const stub = sinon.stub(dhtA.components.connectionManager, 'openStream').rejects(error)
const errors = await all(filter(dhtA.dht.get(uint8ArrayFromString('/v/hello')), event => event.name === 'QUERY_ERROR'))
expect(errors).to.have.lengthOf(1)
expect(errors).to.have.nested.property('[0].error', error)
stub.restore()
})
})
})