Skip to content

Commit 57779ec

Browse files
committed
Implement filters on deployment, job & origin
This allows filtering of events based on the above mentioned fields. Multiple filters can be configured and they will run against a message in the order specified. A filter has the form field₀:comperator₀:value₀;field₁:comperator₁:value₁;…;fieldₙ,comperatorₙ,valueₙ `field` is the messages field to check against. `comperator` is how to compate the `field`'s value to the user provided value ('mustContain', 'mustNotContain') `value` is the value the user want's to check messages for Benchmarks have been added to get an understanding of the performance impact of those filters.
1 parent 20724e2 commit 57779ec

11 files changed

+541
-25
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ This is recommended for dev environments only.
8888
* `BOLTDB_PATH`: Bolt database path.
8989
* `EVENTS`: A comma separated list of events to include. Possible values: ValueMetric,CounterEvent,Error,LogMessage,HttpStartStop,ContainerMetric
9090
* `EXTRA_FIELDS`: Extra fields to annotate your events with (format is key:value,key:value).
91+
* `FILTERS`: Filter events on deployment, job, or origin. The format is `field₀:comperator₀:value₀;field₁:comperator₁:value₁;…;fieldₙ,comperatorₙ,valueₙ` where
92+
* `<field>` is the messages field to check against, e.g. deployment or job
93+
* `<comperator>` how to compare the <field>'s value to the user provided <value>, e.g.: mustContain, mustNotContain
94+
* `<value>` the actual value the message's <field> should be checked for
9195
* `FLUSH_INTERVAL`: Time interval (in s/m/h. For example, 3600s or 60m or 1h) for flushing queue to Splunk regardless of CONSUMER_QUEUE_SIZE. Protects against stale events in low throughput systems.
9296
* `CONSUMER_QUEUE_SIZE`: Sets the internal consumer queue buffer size. Events will be pushed to Splunk after queue is full.
9397
* `HEC_BATCH_SIZE`: Set the batch size for the events to push to HEC (Splunk HTTP Event Collector).

eventfilter/eventfilter_suite_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package eventfilter_test
2+
3+
import (
4+
"testing"
5+
6+
. "github.com/onsi/ginkgo"
7+
. "github.com/onsi/gomega"
8+
)
9+
10+
func TestEventfilter(t *testing.T) {
11+
RegisterFailHandler(Fail)
12+
RunSpecs(t, "Eventfilter Suite")
13+
}

