Skip to content

Commit 81b996c

Browse files
committed
feat: routing/http/server supports /routing/v1/peers
1 parent af91c37 commit 81b996c

File tree

7 files changed

+323
-93
lines changed

7 files changed

+323
-93
lines changed

routing/http/client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func (c *client) GetProviders(ctx context.Context, key cid.Cid) (provs iter.Resu
192192
it = iter.ToResultIter(sliceIt)
193193
case mediaTypeNDJSON:
194194
skipBodyClose = true
195-
it = ndjson.NewProvidersResponseIter(resp.Body)
195+
it = ndjson.NewRecordsIter(resp.Body)
196196
default:
197197
logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType)
198198
return nil, errors.New("unknown content type")

routing/http/client/client_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ func (m *mockContentRouter) GetProviders(ctx context.Context, key cid.Cid, limit
3535
return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1)
3636
}
3737

38+
func (m *mockContentRouter) GetPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[types.Record], error) {
39+
args := m.Called(ctx, pid, limit)
40+
return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1)
41+
}
42+
3843
func (m *mockContentRouter) GetIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
3944
args := m.Called(ctx, name)
4045
return args.Get(0).(*ipns.Record), args.Error(1)

routing/http/server/server.go

Lines changed: 163 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/ipfs/boxo/routing/http/types/iter"
2020
jsontypes "github.com/ipfs/boxo/routing/http/types/json"
2121
"github.com/ipfs/go-cid"
22+
"github.com/libp2p/go-libp2p/core/peer"
2223

2324
logging "github.com/ipfs/go-log/v2"
2425
)
@@ -37,6 +38,7 @@ var logger = logging.Logger("service/server/delegatedrouting")
3738

3839
const (
3940
GetProvidersPath = "/routing/v1/providers/{cid}"
41+
GetPeersPath = "/routing/v1/peers/{peer-id}"
4042
GetIPNSRecordPath = "/routing/v1/ipns/{cid}"
4143
)
4244

@@ -50,6 +52,10 @@ type ContentRouter interface {
5052
// Limit indicates the maximum amount of results to return; 0 means unbounded.
5153
GetProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error)
5254

55+
// GetPeers searches for peers who have the provided [peer.ID].
56+
// Limit indicates the maximum amount of results to return; 0 means unbounded.
57+
GetPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[types.Record], error)
58+
5359
// GetIPNSRecord searches for an [ipns.Record] for the given [ipns.Name].
5460
GetIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error)
5561

@@ -96,8 +102,10 @@ func Handler(svc ContentRouter, opts ...Option) http.Handler {
96102

97103
r := mux.NewRouter()
98104
r.HandleFunc(GetProvidersPath, server.getProviders).Methods(http.MethodGet)
105+
r.HandleFunc(GetPeersPath, server.getPeers).Methods(http.MethodGet)
99106
r.HandleFunc(GetIPNSRecordPath, server.getIPNSRecord).Methods(http.MethodGet)
100107
r.HandleFunc(GetIPNSRecordPath, server.putIPNSRecord).Methods(http.MethodPut)
108+
101109
return r
102110
}
103111

@@ -108,6 +116,43 @@ type server struct {
108116
streamingRecordsLimit int
109117
}
110118

119+
func (s *server) detectResponseType(r *http.Request) (string, error) {
120+
var (
121+
supportsNDJSON bool
122+
supportsJSON bool
123+
124+
acceptHeaders = r.Header.Values("Accept")
125+
)
126+
127+
if len(acceptHeaders) == 0 {
128+
return mediaTypeJSON, nil
129+
}
130+
131+
for _, acceptHeader := range acceptHeaders {
132+
for _, accept := range strings.Split(acceptHeader, ",") {
133+
mediaType, _, err := mime.ParseMediaType(accept)
134+
if err != nil {
135+
return "", fmt.Errorf("unable to parse Accept header: %w", err)
136+
}
137+
138+
switch mediaType {
139+
case mediaTypeJSON, mediaTypeWildcard:
140+
supportsJSON = true
141+
case mediaTypeNDJSON:
142+
supportsNDJSON = true
143+
}
144+
}
145+
}
146+
147+
if supportsNDJSON && !s.disableNDJSON {
148+
return mediaTypeNDJSON, nil
149+
} else if supportsJSON {
150+
return mediaTypeJSON, nil
151+
} else {
152+
return "", errors.New("no supported content types")
153+
}
154+
}
155+
111156
func (s *server) getProviders(w http.ResponseWriter, httpReq *http.Request) {
112157
vars := mux.Vars(httpReq)
113158
cidStr := vars["cid"]
@@ -117,43 +162,23 @@ func (s *server) getProviders(w http.ResponseWriter, httpReq *http.Request) {
117162
return
118163
}
119164

120-
var handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.Record])
165+
mediaType, err := s.detectResponseType(httpReq)
166+
if err != nil {
167+
writeErr(w, "GetProviders", http.StatusBadRequest, err)
168+
return
169+
}
121170

