Skip to content

Commit aa40fe9

Browse files
authored
Merge pull request #206 from m-lab/break
Fix break bug.
2 parents 970dc23 + 81a5c2a commit aa40fe9

File tree

4 files changed

+52
-16
lines changed

4 files changed

+52
-16
lines changed

storage/storage.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ type TarReader interface {
3434
}
3535

3636
type ETLSource struct {
37-
TarReader // TarReader interface provided by an embedded struct.
38-
io.Closer // Closer interface to be provided by an embedded struct.
37+
TarReader // TarReader interface provided by an embedded struct.
38+
io.Closer // Closer interface to be provided by an embedded struct.
39+
RetryBaseTime time.Duration // The base time for backoff and retry.
3940
}
4041

4142
// Retrieve next file header.
@@ -121,10 +122,11 @@ func (rr *ETLSource) NextTest(maxSize int64) (string, []byte, error) {
121122
var data []byte
122123
var h *tar.Header
123124

124-
// Last trial will be after total delay of 16ms + 32ms + ... + 8192ms,
125-
// or about 15 seconds.
125+
// With default RetryBaseTime, the last trial will be after total delay of
126+
// 16ms + 32ms + ... + 8192ms, or about 15 seconds.
127+
// TODO - should add a random element to the backoff?
126128
trial := 0
127-
delay := 16 * time.Millisecond
129+
delay := rr.RetryBaseTime
128130
for {
129131
trial++
130132
var retry bool
@@ -150,7 +152,7 @@ func (rr *ETLSource) NextTest(maxSize int64) (string, []byte, error) {
150152
}
151153

152154
trial = 0
153-
delay = 16 * time.Millisecond
155+
delay = rr.RetryBaseTime
154156
for {
155157
trial++
156158
var retry bool
@@ -236,7 +238,8 @@ func NewETLSource(client *http.Client, uri string) (*ETLSource, error) {
236238
}
237239
tarReader := tar.NewReader(rdr)
238240

239-
return &ETLSource{tarReader, closer}, nil
241+
timeout := 16 * time.Millisecond
242+
return &ETLSource{tarReader, closer, timeout}, nil
240243
}
241244

242245
// Create a storage reader client.

storage/storage_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestNewTarReader(t *testing.T) {
2323
defer src.Close()
2424

2525
count := 0
26-
for _, _, err := src.NextTest(); err != io.EOF; _, _, err = src.NextTest() {
26+
for _, _, err := src.NextTest(10000000); err != io.EOF; _, _, err = src.NextTest(10000000) {
2727
if err != nil {
2828
t.Fatal(err)
2929
}
@@ -42,7 +42,7 @@ func TestNewTarReaderGzip(t *testing.T) {
4242
defer src.Close()
4343

4444
count := 0
45-
for _, _, err := src.NextTest(); err != io.EOF; _, _, err = src.NextTest() {
45+
for _, _, err := src.NextTest(10000000); err != io.EOF; _, _, err = src.NextTest(10000000) {
4646
if err != nil {
4747
t.Fatal(err)
4848
}

task/task.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type Task struct {
3030
etl.Parser // Parser to parse the tests.
3131

3232
meta map[string]bigquery.Value // Metadata about this task.
33-
maxFileSize int64 // Max file size to avoid OOM.
33+
maxFileSize int64 // Max file size to avoid OOM.
3434
}
3535

3636
// NewTask constructs a task, injecting the source and the parser.
@@ -61,19 +61,20 @@ func (tt *Task) ProcessAllTests() (int, error) {
6161
var err error
6262
// Read each file from the tar
6363

64+
OUTER:
6465
for testname, data, err = tt.NextTest(tt.maxFileSize); err != io.EOF; testname, data, err = tt.NextTest(tt.maxFileSize) {
6566
files++
6667
if err != nil {
6768
switch {
6869
case err == io.EOF:
69-
break
70+
break OUTER
7071
case err == storage.OVERSIZE_FILE:
7172
log.Printf("filename:%s testname:%s files:%d, duration:%v err:%v",
7273
tt.meta["filename"], testname, files,
7374
time.Since(tt.meta["parse_time"].(time.Time)), err)
7475
metrics.TestCount.WithLabelValues(
7576
tt.Parser.TableName(), "unknown", "oversize file").Inc()
76-
continue
77+
continue OUTER
7778
default:
7879
// We are seeing several of these per hour, a little more than
7980
// one in one thousand files. duration varies from 10 seconds
@@ -93,7 +94,7 @@ func (tt *Task) ProcessAllTests() (int, error) {
9394
tt.Parser.TableName(), "unknown", "unrecovered").Inc()
9495
// Since we don't understand these errors, safest thing to do is
9596
// stop processing the tar file (and task).
96-
break
97+
break OUTER
9798
}
9899
}
99100
if data == nil {

task/task_test.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@ package task_test
55
import (
66
"archive/tar"
77
"bytes"
8+
"errors"
89
"fmt"
910
"reflect"
1011
"testing"
1112

1213
"cloud.google.com/go/bigquery"
1314

15+
"time"
16+
1417
"github.com/m-lab/etl/parser"
1518
"github.com/m-lab/etl/storage" // TODO - would be better not to have this.
1619
"github.com/m-lab/etl/task"
@@ -59,7 +62,7 @@ func MakeTestSource(t *testing.T) *storage.ETLSource {
5962
t.Fatal(err)
6063
}
6164

62-
return &storage.ETLSource{tar.NewReader(b), NullCloser{}}
65+
return &storage.ETLSource{tar.NewReader(b), NullCloser{}, time.Millisecond}
6366
}
6467

6568
type TestParser struct {
@@ -83,8 +86,37 @@ func (tp *TestParser) ParseAndInsert(meta map[string]bigquery.Value, testName st
8386
return nil
8487
}
8588

86-
// TODO(dev) - add unit tests for tgz and tar.gz files
87-
// TODO(dev) - add good comments
89+
type badSource struct{}
90+
91+
func (bs *badSource) Next() (*tar.Header, error) {
92+
return nil, errors.New("Random Error")
93+
}
94+
func (bs *badSource) Read(b []byte) (int, error) {
95+
return 0, errors.New("Read error")
96+
}
97+
98+
// TODO - this test is very slow, because it triggers the backoff and retry mechanism.
99+
func TestBadTarFileInput(t *testing.T) {
100+
rdr := &storage.ETLSource{&badSource{}, NullCloser{}, time.Millisecond}
101+
102+
tp := &TestParser{}
103+
104+
// Among other things, this requires that tp implements etl.Parser.
105+
tt := task.NewTask("filename", rdr, tp)
106+
fc, err := tt.ProcessAllTests()
107+
if err.Error() != "Random Error" {
108+
t.Error("Expected Random Error, but got %v", err)
109+
}
110+
// Should see 1 files.
111+
if fc != 1 {
112+
t.Error("Expected 1 file: ", fc)
113+
}
114+
// ... but process none.
115+
if len(tp.files) != 0 {
116+
t.Error("Should have processed no files: ", len(tp.files))
117+
}
118+
}
119+
88120
func TestTarFileInput(t *testing.T) {
89121
rdr := MakeTestSource(t)
90122

0 commit comments

Comments
 (0)