eventfilter/filters.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package eventfilter
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"github.com/cloudfoundry/sonde-go/events"
8+
)
9+
10+
const (
11+
filterSep = ";"
12+
filterValuesSep = ":"
13+
)
14+
15+
// getterFunc is a function that, given an Envelope, returns the things we care
16+
// about, e.g. the Deployment, Job, ...
17+
type getterFunc = func(msg *events.Envelope) string
18+
19+
// supportedGetters are all supported keys we can use for filters and the
20+
// getters / functions that pull the respective data out of an envelope.
21+
var supportedGetters = map[string]getterFunc{
22+
"deployment": func(msg *events.Envelope) string {
23+
return msg.GetDeployment()
24+
},
25+
"origin": func(msg *events.Envelope) string {
26+
return msg.GetOrigin()
27+
},
28+
"job": func(msg *events.Envelope) string {
29+
return msg.GetJob()
30+
},
31+
}
32+
33+
// filterFunc gets called with data from the message envelope, pulled out by a
34+
// getter, and compares it to the user provided value. If it returns true, this
35+
// message should be accepted, else the message should be dropped.
36+
type filterFunc = func(msgData, userInput string) bool
37+
38+
// supportedFilters are all supported filter names and
39+
// the filters / functions that match run against the data from the message and
40+
// compares it to the data provided by the user.
41+
// E.g. when we have
42+
// - the filter func strings.Contains
43+
// - the getter, that gets the message's origin
44+
// - and the value 'foo'
45+
// only messages with the origin 'foo' will be accepted by the filter
46+
var supportedFilters = map[string]filterFunc{
47+
"mustContain": strings.Contains,
48+
"mustNotContain": func(msgData, userInput string) bool { return !strings.Contains(msgData, userInput) },
49+
}
50+
51+
// SupportedFilterKeys lists all supported filter keys. This is only used to
52+
// signal the list of supported keys to users, e.g. for the usage text.
53+
var SupportedFilterKeys = func() []string {
54+
keys := make([]string, 0, len(supportedGetters))
55+
for k := range supportedGetters {
56+
keys = append(keys, k)
57+
}
58+
59+
return keys
60+
}()
61+
62+
// SupportedFilters lists all supported filter names. This is only used to
63+
// signal the list of supported filters to users, e.g. for the usage text.
64+
var SupportedFilters = func() []string {
65+
keys := make([]string, 0, len(supportedFilters))
66+
for k := range supportedFilters {
67+
keys = append(keys, k)
68+
}
69+
70+
return keys
71+
}()
72+
73+
// Filters is something that can tell it's Length (the number of its configured
74+
// filters) and can be used to check if an envelope is accepted or should be
75+
// dropped/discarded.
76+
type Filters interface {
77+
Accepts(*events.Envelope) bool
78+
Length() int
79+
}
80+
81+
type filterRule struct {
82+
getter getterFunc
83+
filter filterFunc
84+
value string
85+
}
86+
87+
var (
88+
errInvalidFormat = fmt.Errorf("format must be '%q:%q:<value>'", SupportedFilterKeys, SupportedFilters)
89+
errEmptyValue = fmt.Errorf("filter value must not be empty")
90+
errInvaldFilter = fmt.Errorf("filter key must be one of %q", SupportedFilterKeys)
91+
errInvalidFilterKey = fmt.Errorf("filter must be one of %q", SupportedFilters)
92+
)
93+
94+
func parseFilterConfig(filters string) ([]filterRule, error) {
95+
rules := []filterRule{}
96+
97+
for _, filterRaw := range strings.Split(filters, filterSep) {
98+
filter := strings.TrimSpace(filterRaw)
99+
100+
if filter == "" {
101+
continue
102+
}
103+
104+
tokens := strings.Split(filter, filterValuesSep)
105+
if len(tokens) != 3 {
106+
return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvalidFormat)
107+
}
108+
109+
getterKey := strings.TrimSpace(strings.ToLower(tokens[0]))
110+
filterKey := strings.TrimSpace(tokens[1])
111+
112+
var ok bool
113+
rule := filterRule{
114+
value: tokens[2],
115+
}
116+
117+
if rule.value == "" {
118+
return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errEmptyValue)
119+
}
120+
121+
rule.filter, ok = supportedFilters[filterKey]
122+
if !ok {
123+
return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvaldFilter)
124+
}
125+
126+
rule.getter, ok = supportedGetters[getterKey]
127+
if !ok {
128+
return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvalidFilterKey)
129+
}
130+
131+
rules = append(rules, rule)
132+
}
133+
134+
return rules, nil
135+
}
136+
137+
type filter func(*events.Envelope) bool
138+
139+
type filters []filter
140+
141+
func (ef *filters) Accepts(msg *events.Envelope) bool {
142+
for _, f := range *ef {
143+
if allow := f(msg); !allow {
144+
return false
145+
}
146+
}
147+
148+
return true
149+
}
150+
151+
func (ef *filters) Length() int {
152+
return len(*ef)
153+
}
154+
155+
func (ef *filters) addFilter(valueGetter getterFunc, valueFilter filterFunc, value string) {
156+
*ef = append(*ef, func(msg *events.Envelope) bool {
157+
return valueFilter(valueGetter(msg), value)
158+
})
159+
}
160+
161+
func New(filterList string) (Filters, error) {
162+
f := &filters{}
163+
164+
rules, err := parseFilterConfig(filterList)
165+
if err != nil {
166+
return nil, err
167+
}
168+
169+
for _, rule := range rules {
170+
f.addFilter(rule.getter, rule.filter, rule.value)
171+
}
172+
173+
return f, nil
174+
}

