Skip to content

Commit 0d9676c

Browse files
committed
Implement filters on deployment, job & origin
This allows filtering of events based on the three fields mentioned above. Multiple filters can be configured, and they are run against a message in the order specified. A filter configuration has the form type₀:key₀:value₀;type₁:key₁:value₁;…;typeₙ:keyₙ:valueₙ. `type` is either "must" or "mustnot" and specifies whether the message's value for `key` must or must not contain `value`. `key` specifies what we want to match against: the deployment name, the job name, or the event's origin. The check is a case-sensitive substring match: it tests whether the message's value for `key` contains `value`. Benchmarks have been added to get an understanding of the performance impact of these filters.
1 parent 1ba369d commit 0d9676c

File tree

10 files changed

+514
-25
lines changed

10 files changed

+514
-25
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ This is recommended for dev environments only.
8888
* `BOLTDB_PATH`: Bolt database path.
8989
* `EVENTS`: A comma separated list of events to include. Possible values: ValueMetric,CounterEvent,Error,LogMessage,HttpStartStop,ContainerMetric
9090
* `EXTRA_FIELDS`: Extra fields to annotate your events with (format is key:value,key:value).
91+
* `FILTERS`: Filter events on deployment, job, or origin. The format is `<type0>:<key0>:<value0>;<type1>:<key1>:<value1>;...` where
92+
* `<type>` is either `must` or `mustnot`
93+
* `<key>` is one of `deployment`, `job`, or `origin`
94+
* `<value>` is the case-sensitive substring to look for in `<key>`'s value (`must` requires it to be present, `mustnot` requires it to be absent)
9195
* `FLUSH_INTERVAL`: Time interval (in s/m/h. For example, 3600s or 60m or 1h) for flushing queue to Splunk regardless of CONSUMER_QUEUE_SIZE. Protects against stale events in low throughput systems.
9296
* `CONSUMER_QUEUE_SIZE`: Sets the internal consumer queue buffer size. Events will be pushed to Splunk after queue is full.
9397
* `HEC_BATCH_SIZE`: Set the batch size for the events to push to HEC (Splunk HTTP Event Collector).

eventfilter/eventfilter_suite_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package eventfilter_test
2+
3+
import (
4+
"testing"
5+
6+
. "github.com/onsi/ginkgo"
7+
. "github.com/onsi/gomega"
8+
)
9+
10+
func TestEventfilter(t *testing.T) {
11+
RegisterFailHandler(Fail)
12+
RunSpecs(t, "Eventfilter Suite")
13+
}

eventfilter/filters.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package eventfilter
2+
3+
import (
	"fmt"
	"sort"
	"strings"

	"github.com/cloudfoundry/sonde-go/events"
)
9+
10+
// Separators used when parsing a filter configuration string.
const (
	// filterSep separates the individual filter rules from each other.
	filterSep = ";"
	// filterKeyValueSep separates the type, key and value fields of one rule.
	filterKeyValueSep = ":"
)

