Skip to content

Commit 5470635

Browse files
committed
routing/http/types: add announcement-related types
also fixes an issue where NewPeerRecordsIter was actually not filtering the out the records with different schema. instead it was returning an empty result with no error. that could lead to errors.
1 parent 76d9292 commit 5470635

File tree

8 files changed

+490
-334
lines changed

8 files changed

+490
-334
lines changed

routing/http/types/iter/filter.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package iter
2+
3+
// Filter invokes f on each element of iter, filtering the results.
4+
func Filter[T any](iter Iter[T], f func(t T) bool) *FilterIter[T] {
5+
return &FilterIter[T]{iter: iter, f: f}
6+
}
7+
8+
type FilterIter[T any] struct {
9+
iter Iter[T]
10+
f func(T) bool
11+
12+
done bool
13+
val T
14+
}
15+
16+
func (f *FilterIter[T]) Next() bool {
17+
if f.done {
18+
return false
19+
}
20+
21+
ok := f.iter.Next()
22+
if !ok {
23+
f.done = true
24+
return false
25+
}
26+
27+
val := f.iter.Val()
28+
if !f.f(val) {
29+
return f.Next()
30+
}
31+
32+
f.val = val
33+
return true
34+
}
35+
36+
func (f *FilterIter[T]) Val() T {
37+
return f.val
38+
}
39+
40+
func (f *FilterIter[T]) Close() error {
41+
return f.iter.Close()
42+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package iter
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestFilter(t *testing.T) {
11+
for _, c := range []struct {
12+
input Iter[int]
13+
f func(int) bool
14+
expResults []int
15+
}{
16+
{
17+
input: FromSlice([]int{1, 2, 3}),
18+
f: func(i int) bool { return i != 2 },
19+
expResults: []int{1, 3},
20+
},
21+
{
22+
input: FromSlice([]int{}),
23+
f: func(i int) bool { return true },
24+
expResults: nil,
25+
},
26+
{
27+
input: FromSlice([]int{-3, -2, 1, -5, 2}),
28+
f: func(i int) bool { return i > 0 },
29+
expResults: []int{1, 2},
30+
},
31+
{
32+
input: FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9}),
33+
f: func(i int) bool { return i%2 == 0 },
34+
expResults: []int{2, 4, 6, 8},
35+
},
36+
} {
37+
t.Run(fmt.Sprintf("%v", c.input), func(t *testing.T) {
38+
iter := Filter(c.input, c.f)
39+
var res []int
40+
for iter.Next() {
41+
res = append(res, iter.Val())
42+
}
43+
assert.Equal(t, c.expResults, res)
44+
})
45+
}
46+
}

routing/http/types/json/requests.go

Lines changed: 7 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,11 @@
11
package json
22

