Skip to content

Commit 4006199

Browse files
Migrate ndt5 to use standard columns (#1045)
* Rename NDT5ResultRowV2 descriptions file
* Support ndt5 with standard columns
* Update go.mod and go.sum
* Write separate c2s and s2c rows
1 parent 56a2669 commit 4006199

File tree

12 files changed

+203
-113
lines changed

12 files changed

+203
-113
lines changed

cmd/update-schema/update.go

Lines changed: 12 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -68,22 +68,11 @@ func CreateOrUpdateNDTWeb100(project string, dataset string, table string) error
6868
return CreateOrUpdate(schema, project, dataset, table, "")
6969
}
7070

71-
func CreateOrUpdateNDT5ResultRow(project string, dataset string, table string) error {
72-
row := schema.NDT5ResultRow{}
71+
func CreateOrUpdateNDT5ResultRowV2(project string, dataset string, table string) error {
72+
row := schema.NDT5ResultRowV2{}
7373
schema, err := row.Schema()
74-
rtx.Must(err, "NDT5ResultRow.Schema")
75-
76-
// NOTE: NDT5ResultRow does not support the TestTime field yet.
77-
return CreateOrUpdate(schema, project, dataset, table, "")
78-
}
79-
80-
func CreateOrUpdateNDT5ResultRowStandardColumns(project string, dataset string, table string) error {
81-
row := schema.NDT5ResultRowStandardColumns{}
82-
schema, err := row.Schema()
83-
rtx.Must(err, "NDT5ResultRowStandardColumns.Schema")
84-
85-
// NOTE: NDT5ResultRow does not support the TestTime field yet.
86-
return CreateOrUpdate(schema, project, dataset, table, "")
74+
rtx.Must(err, "NDT5ResultRowV2.Schema")
75+
return CreateOrUpdate(schema, project, dataset, table, "Date")
8776
}
8877

8978
func CreateOrUpdateNDT7ResultRow(project string, dataset string, table string) error {
@@ -213,22 +202,6 @@ func CreateOrUpdate(schema bigquery.Schema, project, dataset, table, partField s
213202
return err
214203
}
215204

216-
func updateNDT5SC(project string) int {
217-
errCount := 0
218-
if err := CreateOrUpdateNDT5ResultRowStandardColumns(project, "raw_ndt", "ndt5"); err != nil {
219-
errCount++
220-
}
221-
if err := CreateOrUpdateNDT5ResultRowStandardColumns(project, "tmp_ndt", "ndt5"); err != nil {
222-
errCount++
223-
}
224-
// TODO enable this after removing ndt.ndt5 views from etl-schema, and migrating
225-
// measurement-lab:ndt.ndt5 to point to mlab-oti:base_tables.ndt5
226-
// if err := CreateOrUpdateNDT5ResultRowStandardColumns(project, "ndt", "ndt5"); err != nil {
227-
// errCount++
228-
// }
229-
return errCount
230-
}
231-
232205
// Only tables that support Standard Columns should be included here.
233206
func updateStandardTables(project string) int {
234207
errCount := 0
@@ -239,7 +212,12 @@ func updateStandardTables(project string) int {
239212
errCount++
240213
}
241214

242-
errCount += updateNDT5SC(project)
215+
if err := CreateOrUpdateNDT5ResultRowV2(project, "tmp_ndt", "ndt5"); err != nil {
216+
errCount++
217+
}
218+
if err := CreateOrUpdateNDT5ResultRowV2(project, "raw_ndt", "ndt5"); err != nil {
219+
errCount++
220+
}
243221

244222
if err := CreateOrUpdateAnnotationRow(project, "tmp_ndt", "annotation"); err != nil {
245223
errCount++
@@ -298,12 +276,6 @@ func updateLegacyTables(project string) int {
298276
if err := CreateOrUpdateNDTWeb100(project, "batch", "ndt"); err != nil {
299277
errCount++
300278
}
301-
if err := CreateOrUpdateNDT5ResultRow(project, "base_tables", "ndt5"); err != nil {
302-
errCount++
303-
}
304-
if err := CreateOrUpdateNDT5ResultRow(project, "batch", "ndt5"); err != nil {
305-
errCount++
306-
}
307279
if err := CreateOrUpdateSwitchStats(project, "base_tables", "switch"); err != nil {
308280
errCount++
309281
}
@@ -375,14 +347,11 @@ func main() {
375347
errCount++
376348
}
377349

378-
case "ndt5sc":
379-
errCount += updateNDT5SC(*project)
380-
381350
case "ndt5":
382-
if err := CreateOrUpdateNDT5ResultRow(*project, "base_tables", "ndt5"); err != nil {
351+
if err := CreateOrUpdateNDT5ResultRowV2(*project, "raw_ndt", "ndt5"); err != nil {
383352
errCount++
384353
}
385-
if err := CreateOrUpdateNDT5ResultRow(*project, "batch", "ndt5"); err != nil {
354+
if err := CreateOrUpdateNDT5ResultRowV2(*project, "tmp_ndt", "ndt5"); err != nil {
386355
errCount++
387356
}
388357

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ require (
2828
github.com/m-lab/uuid-annotator v0.4.5
2929
github.com/prometheus/client_golang v1.12.1
3030
github.com/prometheus/client_model v0.2.0
31+
github.com/sirupsen/logrus v1.8.1
3132
github.com/valyala/gozstd v1.13.0
3233
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
3334
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
347347
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
348348
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
349349
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
350+
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
350351
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
351352
github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
352353
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=

metrics/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ var (
407407
Name: "etl_row_json_size",
408408
Help: "Row json size distributions.",
409409
Buckets: []float64{
410+
1, // count empty files.
410411
100, 200, 400, 800, 1600, 3200, 6400, 10000, 20000,
411412
40000, 80000, 160000, 320000, 500000, 600000, 700000,
412413
800000, 900000, 1000000, 1200000, 1500000, 2000000, 5000000,

parser/ndt5_result.go

Lines changed: 106 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ package parser
33
// This file defines the Parser subtype that handles NDT5Result data.
44

55
import (
6-
"bytes"
76
"encoding/json"
87
"log"
98
"regexp"
109
"strings"
1110
"time"
1211

1312
"cloud.google.com/go/bigquery"
13+
"cloud.google.com/go/civil"
1414

1515
v2as "github.com/m-lab/annotation-service/api/v2"
1616

@@ -35,7 +35,7 @@ type NDT5ResultParser struct {
3535
func NewNDT5ResultParser(sink row.Sink, label, suffix string, ann v2as.Annotator) etl.Parser {
3636
bufSize := etl.NDT5.BQBufferSize()
3737
if ann == nil {
38-
ann = v2as.GetAnnotator(etl.BatchAnnotatorURL)
38+
ann = &nullAnnotator{}
3939
}
4040

4141
return &NDT5ResultParser{
@@ -73,7 +73,6 @@ func (dp *NDT5ResultParser) IsParsable(testName string, data []byte) (string, bo
7373

7474
// ParseAndInsert decodes the data.NDT5Result JSON and inserts it into BQ.
7575
func (dp *NDT5ResultParser) ParseAndInsert(meta map[string]bigquery.Value, testName string, test []byte) error {
76-
// TODO: derive 'ndt5' (or 'ndt7') labels from testName.
7776
metrics.WorkerState.WithLabelValues(dp.TableName(), "ndt5_result").Inc()
7877
defer metrics.WorkerState.WithLabelValues(dp.TableName(), "ndt5_result").Dec()
7978

@@ -84,44 +83,122 @@ func (dp *NDT5ResultParser) ParseAndInsert(meta map[string]bigquery.Value, testN
8483
// to 2019-08-26 (v0.12). For these tests the ClientMetadata will be empty.
8584
var re = regexp.MustCompile(`,"ClientMetadata":{[^}]+}`)
8685
test = []byte(re.ReplaceAllString(string(test), ``))
86+
if len(test) == 0 {
87+
// This is an empty test.
88+
// NOTE: We may wish to record these for full e2e accounting.
89+
metrics.RowSizeHistogram.WithLabelValues(dp.TableName()).Observe(float64(len(test)))
90+
return nil
91+
}
8792

88-
rdr := bytes.NewReader(test)
89-
dec := json.NewDecoder(rdr)
90-
91-
for dec.More() {
92-
stats := schema.NDT5ResultRow{
93-
TestID: testName,
94-
ParseInfo: &schema.ParseInfoV0{
95-
TaskFileName: meta["filename"].(string),
96-
ParseTime: time.Now(),
97-
ParserVersion: Version(),
98-
},
99-
}
100-
err := dec.Decode(&stats.Result)
101-
if err != nil {
102-
log.Println(err)
103-
metrics.TestTotal.WithLabelValues(
104-
dp.TableName(), "ndt5_result", "Decode").Inc()
93+
parser := schema.ParseInfo{
94+
Version: Version(),
95+
Time: time.Now(),
96+
ArchiveURL: meta["filename"].(string),
97+
Filename: testName,
98+
GitCommit: GitCommit(),
99+
}
100+
date := meta["date"].(civil.Date)
101+
102+
// Since ndt5 rows can include both download (S2C) and upload (C2S)
103+
// measurements (or neither), check and write independent rows for either
104+
// direction. This approach results in one row for upload, one row for
105+
// download just like the ndt7 data. The `Raw.Control` structure will be
106+
// shared when there are upload and download measurements on the same test.
107+
108+
// S2C
109+
result, err := dp.newResult(test, parser, date)
110+
if err != nil {
111+
metrics.TestTotal.WithLabelValues(dp.TableName(), "ndt5_result", "Decode").Inc()
112+
return err
113+
}
114+
if result.Raw.S2C != nil && result.Raw.S2C.UUID != "" {
115+
dp.prepareS2CRow(result)
116+
if err = dp.Base.Put(result); err != nil {
105117
return err
106118
}
119+
}
107120

108-
// Set the LogTime to the Result.StartTime
109-
stats.LogTime = stats.Result.StartTime.Unix()
110-
111-
// Estimate the row size based on the input JSON size.
112-
metrics.RowSizeHistogram.WithLabelValues(
113-
dp.TableName()).Observe(float64(len(test)))
121+
// C2S
122+
result, err = dp.newResult(test, parser, date)
123+
if err != nil {
124+
metrics.TestTotal.WithLabelValues(dp.TableName(), "ndt5_result", "Decode").Inc()
125+
return err
126+
}
127+
if result.Raw.C2S != nil && result.Raw.C2S.UUID != "" {
128+
dp.prepareC2SRow(result)
129+
if err = dp.Base.Put(result); err != nil {
130+
return err
131+
}
132+
}
114133

115-
if err = dp.Base.Put(&stats); err != nil {
134+
// Neither C2S nor S2C
135+
result, err = dp.newResult(test, parser, date)
136+
if err != nil {
137+
metrics.TestTotal.WithLabelValues(dp.TableName(), "ndt5_result", "Decode").Inc()
138+
return err
139+
}
140+
if result.Raw.C2S == nil && result.Raw.S2C == nil {
141+
result.ID = result.Raw.Control.UUID
142+
result.A = nil // nothing to summarize.
143+
if err = dp.Base.Put(result); err != nil {
116144
return err
117145
}
118-
// Count successful inserts.
119-
metrics.TestTotal.WithLabelValues(dp.TableName(), "ndt5_result", "ok").Inc()
120146
}
121147

148+
// Estimate the row size based on the input JSON size.
149+
metrics.RowSizeHistogram.WithLabelValues(dp.TableName()).Observe(float64(len(test)))
150+
151+
// Count successful inserts.
152+
metrics.TestTotal.WithLabelValues(dp.TableName(), "ndt5_result", "ok").Inc()
122153
return nil
123154
}
124155

156+
func (dp *NDT5ResultParser) newResult(test []byte, parser schema.ParseInfo, date civil.Date) (*schema.NDT5ResultRowV2, error) {
157+
result := &schema.NDT5ResultRowV2{
158+
Parser: parser,
159+
Date: date,
160+
}
161+
err := json.Unmarshal(test, &result.Raw)
162+
if err != nil {
163+
return nil, err
164+
}
165+
return result, nil
166+
}
167+
168+
func (dp *NDT5ResultParser) prepareS2CRow(row *schema.NDT5ResultRowV2) {
169+
// Record S2C result.
170+
s2c := row.Raw.S2C
171+
row.ID = s2c.UUID
172+
row.A = &schema.NDT5Summary{
173+
UUID: s2c.UUID,
174+
TestTime: s2c.StartTime,
175+
MeanThroughputMbps: s2c.MeanThroughputMbps,
176+
CongestionControl: "cubic",
177+
MinRTT: float64(s2c.MinRTT) / float64(time.Millisecond),
178+
}
179+
// NOTE: the TCPInfo structure was introduced in v0.18.0. Measurements
180+
// from earlier versions will not have values in the TCPInfo struct here.
181+
if s2c.TCPInfo != nil && s2c.TCPInfo.BytesSent > 0 {
182+
row.A.LossRate = float64(s2c.TCPInfo.BytesRetrans) / float64(s2c.TCPInfo.BytesSent)
183+
}
184+
row.Raw.C2S = nil
185+
}
186+
187+
func (dp *NDT5ResultParser) prepareC2SRow(row *schema.NDT5ResultRowV2) {
188+
// Record C2S result.
189+
c2s := row.Raw.C2S
190+
row.ID = c2s.UUID
191+
row.A = &schema.NDT5Summary{
192+
UUID: c2s.UUID,
193+
TestTime: c2s.StartTime,
194+
MeanThroughputMbps: c2s.MeanThroughputMbps,
195+
CongestionControl: "unknown",
196+
MinRTT: -1, // unknown.
197+
LossRate: -1, // unknown.
198+
}
199+
row.Raw.S2C = nil
200+
}
201+
125202
// NB: These functions are also required to complete the etl.Parser interface.
126203
// For NDT5Result, we just forward the calls to the Inserter.
127204

0 commit comments

Comments
 (0)