// Rule types accepted as the first field of a filter rule.
const (
	// Must marks a rule whose value has to be contained in the inspected field.
	Must = "must"
	// MustNot marks a rule whose value must not be contained in the inspected field.
	MustNot = "mustnot"
)
16+
17+
// supportedGetters are all supported keys we can use for filters and the
18+
// functions that pull the respective data out of an envelope.
19+
var supportedGetters = map[string]func(*events.Envelope) string{
20+
"deployment": func(msg *events.Envelope) string {
21+
return msg.GetDeployment()
22+
},
23+
"origin": func(msg *events.Envelope) string {
24+
return msg.GetOrigin()
25+
},
26+
"job": func(msg *events.Envelope) string {
27+
return msg.GetJob()
28+
},
29+
}
30+
31+
// SupportedFilterKeys lists all supported filter keys. This is only used to
32+
// signal the list of supported keys to users, e.g. for the usage text.
33+
var SupportedFilterKeys = func() []string {
34+
keys := make([]string, 0, len(supportedGetters))
35+
for k := range supportedGetters {
36+
keys = append(keys, k)
37+
}
38+
39+
return keys
40+
}()
41+
42+
// Filters is something that can tell it's Length (the number of its configured
43+
// filters) and can be used to check if an envelope is accepted or should be
44+
// dropped/discarded.
45+
type Filters interface {
46+
Accepts(*events.Envelope) bool
47+
Length() int
48+
}
49+
50+
// filterRule is one parsed rule of a filter configuration.
type filterRule struct {
	key   string // second field of the rule: which envelope field to inspect
	value string // third field of the rule: the text to look for
	must  bool   // true for a "must" rule, false for a "mustnot" rule
}
55+
56+
var (
57+
errInvalidFormat = fmt.Errorf("format must be '[%s|%s]:field:value'", Must, MustNot)
58+
errEmptyValue = fmt.Errorf("filter value must not be empty string")
59+
errInvaldFilter = fmt.Errorf("filter needs to be either %q or %q", Must, MustNot)
60+
errInvalidFilterKey = fmt.Errorf("filter key not supported")
61+
)
62+
63+
func parseFilterConfig(filters string) ([]filterRule, error) {
64+
rules := []filterRule{}
65+
66+
for _, filterRaw := range strings.Split(filters, filterSep) {
67+
filter := strings.TrimSpace(filterRaw)
68+
69+
if filter == "" {
70+
continue
71+
}
72+
73+
tokens := strings.Split(filter, filterKeyValueSep)
74+
if len(tokens) != 3 {
75+
return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvalidFormat)
76+
}
77+
78+
rule := filterRule{
79+
key: tokens[1],
80+
value: tokens[2],
81+
}
82+
83+
if rule.value == "" {
84+
return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errEmptyValue)
85+
}
86+
87+
switch t := strings.TrimSpace(strings.ToLower(tokens[0])); t {
88+
case MustNot:
89+
rule.must = false
90+
case Must:
91+
rule.must = true
92+
default:
93+
return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvaldFilter)
94+
}
95+
96+
rules = append(rules, rule)
97+
}
98+
99+
return rules, nil
100+
}
101+
102+
type filter func(*events.Envelope) bool
103+
104+
type filters []filter
105+
106+
func (ef *filters) Accepts(msg *events.Envelope) bool {
107+
for _, f := range *ef {
108+
if allow := f(msg); !allow {
109+
return false
110+
}
111+
}
112+
113+
return true
114+
}
115+
116+
func (ef *filters) Length() int {
117+
return len(*ef)
118+
}
119+
120+
func (ef *filters) addFilter(key, value string, must bool) error {
121+
valueGetter, ok := supportedGetters[strings.ToLower(key)]
122+
if !ok {
123+
return fmt.Errorf("invalid filter key %q: %s", key, errInvalidFilterKey)
124+
}
125+
126+
*ef = append(*ef, func(msg *events.Envelope) bool {
127+
return must == strings.Contains(valueGetter(msg), value)
128+
})
129+
130+
return nil
131+
}
132+
133+
func New(filterList string) (Filters, error) {
134+
f := &filters{}
135+
136+
filters, err := parseFilterConfig(filterList)
137+
if err != nil {
138+
return nil, err
139+
}
140+
141+
for _, filter := range filters {
142+
if err := f.addFilter(filter.key, filter.value, filter.must); err != nil {
143+
return nil, err
144+
}
145+
}
146+
147+
return f, nil
148+
}

