Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
220a690
Unexport private constant
hsanjuan Aug 29, 2025
f7788a7
routing/http/server: add support for GetClosestPeers
hsanjuan Sep 1, 2025
5d4e99e
routing/http/client: add support for GetClosestPeers
hsanjuan Sep 3, 2025
d52bcb1
routing/http/contentrouter: add support for GetClosestPeers
hsanjuan Sep 4, 2025
0264929
fix: use 'closer-than' query parameter for spec compliance
lidel Sep 13, 2025
39432b9
fix: use correct variable in error message for closer-than parameter
lidel Sep 13, 2025
384effb
docs: improve GetClosestPeers godoc and use amino constant
lidel Sep 13, 2025
d53886a
refactor: rename ContentRouter to DelegatedRouter
lidel Sep 13, 2025
0a8bc14
GetClosestPeers(): remove closer-than and timeout parameters.
hsanjuan Sep 25, 2025
ce702e0
examples: go mod tidy
hsanjuan Sep 26, 2025
bdb5505
Address review
hsanjuan Sep 29, 2025
cd7be90
Merge remote-tracking branch 'origin/main' into feat/1004-get-closest…
hsanjuan Sep 29, 2025
07e74fb
Routing: GetClosestPeers: replace peer.ID argument with CID
hsanjuan Oct 16, 2025
5162f7c
Merge remote-tracking branch 'origin/main' into feat/1004-get-closest…
hsanjuan Oct 16, 2025
713b7c0
routing/http: accept CID or PeerID for GetClosestPeers
lidel Oct 17, 2025
ce6837e
docs: add GetClosestPeers changes to CHANGELOG
lidel Oct 17, 2025
3256401
fix(routing/http): correct method name in getClosestPeersJSON logging
lidel Oct 17, 2025
918fee9
Merge branch 'main' into feat/1004-get-closest-peers
gammazero Nov 12, 2025
86f6bf2
move DHTRouter interface to contentrouter from routing-helpers
hsanjuan Nov 12, 2025
17f5746
go mod tidy
hsanjuan Nov 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes:

### Added

- `routing/http`: `GET /routing/v1/dht/closest/peers/{key}` per [IPIP-476](https://github.com/ipfs/specs/pull/476)

### Changed

### Removed
Expand Down
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ require (
github.com/libp2p/go-libp2p-kad-dht v0.35.1 // indirect
github.com/libp2p/go-libp2p-kbucket v0.8.0 // indirect
github.com/libp2p/go-libp2p-record v0.3.1 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.7.5 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/libp2p/go-netroute v0.3.0 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m
github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4=
github.com/libp2p/go-libp2p-record v0.3.1 h1:cly48Xi5GjNw5Wq+7gmjfBiG9HCzQVkiZOUZ8kUl+Fg=
github.com/libp2p/go-libp2p-record v0.3.1/go.mod h1:T8itUkLcWQLCYMqtX7Th6r7SexyUJpIyPgks757td/E=
github.com/libp2p/go-libp2p-routing-helpers v0.7.5 h1:HdwZj9NKovMx0vqq6YNPTh6aaNzey5zHD7HeLJtq6fI=
github.com/libp2p/go-libp2p-routing-helpers v0.7.5/go.mod h1:3YaxrwP0OBPDD7my3D0KxfR89FlcX/IEbxDEDfAmj98=
github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e h1:6DSfN9gsAmBa1iyAKwIuk9GlEga45iH8MBmuYAuXmpU=
github.com/libp2p/go-libp2p-routing-helpers v0.7.6-0.20251016083611-f098f492895e/go.mod h1:Q1VSaOawgsvaa3hGl/PejADIhl2deiqSEsQDpB3Ggss=
github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg=
github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0=
Expand Down
75 changes: 75 additions & 0 deletions routing/http/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,3 +638,78 @@ func (c *Client) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Recor

return nil
}

// GetClosestPeers obtains the closest peers to the given key (CID or Peer ID).
func (c *Client) GetClosestPeers(ctx context.Context, key cid.Cid) (peers iter.ResultIter[*types.PeerRecord], err error) {
m := newMeasurement("GetClosestPeers")

// Build the base URL path
u, err := gourl.JoinPath(c.baseURL, "routing/v1/dht/closest/peers", key.String())
if err != nil {
return nil, err
}

// Create the HTTP request
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, err
}
req.Header.Set("Accept", c.accepts)

m.host = req.Host
start := c.clock.Now()
resp, err := c.httpClient.Do(req)
m.latency = c.clock.Since(start)
m.err = err

if err != nil {
m.record(ctx)
return nil, err
}

var skipBodyClose bool
defer func() {
if !skipBodyClose {
resp.Body.Close()
}
}()

m.statusCode = resp.StatusCode
if resp.StatusCode == http.StatusNotFound {
m.record(ctx)
return iter.FromSlice[iter.Result[*types.PeerRecord]](nil), nil
}

if resp.StatusCode != http.StatusOK {
err := httpError(resp.StatusCode, resp.Body)
m.record(ctx)
return nil, err
}

respContentType := resp.Header.Get("Content-Type")
mediaType, _, err := mime.ParseMediaType(respContentType)
if err != nil {
m.err = err
m.record(ctx)
return nil, fmt.Errorf("parsing Content-Type: %w", err)
}

m.mediaType = mediaType

var it iter.ResultIter[*types.PeerRecord]
switch mediaType {
case mediaTypeJSON:
parsedResp := &jsontypes.PeersResponse{}
err = json.NewDecoder(resp.Body).Decode(parsedResp)
var sliceIt iter.Iter[*types.PeerRecord] = iter.FromSlice(parsedResp.Peers)
it = iter.ToResultIter(sliceIt)
case mediaTypeNDJSON:
skipBodyClose = true
it = ndjson.NewPeerRecordsIter(resp.Body)
default:
logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType)
return nil, errors.New("unknown content type")
}

