Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion packages/kad-dht/src/content-fetching/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
valueEvent,
queryErrorEvent
} from '../query/events.js'
import { bestRecord } from '../record/selectors.js'
import { bestRecord, getRecordSelector } from '../record/selectors.js'
import { verifyRecord } from '../record/validators.js'
import { createPutRecord, bufferToRecordKey } from '../utils.js'
import type { KadDHTComponents, Validators, Selectors, ValueEvent, QueryEvent } from '../index.js'
Expand Down Expand Up @@ -147,6 +147,25 @@ export class ContentFetching {

// store the record locally
const dsKey = bufferToRecordKey(this.datastorePrefix, key)
const selector = getRecordSelector(this.selectors, key)

if (selector != null) {
try {
const existingRaw = await this.components.datastore.get(dsKey, options)
const existingRecord = Libp2pRecord.deserialize(existingRaw)
const selected = selector(key, [value, existingRecord.value])

if (selected !== 0) {
this.log('ignoring stale local value for key %b', key)
return
}
} catch (err: any) {
if (err.name !== 'NotFoundError') {
throw err
}
}
}

this.log(`storing record for key ${dsKey.toString()}`)
await this.components.datastore.put(dsKey, record.subarray(), options)

Expand Down
1 change: 1 addition & 0 deletions packages/kad-dht/src/kad-dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
providers: this.providers,
peerRouting: this.peerRouting,
validators: this.validators,
selectors: this.selectors,
logPrefix,
metricsPrefix,
datastorePrefix,
Expand Down
26 changes: 18 additions & 8 deletions packages/kad-dht/src/record/selectors.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
import { InvalidParametersError } from '@libp2p/interface'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { MissingSelectorError } from '../errors.js'
import type { Selectors } from '../index.js'
import type { SelectFn, Selectors } from '../index.js'

export function getRecordSelector (selectors: Selectors, key: Uint8Array): SelectFn | undefined {
const parts = uint8ArrayToString(key).split('/')

if (parts.length < 3) {
return
}

return selectors[parts[1].toString()]
}

/**
* Select the best record out of the given records
Expand All @@ -11,16 +21,16 @@ export function bestRecord (selectors: Selectors, k: Uint8Array, records: Uint8A
throw new InvalidParametersError('No records given')
}

const kStr = uint8ArrayToString(k)
const parts = kStr.split('/')
const selector = getRecordSelector(selectors, k)

if (parts.length < 3) {
throw new InvalidParametersError('Record key does not have a selector function')
}
if (selector == null) {
const kStr = uint8ArrayToString(k)
const parts = kStr.split('/')

const selector = selectors[parts[1].toString()]
if (parts.length < 3) {
throw new InvalidParametersError('Record key does not have a selector function')
}

if (selector == null) {
throw new MissingSelectorError(`No selector function configured for key type "${parts[1]}"`)
}

Expand Down
30 changes: 27 additions & 3 deletions packages/kad-dht/src/rpc/handlers/put-value.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { InvalidMessageError } from '@libp2p/interface'
import { Libp2pRecord } from '@libp2p/record'
import { getRecordSelector } from '../../record/selectors.js'
import { verifyRecord } from '../../record/validators.js'
import { bufferToRecordKey } from '../../utils.js'
import type { Validators } from '../../index.js'
import type { Selectors, Validators } from '../../index.js'
import type { Message } from '../../message/dht.js'
import type { DHTMessageHandler } from '../index.js'
import type { ComponentLogger, Logger, PeerId } from '@libp2p/interface'
import type { Datastore } from 'interface-datastore'

export interface PutValueHandlerInit {
validators: Validators
selectors: Selectors
logPrefix: string
datastorePrefix: string
}
Expand All @@ -22,16 +24,18 @@ export interface PutValueHandlerComponents {
export class PutValueHandler implements DHTMessageHandler {
private readonly components: PutValueHandlerComponents
private readonly validators: Validators
private readonly selectors: Selectors
private readonly log: Logger
private readonly datastorePrefix: string

constructor (components: PutValueHandlerComponents, init: PutValueHandlerInit) {
const { validators } = init
const { validators, selectors } = init

this.components = components
this.log = components.logger.forComponent(`${init.logPrefix}:rpc:handlers:put-value`)
this.datastorePrefix = `${init.datastorePrefix}/record`
this.validators = validators
this.selectors = selectors
}

async handle (peerId: PeerId, msg: Message): Promise<Message> {
Expand All @@ -48,8 +52,28 @@ export class PutValueHandler implements DHTMessageHandler {

await verifyRecord(this.validators, deserializedRecord)

deserializedRecord.timeReceived = new Date()
const recordKey = bufferToRecordKey(this.datastorePrefix, deserializedRecord.key)

const selector = getRecordSelector(this.selectors, deserializedRecord.key)

if (selector != null) {
try {
const existingRaw = await this.components.datastore.get(recordKey)
const existingRecord = Libp2pRecord.deserialize(existingRaw)
const selected = selector(deserializedRecord.key, [deserializedRecord.value, existingRecord.value])

if (selected !== 0) {
this.log('ignoring stale value for key %b', key)
return msg
}
} catch (err: any) {
if (err.name !== 'NotFoundError') {
throw err
}
}
}

deserializedRecord.timeReceived = new Date()
await this.components.datastore.put(recordKey, deserializedRecord.serialize().subarray())
this.log('put record for %b into datastore under key %k', key, recordKey)
} catch (err: any) {
Expand Down
3 changes: 2 additions & 1 deletion packages/kad-dht/src/rpc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { GetProvidersHandler } from './handlers/get-providers.js'
import { GetValueHandler } from './handlers/get-value.js'
import { PingHandler } from './handlers/ping.js'
import { PutValueHandler } from './handlers/put-value.js'
import type { PeerInfoMapper, Validators } from '../index.js'
import type { PeerInfoMapper, Selectors, Validators } from '../index.js'
import type { PeerRouting } from '../peer-routing/index.js'
import type { Providers } from '../providers.js'
import type { FindNodeHandlerComponents } from './handlers/find-node.js'
Expand All @@ -26,6 +26,7 @@ export interface RPCInit {
providers: Providers
peerRouting: PeerRouting
validators: Validators
selectors: Selectors
logPrefix: string
metricsPrefix: string
datastorePrefix: string
Expand Down
88 changes: 88 additions & 0 deletions packages/kad-dht/test/kad-dht.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import filter from 'it-filter'
import last from 'it-last'
import sinon from 'sinon'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { MessageType } from '../src/index.js'
import { peerResponseEvent } from '../src/query/events.js'
import * as kadUtils from '../src/utils.js'
Expand Down Expand Up @@ -268,6 +269,63 @@ describe('KadDHT', () => {
expect(res).to.have.property('value').that.equalBytes(value)
})

it('put - get with custom namespace selector keeps highest seq across 3 peers', async function () {
this.timeout(20 * 1000)

const key = uint8ArrayFromString('/ns/hello')
const encode = (seq: number, value: string): Uint8Array => uint8ArrayFromString(JSON.stringify({ seq, value }))
const getSeq = (buf: Uint8Array): number => JSON.parse(uint8ArrayToString(buf)).seq

const testOptions = {
validators: {
ns: sinon.stub().resolves()
},
selectors: {
ns: (_key: Uint8Array, records: Uint8Array[]) => {
let bestIndex = 0
let bestSeq = -1

for (let i = 0; i < records.length; i++) {
const seq = getSeq(records[i])

if (seq > bestSeq) {
bestIndex = i
bestSeq = seq
}
}

return bestIndex
}
}
}

const [dhtA, dhtB, dhtC] = await Promise.all([
testDHT.spawn(testOptions),
testDHT.spawn(testOptions),
testDHT.spawn(testOptions)
])

await Promise.all([
testDHT.connect(dhtA, dhtB),
testDHT.connect(dhtB, dhtC),
testDHT.connect(dhtA, dhtC)
])

await drain(dhtA.dht.put(key, encode(1, 'old-v1')))
await drain(dhtB.dht.put(key, encode(2, 'new-v2')))
await drain(dhtC.dht.put(key, encode(1, 'stale-v1-late')))

const [resA, resB, resC] = await Promise.all([
findEvent(dhtA.dht.get(key), 'VALUE'),
findEvent(dhtB.dht.get(key), 'VALUE'),
findEvent(dhtC.dht.get(key), 'VALUE')
])

expect(getSeq(resA.value)).to.equal(2)
expect(getSeq(resB.value)).to.equal(2)
expect(getSeq(resC.value)).to.equal(2)
})

it('put - get should fail if unrecognized key prefix in get', async function () {
this.timeout(10 * 1000)

Expand All @@ -287,6 +345,36 @@ describe('KadDHT', () => {
.with.property('name', 'MissingSelectorError')
})

it('put - same node should ignore stale updates when selector exists', async function () {
this.timeout(10 * 1000)

const key = uint8ArrayFromString('/v/hello')
const newerValue = uint8ArrayFromString('world2')
const olderValue = uint8ArrayFromString('world1')
const dht = await testDHT.spawn()

await drain(dht.dht.put(key, newerValue))
await drain(dht.dht.put(key, olderValue))

const res = await last(dht.dht.get(key))
expect(res).to.have.property('value').that.equalBytes(newerValue)
})

it('put - same node should allow overwrites when selector is unavailable', async function () {
this.timeout(10 * 1000)

const key = uint8ArrayFromString('hello')
const firstValue = uint8ArrayFromString('world1')
const secondValue = uint8ArrayFromString('world2')
const dht = await testDHT.spawn()

await drain(dht.dht.put(key, firstValue))
await drain(dht.dht.put(key, secondValue))

const res = await last(dht.dht.get(key))
expect(res).to.have.property('value').that.equalBytes(secondValue)
})

it('put - get with update', async function () {
this.timeout(20 * 1000)

Expand Down
71 changes: 70 additions & 1 deletion packages/kad-dht/test/rpc/handlers/put-value.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { MessageType } from '../../../src/message/dht.js'
import { PutValueHandler } from '../../../src/rpc/handlers/put-value.js'
import * as utils from '../../../src/utils.js'
import { createPeerIdWithPrivateKey } from '../../utils/create-peer-id.js'
import type { Validators } from '../../../src/index.js'
import type { Selectors, Validators } from '../../../src/index.js'
import type { Message } from '../../../src/message/dht.js'
import type { PeerAndKey } from '../../utils/create-peer-id.js'
import type { Datastore } from 'interface-datastore'
Expand All @@ -23,11 +23,13 @@ describe('rpc - handlers - PutValue', () => {
let handler: PutValueHandler
let datastore: Datastore
let validators: Validators
let selectors: Selectors

beforeEach(async () => {
sourcePeer = await createPeerIdWithPrivateKey()
datastore = new MemoryDatastore()
validators = {}
selectors = {}

const components = {
datastore,
Expand All @@ -36,6 +38,7 @@ describe('rpc - handlers - PutValue', () => {

handler = new PutValueHandler(components, {
validators,
selectors,
logPrefix: 'dht',
datastorePrefix: '/dht'
})
Expand Down Expand Up @@ -92,4 +95,70 @@ describe('rpc - handlers - PutValue', () => {
await delay(10)
expect(rec.timeReceived.getTime()).to.be.lessThan(Date.now())
})

it('ignores older records when a namespace selector prefers existing values', async () => {
const recordKey = utils.bufferToRecordKey('/dht/record', uint8ArrayFromString('/val/hello'))
const existingRecord = new Libp2pRecord(
uint8ArrayFromString('/val/hello'),
uint8ArrayFromString('world2'),
new Date()
)

await datastore.put(recordKey, existingRecord.serialize().subarray())

const msg: Message = {
type: T,
key: uint8ArrayFromString('/val/hello'),
closer: [],
providers: [],
record: new Libp2pRecord(
uint8ArrayFromString('/val/hello'),
uint8ArrayFromString('world1'),
new Date()
).serialize()
}

validators.val = async () => {}
selectors.val = (key, records) => {
return records[0][records[1].length - 1] > records[1][records[1].length - 1] ? 0 : 1
}

await handler.handle(sourcePeer.peerId, msg)

const updated = Libp2pRecord.deserialize(await datastore.get(recordKey))
expect(updated.value).to.equalBytes(uint8ArrayFromString('world2'))
})

it('stores newer records when a namespace selector prefers incoming values', async () => {
const recordKey = utils.bufferToRecordKey('/dht/record', uint8ArrayFromString('/val/hello'))
const existingRecord = new Libp2pRecord(
uint8ArrayFromString('/val/hello'),
uint8ArrayFromString('world1'),
new Date()
)

await datastore.put(recordKey, existingRecord.serialize().subarray())

const msg: Message = {
type: T,
key: uint8ArrayFromString('/val/hello'),
closer: [],
providers: [],
record: new Libp2pRecord(
uint8ArrayFromString('/val/hello'),
uint8ArrayFromString('world2'),
new Date()
).serialize()
}

validators.val = async () => {}
selectors.val = (key, records) => {
return records[0][records[1].length - 1] > records[1][records[1].length - 1] ? 0 : 1
}

await handler.handle(sourcePeer.peerId, msg)

const updated = Libp2pRecord.deserialize(await datastore.get(recordKey))
expect(updated.value).to.equalBytes(uint8ArrayFromString('world2'))
})
})
Loading