eventfilter/filters_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package eventfilter_test
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter"
7+
"github.com/cloudfoundry/sonde-go/events"
8+
9+
. "github.com/onsi/ginkgo"
10+
. "github.com/onsi/ginkgo/extensions/table"
11+
. "github.com/onsi/gomega"
12+
)
13+
14+
var _ = Describe("Rule parsing", func() {
15+
testError := func(filterConf string, errorMsg string) {
16+
filters, err := eventfilter.New(filterConf)
17+
Expect(filters).To(BeNil())
18+
Expect(err).To(MatchError(ContainSubstring(errorMsg)))
19+
}
20+
DescribeTable("throws error", testError,
21+
Entry("not enough fields", ":", "format must be"),
22+
Entry("too many fields", "xxx:yyy:zzz:rrrr", "format must be"),
23+
Entry("invalid value", "xxx::", "filter value must not be empty"),
24+
Entry("invalid filter", "xxx:yyy:zzz", "needs to be either"),
25+
Entry("invalid field", "must:notValid:zzz", "invalid filter key \"notValid\""),
26+
)
27+
28+
testOk := func(filterConf string, length int) {
29+
filters, err := eventfilter.New(filterConf)
30+
Expect(err).NotTo(HaveOccurred())
31+
Expect(filters).NotTo(BeNil(), "filters have not been initialized")
32+
Expect(filters.Length()).To(Equal(length), "Expected %d filter rules", length)
33+
}
34+
DescribeTable("parses ok", testOk,
35+
Entry("no filters at all", "", 0),
36+
Entry("multiple empty rules", ";;;;", 0),
37+
Entry("filtering on deployment", "must:deployment:some deployment", 1),
38+
Entry("accepts whitespace between rules", " must:deployment:something ; must:origin:someOrigin ", 2),
39+
Entry("accepts whitespace in filter", " must :deployment:something", 1),
40+
41+
Entry("inclusion filter on deployment", "must:Deployment:something", 1),
42+
Entry("inclusion filter on origin", "must:origin:something", 1),
43+
Entry("inclusion filter on job", "must:job:something", 1),
44+
45+
Entry("exclusion filter on deployment", "mustNot:Deployment:something", 1),
46+
Entry("exclusion filter on origin", "mustNot:origin:something", 1),
47+
Entry("exclusion filter on job", "mustNot:job:something", 1),
48+
)
49+
})
50+
51+
var _ = Describe("Filtering", func() {
52+
msg := &events.Envelope{
53+
Deployment: p("p-healthwatch2-123123123"),
54+
Origin: p("some origin"),
55+
Job: p("some job"),
56+
}
57+
58+
test := func(filterConf string, expected bool) {
59+
filters, err := eventfilter.New(filterConf)
60+
Expect(err).NotTo(HaveOccurred())
61+
Expect(filters.Accepts(msg)).
62+
To(Equal(expected), "Expected event {%v} to be %s", msg, tern(expected, "accepted", "discarded"))
63+
Expect(filters).NotTo(BeNil(), "filters have not been initialized")
64+
}
65+
66+
DescribeTable("on", test,
67+
Entry("empty filter conf should accept", "", true),
68+
Entry("matching inclusion filter should accept", "must:deployment:healthwatch2", true),
69+
Entry("non-matching inclusion filter should discard", "must:deployment:something", false),
70+
Entry("matching exclusion filter should discard", "mustNot:deployment:healthwatch2", false),
71+
Entry("2nd exclusion filter should discard", "must:deployment:health ; mustNot:deployment:watch", false),
72+
Entry("3rd exclusion filter should discard",
73+
"must:deployment:health ; mustNot:job:other job ; mustNot:deployment:watch",
74+
false,
75+
),
76+
Entry("many matching inclusion filters should accept",
77+
"must:deployment:h ; must:deployment:e ; must:deployment:a ; must:deployment:l ; must:deployment:t ; must:deployment:h",
78+
true,
79+
),
80+
Entry("many non-matching exclusion filters should accept",
81+
"mustNot:deployment:x ; mustNot:deployment:y ; mustNot:deployment:z ; mustNot:deployment:u ; mustNot:deployment:b ; mustNot:deployment:r",
82+
true,
83+
),
84+
)
85+
})
86+
87+
var _ = Describe("filter keys", func() {
88+
entries := []TableEntry{}
89+
for _, key := range eventfilter.SupportedFilterKeys {
90+
entries = append(entries, Entry(fmt.Sprintf("key %q", key), key))
91+
}
92+
93+
DescribeTable("support", func(key string) {
94+
_, err := eventfilter.New("must:" + key + ":someValue")
95+
Expect(err).NotTo(HaveOccurred())
96+
}, entries...)
97+
})
98+
99+
// p returns a pointer to a string holding the value of s. It exists so that
// string literals can be used for pointer-typed fields.
func p(s string) *string {
	return &s
}
100+
101+
// tern is a ternary helper: it returns t when b is true and f otherwise.
func tern(b bool, t string, f string) string {
	if !b {
		return f
	}

	return t
}

