diff --git a/README.md b/README.md index 8959db4c..954ebf6a 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,10 @@ This is recommended for dev environments only. * `ENABLE_EVENT_TRACING`: Enables event trace logging. Splunk events will now contain a UUID, Splunk Nozzle Event Counts, and a Subscription-ID for Splunk correlation searches. (Default: false) * `STATUS_MONITOR_INTERVAL`: Time interval (in s/m/h. For example, 3600s or 60m or 1h) for monitoring memory queue pressure. Use to help with back-pressure insights. (Increases CPU load. Use for insights purposes only) Default is 0s (Disabled). * `DROP_WARN_THRESHOLD`: Threshold for the count of dropped events in case the downstream is slow. Based on the threshold, the errors will be logged. +* `FILTERS`: Filter events on deployment, job, or origin. The format is `field₀:comperator₀:value₀;field₁:comperator₁:value₁;…;fieldₙ,comperatorₙ,valueₙ` where + * `` is the messages field to check against, e.g. deployment or job + * `` how to compare the 's value to the user provided , e.g.: mustContain, mustNotContain + * `` the actual value the message's should be checked for - - - - diff --git a/eventfilter/eventfilter_suite_test.go b/eventfilter/eventfilter_suite_test.go new file mode 100644 index 00000000..1c594e45 --- /dev/null +++ b/eventfilter/eventfilter_suite_test.go @@ -0,0 +1,13 @@ +package eventfilter_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestEventfilter(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Eventfilter Suite") +} diff --git a/eventfilter/filters.go b/eventfilter/filters.go new file mode 100644 index 00000000..64053314 --- /dev/null +++ b/eventfilter/filters.go @@ -0,0 +1,174 @@ +package eventfilter + +import ( + "fmt" + "strings" + + "github.com/cloudfoundry/sonde-go/events" +) + +const ( + filterSep = ";" + filterValuesSep = ":" +) + +// getterFunc is a function that, given an Envelope, returns the things we care +// about, e.g. the Deployment, Job, ... +type getterFunc = func(msg *events.Envelope) string + +// supportedGetters are all supported keys we can use for filters and the +// getters / functions that pull the respective data out of an envelope. +var supportedGetters = map[string]getterFunc{ + "deployment": func(msg *events.Envelope) string { + return msg.GetDeployment() + }, + "origin": func(msg *events.Envelope) string { + return msg.GetOrigin() + }, + "job": func(msg *events.Envelope) string { + return msg.GetJob() + }, +} + +// filterFunc gets called with data from the message envelope, pulled out by a +// getter, and compares it to the user provided value. If it returns true, this +// message should be accepted, else the message should be dropped. +type filterFunc = func(msgData, userInput string) bool + +// supportedFilters are all supported filter names and +// the filters / functions that match run against the data from the message and +// compares it to the data provided by the user. +// E.g. when we have +// - the filter func strings.Contains +// - the getter, that gets the message's origin +// - and the value 'foo' +// only messages with the origin 'foo' will be accepted by the filter +var supportedFilters = map[string]filterFunc{ + "mustContain": strings.Contains, + "mustNotContain": func(msgData, userInput string) bool { return !strings.Contains(msgData, userInput) }, +} + +// SupportedFilterKeys lists all supported filter keys. This is only used to +// signal the list of supported keys to users, e.g. for the usage text. +var SupportedFilterKeys = func() []string { + keys := make([]string, 0, len(supportedGetters)) + for k := range supportedGetters { + keys = append(keys, k) + } + + return keys +}() + +// SupportedFilters lists all supported filter names. This is only used to +// signal the list of supported filters to users, e.g. for the usage text. +var SupportedFilters = func() []string { + keys := make([]string, 0, len(supportedFilters)) + for k := range supportedFilters { + keys = append(keys, k) + } + + return keys +}() + +// Filters is something that can tell it's Length (the number of its configured +// filters) and can be used to check if an envelope is accepted or should be +// dropped/discarded. +type Filters interface { + Accepts(*events.Envelope) bool + Length() int +} + +type filterRule struct { + getter getterFunc + filter filterFunc + value string +} + +var ( + errInvalidFormat = fmt.Errorf("format must be '%q:%q:'", SupportedFilterKeys, SupportedFilters) + errEmptyValue = fmt.Errorf("filter value must not be empty") + errInvaldFilter = fmt.Errorf("filter key must be one of %q", SupportedFilterKeys) + errInvalidFilterKey = fmt.Errorf("filter must be one of %q", SupportedFilters) +) + +func parseFilterConfig(filters string) ([]filterRule, error) { + rules := []filterRule{} + + for _, filterRaw := range strings.Split(filters, filterSep) { + filter := strings.TrimSpace(filterRaw) + + if filter == "" { + continue + } + + tokens := strings.Split(filter, filterValuesSep) + if len(tokens) != 3 { + return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvalidFormat) + } + + getterKey := strings.TrimSpace(strings.ToLower(tokens[0])) + filterKey := strings.TrimSpace(tokens[1]) + + var ok bool + rule := filterRule{ + value: tokens[2], + } + + if rule.value == "" { + return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errEmptyValue) + } + + rule.filter, ok = supportedFilters[filterKey] + if !ok { + return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvaldFilter) + } + + rule.getter, ok = supportedGetters[getterKey] + if !ok { + return []filterRule{}, fmt.Errorf("filter %q invalid: %s", filter, errInvalidFilterKey) + } + + rules = append(rules, rule) + } + + return rules, nil +} + +type filter func(*events.Envelope) bool + +type filters []filter + +func (ef *filters) Accepts(msg *events.Envelope) bool { + for _, f := range *ef { + if allow := f(msg); !allow { + return false + } + } + + return true +} + +func (ef *filters) Length() int { + return len(*ef) +} + +func (ef *filters) addFilter(valueGetter getterFunc, valueFilter filterFunc, value string) { + *ef = append(*ef, func(msg *events.Envelope) bool { + return valueFilter(valueGetter(msg), value) + }) +} + +func New(filterList string) (Filters, error) { + f := &filters{} + + rules, err := parseFilterConfig(filterList) + if err != nil { + return nil, err + } + + for _, rule := range rules { + f.addFilter(rule.getter, rule.filter, rule.value) + } + + return f, nil +} diff --git a/eventfilter/filters_test.go b/eventfilter/filters_test.go new file mode 100644 index 00000000..aa205712 --- /dev/null +++ b/eventfilter/filters_test.go @@ -0,0 +1,93 @@ +package eventfilter_test + +import ( + "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter" + "github.com/cloudfoundry/sonde-go/events" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/ginkgo/extensions/table" + . "github.com/onsi/gomega" +) + +var _ = Describe("Rule parsing", func() { + testError := func(filterConf string, errorMsg string) { + filters, err := eventfilter.New(filterConf) + Expect(filters).To(BeNil()) + Expect(err).To(MatchError(ContainSubstring(errorMsg))) + } + DescribeTable("throws error", testError, + Entry("not enough fields", ":", "format must be"), + Entry("too many fields", "xxx:yyy:zzz:rrrr", "format must be"), + Entry("invalid value", "xxx::", "filter value must not be empty"), + Entry("invalid filter", "xxx:yyy:zzz", "filter key must be one of"), + Entry("invalid field", "notValid:mustContain:zzz", "filter must be one of"), + ) + + testOk := func(filterConf string, length int) { + filters, err := eventfilter.New(filterConf) + Expect(err).NotTo(HaveOccurred()) + Expect(filters).NotTo(BeNil(), "filters have not been initialized") + Expect(filters.Length()).To(Equal(length), "Expected %d filter rules", length) + } + DescribeTable("parses ok", testOk, + Entry("no filters at all", "", 0), + Entry("multiple empty rules", ";;;;", 0), + Entry("filtering on deployment", "deployment:mustContain:some deployment", 1), + Entry("accepts whitespace between rules", " deployment:mustContain:something ; origin:mustContain:someOrigin ", 2), + Entry("accepts whitespace in filter", "deployment: mustContain :something", 1), + + Entry("inclusion filter on deployment", "Deployment:mustContain:something", 1), + Entry("inclusion filter on origin", "origin:mustContain:something", 1), + Entry("inclusion filter on job", "job:mustContain:something", 1), + + Entry("exclusion filter on deployment", "Deployment:mustNotContain:something", 1), + Entry("exclusion filter on origin", "origin:mustNotContain:something", 1), + Entry("exclusion filter on job", "job:mustNotContain:something", 1), + ) +}) + +var _ = Describe("Filtering", func() { + msg := &events.Envelope{ + Deployment: p("p-healthwatch2-123123123"), + Origin: p("some origin"), + Job: p("some job"), + } + + test := func(filterConf string, expected bool) { + filters, err := eventfilter.New(filterConf) + Expect(err).NotTo(HaveOccurred()) + Expect(filters.Accepts(msg)). + To(Equal(expected), "Expected event {%v} to be %s", msg, tern(expected, "accepted", "discarded")) + Expect(filters).NotTo(BeNil(), "filters have not been initialized") + } + + DescribeTable("on", test, + Entry("empty filter conf should accept", "", true), + Entry("matching inclusion filter should accept", "deployment:mustContain:healthwatch2", true), + Entry("non-matching inclusion filter should discard", "deployment:mustContain:something", false), + Entry("matching exclusion filter should discard", "deployment:mustNotContain:healthwatch2", false), + Entry("2nd exclusion filter should discard", "deployment:mustNotContain:health ; deployment:mustNotContain:watch", false), + Entry("3rd exclusion filter should discard", + "deployment:mustContain:health ; job:mustNotContain:other job ; deployment:mustNotContain:watch", + false, + ), + Entry("many matching inclusion filters should accept", + "deployment:mustContain:h ; deployment:mustContain:e ; deployment:mustContain:a ; deployment:mustContain:l ; deployment:mustContain:t ; deployment:mustContain:h", + true, + ), + Entry("many non-matching exclusion filters should accept", + "deployment:mustNotContain:x ; deployment:mustNotContain:y ; deployment:mustNotContain:z ; deployment:mustNotContain:u ; deployment:mustNotContain:b ; deployment:mustNotContain:r", + true, + ), + ) +}) + +func p(s string) *string { return &s } + +func tern(b bool, t string, f string) string { + if b { + return t + } + + return f +} diff --git a/eventrouter/default.go b/eventrouter/default.go index 1b08ccb8..4b78d354 100644 --- a/eventrouter/default.go +++ b/eventrouter/default.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/cloudfoundry-community/splunk-firehose-nozzle/cache" + "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter" fevents "github.com/cloudfoundry-community/splunk-firehose-nozzle/events" "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventsink" "github.com/cloudfoundry/sonde-go/events" @@ -18,29 +19,34 @@ type router struct { config *Config } -func New(appCache cache.Cache, sink eventsink.Sink, config *Config) (Router, error) { - selectedEvents, err := fevents.ParseSelectedEvents(config.SelectedEvents) +type filteringRouter struct { + *router + filters eventfilter.Filters +} +func New(appCache cache.Cache, sink eventsink.Sink, config *Config, filters eventfilter.Filters) (Router, error) { + selectedEvents, err := fevents.ParseSelectedEvents(config.SelectedEvents) if err != nil { return nil, err } - return &router{ + r := &router{ appCache: appCache, sink: sink, selectedEvents: selectedEvents, config: config, - }, nil -} - -func (r *router) Route(msg *events.Envelope) error { - eventType := msg.GetEventType() + } - if _, ok := r.selectedEvents[eventType.String()]; !ok { - // Ignore this event since we are not interested - return nil + // if no filters were defined, we return the original router, + // otherwise we return the filtering router + if filters == nil || filters.Length() < 1 { + return r, nil } + return &filteringRouter{router: r, filters: filters}, nil +} + +func (r *router) processMessage(msg *events.Envelope, eventType events.Envelope_EventType) error { var event *fevents.Event switch eventType { case events.Envelope_HttpStartStop: @@ -83,5 +89,30 @@ func (r *router) Route(msg *events.Envelope) error { fields := map[string]interface{}{"err": fmt.Sprintf("%s", err)} r.sink.Write(fields, "Failed to write events") } + return err } + +func (r *router) Route(msg *events.Envelope) error { + eventType := msg.GetEventType() + if _, ok := r.selectedEvents[eventType.String()]; !ok { + // Ignore this event since we are not interested + return nil + } + + return r.processMessage(msg, eventType) +} + +func (r *filteringRouter) Route(msg *events.Envelope) error { + eventType := msg.GetEventType() + if _, ok := r.selectedEvents[eventType.String()]; !ok { + // Ignore this event since we are not interested + return nil + } + + if !r.filters.Accepts(msg) { + return nil + } + + return r.processMessage(msg, eventType) +} diff --git a/eventrouter/eventrouter_benchmark_test.go b/eventrouter/eventrouter_benchmark_test.go new file mode 100644 index 00000000..21d14371 --- /dev/null +++ b/eventrouter/eventrouter_benchmark_test.go @@ -0,0 +1,172 @@ +package eventrouter_test + +import ( + "fmt" + + "github.com/cloudfoundry-community/splunk-firehose-nozzle/cache" + "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter" + "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventrouter" + "github.com/cloudfoundry/sonde-go/events" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +const ( + messagesPerRun = 100 * 1000 + runs = 10 +) + +var _ = Describe("eventrouter", func() { + var router eventrouter.Router + var filters eventfilter.Filters + var sink *devNullSink + + measureIt := func(f string, expectedMsgs int) { + title := fmt.Sprintf("message through-put for filter %q", f) + Measure(title, func(b Benchmarker) { + runtime := b.Time("route", func() { + err := pushMessages(router, messagesPerRun) + Expect(err).NotTo(HaveOccurred()) + Expect(sink.msgs).To(Equal(expectedMsgs)) + }) + + b.RecordValue("routed messages per micro second", float64(messagesPerRun)/runtime.Seconds()/float64(1000)) + }, runs) + } + + JustBeforeEach(func() { + cache := nullCache{} + sink = &devNullSink{} + config := &eventrouter.Config{ + SelectedEvents: "LogMessage,HttpStart,HttpStop,HttpStartStop,ValueMetric,CounterEvent,Error,ContainerMetric", + } + var err error + router, err = eventrouter.New(cache, sink, config, filters) + Expect(err).NotTo(HaveOccurred()) + }) + measureIt("", messagesPerRun) + + Describe("single accepting filter", func() { + var filterRules = "deployment:mustContain:some deployment" + BeforeEach(func() { + var err error + filters, err = eventfilter.New(filterRules) + Expect(err).NotTo(HaveOccurred()) + }) + measureIt(filterRules, messagesPerRun) + }) + + Describe("multiple filters, first one accepts, others don't match", func() { + filterRules := "deployment:mustContain:ome depl " + manyNotMatchingThings + BeforeEach(func() { + var err error + filters, err = eventfilter.New(filterRules) + Expect(err).NotTo(HaveOccurred()) + }) + measureIt(first(filterRules, 40), messagesPerRun) + }) + + Describe("multiple filters, last one accepts", func() { + filterRules := manyNotMatchingThings + " deployment:mustContain:ome depl" + BeforeEach(func() { + var err error + filters, err = eventfilter.New(filterRules) + Expect(err).NotTo(HaveOccurred()) + }) + measureIt(last(filterRules, 40), messagesPerRun) + }) + + Describe("multiple filters, first one discards", func() { + filterRules := "deployment:mustNotContain:ome depl " + manyNotMatchingThings + BeforeEach(func() { + var err error + filters, err = eventfilter.New(filterRules) + Expect(err).NotTo(HaveOccurred()) + }) + measureIt(first(filterRules, 40), 0) + }) + + Describe("multiple filters, first one accepts, second one discards, others don't match", func() { + filterRules := "deployment:mustContain:ome depl; deployment:mustNotContain:some " + manyNotMatchingThings + BeforeEach(func() { + var err error + filters, err = eventfilter.New(filterRules) + Expect(err).NotTo(HaveOccurred()) + }) + measureIt(first(filterRules, 60), 0) + }) +}) + +// manyNotMatchingThings is used to create some closures that get passed into +// the router as filters, to be able to measure if and how the number of those +// closures influences the router's performance. +const manyNotMatchingThings = ";" + + "origin:mustNotContain:ignore0; origin:mustNotContain:ignore1; origin:mustNotContain:ignore2; origin:mustNotContain:ignore3;" + + "origin:mustNotContain:ignore4; origin:mustNotContain:ignore5; origin:mustNotContain:ignore6; origin:mustNotContain:ignore7;" + + "origin:mustNotContain:ignore8; origin:mustNotContain:ignore9; origin:mustNotContain:ignore10; origin:mustNotContain:ignore11;" + + "origin:mustNotContain:ignore12; origin:mustNotContain:ignore13; origin:mustNotContain:ignore14; origin:mustNotContain:ignore15;" + + "origin:mustNotContain:ignore16; origin:mustNotContain:ignore17; origin:mustNotContain:ignore18; origin:mustNotContain:ignore19;" + +func pushMessages(r eventrouter.Router, nrOfMsgs int) error { + events := make(chan *events.Envelope, 10) + stop := make(chan struct{}) + + defer func() { close(stop) }() + + go eventGenerator(nrOfMsgs, events, stop) + + for msg := range events { + err := r.Route(msg) + if err != nil { + return err + } + } + + return nil +} + +func eventGenerator(count int, ch chan *events.Envelope, stop chan struct{}) { + defer func() { close(ch) }() + + for i := 0; i < count; i++ { + select { + case <-stop: + return + default: + ch <- &events.Envelope{ + Deployment: p("some deployment"), + Origin: p("some origin"), + } + } + } +} + +func p(s string) *string { + return &s +} + +type devNullSink struct { + msgs int +} + +func (s *devNullSink) Open() error { return nil } +func (s *devNullSink) Close() error { return nil } +func (s *devNullSink) Write(fields map[string]interface{}, msg string) error { + s.msgs += 1 + return nil +} + +type nullCache struct{} + +func (nullCache) Open() error { return nil } +func (nullCache) Close() error { return nil } +func (nullCache) GetAllApps() (map[string]*cache.App, error) { return map[string]*cache.App{}, nil } +func (nullCache) GetApp(string) (*cache.App, error) { return &cache.App{}, nil } + +func first(s string, l int) string { + return s[0:l] + "[...]" +} + +func last(s string, l int) string { + return "[...]" + s[len(s)-l:] +} diff --git a/eventrouter/eventrouter_test.go b/eventrouter/eventrouter_test.go index 9b335c07..322953a9 100644 --- a/eventrouter/eventrouter_test.go +++ b/eventrouter/eventrouter_test.go @@ -1,6 +1,7 @@ package eventrouter_test import ( + "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter" . "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventrouter" "github.com/cloudfoundry-community/splunk-firehose-nozzle/testing" "github.com/cloudfoundry/sonde-go/events" @@ -33,7 +34,7 @@ var _ = Describe("eventrouter", func() { config := &Config{ SelectedEvents: "LogMessage,HttpStart,HttpStop,HttpStartStop,ValueMetric,CounterEvent,Error,ContainerMetric", } - r, err = New(noCache, memSink, config) + r, err = New(noCache, memSink, config, nil) Ω(err).ShouldNot(HaveOccurred()) timestampNano = 1467040874046121775 @@ -84,7 +85,7 @@ var _ = Describe("eventrouter", func() { config := &Config{ SelectedEvents: "HttpStart", } - r, err = New(noCache, memSink, config) + r, err = New(noCache, memSink, config, nil) Ω(err).ShouldNot(HaveOccurred()) eventType = events.Envelope_HttpStop @@ -98,7 +99,7 @@ var _ = Describe("eventrouter", func() { config := &Config{ SelectedEvents: "", } - r, err = New(noCache, memSink, config) + r, err = New(noCache, memSink, config, nil) Ω(err).ShouldNot(HaveOccurred()) eventType = events.Envelope_LogMessage @@ -127,7 +128,7 @@ var _ = Describe("eventrouter", func() { config := &Config{ SelectedEvents: "invalid", } - r, err = New(noCache, memSink, config) + r, err = New(noCache, memSink, config, nil) Ω(err).ShouldNot(HaveOccurred()) eventType = invalid @@ -160,7 +161,95 @@ var _ = Describe("eventrouter", func() { config := &Config{ SelectedEvents: "invalid-event", } - _, err = New(noCache, memSink, config) + _, err = New(noCache, memSink, config, nil) Ω(err).Should(HaveOccurred()) }) }) + +var _ = Describe("eventrouter filtering", func() { + var msg *events.Envelope + var router Router + var filters eventfilter.Filters + var sink *devNullSink + + BeforeEach(func() { + msg = &events.Envelope{ + Origin: p("some origin"), + } + sink = &devNullSink{} + }) + + JustBeforeEach(func() { + config := &Config{ + SelectedEvents: "LogMessage,HttpStart,HttpStop,HttpStartStop,ValueMetric,CounterEvent,Error,ContainerMetric", + } + var err error + + router, err = New(nullCache{}, sink, config, filters) + Expect(err).NotTo(HaveOccurred()) + + err = router.Route(msg) + Expect(err).NotTo(HaveOccurred()) + }) + + Context("with a nil filter", func() { + BeforeEach(func() { + filters = nil + }) + It("routes the message", func() { + Expect(sink.msgs).To(Equal(1)) + }) + }) + + Context("with a filter that blocks everything", func() { + BeforeEach(func() { + filters = blockingFilter{} + }) + It("discards the message", func() { + Expect(sink.msgs).To(Equal(0)) + }) + }) + + Context("with an accepting origin filter", func() { + BeforeEach(func() { + filters = originFilter{allowedOrigin: msg.GetOrigin()} + }) + It("routes the message", func() { + Expect(sink.msgs).To(Equal(1)) + }) + }) + + Context("with an blocking origin filter", func() { + BeforeEach(func() { + filters = originFilter{allowedOrigin: "Invalid origin"} + }) + It("discards the message", func() { + Expect(sink.msgs).To(Equal(0)) + }) + }) +}) + +type fakeFilter struct{} + +func (fakeFilter) Length() int { return 1 } + +type allowingFilter struct{ fakeFilter } + +func (allowingFilter) Accepts(m *events.Envelope) bool { + return true +} + +type blockingFilter struct{ fakeFilter } + +func (blockingFilter) Accepts(m *events.Envelope) bool { + return false +} + +type originFilter struct { + fakeFilter + allowedOrigin string +} + +func (of originFilter) Accepts(m *events.Envelope) bool { + return of.allowedOrigin == m.GetOrigin() +} diff --git a/eventsink/splunk_test.go b/eventsink/splunk_test.go index 093ff4d2..e0a2f5bc 100644 --- a/eventsink/splunk_test.go +++ b/eventsink/splunk_test.go @@ -61,7 +61,7 @@ var _ = Describe("Splunk", func() { rconfig := &eventrouter.Config{ SelectedEvents: "ContainerMetric, CounterEvent, Error, HttpStart, HttpStartStop, HttpStop, LogMessage, ValueMetric", } - eventRouter, err = eventrouter.New(cache.NewNoCache(), memSink, rconfig) + eventRouter, err = eventrouter.New(cache.NewNoCache(), memSink, rconfig, nil) Ω(err).ShouldNot(HaveOccurred()) mockClient = &testing.EventWriterMock{} diff --git a/splunknozzle/config.go b/splunknozzle/config.go index d769b349..48912f95 100644 --- a/splunknozzle/config.go +++ b/splunknozzle/config.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter" "github.com/cloudfoundry-community/splunk-firehose-nozzle/events" kingpin "gopkg.in/alecthomas/kingpin.v2" @@ -43,6 +44,8 @@ type Config struct { WantedEvents string `json:"wanted-events"` ExtraFields string `json:"extra-fields"` + Filters string `json:"filters"` + FlushInterval time.Duration `json:"flush-interval"` QueueSize int `json:"queue-size"` BatchSize int `json:"batch-size"` @@ -125,6 +128,23 @@ func NewConfigFromCmdFlags(version, branch, commit, buildos string) *Config { kingpin.Flag("extra-fields", "Extra fields you want to annotate your events with, example: '--extra-fields=env:dev,something:other "). OverrideDefaultFromEnvar("EXTRA_FIELDS").Default("").StringVar(&c.ExtraFields) + kingpin.Flag("filters", fmt.Sprintf( + "Filter events. A valid filter looks like '::'. Multiple of those "+ + "filters can be given by separating them with ';', in which case they run in the order given. "+ + " is what part of a message to check against, must be one of %q. "+ + " how to compare the 's value against , must be one of %q. "+ + " what to check a message for. "+ + "The more filters are specified, the more work needs to be done per incoming event. "+ + "Example: given the filter "+ + "'deployment:mustContain:healthwatch2; deployment:mustNotContain:exporters; job:mustContain:someJob' "+ + "and events coming from the deployments %v, only events from the deployment %q and "+ + "the job %q would be forwarded, all other events would be dropped.", + eventfilter.SupportedFilterKeys, eventfilter.SupportedFilters, + []string{"p-healthwatch2-UUID", "p-healthwatch2-exporters-UUID", "cf-UUID"}, + "p-healthwatch2-UUID", "someJob", + )). + OverrideDefaultFromEnvar("FILTERS").Default("").StringVar(&c.Filters) + kingpin.Flag("flush-interval", "Every interval flushes to Splunk Http Event Collector server"). OverrideDefaultFromEnvar("FLUSH_INTERVAL").Default("5s").DurationVar(&c.FlushInterval) kingpin.Flag("consumer-queue-size", "Consumer queue buffer size"). diff --git a/splunknozzle/config_test.go b/splunknozzle/config_test.go index a0a6624b..d6460cf1 100644 --- a/splunknozzle/config_test.go +++ b/splunknozzle/config_test.go @@ -67,6 +67,8 @@ var _ = Describe("Config", func() { os.Setenv("DEBUG", "true") os.Setenv("DROP_WARN_THRESHOLD", "100") + os.Setenv("FILTERS", "some filters") + c := NewConfigFromCmdFlags(version, branch, commit, buildos) Expect(c.ApiEndpoint).To(Equal("api.bosh-lite.com")) @@ -114,6 +116,8 @@ var _ = Describe("Config", func() { Expect(c.TraceLogging).To(BeTrue()) Expect(c.Debug).To(BeTrue()) Expect(c.DropWarnThreshold).To(Equal(100)) + + Expect(c.Filters).To(Equal("some filters")) }) It("check defaults", func() { @@ -148,6 +152,8 @@ var _ = Describe("Config", func() { Expect(c.TraceLogging).To(BeFalse()) Expect(c.Debug).To(BeFalse()) Expect(c.DropWarnThreshold).To(Equal(1000)) + + Expect(c.Filters).To(Equal("")) }) }) @@ -196,6 +202,7 @@ var _ = Describe("Config", func() { "--enable-event-tracing", "--debug", "--drop-warn-threshold=10", + "--filters=some filters", } os.Args = args }) @@ -248,6 +255,7 @@ var _ = Describe("Config", func() { Expect(c.Commit).To(Equal(commit)) Expect(c.BuildOS).To(Equal(buildos)) + Expect(c.Filters).To(Equal("some filters")) }) }) }) diff --git a/splunknozzle/nozzle.go b/splunknozzle/nozzle.go index 41fb6407..2b63ccfd 100644 --- a/splunknozzle/nozzle.go +++ b/splunknozzle/nozzle.go @@ -8,6 +8,7 @@ import ( "code.cloudfoundry.org/lager" cfclient "github.com/cloudfoundry-community/go-cfclient" "github.com/cloudfoundry-community/splunk-firehose-nozzle/cache" + "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventfilter" "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventrouter" "github.com/cloudfoundry-community/splunk-firehose-nozzle/events" "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventsink" @@ -43,7 +44,13 @@ func (s *SplunkFirehoseNozzle) EventRouter(cache cache.Cache, eventSink eventsin AddSpaceGuid: strings.Contains(LowerAddAppInfo, "spaceguid"), AddTags: s.config.AddTags, } - return eventrouter.New(cache, eventSink, config) + + filter, err := eventfilter.New(s.config.Filters) + if err != nil { + return nil, err + } + + return eventrouter.New(cache, eventSink, config, filter) } // CFClient creates a client object which can talk to Cloud Foundry diff --git a/vendor/github.com/onsi/ginkgo/extensions/table/table.go b/vendor/github.com/onsi/ginkgo/extensions/table/table.go new file mode 100644 index 00000000..ae8ab7d2 --- /dev/null +++ b/vendor/github.com/onsi/ginkgo/extensions/table/table.go @@ -0,0 +1,98 @@ +/* + +Table provides a simple DSL for Ginkgo-native Table-Driven Tests + +The godoc documentation describes Table's API. More comprehensive documentation (with examples!) is available at http://onsi.github.io/ginkgo#table-driven-tests + +*/ + +package table + +import ( + "fmt" + "reflect" + + "github.com/onsi/ginkgo" +) + +/* +DescribeTable describes a table-driven test. + +For example: + + DescribeTable("a simple table", + func(x int, y int, expected bool) { + Ω(x > y).Should(Equal(expected)) + }, + Entry("x > y", 1, 0, true), + Entry("x == y", 0, 0, false), + Entry("x < y", 0, 1, false), + ) + +The first argument to `DescribeTable` is a string description. +The second argument is a function that will be run for each table entry. Your assertions go here - the function is equivalent to a Ginkgo It. +The subsequent arguments must be of type `TableEntry`. We recommend using the `Entry` convenience constructors. + +The `Entry` constructor takes a string description followed by an arbitrary set of parameters. These parameters are passed into your function. + +Under the hood, `DescribeTable` simply generates a new Ginkgo `Describe`. Each `Entry` is turned into an `It` within the `Describe`. + +It's important to understand that the `Describe`s and `It`s are generated at evaluation time (i.e. when Ginkgo constructs the tree of tests and before the tests run). + +Individual Entries can be focused (with FEntry) or marked pending (with PEntry or XEntry). In addition, the entire table can be focused or marked pending with FDescribeTable and PDescribeTable/XDescribeTable. +*/ +func DescribeTable(description string, itBody interface{}, entries ...TableEntry) bool { + describeTable(description, itBody, entries, false, false) + return true +} + +/* +You can focus a table with `FDescribeTable`. This is equivalent to `FDescribe`. +*/ +func FDescribeTable(description string, itBody interface{}, entries ...TableEntry) bool { + describeTable(description, itBody, entries, false, true) + return true +} + +/* +You can mark a table as pending with `PDescribeTable`. This is equivalent to `PDescribe`. +*/ +func PDescribeTable(description string, itBody interface{}, entries ...TableEntry) bool { + describeTable(description, itBody, entries, true, false) + return true +} + +/* +You can mark a table as pending with `XDescribeTable`. This is equivalent to `XDescribe`. +*/ +func XDescribeTable(description string, itBody interface{}, entries ...TableEntry) bool { + describeTable(description, itBody, entries, true, false) + return true +} + +func describeTable(description string, itBody interface{}, entries []TableEntry, pending bool, focused bool) { + itBodyValue := reflect.ValueOf(itBody) + if itBodyValue.Kind() != reflect.Func { + panic(fmt.Sprintf("DescribeTable expects a function, got %#v", itBody)) + } + + if pending { + ginkgo.PDescribe(description, func() { + for _, entry := range entries { + entry.generateIt(itBodyValue) + } + }) + } else if focused { + ginkgo.FDescribe(description, func() { + for _, entry := range entries { + entry.generateIt(itBodyValue) + } + }) + } else { + ginkgo.Describe(description, func() { + for _, entry := range entries { + entry.generateIt(itBodyValue) + } + }) + } +} diff --git a/vendor/github.com/onsi/ginkgo/extensions/table/table_entry.go b/vendor/github.com/onsi/ginkgo/extensions/table/table_entry.go new file mode 100644 index 00000000..5fa645bc --- /dev/null +++ b/vendor/github.com/onsi/ginkgo/extensions/table/table_entry.go @@ -0,0 +1,81 @@ +package table + +import ( + "reflect" + + "github.com/onsi/ginkgo" +) + +/* +TableEntry represents an entry in a table test. You generally use the `Entry` constructor. +*/ +type TableEntry struct { + Description string + Parameters []interface{} + Pending bool + Focused bool +} + +func (t TableEntry) generateIt(itBody reflect.Value) { + if t.Pending { + ginkgo.PIt(t.Description) + return + } + + values := []reflect.Value{} + for i, param := range t.Parameters { + var value reflect.Value + + if param == nil { + inType := itBody.Type().In(i) + value = reflect.Zero(inType) + } else { + value = reflect.ValueOf(param) + } + + values = append(values, value) + } + + body := func() { + itBody.Call(values) + } + + if t.Focused { + ginkgo.FIt(t.Description, body) + } else { + ginkgo.It(t.Description, body) + } +} + +/* +Entry constructs a TableEntry. + +The first argument is a required description (this becomes the content of the generated Ginkgo `It`). +Subsequent parameters are saved off and sent to the callback passed in to `DescribeTable`. + +Each Entry ends up generating an individual Ginkgo It. +*/ +func Entry(description string, parameters ...interface{}) TableEntry { + return TableEntry{description, parameters, false, false} +} + +/* +You can focus a particular entry with FEntry. This is equivalent to FIt. +*/ +func FEntry(description string, parameters ...interface{}) TableEntry { + return TableEntry{description, parameters, false, true} +} + +/* +You can mark a particular entry as pending with PEntry. This is equivalent to PIt. +*/ +func PEntry(description string, parameters ...interface{}) TableEntry { + return TableEntry{description, parameters, true, false} +} + +/* +You can mark a particular entry as pending with XEntry. This is equivalent to XIt. +*/ +func XEntry(description string, parameters ...interface{}) TableEntry { + return TableEntry{description, parameters, true, false} +} diff --git a/vendor/modules.txt b/vendor/modules.txt index f18d8de1..bc70ab2d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -82,6 +82,7 @@ github.com/mailru/easyjson/jwriter ## explicit github.com/onsi/ginkgo github.com/onsi/ginkgo/config +github.com/onsi/ginkgo/extensions/table github.com/onsi/ginkgo/internal/codelocation github.com/onsi/ginkgo/internal/containernode github.com/onsi/ginkgo/internal/failer