Skip to content

Commit ac0a3bc

Browse files
committed
feat: routing/http/client supports /routing/v1/peers
1 parent 789b5f6 commit ac0a3bc

File tree

2 files changed

+217
-17
lines changed

2 files changed

+217
-17
lines changed

routing/http/client/client.go

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func (c *measuringIter[T]) Close() error {
127127
return c.Iter.Close()
128128
}
129129

130-
func (c *client) GetProviders(ctx context.Context, key cid.Cid) (provs iter.ResultIter[types.Record], err error) {
130+
func (c *client) GetProviders(ctx context.Context, key cid.Cid) (providers iter.ResultIter[types.Record], err error) {
131131
// TODO test measurements
132132
m := newMeasurement("GetProviders")
133133

@@ -201,6 +201,79 @@ func (c *client) GetProviders(ctx context.Context, key cid.Cid) (provs iter.Resu
201201
return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil
202202
}
203203

204+
func (c *client) GetPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[types.Record], err error) {
205+
m := newMeasurement("GetPeers")
206+
207+
url := c.baseURL + "/routing/v1/peers/" + peer.ToCid(pid).String()
208+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
209+
if err != nil {
210+
return nil, err
211+
}
212+
req.Header.Set("Accept", c.accepts)
213+
214+
m.host = req.Host
215+
216+
start := c.clock.Now()
217+
resp, err := c.httpClient.Do(req)
218+
219+
m.err = err
220+
m.latency = c.clock.Since(start)
221+
222+
if err != nil {
223+
m.record(ctx)
224+
return nil, err
225+
}
226+
227+
m.statusCode = resp.StatusCode
228+
if resp.StatusCode == http.StatusNotFound {
229+
resp.Body.Close()
230+
m.record(ctx)
231+
return iter.FromSlice[iter.Result[types.Record]](nil), nil
232+
}
233+
234+
if resp.StatusCode != http.StatusOK {
235+
err := httpError(resp.StatusCode, resp.Body)
236+
resp.Body.Close()
237+
m.record(ctx)
238+
return nil, err
239+
}
240+
241+
respContentType := resp.Header.Get("Content-Type")
242+
mediaType, _, err := mime.ParseMediaType(respContentType)
243+
if err != nil {
244+
resp.Body.Close()
245+
m.err = err
246+
m.record(ctx)
247+
return nil, fmt.Errorf("parsing Content-Type: %w", err)
248+
}
249+
250+
m.mediaType = mediaType
251+
252+
var skipBodyClose bool
253+
defer func() {
254+
if !skipBodyClose {
255+
resp.Body.Close()
256+
}
257+
}()
258+
259+
var it iter.ResultIter[types.Record]
260+
switch mediaType {
261+
case mediaTypeJSON:
262+
parsedResp := &jsontypes.PeersResponse{}
263+
err = json.NewDecoder(resp.Body).Decode(parsedResp)
264+
var sliceIt iter.Iter[types.Record] = iter.FromSlice(parsedResp.Peers)
265+
it = iter.ToResultIter(sliceIt)
266+
case mediaTypeNDJSON:
267+
skipBodyClose = true
268+
it = ndjson.NewRecordsIter(resp.Body)
269+
default:
270+
logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType)
271+
return nil, errors.New("unknown content type")
272+
}
273+
274+
return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil
275+
}
276+
204277
func (c *client) GetIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
205278
url := c.baseURL + "/routing/v1/ipns/" + name.String()
206279

routing/http/client/client_test.go

Lines changed: 143 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func addrsToDRAddrs(addrs []multiaddr.Multiaddr) (drmas []types.Multiaddr) {
138138
return
139139
}
140140