eventfilter/filters_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package eventfilter_test
2+
3+
import (
4+
"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter"
5+
"github.com/cloudfoundry/sonde-go/events"
6+
7+
. "github.com/onsi/ginkgo"
8+
. "github.com/onsi/ginkgo/extensions/table"
9+
. "github.com/onsi/gomega"
10+
)
11+
12+
var _ = Describe("Rule parsing", func() {
13+
testError := func(filterConf string, errorMsg string) {
14+
filters, err := eventfilter.New(filterConf)
15+
Expect(filters).To(BeNil())
16+
Expect(err).To(MatchError(ContainSubstring(errorMsg)))
17+
}
18+
DescribeTable("throws error", testError,
19+
Entry("not enough fields", ":", "format must be"),
20+
Entry("too many fields", "xxx:yyy:zzz:rrrr", "format must be"),
21+
Entry("invalid value", "xxx::", "filter value must not be empty"),
22+
Entry("invalid filter", "xxx:yyy:zzz", "filter key must be one of"),
23+
Entry("invalid field", "notValid:mustContain:zzz", "filter must be one of"),
24+
)
25+
26+
testOk := func(filterConf string, length int) {
27+
filters, err := eventfilter.New(filterConf)
28+
Expect(err).NotTo(HaveOccurred())
29+
Expect(filters).NotTo(BeNil(), "filters have not been initialized")
30+
Expect(filters.Length()).To(Equal(length), "Expected %d filter rules", length)
31+
}
32+
DescribeTable("parses ok", testOk,
33+
Entry("no filters at all", "", 0),
34+
Entry("multiple empty rules", ";;;;", 0),
35+
Entry("filtering on deployment", "deployment:mustContain:some deployment", 1),
36+
Entry("accepts whitespace between rules", " deployment:mustContain:something ; origin:mustContain:someOrigin ", 2),
37+
Entry("accepts whitespace in filter", "deployment: mustContain :something", 1),
38+
39+
Entry("inclusion filter on deployment", "Deployment:mustContain:something", 1),
40+
Entry("inclusion filter on origin", "origin:mustContain:something", 1),
41+
Entry("inclusion filter on job", "job:mustContain:something", 1),
42+
43+
Entry("exclusion filter on deployment", "Deployment:mustNotContain:something", 1),
44+
Entry("exclusion filter on origin", "origin:mustNotContain:something", 1),
45+
Entry("exclusion filter on job", "job:mustNotContain:something", 1),
46+
)
47+
})
48+
49+
var _ = Describe("Filtering", func() {
50+
msg := &events.Envelope{
51+
Deployment: p("p-healthwatch2-123123123"),
52+
Origin: p("some origin"),
53+
Job: p("some job"),
54+
}
55+
56+
test := func(filterConf string, expected bool) {
57+
filters, err := eventfilter.New(filterConf)
58+
Expect(err).NotTo(HaveOccurred())
59+
Expect(filters.Accepts(msg)).
60+
To(Equal(expected), "Expected event {%v} to be %s", msg, tern(expected, "accepted", "discarded"))
61+
Expect(filters).NotTo(BeNil(), "filters have not been initialized")
62+
}
63+
64+
DescribeTable("on", test,
65+
Entry("empty filter conf should accept", "", true),
66+
Entry("matching inclusion filter should accept", "deployment:mustContain:healthwatch2", true),
67+
Entry("non-matching inclusion filter should discard", "deployment:mustContain:something", false),
68+
Entry("matching exclusion filter should discard", "deployment:mustNotContain:healthwatch2", false),
69+
Entry("2nd exclusion filter should discard", "deployment:mustNotContain:health ; deployment:mustNotContain:watch", false),
70+
Entry("3rd exclusion filter should discard",
71+
"deployment:mustContain:health ; job:mustNotContain:other job ; deployment:mustNotContain:watch",
72+
false,
73+
),
74+
Entry("many matching inclusion filters should accept",
75+
"deployment:mustContain:h ; deployment:mustContain:e ; deployment:mustContain:a ; deployment:mustContain:l ; deployment:mustContain:t ; deployment:mustContain:h",
76+
true,
77+
),
78+
Entry("many non-matching exclusion filters should accept",
79+
"deployment:mustNotContain:x ; deployment:mustNotContain:y ; deployment:mustNotContain:z ; deployment:mustNotContain:u ; deployment:mustNotContain:b ; deployment:mustNotContain:r",
80+
true,
81+
),
82+
)
83+
})
84+
85+
func p(s string) *string { return &s }
86+
87+
func tern(b bool, t string, f string) string {
88+
if b {
89+
return t
90+
}
91+
92+
return f
93+
}

