Skip to content

Commit 358924e

Browse files
authored
Merge pull request #302 from m-lab/no-delta
Suppress delta generation
2 parents 75a7cd3 + ce13e46 commit 358924e

File tree

2 files changed

+59
-39
lines changed

2 files changed

+59
-39
lines changed

cmd/etl_worker/app-ndt.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,5 @@ env_variables:
4141
BIGQUERY_PROJECT: 'mlab-sandbox'
4242
BIGQUERY_DATASET: 'mlab_sandbox'
4343
ANNOTATE_IP: 'true'
44+
NDT_OMIT_DELTAS: 'true'
4445
# TODO add custom service-account, instead of using default credentials.

parser/ndt.go

Lines changed: 58 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"errors"
66
"fmt"
77
"log"
8+
"os"
89
"regexp"
10+
"strconv"
911
"strings"
1012
"time"
1113

@@ -18,6 +20,11 @@ import (
1820
"github.com/m-lab/etl/web100"
1921
)
2022

23+
var (
24+
// NDTOmitDeltas flag indicates if deltas should be suppressed.
25+
NDTOmitDeltas, _ = strconv.ParseBool(os.Getenv("NDT_OMIT_DELTAS"))
26+
)
27+
2128
const (
2229
// Some snaplogs are very large, and we don't want to parse the entire
2330
// snaplog, when there is no value. However, although the nominal test
@@ -29,7 +36,6 @@ const (
2936
// TODO - in future, we should probably detect when the connection state changes
3037
// from established, as there is little reason to parse snapshots beyond that
3138
// point.
32-
3339
minNumSnapshots = 1600 // If fewer than this, then set anomalies.num_snaps
3440
maxNumSnapshots = 2800 // If more than this, truncate, and set anomolies.num_snaps
3541
)
@@ -341,59 +347,29 @@ func (n *NDTParser) processTest(test *fileInfoAndData, testType string) {
341347
n.getAndInsertValues(test, testType)
342348
}
343349

344-
func (n *NDTParser) getAndInsertValues(test *fileInfoAndData, testType string) {
345-
// Extract the values from the last snapshot.
346-
metrics.WorkerState.WithLabelValues("parse").Inc()
347-
defer metrics.WorkerState.WithLabelValues("parse").Dec()
348-
349-
if !strings.HasSuffix(test.fn, ".gz") {
350-
metrics.WarningCount.WithLabelValues(
351-
n.TableName(), testType, "uncompressed file").Inc()
352-
}
353-
354-
snaplog, err := web100.NewSnapLog(test.data)
355-
if err != nil {
356-
metrics.ErrorCount.WithLabelValues(
357-
n.TableName(), testType, "snaplog failure").Inc()
358-
log.Printf("Unable to parse snaplog for %s, when processing: %s\n%s\n",
359-
test.fn, n.taskFileName, err)
360-
return
361-
}
362-
363-
valid := true
364-
err = snaplog.ValidateSnapshots()
365-
if err != nil {
366-
log.Printf("ValidateSnapshots failed for %s, when processing: %s (%s)\n",
367-
test.fn, n.taskFileName, err)
368-
metrics.WarningCount.WithLabelValues(
369-
n.TableName(), testType, "validate failed").Inc()
370-
// If ValidateSnapshots returns error, it generally means that there
371-
// is a problem with the last snapshot, typically a truncated file.
372-
// In most cases, there are still many valid snapshots.
373-
valid = false
374-
}
375-
376-
// HACK - just to see how expensive the Values() call is...
377-
// parse ALL the snapshots.
378-
last := &web100.Snapshot{}
379-
var deltas []schema.Web100ValueMap
350+
func (n *NDTParser) getDeltas(snaplog *web100.SnapLog, testType string) ([]schema.Web100ValueMap, int) {
351+
deltas := []schema.Web100ValueMap{}
380352
deltaFieldCount := 0
353+
if NDTOmitDeltas {
354+
return deltas, deltaFieldCount
355+
}
381356
snapshotCount := 0
357+
last := &web100.Snapshot{}
382358
for count := 0; count < snaplog.SnapCount() && count < maxNumSnapshots; count++ {
383359
snap, err := snaplog.Snapshot(count)
384360
if err != nil {
385361
// TODO - refine label and maybe write a log?
386362
metrics.TestCount.WithLabelValues(
387363
n.TableName(), testType, "snapshot failure").Inc()
388-
return
364+
return nil, 0
389365
}
390366
// Proper sizing avoids evacuate, saving about 20%, excluding BQ code.
391367
delta := schema.EmptySnap10()
392368
snap.SnapshotDeltas(last, delta)
393369
if err != nil {
394370
metrics.ErrorCount.WithLabelValues(
395371
n.TableName(), testType, "snapValues failure").Inc()
396-
return
372+
return nil, 0
397373
}
398374

399375
// Delete the constant fields.
@@ -430,6 +406,49 @@ func (n *NDTParser) getAndInsertValues(test *fileInfoAndData, testType string) {
430406
// out the most useful tags.
431407
deltas[len(deltas)-1]["is_last"] = true
432408
}
409+
410+
return deltas, deltaFieldCount
411+
}
412+
413+
func (n *NDTParser) getAndInsertValues(test *fileInfoAndData, testType string) {
414+
// Extract the values from the last snapshot.
415+
metrics.WorkerState.WithLabelValues("parse").Inc()
416+
defer metrics.WorkerState.WithLabelValues("parse").Dec()
417+
418+
if !strings.HasSuffix(test.fn, ".gz") {
419+
metrics.WarningCount.WithLabelValues(
420+
n.TableName(), testType, "uncompressed file").Inc()
421+
}
422+
423+
snaplog, err := web100.NewSnapLog(test.data)
424+
if err != nil {
425+
metrics.ErrorCount.WithLabelValues(
426+
n.TableName(), testType, "snaplog failure").Inc()
427+
log.Printf("Unable to parse snaplog for %s, when processing: %s\n%s\n",
428+
test.fn, n.taskFileName, err)
429+
return
430+
}
431+
432+
valid := true
433+
err = snaplog.ValidateSnapshots()
434+
if err != nil {
435+
log.Printf("ValidateSnapshots failed for %s, when processing: %s (%s)\n",
436+
test.fn, n.taskFileName, err)
437+
metrics.WarningCount.WithLabelValues(
438+
n.TableName(), testType, "validate failed").Inc()
439+
// If ValidateSnapshots returns error, it generally means that there
440+
// is a problem with the last snapshot, typically a truncated file.
441+
// In most cases, there are still many valid snapshots.
442+
valid = false
443+
}
444+
445+
var deltas []schema.Web100ValueMap
446+
deltaFieldCount := 0
447+
deltas, deltaFieldCount = n.getDeltas(snaplog, testType)
448+
if deltas == nil {
449+
// There was some kind of major failure parsing snapshots.
450+
return
451+
}
433452
final := snaplog.SnapCount() - 1
434453
if final > maxNumSnapshots {
435454
final = maxNumSnapshots

0 commit comments

Comments
 (0)