Skip to content

Commit 81507c8

Browse files
Update pcap parser processing rate to 1 in 10 archives (#1014)
* Update pcap parser processing rate to 1 in 10 archives
* Use etl.DataType to cast type names
* Add DataType under test to active_test path
* Use path.Join() instead of string concatenation
* Change variable name to 'prefix'
1 parent c7c6ced commit 81507c8

File tree

5 files changed

+151
-13
lines changed

5 files changed

+151
-13
lines changed

active/active.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"google.golang.org/api/iterator"
2121

2222
"cloud.google.com/go/storage"
23+
"github.com/m-lab/etl-gardener/tracker"
24+
"github.com/m-lab/etl/etl"
2325
"github.com/m-lab/etl/metrics"
2426
"github.com/m-lab/go/cloud/gcs"
2527
"github.com/m-lab/go/logx"
@@ -103,17 +105,17 @@ type GCSSource struct {
103105
}
104106

105107
// NewGCSSource creates a new source for active processing.
106-
func NewGCSSource(ctx context.Context, label string, fl FileLister, toRunnable func(*storage.ObjectAttrs) Runnable) (*GCSSource, error) {
108+
func NewGCSSource(ctx context.Context, job tracker.Job, fl FileLister, toRunnable func(*storage.ObjectAttrs) Runnable) (*GCSSource, error) {
107109
src := GCSSource{
108110
ctx: WithFail(ctx),
109111
fileLister: fl,
110112
toRunnable: toRunnable,
111113

112114
pendingChan: make(chan Runnable, 0),
113-
label: label,
115+
label: job.Path(),
114116
}
115117

116-
go src.streamToPending(ctx)
118+
go src.streamToPending(ctx, job)
117119

118120
return &src, nil
119121
}
@@ -162,7 +164,7 @@ func (src *GCSSource) Next(ctx context.Context) (Runnable, error) {
162164
// It fetches the list of files once, then converts files to Runnables until all files are
163165
// handled, or the context is canceled or expires.
164166
// The Runnables are pulled from the queue by Next().
165-
func (src *GCSSource) streamToPending(ctx context.Context) {
167+
func (src *GCSSource) streamToPending(ctx context.Context, job tracker.Job) {
166168
// No matter what else happens, we eventually want to close the pendingChan.
167169
defer close(src.pendingChan)
168170

@@ -174,6 +176,10 @@ func (src *GCSSource) streamToPending(ctx context.Context) {
174176
return
175177
}
176178

179+
index := 0
180+
dataType := etl.DataType(job.Datatype)
181+
skipCount := dataType.SkipCount()
182+
177183
for _, f := range files {
178184
debug.Println(f)
179185
if f == nil {
@@ -186,8 +192,12 @@ func (src *GCSSource) streamToPending(ctx context.Context) {
186192
metrics.ActiveErrors.WithLabelValues(src.Label(), "streamToPending").Inc()
187193
break
188194
}
189-
debug.Printf("Adding gs://%s/%s", f.Bucket, f.Name)
190-
// Blocks until consumer reads channel.
191-
src.pendingChan <- src.toRunnable(f)
195+
196+
if index%(skipCount+1) == 0 {
197+
debug.Printf("Adding gs://%s/%s", f.Bucket, f.Name)
198+
// Blocks until consumer reads channel.
199+
src.pendingChan <- src.toRunnable(f)
200+
}
201+
index++
192202
}
193203
}

active/active_test.go

Lines changed: 88 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"log"
88
"os"
9+
"path"
910
"sync"
1011
"testing"
1112
"time"
@@ -15,6 +16,7 @@ import (
1516
"golang.org/x/sync/errgroup"
1617
"google.golang.org/api/iterator"
1718

19+
"github.com/m-lab/etl-gardener/tracker"
1820
"github.com/m-lab/etl/active"
1921
"github.com/m-lab/go/cloud/gcs"
2022
"github.com/m-lab/go/logx"
@@ -23,6 +25,10 @@ import (
2325
"github.com/m-lab/go/cloudtest/gcsfake"
2426
)
2527

28+
var (
29+
job = tracker.Job{}
30+
)
31+
2632
func init() {
2733
// Always prepend the filename and line number.
2834
log.SetFlags(log.LstdFlags | log.Lshortfile)
@@ -101,6 +107,31 @@ func standardLister() active.FileLister {
101107
return active.FileListerFunc(bh, "ndt/ndt5/2019/01/01/", nil)
102108
}
103109

110+
// skipFilesListener returns a FileLister backed by a fake GCS bucket named
// "foobar" that holds eleven objects, obj1 through obj11, under the prefix
// ndt/<dataType>/2019/01/01.
// TestSkipFiles depends on that object count when it checks how many files
// are queued for each data type.
func skipFilesListener(dataType string) active.FileLister {
111+
client := gcsfake.GCSClient{}
112+
// path.Join cleans its result, so prefix carries no trailing slash.
prefix := path.Join("ndt/", dataType, "/2019/01/01/")
113+
client.AddTestBucket("foobar",
114+
&gcsfake.BucketHandle{
115+
ObjAttrs: []*storage.ObjectAttrs{
116+
{Bucket: "foobar", Name: path.Join(prefix, "obj1"), Updated: time.Now()},
117+
{Bucket: "foobar", Name: path.Join(prefix, "obj2"), Updated: time.Now()},
118+
{Bucket: "foobar", Name: path.Join(prefix, "obj3"), Updated: time.Now()},
119+
{Bucket: "foobar", Name: path.Join(prefix, "obj4"), Updated: time.Now()},
120+
{Bucket: "foobar", Name: path.Join(prefix, "obj5"), Updated: time.Now()},
121+
{Bucket: "foobar", Name: path.Join(prefix, "obj6"), Updated: time.Now()},
122+
{Bucket: "foobar", Name: path.Join(prefix, "obj7"), Updated: time.Now()},
123+
{Bucket: "foobar", Name: path.Join(prefix, "obj8"), Updated: time.Now()},
124+
{Bucket: "foobar", Name: path.Join(prefix, "obj9"), Updated: time.Now()},
125+
{Bucket: "foobar", Name: path.Join(prefix, "obj10"), Updated: time.Now()},
126+
{Bucket: "foobar", Name: path.Join(prefix, "obj11"), Updated: time.Now()},
127+
}})
128+
129+
bh, err := gcs.GetBucket(context.Background(), &client, "foobar")
130+
// NOTE(review): rtx.Must presumably aborts when err is non-nil - confirm.
rtx.Must(err, "GetBucket failed")
131+
// The lister is built with the same prefix the objects were created under
// and a nil third argument.
return active.FileListerFunc(bh, prefix, nil)
132+
133+
}
134+
104135
func runAll(ctx context.Context, rSrc active.RunnableSource) (*errgroup.Group, error) {
105136
eg := &errgroup.Group{}
106137
for {
@@ -123,7 +154,7 @@ func runAll(ctx context.Context, rSrc active.RunnableSource) (*errgroup.Group, e
123154
func TestGCSSourceBasic(t *testing.T) {
124155
p := newCounter(t)
125156
ctx := context.Background()
126-
fs, err := active.NewGCSSource(ctx, "test", standardLister(), p.toRunnable)
157+
fs, err := active.NewGCSSource(ctx, job, standardLister(), p.toRunnable)
127158
if err != nil {
128159
t.Fatal(err)
129160
}
@@ -149,7 +180,7 @@ func TestWithRunFailures(t *testing.T) {
149180
p.addOutcome(os.ErrInvalid)
150181

151182
ctx := context.Background()
152-
fs, err := active.NewGCSSource(ctx, "test", standardLister(), p.toRunnable)
183+
fs, err := active.NewGCSSource(ctx, job, standardLister(), p.toRunnable)
153184
if err != nil {
154185
t.Fatal(err)
155186
}
@@ -174,7 +205,7 @@ func TestWithRunFailures(t *testing.T) {
174205
func TestExpiredContext(t *testing.T) {
175206
p := newCounter(t)
176207
ctx := context.Background()
177-
fs, err := active.NewGCSSource(ctx, "test", standardLister(), p.toRunnable)
208+
fs, err := active.NewGCSSource(ctx, job, standardLister(), p.toRunnable)
178209
if err != nil {
179210
t.Fatal(err)
180211
}
@@ -199,7 +230,7 @@ func TestWithStorageError(t *testing.T) {
199230
p := newCounter(t)
200231

201232
ctx := context.Background()
202-
fs, err := active.NewGCSSource(ctx, "test", ErroringLister, p.toRunnable)
233+
fs, err := active.NewGCSSource(ctx, job, ErroringLister, p.toRunnable)
203234
if err != nil {
204235
t.Fatal(err)
205236
}
@@ -214,7 +245,7 @@ func TestExpiredFileListerContext(t *testing.T) {
214245
p := newCounter(t)
215246

216247
ctx := context.Background()
217-
fs, err := active.NewGCSSource(ctx, "test", standardLister(), p.toRunnable)
248+
fs, err := active.NewGCSSource(ctx, job, standardLister(), p.toRunnable)
218249
if err != nil {
219250
t.Fatal(err)
220251
}
@@ -237,3 +268,55 @@ func TestExpiredFileListerContext(t *testing.T) {
237268
t.Error("Should return os.ErrInvalid", err)
238269
}
239270
}
271+
272+
// TestSkipFiles checks how many of the eleven files served by
// skipFilesListener a GCSSource hands out for a given job data type.
// Each case builds a tracker.Job whose Datatype is the case name, drains the
// source with runAll, and compares the counter's success and fail totals
// against the expected values.
func TestSkipFiles(t *testing.T) {
273+
tests := []struct {
274+
name string
275+
successCount int
276+
failureCount int
277+
}{
278+
{
279+
// Only two of the eleven files are expected to be queued for pcap.
name: "pcap",
280+
successCount: 2,
281+
failureCount: 0,
282+
},
283+
{
284+
// Every file is expected to be queued for ndt7.
name: "ndt7",
285+
successCount: 11,
286+
failureCount: 0,
287+
},
288+
{
289+
// An arbitrary data type name is also expected to queue every file.
name: "foo",
290+
successCount: 11,
291+
failureCount: 0,
292+
},
293+
}
294+
295+
for _, tt := range tests {
296+
t.Run(tt.name, func(t *testing.T) {
297+
p := newCounter(t)
298+
ctx := context.Background()
299+
fs, err := active.NewGCSSource(ctx, tracker.Job{Datatype: tt.name}, skipFilesListener(tt.name), p.toRunnable)
300+
if err != nil {
301+
t.Fatal(err)
302+
}
303+
304+
// runAll is expected to stop with iterator.Done; any other value,
// including nil, is treated as fatal.
eg, err := runAll(ctx, fs)
305+
if err != iterator.Done {
306+
t.Fatal(err)
307+
}
308+
// Wait on the errgroup returned by runAll before reading the counters.
err = eg.Wait()
309+
if err != nil {
310+
t.Error(err)
311+
}
312+
313+
if p.success != tt.successCount {
314+
t.Errorf("for %s, %d should have succeeded, got %d", tt.name, tt.successCount, p.success)
315+
}
316+
317+
if p.fail != tt.failureCount {
318+
t.Errorf("for %s, %d should have failed, got %d", tt.name, tt.failureCount, p.fail)
319+
}
320+
})
321+
}
322+
}

active/poller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func (g *GardenerAPI) JobFileSource(ctx context.Context, job tracker.Job,
154154
return nil, err
155155
}
156156
lister := FileListerFunc(bh, prefix, filter)
157-
gcsSource, err := NewGCSSource(ctx, job.Path(), lister, toRunnable)
157+
gcsSource, err := NewGCSSource(ctx, job, lister, toRunnable)
158158
if err != nil {
159159
failMetric(job, "GCSSource")
160160
return nil, err

etl/globals.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,13 @@ var (
318318
}
319319
// There is also a mapping of data types to queue names in
320320
// queue_pusher.go
321+
322+
// Map from data type to number of files to skip when processing said type.
323+
// It allows us to process fewer archives when there is a very high volume of data.
324+
// TODO - this should be loaded from a config.
325+
dataTypeToSkipCount = map[DataType]int{
326+
PCAP: 9,
327+
}
321328
)
322329

323330
/*******************************************************************************
@@ -331,6 +338,11 @@ func DirToTablename(dir string) string {
331338
return dataTypeToTable[dirToDataType[dir]]
332339
}
333340

341+
// SkipCount returns the number of files to skip when processing this
// DataType, as configured in dataTypeToSkipCount.
// Data types with no entry in that map get the map's zero value, 0.
342+
func (dt DataType) SkipCount() int {
343+
return dataTypeToSkipCount[dt]
344+
}
345+
334346
// BigqueryProject returns the appropriate project.
335347
func (dt DataType) BigqueryProject() string {
336348
project := BigqueryProject

etl/globals_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,3 +368,36 @@ func TestDirToTablename(t *testing.T) {
368368
t.Errorf("DirToTablename() failed to translate PT dir name correctly.")
369369
}
370370
}
371+
372+
// TestSkipCount verifies DataType.SkipCount for three data types: etl.PCAP is
// expected to return 9, while etl.NDT7 and etl.INVALID are expected to
// return 0.
func TestSkipCount(t *testing.T) {
373+
tests := []struct {
374+
name string
375+
dataType etl.DataType
376+
want int
377+
}{
378+
{
379+
name: "ndt7",
380+
dataType: etl.NDT7,
381+
want: 0,
382+
},
383+
{
384+
name: "pcap",
385+
dataType: etl.PCAP,
386+
want: 9,
387+
},
388+
{
389+
name: "invalid",
390+
dataType: etl.INVALID,
391+
want: 0,
392+
},
393+
}
394+
395+
// Run each case as a named subtest so a failure identifies the data type.
for _, tt := range tests {
396+
t.Run(tt.name, func(t *testing.T) {
397+
got := tt.dataType.SkipCount()
398+
if got != tt.want {
399+
t.Errorf("SkipCount() = %d, want %d", got, tt.want)
400+
}
401+
})
402+
}
403+
}

0 commit comments

Comments
 (0)