eventrouter/default.go

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55

66
"github.com/cloudfoundry-community/splunk-firehose-nozzle/cache"
7+
"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter"
78
fevents "github.com/cloudfoundry-community/splunk-firehose-nozzle/events"
89
"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventsink"
910
"github.com/cloudfoundry/sonde-go/events"
@@ -18,29 +19,34 @@ type router struct {
1819
config *Config
1920
}
2021

21-
func New(appCache cache.Cache, sink eventsink.Sink, config *Config) (Router, error) {
22-
selectedEvents, err := fevents.ParseSelectedEvents(config.SelectedEvents)
22+
type filteringRouter struct {
23+
*router
24+
filters eventfilter.Filters
25+
}
2326

27+
func New(appCache cache.Cache, sink eventsink.Sink, config *Config, filters eventfilter.Filters) (Router, error) {
28+
selectedEvents, err := fevents.ParseSelectedEvents(config.SelectedEvents)
2429
if err != nil {
2530
return nil, err
2631
}
2732

28-
return &router{
33+
r := &router{
2934
appCache: appCache,
3035
sink: sink,
3136
selectedEvents: selectedEvents,
3237
config: config,
33-
}, nil
34-
}
35-
36-
func (r *router) Route(msg *events.Envelope) error {
37-
eventType := msg.GetEventType()
38+
}
3839

39-
if _, ok := r.selectedEvents[eventType.String()]; !ok {
40-
// Ignore this event since we are not interested
41-
return nil
40+
// if no filters were defined, we return the original router,
41+
// otherwise we return the filtering router
42+
if filters == nil || filters.Length() < 1 {
43+
return r, nil
4244
}
4345

46+
return &filteringRouter{router: r, filters: filters}, nil
47+
}
48+
49+
func (r *router) processMessage(msg *events.Envelope, eventType events.Envelope_EventType) error {
4450
var event *fevents.Event
4551
switch eventType {
4652
case events.Envelope_HttpStartStop:
@@ -83,5 +89,30 @@ func (r *router) Route(msg *events.Envelope) error {
8389
fields := map[string]interface{}{"err": fmt.Sprintf("%s", err)}
8490
r.sink.Write(fields, "Failed to write events")
8591
}
92+
8693
return err
8794
}
95+
96+
func (r *router) Route(msg *events.Envelope) error {
97+
eventType := msg.GetEventType()
98+
if _, ok := r.selectedEvents[eventType.String()]; !ok {
99+
// Ignore this event since we are not interested
100+
return nil
101+
}
102+
103+
return r.processMessage(msg, eventType)
104+
}
105+
106+
func (r *filteringRouter) Route(msg *events.Envelope) error {
107+
eventType := msg.GetEventType()
108+
if _, ok := r.selectedEvents[eventType.String()]; !ok {
109+
// Ignore this event since we are not interested
110+
return nil
111+
}
112+
113+
if !r.filters.Accepts(msg) {
114+
return nil
115+
}
116+
117+
return r.processMessage(msg, eventType)
118+
}

0 commit comments

Comments
 (0)