Skip to content

Commit 1a3523d

Browse files
authored
First step in expanding pcap parser (#1031)
* add panic handling for active polling mode * add top level PCAP packet parsing * simplify very noisy FileCount metric label * add packet decoding benchmark, reorg pcap.go, add metrics * improve test coverage * improve slice allocation * go mod tidy * various PR comment fixes * revert to len(data)/1500
1 parent 781c321 commit 1a3523d

File tree

11 files changed

+292
-11
lines changed

11 files changed

+292
-11
lines changed

active/poller.go

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/prometheus/client_golang/prometheus"
1717
"github.com/prometheus/client_golang/prometheus/promauto"
1818
"golang.org/x/sync/errgroup"
19+
"google.golang.org/api/iterator"
1920
"google.golang.org/api/option"
2021

2122
job "github.com/m-lab/etl-gardener/client"
@@ -95,13 +96,19 @@ func postAndIgnoreResponse(ctx context.Context, url url.URL) error {
9596
// or the context is canceled.
9697
func (g *GardenerAPI) RunAll(ctx context.Context, rSrc RunnableSource, job tracker.Job) (*errgroup.Group, error) {
9798
eg := &errgroup.Group{}
99+
count := 0
98100
for {
99101
run, err := rSrc.Next(ctx)
100102
if err != nil {
101-
metrics.BackendFailureCount.WithLabelValues(
102-
job.Datatype, "rSrc.Next").Inc()
103-
log.Println(err)
104-
return eg, err
103+
if err == iterator.Done {
104+
debug.Printf("Dispatched total of %d archives for %s\n", count, job.String())
105+
return eg, nil
106+
} else {
107+
metrics.BackendFailureCount.WithLabelValues(
108+
job.Datatype, "rSrc.Next").Inc()
109+
log.Println(err, "processing", job.String())
110+
return eg, err
111+
}
105112
}
106113

107114
heartbeat := tracker.HeartbeatURL(g.trackerBase, job)
@@ -111,20 +118,28 @@ func (g *GardenerAPI) RunAll(ctx context.Context, rSrc RunnableSource, job track
111118

112119
debug.Println("Starting func")
113120

114-
f := func() error {
121+
f := func() (err error) {
115122
metrics.ActiveTasks.WithLabelValues(rSrc.Label()).Inc()
116123
defer metrics.ActiveTasks.WithLabelValues(rSrc.Label()).Dec()
117124

118-
err := run.Run(ctx)
125+
// Capture any panic and convert it to an error.
126+
defer func(tag string) {
127+
if err2 := metrics.PanicToErr(err, recover(), "Runall.f: "+tag); err2 != nil {
128+
err = err2
129+
}
130+
}(run.Info())
131+
132+
err = run.Run(ctx)
119133
if err == nil {
120134
update := tracker.UpdateURL(g.trackerBase, job, tracker.Parsing, run.Info())
121135
if postErr := postAndIgnoreResponse(ctx, *update); postErr != nil {
122136
log.Println(postErr, "on update for", job.Path())
123137
}
124138
}
125-
return err
139+
return
126140
}
127141

142+
count++
128143
eg.Go(f)
129144
}
130145
}
@@ -171,12 +186,14 @@ func (g *GardenerAPI) pollAndRun(ctx context.Context,
171186
toRunnable func(o *storage.ObjectAttrs) Runnable, tokens TokenSource) error {
172187
job, err := g.NextJob(ctx)
173188
if err != nil {
189+
log.Println(err, "on Gardener client.NextJob()")
174190
return err
175191
}
176192

177193
log.Println(job, "filter:", job.Filter)
178194
gcsSource, err := g.JobFileSource(ctx, job.Job, toRunnable)
179195
if err != nil {
196+
log.Println(err, "on JobFileSource")
180197
return err
181198
}
182199
src := Throttle(gcsSource, tokens)
@@ -189,13 +206,20 @@ func (g *GardenerAPI) pollAndRun(ctx context.Context,
189206
}
190207

191208
eg, err := g.RunAll(ctx, src, job.Job)
209+
if err != nil {
210+
log.Println(err)
211+
}
192212

193213
// Once all are dispatched, we want to wait until all have completed
194214
// before posting the state change.
195215
go func() {
196216
log.Println("all tasks dispatched for", job.Path())
197-
eg.Wait()
198-
log.Println("finished", job.Path())
217+
err := eg.Wait()
218+
if err != nil {
219+
log.Println(err, "on wait for", job.Path())
220+
} else {
221+
log.Println("finished", job.Path())
222+
}
199223
update := tracker.UpdateURL(g.trackerBase, job.Job, tracker.ParseComplete, "")
200224
// TODO - should this have a retry?
201225
if postErr := postAndIgnoreResponse(ctx, *update); postErr != nil {

active/poller_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,18 @@ import (
1212

1313
"github.com/m-lab/etl-gardener/tracker"
1414
"github.com/m-lab/etl/active"
15+
"github.com/m-lab/go/logx"
1516
"github.com/m-lab/go/rtx"
1617
"google.golang.org/api/iterator"
1718
)
1819

20+
func init() {
21+
// Always prepend the filename and line number.
22+
log.SetFlags(log.LstdFlags | log.Lshortfile)
23+
logx.LogxDebug.Set("true")
24+
logx.Debug.SetFlags(log.LstdFlags | log.Lshortfile)
25+
}
26+
1927
type fakeGardener struct {
2028
t *testing.T // for logging
2129

@@ -129,7 +137,7 @@ func TestGardenerAPI_RunAll(t *testing.T) {
129137
src, err := g.JobFileSource(ctx, job.Job, p.toRunnable)
130138
rtx.Must(err, "file source")
131139
eg, err := g.RunAll(ctx, src, job.Job)
132-
if err != iterator.Done {
140+
if err != nil {
133141
t.Fatal(err)
134142
}
135143
err = eg.Wait()

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ require (
1414
github.com/go-test/deep v1.0.7
1515
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
1616
github.com/google/go-jsonnet v0.17.0
17+
github.com/google/gopacket v1.1.19
1718
github.com/google/uuid v1.3.0 // indirect
1819
github.com/googleapis/google-cloud-go-testing v0.0.0-20210719221736-1c9a4c676720
1920
github.com/kr/pretty v0.2.1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
184184
github.com/google/go-jsonnet v0.17.0 h1:/9NIEfhK1NQRKl3sP2536b2+x5HnZMdql7x3yK/l8JY=
185185
github.com/google/go-jsonnet v0.17.0/go.mod h1:sOcuej3UW1vpPTZOr8L7RQimqai1a57bt5j22LzGZCw=
186186
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
187+
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
188+
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
187189
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
188190
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
189191
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=

metrics/metrics.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,36 @@ var (
582582
},
583583
[]string{"table", "kind", "group"},
584584
)
585+
586+
PcapPacketCount = promauto.NewHistogramVec(
587+
prometheus.HistogramOpts{
588+
Name: "etl_pcap_packet_count",
589+
Help: "Distribution of PCAP packet counts",
590+
Buckets: []float64{
591+
1, 2, 3, 5,
592+
10, 18, 32, 56,
593+
100, 178, 316, 562,
594+
1000, 1780, 3160, 5620,
595+
10000, 17800, 31600, 56200, math.Inf(1),
596+
},
597+
},
598+
[]string{"port"},
599+
)
600+
601+
PcapConnectionDuration = promauto.NewHistogramVec(
602+
prometheus.HistogramOpts{
603+
Name: "etl_pcap_connection_duration",
604+
Help: "Distribution of PCAP connection duration",
605+
Buckets: []float64{
606+
.1, .2, .3, .5,
607+
1, 1.8, 3.2, 5.6,
608+
10, 18, 32, 56,
609+
100, 178, 316, 562,
610+
1000, 1780, 3160, 5620, math.Inf(1),
611+
},
612+
},
613+
[]string{"port"},
614+
)
585615
)
586616

587617
// catchStatus wraps the native http.ResponseWriter and captures any written HTTP

parser/pcap.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,116 @@
11
package parser
22

33
import (
	"bytes"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"path/filepath"
	"strings"
	"time"

	"cloud.google.com/go/bigquery"
	"cloud.google.com/go/civil"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcapgo"
	v2as "github.com/m-lab/annotation-service/api/v2"
	"github.com/m-lab/etl/etl"
	"github.com/m-lab/etl/metrics"
	"github.com/m-lab/etl/row"
	"github.com/m-lab/etl/schema"
	"github.com/m-lab/go/logx"
)
1624

25+
var (
26+
info = log.New(os.Stdout, "info: ", log.LstdFlags|log.Lshortfile)
27+
sparseLogger = log.New(os.Stdout, "sparse: ", log.LstdFlags|log.Lshortfile)
28+
sparse20 = logx.NewLogEvery(sparseLogger, 50*time.Millisecond)
29+
30+
ErrNoIPLayer = fmt.Errorf("no IP layer")
31+
)
32+
33+
// Packet struct contains the packet data and metadata.
34+
type Packet struct {
35+
// If we use a pointer here, for some reason we get zero value timestamps.
36+
Ci gopacket.CaptureInfo
37+
Data []byte
38+
Err error
39+
}
40+
41+
// GetIP decodes the IP layers and returns some basic information.
42+
// It is a bit slow and does memory allocation.
43+
func (p *Packet) GetIP() (net.IP, net.IP, uint8, uint16, error) {
44+
// Decode a packet.
45+
pkt := gopacket.NewPacket(p.Data, layers.LayerTypeEthernet, gopacket.DecodeOptions{
46+
Lazy: true,
47+
NoCopy: true,
48+
SkipDecodeRecovery: true,
49+
DecodeStreamsAsDatagrams: false,
50+
})
51+
52+
if ipLayer := pkt.Layer(layers.LayerTypeIPv4); ipLayer != nil {
53+
ip, _ := ipLayer.(*layers.IPv4)
54+
// For IPv4, the TTL length is the ip.Length adjusted for the header length.
55+
return ip.SrcIP, ip.DstIP, ip.TTL, ip.Length - uint16(4*ip.IHL), nil
56+
} else if ipLayer := pkt.Layer(layers.LayerTypeIPv6); ipLayer != nil {
57+
ip, _ := ipLayer.(*layers.IPv6)
58+
// In IPv6, the Length field is the payload length.
59+
return ip.SrcIP, ip.DstIP, ip.HopLimit, ip.Length, nil
60+
} else {
61+
return nil, nil, 0, 0, ErrNoIPLayer
62+
}
63+
}
64+
65+
func GetPackets(data []byte) ([]Packet, error) {
66+
pcap, err := pcapgo.NewReader(strings.NewReader(string(data)))
67+
if err != nil {
68+
log.Print(err)
69+
return nil, err
70+
}
71+
72+
// TODO: len(data)/18 provides much better estimate of number of packets.
73+
// len(data)/18 was determined by looking at bytes/packet in a few pcaps files.
74+
// The number seems too small, but perhaps the data is still compressed at this point.
75+
// However, it seems to cause mysterious crashes in sandbox, so
76+
// reverting to /1500 for now.
77+
packets := make([]Packet, 0, len(data)/1500)
78+
79+
for data, ci, err := pcap.ZeroCopyReadPacketData(); err == nil; data, ci, err = pcap.ReadPacketData() {
80+
packets = append(packets, Packet{Ci: ci, Data: data, Err: err})
81+
}
82+
83+
if err != nil {
84+
metrics.WarningCount.WithLabelValues("pcap", "ip_layer_failure").Inc()
85+
metrics.PcapPacketCount.WithLabelValues("IP error").Observe(float64(len(packets)))
86+
return packets, err
87+
} else if len(packets) > 0 {
88+
srcIP, _, _, _, err := packets[0].GetIP()
89+
// TODO - eventually we should identify key local ports, like 443 and 3001.
90+
if err != nil {
91+
metrics.WarningCount.WithLabelValues("pcap", "ip_layer_failure").Inc()
92+
metrics.PcapPacketCount.WithLabelValues("IP error").Observe(float64(len(packets)))
93+
} else {
94+
start := packets[0].Ci.Timestamp
95+
end := packets[len(packets)-1].Ci.Timestamp
96+
duration := end.Sub(start)
97+
// TODO add TCP layer, so we can label the stats based on local port value.
98+
if len(srcIP) == 4 {
99+
metrics.PcapPacketCount.WithLabelValues("ipv4").Observe(float64(len(packets)))
100+
metrics.PcapConnectionDuration.WithLabelValues("ipv4").Observe(duration.Seconds())
101+
} else {
102+
metrics.PcapPacketCount.WithLabelValues("ipv6").Observe(float64(len(packets)))
103+
metrics.PcapConnectionDuration.WithLabelValues("ipv6").Observe(duration.Seconds())
104+
}
105+
}
106+
} else {
107+
// No packets.
108+
metrics.PcapPacketCount.WithLabelValues("unknown").Observe(float64(len(packets)))
109+
}
110+
111+
return packets, nil
112+
}
113+
17114
//=====================================================================================
18115
// PCAP Parser
19116
//=====================================================================================
@@ -73,6 +170,10 @@ func (p *PCAPParser) ParseAndInsert(fileMetadata map[string]bigquery.Value, test
73170
row.Date = fileMetadata["date"].(civil.Date)
74171
row.ID = p.GetUUID(testName)
75172

173+
// Parse top level PCAP data and update metrics.
174+
// TODO - add schema fields here.
175+
_, _ = GetPackets(rawContent)
176+
76177
// Insert the row.
77178
if err := p.Put(&row); err != nil {
78179
return err
@@ -84,6 +185,10 @@ func (p *PCAPParser) ParseAndInsert(fileMetadata map[string]bigquery.Value, test
84185
return nil
85186
}
86187

188+
//=====================================================================================
189+
// Implementation of the etl.Parser interface
190+
//=====================================================================================
191+
87192
// GetUUID extracts the UUID from the filename.
88193
// For example, for filename 2021/07/22/ndt-4c6fb_1625899199_00000000013A4623.pcap.gz,
89194
// it returns ndt-4c6fb_1625899199_00000000013A4623.

0 commit comments

Comments
 (0)