Skip to content

Commit 2e939bc

Browse files
authored
Move dapr/utils/streams to kit (#68)
* Move dapr/utils/streams to kit No code changes Signed-off-by: ItalyPaleAle <[email protected]> * 💄 Signed-off-by: ItalyPaleAle <[email protected]> * Lint Signed-off-by: ItalyPaleAle <[email protected]> --------- Signed-off-by: ItalyPaleAle <[email protected]>
1 parent 76c6281 commit 2e939bc

9 files changed

+539
-2
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ require (
1010
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4
1111
github.com/sirupsen/logrus v1.9.3
1212
github.com/stretchr/testify v1.8.4
13+
github.com/tidwall/transform v0.0.0-20201103190739-32f242e2dbde
1314
golang.org/x/crypto v0.14.0
1415
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
1516
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
4545
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
4646
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
4747
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
48+
github.com/tidwall/transform v0.0.0-20201103190739-32f242e2dbde h1:AMNpJRc7P+GTwVbl8DkK2I9I8BBUzNiHuH/tlxrpan0=
49+
github.com/tidwall/transform v0.0.0-20201103190739-32f242e2dbde/go.mod h1:MvrEmduDUz4ST5pGZ7CABCnOU5f3ZiOAZzT6b1A6nX8=
4850
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
4951
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
5052
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=

signals/signals_posix_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"testing"
2525
"time"
2626

27-
"github.com/stretchr/testify/assert"
27+
"github.com/stretchr/testify/require"
2828
)
2929

3030
func TestContext(t *testing.T) {
@@ -35,7 +35,7 @@ func TestContext(t *testing.T) {
3535
onlyOneSignalHandler = make(chan struct{})
3636

3737
ctx := Context()
38-
assert.NoError(t, syscall.Kill(syscall.Getpid(), syscall.SIGINT))
38+
require.NoError(t, syscall.Kill(syscall.Getpid(), syscall.SIGINT))
3939
select {
4040
case <-ctx.Done():
4141
case <-time.After(1 * time.Second):

streams/limitreadcloser.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
Copyright 2023 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package streams
15+
16+
import (
17+
"errors"
18+
"io"
19+
)
20+
21+
/*!
22+
Adapted from the Go 1.18.3 source code
23+
Copyright 2009 The Go Authors. All rights reserved.
24+
License: BSD (https://github.com/golang/go/blob/go1.18.3/LICENSE)
25+
*/
26+
27+
// ErrStreamTooLarge is returned by LimitReadCloser when the stream is too large.
28+
var ErrStreamTooLarge = errors.New("stream too large")
29+
30+
// LimitReadCloser returns a ReadCloser that reads from r but stops with ErrStreamTooLarge after n bytes.
31+
func LimitReadCloser(r io.ReadCloser, n int64) io.ReadCloser {
32+
return &limitReadCloser{
33+
R: r,
34+
N: n,
35+
}
36+
}
37+
38+
type limitReadCloser struct {
39+
R io.ReadCloser
40+
N int64
41+
closed bool
42+
}
43+
44+
func (l *limitReadCloser) Read(p []byte) (n int, err error) {
45+
if l.N < 0 || l.R == nil {
46+
return 0, ErrStreamTooLarge
47+
}
48+
if len(p) == 0 {
49+
return 0, nil
50+
}
51+
if l.closed {
52+
return 0, io.EOF
53+
}
54+
if int64(len(p)) > (l.N + 1) {
55+
p = p[0:(l.N + 1)]
56+
}
57+
n, err = l.R.Read(p)
58+
l.N -= int64(n)
59+
if l.N < 0 {
60+
// Special case if we just read the "l.N+1" byte
61+
if l.N == -1 {
62+
n--
63+
}
64+
if err == nil {
65+
err = ErrStreamTooLarge
66+
}
67+
if !l.closed {
68+
l.closed = true
69+
l.R.Close()
70+
}
71+
}
72+
return
73+
}
74+
75+
func (l *limitReadCloser) Close() error {
76+
if l.closed {
77+
return nil
78+
}
79+
l.closed = true
80+
return l.R.Close()
81+
}

streams/limitreadcloser_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
Copyright 2023 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package streams
15+
16+
import (
17+
"io"
18+
"strings"
19+
"testing"
20+
21+
"github.com/stretchr/testify/require"
22+
)
23+
24+
func TestLimitReadCloser(t *testing.T) {
25+
t.Run("stream shorter than limit", func(t *testing.T) {
26+
s := LimitReadCloser(io.NopCloser(strings.NewReader("e ho guardato dentro un'emozione")), 1000)
27+
read, err := io.ReadAll(s)
28+
require.NoError(t, err)
29+
require.Equal(t, "e ho guardato dentro un'emozione", string(read))
30+
31+
// Reading again should return io.EOF
32+
n, err := s.Read(read)
33+
require.ErrorIs(t, err, io.EOF)
34+
require.Equal(t, 0, n)
35+
})
36+
37+
t.Run("stream has same length as limit", func(t *testing.T) {
38+
s := LimitReadCloser(io.NopCloser(strings.NewReader("e ci ho visto dentro tanto amore")), 32)
39+
read, err := io.ReadAll(s)
40+
require.NoError(t, err)
41+
require.Equal(t, "e ci ho visto dentro tanto amore", string(read))
42+
43+
// Reading again should return io.EOF
44+
n, err := s.Read(read)
45+
require.ErrorIs(t, err, io.EOF)
46+
require.Equal(t, 0, n)
47+
})
48+
49+
t.Run("stream longer than limit", func(t *testing.T) {
50+
s := LimitReadCloser(io.NopCloser(strings.NewReader("che ho capito perche' non si comanda al cuore")), 21)
51+
read, err := io.ReadAll(s)
52+
require.Error(t, err)
53+
require.ErrorIs(t, err, ErrStreamTooLarge)
54+
require.Equal(t, "che ho capito perche'", string(read))
55+
56+
// Reading again should return ErrStreamTooLarge again
57+
n, err := s.Read(read)
58+
require.ErrorIs(t, err, ErrStreamTooLarge)
59+
require.Equal(t, 0, n)
60+
})
61+
62+
t.Run("stream longer than limit, read with byte slice", func(t *testing.T) {
63+
s := LimitReadCloser(io.NopCloser(strings.NewReader("e va bene cosi'")), 4)
64+
65+
read := make([]byte, 100)
66+
n, err := s.Read(read)
67+
require.Error(t, err)
68+
require.ErrorIs(t, err, ErrStreamTooLarge)
69+
require.Equal(t, "e va", string(read[0:n]))
70+
71+
// Reading again should return ErrStreamTooLarge again
72+
n, err = s.Read(read)
73+
require.ErrorIs(t, err, ErrStreamTooLarge)
74+
require.Equal(t, 0, n)
75+
})
76+
77+
t.Run("read in two segments", func(t *testing.T) {
78+
s := LimitReadCloser(io.NopCloser(strings.NewReader("senza parole")), 9)
79+
80+
read := make([]byte, 5)
81+
82+
n, err := s.Read(read)
83+
require.NoError(t, err)
84+
require.Equal(t, "senza", string(read[0:n]))
85+
86+
n, err = s.Read(read)
87+
require.Error(t, err)
88+
require.ErrorIs(t, err, ErrStreamTooLarge)
89+
require.Equal(t, " par", string(read[0:n]))
90+
91+
// Reading again should return ErrStreamTooLarge again
92+
n, err = s.Read(read)
93+
require.ErrorIs(t, err, ErrStreamTooLarge)
94+
require.Equal(t, 0, n)
95+
})
96+
97+
t.Run("close early", func(t *testing.T) {
98+
s := LimitReadCloser(io.NopCloser(strings.NewReader("senza parole")), 10)
99+
100+
// Read 5 bytes then close
101+
read := make([]byte, 5)
102+
n, err := s.Read(read)
103+
require.NoError(t, err)
104+
require.Equal(t, "senza", string(read[0:n]))
105+
106+
// Reading should now return io.EOF
107+
err = s.Close()
108+
require.NoError(t, err)
109+
110+
n, err = s.Read(read)
111+
require.Error(t, err)
112+
require.ErrorIs(t, err, io.EOF)
113+
require.Equal(t, 0, n)
114+
})
115+
116+
t.Run("stream is nil", func(t *testing.T) {
117+
s := LimitReadCloser(nil, 10)
118+
119+
// Reading should return ErrStreamTooLarge again
120+
n, err := s.Read(make([]byte, 10))
121+
require.ErrorIs(t, err, ErrStreamTooLarge)
122+
require.Equal(t, 0, n)
123+
})
124+
}

streams/multireadercloser.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
Copyright 2023 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package streams
15+
16+
import (
17+
"errors"
18+
"io"
19+
"net/http"
20+
)
21+
22+
// NewMultiReaderCloser returns a stream that is like io.MultiReader but that can be closed.
23+
// When the returned stream is closed, it closes the readable streams too, if they implement io.Closer.
24+
func NewMultiReaderCloser(readers ...io.Reader) *MultiReaderCloser {
25+
r := make([]io.Reader, len(readers))
26+
copy(r, readers)
27+
return &MultiReaderCloser{
28+
readers: r,
29+
}
30+
}
31+
32+
/*!
33+
Adapted from the Go 1.19.3 source code
34+
Copyright 2009 The Go Authors. All rights reserved.
35+
License: BSD (https://github.com/golang/go/blob/go1.19.3/LICENSE)
36+
*/
37+
38+
// MultiReaderCloser is an io.MultiReader that also implements the io.Closer interface to close the readable streams.
39+
// Readable streams are also closed when we're done reading from them.
40+
type MultiReaderCloser struct {
41+
readers []io.Reader
42+
}
43+
44+
func (mr *MultiReaderCloser) Read(p []byte) (n int, err error) {
45+
for len(mr.readers) > 0 {
46+
r := mr.readers[0]
47+
n, err = r.Read(p)
48+
49+
// When reading from a http.Response Body, we may get ErrBodyReadAfterClose if we already read it all
50+
// We consider that the same as io.EOF
51+
if errors.Is(err, http.ErrBodyReadAfterClose) {
52+
err = io.EOF
53+
mr.readers = mr.readers[1:]
54+
} else if err == io.EOF {
55+
if rc, ok := r.(io.Closer); ok {
56+
_ = rc.Close()
57+
}
58+
mr.readers = mr.readers[1:]
59+
}
60+
if n > 0 || err != io.EOF {
61+
if err == io.EOF && len(mr.readers) > 0 {
62+
// Don't return EOF yet. More readers remain.
63+
err = nil
64+
}
65+
return
66+
}
67+
}
68+
return 0, io.EOF
69+
}
70+
71+
func (mr *MultiReaderCloser) WriteTo(w io.Writer) (sum int64, err error) {
72+
return mr.writeToWithBuffer(w, make([]byte, 1024*32))
73+
}
74+
75+
func (mr *MultiReaderCloser) writeToWithBuffer(w io.Writer, buf []byte) (sum int64, err error) {
76+
var n int64
77+
for i, r := range mr.readers {
78+
n, err = io.CopyBuffer(w, r, buf)
79+
sum += n
80+
if err != nil {
81+
mr.readers = mr.readers[i:] // permit resume / retry after error
82+
return sum, err
83+
}
84+
mr.readers[i] = nil // permit early GC
85+
}
86+
mr.readers = nil
87+
return sum, nil
88+
}
89+
90+
// Close implements io.Closer.
91+
func (mr *MultiReaderCloser) Close() error {
92+
for _, r := range mr.readers {
93+
if rc, ok := r.(io.Closer); ok {
94+
_ = rc.Close()
95+
}
96+
}
97+
mr.readers = mr.readers[:0]
98+
return nil
99+
}

0 commit comments

Comments
 (0)