3-
import (
4-
"encoding/json"
5-
6-
"github.com/ipfs/boxo/routing/http/types"
7-
)
8-
9-
// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]:
10-
//
11-
// [IPIP-378]: https://github.com/ipfs/specs/pull/378
12-
type WriteProvidersRequest struct {
13-
Providers []types.Record
3+
// AnnounceProvidersRequest is the content of a PUT Providers request.
4+
type AnnounceProvidersRequest struct {
5+
Error string
6+
Providers RecordsArray
147
}
158

16-
func (r *WriteProvidersRequest) UnmarshalJSON(b []byte) error {
17-
type wpr struct{ Providers []json.RawMessage }
18-
var tempWPR wpr
19-
err := json.Unmarshal(b, &tempWPR)
20-
if err != nil {
21-
return err
22-
}
23-
24-
for _, provBytes := range tempWPR.Providers {
25-
var rawProv types.UnknownRecord
26-
err := json.Unmarshal(provBytes, &rawProv)
27-
if err != nil {
28-
return err
29-
}
30-
31-
switch rawProv.Schema {
32-
//lint:ignore SA1019 // ignore staticcheck
33-
case types.SchemaBitswap:
34-
//lint:ignore SA1019 // ignore staticcheck
35-
var prov types.WriteBitswapRecord
36-
err := json.Unmarshal(rawProv.Bytes, &prov)
37-
if err != nil {
38-
return err
39-
}
40-
r.Providers = append(r.Providers, &prov)
41-
default:
42-
var prov types.UnknownRecord
43-
err := json.Unmarshal(b, &prov)
44-
if err != nil {
45-
return err
46-
}
47-
r.Providers = append(r.Providers, &prov)
48-
}
49-
}
50-
return nil
51-
}
9+
// AnnouncePeersRequest is the content of a PUT Peers request.
10+
// TODO: is the the same? Shouldn't Providers be Peers?
11+
type AnnouncePeersRequest = AnnounceProvidersRequest

routing/http/types/json/responses.go

Lines changed: 7 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,8 @@ func (r *RecordsArray) UnmarshalJSON(b []byte) error {
4141
return err
4242
}
4343
*r = append(*r, &prov)
44-
//lint:ignore SA1019 // ignore staticcheck
45-
case types.SchemaBitswap:
46-
//lint:ignore SA1019 // ignore staticcheck
47-
var prov types.BitswapRecord
44+
case types.SchemaAnnouncement:
45+
var prov types.AnnouncementRecord
4846
err := json.Unmarshal(provBytes, &prov)
4947
if err != nil {
5048
return err
@@ -58,41 +56,10 @@ func (r *RecordsArray) UnmarshalJSON(b []byte) error {
5856
return nil
5957
}
6058

61-
// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]:
62-
//
63-
// [IPIP-378]: https://github.com/ipfs/specs/pull/378
64-
type WriteProvidersResponse struct {
65-
ProvideResults []types.Record
59+
// AnnounceProvidersResponse is the result of a PUT Providers request.
60+
type AnnounceProvidersResponse struct {
61+
ProvideResults []*types.AnnouncementRecord
6662
}
6763

68-
func (r *WriteProvidersResponse) UnmarshalJSON(b []byte) error {
69-
var tempWPR struct{ ProvideResults []json.RawMessage }
70-
err := json.Unmarshal(b, &tempWPR)
71-
if err != nil {
72-
return err
73-
}
74-
75-
for _, provBytes := range tempWPR.ProvideResults {
76-
var rawProv types.UnknownRecord
77-
err := json.Unmarshal(provBytes, &rawProv)
78-
if err != nil {
79-
return err
80-
}
81-
82-
switch rawProv.Schema {
83-
//lint:ignore SA1019 // ignore staticcheck
84-
case types.SchemaBitswap:
85-
//lint:ignore SA1019 // ignore staticcheck
86-
var prov types.WriteBitswapRecordResponse
87-
err := json.Unmarshal(rawProv.Bytes, &prov)
88-
if err != nil {
89-
return err
90-
}
91-
r.ProvideResults = append(r.ProvideResults, &prov)
92-
default:
93-
r.ProvideResults = append(r.ProvideResults, &rawProv)
94-
}
95-
}
96-
97-
return nil
98-
}
64+
// AnnouncePeersResponse is the result of a PUT Peers request.
65+
type AnnouncePeersResponse = AnnounceProvidersResponse

routing/http/types/ndjson/records.go

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ndjson
22

33
import (
44
"encoding/json"
5+
"fmt"
56
"io"
67

78
"github.com/ipfs/boxo/routing/http/types"
@@ -26,10 +27,8 @@ func NewRecordsIter(r io.Reader) iter.Iter[iter.Result[types.Record]] {
2627
return result
2728
}
2829
result.Val = &prov
29-
//lint:ignore SA1019 // ignore staticcheck
30-
case types.SchemaBitswap:
31-
//lint:ignore SA1019 // ignore staticcheck
32-
var prov types.BitswapRecord
30+
case types.SchemaAnnouncement:
31+
var prov types.AnnouncementRecord
3332
err := json.Unmarshal(upr.Val.Bytes, &prov)
3433
if err != nil {
3534
result.Err = err
@@ -45,29 +44,42 @@ func NewRecordsIter(r io.Reader) iter.Iter[iter.Result[types.Record]] {
4544
return iter.Map[iter.Result[types.UnknownRecord]](jsonIter, mapFn)
4645
}
4746

48-
// NewPeerRecordsIter returns an iterator that reads [types.PeerRecord] from the given [io.Reader].
49-
// Records with a different schema are safely ignored. If you want to read all records, use
47+
// NewAnnouncementRecordsIter returns an iterator that reads [types.AnnouncementRecord]
48+
// from the given [io.Reader]. Records with a different schema are ignored. To read all
49+
// records, use [NewRecordsIter] instead.
50+
func NewAnnouncementRecordsIter(r io.Reader) iter.Iter[iter.Result[*types.AnnouncementRecord]] {
51+
return newFilteredRecords[*types.AnnouncementRecord](r, types.SchemaPeer)
52+
}
53+
54+
// NewPeerRecordsIter returns an iterator that reads [types.PeerRecord] from the given
55+
// [io.Reader]. Records with a different schema are ignored. To read all records, use
5056
// [NewRecordsIter] instead.
5157
func NewPeerRecordsIter(r io.Reader) iter.Iter[iter.Result[*types.PeerRecord]] {
52-
jsonIter := iter.FromReaderJSON[types.UnknownRecord](r)
53-
mapFn := func(upr iter.Result[types.UnknownRecord]) iter.Result[*types.PeerRecord] {
54-
var result iter.Result[*types.PeerRecord]
55-
if upr.Err != nil {
56-
result.Err = upr.Err
57-
return result
58-
}
59-
switch upr.Val.Schema {
60-
case types.SchemaPeer:
61-
var prov types.PeerRecord
62-
err := json.Unmarshal(upr.Val.Bytes, &prov)
63-
if err != nil {
64-
result.Err = err
58+
return newFilteredRecords[*types.PeerRecord](r, types.SchemaPeer)
59+
}
60+
61+
func newFilteredRecords[T any](r io.Reader, schema string) iter.Iter[iter.Result[T]] {
62+
return iter.Map[iter.Result[types.Record]](
63+
iter.Filter(NewRecordsIter(r), func(t iter.Result[types.Record]) bool {
64+
return t.Val.GetSchema() == schema
65+
}),
66+
func(upr iter.Result[types.Record]) iter.Result[T] {
67+
var result iter.Result[T]
68+
if upr.Err != nil {
69+
result.Err = upr.Err
6570
return result
6671
}
67-
result.Val = &prov
68-
}
69-
return result
70-
}
7172

72-
return iter.Map[iter.Result[types.UnknownRecord]](jsonIter, mapFn)
73+
// Note that this should never happen unless [NewRecordsIter] is not well
74+
// is not well implemented.
75+
val, ok := upr.Val.(T)
76+
if !ok {
77+
result.Err = fmt.Errorf("type incompatible with schema %s", schema)
78+
return result
79+
}
80+
81+
result.Val = val
82+
return result
83+
},
84+
)
7385
}

0 commit comments

Comments
 (0)