Skip to content

Commit 84a2778

Browse files
Revert "Add Gardener v2 parser for switch datatype (#1044)" (#1049)
This reverts commit 5c6d46f.
1 parent 5c6d46f commit 84a2778

File tree

11 files changed

+374
-632
lines changed

11 files changed

+374
-632
lines changed

cmd/update-schema/update.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,11 @@ func CreateOrUpdateAnnotationRow(project string, dataset string, table string) e
100100
return CreateOrUpdate(schema, project, dataset, table, "Date")
101101
}
102102

103-
func CreateOrUpdateSwitchRow(project string, dataset string, table string) error {
104-
row := schema.SwitchRow{}
103+
func CreateOrUpdateSwitchStats(project string, dataset string, table string) error {
104+
row := schema.SwitchStats{}
105105
schema, err := row.Schema()
106-
rtx.Must(err, "SwitchRow.Schema")
107-
return CreateOrUpdate(schema, project, dataset, table, "Date")
106+
rtx.Must(err, "SwitchStats.Schema")
107+
return CreateOrUpdate(schema, project, dataset, table, "")
108108
}
109109

110110
func CreateOrUpdatePCAPRow(project string, dataset string, table string) error {
@@ -269,13 +269,6 @@ func updateStandardTables(project string) int {
269269
errCount++
270270
}
271271

272-
if err := CreateOrUpdateSwitchRow(project, "tmp_utilization", "switch"); err != nil {
273-
errCount++
274-
}
275-
if err := CreateOrUpdateSwitchRow(project, "raw_utilization", "switch"); err != nil {
276-
errCount++
277-
}
278-
279272
return errCount
280273
}
281274

@@ -311,6 +304,12 @@ func updateLegacyTables(project string) int {
311304
if err := CreateOrUpdateNDT5ResultRow(project, "batch", "ndt5"); err != nil {
312305
errCount++
313306
}
307+
if err := CreateOrUpdateSwitchStats(project, "base_tables", "switch"); err != nil {
308+
errCount++
309+
}
310+
if err := CreateOrUpdateSwitchStats(project, "batch", "switch"); err != nil {
311+
errCount++
312+
}
314313
return errCount
315314
}
316315

@@ -404,10 +403,10 @@ func main() {
404403
}
405404

406405
case "switch":
407-
if err := CreateOrUpdateSwitchRow(*project, "tmp_utilization", "switch"); err != nil {
406+
if err := CreateOrUpdateSwitchStats(*project, "base_tables", "switch"); err != nil {
408407
errCount++
409408
}
410-
if err := CreateOrUpdateSwitchRow(*project, "raw_utilization", "switch"); err != nil {
409+
if err := CreateOrUpdateSwitchStats(*project, "batch", "switch"); err != nil {
411410
errCount++
412411
}
413412

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ require (
1717
github.com/google/gopacket v1.1.19
1818
github.com/google/uuid v1.3.0 // indirect
1919
github.com/googleapis/google-cloud-go-testing v0.0.0-20210719221736-1c9a4c676720
20-
github.com/iancoleman/strcase v0.2.0
2120
github.com/kr/pretty v0.2.1
2221
github.com/m-lab/annotation-service v0.0.0-20210713124633-fa227b3d5b2f
2322
github.com/m-lab/etl-gardener v0.0.0-20210910143655-d4bda5bfc75d

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,6 @@ github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFb
232232
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
233233
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
234234
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
235-
github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHLwW0=
236-
github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
237235
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
238236
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
239237
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=

parser/disco.go

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
// Package parser defines the Parser interface and implementations for the different
2+
// data types.
3+
package parser
4+
5+
// This file defines the Parser subtype that handles DISCO data.
6+
7+
import (
8+
"bytes"
9+
"encoding/json"
10+
"log"
11+
"strings"
12+
"time"
13+
14+
"cloud.google.com/go/bigquery"
15+
16+
"github.com/m-lab/etl/etl"
17+
"github.com/m-lab/etl/metrics"
18+
"github.com/m-lab/etl/schema"
19+
)
20+
21+
//=====================================================================================
22+
// Disco Parser
23+
//=====================================================================================
24+
25+
// TODO(dev) add tests
26+
type DiscoParser struct {
27+
inserter etl.Inserter
28+
etl.RowStats // RowStats implemented for DiscoParser with an embedded struct.
29+
}
30+
31+
func NewDiscoParser(ins etl.Inserter) etl.Parser {
32+
return &DiscoParser{
33+
inserter: ins,
34+
RowStats: ins} // Delegate RowStats functions to the Inserter.
35+
}
36+
37+
func (dp *DiscoParser) TaskError() error {
38+
return nil
39+
}
40+
41+
// IsParsable returns the canonical test type and whether to parse data.
42+
func (dp *DiscoParser) IsParsable(testName string, data []byte) (string, bool) {
43+
// Files look like: "<date>-to-<date>-switch.json.gz"
44+
// Notice the "-" before switch.
45+
// Look for JSON and JSONL files.
46+
if strings.HasSuffix(testName, "switch.json") ||
47+
strings.HasSuffix(testName, "switch.jsonl") ||
48+
strings.HasSuffix(testName, "switch.json.gz") ||
49+
strings.HasSuffix(testName, "switch.jsonl.gz") {
50+
return "switch", true
51+
}
52+
return "unknown", false
53+
}
54+
55+
// Disco data a JSON representation that should be pushed directly into BigQuery.
56+
// For now, though, we parse into a struct, for compatibility with current inserter
57+
// backend.
58+
//
59+
// Returns:
60+
// error on Decode error
61+
// error on InsertRows error
62+
// nil on success
63+
//
64+
// TODO - optimize this to use the JSON directly, if possible.
65+
func (dp *DiscoParser) ParseAndInsert(meta map[string]bigquery.Value, testName string, test []byte) error {
66+
metrics.WorkerState.WithLabelValues(dp.TableName(), "switch").Inc()
67+
defer metrics.WorkerState.WithLabelValues(dp.TableName(), "switch").Dec()
68+
69+
rdr := bytes.NewReader(test)
70+
dec := json.NewDecoder(rdr)
71+
rowCount := 0
72+
73+
for dec.More() {
74+
stats := schema.SwitchStats{
75+
TaskFilename: meta["filename"].(string),
76+
TestID: testName,
77+
ParseTime: time.Now(),
78+
ParserVersion: Version(),
79+
// TODO: original archive "log_time" is unknown.
80+
}
81+
tmp := schema.SwitchStats{}
82+
err := dec.Decode(&tmp)
83+
if err != nil {
84+
metrics.TestCount.WithLabelValues(
85+
dp.TableName(), "disco", "Decode").Inc()
86+
// TODO(dev) Should accumulate errors, instead of aborting?
87+
return err
88+
}
89+
rowCount++
90+
91+
// For collectd in the "utilization" experiment, by design, the raw data
92+
// time range starts and ends on the hour. This means that the raw
93+
// dataset inclues 361 time bins (360 + 1 extra). Originally, this was
94+
// so the last sample of the current time range would overlap with the
95+
// first sample of the next time range. However, this parser does not
96+
// use the extra sample, so we unconditionally ignore it here. However,
97+
// this is not the case for DISCOv2, so we use the whole sample from
98+
// DISCOv2. DISCOv2 can be differentiated from collectd by the "jsonl"
99+
// suffix.
100+
if len(tmp.Sample) > 0 {
101+
if strings.HasSuffix(testName, "switch.jsonl") ||
102+
strings.HasSuffix(testName, "switch.jsonl.gz") {
103+
stats.Sample = tmp.Sample
104+
} else {
105+
stats.Sample = tmp.Sample[:len(tmp.Sample)-1]
106+
// DISCOv1's Timestamp field in each sample represents the
107+
// *beginning* of a 10s sample window, while v2's Timestamp
108+
// represents the time at which the sample was taken, which is
109+
// representative of the previous 10s. Since v2's behavior is
110+
// what we want, we add 10s to all v1 Timestamps so that the
111+
// timestamps represent the same thing for v1 and v2.
112+
for i, v := range stats.Sample {
113+
stats.Sample[i].Timestamp = v.Timestamp + 10
114+
}
115+
}
116+
}
117+
118+
// Copy remaining fields.
119+
stats.Metric = tmp.Metric
120+
stats.Hostname = tmp.Hostname
121+
stats.Experiment = tmp.Experiment
122+
123+
// Count the number of samples per record.
124+
metrics.DeltaNumFieldsHistogram.WithLabelValues(
125+
dp.TableName()).Observe(float64(len(stats.Sample)))
126+
127+
// TODO: measure metrics.RowSizeHistogram every so often with json size.
128+
metrics.RowSizeHistogram.WithLabelValues(
129+
dp.TableName()).Observe(float64(stats.Size()))
130+
131+
err = dp.inserter.InsertRow(stats)
132+
if err != nil {
133+
switch t := err.(type) {
134+
case bigquery.PutMultiError:
135+
// TODO improve error handling??
136+
metrics.TestCount.WithLabelValues(
137+
dp.TableName(), "disco", "insert-multi").Inc()
138+
log.Printf("%v\n", t[0].Error())
139+
default:
140+
metrics.TestCount.WithLabelValues(
141+
dp.TableName(), "disco", "insert-other").Inc()
142+
}
143+
// TODO(dev) Should accumulate errors, instead of aborting?
144+
return err
145+
}
146+
// Count successful inserts.
147+
metrics.TestCount.WithLabelValues(dp.TableName(), "disco", "ok").Inc()
148+
}
149+
150+
// Measure the distribution of records per file.
151+
metrics.EntryFieldCountHistogram.WithLabelValues(
152+
dp.TableName()).Observe(float64(rowCount))
153+
154+
return nil
155+
}
156+
157+
// These functions are also required to complete the etl.Parser interface. For Disco,
158+
// we just forward the calls to the Inserter.
159+
func (dp *DiscoParser) Flush() error {
160+
return dp.inserter.Flush()
161+
}
162+
163+
func (dp *DiscoParser) TableName() string {
164+
return dp.inserter.TableBase()
165+
}
166+
167+
func (dp *DiscoParser) FullTableName() string {
168+
return dp.inserter.FullTableName()
169+
}

parser/disco_test.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package parser_test
2+
3+
import (
4+
"log"
5+
"testing"
6+
"time"
7+
8+
"cloud.google.com/go/bigquery"
9+
10+
"github.com/m-lab/etl/bq"
11+
"github.com/m-lab/etl/etl"
12+
"github.com/m-lab/etl/fake"
13+
"github.com/m-lab/etl/parser"
14+
)
15+
16+
// init adds file/line information to test log output for easier debugging.
func init() {
	log.SetFlags(log.LstdFlags | log.Lshortfile)
}
19+
20+
// test_data holds two newline-delimited DISCO JSON records: the first with
// two samples, the second with an empty sample list.
var test_data = []byte(
	`{
	"sample": [{"timestamp": 69850, "value": 0.0}, {"timestamp": 69860, "value": 0.0}],
	"metric": "switch.multicast.local.rx",
	"hostname": "mlab4.sea05.measurement-lab.org",
	"experiment": "s1.sea05.measurement-lab.org"}
	{"sample": [],
	"metric": "switch.multicast.local.rx",
	"hostname": "mlab1.sea05.measurement-lab.org",
	"experiment": "s1.sea05.measurement-lab.org"}`)
30+
31+
// This tests the parser, using a fake inserter, so that it runs entirely locally.
32+
func TestJSONParsing(t *testing.T) {
33+
// This creates a real inserter, with a fake uploader, for local testing.
34+
uploader := fake.FakeUploader{}
35+
ins, err := bq.NewBQInserter(etl.InserterParams{
36+
Project: "mlab-sandbox", Dataset: "dataset", Table: "disco_test", Suffix: "",
37+
BufferSize: 3, PutTimeout: 10 * time.Second, MaxRetryDelay: time.Second}, &uploader)
38+
if err != nil {
39+
t.Fatal(err)
40+
}
41+
42+
var parser etl.Parser = parser.NewDiscoParser(ins)
43+
44+
meta := map[string]bigquery.Value{"filename": "fake-filename.tar", "parse_time": time.Now()}
45+
// Should result in two tests sent to inserter, but no call to uploader.
46+
err = parser.ParseAndInsert(meta, "testName", test_data)
47+
if err != nil {
48+
t.Fatal(err)
49+
}
50+
if ins.Accepted() != 2 {
51+
t.Error("Accepted = ", ins.Accepted())
52+
t.Fail()
53+
}
54+
55+
// Adds two more rows, triggering an upload of 3 rows.
56+
err = parser.ParseAndInsert(meta, "testName", test_data)
57+
if err != nil {
58+
t.Fatal(err)
59+
}
60+
if len(uploader.Rows) != 3 {
61+
t.Error("Expected 3, got", len(uploader.Rows))
62+
}
63+
// The testName was that of a DISCOv1 filename, for which the parser omits
64+
// the last sample for each metric, so even though there are two input
65+
// samples there should only be one in the resulting row.
66+
if uploader.Rows[1].Row["sample"] != nil && len(uploader.Rows[1].Row["sample"].([]bigquery.Value)) != 1 {
67+
t.Error("Expected 1, got", len(uploader.Rows[1].Row["sample"].([]bigquery.Value)))
68+
}
69+
70+
// Adds two more rows, triggering an upload of 3 rows.
71+
err = parser.ParseAndInsert(meta, "testName-switch.jsonl", test_data)
72+
if err != nil {
73+
t.Fatal(err)
74+
}
75+
76+
if ins.Accepted() != 6 {
77+
t.Error("Accepted = ", ins.Accepted())
78+
}
79+
if ins.RowsInBuffer() != 0 {
80+
t.Error("RowsInBuffer = ", ins.RowsInBuffer())
81+
}
82+
if len(uploader.Rows) != 3 {
83+
t.Error("Expected 3, got", len(uploader.Rows))
84+
}
85+
86+
if uploader.Rows[0].Row["sample"] != nil && len(uploader.Rows[0].Row["sample"].([]bigquery.Value)) != 1 {
87+
t.Error("Expected 1, got", len(uploader.Rows[0].Row["sample"].([]bigquery.Value)))
88+
}
89+
// The testName was that of a DISCOv2 filename (suffix of -switch.jsonl),
90+
// for which the parser should include all samples. Therefore, since the
91+
// input had two samples, so should the resulting row.
92+
if uploader.Rows[1].Row["sample"] != nil && len(uploader.Rows[1].Row["sample"].([]bigquery.Value)) != 2 {
93+
t.Error("Expected 2, got", len(uploader.Rows[1].Row["sample"].([]bigquery.Value)))
94+
}
95+
if uploader.Rows[0].Row["task_filename"].(string) != "fake-filename.tar" {
96+
t.Error("task_filename incorrect: Expected 'fake-filename.tar', got",
97+
uploader.Rows[0].Row["task_filename"].(string))
98+
}
99+
if uploader.Rows[0].Row["test_id"].(string) != "testName" {
100+
t.Error("task_filename incorrect: Expected 'testName', got",
101+
uploader.Rows[0].Row["test_id"].(string))
102+
}
103+
if uploader.Rows[0].Row["metric"].(string) != "switch.multicast.local.rx" {
104+
t.Error("task_filename incorrect: Expected 'switch.multicast.local.rx', got",
105+
uploader.Rows[0].Row["metric"].(string))
106+
}
107+
if uploader.Rows[0].Row["hostname"].(string) != "mlab1.sea05.measurement-lab.org" {
108+
t.Error("task_filename incorrect: Expected 'mlab1.sea05.measuremet-lab.org', got",
109+
uploader.Rows[0].Row["hostname"].(string))
110+
}
111+
if uploader.Rows[0].Row["experiment"].(string) != "s1.sea05.measurement-lab.org" {
112+
t.Error("task_filename incorrect: Expected 's1.sea05.measuremet-lab.org', got",
113+
uploader.Rows[0].Row["experiment"].(string))
114+
}
115+
116+
if err != nil {
117+
log.Printf("Request: %v\n", uploader.Request)
118+
log.Printf("Rows Len: %d\n", len(uploader.Rows))
119+
if len(uploader.Rows) > 0 {
120+
log.Printf("Rows[0]: %v\n", uploader.Rows[0])
121+
log.Printf("Rows[0]['sample']: %v\n", len(uploader.Rows[0].Row["sample"].([]bigquery.Value)))
122+
}
123+
t.Error(err)
124+
}
125+
}
126+
127+
// DISABLED
128+
// This tests insertion into a test table in the cloud. Should not normally be executed.
129+
func xTestRealBackend(t *testing.T) {
130+
ins, err := bq.NewInserter(etl.SW, time.Now())
131+
var parser etl.Parser = parser.NewDiscoParser(ins)
132+
133+
meta := map[string]bigquery.Value{"filename": "filename", "parse_time": time.Now()}
134+
for i := 0; i < 3; i++ {
135+
// Iterations:
136+
// Add two rows, no upload.
137+
// Add two more rows, triggering an upload of 3 rows.
138+
// Add two more rows, triggering an upload of 3 rows.
139+
err = parser.ParseAndInsert(meta, "testName", test_data)
140+
if ins.Accepted() != 2 {
141+
t.Error("Accepted = ", ins.Accepted())
142+
t.Fail()
143+
}
144+
}
145+
146+
if ins.Accepted() != 6 {
147+
t.Error("Accepted = ", ins.Accepted())
148+
}
149+
if ins.RowsInBuffer() != 0 {
150+
t.Error("RowsInBuffer = ", ins.RowsInBuffer())
151+
}
152+
153+
if err != nil {
154+
t.Error(err)
155+
}
156+
}

0 commit comments

Comments
 (0)