Skip to content

Commit 7dfac2f

Browse files
Complete unit tests for ServerX and ClientX records in tcpinfo and pt parsers (#1007)
* Move fakeAnnotator to a common location
* Add unit tests for ServerX, ClientX, and the Site and Machine fields
* Verify client and server annotations
1 parent 54246ad commit 7dfac2f

File tree

7 files changed

+256
-26
lines changed

7 files changed

+256
-26
lines changed

parser/ndt.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,7 @@ func (n *NDTParser) getAndInsertValues(test *fileInfoAndData, testType string) {
610610
deltaFieldCount, test.fn, n.taskFileName)
611611
}
612612

613+
// ArchiveURL must already be valid, so error is safe to ignore.
613614
dp, _ := etl.ValidateTestPath(results["task_filename"].(string))
614615
connSpec.Get("ServerX")["Site"] = dp.Site
615616
connSpec.Get("ServerX")["Machine"] = dp.Host

parser/parser_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,16 @@
33
package parser_test
44

55
import (
6+
"context"
67
"fmt"
78
"log"
89
"os"
910
"testing"
11+
"time"
1012

1113
"cloud.google.com/go/bigquery"
14+
"github.com/m-lab/annotation-service/api"
15+
v2 "github.com/m-lab/annotation-service/api/v2"
1216
"github.com/m-lab/etl/bq"
1317
"github.com/m-lab/etl/etl"
1418
"github.com/m-lab/etl/metrics"
@@ -48,6 +52,20 @@ func (ti *countingInserter) Flush() error {
4852
return nil
4953
}
5054

55+
// newFakeAnnotator creates a new annotator that injects the given annotation
56+
// responses for unit testing.
57+
func newFakeAnnotator(ann map[string]*api.Annotations) *fakeAnnotator {
58+
return &fakeAnnotator{ann: ann}
59+
}
60+
61+
type fakeAnnotator struct {
62+
ann map[string]*api.Annotations
63+
}
64+
65+
func (ann *fakeAnnotator) GetAnnotations(ctx context.Context, date time.Time, ips []string, info ...string) (*v2.Response, error) {
66+
return &v2.Response{AnnotatorDate: time.Now(), Annotations: ann.ann}, nil
67+
}
68+
5169
func TestNormalizeIP(t *testing.T) {
5270
tests := []struct {
5371
name string

parser/pt.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,7 @@ func (pt *PTParser) InsertOneTest(oneTest cachedPTData) {
498498
Destination: oneTest.Destination,
499499
Hop: oneTest.Hops,
500500
}
501+
// ArchiveURL must already be valid, so error is safe to ignore.
501502
dp, _ := etl.ValidateTestPath(pt.taskFileName)
502503
ptTest.ServerX.Site = dp.Site
503504
ptTest.ServerX.Machine = dp.Host
@@ -568,7 +569,6 @@ func (pt *PTParser) ParseAndInsert(meta map[string]bigquery.Value, testName stri
568569
// Process json output from traceroute-caller
569570
if strings.HasSuffix(testName, ".json") {
570571
ptTest, err := ParsePT(testName, rawContent, pt.TableName(), pt.taskFileName)
571-
572572
if err == nil {
573573
err := pt.AddRow(&ptTest)
574574
if err == etl.ErrBufferFull {
@@ -587,6 +587,11 @@ func (pt *PTParser) ParseAndInsert(meta map[string]bigquery.Value, testName stri
587587
if strings.HasSuffix(testName, ".jsonl") {
588588
ptTest, err := ParseJSONL(testName, rawContent, pt.TableName(), pt.taskFileName)
589589
if err == nil {
590+
// ArchiveURL must already be valid, so error is safe to ignore.
591+
dp, _ := etl.ValidateTestPath(pt.taskFileName)
592+
ptTest.ServerX.Site = dp.Site
593+
ptTest.ServerX.Machine = dp.Host
594+
590595
err := pt.AddRow(&ptTest)
591596
if err == etl.ErrBufferFull {
592597
// Flush asynchronously, to improve throughput.

parser/pt_test.go

Lines changed: 169 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ import (
99
"time"
1010

1111
"cloud.google.com/go/bigquery"
12+
"github.com/go-test/deep"
13+
"github.com/m-lab/annotation-service/api"
14+
v2 "github.com/m-lab/annotation-service/api/v2"
1215
"github.com/m-lab/etl/parser"
1316
"github.com/m-lab/etl/schema"
1417
)
@@ -252,18 +255,83 @@ func TestParseLegacyFormatData(t *testing.T) {
252255
}
253256

254257
func TestParseJSONL(t *testing.T) {
255-
rawData, err := ioutil.ReadFile("testdata/PT/20190927T070859Z_ndt-qtfh8_1565996043_0000000000003B64.jsonl")
258+
m := map[string]*api.Annotations{
259+
"91.213.30.229": &api.Annotations{
260+
Geo: &api.GeolocationIP{
261+
ContinentCode: "NA",
262+
CountryCode: "US",
263+
Latitude: 1.0,
264+
Longitude: 2.0,
265+
},
266+
Network: &api.ASData{
267+
ASNumber: 1234,
268+
Systems: []api.System{
269+
{ASNs: []uint32{1234}},
270+
},
271+
},
272+
},
273+
"91.169.126.135": &api.Annotations{
274+
Geo: &api.GeolocationIP{
275+
ContinentCode: "EU",
276+
CountryCode: "DE",
277+
Latitude: 3.0,
278+
Longitude: 4.0,
279+
},
280+
Network: &api.ASData{
281+
ASNumber: 4321,
282+
Systems: []api.System{
283+
{ASNs: []uint32{4321}},
284+
},
285+
},
286+
},
287+
}
288+
ins := newInMemoryInserter()
289+
pt := parser.NewPTParser(ins, newFakeAnnotator(m))
290+
291+
filename := "testdata/PT/20190927T070859Z_ndt-qtfh8_1565996043_0000000000003B64.jsonl"
292+
rawData, err := ioutil.ReadFile(filename)
256293
if err != nil {
257294
t.Fatalf(err.Error())
258295
return
259296
}
260-
ptTest, err := parser.ParseJSONL("20190927T070859Z_ndt-qtfh8_1565996043_0000000000003B64.jsonl", []byte(rawData), "", "")
297+
298+
url := "gs://archive-measurement-lab/ndt/traceroute/2019/09/27/20190927T000540.410989Z-traceroute-mlab2-nuq07-ndt.tgz"
299+
meta := map[string]bigquery.Value{"filename": url}
300+
err = pt.ParseAndInsert(meta, filename, rawData)
261301
if err != nil {
262302
t.Fatalf(err.Error())
263303
}
264304

305+
if pt.NumRowsForTest() != 1 {
306+
fmt.Println(pt.NumRowsForTest())
307+
t.Fatalf("The data is not inserted, in buffer now.")
308+
}
309+
pt.Flush()
310+
311+
if len(ins.data) != 1 {
312+
fmt.Println(len(ins.data))
313+
t.Fatalf("Number of rows in inserter is wrong.")
314+
}
315+
316+
ptTest := ins.data[0].(*schema.PTTest)
317+
if ptTest.Parseinfo.TaskFileName != url {
318+
t.Fatalf("Wrong TaskFilenName; got %q, want %q", ptTest.Parseinfo.TaskFileName, url)
319+
}
320+
265321
if ptTest.UUID != "ndt-qtfh8_1565996043_0000000000003B64" {
266-
t.Fatalf("UUID parsing error %s", ptTest.UUID)
322+
t.Fatalf("Wrong UUID; got %q, want %q", ptTest.UUID, "ndt-qtfh8_1565996043_0000000000003B64")
323+
}
324+
325+
// Verify the client and server annotations match.
326+
cx := v2.ConvertAnnotationsToClientAnnotations(m["91.169.126.135"])
327+
if diff := deep.Equal(&ptTest.ClientX, cx); diff != nil {
328+
t.Errorf("ClientX annotation does not match; %#v", diff)
329+
}
330+
sx := v2.ConvertAnnotationsToServerAnnotations(m["91.213.30.229"])
331+
sx.Site = "nuq07"
332+
sx.Machine = "mlab2"
333+
if diff := deep.Equal(&ptTest.ServerX, sx); diff != nil {
334+
t.Errorf("ServerX annotation does not match; %#v", diff)
267335
}
268336
}
269337

@@ -320,12 +388,43 @@ func TestParse(t *testing.T) {
320388

321389
func TestAnnotateAndPutAsync(t *testing.T) {
322390
ins := newInMemoryInserter()
323-
pt := parser.NewPTParser(ins, &fakeAnnotator{})
391+
m := map[string]*api.Annotations{
392+
"172.17.94.34": &api.Annotations{
393+
Geo: &api.GeolocationIP{
394+
ContinentCode: "NA",
395+
CountryCode: "US",
396+
Latitude: 1.0,
397+
Longitude: 2.0,
398+
},
399+
Network: &api.ASData{
400+
ASNumber: 1234,
401+
Systems: []api.System{
402+
{ASNs: []uint32{1234}},
403+
},
404+
},
405+
},
406+
"74.125.224.100": &api.Annotations{
407+
Geo: &api.GeolocationIP{
408+
ContinentCode: "EU",
409+
CountryCode: "DE",
410+
Latitude: 3.0,
411+
Longitude: 4.0,
412+
},
413+
Network: &api.ASData{
414+
ASNumber: 4321,
415+
Systems: []api.System{
416+
{ASNs: []uint32{4321}},
417+
},
418+
},
419+
},
420+
}
421+
pt := parser.NewPTParser(ins, newFakeAnnotator(m))
324422
rawData, err := ioutil.ReadFile("testdata/PT/20170320T23:53:10Z-172.17.94.34-33456-74.125.224.100-33457.paris")
325423
if err != nil {
326424
t.Fatalf("cannot read testdata.")
327425
}
328-
meta := map[string]bigquery.Value{"filename": "gs://fake-bucket/fake-archive.tgz"}
426+
url := "gs://archive-measurement-lab/ndt/traceroute/2017/03/20/20170320T000540.410989Z-paris-traceroute-mlab4-nuq07-ndt.tgz"
427+
meta := map[string]bigquery.Value{"filename": url}
329428
err = pt.ParseAndInsert(meta, "testdata/PT/20170320T23:53:10Z-172.17.94.34-33456-74.125.224.100-33457.paris", rawData)
330429
if err != nil {
331430
t.Fatalf(err.Error())
@@ -336,24 +435,70 @@ func TestAnnotateAndPutAsync(t *testing.T) {
336435
t.Fatalf("Number of rows in PT table is wrong.")
337436
}
338437
pt.AnnotateAndPutAsync("traceroute")
339-
//pt.Inserter.Flush()
438+
340439
if len(ins.data) != 1 {
341440
fmt.Println(len(ins.data))
342441
t.Fatalf("Number of rows in inserter is wrong.")
343442
}
344-
if ins.data[0].(*schema.PTTest).Parseinfo.TaskFileName != "gs://fake-bucket/fake-archive.tgz" {
345-
t.Fatalf("Task filename is wrong.")
443+
ptTest := ins.data[0].(*schema.PTTest)
444+
if ptTest.Parseinfo.TaskFileName != url {
445+
t.Fatalf("Wrong TaskFilenName; got %q, want %q", ptTest.Parseinfo.TaskFileName, url)
446+
}
447+
448+
// Verify the client and server annotations match.
449+
cx := v2.ConvertAnnotationsToClientAnnotations(m["74.125.224.100"])
450+
if diff := deep.Equal(&ptTest.ClientX, cx); diff != nil {
451+
t.Errorf("ClientX annotation does not match; %#v", diff)
452+
}
453+
sx := v2.ConvertAnnotationsToServerAnnotations(m["172.17.94.34"])
454+
sx.Site = "nuq07"
455+
sx.Machine = "mlab4"
456+
if diff := deep.Equal(&ptTest.ServerX, sx); diff != nil {
457+
t.Errorf("ServerX annotation does not match; %#v", diff)
346458
}
347459
}
348460

349461
func TestParseAndInsert(t *testing.T) {
462+
463+
m := map[string]*api.Annotations{
464+
"2.80.132.33": &api.Annotations{
465+
Geo: &api.GeolocationIP{
466+
ContinentCode: "NA",
467+
CountryCode: "US",
468+
Latitude: 1.0,
469+
Longitude: 2.0,
470+
},
471+
Network: &api.ASData{
472+
ASNumber: 1234,
473+
Systems: []api.System{
474+
{ASNs: []uint32{1234}},
475+
},
476+
},
477+
},
478+
"91.239.96.102": &api.Annotations{
479+
Geo: &api.GeolocationIP{
480+
ContinentCode: "NA",
481+
CountryCode: "US",
482+
Latitude: 1.0,
483+
Longitude: 2.0,
484+
},
485+
Network: &api.ASData{
486+
ASNumber: 1234,
487+
Systems: []api.System{
488+
{ASNs: []uint32{1234}},
489+
},
490+
},
491+
},
492+
}
493+
350494
ins := newInMemoryInserter()
351-
pt := parser.NewPTParser(ins, &fakeAnnotator{})
495+
pt := parser.NewPTParser(ins, newFakeAnnotator(m))
352496
rawData, err := ioutil.ReadFile("testdata/PT/20130524T00:04:44Z_ALL5729.paris")
353497
if err != nil {
354498
t.Fatalf("cannot read testdata.")
355499
}
356-
meta := map[string]bigquery.Value{"filename": "gs://fake-bucket/fake-archive.tgz"}
500+
url := "gs://archive-measurement-lab/paris-traceroute/2013/05/24/20130524T000000Z-mlab3-akl01-paris-traceroute-0000.tgz"
501+
meta := map[string]bigquery.Value{"filename": url}
357502
err = pt.ParseAndInsert(meta, "testdata/PT/20130524T00:04:44Z_ALL5729.paris", rawData)
358503
if err != nil {
359504
t.Fatalf(err.Error())
@@ -369,13 +514,26 @@ func TestParseAndInsert(t *testing.T) {
369514
fmt.Println(len(ins.data))
370515
t.Fatalf("Number of rows in inserter is wrong.")
371516
}
372-
if ins.data[0].(*schema.PTTest).Parseinfo.TaskFileName != "gs://fake-bucket/fake-archive.tgz" {
517+
if ins.data[0].(*schema.PTTest).Parseinfo.TaskFileName != url {
373518
t.Fatalf("Task filename is wrong.")
374519
}
375520
// echo -n 2013-05-24T00:04:44Z-91.239.96.102-2.80.132.33 | openssl dgst -binary -md5 | base64 | tr '/+' '_-' | tr -d '='
376521
if ins.data[0].(*schema.PTTest).UUID != "R9_wGx1-cSmqtSAt5aQtNg" {
377522
t.Fatalf("UUID is wrong; got %q, want %q", ins.data[0].(*schema.PTTest).UUID, "R9_wGx1-cSmqtSAt5aQtNg")
378523
}
524+
p := ins.data[0].(*schema.PTTest)
525+
526+
// Verify the client and server annotations match.
527+
cx := v2.ConvertAnnotationsToClientAnnotations(m["91.239.96.102"])
528+
if diff := deep.Equal(&p.ClientX, cx); diff != nil {
529+
t.Errorf("ClientX annotation does not match; %#v", diff)
530+
}
531+
sx := v2.ConvertAnnotationsToServerAnnotations(m["2.80.132.33"])
532+
sx.Site = "akl01"
533+
sx.Machine = "mlab3"
534+
if diff := deep.Equal(&p.ServerX, sx); diff != nil {
535+
t.Errorf("ServerX annotation does not match; %#v", diff)
536+
}
379537
}
380538

381539
func TestProcessLastTests(t *testing.T) {

parser/ss.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ func (ss *SSParser) ParseAndInsert(meta map[string]bigquery.Value, testName stri
294294
ssTest.TaskFileName = meta["filename"].(string)
295295
}
296296

297+
// ArchiveURL must already be valid, so error is safe to ignore.
297298
dp, _ := etl.ValidateTestPath(ssTest.TaskFileName)
298299
ssTest.Web100_log_entry.Connection_spec.ServerX.Site = dp.Site
299300
ssTest.Web100_log_entry.Connection_spec.ServerX.Machine = dp.Host

parser/tcpinfo.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ func (p *TCPInfoParser) ParseAndInsert(fileMetadata map[string]bigquery.Value, t
192192
// TODO - should populate other ServerInfo fields from siteinfo API.
193193
}
194194
}
195+
// ArchiveURL must already be valid, so error is safe to ignore.
195196
dp, _ := etl.ValidateTestPath(fileMetadata["filename"].(string))
196197
row.ServerX.Site = dp.Site
197198
row.ServerX.Machine = dp.Host

0 commit comments

Comments (0)