Skip to content

Commit aacc0b3

Browse files
authored
Add sink.close (#898)
* add json annotations * improve error logging * add Sink.Close * travis log each outcome * improve comments on errors * return Put errors * disable manual GCS test * improve ProcessAll error handling comments * Add closer to task instead of parser * move row.NullCloser * update bindata
1 parent d2e99dc commit aacc0b3

File tree

15 files changed

+191
-60
lines changed

15 files changed

+191
-60
lines changed

.travis.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ script:
105105
- for module in $MODULES; do
106106
go test -v -coverpkg=$COVER_PKGS -coverprofile=${module//\//_}.cov github.com/m-lab/etl/$module ;
107107
EC=$[ $EC || $? ] ;
108+
echo "status $EC" ;
108109
done
109110
- echo "summary status $EC" ;
110111
- if [[ $EC != 0 ]]; then false; fi ;
@@ -119,6 +120,7 @@ script:
119120
for module in metrics ; do
120121
go test -v -coverpkg=$COVER_PKGS -coverprofile=${module//\//_}.cov github.com/m-lab/etl/$module -tags=integration ;
121122
EC=$[ $EC || $? ] ;
123+
echo "summary $EC" ;
122124
done ;
123125
echo "summary status $EC" ;
124126
if [[ $EC != 0 ]]; then false; fi ;

bq/insert.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,9 @@ func (in *sink) Commit(rows []interface{}, label string) (int, error) {
409409
return in.flushSlice(rows, label, label)
410410
}
411411

412+
// Close synchronizes on the tokens, and closes the backing file.
413+
func (in *sink) Close() error { return nil }
414+
412415
// flushSlice flushes a slice of rows to BigQuery.
413416
// It returns the number of rows successfully committed.
414417
// It is NOT threadsafe.
@@ -571,7 +574,7 @@ func (sf *bqSinkFactory) Get(
571574

572575
client, err := GetClient(pdt.Project)
573576
if err != nil {
574-
return nil, factory.NewError(path.DataType, "StorageClient",
577+
return nil, factory.NewError(path.DataType, "BQClient",
575578
http.StatusInternalServerError, err)
576579
}
577580

parser/annotation.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,9 @@ func (ap *AnnotationParser) ParseAndInsert(meta map[string]bigquery.Value, testN
109109
metrics.RowSizeHistogram.WithLabelValues(ap.TableName()).Observe(float64(len(test)))
110110

111111
// Insert the row.
112-
ap.Base.Put(&row)
112+
if err = ap.Base.Put(&row); err != nil {
113+
return err
114+
}
113115

114116
// Count successful inserts.
115117
metrics.TestCount.WithLabelValues(ap.TableName(), "annotation", "ok").Inc()

parser/ndt5_result.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,9 @@ func (dp *NDT5ResultParser) ParseAndInsert(meta map[string]bigquery.Value, testN
109109
metrics.RowSizeHistogram.WithLabelValues(
110110
dp.TableName()).Observe(float64(len(test)))
111111

112-
dp.Base.Put(&stats)
112+
if err = dp.Base.Put(&stats); err != nil {
113+
return err
114+
}
113115
// Count successful inserts.
114116
metrics.TestCount.WithLabelValues(dp.TableName(), "ndt5_result", "ok").Inc()
115117
}

parser/ndt7_result.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,10 @@ func (dp *NDT7ResultParser) ParseAndInsert(meta map[string]bigquery.Value, testN
109109
dp.TableName()).Observe(float64(len(test)))
110110

111111
// Insert the row.
112-
dp.Base.Put(&row)
113-
112+
err = dp.Base.Put(&row)
113+
if err != nil {
114+
return err
115+
}
114116
// Count successful inserts.
115117
metrics.TestCount.WithLabelValues(dp.TableName(), "ndt7_result", "ok").Inc()
116118
return nil

parser/tcpinfo.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,12 @@ func (p *TCPInfoParser) ParseAndInsert(fileMetadata map[string]bigquery.Value, t
190190
}
191191
}
192192

193-
p.Put(&row)
193+
// For GCS, errors here are generally fatal.
194+
// For BigQuery, errors here could be fatal, or could be due to quota exceeded.
195+
err = p.Put(&row)
196+
if err != nil {
197+
return err
198+
}
194199
metrics.TestCount.WithLabelValues(p.TableName(), "tcpinfo", "ok").Inc()
195200
return nil
196201
}

parser/tcpinfo_test.go

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"testing"
1313
"time"
1414

15+
"github.com/googleapis/google-cloud-go-testing/storage/stiface"
1516
v2 "github.com/m-lab/annotation-service/api/v2"
1617

1718
"cloud.google.com/go/bigquery"
@@ -94,6 +95,10 @@ func (in *inMemorySink) Commit(data []interface{}, label string) (int, error) {
9495
return len(data), nil
9596
}
9697

98+
func (in *inMemorySink) Close() error {
99+
return nil
100+
}
101+
97102
func (in *inMemorySink) Flush() error {
98103
in.committed = len(in.data)
99104
return nil
@@ -102,6 +107,10 @@ func (in *inMemorySink) Committed() int {
102107
return in.committed
103108
}
104109

110+
type nullCloser struct{}
111+
112+
func (nc nullCloser) Close() error { return nil }
113+
105114
// NOTE: This uses a fake annotator which returns no annotations.
106115
// TODO: This test seems to be flakey in travis - sometimes only 357 tests instead of 362
107116
func TestTCPParser(t *testing.T) {
@@ -118,7 +127,7 @@ func TestTCPParser(t *testing.T) {
118127
// Inject fake inserter and annotator
119128
ins := newInMemorySink()
120129
p := parser.NewTCPInfoParser(ins, "test", "_suffix", &fakeAnnotator{})
121-
task := task.NewTask(filename, src, p)
130+
task := task.NewTask(filename, src, p, nullCloser{})
122131

123132
startDecode := time.Now()
124133
n, err := task.ProcessAllTests()
@@ -228,7 +237,7 @@ func TestTCPTask(t *testing.T) {
228237
t.Fatal("Failed reading testdata from", filename)
229238
}
230239

231-
task := task.NewTask(filename, src, p)
240+
task := task.NewTask(filename, src, p, &nullCloser{})
232241

233242
n, err := task.ProcessAllTests()
234243
if err != nil {
@@ -253,7 +262,7 @@ func TestBQSaver(t *testing.T) {
253262
t.Fatal("Failed reading testdata from", filename)
254263
}
255264

256-
task := task.NewTask(filename, src, p)
265+
task := task.NewTask(filename, src, p, &nullCloser{})
257266

258267
_, err = task.ProcessAllTests()
259268
if err != nil {
@@ -272,6 +281,51 @@ func TestBQSaver(t *testing.T) {
272281
}
273282
}
274283

284+
// This test writes 364 rows to a json file in GCS.
285+
// The rows can then be loaded into a BQ table, using the schema in testdata, like:
286+
// bq load --source_format=NEWLINE_DELIMITED_JSON \
287+
// mlab-sandbox:gfr.small_tcpinfo gs://archive-mlab-testing/gfr/tcpinfo.json ./schema.json
288+
// Recommend commenting out snapshots in tcpinfo.go.
289+
func TestTaskToGCS(t *testing.T) {
290+
t.Skip("Skipping test intended for manual experimentation")
291+
292+
os.Setenv("RELEASE_TAG", "foobar")
293+
parser.InitParserVersionForTest()
294+
295+
c, err := storage.GetStorageClient(true)
296+
if err != nil {
297+
t.Fatal(err)
298+
}
299+
300+
rw, err := storage.NewRowWriter(context.Background(), stiface.AdaptClient(c), "archive-mlab-testing", "gfr/tcpinfo.json")
301+
if err != nil {
302+
t.Fatal(err)
303+
}
304+
// Inject fake inserter and annotator
305+
p := parser.NewTCPInfoParser(rw, "test", "suffix", &fakeAnnotator{})
306+
307+
filename := "testdata/20190516T013026.744845Z-tcpinfo-mlab4-arn02-ndt.tgz"
308+
src, err := fileSource(filename)
309+
if err != nil {
310+
t.Fatal("Failed reading testdata from", filename)
311+
}
312+
313+
task := task.NewTask(filename, src, p, &nullCloser{})
314+
315+
n, err := task.ProcessAllTests()
316+
if err != nil {
317+
t.Fatal(err)
318+
}
319+
err = rw.Close()
320+
if err != nil {
321+
t.Fatal(err)
322+
}
323+
324+
if n != 364 {
325+
t.Errorf("Expected ProcessAllTests to handle %d files, but it handled %d.\n", 364, n)
326+
}
327+
}
328+
275329
func BenchmarkTCPParser(b *testing.B) {
276330
os.Setenv("RELEASE_TAG", "foobar")
277331
parser.InitParserVersionForTest()
@@ -288,7 +342,7 @@ func BenchmarkTCPParser(b *testing.B) {
288342
b.Fatalf("cannot read testdata.")
289343
}
290344

291-
task := task.NewTask(filename, src, p)
345+
task := task.NewTask(filename, src, p, &nullCloser{})
292346

293347
n, err = task.ProcessAllTests()
294348
if err != nil {

row/row.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package row
66
import (
77
"context"
88
"errors"
9+
"io"
910
"log"
1011
"sync"
1112
"time"
@@ -109,6 +110,7 @@ type HasStats interface {
109110
// Implementations should be threadsafe.
110111
type Sink interface {
111112
Commit(rows []interface{}, label string) (int, error)
113+
io.Closer
112114
}
113115

114116
// Buffer provides all basic functionality generally needed for buffering, annotating, and inserting
@@ -313,9 +315,14 @@ func (pb *Base) TaskError() error {
313315
return nil
314316
}
315317

318+
var logAnnError = logx.NewLogEvery(nil, 60*time.Second)
319+
316320
func (pb *Base) commit(rows []interface{}) error {
317-
// TODO - care about error?
318-
_ = pb.ann.Annotate(rows, pb.label)
321+
err := pb.ann.Annotate(rows, pb.label)
322+
if err != nil {
323+
logAnnError.Println("annotation: ", err)
324+
}
325+
319326
// TODO do we need these to be done in order.
320327
// This is synchronous, blocking, and thread safe.
321328
done, err := pb.sink.Commit(rows, pb.label)
@@ -342,7 +349,7 @@ func (pb *Base) Flush() error {
342349
// sequential calls to Put. However, once a block of rows is submitted
343350
// to pb.commit, it should be written in the same order to the Sink.
344351
// TODO improve Annotatable architecture.
345-
func (pb *Base) Put(row Annotatable) {
352+
func (pb *Base) Put(row Annotatable) error {
346353
rows := pb.buf.Append(row)
347354
pb.stats.Inc()
348355

@@ -351,9 +358,11 @@ func (pb *Base) Put(row Annotatable) {
351358
// TODO consider making this asynchronous.
352359
err := pb.commit(rows)
353360
if err != nil {
354-
log.Println(err)
361+
log.Println(pb.label, err)
362+
return err
355363
}
356364
}
365+
return nil
357366
}
358367

359368
// NullAnnotator satisfies the Annotatable interface without actually doing

row/row_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ func (in *inMemorySink) Commit(data []interface{}, label string) (int, error) {
7272
return len(data), nil
7373
}
7474

75+
func (in *inMemorySink) Close() error { return nil }
76+
7577
func TestBase(t *testing.T) {
7678
ins := &inMemorySink{}
7779

schema/annotation.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ import (
1313
// AnnotationRow defines the BQ schema using 'Standard Columns' conventions for
1414
// the annotation datatype produced by the uuid-annotator.
1515
type AnnotationRow struct {
16-
UUID string `bigquery:"id"` // NOTE: there is no 'a' record for AnnotationRows.
17-
Server annotator.ServerAnnotations `bigquery:"server"`
18-
Client annotator.ClientAnnotations `bigquery:"client"`
19-
Parser ParseInfo `bigquery:"parser"`
20-
Date civil.Date `bigquery:"date"`
16+
UUID string `bigquery:"id" json:"id"` // NOTE: there is no 'a' record for AnnotationRows.
17+
Server annotator.ServerAnnotations `bigquery:"server" json:"server"`
18+
Client annotator.ClientAnnotations `bigquery:"client" json:"client"`
19+
Parser ParseInfo `bigquery:"parser" json:"parser"`
20+
Date civil.Date `bigquery:"date" json:"date"`
2121

2222
// NOTE: there is no 'Raw' field for annotation datatypes because the
2323
// uuid-annotator output schema was designed to be used directly by the parser.

0 commit comments

Comments
 (0)