122-
var supportsNDJSON bool
123-
var supportsJSON bool
124-
var recordsLimit int
125-
acceptHeaders := httpReq.Header.Values("Accept")
126-
if len(acceptHeaders) == 0 {
171+
var (
172+
handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.Record])
173+
recordsLimit int
174+
)
175+
176+
if mediaType == mediaTypeNDJSON {
177+
handlerFunc = s.getProvidersNDJSON
178+
recordsLimit = s.streamingRecordsLimit
179+
} else {
127180
handlerFunc = s.getProvidersJSON
128181
recordsLimit = s.recordsLimit
129-
} else {
130-
for _, acceptHeader := range acceptHeaders {
131-
for _, accept := range strings.Split(acceptHeader, ",") {
132-
mediaType, _, err := mime.ParseMediaType(accept)
133-
if err != nil {
134-
writeErr(w, "GetProviders", http.StatusBadRequest, fmt.Errorf("unable to parse Accept header: %w", err))
135-
return
136-
}
137-
138-
switch mediaType {
139-
case mediaTypeJSON, mediaTypeWildcard:
140-
supportsJSON = true
141-
case mediaTypeNDJSON:
142-
supportsNDJSON = true
143-
}
144-
}
145-
}
146-
147-
if supportsNDJSON && !s.disableNDJSON {
148-
handlerFunc = s.getProvidersNDJSON
149-
recordsLimit = s.streamingRecordsLimit
150-
} else if supportsJSON {
151-
handlerFunc = s.getProvidersJSON
152-
recordsLimit = s.recordsLimit
153-
} else {
154-
writeErr(w, "GetProviders", http.StatusBadRequest, errors.New("no supported content types"))
155-
return
156-
}
157182
}
158183

159184
provIter, err := s.svc.GetProviders(httpReq.Context(), cid, recordsLimit)
@@ -168,58 +193,82 @@ func (s *server) getProviders(w http.ResponseWriter, httpReq *http.Request) {
168193
func (s *server) getProvidersJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record]) {
169194
defer provIter.Close()
170195

171-
var (
172-
providers []types.Record
173-
i int
174-
)
175-
176-
for provIter.Next() {
177-
res := provIter.Val()
178-
if res.Err != nil {
179-
writeErr(w, "GetProviders", http.StatusInternalServerError, fmt.Errorf("delegate error on result %d: %w", i, res.Err))
180-
return
181-
}
182-
providers = append(providers, res.Val)
183-
i++
196+
providers, err := iter.ReadAllResults(provIter)
197+
if err != nil {
198+
writeErr(w, "GetProviders", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err))
199+
return
184200
}
185-
response := jsontypes.ProvidersResponse{Providers: providers}
186-
writeJSONResult(w, "GetProviders", response)
201+
202+
writeJSONResult(w, "GetProviders", jsontypes.ProvidersResponse{
203+
Providers: providers,
204+
})
187205
}
188206

189207
func (s *server) getProvidersNDJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record]) {
190-
defer provIter.Close()
208+
writeResultsIterNDJSON(w, provIter)
209+
}
191210

