Skip to content

Commit 97975e0

Browse files
authored
Merge pull request #195 from m-lab/max-size
Add max file size, and clean up error handling
2 parents 60f655d + 435495c commit 97975e0

File tree

3 files changed

+111
-48
lines changed

3 files changed

+111
-48
lines changed

storage/storage.go

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
storage "google.golang.org/api/storage/v1"
2727
)
2828

29+
var OVERSIZE_FILE = errors.New("Oversize file")
30+
2931
type TarReader interface {
3032
Next() (*tar.Header, error)
3133
Read(b []byte) (int, error)
@@ -106,8 +108,10 @@ func (rr *ETLSource) nextData(h *tar.Header, trial int) ([]byte, bool, error) {
106108
}
107109

108110
// Next reads the next test object from the tar file.
111+
// Skips reading contents of any file larger than maxSize, returning empty data
112+
// and storage.OVERSIZE_FILE error.
109113
// Returns io.EOF when there are no more tests.
110-
func (rr *ETLSource) NextTest() (string, []byte, error) {
114+
func (rr *ETLSource) NextTest(maxSize int64) (string, []byte, error) {
111115
metrics.WorkerState.WithLabelValues("read").Inc()
112116
defer metrics.WorkerState.WithLabelValues("read").Dec()
113117

@@ -136,28 +140,33 @@ func (rr *ETLSource) NextTest() (string, []byte, error) {
136140
time.Sleep(delay)
137141
}
138142

143+
if h.Size > maxSize {
144+
return h.Name, data, OVERSIZE_FILE
145+
}
146+
139147
// Only process regular files.
140-
if h.Typeflag == tar.TypeReg {
141-
trial = 0
142-
delay = 16 * time.Millisecond
143-
for {
144-
trial++
145-
var retry bool
146-
data, retry, err = rr.nextData(h, trial)
147-
if err == nil {
148-
break
149-
}
150-
if !retry || trial >= 10 {
151-
// FYI, it appears that stream errors start in the
152-
// nextData phase of reading, but then persist on
153-
// the next call to nextHeader.
154-
break
155-
}
156-
// For each trial, increase backoff delay by 2x.
157-
delay *= 2
158-
time.Sleep(delay)
148+
if h.Typeflag != tar.TypeReg {
149+
return h.Name, data, nil
150+
}
159151

152+
trial = 0
153+
delay = 16 * time.Millisecond
154+
for {
155+
trial++
156+
var retry bool
157+
data, retry, err = rr.nextData(h, trial)
158+
if err == nil {
159+
break
160+
}
161+
if !retry || trial >= 10 {
162+
// FYI, it appears that stream errors start in the
163+
// nextData phase of reading, but then persist on
164+
// the next call to nextHeader.
165+
break
160166
}
167+
// For each trial, increase backoff delay by 2x.
168+
delay *= 2
169+
time.Sleep(delay)
161170
}
162171

163172
return h.Name, data, nil

task/task.go

Lines changed: 54 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,20 @@ import (
1717
"github.com/m-lab/etl/storage"
1818
)
1919

20+
// Impose 20MiB max size for a single file. Larger than this risks an OOM if there are
21+
// multiple large files on multiple concurrent tasks.
22+
// This can be overridden with SetMaxFileSize()
23+
const MAX_FILE_SIZE = 20 * 1024 * 1024
24+
2025
// TODO(dev) Add unit tests for meta data.
2126
type Task struct {
2227
// ETLSource and Parser are both embedded, so their interfaces are delegated
2328
// to the component structs.
2429
*storage.ETLSource // Source from which to read tests.
2530
etl.Parser // Parser to parse the tests.
2631

27-
meta map[string]bigquery.Value // Metadata about this task.
32+
meta map[string]bigquery.Value // Metadata about this task.
33+
maxFileSize int64 // Max file size to avoid OOM.
2834
}
2935

3036
// NewTask constructs a task, injecting the source and the parser.
@@ -34,10 +40,14 @@ func NewTask(filename string, src *storage.ETLSource, prsr etl.Parser) *Task {
3440
meta["filename"] = filename
3541
meta["parse_time"] = time.Now()
3642
meta["attempt"] = 1
37-
t := Task{src, prsr, meta}
43+
t := Task{src, prsr, meta, MAX_FILE_SIZE}
3844
return &t
3945
}
4046

47+
func (tt *Task) SetMaxFileSize(max int64) {
48+
tt.maxFileSize = max
49+
}
50+
4151
// ProcessAllTests loops through all the tests in a tar file, calls the
4252
// injected parser to parse them, and inserts them into bigquery. Returns the
4353
// number of files processed.
@@ -46,28 +56,45 @@ func (tt *Task) ProcessAllTests() (int, error) {
4656
defer metrics.WorkerState.WithLabelValues("task").Dec()
4757
files := 0
4858
nilData := 0
59+
var testname string
60+
var data []byte
61+
var err error
4962
// Read each file from the tar
50-
for testname, data, err := tt.NextTest(); err != io.EOF; testname, data, err = tt.NextTest() {
63+
64+
for testname, data, err = tt.NextTest(tt.maxFileSize); err != io.EOF; testname, data, err = tt.NextTest(tt.maxFileSize) {
5165
files++
5266
if err != nil {
53-
if err == io.EOF {
67+
switch {
68+
case err == io.EOF:
5469
break
55-
}
56-
// We are seeing several of these per hour, a little more than
57-
// one in one thousand files. duration varies from 10 seconds up to several
58-
// minutes.
59-
// Example:
60-
// filename:gs://m-lab-sandbox/ndt/2016/04/10/20160410T000000Z-mlab1-ord02-ndt-0002.tgz
61-
// files:666 duration:1m47.571825351s
62-
// err:stream error: stream ID 801; INTERNAL_ERROR
63-
// Because of the break, this error is passed up, and counted at the Task level.
64-
log.Printf("filename:%s testname:%s files:%d, duration:%v err:%v",
65-
tt.meta["filename"], testname, files,
66-
time.Since(tt.meta["parse_time"].(time.Time)), err)
70+
case err == storage.OVERSIZE_FILE:
71+
log.Printf("filename:%s testname:%s files:%d, duration:%v err:%v",
72+
tt.meta["filename"], testname, files,
73+
time.Since(tt.meta["parse_time"].(time.Time)), err)
74+
metrics.TestCount.WithLabelValues(
75+
tt.Parser.TableName(), "unknown", "oversize file").Inc()
76+
continue
77+
default:
78+
// We are seeing several of these per hour, a little more than
79+
// one in one thousand files. duration varies from 10 seconds
80+
// up to several minutes.
81+
// Example:
82+
// filename:
83+
// gs://m-lab-sandbox/ndt/2016/04/10/20160410T000000Z-mlab1-ord02-ndt-0002.tgz
84+
// files:666 duration:1m47.571825351s
85+
// err:stream error: stream ID 801; INTERNAL_ERROR
86+
// Because of the break, this error is passed up, and counted at
87+
// the Task level.
88+
log.Printf("filename:%s testname:%s files:%d, duration:%v err:%v",
89+
tt.meta["filename"], testname, files,
90+
time.Since(tt.meta["parse_time"].(time.Time)), err)
6791

68-
metrics.TestCount.WithLabelValues(
69-
tt.Parser.TableName(), "unknown", "unrecovered").Inc()
70-
break
92+
metrics.TestCount.WithLabelValues(
93+
tt.Parser.TableName(), "unknown", "unrecovered").Inc()
94+
// Since we don't understand these errors, safest thing to do is
95+
// stop processing the tar file (and task).
96+
break
97+
}
7198
}
7299
if data == nil {
73100
// TODO(dev) Handle directories (expected) and other
@@ -89,14 +116,18 @@ func (tt *Task) ProcessAllTests() (int, error) {
89116
}
90117

91118
// Flush any rows cached in the inserter.
92-
err := tt.Flush()
119+
flushErr := tt.Flush()
93120

94-
if err != nil {
95-
log.Printf("%v", err)
121+
if flushErr != nil {
122+
log.Printf("%v", flushErr)
96123
}
97124
// TODO - make this debug or remove
98125
log.Printf("Processed %d files, %d nil data, %d rows committed, %d failed, from %s into %s",
99126
files, nilData, tt.Parser.Committed(), tt.Parser.Failed(),
100127
tt.meta["filename"], tt.Parser.FullTableName())
101-
return files, err
128+
// Return the file count, and the terminal error, if other than EOF.
129+
if err != io.EOF {
130+
return files, err
131+
}
132+
return files, nil
102133
}

task/task_test.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ func MakeTestSource(t *testing.T) *storage.ETLSource {
4444
t.Fatal(err)
4545
}
4646

47+
// Put a large file in the middle to test skipping.
48+
hdr = tar.Header{Name: "big_file", Mode: 0666, Typeflag: tar.TypeReg, Size: int64(101)}
49+
tw.WriteHeader(&hdr)
50+
_, err = tw.Write(make([]byte, 101))
51+
if err != nil {
52+
t.Fatal(err)
53+
}
54+
4755
hdr = tar.Header{Name: "bar", Mode: 0666, Typeflag: tar.TypeReg, Size: int64(11)}
4856
tw.WriteHeader(&hdr)
4957
_, err = tw.Write([]byte("butter milk"))
@@ -84,7 +92,7 @@ func TestTarFileInput(t *testing.T) {
8492

8593
// Among other things, this requires that tp implements etl.Parser.
8694
tt := task.NewTask("filename", rdr, tp)
87-
fn, bb, err := tt.NextTest()
95+
fn, bb, err := tt.NextTest(100)
8896
if err != nil {
8997
t.Error(err)
9098
}
@@ -95,7 +103,19 @@ func TestTarFileInput(t *testing.T) {
95103
t.Error("Expected biscuits but got ", string(bb))
96104
}
97105

98-
fn, bb, err = tt.NextTest()
106+
// Here we expect an oversize file error, with filename = big_file.
107+
fn, bb, err = tt.NextTest(100)
108+
if fn != "big_file" {
109+
t.Error("Expected big_file: " + fn)
110+
}
111+
if err == nil {
112+
t.Error("Expected oversize file")
113+
} else if err != storage.OVERSIZE_FILE {
114+
t.Error("Expected oversize file but got: " + err.Error())
115+
}
116+
117+
// This is the last file, so we expect EOF.
118+
fn, bb, err = tt.NextTest(100)
99119
if err != nil {
100120
t.Error(err)
101121
}
@@ -110,15 +130,18 @@ func TestTarFileInput(t *testing.T) {
110130
rdr = MakeTestSource(t)
111131

112132
tt = task.NewTask("filename", rdr, tp)
133+
tt.SetMaxFileSize(100)
113134
fc, err := tt.ProcessAllTests()
114135
if err != nil {
115136
t.Error("Expected nil error, but got %v", err)
116137
}
117-
if fc != len(tp.files) {
118-
t.Error("Number of files counted (%s) does not match files parsed", fc, len(tp.files))
138+
// Should see 3 files.
139+
if fc != 3 {
140+
t.Error("Expected 3 files: ", fc)
119141
}
142+
// ... but process only two.
120143
if len(tp.files) != 2 {
121-
t.Error("Too few files ", len(tp.files))
144+
t.Error("Should have processed two files: ", len(tp.files))
122145
}
123146
if !reflect.DeepEqual(tp.files, []string{"foo", "bar"}) {
124147
t.Error("Not expected files: ", tp.files)

0 commit comments

Comments
 (0)