Skip to content

Commit d6b9156

Browse files
authored
[BEAM-11217] Implemented metrics filtering (#15482)
1 parent e4b400b commit d6b9156

File tree

4 files changed

+120
-18
lines changed

4 files changed

+120
-18
lines changed

sdks/go.mod

-3
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,14 @@ require (
3030
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
3131
github.com/golang/protobuf v1.5.2 // TODO(danoliveira): Fully replace this with google.golang.org/protobuf
3232
github.com/golang/snappy v0.0.4 // indirect
33-
github.com/google/btree v1.0.0 // indirect
3433
github.com/google/go-cmp v0.5.6
3534
github.com/google/martian/v3 v3.2.1 // indirect
3635
github.com/google/uuid v1.3.0
37-
github.com/hashicorp/golang-lru v0.5.1 // indirect
3836
github.com/kr/text v0.2.0 // indirect
3937
github.com/linkedin/goavro v2.1.0+incompatible
4038
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
4139
github.com/nightlyone/lockfile v1.0.0
4240
github.com/spf13/cobra v1.2.1
43-
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect
4441
golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6
4542
golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914
4643
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect

sdks/go/pkg/beam/core/metrics/metrics.go

+64-1
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,40 @@ func (mr Results) AllMetrics() QueryResults {
482482
return QueryResults{mr.counters, mr.distributions, mr.gauges}
483483
}
484484

485-
// TODO(BEAM-11217): Implement Query(Filter) and metrics filtering
485+
// TODO(BEAM-11217): Implement querying metrics by DoFn
486+
487+
// SingleResult interface facilitates metrics query filtering methods.
488+
type SingleResult interface {
489+
Name() string
490+
Namespace() string
491+
}
492+
493+
// Query allows metrics querying with filter. The filter takes the form of predicate function. Example:
494+
// qr = pr.Metrics().Query(func(sr metrics.SingleResult) bool {
495+
// return sr.Namespace() == test.namespace
496+
// })
497+
func (mr Results) Query(f func(SingleResult) bool) QueryResults {
498+
counters := []CounterResult{}
499+
distributions := []DistributionResult{}
500+
gauges := []GaugeResult{}
501+
502+
for _, counter := range mr.counters {
503+
if f(counter) {
504+
counters = append(counters, counter)
505+
}
506+
}
507+
for _, distribution := range mr.distributions {
508+
if f(distribution) {
509+
distributions = append(distributions, distribution)
510+
}
511+
}
512+
for _, gauge := range mr.gauges {
513+
if f(gauge) {
514+
gauges = append(gauges, gauge)
515+
}
516+
}
517+
return QueryResults{counters, distributions, gauges}
518+
}
486519

487520
// QueryResults is the result of a query. Allows accessing all of the
488521
// metrics that matched the filter.
@@ -529,6 +562,16 @@ func (r CounterResult) Result() int64 {
529562
return r.Attempted
530563
}
531564

565+
// Name returns the Name of this Counter.
566+
func (r CounterResult) Name() string {
567+
return r.Key.Name
568+
}
569+
570+
// Namespace returns the Namespace of this Counter.
571+
func (r CounterResult) Namespace() string {
572+
return r.Key.Namespace
573+
}
574+
532575
// MergeCounters combines counter metrics that share a common key.
533576
func MergeCounters(
534577
attempted map[StepKey]int64,
@@ -571,6 +614,16 @@ func (r DistributionResult) Result() DistributionValue {
571614
return r.Attempted
572615
}
573616

617+
// Name returns the Name of this Distribution.
618+
func (r DistributionResult) Name() string {
619+
return r.Key.Name
620+
}
621+
622+
// Namespace returns the Namespace of this Distribution.
623+
func (r DistributionResult) Namespace() string {
624+
return r.Key.Namespace
625+
}
626+
574627
// MergeDistributions combines distribution metrics that share a common key.
575628
func MergeDistributions(
576629
attempted map[StepKey]DistributionValue,
@@ -613,6 +666,16 @@ func (r GaugeResult) Result() GaugeValue {
613666
return r.Attempted
614667
}
615668

669+
// Name returns the Name of this Gauge.
670+
func (r GaugeResult) Name() string {
671+
return r.Key.Name
672+
}
673+
674+
// Namespace returns the Namespace of this Gauge.
675+
func (r GaugeResult) Namespace() string {
676+
return r.Key.Namespace
677+
}
678+
616679
// StepKey uniquely identifies a metric within a pipeline graph.
617680
type StepKey struct {
618681
Step, Name, Namespace string

sdks/go/test/integration/wordcount/wordcount.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ import (
3030
)
3131

3232
var (
33-
wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
34-
empty = beam.NewCounter("extract", "emptyLines")
35-
lineLen = beam.NewDistribution("extract", "lineLenDistro")
33+
wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
34+
empty = beam.NewCounter("extract", "emptyLines")
35+
lineLen = beam.NewDistribution("extract", "lineLenDistro")
36+
smallWords = beam.NewCounter("extract", "smallWords")
3637
)
3738

3839
// CountWords is a composite transform that counts the words of a PCollection
@@ -56,6 +57,9 @@ func extractFn(ctx context.Context, line string, emit func(string)) {
5657
empty.Inc(ctx, 1)
5758
}
5859
for _, word := range wordRE.FindAllString(line, -1) {
60+
if len(word) < 6 {
61+
smallWords.Inc(ctx, 1)
62+
}
5963
emit(word)
6064
}
6165
}
@@ -74,8 +78,12 @@ func WordCount(glob, hash string, size int) *beam.Pipeline {
7478
p, s := beam.NewPipelineWithRoot()
7579

7680
in := textio.Read(s, glob)
81+
WordCountFromPCol(s, in, hash, size)
82+
return p
83+
}
84+
85+
// WordCountFromPCol counts the words from a PCollection and validates it.
86+
func WordCountFromPCol(s beam.Scope, in beam.PCollection, hash string, size int) {
7787
out := Format(s, CountWords(s, in))
7888
passert.Hash(s, out, "out", hash, size)
79-
80-
return p
8189
}

sdks/go/test/integration/wordcount/wordcount_test.go

+43-9
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ import (
1919
"strings"
2020
"testing"
2121

22-
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/memfs"
22+
"github.com/apache/beam/sdks/v2/go/pkg/beam"
23+
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
2324
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
2425
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
2526
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
@@ -30,16 +31,20 @@ import (
3031

3132
func TestWordCount(t *testing.T) {
3233
tests := []struct {
33-
lines []string
34-
words int
35-
hash string
34+
lines []string
35+
words int
36+
hash string
37+
smallWordsCount int64
38+
lineLen metrics.DistributionValue
3639
}{
3740
{
3841
[]string{
3942
"foo",
4043
},
4144
1,
4245
"6zZtmVTet7aIhR3wmPE8BA==",
46+
1,
47+
metrics.DistributionValue{Count: 1, Sum: 3, Min: 3, Max: 3},
4348
},
4449
{
4550
[]string{
@@ -49,20 +54,26 @@ func TestWordCount(t *testing.T) {
4954
},
5055
1,
5156
"jAk8+k4BOH7vQDUiUZdfWg==",
57+
6,
58+
metrics.DistributionValue{Count: 3, Sum: 21, Min: 3, Max: 11},
5259
},
5360
{
5461
[]string{
5562
"bar bar foo bar foo foo",
5663
},
5764
2,
5865
"Nz70m/sn3Ep9o484r7MalQ==",
66+
6,
67+
metrics.DistributionValue{Count: 1, Sum: 23, Min: 23, Max: 23},
5968
},
6069
{
6170
[]string{
6271
"foo bar foo bar foo bar",
6372
},
6473
2,
6574
"Nz70m/sn3Ep9o484r7MalQ==", // ordering doesn't matter: same hash as above
75+
6,
76+
metrics.DistributionValue{Count: 1, Sum: 23, Min: 23, Max: 23},
6677
},
6778
{
6879
[]string{
@@ -75,19 +86,42 @@ func TestWordCount(t *testing.T) {
7586
},
7687
2,
7788
"Nz70m/sn3Ep9o484r7MalQ==", // whitespace doesn't matter: same hash as above
89+
6,
90+
metrics.DistributionValue{Count: 6, Sum: 37, Min: 0, Max: 11},
7891
},
7992
}
8093

8194
for _, test := range tests {
8295
integration.CheckFilters(t)
83-
const filename = "memfs://input"
84-
memfs.Write(filename, []byte(strings.Join(test.lines, "\n")))
85-
86-
p := WordCount(filename, test.hash, test.words)
87-
_, err := ptest.RunWithMetrics(p)
96+
p, s := beam.NewPipelineWithRoot()
97+
lines := beam.CreateList(s, test.lines)
98+
WordCountFromPCol(s, lines, test.hash, test.words)
99+
pr, err := ptest.RunWithMetrics(p)
88100
if err != nil {
89101
t.Errorf("WordCount(\"%v\") failed: %v", strings.Join(test.lines, "|"), err)
90102
}
103+
104+
qr := pr.Metrics().Query(func(sr metrics.SingleResult) bool {
105+
return sr.Name() == "smallWords"
106+
})
107+
counter := metrics.CounterResult{}
108+
if len(qr.Counters()) != 0 {
109+
counter = qr.Counters()[0]
110+
}
111+
if counter.Result() != test.smallWordsCount {
112+
t.Errorf("Metrics().Query(by Name) failed. Got %d counters, Want %d counters", counter.Result(), test.smallWordsCount)
113+
}
114+
115+
qr = pr.Metrics().Query(func(sr metrics.SingleResult) bool {
116+
return sr.Name() == "lineLenDistro"
117+
})
118+
distribution := metrics.DistributionResult{}
119+
if len(qr.Distributions()) != 0 {
120+
distribution = qr.Distributions()[0]
121+
}
122+
if distribution.Result() != test.lineLen {
123+
t.Errorf("Metrics().Query(by Name) failed. Got %v distribution, Want %v distribution", distribution.Result(), test.lineLen)
124+
}
91125
}
92126
}
93127

0 commit comments

Comments
 (0)