192-
w.Header().Set("Content-Type", mediaTypeNDJSON)
193-
w.WriteHeader(http.StatusOK)
194-
for provIter.Next() {
195-
res := provIter.Val()
196-
if res.Err != nil {
197-
logger.Errorw("GetProviders ndjson iterator error", "Error", res.Err)
198-
return
199-
}
200-
// don't use an encoder because we can't easily differentiate writer errors from encoding errors
201-
b, err := drjson.MarshalJSONBytes(res.Val)
202-
if err != nil {
203-
logger.Errorw("GetProviders ndjson marshal error", "Error", err)
204-
return
205-
}
211+
func (s *server) getPeers(w http.ResponseWriter, r *http.Request) {
212+
pidStr := mux.Vars(r)["peer-id"]
206213

207-
_, err = w.Write(b)
208-
if err != nil {
209-
logger.Warn("GetProviders ndjson write error", "Error", err)
210-
return
211-
}
214+
// pidStr must be in CIDv1 format. Therefore, use [cid.Decode]. We can't use
215+
// [peer.Decode] because that would allow other formats to pass through.
216+
cid, err := cid.Decode(pidStr)
217+
if err != nil {
218+
writeErr(w, "GetPeers", http.StatusBadRequest, fmt.Errorf("unable to parse peer ID: %w", err))
219+
return
220+
}
212221

213-
_, err = w.Write([]byte{'\n'})
214-
if err != nil {
215-
logger.Warn("GetProviders ndjson write error", "Error", err)
216-
return
217-
}
222+
pid, err := peer.FromCid(cid)
223+
if err != nil {
224+
writeErr(w, "GetPeers", http.StatusBadRequest, fmt.Errorf("unable to parse peer ID: %w", err))
225+
return
226+
}
218227

219-
if f, ok := w.(http.Flusher); ok {
220-
f.Flush()
221-
}
228+
mediaType, err := s.detectResponseType(r)
229+
if err != nil {
230+
writeErr(w, "GetPeers", http.StatusBadRequest, err)
231+
return
232+
}
233+
234+
var (
235+
handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.Record])
236+
recordsLimit int
237+
)
238+
239+
if mediaType == mediaTypeNDJSON {
240+
handlerFunc = s.getPeersNDJSON
241+
recordsLimit = s.streamingRecordsLimit
242+
} else {
243+
handlerFunc = s.getPeersJSON
244+
recordsLimit = s.recordsLimit
245+
}
246+
247+
provIter, err := s.svc.GetPeers(r.Context(), pid, recordsLimit)
248+
if err != nil {
249+
writeErr(w, "GetPeers", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err))
250+
return
251+
}
252+
253+
handlerFunc(w, provIter)
254+
}
255+
256+
func (s *server) getPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[types.Record]) {
257+
defer peersIter.Close()
258+
259+
peers, err := iter.ReadAllResults(peersIter)
260+
if err != nil {
261+
writeErr(w, "GetPeers", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err))
262+
return
222263
}
264+
265+
writeJSONResult(w, "GetPeers", jsontypes.PeersResponse{
266+
Peers: peers,
267+
})
268+
}
269+
270+
func (s *server) getPeersNDJSON(w http.ResponseWriter, peersIter iter.ResultIter[types.Record]) {
271+
writeResultsIterNDJSON(w, peersIter)
223272
}
224273

225274
func (s *server) getIPNSRecord(w http.ResponseWriter, r *http.Request) {
@@ -347,3 +396,40 @@ func writeErr(w http.ResponseWriter, method string, statusCode int, cause error)
347396
func logErr(method, msg string, err error) {
348397
logger.Infow(msg, "Method", method, "Error", err)
349398
}
399+
400+
func writeResultsIterNDJSON(w http.ResponseWriter, resultIter iter.ResultIter[types.Record]) {
401+
defer resultIter.Close()
402+
403+
w.Header().Set("Content-Type", mediaTypeNDJSON)
404+
w.WriteHeader(http.StatusOK)
405+
406+
for resultIter.Next() {
407+
res := resultIter.Val()
408+
if res.Err != nil {
409+
logger.Errorw("ndjson iterator error", "Error", res.Err)
410+
return
411+
}
412+
// don't use an encoder because we can't easily differentiate writer errors from encoding errors
413+
b, err := drjson.MarshalJSONBytes(res.Val)
414+
if err != nil {
415+
logger.Errorw("ndjson marshal error", "Error", err)
416+
return
417+
}
418+
419+
_, err = w.Write(b)
420+
if err != nil {
421+
logger.Warn("ndjson write error", "Error", err)
422+
return
423+
}
424+
425+
_, err = w.Write([]byte{'\n'})
426+
if err != nil {
427+
logger.Warn("ndjson write error", "Error", err)
428+
return
429+
}
430+
431+
if f, ok := w.(http.Flusher); ok {
432+
f.Flush()
433+
}
434+
}
435+
}

0 commit comments

Comments
 (0)