141-
func makeBSReadProviderResp() types.PeerRecord {
141+
func makePeerRecord() types.PeerRecord {
142142
peerID, addrs, _ := makeProviderAndIdentity()
143143
return types.PeerRecord{
144144
Schema: types.SchemaPeer,
@@ -189,7 +189,7 @@ func (e *osErrContains) errContains(t *testing.T, err error) {
189189
}
190190

191191
func TestClient_GetProviders(t *testing.T) {
192-
bsReadProvResp := makeBSReadProviderResp()
192+
bsReadProvResp := makePeerRecord()
193193
bitswapProvs := []iter.Result[types.Record]{
194194
{Val: &bsReadProvResp},
195195
}
@@ -198,26 +198,26 @@ func TestClient_GetProviders(t *testing.T) {
198198
name string
199199
httpStatusCode int
200200
stopServer bool
201-
routerProvs []iter.Result[types.Record]
201+
routerResult []iter.Result[types.Record]
202202
routerErr error
203203
clientRequiresStreaming bool
204204
serverStreamingDisabled bool
205205

206206
expErrContains osErrContains
207-
expProvs []iter.Result[types.Record]
207+
expResult []iter.Result[types.Record]
208208
expStreamingResponse bool
209209
expJSONResponse bool
210210
}{
211211
{
212212
name: "happy case",
213-
routerProvs: bitswapProvs,
214-
expProvs: bitswapProvs,
213+
routerResult: bitswapProvs,
214+
expResult: bitswapProvs,
215215
expStreamingResponse: true,
216216
},
217217
{
218218
name: "server doesn't support streaming",
219-
routerProvs: bitswapProvs,
220-
expProvs: bitswapProvs,
219+
routerResult: bitswapProvs,
220+
expResult: bitswapProvs,
221221
serverStreamingDisabled: true,
222222
expJSONResponse: true,
223223
},
@@ -243,7 +243,7 @@ func TestClient_GetProviders(t *testing.T) {
243243
{
244244
name: "returns no providers if the HTTP server returns a 404 respones",
245245
httpStatusCode: 404,
246-
expProvs: nil,
246+
expResult: nil,
247247
},
248248
}
249249
for _, c := range cases {
@@ -268,6 +268,7 @@ func TestClient_GetProviders(t *testing.T) {
268268
assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Content-Type"))
269269
})
270270
}
271+
271272
if c.expJSONResponse {
272273
onRespReceived = append(onRespReceived, func(r *http.Response) {
273274
assert.Equal(t, mediaTypeJSON, r.Header.Get("Content-Type"))
@@ -296,20 +297,146 @@ func TestClient_GetProviders(t *testing.T) {
296297
}
297298
cid := makeCID()
298299

299-
findProvsIter := iter.FromSlice(c.routerProvs)
300-
300+
routerResultIter := iter.FromSlice(c.routerResult)
301301
if c.expStreamingResponse {
302-
router.On("GetProviders", mock.Anything, cid, 0).Return(findProvsIter, c.routerErr)
302+
router.On("GetProviders", mock.Anything, cid, 0).Return(routerResultIter, c.routerErr)
303303
} else {
304-
router.On("GetProviders", mock.Anything, cid, 20).Return(findProvsIter, c.routerErr)
304+
router.On("GetProviders", mock.Anything, cid, 20).Return(routerResultIter, c.routerErr)
305305
}
306306

307-
provsIter, err := client.GetProviders(ctx, cid)
307+
resultIter, err := client.GetProviders(ctx, cid)
308+
c.expErrContains.errContains(t, err)
309+
310+
results := iter.ReadAll[iter.Result[types.Record]](resultIter)
311+
assert.Equal(t, c.expResult, results)
312+
})
313+
}
314+
}
315+
316+
func TestClient_GetPeers(t *testing.T) {
317+
peerRecord := makePeerRecord()
318+
peerRecords := []iter.Result[types.Record]{
319+
{Val: &peerRecord},
320+
}
321+
pid := *peerRecord.ID
322+
323+
cases := []struct {
324+
name string
325+
httpStatusCode int
326+
stopServer bool
327+
routerResult []iter.Result[types.Record]
328+
routerErr error
329+
clientRequiresStreaming bool
330+
serverStreamingDisabled bool
331+
332+
expErrContains osErrContains
333+
expResult []iter.Result[types.Record]
334+
expStreamingResponse bool
335+
expJSONResponse bool
336+
}{
337+
{
338+
name: "happy case",
339+
routerResult: peerRecords,
340+
expResult: peerRecords,
341+
expStreamingResponse: true,
342+
},
343+
{
344+
name: "server doesn't support streaming",
345+
routerResult: peerRecords,
346+
expResult: peerRecords,
347+
serverStreamingDisabled: true,
348+
expJSONResponse: true,
349+
},
350+
{
351+
name: "client requires streaming but server doesn't support it",
352+
serverStreamingDisabled: true,
353+
clientRequiresStreaming: true,
354+
expErrContains: osErrContains{expContains: "HTTP error with StatusCode=400: no supported content types"},
355+
},
356+
{
357+
name: "returns an error if there's a non-200 response",
358+
httpStatusCode: 500,
359+
expErrContains: osErrContains{expContains: "HTTP error with StatusCode=500"},
360+
},
361+
{
362+
name: "returns an error if the HTTP client returns a non-HTTP error",
363+
stopServer: true,
364+
expErrContains: osErrContains{
365+
expContains: "connect: connection refused",
366+
expContainsWin: "connectex: No connection could be made because the target machine actively refused it.",
367+
},
368+
},
369+
{
370+
name: "returns no providers if the HTTP server returns a 404 respones",
371+
httpStatusCode: 404,
372+
expResult: nil,
373+
},
374+
}
375+
for _, c := range cases {
376+
t.Run(c.name, func(t *testing.T) {
377+
var (
378+
clientOpts []Option
379+
serverOpts []server.Option
380+
onRespReceived []func(*http.Response)
381+
onReqReceived []func(*http.Request)
382+
)
383+
384+
if c.serverStreamingDisabled {
385+
serverOpts = append(serverOpts, server.WithStreamingResultsDisabled())
386+
}
387+
388+
if c.clientRequiresStreaming {
389+
clientOpts = append(clientOpts, WithStreamResultsRequired())
390+
onReqReceived = append(onReqReceived, func(r *http.Request) {
391+
assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Accept"))
392+
})
393+
}
394+
395+
if c.expStreamingResponse {
396+
onRespReceived = append(onRespReceived, func(r *http.Response) {
397+
assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Content-Type"))
398+
})
399+
}
400+
401+
if c.expJSONResponse {
402+
onRespReceived = append(onRespReceived, func(r *http.Response) {
403+
assert.Equal(t, mediaTypeJSON, r.Header.Get("Content-Type"))
404+
})
405+
}
406+
407+
deps := makeTestDeps(t, clientOpts, serverOpts)
408+
409+
deps.recordingHTTPClient.f = append(deps.recordingHTTPClient.f, onRespReceived...)
410+
deps.recordingHandler.f = append(deps.recordingHandler.f, onReqReceived...)
411+
412+
client := deps.client
413+
router := deps.router
414+
415+
ctx, cancel := context.WithCancel(context.Background())
416+
t.Cleanup(cancel)
417+
418+
if c.httpStatusCode != 0 {
419+
deps.server.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
420+
w.WriteHeader(c.httpStatusCode)
421+
})
422+
}
423+
424+
if c.stopServer {
425+
deps.server.Close()
426+
}
427+
428+
routerResultIter := iter.FromSlice(c.routerResult)
429+
if c.expStreamingResponse {
430+
router.On("GetPeers", mock.Anything, pid, 0).Return(routerResultIter, c.routerErr)
431+
} else {
432+
router.On("GetPeers", mock.Anything, pid, 20).Return(routerResultIter, c.routerErr)
433+
}
308434

435+
resultIter, err := client.GetPeers(ctx, pid)
309436
c.expErrContains.errContains(t, err)
310437

311-
provs := iter.ReadAll[iter.Result[types.Record]](provsIter)
312-
assert.Equal(t, c.expProvs, provs)
438+
results := iter.ReadAll[iter.Result[types.Record]](resultIter)
439+
assert.Equal(t, c.expResult, results)
313440
})
314441
}
315442
}

0 commit comments

Comments
 (0)