Skip to content

Commit 7f90b27

Browse files
authored
Add ReadCloser input. (#8)
* Add ReadCloser input. * Update go version.
1 parent d2a445e commit 7f90b27

File tree

3 files changed

+139
-2
lines changed

3 files changed

+139
-2
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ os:
77
- osx
88

99
go:
10-
- 1.5.x
1110
- 1.6.x
1211
- 1.7.x
1312
- 1.8.x
13+
- 1.9.x
1414
- master
1515

1616
script:

reader.go

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
type reader struct {
2222
in io.Reader // Input reader
23+
closer io.Closer // Optional closer
2324
ready chan *buffer // Buffers ready to be handed to the reader
2425
reuse chan *buffer // Buffers to reuse for input reading
2526
exit chan struct{} // Closes when finished
@@ -51,7 +52,30 @@ func NewReader(rd io.Reader) io.ReadCloser {
5152
return ret
5253
}
5354

54-
// NewSize returns a reader with a custom number of buffers and size.
55+
// New returns a reader that will asynchronously read from
56+
// the supplied reader into 4 buffers of 1MB each.
57+
//
58+
// It will start reading from the input at once, maybe even before this
59+
// function has returned.
60+
//
61+
// The input can be read from the returned reader.
62+
// When done use Close() to release the buffers,
63+
// which will also close the supplied closer.
64+
func NewReadCloser(rd io.ReadCloser) io.ReadCloser {
65+
if rd == nil {
66+
return nil
67+
}
68+
69+
ret, err := NewReadCloserSize(rd, 4, 1<<20)
70+
71+
// Should not be possible to trigger from other packages.
72+
if err != nil {
73+
panic("unexpected error:" + err.Error())
74+
}
75+
return ret
76+
}
77+
78+
// NewReaderSize returns a reader with a custom number of buffers and size.
5579
// buffers is the number of queued buffers and size is the size of each
5680
// buffer in bytes.
5781
func NewReaderSize(rd io.Reader, buffers, size int) (io.ReadCloser, error) {
@@ -69,6 +93,24 @@ func NewReaderSize(rd io.Reader, buffers, size int) (io.ReadCloser, error) {
6993
return a, nil
7094
}
7195

96+
// NewReadCloserSize returns a reader with a custom number of buffers and size.
97+
// buffers is the number of queued buffers and size is the size of each
98+
// buffer in bytes.
99+
func NewReadCloserSize(rc io.ReadCloser, buffers, size int) (io.ReadCloser, error) {
100+
if size <= 0 {
101+
return nil, fmt.Errorf("buffer size too small")
102+
}
103+
if buffers <= 0 {
104+
return nil, fmt.Errorf("number of buffers too small")
105+
}
106+
if rc == nil {
107+
return nil, fmt.Errorf("nil input reader supplied")
108+
}
109+
a := &reader{closer: rc}
110+
a.init(rc, buffers, size)
111+
return a, nil
112+
}
113+
72114
// initialize the reader
73115
func (a *reader) init(rd io.Reader, buffers, size int) {
74116
a.in = rd
@@ -177,6 +219,12 @@ func (a *reader) Close() (err error) {
177219
case a.exit <- struct{}{}:
178220
<-a.exited
179221
}
222+
if a.closer != nil {
223+
// Only call once
224+
c := a.closer
225+
a.closer = nil
226+
return c.Close()
227+
}
180228
return nil
181229
}
182230

reader_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,95 @@ func TestReader(t *testing.T) {
6565
}
6666
}
6767

68+
type testCloser struct {
69+
io.Reader
70+
closed int
71+
onClose error
72+
}
73+
74+
func (t *testCloser) Close() error {
75+
t.closed++
76+
return t.onClose
77+
}
78+
79+
func TestReadCloser(t *testing.T) {
80+
buf := bytes.NewBufferString("Testbuffer")
81+
cl := &testCloser{Reader: buf}
82+
ar, err := readahead.NewReadCloserSize(cl, 4, 10000)
83+
if err != nil {
84+
t.Fatal("error when creating:", err)
85+
}
86+
87+
var dst = make([]byte, 100)
88+
n, err := ar.Read(dst)
89+
if err != nil {
90+
t.Fatal("error when reading:", err)
91+
}
92+
if n != 10 {
93+
t.Fatal("unexpected length, expected 10, got ", n)
94+
}
95+
96+
n, err = ar.Read(dst)
97+
if err != io.EOF {
98+
t.Fatal("expected io.EOF, got", err)
99+
}
100+
if n != 0 {
101+
t.Fatal("unexpected length, expected 0, got ", n)
102+
}
103+
104+
// Test read after error
105+
n, err = ar.Read(dst)
106+
if err != io.EOF {
107+
t.Fatal("expected io.EOF, got", err)
108+
}
109+
if n != 0 {
110+
t.Fatal("unexpected length, expected 0, got ", n)
111+
}
112+
113+
err = ar.Close()
114+
if err != nil {
115+
t.Fatal("error when closing:", err)
116+
}
117+
if cl.closed != 1 {
118+
t.Fatal("want close count 1, got:", cl.closed)
119+
}
120+
// Test Close without reading everything
121+
buf = bytes.NewBuffer(make([]byte, 50000))
122+
cl = &testCloser{Reader: buf}
123+
ar = readahead.NewReadCloser(cl)
124+
err = ar.Close()
125+
if err != nil {
126+
t.Fatal("error when closing:", err)
127+
}
128+
if cl.closed != 1 {
129+
t.Fatal("want close count 1, got:", cl.closed)
130+
}
131+
// Test error forwarding
132+
cl = &testCloser{Reader: buf, onClose: errors.New("an error")}
133+
ar = readahead.NewReadCloser(cl)
134+
err = ar.Close()
135+
if err != cl.onClose {
136+
t.Fatal("want error when closing, got", err)
137+
}
138+
if cl.closed != 1 {
139+
t.Fatal("want close count 1, got:", cl.closed)
140+
}
141+
// Test multiple closes
142+
cl = &testCloser{Reader: buf}
143+
ar = readahead.NewReadCloser(cl)
144+
err = ar.Close()
145+
if err != nil {
146+
t.Fatal("error when closing:", err)
147+
}
148+
err = ar.Close()
149+
if err != nil {
150+
t.Fatal("error when closing:", err)
151+
}
152+
if cl.closed != 1 {
153+
t.Fatal("want close count 1, got:", cl.closed)
154+
}
155+
}
156+
68157
func TestWriteTo(t *testing.T) {
69158
buf := bytes.NewBufferString("Testbuffer")
70159
ar, err := readahead.NewReaderSize(buf, 4, 10000)

0 commit comments

Comments
 (0)