Skip to content

Commit 336637c

Browse files
Add parser for hopannotation1 datatype (#1015)
* Add parser for hopannotation1 datatype * add update to Dockerfile * Add update to Dockerfile.testing * move get update command in Dockerfile.testing * reverse order of flag and update * use hyphen instead of dash * Add descriptions to HopAnnotation1Row.yaml * overwrite id in HopAnnotation1Row.yaml * Add hopannotation1 to existing bigquery exporter queries * Undo last commit
1 parent 8e36328 commit 336637c

File tree

13 files changed

+396
-10
lines changed

13 files changed

+396
-10
lines changed

cmd/generate_schema_docs/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ func main() {
151151

152152
generators := []schemaGenerator{
153153
&schema.AnnotationRow{},
154+
&schema.HopAnnotation1Row{},
154155
&schema.NDT5ResultRow{},
155156
&schema.NDT7ResultRow{},
156157
&schema.TCPRow{},

cmd/generate_schema_docs/main_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,9 @@ func Test_main(t *testing.T) {
4444
if err != nil {
4545
t.Errorf("main() missing output file; missing schema_pcaprow.md")
4646
}
47+
48+
_, err = os.Stat(path.Join(tmpdir, "schema_hopannotation1row.md"))
49+
if err != nil {
50+
t.Errorf("main() missing output file; missing schema_hopannotation1row.md")
51+
}
4752
}

cmd/update-schema/update.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,13 @@ func CreateOrUpdatePCAPRow(project string, dataset string, table string) error {
114114
return CreateOrUpdate(schema, project, dataset, table, "Date")
115115
}
116116

117+
func CreateOrUpdateHopAnnotation1Row(project string, dataset string, table string) error {
118+
row := schema.HopAnnotation1Row{}
119+
schema, err := row.Schema()
120+
rtx.Must(err, "HopAnnotation1Row.Schema")
121+
return CreateOrUpdate(schema, project, dataset, table, "Date")
122+
}
123+
117124
// listTemplateTables finds all template tables for the given project, datatype, and base table name.
118125
// Because this function must enumerate all tables in the dataset to find matching names, it may be slow.
119126
func listTemplateTables(project, dataset, table string) ([]string, error) {
@@ -241,6 +248,13 @@ func updateStandardTables(project string) int {
241248
errCount++
242249
}
243250

251+
if err := CreateOrUpdateHopAnnotation1Row(project, "tmp_ndt", "hopannotation1"); err != nil {
252+
errCount++
253+
}
254+
if err := CreateOrUpdateHopAnnotation1Row(project, "raw_ndt", "hopannotation1"); err != nil {
255+
errCount++
256+
}
257+
244258
return errCount
245259
}
246260

@@ -390,6 +404,14 @@ func main() {
390404
errCount++
391405
}
392406

407+
case "hopannotation1":
408+
if err := CreateOrUpdateHopAnnotation1Row(*project, "tmp_ndt", "hopannotation1"); err != nil {
409+
errCount++
410+
}
411+
if err := CreateOrUpdateHopAnnotation1Row(*project, "raw_ndt", "hopannotation1"); err != nil {
412+
errCount++
413+
}
414+
393415
default:
394416
log.Fatal("invalid updateType: ", *updateType)
395417
}

etl/globals.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ func (dt DataType) BQBufferSize() int {
255255
// TODO - use camelcase.
256256
const (
257257
ANNOTATION = DataType("annotation")
258+
HOPANNOTATION1 = DataType("hopannotation1")
258259
NDT = DataType("ndt")
259260
NDT5 = DataType("ndt5")
260261
NDT7 = DataType("ndt7")
@@ -272,6 +273,7 @@ var (
272273
// TODO - this should be loaded from a config.
273274
dirToDataType = map[string]DataType{
274275
"annotation": ANNOTATION,
276+
"hopannotation1": HOPANNOTATION1,
275277
"ndt": NDT,
276278
"ndt5": NDT5,
277279
"ndt7": NDT7,
@@ -286,16 +288,17 @@ var (
286288
// DataTypeToTable maps from data type to BigQuery table name.
287289
// TODO - this should be loaded from a config.
288290
dataTypeToTable = map[DataType]string{
289-
ANNOTATION: "annotation",
290-
NDT: "ndt",
291-
SS: "sidestream",
292-
PCAP: "pcap",
293-
PT: "traceroute",
294-
SW: "switch",
295-
TCPINFO: "tcpinfo",
296-
NDT5: "ndt5",
297-
NDT7: "ndt7",
298-
INVALID: "invalid",
291+
ANNOTATION: "annotation",
292+
HOPANNOTATION1: "hopannotation1",
293+
NDT: "ndt",
294+
SS: "sidestream",
295+
PCAP: "pcap",
296+
PT: "traceroute",
297+
SW: "switch",
298+
TCPINFO: "tcpinfo",
299+
NDT5: "ndt5",
300+
NDT7: "ndt7",
301+
INVALID: "invalid",
299302
}
300303

301304
// Map from data type to number of buffer size for BQ insertion.
@@ -305,6 +308,7 @@ var (
305308
// TODO - this should be loaded from a config
306309
dataTypeToBQBufferSize = map[DataType]int{
307310
ANNOTATION: 400, // around 1k each.
311+
HOPANNOTATION1: 200,
308312
NDT: 10,
309313
NDT_OMIT_DELTAS: 50,
310314
TCPINFO: 5,

parser/hopannotation1.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package parser
2+
3+
import (
4+
"encoding/json"
5+
"strings"
6+
"time"
7+
8+
"cloud.google.com/go/bigquery"
9+
"cloud.google.com/go/civil"
10+
v2as "github.com/m-lab/annotation-service/api/v2"
11+
"github.com/m-lab/etl/etl"
12+
"github.com/m-lab/etl/metrics"
13+
"github.com/m-lab/etl/row"
14+
"github.com/m-lab/etl/schema"
15+
)
16+
17+
//=====================================================================================
18+
// HopAnnotation1 Parser
19+
//=====================================================================================
20+
21+
// HopAnnotation1Parser handles parsing for the HopAnnotation1 datatype.
22+
type HopAnnotation1Parser struct {
23+
*row.Base
24+
table string
25+
suffix string
26+
}
27+
28+
// NewHopAnnotation1Parser returns a new parser for the HopAnnotation1 archives.
29+
func NewHopAnnotation1Parser(sink row.Sink, table, suffix string, ann v2as.Annotator) etl.Parser {
30+
bufSize := etl.HOPANNOTATION1.BQBufferSize()
31+
if ann == nil {
32+
ann = v2as.GetAnnotator(etl.BatchAnnotatorURL)
33+
}
34+
35+
return &HopAnnotation1Parser{
36+
Base: row.NewBase(table, sink, bufSize, ann),
37+
table: table,
38+
suffix: suffix,
39+
}
40+
}
41+
42+
// IsParsable returns the canonical test type and whether to parse data.
43+
func (p *HopAnnotation1Parser) IsParsable(testName string, data []byte) (string, bool) {
44+
if strings.HasSuffix(testName, "json") {
45+
return "hopannotation1", true
46+
}
47+
return "", false
48+
}
49+
50+
// ParseAndInsert decodes the HopAnnotation1 data and inserts it into BQ.
51+
func (p *HopAnnotation1Parser) ParseAndInsert(fileMetadata map[string]bigquery.Value, testName string, rawContent []byte) error {
52+
metrics.WorkerState.WithLabelValues(p.TableName(), "hopannotation1").Inc()
53+
defer metrics.WorkerState.WithLabelValues(p.TableName(), "hopannotation1").Dec()
54+
55+
row := schema.HopAnnotation1Row{
56+
Parser: schema.ParseInfo{
57+
Version: Version(),
58+
Time: time.Now(),
59+
ArchiveURL: fileMetadata["filename"].(string),
60+
Filename: testName,
61+
GitCommit: GitCommit(),
62+
},
63+
}
64+
65+
raw := schema.HopAnnotation1{}
66+
err := json.Unmarshal(rawContent, &raw)
67+
if err != nil {
68+
metrics.TestCount.WithLabelValues(p.TableName(), "hopannotation1", "decode-location-error").Inc()
69+
return err
70+
}
71+
72+
// Fill in the row.
73+
row.ID = raw.ID
74+
row.Raw = &raw
75+
// NOTE: Civil is not TZ adjusted. It takes the year, month, and date from
76+
// the given timestamp, regardless of the timestamp's timezone. Since we
77+
// run our systems in UTC, all timestamps will be relative to UTC and as
78+
// will these dates.
79+
row.Date = fileMetadata["date"].(civil.Date)
80+
81+
// Estimate the row size based on the input JSON size.
82+
metrics.RowSizeHistogram.WithLabelValues(p.TableName()).Observe(float64(len(rawContent)))
83+
84+
// Insert the row.
85+
err = p.Base.Put(&row)
86+
if err != nil {
87+
return err
88+
}
89+
// Count successful inserts.
90+
metrics.TestCount.WithLabelValues(p.TableName(), "hopannotation1", "ok").Inc()
91+
92+
return nil
93+
}
94+
95+
// NB: These functions are also required to complete the etl.Parser interface
96+
// For HopAnnotation1, most of them simply forward to the embedded row.Base.
97+
98+
func (p *HopAnnotation1Parser) Flush() error {
99+
return p.Base.Flush()
100+
}
101+
102+
func (p *HopAnnotation1Parser) TableName() string {
103+
return p.table
104+
}
105+
106+
func (p *HopAnnotation1Parser) FullTableName() string {
107+
return p.table + p.suffix
108+
}
109+
110+
// RowsInBuffer returns the count of rows currently in the buffer.
111+
func (p *HopAnnotation1Parser) RowsInBuffer() int {
112+
return p.GetStats().Pending
113+
}
114+
115+
// Committed returns the count of rows successfully committed to BQ.
116+
func (p *HopAnnotation1Parser) Committed() int {
117+
return p.GetStats().Committed
118+
}
119+
120+
// Accepted returns the count of all rows received through InsertRow(s).
121+
func (p *HopAnnotation1Parser) Accepted() int {
122+
return p.GetStats().Total()
123+
}
124+
125+
// Failed returns the count of all rows that could not be committed.
126+
func (p *HopAnnotation1Parser) Failed() int {
127+
return p.GetStats().Failed
128+
}

parser/hopannotation1_test.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package parser_test
2+
3+
import (
4+
"io/ioutil"
5+
"path"
6+
"strings"
7+
"testing"
8+
"time"
9+
10+
"cloud.google.com/go/bigquery"
11+
"cloud.google.com/go/civil"
12+
"github.com/go-test/deep"
13+
"github.com/m-lab/etl/parser"
14+
"github.com/m-lab/etl/schema"
15+
"github.com/m-lab/go/rtx"
16+
"github.com/m-lab/uuid-annotator/annotator"
17+
)
18+
19+
const (
20+
hopAnnotation1Filename = "20210818T174432Z_1e0b318cf3c2_91.189.88.152.json"
21+
hopAnnotation1GCSPath = "gs://archive-measurement-lab/ndt/hopannotation1/2021/07/30/"
22+
)
23+
24+
func TestHopAnnotation1Parser_ParseAndInsert(t *testing.T) {
25+
ins := newInMemorySink()
26+
n := parser.NewHopAnnotation1Parser(ins, "test", "_suffix", &fakeAnnotator{})
27+
28+
data, err := ioutil.ReadFile(path.Join("testdata/HopAnnotation1/", hopAnnotation1Filename))
29+
rtx.Must(err, "failed to load test file")
30+
31+
date := civil.Date{Year: 2021, Month: 07, Day: 30}
32+
33+
meta := map[string]bigquery.Value{
34+
"filename": path.Join(hopAnnotation1GCSPath, hopAnnotation1Filename),
35+
"date": date,
36+
}
37+
38+
if err := n.ParseAndInsert(meta, hopAnnotation1Filename, data); err != nil {
39+
t.Errorf("HopAnnotation1Parser.ParseAndInsert() error = %v, wantErr %v", err, true)
40+
}
41+
42+
if n.Accepted() != 1 {
43+
t.Fatal("Failed to insert snaplog data", ins)
44+
}
45+
n.Flush()
46+
47+
row := ins.data[0].(*schema.HopAnnotation1Row)
48+
49+
expectedParseInfo := schema.ParseInfo{
50+
Version: "https://github.com/m-lab/etl/tree/foobar",
51+
Time: row.Parser.Time,
52+
ArchiveURL: path.Join(hopAnnotation1GCSPath, hopAnnotation1Filename),
53+
Filename: hopAnnotation1Filename,
54+
Priority: 0,
55+
GitCommit: "12345678",
56+
}
57+
58+
expectedGeolocation := annotator.Geolocation{
59+
ContinentCode: "EU",
60+
CountryCode: "GB",
61+
CountryName: "United Kingdom",
62+
Subdivision1ISOCode: "ENG",
63+
Subdivision1Name: "England",
64+
City: "London",
65+
PostalCode: "EC2V",
66+
Latitude: 51.5095,
67+
Longitude: -0.0955,
68+
AccuracyRadiusKm: 200,
69+
}
70+
71+
expectedNetwork := annotator.Network{
72+
CIDR: "91.189.88.0/21",
73+
ASNumber: 41231,
74+
ASName: "Canonical Group Limited",
75+
Systems: []annotator.System{
76+
{ASNs: []uint32{41231}},
77+
},
78+
}
79+
80+
expectedAnnotations := annotator.ClientAnnotations{
81+
Geo: &expectedGeolocation,
82+
Network: &expectedNetwork,
83+
}
84+
85+
expectedRaw := schema.HopAnnotation1{
86+
ID: "20210818_1e0b318cf3c2_91.189.88.152",
87+
Timestamp: time.Date(2021, 8, 18, 17, 44, 32, 0, time.UTC),
88+
Annotations: &expectedAnnotations,
89+
}
90+
91+
expectedHopAnnotation1Row := schema.HopAnnotation1Row{
92+
ID: "20210818_1e0b318cf3c2_91.189.88.152",
93+
Parser: expectedParseInfo,
94+
Date: date,
95+
Raw: &expectedRaw,
96+
}
97+
98+
if diff := deep.Equal(row, &expectedHopAnnotation1Row); diff != nil {
99+
t.Errorf("HopAnnotation1Parser.ParseAndInsert() different row: %s", strings.Join(diff, "\n"))
100+
}
101+
102+
}
103+
104+
func TestHopAnnotation1_IsParsable(t *testing.T) {
105+
tests := []struct {
106+
name string
107+
testName string
108+
want bool
109+
}{
110+
{
111+
name: "success-hopannotation1",
112+
testName: hopAnnotation1Filename,
113+
want: true,
114+
},
115+
{
116+
name: "error-bad-extension",
117+
testName: "badfile.badextension",
118+
want: false,
119+
},
120+
}
121+
122+
for _, tt := range tests {
123+
t.Run(tt.name, func(t *testing.T) {
124+
data, err := ioutil.ReadFile(path.Join(`testdata/HopAnnotation1/`, tt.testName))
125+
if err != nil {
126+
t.Fatalf(err.Error())
127+
}
128+
p := &parser.HopAnnotation1Parser{}
129+
_, got := p.IsParsable(tt.testName, data)
130+
if got != tt.want {
131+
t.Errorf("HopAnnotation1Parser.IsParsable() got = %v, want %v", got, tt.want)
132+
}
133+
})
134+
}
135+
}

parser/parser.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ func NewSinkParser(dt etl.DataType, sink row.Sink, table string, ann api.Annotat
8484
switch dt {
8585
case etl.ANNOTATION:
8686
return NewAnnotationParser(sink, table, "", ann)
87+
case etl.HOPANNOTATION1:
88+
return NewHopAnnotation1Parser(sink, table, "", ann)
8789
case etl.NDT5:
8890
return NewNDT5ResultParser(sink, table, "", ann)
8991
case etl.NDT7:
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"ID":"20210818_1e0b318cf3c2_91.189.88.152","Timestamp":"2021-08-18T17:44:32Z","Annotations":{"Geo":{"ContinentCode":"EU","CountryCode":"GB","CountryName":"United Kingdom","Subdivision1ISOCode":"ENG","Subdivision1Name":"England","City":"London","PostalCode":"EC2V","Latitude":51.5095,"Longitude":-0.0955,"AccuracyRadiusKm":200},"Network":{"CIDR":"91.189.88.0/21","ASNumber":41231,"ASName":"Canonical Group Limited","Systems":[{"ASNs":[41231]}]}}}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
baddata

0 commit comments

Comments
 (0)