eventrouter/default.go

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55

66
"github.com/cloudfoundry-community/splunk-firehose-nozzle/cache"
7+
"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter"
78
fevents "github.com/cloudfoundry-community/splunk-firehose-nozzle/events"
89
"github.com/cloudfoundry-community/splunk-firehose-nozzle/eventsink"
910
"github.com/cloudfoundry/sonde-go/events"
@@ -18,29 +19,34 @@ type router struct {
1819
config *Config
1920
}
2021

21-
func New(appCache cache.Cache, sink eventsink.Sink, config *Config) (Router, error) {
22-
selectedEvents, err := fevents.ParseSelectedEvents(config.SelectedEvents)
22+
type filteringRouter struct {
23+
*router
24+
filters eventfilter.Filters
25+
}
2326

27+
func New(appCache cache.Cache, sink eventsink.Sink, config *Config, filters eventfilter.Filters) (Router, error) {
28+
selectedEvents, err := fevents.ParseSelectedEvents(config.SelectedEvents)
2429
if err != nil {
2530
return nil, err
2631
}
2732

28-
return &router{
33+
r := &router{
2934
appCache: appCache,
3035
sink: sink,
3136
selectedEvents: selectedEvents,
3237
config: config,
33-
}, nil
34-
}
35-
36-
func (r *router) Route(msg *events.Envelope) error {
37-
eventType := msg.GetEventType()
38+
}
3839

39-
if _, ok := r.selectedEvents[eventType.String()]; !ok {
40-
// Ignore this event since we are not interested
41-
return nil
40+
// if no filters were defined, we return the original router,
41+
// otherwise we return the filtering router
42+
if filters == nil || filters.Length() < 1 {
43+
return r, nil
4244
}
4345

46+
return &filteringRouter{router: r, filters: filters}, nil
47+
}
48+
49+
func (r *router) processMessage(msg *events.Envelope, eventType events.Envelope_EventType) error {
4450
var event *fevents.Event
4551
switch eventType {
4652
case events.Envelope_HttpStartStop:
@@ -83,5 +89,30 @@ func (r *router) Route(msg *events.Envelope) error {
8389
fields := map[string]interface{}{"err": fmt.Sprintf("%s", err)}
8490
r.sink.Write(fields, "Failed to write events")
8591
}
92+
8693
return err
8794
}
95+
96+
func (r *router) Route(msg *events.Envelope) error {
97+
eventType := msg.GetEventType()
98+
if _, ok := r.selectedEvents[eventType.String()]; !ok {
99+
// Ignore this event since we are not interested
100+
return nil
101+
}
102+
103+
return r.processMessage(msg, eventType)
104+
}
105+
106+
func (r *filteringRouter) Route(msg *events.Envelope) error {
107+
eventType := msg.GetEventType()
108+
if _, ok := r.selectedEvents[eventType.String()]; !ok {
109+
// Ignore this event since we are not interested
110+
return nil
111+
}
112+
113+
if !r.filters.Accepts(msg) {
114+
return nil
115+
}
116+
117+
return r.processMessage(msg, eventType)
118+
}

0 commit comments

Comments
 (0)