Skip to content

Commit f4993ea

Browse files
Add etl.Metadata for passing task metadata to parsers (#1117)
* Add etl.Metadata structure and change Parser interface
1 parent 1e85a4f commit f4993ea

19 files changed

+137
-130
lines changed

etl/etl.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"errors"
77
"time"
88

9-
"cloud.google.com/go/bigquery"
109
"cloud.google.com/go/civil"
1110
)
1211

@@ -22,7 +21,8 @@ type ProcessingError interface {
2221
// RowStats interface defines some useful Inserter stats that will also be
2322
// implemented by Parser.
2423
// RowStats implementations should provide the invariants:
25-
// Accepted == Failed + Committed + RowsInBuffer
24+
//
25+
// Accepted == Failed + Committed + RowsInBuffer
2626
type RowStats interface {
2727
// RowsInBuffer returns the count of rows currently in the buffer.
2828
RowsInBuffer() int
@@ -36,7 +36,8 @@ type RowStats interface {
3636

3737
// Inserter is a data sink that writes to BigQuery tables.
3838
// Inserters should provide the invariants:
39-
// After Flush() returns, RowsInBuffer == 0
39+
//
40+
// After Flush() returns, RowsInBuffer == 0
4041
type Inserter interface {
4142
// Put synchronously sends a slice of rows to BigQuery
4243
// This is THREADSAFE
@@ -102,6 +103,15 @@ type InserterParams struct {
102103
MaxRetryDelay time.Duration // Maximum backoff time for Put retries.
103104
}
104105

106+
// Metadata provides metadata about the parser and archive files.
107+
type Metadata struct {
108+
Version string
109+
ArchiveURL string
110+
GitCommit string
111+
Date civil.Date
112+
Start time.Time
113+
}
114+
105115
// ErrHighInsertionFailureRate should be returned by TaskError when there are more than 10% BQ insertion errors.
106116
var ErrHighInsertionFailureRate = errors.New("too many insertion failures")
107117

@@ -116,7 +126,7 @@ type Parser interface {
116126
// meta - metadata, e.g. from the original tar file name.
117127
// testName - Name of test file (typically extracted from a tar file)
118128
// test - binary test data
119-
ParseAndInsert(meta map[string]bigquery.Value, testName string, test []byte) error
129+
ParseAndInsert(meta Metadata, testName string, test []byte) error
120130

121131
// Flush flushes any pending rows.
122132
Flush() error

parser/annotation2.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@ import (
66
"strings"
77
"time"
88

9-
"cloud.google.com/go/bigquery"
10-
11-
"cloud.google.com/go/civil"
129
"github.com/m-lab/etl/etl"
1310
"github.com/m-lab/etl/metrics"
1411
"github.com/m-lab/etl/row"
@@ -60,17 +57,17 @@ func (ap *Annotation2Parser) IsParsable(testName string, data []byte) (string, b
6057
}
6158

6259
// ParseAndInsert decodes the data.Annotation2 JSON and inserts it into BQ.
63-
func (ap *Annotation2Parser) ParseAndInsert(meta map[string]bigquery.Value, testName string, test []byte) error {
60+
func (ap *Annotation2Parser) ParseAndInsert(meta etl.Metadata, testName string, test []byte) error {
6461
metrics.WorkerState.WithLabelValues(ap.TableName(), "annotation2").Inc()
6562
defer metrics.WorkerState.WithLabelValues(ap.TableName(), "annotation2").Dec()
6663

6764
row := schema.Annotation2Row{
6865
Parser: schema.ParseInfo{
69-
Version: Version(),
66+
Version: meta.Version,
7067
Time: time.Now(),
71-
ArchiveURL: meta["filename"].(string),
68+
ArchiveURL: meta.ArchiveURL,
7269
Filename: testName,
73-
GitCommit: GitCommit(),
70+
GitCommit: meta.GitCommit,
7471
},
7572
}
7673

@@ -105,7 +102,7 @@ func (ap *Annotation2Parser) ParseAndInsert(meta map[string]bigquery.Value, test
105102
// the given timestamp, regardless of the timestamp's timezone. Since we
106103
// run our systems in UTC, all timestamps will be relative to UTC and as
107104
// will these dates.
108-
row.Date = meta["date"].(civil.Date)
105+
row.Date = meta.Date
109106

110107
// Estimate the row size based on the input JSON size.
111108
metrics.RowSizeHistogram.WithLabelValues(ap.TableName()).Observe(float64(len(test)))

parser/annotation2_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ import (
55
"strings"
66
"testing"
77

8-
"cloud.google.com/go/bigquery"
8+
"github.com/m-lab/etl/etl"
9+
910
"cloud.google.com/go/civil"
1011
"github.com/go-test/deep"
1112
"github.com/m-lab/etl/parser"
@@ -45,9 +46,11 @@ func TestAnnotation2Parser_ParseAndInsert(t *testing.T) {
4546
t.Fatal("IsParsable() failed; got false, want true")
4647
}
4748

48-
meta := map[string]bigquery.Value{
49-
"filename": "gs://mlab-test-bucket/ndt/ndt7/2020/03/18/" + tt.file,
50-
"date": civil.Date{Year: 2020, Month: 3, Day: 18},
49+
meta := etl.Metadata{
50+
ArchiveURL: "gs://mlab-test-bucket/ndt/ndt7/2020/03/18/" + tt.file,
51+
Date: civil.Date{Year: 2020, Month: 3, Day: 18},
52+
Version: parser.Version(),
53+
GitCommit: parser.GitCommit(),
5154
}
5255

5356
if err := n.ParseAndInsert(meta, tt.file, data); (err != nil) != tt.wantErr {

parser/hopannotation2.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"strings"
66
"time"
77

8-
"cloud.google.com/go/bigquery"
9-
"cloud.google.com/go/civil"
108
"github.com/m-lab/etl/etl"
119
"github.com/m-lab/etl/metrics"
1210
"github.com/m-lab/etl/row"
@@ -44,17 +42,17 @@ func (p *HopAnnotation2Parser) IsParsable(testName string, data []byte) (string,
4442
}
4543

4644
// ParseAndInsert decodes the HopAnnotation2 data and inserts it into BQ.
47-
func (p *HopAnnotation2Parser) ParseAndInsert(fileMetadata map[string]bigquery.Value, testName string, rawContent []byte) error {
45+
func (p *HopAnnotation2Parser) ParseAndInsert(meta etl.Metadata, testName string, rawContent []byte) error {
4846
metrics.WorkerState.WithLabelValues(p.TableName(), "hopannotation2").Inc()
4947
defer metrics.WorkerState.WithLabelValues(p.TableName(), "hopannotation2").Dec()
5048

5149
row := schema.HopAnnotation2Row{
5250
Parser: schema.ParseInfo{
53-
Version: Version(),
51+
Version: meta.Version,
5452
Time: time.Now(),
55-
ArchiveURL: fileMetadata["filename"].(string),
53+
ArchiveURL: meta.ArchiveURL,
5654
Filename: testName,
57-
GitCommit: GitCommit(),
55+
GitCommit: meta.GitCommit,
5856
},
5957
}
6058

@@ -73,7 +71,7 @@ func (p *HopAnnotation2Parser) ParseAndInsert(fileMetadata map[string]bigquery.V
7371
// the given timestamp, regardless of the timestamp's timezone. Since we
7472
// run our systems in UTC, all timestamps will be relative to UTC and as
7573
// will these dates.
76-
row.Date = fileMetadata["date"].(civil.Date)
74+
row.Date = meta.Date
7775

7876
// Estimate the row size based on the input JSON size.
7977
metrics.RowSizeHistogram.WithLabelValues(p.TableName()).Observe(float64(len(rawContent)))

parser/hopannotation2_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import (
77
"testing"
88
"time"
99

10-
"cloud.google.com/go/bigquery"
1110
"cloud.google.com/go/civil"
1211
"github.com/go-test/deep"
12+
"github.com/m-lab/etl/etl"
1313
"github.com/m-lab/etl/parser"
1414
"github.com/m-lab/etl/schema"
1515
"github.com/m-lab/go/rtx"
@@ -31,9 +31,11 @@ func TestHopAnnotation2Parser_ParseAndInsert(t *testing.T) {
3131

3232
date := civil.Date{Year: 2021, Month: 07, Day: 30}
3333

34-
meta := map[string]bigquery.Value{
35-
"filename": path.Join(hopAnnotation2GCSPath, hopAnnotation2Filename),
36-
"date": date,
34+
meta := etl.Metadata{
35+
ArchiveURL: path.Join(hopAnnotation2GCSPath, hopAnnotation2Filename),
36+
Date: date,
37+
Version: parser.Version(),
38+
GitCommit: parser.GitCommit(),
3739
}
3840

3941
if err := n.ParseAndInsert(meta, hopAnnotation2Filename, data); err != nil {

parser/ndt5_result.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"strings"
1010
"time"
1111

12-
"cloud.google.com/go/bigquery"
1312
"cloud.google.com/go/civil"
1413

1514
"github.com/m-lab/etl/etl"
@@ -66,7 +65,7 @@ func (dp *NDT5ResultParser) IsParsable(testName string, data []byte) (string, bo
6665
// backend and to eventually rely on the schema inference in m-lab/go/cloud/bqx.CreateTable().
6766

6867
// ParseAndInsert decodes the data.NDT5Result JSON and inserts it into BQ.
69-
func (dp *NDT5ResultParser) ParseAndInsert(meta map[string]bigquery.Value, testName string, test []byte) error {
68+
func (dp *NDT5ResultParser) ParseAndInsert(meta etl.Metadata, testName string, test []byte) error {
7069
metrics.WorkerState.WithLabelValues(dp.TableName(), "ndt5_result").Inc()
7170
defer metrics.WorkerState.WithLabelValues(dp.TableName(), "ndt5_result").Dec()
7271

@@ -85,13 +84,13 @@ func (dp *NDT5ResultParser) ParseAndInsert(meta map[string]bigquery.Value, testN
8584
}
8685

8786
parser := schema.ParseInfo{
88-
Version: Version(),
87+
Version: meta.Version,
8988
Time: time.Now(),
90-
ArchiveURL: meta["filename"].(string),
89+
ArchiveURL: meta.ArchiveURL,
9190
Filename: testName,
92-
GitCommit: GitCommit(),
91+
GitCommit: meta.GitCommit,
9392
}
94-
date := meta["date"].(civil.Date)
93+
date := meta.Date
9594

9695
// Since ndt5 rows can include both download (S2C) and upload (C2S)
9796
// measurements (or neither), check and write independent rows for either

parser/ndt5_result_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
"cloud.google.com/go/civil"
1010

11-
"cloud.google.com/go/bigquery"
11+
"github.com/m-lab/etl/etl"
1212
"github.com/m-lab/etl/parser"
1313
"github.com/m-lab/etl/schema"
1414
)
@@ -60,9 +60,11 @@ func TestNDT5ResultParser_ParseAndInsert(t *testing.T) {
6060
if err != nil {
6161
t.Fatalf(err.Error())
6262
}
63-
meta := map[string]bigquery.Value{
64-
"filename": "gs://mlab-test-bucket/ndt/ndt5/2019/08/22/ndt_ndt5_2019_08_22_20190822T194819.568936Z-ndt5-mlab1-lga0t-ndt.tgz",
65-
"date": d,
63+
meta := etl.Metadata{
64+
ArchiveURL: "gs://mlab-test-bucket/ndt/ndt5/2019/08/22/ndt_ndt5_2019_08_22_20190822T194819.568936Z-ndt5-mlab1-lga0t-ndt.tgz",
65+
Date: d,
66+
Version: parser.Version(),
67+
GitCommit: parser.GitCommit(),
6668
}
6769

6870
if err := n.ParseAndInsert(meta, tt.testName, resultData); (err != nil) != tt.wantErr {

parser/ndt7_result.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ import (
88
"strings"
99
"time"
1010

11-
"cloud.google.com/go/bigquery"
12-
13-
"cloud.google.com/go/civil"
1411
"github.com/m-lab/etl/etl"
1512
"github.com/m-lab/etl/metrics"
1613
"github.com/m-lab/etl/row"
@@ -65,25 +62,25 @@ func (dp *NDT7ResultParser) IsParsable(testName string, data []byte) (string, bo
6562
}
6663

6764
// ParseAndInsert decodes the data.NDT7Result JSON and inserts it into BQ.
68-
func (dp *NDT7ResultParser) ParseAndInsert(meta map[string]bigquery.Value, testName string, test []byte) error {
65+
func (dp *NDT7ResultParser) ParseAndInsert(meta etl.Metadata, testName string, test []byte) error {
6966
// TODO: derive 'ndt5' (or 'ndt7') labels from testName.
7067
metrics.WorkerState.WithLabelValues(dp.TableName(), "ndt7_result").Inc()
7168
defer metrics.WorkerState.WithLabelValues(dp.TableName(), "ndt7_result").Dec()
7269

7370
row := schema.NDT7ResultRow{
7471
Parser: schema.ParseInfo{
75-
Version: Version(),
72+
Version: meta.Version,
7673
Time: time.Now(),
77-
ArchiveURL: meta["filename"].(string),
74+
ArchiveURL: meta.ArchiveURL,
7875
Filename: testName,
79-
GitCommit: GitCommit(),
76+
GitCommit: meta.GitCommit,
8077
},
8178
}
8279

8380
// Parse the test.
8481
err := json.Unmarshal(test, &row.Raw)
8582
if err != nil {
86-
log.Println(meta["filename"].(string), testName, err)
83+
log.Println(meta.ArchiveURL, testName, err)
8784
metrics.TestTotal.WithLabelValues(dp.TableName(), "ndt7_result", "Unmarshal").Inc()
8885
return err
8986
}
@@ -106,7 +103,7 @@ func (dp *NDT7ResultParser) ParseAndInsert(meta map[string]bigquery.Value, testN
106103
// the given timestamp, regardless of the timestamp's timezone. Since we
107104
// run our systems in UTC, all timestamps will be relative to UTC and as
108105
// will these dates.
109-
row.Date = meta["date"].(civil.Date)
106+
row.Date = meta.Date
110107
if row.Raw.Download != nil {
111108
row.A = downSummary(row.Raw.Download)
112109
} else if row.Raw.Upload != nil {

parser/ndt7_result_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ import (
66
"strings"
77
"testing"
88

9-
"cloud.google.com/go/bigquery"
109
"cloud.google.com/go/civil"
1110
"github.com/go-test/deep"
1211

12+
"github.com/m-lab/etl/etl"
1313
"github.com/m-lab/etl/parser"
1414
"github.com/m-lab/etl/schema"
1515
"github.com/m-lab/go/pretty"
@@ -23,9 +23,11 @@ func setupNDT7InMemoryParser(t *testing.T, testName string) (*schema.NDT7ResultR
2323
if err != nil {
2424
t.Fatalf(err.Error())
2525
}
26-
meta := map[string]bigquery.Value{
27-
"filename": "gs://mlab-test-bucket/ndt/ndt7/2020/03/18/ndt_ndt7_2020_03_18_20200318T003853.425987Z-ndt7-mlab3-syd03-ndt.tgz",
28-
"date": civil.Date{Year: 2020, Month: 3, Day: 18},
26+
meta := etl.Metadata{
27+
ArchiveURL: "gs://mlab-test-bucket/ndt/ndt7/2020/03/18/ndt_ndt7_2020_03_18_20200318T003853.425987Z-ndt7-mlab3-syd03-ndt.tgz",
28+
Date: civil.Date{Year: 2020, Month: 3, Day: 18},
29+
Version: parser.Version(),
30+
GitCommit: parser.GitCommit(),
2931
}
3032
err = n.ParseAndInsert(meta, testName, resultData)
3133
if err != nil {

parser/parser_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
// TODO(soon) Implement good tests for the existing parsers.
2-
//
32
package parser_test
43

54
import (
@@ -125,7 +124,7 @@ func TestGetHopID(t *testing.T) {
125124
}
126125
}
127126

128-
//------------------------------------------------------------------------------------
127+
// ------------------------------------------------------------------------------------
129128
// TestParser ignores the content, returns a MapSaver containing meta data and
130129
// "testname":"..."
131130
// TODO add tests
@@ -144,14 +143,15 @@ func (tp *TestParser) IsParsable(testName string, test []byte) (string, bool) {
144143
return "ext", true
145144
}
146145

147-
func (tp *TestParser) ParseAndInsert(meta map[string]bigquery.Value, testName string, test []byte) error {
146+
func (tp *TestParser) ParseAndInsert(meta etl.Metadata, testName string, test []byte) error {
148147
metrics.TestTotal.WithLabelValues("table", "test", "ok").Inc()
149-
values := make(map[string]bigquery.Value, len(meta)+1)
150-
// TODO is there a better way to do this?
151-
for k, v := range meta {
152-
values[k] = v
148+
values := map[string]bigquery.Value{
149+
"filename": meta.ArchiveURL,
150+
"date": meta.Date,
151+
"version": meta.Version,
152+
"git_commit": meta.GitCommit,
153+
"testname": testName,
153154
}
154-
values["testname"] = testName
155155
return tp.inserter.InsertRow(values)
156156
}
157157

@@ -174,7 +174,7 @@ func TestPlumbing(t *testing.T) {
174174
tci := countingInserter{}
175175
var ti etl.Inserter = &tci
176176
var p etl.Parser = NewTestParser(ti)
177-
err := p.ParseAndInsert(nil, "foo", foo[:])
177+
err := p.ParseAndInsert(etl.Metadata{}, "foo", foo[:])
178178
if err != nil {
179179
fmt.Println(err)
180180
}

0 commit comments

Comments
 (0)