return &measuringIter[iter.Result[*types.PeerRecord]]{Iter: it, ctx: ctx, m: m}, nil
}
131 changes: 131 additions & 0 deletions routing/http/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit in
return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1)
}

func (m *mockContentRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) {
args := m.Called(ctx, key)
return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1)
}

func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
args := m.Called(ctx, name)
rec, _ := args.Get(0).(*ipns.Record)
Expand Down Expand Up @@ -836,6 +841,132 @@ func TestClient_EmptyResponses(t *testing.T) {
}
}

func TestClient_GetClosestPeers(t *testing.T) {
bitswapPeerRecord := makePeerRecord([]string{"transport-bitswap"})
httpPeerRecord := makePeerRecord([]string{"transport-ipfs-gateway-http"})

peerRecords := []iter.Result[*types.PeerRecord]{
{Val: &bitswapPeerRecord},
{Val: &httpPeerRecord},
}

key := peer.ToCid(*bitswapPeerRecord.ID)

cases := []struct {
name string
httpStatusCode int
stopServer bool
routerResult []iter.Result[*types.PeerRecord]
routerErr error
clientRequiresStreaming bool
serverStreamingDisabled bool

expErrContains osErrContains
expResult []iter.Result[*types.PeerRecord]
expStreamingResponse bool
expJSONResponse bool
}{
{
name: "happy case",
routerResult: peerRecords,
expResult: peerRecords,
expStreamingResponse: true,
},
{
name: "server doesn't support streaming",
routerResult: peerRecords,
expResult: peerRecords,
serverStreamingDisabled: true,
expJSONResponse: true,
},
{
name: "client requires streaming but server doesn't support it",
serverStreamingDisabled: true,
clientRequiresStreaming: true,
expErrContains: osErrContains{expContains: "HTTP error with StatusCode=400: no supported content types"},
},
{
name: "returns an error if there's a non-200 response",
httpStatusCode: 500,
expErrContains: osErrContains{expContains: "HTTP error with StatusCode=500"},
},
{
name: "returns an error if the HTTP client returns a non-HTTP error",
stopServer: true,
expErrContains: osErrContains{
expContains: "connect: connection refused",
expContainsWin: "connectex: No connection could be made because the target machine actively refused it.",
},
},
{
name: "returns no providers if the HTTP server returns a 404 response",
httpStatusCode: 404,
expResult: nil,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
var clientOpts []Option
var serverOpts []server.Option
var onRespReceived []func(*http.Response)
var onReqReceived []func(*http.Request)

if c.serverStreamingDisabled {
serverOpts = append(serverOpts, server.WithStreamingResultsDisabled())
}

if c.clientRequiresStreaming {
clientOpts = append(clientOpts, WithStreamResultsRequired())
onReqReceived = append(onReqReceived, func(r *http.Request) {
assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Accept"))
})
}

if c.expStreamingResponse {
onRespReceived = append(onRespReceived, func(r *http.Response) {
assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Content-Type"))
})
}

if c.expJSONResponse {
onRespReceived = append(onRespReceived, func(r *http.Response) {
assert.Equal(t, mediaTypeJSON, r.Header.Get("Content-Type"))
})
}

deps := makeTestDeps(t, clientOpts, serverOpts)

deps.recordingHTTPClient.f = append(deps.recordingHTTPClient.f, onRespReceived...)
deps.recordingHandler.f = append(deps.recordingHandler.f, onReqReceived...)

client := deps.client
router := deps.router

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

if c.httpStatusCode != 0 {
deps.server.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(c.httpStatusCode)
})
}

if c.stopServer {
deps.server.Close()
}

routerResultIter := iter.FromSlice(c.routerResult)
router.On("GetClosestPeers", mock.Anything, key).Return(routerResultIter, c.routerErr)

resultIter, err := client.GetClosestPeers(ctx, key)
c.expErrContains.errContains(t, err)

results := iter.ReadAll(resultIter)
assert.Equal(t, c.expResult, results)
})
}
}

