diff --git a/packages/kad-dht/src/content-fetching/index.ts b/packages/kad-dht/src/content-fetching/index.ts index c858c2dda0..ae40ce8dee 100644 --- a/packages/kad-dht/src/content-fetching/index.ts +++ b/packages/kad-dht/src/content-fetching/index.ts @@ -13,7 +13,7 @@ import { valueEvent, queryErrorEvent } from '../query/events.js' -import { bestRecord } from '../record/selectors.js' +import { bestRecord, getRecordSelector } from '../record/selectors.js' import { verifyRecord } from '../record/validators.js' import { createPutRecord, bufferToRecordKey } from '../utils.js' import type { KadDHTComponents, Validators, Selectors, ValueEvent, QueryEvent } from '../index.js' @@ -147,6 +147,25 @@ export class ContentFetching { // store the record locally const dsKey = bufferToRecordKey(this.datastorePrefix, key) + const selector = getRecordSelector(this.selectors, key) + + if (selector != null) { + try { + const existingRaw = await this.components.datastore.get(dsKey, options) + const existingRecord = Libp2pRecord.deserialize(existingRaw) + const selected = selector(key, [value, existingRecord.value]) + + if (selected !== 0) { + this.log('ignoring stale local value for key %b', key) + return + } + } catch (err: any) { + if (err.name !== 'NotFoundError') { + throw err + } + } + } + this.log(`storing record for key ${dsKey.toString()}`) await this.components.datastore.put(dsKey, record.subarray(), options) diff --git a/packages/kad-dht/src/kad-dht.ts b/packages/kad-dht/src/kad-dht.ts index 5fb8b0d89b..53fe5656f9 100644 --- a/packages/kad-dht/src/kad-dht.ts +++ b/packages/kad-dht/src/kad-dht.ts @@ -270,6 +270,7 @@ export class KadDHT extends TypedEventEmitter implements Ka providers: this.providers, peerRouting: this.peerRouting, validators: this.validators, + selectors: this.selectors, logPrefix, metricsPrefix, datastorePrefix, diff --git a/packages/kad-dht/src/record/selectors.ts b/packages/kad-dht/src/record/selectors.ts index 68e57a850d..d14ca983bd 100644 --- a/packages/kad-dht/src/record/selectors.ts +++ b/packages/kad-dht/src/record/selectors.ts @@ -1,7 +1,17 @@ import { InvalidParametersError } from '@libp2p/interface' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { MissingSelectorError } from '../errors.js' -import type { Selectors } from '../index.js' +import type { SelectFn, Selectors } from '../index.js' + +export function getRecordSelector (selectors: Selectors, key: Uint8Array): SelectFn | undefined { + const parts = uint8ArrayToString(key).split('/') + + if (parts.length < 3) { + return + } + + return selectors[parts[1].toString()] +} /** * Select the best record out of the given records @@ -11,16 +21,16 @@ export function bestRecord (selectors: Selectors, k: Uint8Array, records: Uint8A throw new InvalidParametersError('No records given') } - const kStr = uint8ArrayToString(k) - const parts = kStr.split('/') + const selector = getRecordSelector(selectors, k) - if (parts.length < 3) { - throw new InvalidParametersError('Record key does not have a selector function') - } + if (selector == null) { + const kStr = uint8ArrayToString(k) + const parts = kStr.split('/') - const selector = selectors[parts[1].toString()] + if (parts.length < 3) { + throw new InvalidParametersError('Record key does not have a selector function') + } - if (selector == null) { throw new MissingSelectorError(`No selector function configured for key type "${parts[1]}"`) } diff --git a/packages/kad-dht/src/rpc/handlers/put-value.ts b/packages/kad-dht/src/rpc/handlers/put-value.ts index 0cb9649bed..68e2309ab9 100644 --- a/packages/kad-dht/src/rpc/handlers/put-value.ts +++ b/packages/kad-dht/src/rpc/handlers/put-value.ts @@ -1,8 +1,9 @@ import { InvalidMessageError } from '@libp2p/interface' import { Libp2pRecord } from '@libp2p/record' +import { getRecordSelector } from '../../record/selectors.js' import { verifyRecord } from '../../record/validators.js' import { bufferToRecordKey } from '../../utils.js' -import type { Validators } from '../../index.js' +import type { Selectors, Validators } from '../../index.js' import type { Message } from '../../message/dht.js' import type { DHTMessageHandler } from '../index.js' import type { ComponentLogger, Logger, PeerId } from '@libp2p/interface' @@ -10,6 +11,7 @@ import type { Datastore } from 'interface-datastore' export interface PutValueHandlerInit { validators: Validators + selectors: Selectors logPrefix: string datastorePrefix: string } @@ -22,16 +24,18 @@ export interface PutValueHandlerComponents { export class PutValueHandler implements DHTMessageHandler { private readonly components: PutValueHandlerComponents private readonly validators: Validators + private readonly selectors: Selectors private readonly log: Logger private readonly datastorePrefix: string constructor (components: PutValueHandlerComponents, init: PutValueHandlerInit) { - const { validators } = init + const { validators, selectors } = init this.components = components this.log = components.logger.forComponent(`${init.logPrefix}:rpc:handlers:put-value`) this.datastorePrefix = `${init.datastorePrefix}/record` this.validators = validators + this.selectors = selectors } async handle (peerId: PeerId, msg: Message): Promise { @@ -48,8 +52,28 @@ export class PutValueHandler implements DHTMessageHandler { await verifyRecord(this.validators, deserializedRecord) - deserializedRecord.timeReceived = new Date() const recordKey = bufferToRecordKey(this.datastorePrefix, deserializedRecord.key) + + const selector = getRecordSelector(this.selectors, deserializedRecord.key) + + if (selector != null) { + try { + const existingRaw = await this.components.datastore.get(recordKey) + const existingRecord = Libp2pRecord.deserialize(existingRaw) + const selected = selector(deserializedRecord.key, [deserializedRecord.value, existingRecord.value]) + + if (selected !== 0) { + this.log('ignoring stale value for key %b', key) + return msg + } + } catch (err: any) { + if (err.name !== 'NotFoundError') { + throw err + } + } + } + + deserializedRecord.timeReceived = new Date() await this.components.datastore.put(recordKey, deserializedRecord.serialize().subarray()) this.log('put record for %b into datastore under key %k', key, recordKey) } catch (err: any) { diff --git a/packages/kad-dht/src/rpc/index.ts b/packages/kad-dht/src/rpc/index.ts index 3586f887da..8195448f3a 100644 --- a/packages/kad-dht/src/rpc/index.ts +++ b/packages/kad-dht/src/rpc/index.ts @@ -7,7 +7,7 @@ import { GetProvidersHandler } from './handlers/get-providers.js' import { GetValueHandler } from './handlers/get-value.js' import { PingHandler } from './handlers/ping.js' import { PutValueHandler } from './handlers/put-value.js' -import type { PeerInfoMapper, Validators } from '../index.js' +import type { PeerInfoMapper, Selectors, Validators } from '../index.js' import type { PeerRouting } from '../peer-routing/index.js' import type { Providers } from '../providers.js' import type { FindNodeHandlerComponents } from './handlers/find-node.js' @@ -26,6 +26,7 @@ export interface RPCInit { providers: Providers peerRouting: PeerRouting validators: Validators + selectors: Selectors logPrefix: string metricsPrefix: string datastorePrefix: string diff --git a/packages/kad-dht/test/kad-dht.spec.ts b/packages/kad-dht/test/kad-dht.spec.ts index 6afb0dbaf2..83874453e3 100644 --- a/packages/kad-dht/test/kad-dht.spec.ts +++ b/packages/kad-dht/test/kad-dht.spec.ts @@ -9,6 +9,7 @@ import filter from 'it-filter' import last from 'it-last' import sinon from 'sinon' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { MessageType } from '../src/index.js' import { peerResponseEvent } from '../src/query/events.js' import * as kadUtils from '../src/utils.js' @@ -268,6 +269,63 @@ describe('KadDHT', () => { expect(res).to.have.property('value').that.equalBytes(value) }) + it('put - get with custom namespace selector keeps highest seq across 3 peers', async function () { + this.timeout(20 * 1000) + + const key = uint8ArrayFromString('/ns/hello') + const encode = (seq: number, value: string): Uint8Array => uint8ArrayFromString(JSON.stringify({ seq, value })) + const getSeq = (buf: Uint8Array): number => JSON.parse(uint8ArrayToString(buf)).seq + + const testOptions = { + validators: { + ns: sinon.stub().resolves() + }, + selectors: { + ns: (_key: Uint8Array, records: Uint8Array[]) => { + let bestIndex = 0 + let bestSeq = -1 + + for (let i = 0; i < records.length; i++) { + const seq = getSeq(records[i]) + + if (seq > bestSeq) { + bestIndex = i + bestSeq = seq + } + } + + return bestIndex + } + } + } + + const [dhtA, dhtB, dhtC] = await Promise.all([ + testDHT.spawn(testOptions), + testDHT.spawn(testOptions), + testDHT.spawn(testOptions) + ]) + + await Promise.all([ + testDHT.connect(dhtA, dhtB), + testDHT.connect(dhtB, dhtC), + testDHT.connect(dhtA, dhtC) + ]) + + await drain(dhtA.dht.put(key, encode(1, 'old-v1'))) + await drain(dhtB.dht.put(key, encode(2, 'new-v2'))) + await drain(dhtC.dht.put(key, encode(1, 'stale-v1-late'))) + + const [resA, resB, resC] = await Promise.all([ + findEvent(dhtA.dht.get(key), 'VALUE'), + findEvent(dhtB.dht.get(key), 'VALUE'), + findEvent(dhtC.dht.get(key), 'VALUE') + ]) + + expect(getSeq(resA.value)).to.equal(2) + expect(getSeq(resB.value)).to.equal(2) + expect(getSeq(resC.value)).to.equal(2) + }) + it('put - get should fail if unrecognized key prefix in get', async function () { this.timeout(10 * 1000) @@ -287,6 +345,36 @@ describe('KadDHT', () => { .with.property('name', 'MissingSelectorError') }) + it('put - same node should ignore stale updates when selector exists', async function () { + this.timeout(10 * 1000) + + const key = uint8ArrayFromString('/v/hello') + const newerValue = uint8ArrayFromString('world2') + const olderValue = uint8ArrayFromString('world1') + const dht = await testDHT.spawn() + + await drain(dht.dht.put(key, newerValue)) + await drain(dht.dht.put(key, olderValue)) + + const res = await last(dht.dht.get(key)) + expect(res).to.have.property('value').that.equalBytes(newerValue) + }) + + it('put - same node should allow overwrites when selector is unavailable', async function () { + this.timeout(10 * 1000) + + const key = uint8ArrayFromString('hello') + const firstValue = uint8ArrayFromString('world1') + const secondValue = uint8ArrayFromString('world2') + const dht = await testDHT.spawn() + + await drain(dht.dht.put(key, firstValue)) + await drain(dht.dht.put(key, secondValue)) + + const res = await last(dht.dht.get(key)) + expect(res).to.have.property('value').that.equalBytes(secondValue) + }) + it('put - get with update', async function () { this.timeout(20 * 1000) diff --git a/packages/kad-dht/test/rpc/handlers/put-value.spec.ts b/packages/kad-dht/test/rpc/handlers/put-value.spec.ts index f7acad7f85..24b5023579 100644 --- a/packages/kad-dht/test/rpc/handlers/put-value.spec.ts +++ b/packages/kad-dht/test/rpc/handlers/put-value.spec.ts @@ -11,7 +11,7 @@ import { MessageType } from '../../../src/message/dht.js' import { PutValueHandler } from '../../../src/rpc/handlers/put-value.js' import * as utils from '../../../src/utils.js' import { createPeerIdWithPrivateKey } from '../../utils/create-peer-id.js' -import type { Validators } from '../../../src/index.js' +import type { Selectors, Validators } from '../../../src/index.js' import type { Message } from '../../../src/message/dht.js' import type { PeerAndKey } from '../../utils/create-peer-id.js' import type { Datastore } from 'interface-datastore' @@ -23,11 +23,13 @@ describe('rpc - handlers - PutValue', () => { let handler: PutValueHandler let datastore: Datastore let validators: Validators + let selectors: Selectors beforeEach(async () => { sourcePeer = await createPeerIdWithPrivateKey() datastore = new MemoryDatastore() validators = {} + selectors = {} const components = { datastore, @@ -36,6 +38,7 @@ describe('rpc - handlers - PutValue', () => { handler = new PutValueHandler(components, { validators, + selectors, logPrefix: 'dht', datastorePrefix: '/dht' }) @@ -92,4 +95,70 @@ describe('rpc - handlers - PutValue', () => { await delay(10) expect(rec.timeReceived.getTime()).to.be.lessThan(Date.now()) }) + + it('ignores older records when a namespace selector prefers existing values', async () => { + const recordKey = utils.bufferToRecordKey('/dht/record', uint8ArrayFromString('/val/hello')) + const existingRecord = new Libp2pRecord( + uint8ArrayFromString('/val/hello'), + uint8ArrayFromString('world2'), + new Date() + ) + + await datastore.put(recordKey, existingRecord.serialize().subarray()) + + const msg: Message = { + type: T, + key: uint8ArrayFromString('/val/hello'), + closer: [], + providers: [], + record: new Libp2pRecord( + uint8ArrayFromString('/val/hello'), + uint8ArrayFromString('world1'), + new Date() + ).serialize() + } + + validators.val = async () => {} + selectors.val = (key, records) => { + return records[0][records[1].length - 1] > records[1][records[1].length - 1] ? 0 : 1 + } + + await handler.handle(sourcePeer.peerId, msg) + + const updated = Libp2pRecord.deserialize(await datastore.get(recordKey)) + expect(updated.value).to.equalBytes(uint8ArrayFromString('world2')) + }) + + it('stores newer records when a namespace selector prefers incoming values', async () => { + const recordKey = utils.bufferToRecordKey('/dht/record', uint8ArrayFromString('/val/hello')) + const existingRecord = new Libp2pRecord( + uint8ArrayFromString('/val/hello'), + uint8ArrayFromString('world1'), + new Date() + ) + + await datastore.put(recordKey, existingRecord.serialize().subarray()) + + const msg: Message = { + type: T, + key: uint8ArrayFromString('/val/hello'), + closer: [], + providers: [], + record: new Libp2pRecord( + uint8ArrayFromString('/val/hello'), + uint8ArrayFromString('world2'), + new Date() + ).serialize() + } + + validators.val = async () => {} + selectors.val = (key, records) => { + return records[0][records[1].length - 1] > records[1][records[1].length - 1] ? 0 : 1 + } + + await handler.handle(sourcePeer.peerId, msg) + + const updated = Libp2pRecord.deserialize(await datastore.get(recordKey)) + expect(updated.value).to.equalBytes(uint8ArrayFromString('world2')) + }) }) diff --git a/packages/kad-dht/test/rpc/index.node.ts b/packages/kad-dht/test/rpc/index.node.ts index 60a567f4db..11cc911d01 100644 --- a/packages/kad-dht/test/rpc/index.node.ts +++ b/packages/kad-dht/test/rpc/index.node.ts @@ -18,7 +18,7 @@ import { RoutingTable } from '../../src/routing-table/index.js' import { RPC } from '../../src/rpc/index.js' import { passthroughMapper } from '../../src/utils.js' import { createPeerIdWithPrivateKey } from '../utils/create-peer-id.js' -import type { Validators } from '../../src/index.js' +import type { Selectors, Validators } from '../../src/index.js' import type { RPCComponents } from '../../src/rpc/index.js' import type { PeerAndKey } from '../utils/create-peer-id.js' import type { Libp2pEvents, Connection, PeerStore } from '@libp2p/interface' @@ -32,6 +32,7 @@ describe('rpc', () => { let providers: SinonStubbedInstance let peerRouting: SinonStubbedInstance let validators: Validators + let selectors: Selectors let datastore: Datastore let routingTable: RoutingTable @@ -57,12 +58,14 @@ describe('rpc', () => { peerRouting = Sinon.createStubInstance(PeerRouting) routingTable = Sinon.createStubInstance(RoutingTable) validators = {} + selectors = {} rpc = new RPC(components, { routingTable, providers, peerRouting, validators, + selectors, logPrefix: '', metricsPrefix: '', datastorePrefix: '',