Skip to content

Commit 05315ee

Browse files
Add parser for scamper1 datatype (#1021)
* Add parser for scamper1 datatype
* Add json tag to HopID field
* Small wording fix
* Address review comments
1 parent 6da5a5e commit 05315ee

File tree

10 files changed

+513
-3
lines changed

10 files changed

+513
-3
lines changed

etl/globals.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ const (
263263
SS = DataType("sidestream")
264264
PCAP = DataType("pcap")
265265
PT = DataType("traceroute")
266+
SCAMPER1 = DataType("scamper1")
266267
SW = DataType("switch")
267268
TCPINFO = DataType("tcpinfo")
268269
INVALID = DataType("invalid")
@@ -280,6 +281,7 @@ var (
280281
"sidestream": SS,
281282
"paris-traceroute": PT,
282283
"pcap": PCAP,
284+
"scamper1": SCAMPER1,
283285
"switch": SW,
284286
"tcpinfo": TCPINFO,
285287
"traceroute": PT,
@@ -294,6 +296,7 @@ var (
294296
SS: "sidestream",
295297
PCAP: "pcap",
296298
PT: "traceroute",
299+
SCAMPER1: "scamper1",
297300
SW: "switch",
298301
TCPINFO: "tcpinfo",
299302
NDT5: "ndt5",
@@ -315,6 +318,7 @@ var (
315318
SS: 500, // Average json size is 2.5K
316319
PCAP: 200,
317320
PT: 20,
321+
SCAMPER1: 200,
318322
SW: 100,
319323
NDT5: 200,
320324
NDT7: 200,

etl/globals_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,16 @@ func TestValidateTestPath(t *testing.T) {
127127
"archive-measurement-lab", "ndt", "pcap", "2021/07/22", "20210722", "000107.470279", "pcap", "mlab1", "dfw05", "ndt", "", "", ".tgz",
128128
},
129129
},
130+
{
131+
name: "scamper1-tgz",
132+
path: `gs://archive-measurement-lab/ndt/scamper1/2021/09/08/20210908T215656.886052Z-scamper1-mlab3-bog03-ndt.tgz`,
133+
wantType: etl.SCAMPER1,
134+
want: etl.DataPath{
135+
`gs://archive-measurement-lab/ndt/scamper1/2021/09/08/20210908T215656.886052Z-scamper1-mlab3-bog03-ndt.tgz`,
136+
`ndt/scamper1/2021/09/08/20210908T215656.886052Z-scamper1-mlab3-bog03-ndt.tgz`,
137+
"archive-measurement-lab", "ndt", "scamper1", "2021/09/08", "20210908", "215656.886052", "scamper1", "mlab3", "bog03", "ndt", "", "", ".tgz",
138+
},
139+
},
130140
}
131141
for _, tt := range tests {
132142
t.Run(tt.name, func(t *testing.T) {

parser/disco.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// Package parser defines the Parser interface and implementations for the different
2-
// test types, NDT, Paris Traceroute, and SideStream.
2+
// data types.
33
package parser
44

55
// This file defines the Parser subtype that handles DISCO data.

parser/parser.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// Package parser defines the Parser interface and implementations for the different
2-
// test types, NDT, Paris Traceroute, and SideStream.
2+
// data types.
33
package parser
44

55
import (
@@ -94,6 +94,8 @@ func NewSinkParser(dt etl.DataType, sink row.Sink, table string, ann api.Annotat
9494
return NewTCPInfoParser(sink, table, "", ann)
9595
case etl.PCAP:
9696
return NewPCAPParser(sink, table, "", ann)
97+
case etl.SCAMPER1:
98+
return NewScamper1Parser(sink, table, "", ann)
9799
default:
98100
return nil
99101
}

parser/scamper1.go

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
package parser
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
"time"
7+
8+
"cloud.google.com/go/bigquery"
9+
"cloud.google.com/go/civil"
10+
v2as "github.com/m-lab/annotation-service/api/v2"
11+
"github.com/m-lab/etl/etl"
12+
"github.com/m-lab/etl/metrics"
13+
"github.com/m-lab/etl/row"
14+
"github.com/m-lab/etl/schema"
15+
"github.com/m-lab/traceroute-caller/parser"
16+
)
17+
18+
//=====================================================================================
// scamper1 Parser
//=====================================================================================

const (
	// scamper1 is the canonical datatype name, used as the metrics label
	// and as the test-type string returned by IsParsable.
	scamper1 = "scamper1"
)
25+
26+
// Scamper1Parser handles parsing for the scamper1 datatype.
type Scamper1Parser struct {
	*row.Base        // embedded row buffer/sink; supplies Put, Flush, and GetStats.
	table  string    // target table name, reported by TableName for metrics/logging.
	suffix string    // table suffix appended by FullTableName ($YYYYMMNN or _YYYYMMNN).
}
32+
33+
// NewScamper1Parser returns a new parser for the scamper1 archives.
34+
func NewScamper1Parser(sink row.Sink, table, suffix string, ann v2as.Annotator) etl.Parser {
35+
bufSize := etl.SCAMPER1.BQBufferSize()
36+
if ann == nil {
37+
ann = v2as.GetAnnotator(etl.BatchAnnotatorURL)
38+
}
39+
40+
return &Scamper1Parser{
41+
Base: row.NewBase(table, sink, bufSize, ann),
42+
table: table,
43+
suffix: suffix,
44+
}
45+
}
46+
47+
// GetTraceStartDate extracts the date portion of the cycle-start timestamp
// (seconds since the Unix epoch; fractional seconds are truncated) and
// returns it in YYYYMMDD form, interpreted in UTC.
func GetTraceStartDate(cycleStartTime float64) string {
	return time.Unix(int64(cycleStartTime), 0).UTC().Format("20060102")
}
53+
54+
// parseTracelb parses the TracelbLine struct defined in traceroute-caller and
// populates the BQTracelbLine inside bqScamperOutput. It also synthesizes a
// HopID for every node from the trace start date, the measuring hostname, and
// the node address. The caller must populate bqScamperOutput.CycleStart
// before calling, since the HopID is derived from it.
func parseTracelb(bqScamperOutput *schema.BQScamperOutput, tracelb parser.TracelbLine) {
	// Field-by-field copy of the scalar tracelb fields into the BQ schema.
	bqScamperOutput.Tracelb = schema.BQTracelbLine{
		Type:        tracelb.Type,
		Version:     tracelb.Version,
		Userid:      tracelb.Userid,
		Method:      tracelb.Method,
		Src:         tracelb.Src,
		Dst:         tracelb.Dst,
		Start:       tracelb.Start,
		ProbeSize:   tracelb.ProbeSize,
		Firsthop:    tracelb.Firsthop,
		Attempts:    tracelb.Attempts,
		Confidence:  tracelb.Confidence,
		Tos:         tracelb.Tos,
		Gaplimit:    tracelb.Gaplint, // NOTE(review): "Gaplint" is the field name exposed by traceroute-caller — confirm the upstream spelling before renaming.
		WaitTimeout: tracelb.WaitTimeout,
		WaitProbe:   tracelb.WaitProbe,
		Probec:      tracelb.Probec,
		ProbecMax:   tracelb.ProbecMax,
		Nodec:       tracelb.Nodec,
		Linkc:       tracelb.Linkc,
	}

	nodes := tracelb.Nodes
	bqScamperOutput.Tracelb.Nodes = make([]schema.BQScamperNode, 0, len(nodes))
	// date and hostname are loop-invariant components of every HopID.
	date := GetTraceStartDate(bqScamperOutput.CycleStart.StartTime)
	hostname := bqScamperOutput.CycleStart.Hostname

	for _, node := range nodes {
		// node.Links is a list of link lists; wrap each inner list in its
		// own BQScamperLinkArray, copying so the BQ row does not alias the
		// parsed input.
		bqLinkArray := make([]schema.BQScamperLinkArray, 0, len(node.Links))
		for _, link := range node.Links {
			bqLinks := schema.BQScamperLinkArray{}
			bqLinks.Links = make([]parser.ScamperLink, len(link))
			copy(bqLinks.Links, link)
			bqLinkArray = append(bqLinkArray, bqLinks)
		}

		bqScamperNode := schema.BQScamperNode{
			// HopID format: <YYYYMMDD>_<hostname>_<node address>.
			HopID: fmt.Sprintf("%s_%s_%s", date, hostname, node.Addr),
			Addr:  node.Addr,
			Name:  node.Name,
			QTTL:  node.QTTL,
			Linkc: node.Linkc,
			Links: bqLinkArray,
		}
		bqScamperOutput.Tracelb.Nodes = append(bqScamperOutput.Tracelb.Nodes, bqScamperNode)
	}
}
103+
104+
// IsParsable returns the canonical test type and whether to parse data.
105+
func (p *Scamper1Parser) IsParsable(testName string, data []byte) (string, bool) {
106+
if strings.HasSuffix(testName, "jsonl") {
107+
return scamper1, true
108+
}
109+
return "", false
110+
}
111+
112+
// ParseAndInsert decodes the scamper1 data and inserts it into BQ.
// fileMetadata must contain a "filename" string and a "date" civil.Date;
// testName is the per-test file name inside the archive, and rawContent is
// the raw scamper jsonl output.
func (p *Scamper1Parser) ParseAndInsert(fileMetadata map[string]bigquery.Value, testName string, rawContent []byte) error {
	// Track active-worker state for monitoring; decremented on return.
	metrics.WorkerState.WithLabelValues(p.TableName(), scamper1).Inc()
	defer metrics.WorkerState.WithLabelValues(p.TableName(), scamper1).Dec()

	scamperOutput, err := parser.ParseTraceroute(rawContent)
	if err != nil {
		return fmt.Errorf("failed to parse scamper1 file: %v", err)
	}

	// Copy the top-level sections verbatim; Tracelb needs per-field
	// conversion, done by parseTracelb below.
	bqScamperOutput := schema.BQScamperOutput{
		Metadata:   scamperOutput.Metadata,
		CycleStart: scamperOutput.CycleStart,
		CycleStop:  scamperOutput.CycleStop,
	}
	parseTracelb(&bqScamperOutput, scamperOutput.Tracelb)

	parseInfo := schema.ParseInfo{
		Version: Version(),
		Time:    time.Now(),
		// NOTE(review): unchecked type assertion — panics if "filename" is
		// missing or not a string; confirm all callers populate it.
		ArchiveURL: fileMetadata["filename"].(string),
		Filename:   testName,
		GitCommit:  GitCommit(),
	}

	row := schema.Scamper1Row{
		ID:     bqScamperOutput.Metadata.UUID,
		Parser: parseInfo,
		// NOTE(review): same unchecked assertion for the "date" key.
		Date: fileMetadata["date"].(civil.Date),
		Raw:  bqScamperOutput,
	}

	// Insert the row.
	if err := p.Put(&row); err != nil {
		return err
	}

	// Count successful inserts.
	metrics.TestCount.WithLabelValues(p.TableName(), scamper1, "ok").Inc()

	return nil
}
154+
155+
// NB: These functions are also required to complete the etl.Parser interface
// For scamper1, we just forward the calls to the Inserter.
// All of them delegate to the embedded row.Base; none hold extra state.

// Flush flushes any pending rows.
func (p *Scamper1Parser) Flush() error {
	return p.Base.Flush()
}

// TableName of the table that this Parser inserts into.
// Used for metrics and logging.
func (p *Scamper1Parser) TableName() string {
	return p.table
}

// FullTableName of the BQ table that the uploader pushes to,
// including $YYYYMMNN, or _YYYYMMNN.
func (p *Scamper1Parser) FullTableName() string {
	return p.table + p.suffix
}

// RowsInBuffer returns the count of rows currently in the buffer.
func (p *Scamper1Parser) RowsInBuffer() int {
	return p.GetStats().Pending
}

// Committed returns the count of rows successfully committed to BQ.
func (p *Scamper1Parser) Committed() int {
	return p.GetStats().Committed
}

// Accepted returns the count of all rows received through InsertRow(s).
func (p *Scamper1Parser) Accepted() int {
	return p.GetStats().Total()
}

// Failed returns the count of all rows that could not be committed.
func (p *Scamper1Parser) Failed() int {
	return p.GetStats().Failed
}

0 commit comments

Comments
 (0)