func TestNormalizeBaseURL(t *testing.T) {
cases := []struct {
name string
Expand Down
51 changes: 51 additions & 0 deletions routing/http/contentrouter/contentrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ var logger = logging.Logger("routing/http/contentrouter")

const ttl = 24 * time.Hour

// A Client provides HTTP Delegated Routing methods. See also [server.DelegatedRouter].
type Client interface {
FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error)
ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error)
FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[*types.PeerRecord], err error)
GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error)
PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error
// GetClosestPeers returns the DHT closest peers to the given key (CID or Peer ID).
GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error)
}

type contentRouter struct {
Expand All @@ -37,12 +40,17 @@ type contentRouter struct {
maxProvideBatchSize int
}

type DHTRouter interface {
GetClosestPeers(context.Context, cid.Cid) (<-chan peer.AddrInfo, error)
}

var (
_ routing.ContentRouting = (*contentRouter)(nil)
_ routing.PeerRouting = (*contentRouter)(nil)
_ routing.ValueStore = (*contentRouter)(nil)
_ routinghelpers.ProvideManyRouter = (*contentRouter)(nil)
_ routinghelpers.ReadyAbleRouter = (*contentRouter)(nil)
_ DHTRouter = (*contentRouter)(nil)
)

type option func(c *contentRouter)
Expand All @@ -59,6 +67,8 @@ func WithMaxProvideBatchSize(max int) option {
}
}

// NewContentRoutingClient returns a client that conforms to the
// ContentRouting interfaces.
func NewContentRoutingClient(c Client, opts ...option) *contentRouter {
cr := &contentRouter{
client: c,
Expand Down Expand Up @@ -300,3 +310,44 @@ func (c *contentRouter) SearchValue(ctx context.Context, key string, opts ...rou

return ch, nil
}

func (c *contentRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (<-chan peer.AddrInfo, error) {
iter, err := c.client.GetClosestPeers(ctx, key)
if err != nil {
return nil, err
}
infos := make(chan peer.AddrInfo)
go func() {
defer iter.Close()
defer close(infos)
for iter.Next() {
res := iter.Val()
if res.Err != nil {
logger.Warnf("error iterating peer responses: %s", res.Err)
continue
}

var addrs []multiaddr.Multiaddr
for _, a := range res.Val.Addrs {
addrs = append(addrs, a.Multiaddr)
}

// If there are no addresses there's nothing of value to return
if len(addrs) == 0 {
continue
}

select {
case <-ctx.Done():
logger.Warnf("aborting GetClosestPeers: %s", ctx.Err())
return
case infos <- peer.AddrInfo{
ID: *res.Val.ID,
Addrs: addrs,
}:
}
}
}()

return infos, nil
}
Loading
Loading