Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/integration-tests/test/fetch.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import { type Fetch, fetch } from '@libp2p/fetch'
import { expect } from 'aegir/chai'
import { createLibp2p } from 'libp2p'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { isWebWorker } from 'wherearewe'
import { createBaseOptions } from './fixtures/base-options.js'
import type { Libp2p } from '@libp2p/interface'
Expand Down Expand Up @@ -31,9 +32,9 @@ describe('fetch', () => {
const DATA_B = { foobar: 'goodnight moon' }

const generateLookupFunction = function (prefix: string, data: Record<string, string>) {
return async function (key: string): Promise<Uint8Array | undefined> {
return async function (key: Uint8Array): Promise<Uint8Array | undefined> {
key = key.slice(prefix.length) // strip prefix from key
const val = data[key]
const val = data[uint8ArrayToString(key)]
if (val != null) {
return (new TextEncoder()).encode(val)
}
Expand Down
7 changes: 4 additions & 3 deletions packages/protocol-fetch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ const libp2p = await createLibp2p({
}
})

// Given a key (as a string) returns a value (as a Uint8Array), or undefined
// if the key isn't found.
// Given a key (as a Uint8Array) returns a value (as a Uint8Array), or
// undefined if the key isn't found.
//
// All keys must be prefixed by the same prefix, which will be used to find
// the appropriate key lookup function.
async function my_subsystem_key_lookup (key: string): Promise<Uint8Array | undefined> {
async function my_subsystem_key_lookup (key: Uint8Array): Promise<Uint8Array | undefined> {
// app specific callback to lookup key-value pairs.
return Uint8Array.from([0, 1, 2, 3, 4])
}
Expand Down
74 changes: 48 additions & 26 deletions packages/protocol-fetch/src/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
await data.stream.close()
})
.catch(err => {
this.log.error(err)
this.log.error('error handling message - %e', err)

Check warning on line 46 in packages/protocol-fetch/src/fetch.ts

View check run for this annotation

Codecov / codecov/patch

packages/protocol-fetch/src/fetch.ts#L46

Added line #L46 was not covered by tests
})
}, {
maxInboundStreams: this.init.maxInboundStreams,
Expand All @@ -64,8 +64,12 @@
/**
* Sends a request to fetch the value associated with the given key from the given peer
*/
async fetch (peer: PeerId, key: string, options: AbortOptions = {}): Promise<Uint8Array | undefined> {
this.log('dialing %s to %p', this.protocol, peer)
async fetch (peer: PeerId, key: string | Uint8Array, options: AbortOptions = {}): Promise<Uint8Array | undefined> {
if (typeof key === 'string') {
key = uint8arrayFromString(key)
}

this.log.trace('dialing %s to %p', this.protocol, peer)

const connection = await this.components.connectionManager.openConnection(peer, options)
let signal = options.signal
Expand All @@ -75,7 +79,7 @@
// create a timeout if no abort signal passed
if (signal == null) {
const timeout = this.init.timeout ?? DEFAULT_TIMEOUT
this.log('using default timeout of %d ms', timeout)
this.log.trace('using default timeout of %d ms', timeout)
signal = AbortSignal.timeout(timeout)

setMaxListeners(Infinity, signal)
Expand All @@ -93,7 +97,7 @@
// make stream abortable
signal.addEventListener('abort', onAbort, { once: true })

this.log('fetch %s', key)
this.log.trace('fetch %m', key)

const pb = pbStream(stream)
await pb.write({
Expand All @@ -105,20 +109,20 @@

switch (response.status) {
case (FetchResponse.StatusCode.OK): {
this.log('received status for %s ok', key)
this.log.trace('received status OK for %m', key)
return response.data
}
case (FetchResponse.StatusCode.NOT_FOUND): {
this.log('received status for %s not found', key)
this.log('received status NOT_FOUND for %m', key)
return
}
case (FetchResponse.StatusCode.ERROR): {
this.log('received status for %s error', key)
this.log('received status ERROR for %m', key)
const errmsg = uint8arrayToString(response.data)
throw new ProtocolError('Error in fetch protocol response: ' + errmsg)
}
default: {
this.log('received status for %s unknown', key)
this.log('received status unknown for %m', key)

Check warning on line 125 in packages/protocol-fetch/src/fetch.ts

View check run for this annotation

Codecov / codecov/patch

packages/protocol-fetch/src/fetch.ts#L125

Added line #L125 was not covered by tests
throw new InvalidMessageError('Unknown response status')
}
}
Expand Down Expand Up @@ -149,21 +153,32 @@
})

let response: FetchResponse
const lookup = this._getLookupFunction(request.identifier)
if (lookup != null) {
this.log('look up data with identifier %s', request.identifier)
const data = await lookup(request.identifier)
if (data != null) {
this.log('sending status for %s ok', request.identifier)
response = { status: FetchResponse.StatusCode.OK, data }
} else {
this.log('sending status for %s not found', request.identifier)
response = { status: FetchResponse.StatusCode.NOT_FOUND, data: new Uint8Array(0) }
}
} else {
this.log('sending status for %s error', request.identifier)
const errmsg = uint8arrayFromString(`No lookup function registered for key: ${request.identifier}`)
const key = uint8arrayToString(request.identifier)

const lookup = this._getLookupFunction(key)

if (lookup == null) {
this.log.trace('sending status ERROR for %m', request.identifier)
const errmsg = uint8arrayFromString('No lookup function registered for key')
response = { status: FetchResponse.StatusCode.ERROR, data: errmsg }
} else {
this.log.trace('lookup data with identifier %s', lookup.prefix)

try {
const data = await lookup.fn(request.identifier)

if (data == null) {
this.log.trace('sending status NOT_FOUND for %m', request.identifier)
response = { status: FetchResponse.StatusCode.NOT_FOUND, data: new Uint8Array(0) }
} else {
this.log.trace('sending status OK for %m', request.identifier)
response = { status: FetchResponse.StatusCode.OK, data }
}
} catch (err: any) {
this.log.error('error during lookup of %m - %e', request.identifier, err)
const errmsg = uint8arrayFromString(err.message)
response = { status: FetchResponse.StatusCode.ERROR, data: errmsg }
}

Check warning on line 181 in packages/protocol-fetch/src/fetch.ts

View check run for this annotation

Codecov / codecov/patch

packages/protocol-fetch/src/fetch.ts#L178-L181

Added lines #L178 - L181 were not covered by tests
}

await pb.write(response, FetchResponse, {
Expand All @@ -174,7 +189,7 @@
signal
})
} catch (err: any) {
this.log('error answering fetch request', err)
this.log.error('error answering fetch request - %e', err)
stream.abort(err)
}
}
Expand All @@ -183,10 +198,17 @@
* Given a key, finds the appropriate function for looking up its corresponding value, based on
* the key's prefix.
*/
_getLookupFunction (key: string): LookupFunction | undefined {
_getLookupFunction (key: string): { fn: LookupFunction, prefix: string } | undefined {
for (const prefix of this.lookupFunctions.keys()) {
if (key.startsWith(prefix)) {
return this.lookupFunctions.get(prefix)
const fn = this.lookupFunctions.get(prefix)

if (fn != null) {
return {
fn,
prefix
}
}
}
}
}
Expand Down
18 changes: 13 additions & 5 deletions packages/protocol-fetch/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
* }
* })
*
* // Given a key (as a string) returns a value (as a Uint8Array), or undefined
* // if the key isn't found.
* // Given a key (as a Uint8Array) returns a value (as a Uint8Array), or
* // undefined if the key isn't found.
* //
* // All keys must be prefixed by the same prefix, which will be used to find
* // the appropriate key lookup function.
* async function my_subsystem_key_lookup (key: string): Promise<Uint8Array | undefined> {
* async function my_subsystem_key_lookup (key: Uint8Array): Promise<Uint8Array | undefined> {
* // app specific callback to lookup key-value pairs.
* return Uint8Array.from([0, 1, 2, 3, 4])
* }
Expand Down Expand Up @@ -56,8 +57,15 @@ export interface FetchInit {
timeout?: number
}

/**
* A lookup function is registered against a specific identifier prefix and is
* invoked when a remote peer requests a value with that prefix
*/
export interface LookupFunction {
(key: string): Promise<Uint8Array | undefined>
/**
* The key is the identifier requested by the remote peer
*/
(key: Uint8Array): Promise<Uint8Array | undefined>
}

export interface FetchComponents {
Expand All @@ -70,7 +78,7 @@ export interface Fetch {
/**
* Sends a request to fetch the value associated with the given key from the given peer
*/
fetch(peer: PeerId, key: string, options?: AbortOptions): Promise<Uint8Array | undefined>
fetch(peer: PeerId, key: string | Uint8Array, options?: AbortOptions): Promise<Uint8Array | undefined>

/**
* Registers a new lookup callback that can map keys to values, for a given set of keys that
Expand Down
17 changes: 9 additions & 8 deletions packages/protocol-fetch/src/pb/proto.proto
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
syntax = "proto3";

message FetchRequest {
string identifier = 1;
bytes identifier = 1;
}

message FetchResponse {
StatusCode status = 1;
enum StatusCode {
OK = 0;
NOT_FOUND = 1;
ERROR = 2;
}
bytes data = 2;
enum StatusCode {
OK = 0;
NOT_FOUND = 1;
ERROR = 2;
}

StatusCode status = 1;
bytes data = 2;
}
10 changes: 5 additions & 5 deletions packages/protocol-fetch/src/pb/proto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { alloc as uint8ArrayAlloc } from 'uint8arrays/alloc'
import type { Uint8ArrayList } from 'uint8arraylist'

export interface FetchRequest {
identifier: string
identifier: Uint8Array
}

export namespace FetchRequest {
Expand All @@ -22,17 +22,17 @@ export namespace FetchRequest {
w.fork()
}

if ((obj.identifier != null && obj.identifier !== '')) {
if ((obj.identifier != null && obj.identifier.byteLength > 0)) {
w.uint32(10)
w.string(obj.identifier)
w.bytes(obj.identifier)
}

if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
identifier: ''
identifier: uint8ArrayAlloc(0)
}

const end = length == null ? reader.len : reader.pos + length
Expand All @@ -42,7 +42,7 @@ export namespace FetchRequest {

switch (tag >>> 3) {
case 1: {
obj.identifier = reader.string()
obj.identifier = reader.bytes()
break
}
default: {
Expand Down
16 changes: 9 additions & 7 deletions packages/protocol-fetch/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import { duplexPair } from 'it-pair/duplex'
import { pbStream } from 'it-protobuf-stream'
import sinon from 'sinon'
import { stubInterface, type StubbedInstance } from 'sinon-ts'
import { fromString as uint8arrayFromString } from 'uint8arrays/from-string'
import { toString as uint8arrayToString } from 'uint8arrays/to-string'
import { Fetch } from '../src/fetch.js'
import { FetchRequest, FetchResponse } from '../src/pb/proto.js'
import type { ComponentLogger, Connection, Stream, PeerId } from '@libp2p/interface'
Expand Down Expand Up @@ -89,7 +91,7 @@ describe('fetch', () => {
const pb = pbStream(incomingStream)
const request = await pb.read(FetchRequest)

expect(request.identifier).to.equal(key)
expect(uint8arrayToString(request.identifier)).to.equal(key)

await pb.write({
status: FetchResponse.StatusCode.OK,
Expand All @@ -112,7 +114,7 @@ describe('fetch', () => {
const pb = pbStream(incomingStream)
const request = await pb.read(FetchRequest)

expect(request.identifier).to.equal(key)
expect(uint8arrayToString(request.identifier)).to.equal(key)

await pb.write({
status: FetchResponse.StatusCode.NOT_FOUND
Expand All @@ -134,7 +136,7 @@ describe('fetch', () => {
const pb = pbStream(incomingStream)
const request = await pb.read(FetchRequest)

expect(request.identifier).to.equal(key)
expect(uint8arrayToString(request.identifier)).to.equal(key)

await pb.write({
status: FetchResponse.StatusCode.ERROR
Expand Down Expand Up @@ -177,7 +179,7 @@ describe('fetch', () => {
} = createStreams(components)

fetch.registerLookupFunction('/test', async (k) => {
expect(k).to.equal(key)
expect(k).to.equalBytes(uint8arrayFromString(key))
return value
})

Expand All @@ -189,7 +191,7 @@ describe('fetch', () => {
const pb = pbStream(outgoingStream)

await pb.write({
identifier: key
identifier: uint8arrayFromString(key)
}, FetchRequest)

const response = await pb.read(FetchResponse)
Expand Down Expand Up @@ -218,7 +220,7 @@ describe('fetch', () => {
const pb = pbStream(outgoingStream)

await pb.write({
identifier: key
identifier: uint8arrayFromString(key)
}, FetchRequest)

const response = await pb.read(FetchResponse)
Expand All @@ -242,7 +244,7 @@ describe('fetch', () => {
const pb = pbStream(outgoingStream)

await pb.write({
identifier: key
identifier: uint8arrayFromString(key)
}, FetchRequest)

const response = await pb.read(FetchResponse)
Expand Down
Loading