diff --git a/.circleci/ci_nozzle_manifest.yml b/.circleci/ci_nozzle_manifest.yml new file mode 100644 index 00000000..59783146 --- /dev/null +++ b/.circleci/ci_nozzle_manifest.yml @@ -0,0 +1,36 @@ +--- +applications: + - name: splunk-firehose-nozzle + memory: 512M + instances: 2 + buildpack: go_buildpack + cmd: splunk-firehose-nozzle + env: + GOPACKAGENAME: main + API_ENDPOINT: + API_USER: + API_PASSWORD: + SPLUNK_HOST: + SPLUNK_TOKEN: + SPLUNK_INDEX: + SKIP_SSL_VALIDATION_CF: true + SKIP_SSL_VALIDATION_SPLUNK: true + JOB_NAME: splunk-nozzle + JOB_INDEX: -1 + JOB_HOST: localhost + ADD_APP_INFO: true + IGNORE_MISSING_APP: true + MISSING_APP_CACHE_INVALIDATE_TTL: 3600s + APP_CACHE_INVALIDATE_TTL: 86440s + APP_LIMITS: 1000 + BOLTDB_PATH: cache.db + EVENTS: ValueMetric,CounterEvent,Error,LogMessage,HttpStartStop,ContainerMetric + EXTRA_FIELDS: name:update-ci-test + FIREHOSE_SUBSCRIPTION_ID: splunk-ci + FLUSH_INTERVAL: 5s + CONSUMER_QUEUE_SIZE: 10000 + HEC_BATCH_SIZE: 1000 + HEC_RETRIES: 5 + HEC_WORKERS: 8 + DEBUG: false + ENABLE_EVENT_TRACING: true diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 00000000..3a4e3d3a --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,92 @@ +version: 2 # use CircleCI 2.0 +jobs: + build: + docker: + - image: circleci/golang:1.12 + working_directory: /go/src/github.com/cloudfoundry-community/splunk-firehose-nozzle + steps: # steps that comprise the `build` job + - checkout # check out source code to working directory + - run: + name: Install Dependencies + command: | + curl https://glide.sh/get | sh + go get -t ./... + - run: + name: Builder + command: make build + - run: + name: Run tests + command: | + make testall + cp splunk-firehose-nozzle /tmp + - persist_to_workspace: + root: /tmp + paths: + - splunk-firehose-nozzle + + deploy-nozzle: + docker: + - image: circleci/golang:1.12 + working_directory: /go/src/github.com/cloudfoundry-community/splunk-firehose-nozzle + steps: # steps that comprise the `deploy` job + - attach_workspace: + at: /tmp + - checkout # check out source code to working directory + - run: + name: Install dependencies + command: | + curl https://glide.sh/get | sh + go get -t ./... + cp -R /tmp/splunk-firehose-nozzle . + - run: + name: Deploy nozzle + command: | + .circleci/update_manifest.sh + .circleci/pre-req.sh + cf push -f .circleci/ci_nozzle_manifest.yml -u process --random-route + - run: + name: Teardown deployment env + command: | + sleep 10 + echo "Teardown deployment env" + cf delete splunk-firehose-nozzle -f + cf delete-org splunk-ci -f + + tile-builder: + docker: + - image: circleci/golang:1.12 + working_directory: /go/src/github.com/cloudfoundry-community/splunk-firehose-nozzle + steps: + - attach_workspace: + at: /tmp + - checkout + - run: + name: Tile builder + command: | + cp -R /tmp/splunk-firehose-nozzle . + .circleci/tile-builder.sh + - run: + name: Push tile to internal s3 + command: | + cp -R /tmp/splunk-firehose-nozzle . 
+ .circleci/push_tile.sh + +workflows: + version: 2 + build-and-deploy-nozzle: + jobs: + - build + - deploy-nozzle: + requires: + - build + filters: + branches: + only: + - develop + - master + - tile-builder: + requires: + - deploy-nozzle + filters: + branches: + only: master \ No newline at end of file diff --git a/.circleci/pre-req.sh b/.circleci/pre-req.sh new file mode 100755 index 00000000..81f919cb --- /dev/null +++ b/.circleci/pre-req.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash +set -e +#Install CF CLI +wget -q -O - https://packages.cloudfoundry.org/debian/cli.cloudfoundry.org.key | sudo apt-key add - +echo "deb https://packages.cloudfoundry.org/debian stable main" | sudo tee /etc/apt/sources.list.d/cloudfoundry-cli.list +#Add support for https apt sources +sudo apt-get install apt-transport-https ca-certificates +sudo apt-get update +sudo apt-get install cf-cli +#CF Login +cf login --skip-ssl-validation -a $API_ENDPOINT -u $API_USER -p $API_PASSWORD -o system -s system +#Create splunk-ci org and space +if [ "`cf o | grep "splunk-ci"`" == "splunk-ci" ]; then + echo "Its here" + cf target -o "splunk-ci" -s "splunk-ci" +else + cf create-org splunk-ci + cf target -o splunk-ci + cf create-space splunk-ci + cf target -o "splunk-ci" -s "splunk-ci" +fi \ No newline at end of file diff --git a/.circleci/push_tile.sh b/.circleci/push_tile.sh new file mode 100755 index 00000000..c9f39245 --- /dev/null +++ b/.circleci/push_tile.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +set -e +sudo apt-get install -y python-pip libpython-dev > /dev/null 2>&1 +echo "Installing aws cli..." +sudo pip install awscli > /dev/null 2>&1 +echo "Push Splunk tile to s3..." +aws s3 cp tile/product/splunk-nozzle-*.pivotal s3://pcf-ci-artifacts/ \ No newline at end of file diff --git a/.circleci/tile-builder.sh b/.circleci/tile-builder.sh new file mode 100755 index 00000000..01d3164e --- /dev/null +++ b/.circleci/tile-builder.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash +set -e +wget https://github.com/cf-platform-eng/tile-generator/releases/download/v13.0.2/pcf_linux-64bit > /dev/null 2>&1 +chmod +x pcf_linux-64bit +sudo mv pcf_linux-64bit /usr/local/bin/tile +sudo apt install python-pip > /dev/null 2>&1 +echo "installing vurtualenv" +sudo /usr/bin/easy_install virtualenv > /dev/null 2>&1 +virtualenv -p python tile-generator-env +source tile-generator-env/bin/activate +echo "Installing tile-generator..." +pip install tile-generator +cd tile +echo "Installing bosh..." 
+wget https://github.com/cloudfoundry/bosh-cli/releases/download/v5.5.0/bosh-cli-5.5.0-linux-amd64 > /dev/null 2>&1 +mv bosh-cli-5.5.0-linux-amd64 bosh +chmod +x ./bosh +sudo mv ./bosh /usr/local/bin/bosh +echo "Building PCF Tile for Splunk-firehose-nozzle" +tile build \ No newline at end of file diff --git a/.circleci/update_manifest.sh b/.circleci/update_manifest.sh new file mode 100755 index 00000000..a50c354d --- /dev/null +++ b/.circleci/update_manifest.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +set -e +#Set below params in CircleCI env variable settings +# API_ENDPOINT, API_USER, API_PASSWORD, SPLUNK_TOKEN, SPLUNK_HOST, SPLUNK_INDEX +#Update manifest for deployment +sed -i 's@API_ENDPOINT:.*@'"API_ENDPOINT: $API_ENDPOINT"'@' .circleci/ci_nozzle_manifest.yml +sed -i 's@API_USER:.*@'"API_USER: $API_USER"'@' .circleci/ci_nozzle_manifest.yml +sed -i 's@API_PASSWORD:.*@'"API_PASSWORD: $API_PASSWORD"'@' .circleci/ci_nozzle_manifest.yml +sed -i 's@SPLUNK_HOST:.*@'"SPLUNK_HOST: $SPLUNK_HOST"'@' .circleci/ci_nozzle_manifest.yml +sed -i 's@SPLUNK_TOKEN:.*@'"SPLUNK_TOKEN: $SPLUNK_TOKEN"'@' .circleci/ci_nozzle_manifest.yml +sed -i 's@SPLUNK_INDEX:.*@'"SPLUNK_INDEX: $SPLUNK_INDEX"'@' .circleci/ci_nozzle_manifest.yml +#copy nozzle binary from shared workspace +cp /tmp/splunk-firehose-nozzle . diff --git a/.gitignore b/.gitignore index ea906271..2039a316 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ tools/dump_app_info/dump_app_info tools/data_gen/data_gen + cache.db *.out diff --git a/README.md b/README.md index 3b0facb6..c2cc7b51 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ +[![CircleCI](https://circleci.com/gh/git-lfs/git-lfs.svg?style=shield&circle-token=856152c2b02bfd236f54d21e1f581f3e4ebf47ad)](https://circleci.com/gh/cloudfoundry-community/splunk-firehose-nozzle) ## Splunk Nozzle Cloud Foundry Firehose-to-Splunk Nozzle @@ -141,6 +142,24 @@ $ ./dump_app_info --skip-ssl-validation --api-endpoint=https://org name mapping + spaceNameCache map[string]Space // caches space guid->space name mapping + closing chan struct{} wg sync.WaitGroup config *BoltdbConfig @@ -47,11 +64,13 @@ type Boltdb struct { func NewBoltdb(client AppClient, config *BoltdbConfig) (*Boltdb, error) { return &Boltdb{ - appClient: client, - cache: make(map[string]*App), - missingApps: make(map[string]struct{}), - closing: make(chan struct{}), - config: config, + appClient: client, + cache: make(map[string]*App), + missingApps: make(map[string]struct{}), + orgNameCache: make(map[string]Org), + spaceNameCache: make(map[string]Space), + closing: make(chan struct{}), + config: config, }, nil } @@ -108,7 +127,7 @@ func (c *Boltdb) Close() error { return c.appdb.Close() } -// GetAppInfo tries first get app info from cache. If caches doesn't have this +// GetApp tries first get app info from cache. 
If caches doesn't have this // app info (cache miss), it issues API to retrieve the app info from remote // if the app is not already missing and clients don't ignore the missing app // info, and then add the app info to the cache @@ -123,6 +142,7 @@ func (c *Boltdb) GetApp(appGuid string) (*App, error) { // Find in cache if app != nil { + c.fillOrgAndSpace(app) return app, nil } @@ -159,6 +179,22 @@ func (c *Boltdb) GetAllApps() (map[string]*App, error) { return apps, nil } +func (c *Boltdb) ManuallyInvalidateCaches() error { + c.lock.Lock() + c.orgNameCache = make(map[string]Org) + c.spaceNameCache = make(map[string]Space) + c.lock.Unlock() + + apps, err := c.getAllAppsFromRemote() + if err != nil { + return err + } + c.lock.Lock() + c.cache = apps + c.lock.Unlock() + return nil +} + func (c *Boltdb) getAppFromCache(appGuid string) (*App, error) { c.lock.RLock() if app, ok := c.cache[appGuid]; ok { @@ -208,7 +244,7 @@ func (c *Boltdb) getAllAppsFromRemote() (map[string]*App, error) { totalPages := 0 q := url.Values{} - q.Set("inline-relations-depth", "2") + q.Set("inline-relations-depth", "0") if c.config.AppLimits > 0 { // Latest N apps q.Set("order-direction", "desc") @@ -271,6 +307,7 @@ func (c *Boltdb) invalidateMissingAppCache() { // and update boltdb and in-memory cache func (c *Boltdb) invalidateCache() { ticker := time.NewTicker(c.config.AppCacheTTL) + orgSpaceTicker := time.NewTicker(c.config.OrgSpaceCacheTTL) c.wg.Add(1) go func() { @@ -284,7 +321,14 @@ func (c *Boltdb) invalidateCache() { c.lock.Lock() c.cache = apps c.lock.Unlock() + } else { + c.config.Logger.Error("Unable to fetch copy of cache from remote", err) } + case <-orgSpaceTicker.C: + c.lock.Lock() + c.orgNameCache = make(map[string]Org) + c.spaceNameCache = make(map[string]Space) + c.lock.Unlock() case <-c.closing: return } @@ -310,15 +354,68 @@ func (c *Boltdb) fillDatabase(apps map[string]*App) { } func (c *Boltdb) fromPCFApp(app *cfclient.App) *App { - return &App{ - app.Name, - app.Guid, - app.SpaceData.Entity.Name, - app.SpaceData.Entity.Guid, - app.SpaceData.Entity.OrgData.Entity.Name, - app.SpaceData.Entity.OrgData.Entity.Guid, - c.isOptOut(app.Environment), + cachedApp := &App{ + Name: app.Name, + Guid: app.Guid, + SpaceGuid: app.SpaceGuid, + IgnoredApp: c.isOptOut(app.Environment), } + + c.fillOrgAndSpace(cachedApp) + + return cachedApp +} + +func (c *Boltdb) fillOrgAndSpace(app *App) error { + now := time.Now() + + c.lock.RLock() + space, ok := c.spaceNameCache[app.SpaceGuid] + c.lock.RUnlock() + + if !ok || now.Sub(space.LastUpdated) > c.config.OrgSpaceCacheTTL { + cfspace, err := c.appClient.GetSpaceByGuid(app.SpaceGuid) + if err != nil { + return err + } + + space = Space{ + Name: cfspace.Name, + OrgGUID: cfspace.OrganizationGuid, + LastUpdated: now, + } + + c.lock.Lock() + c.spaceNameCache[app.SpaceGuid] = space + c.lock.Unlock() + } + + app.SpaceName = space.Name + app.OrgGuid = space.OrgGUID + + c.lock.RLock() + org, ok := c.orgNameCache[space.OrgGUID] + c.lock.RUnlock() + if !ok || now.Sub(org.LastUpdated) > c.config.OrgSpaceCacheTTL { + cforg, err := c.appClient.GetOrgByGuid(space.OrgGUID) + if err != nil { + return err + } + + org = Org{ + Name: cforg.Name, + LastUpdated: now, + } + + c.lock.Lock() + c.orgNameCache[space.OrgGUID] = org + c.lock.Unlock() + } + + app.OrgGuid = space.OrgGUID + app.OrgName = org.Name + + return nil } func (c *Boltdb) getAppFromRemote(appGuid string) (*App, error) { @@ -326,7 +423,6 @@ func (c *Boltdb) getAppFromRemote(appGuid string) (*App, error) { if err != nil { 
return nil, err } - app := c.fromPCFApp(&cfApp) c.fillDatabase(map[string]*App{app.Guid: app}) diff --git a/cache/cache.go b/cache/cache.go index 3dccac1d..42b6adb5 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -7,19 +7,19 @@ import ( ) type App struct { - Name string - Guid string - SpaceName string - SpaceGuid string - OrgName string - OrgGuid string - IgnoredApp bool + Name string + Guid string + SpaceName string + SpaceGuid string + OrgName string + OrgGuid string + CfAppEnv map[string]interface{} + IgnoredApp bool } type Cache interface { Open() error Close() error - GetAllApps() (map[string]*App, error) GetApp(string) (*App, error) } @@ -28,4 +28,6 @@ type AppClient interface { AppByGuid(appGuid string) (cfclient.App, error) ListApps() ([]cfclient.App, error) ListAppsByQueryWithLimits(query url.Values, totalPages int) ([]cfclient.App, error) + GetSpaceByGuid(spaceGUID string) (cfclient.Space, error) + GetOrgByGuid(orgGUID string) (cfclient.Org, error) } diff --git a/cache/cache_easyjson.go b/cache/cache_easyjson.go index d30fcda7..5fa2fa54 100644 --- a/cache/cache_easyjson.go +++ b/cache/cache_easyjson.go @@ -42,6 +42,26 @@ func easyjsonA591d1bcDecodeGithubComCloudfoundryCommunitySplunkFirehoseNozzleCac out.OrgName = string(in.String()) case "OrgGuid": out.OrgGuid = string(in.String()) + case "CfAppEnv": + if in.IsNull() { + in.Skip() + } else { + in.Delim('{') + if !in.IsDelim('}') { + out.CfAppEnv = make(map[string]interface{}) + } else { + out.CfAppEnv = nil + } + for !in.IsDelim('}') { + key := string(in.String()) + in.WantColon() + var v1 interface{} + v1 = in.Interface() + (out.CfAppEnv)[key] = v1 + in.WantComma() + } + in.Delim('}') + } case "IgnoredApp": out.IgnoredApp = bool(in.Bool()) default: @@ -95,6 +115,27 @@ func easyjsonA591d1bcEncodeGithubComCloudfoundryCommunitySplunkFirehoseNozzleCac out.RawByte(',') } first = false + out.RawString("\"CfAppEnv\":") + if in.CfAppEnv == nil { + out.RawString(`null`) + } else { + out.RawByte('{') + v2First := true + for v2Name, v2Value := range in.CfAppEnv { + if !v2First { + out.RawByte(',') + } + v2First = false + out.String(string(v2Name)) + out.RawByte(':') + out.Raw(json.Marshal(v2Value)) + } + out.RawByte('}') + } + if !first { + out.RawByte(',') + } + first = false out.RawString("\"IgnoredApp\":") out.Bool(bool(in.IgnoredApp)) out.RawByte('}') diff --git a/cache/cache_test.go b/cache/cache_test.go index 06c41a2a..908ceb9f 100644 --- a/cache/cache_test.go +++ b/cache/cache_test.go @@ -20,6 +20,7 @@ var _ = Describe("Cache", func() { ignoreMissingApps = true appCacheTTL = 2 * time.Second missingAppCacheTTL = 2 * time.Second + orgSpaceCacheTTL = 2 * time.Second n = 10 nilApp *App = nil @@ -29,6 +30,7 @@ var _ = Describe("Cache", func() { IgnoreMissingApps: ignoreMissingApps, AppCacheTTL: appCacheTTL, MissingAppCacheTTL: missingAppCacheTTL, + OrgSpaceCacheTTL: orgSpaceCacheTTL, Logger: lager.NewLogger("test"), } @@ -53,7 +55,8 @@ var _ = Describe("Cache", func() { time.Sleep(1 * time.Second) gerr = os.Remove(boltdbPath) - Ω(gerr).ShouldNot(HaveOccurred()) + Ω(gerr == nil || os.IsNotExist(gerr)).Should(BeTrue()) + // Ω(gerr).ShouldNot(HaveOccurred()) }) Context("Get app good case", func() { @@ -102,19 +105,101 @@ var _ = Describe("Cache", func() { }) Context("Cache invalidation", func() { + BeforeEach(func() { + // close the cache created in the outer BeforeEach + cache.Close() + + config.AppCacheTTL = 0 + config.OrgSpaceCacheTTL = time.Second + cache, gerr = NewBoltdb(client, config) + Ω(gerr).ShouldNot(HaveOccurred()) + + 
gerr = cache.Open() + Ω(gerr).ShouldNot(HaveOccurred()) + }) It("Expect new app", func() { - id := fmt.Sprintf("id_%d", time.Now().UnixNano()) - client.CreateApp(id, id, id) + now := time.Now().UnixNano() + id := fmt.Sprintf("id_%d", now) + client.CreateApp(id, fmt.Sprintf("cf_space_id_%d", now)) + + client.ResetCallCounts() + + cache.ManuallyInvalidateCaches() + + app, err := cache.GetApp(id) + Ω(err).ShouldNot(HaveOccurred()) + Expect(app).NotTo(Equal(nilApp)) + Expect(app.Guid).To(Equal(id)) + + Expect(app.SpaceGuid).NotTo(BeEmpty()) + Expect(app.SpaceName).NotTo(BeEmpty()) + Expect(client.GetSpaceByGUIDCallCount()).To(Equal(11)) + + Expect(app.OrgGuid).NotTo(BeEmpty()) + Expect(app.OrgName).NotTo(BeEmpty()) + Expect(client.GetOrgByGUIDCallCount()).To(Equal(11)) + + apps, err := cache.GetAllApps() + Ω(err).ShouldNot(HaveOccurred()) + + Expect(apps).NotTo(Equal(nil)) + Expect(len(apps)).To(Equal(n + 1)) + }) + }) - // Sleep for AppCacheTTL interval to make sure the cache - // invalidation happens - time.Sleep(appCacheTTL + 1) + Context("App Cache Invalidation but not Org/Space Cache Invalidation", func() { + var ( + config *BoltdbConfig + cache *Boltdb + client *testing.AppClientMock + ) + + BeforeEach(func() { + boltdbPath := "/tmp/boltdb2" + config = &BoltdbConfig{ + Path: boltdbPath, + IgnoreMissingApps: ignoreMissingApps, + AppCacheTTL: appCacheTTL, + MissingAppCacheTTL: missingAppCacheTTL, + OrgSpaceCacheTTL: 48 * time.Hour, + Logger: lager.NewLogger("test"), + } + + client = testing.NewAppClientMock(n) + + os.Remove(boltdbPath) + cache, gerr = NewBoltdb(client, config) + Ω(gerr).ShouldNot(HaveOccurred()) + + gerr = cache.Open() + Ω(gerr).ShouldNot(HaveOccurred()) + }) + + It("Expects new app but no org space calls", func() { + now := time.Now().UnixNano() + id := fmt.Sprintf("id_%d", now) + client.CreateApp(id, fmt.Sprintf("cf_space_id_%d", now)) + + client.ResetCallCounts() + + // wait for the cache to invalidate and repopulate + time.Sleep(appCacheTTL + (250 * time.Millisecond)) app, err := cache.GetApp(id) Ω(err).ShouldNot(HaveOccurred()) Expect(app).NotTo(Equal(nilApp)) Expect(app.Guid).To(Equal(id)) + Expect(app.SpaceGuid).NotTo(BeEmpty()) + Expect(app.SpaceName).NotTo(BeEmpty()) + + Expect(app.OrgGuid).NotTo(BeEmpty()) + Expect(app.OrgName).NotTo(BeEmpty()) + + // this will be 1 because `invalidateCache` will have been called between ResetCallCounts and now but the org and space cache has not reached its TTL + Expect(client.GetSpaceByGUIDCallCount()).To(Equal(1)) + Expect(client.GetOrgByGUIDCallCount()).To(Equal(1)) + apps, err := cache.GetAllApps() Ω(err).ShouldNot(HaveOccurred()) diff --git a/eventrouter/default.go b/eventrouter/default.go index 6ac49b55..c00983ec 100644 --- a/eventrouter/default.go +++ b/eventrouter/default.go @@ -11,17 +11,25 @@ import ( type Config struct { SelectedEvents string + SelectedDeployments string } type router struct { appCache cache.Cache sink eventsink.Sink selectedEvents map[string]bool + selectedDeployments map[string]bool } func New(appCache cache.Cache, sink eventsink.Sink, config *Config) (Router, error) { selectedEvents, err := fevents.ParseSelectedEvents(config.SelectedEvents) + + if err != nil { + return nil, err + } + selectedDeployments,err := fevents.ParseSelectedDeployments(config.SelectedDeployments) + if err != nil { return nil, err } @@ -30,11 +38,18 @@ func New(appCache cache.Cache, sink eventsink.Sink, config *Config) (Router, err appCache: appCache, sink: sink, selectedEvents: selectedEvents, + selectedDeployments: 
selectedDeployments, }, nil } func (r *router) Route(msg *events.Envelope) error { eventType := msg.GetEventType() + deployment := msg.GetDeployment() + + //Filter the deployments (cf,redis,rabbitmq,etc) (bosh deployments) + if ! (r.selectedDeployments["all"]) && ! (r.selectedDeployments[deployment]) { + return nil + } if _, ok := r.selectedEvents[eventType.String()]; !ok { // Ignore this event since we are not interested diff --git a/events/events.go b/events/events.go index 780ec4f8..a9e6ae4e 100644 --- a/events/events.go +++ b/events/events.go @@ -20,7 +20,6 @@ type Event struct { func HttpStart(msg *events.Envelope) *Event { httpStart := msg.GetHttpStart() - fields := logrus.Fields{ "timestamp": httpStart.GetTimestamp(), "request_id": utils.FormatUUID(httpStart.GetRequestId()), @@ -150,6 +149,7 @@ func ErrorEvent(msg *events.Envelope) *Event { func ContainerMetric(msg *events.Envelope) *Event { containerMetric := msg.GetContainerMetric() + fields := logrus.Fields{ "cf_app_id": containerMetric.GetApplicationId(), "cpu_percentage": containerMetric.GetCpuPercentage(), @@ -172,16 +172,19 @@ func (e *Event) AnnotateWithAppData(appCache cache.Cache) { if cf_app_id != nil && appGuid != "" && cf_app_id != "" { appInfo, err := appCache.GetApp(appGuid) - if err != nil || appInfo == nil { + if err != nil { + logrus.Error("Failed to fetch application metadata: ", err) + return + } else if appInfo == nil { return } - cf_app_name := appInfo.Name cf_space_id := appInfo.SpaceGuid cf_space_name := appInfo.SpaceName cf_org_id := appInfo.OrgGuid cf_org_name := appInfo.OrgName cf_ignored_app := appInfo.IgnoredApp + app_env := appInfo.CfAppEnv if cf_app_name != "" { e.Fields["cf_app_name"] = cf_app_name @@ -203,6 +206,7 @@ func (e *Event) AnnotateWithAppData(appCache cache.Cache) { e.Fields["cf_org_name"] = cf_org_name } + e.Fields["info_splunk_index"] = app_env["SPLUNK_INDEX"] e.Fields["cf_ignored_app"] = cf_ignored_app } } @@ -259,6 +263,26 @@ func ParseSelectedEvents(wantedEvents string) (map[string]bool, error) { return selectedEvents, nil } +func ParseSelectedDeployments(wantedDeployments string) (map[string]bool, error) { + wantedDeployments = strings.TrimSpace(wantedDeployments) + selectedDeployments := make(map[string]bool) + + if wantedDeployments == "" || strings.ToLower(wantedDeployments) == "all" { + selectedDeployments["all"] = true + return selectedDeployments, nil + } + + var deployments []string + if err := json.Unmarshal([]byte(wantedDeployments), &deployments); err != nil { + deployments = strings.Split(wantedDeployments, ",") + } + + for _, deployment := range deployments { + deployment = strings.TrimSpace(deployment) + selectedDeployments[deployment] = true + } + return selectedDeployments, nil +} func getKeyValueFromString(kvPair string) (string, string, error) { values := strings.Split(kvPair, ":") if len(values) != 2 { diff --git a/events/events_test.go b/events/events_test.go index 76f79984..26bfdaba 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -178,6 +178,15 @@ var _ = Describe("Events", func() { }) }) + Context("ParseSelectedEvents, empty select deployments passed in", func() { + It("should return a hash of only the default deployment(all)", func() { + results, err := fevents.ParseSelectedDeployments("") + Ω(err).ShouldNot(HaveOccurred()) + expected := map[string]bool{"all": true} + Expect(results).To(Equal(expected)) + }) + }) + Context("ParseSelectedEvents, bogus event names", func() { It("should err out", func() { _, err := 
fevents.ParseSelectedEvents("bogus, invalid") diff --git a/eventsink/splunk.go b/eventsink/splunk.go index ad7ed9a4..c1b0a8f5 100644 --- a/eventsink/splunk.go +++ b/eventsink/splunk.go @@ -2,15 +2,17 @@ package eventsink import ( "fmt" + "math" "strconv" "strings" "sync" "time" + "sync/atomic" + "code.cloudfoundry.org/lager" "github.com/cloudfoundry-community/splunk-firehose-nozzle/eventwriter" "github.com/cloudfoundry-community/splunk-firehose-nozzle/utils" - "sync/atomic" ) const SPLUNK_HEC_FIELDS_SUPPORT_VERSION = "6.4" @@ -26,8 +28,7 @@ type SplunkConfig struct { ExtraFields map[string]string TraceLogging bool UUID string - - Logger lager.Logger + Logger lager.Logger } type Splunk struct { @@ -73,6 +74,7 @@ func (s *Splunk) Write(fields map[string]interface{}, msg string) error { if len(msg) > 0 { fields["msg"] = msg } + s.events <- fields return nil } @@ -124,7 +126,7 @@ func (s *Splunk) indexEvents(writer eventwriter.Writer, batch []map[string]inter return nil } s.config.Logger.Error("Unable to talk to Splunk", err) - time.Sleep(5 * time.Second) + time.Sleep(getRetryInterval(i)) } s.config.Logger.Error("Finish retrying and dropping events", err, lager.Data{"events": len(batch)}) return nil @@ -210,3 +212,9 @@ func (s *Splunk) Log(message lager.LogFormat) { events := []map[string]interface{}{event} s.writers[len(s.writers)-1].Write(events) } + +func getRetryInterval(attempt int) time.Duration { + // algorithm taken from https://en.wikipedia.org/wiki/Exponential_backoff + timeInSec := 5 + (0.5 * (math.Exp2(float64(attempt)) - 1.0)) + return time.Millisecond * time.Duration(1000*timeInSec) +} diff --git a/eventsink/splunk_test.go b/eventsink/splunk_test.go index 3d35227a..918b003c 100644 --- a/eventsink/splunk_test.go +++ b/eventsink/splunk_test.go @@ -118,7 +118,6 @@ var _ = Describe("Splunk", func() { var envelopeHttpStartStop *events.HttpStartStop var startTimestamp, stopTimestamp int64 var requestId events.UUID - var requestIdHex, applicationIdHex string var peerType events.PeerType var method events.Method var uri, remoteAddress, userAgent string @@ -149,7 +148,6 @@ var _ = Describe("Splunk", func() { Low: &requestIdLow, High: &requestIdHigh, } - requestIdHex = "b12a3f87-83ab-4cf2-554b-042dc36e28f1" applicationIdLow := uint64(10539615360601842564) applicationIdHigh := uint64(3160954123591206558) @@ -157,7 +155,6 @@ var _ = Describe("Splunk", func() { Low: &applicationIdLow, High: &applicationIdHigh, } - applicationIdHex = "8463ec45-543c-4492-9ec6-f52707f7dd2b" envelopeHttpStartStop = &events.HttpStartStop{ StartTimestamp: &startTimestamp, @@ -646,4 +643,4 @@ var _ = Describe("Splunk", func() { Ω(err).ShouldNot(HaveOccurred()) }) -}) +}) \ No newline at end of file diff --git a/eventwriter/splunk.go b/eventwriter/splunk.go index 2742c98c..f6aff77e 100644 --- a/eventwriter/splunk.go +++ b/eventwriter/splunk.go @@ -44,7 +44,11 @@ func NewSplunk(config *SplunkConfig) Writer { func (s *splunkClient) Write(events []map[string]interface{}) error { bodyBuffer := new(bytes.Buffer) for i, event := range events { - if s.config.Index != "" { + + + if event["event"].(map[string]interface{})["info_splunk_index"] != nil { + event["index"] = event["event"].(map[string]interface{})["info_splunk_index"] + } else if s.config.Index != "" { event["index"] = s.config.Index } diff --git a/glide.lock b/glide.lock index bb971986..13fabeab 100644 --- a/glide.lock +++ b/glide.lock @@ -55,8 +55,6 @@ imports: - jwriter - name: github.com/pkg/errors version: 645ef00459ed84a119197bfb8d8205042c6df63d -- name: 
github.com/Sirupsen/logrus - version: 4b6ea7319e214d98c938f12692336f7ca9348d6b - name: golang.org/x/net version: 7dbad50ab5b31073856416cdcfeb2796d682f844 subpackages: diff --git a/main.go b/main.go index 215d982c..556ea934 100644 --- a/main.go +++ b/main.go @@ -27,6 +27,9 @@ func main() { signal.Notify(shutdownChan, syscall.SIGINT, syscall.SIGTERM) config := splunknozzle.NewConfigFromCmdFlags(version, branch, commit, buildos) + if config.AppCacheTTL == 0 && config.OrgSpaceCacheTTL > 0 { + logger.Info("Apps are not being cached. When apps are not cached, the org and space caching TTL is ineffective") + } splunkNozzle := splunknozzle.NewSplunkFirehoseNozzle(config) err := splunkNozzle.Run(shutdownChan, logger) diff --git a/splunknozzle/config.go b/splunknozzle/config.go index 46dc5b48..db5be041 100644 --- a/splunknozzle/config.go +++ b/splunknozzle/config.go @@ -32,11 +32,13 @@ type Config struct { IgnoreMissingApps bool `json:"ignore-missing-apps"` MissingAppCacheTTL time.Duration `json:"missing-app-cache-ttl"` AppCacheTTL time.Duration `json:"app-cache-ttl"` + OrgSpaceCacheTTL time.Duration `json:"org-space-cache-ttl"` AppLimits int `json:"app-limits"` - BoltDBPath string `json:"boltdb-path"` - WantedEvents string `json:"wanted-events"` - ExtraFields string `json:"extra-fields"` + BoltDBPath string `json:"boltdb-path"` + WantedEvents string `json:"wanted-events"` + WantedDeployments string `json:"wanted-deployments"` + ExtraFields string `json:"extra-fields"` FlushInterval time.Duration `json:"flush-interval"` QueueSize int `json:"queue-size"` @@ -101,6 +103,8 @@ func NewConfigFromCmdFlags(version, branch, commit, buildos string) *Config { OverrideDefaultFromEnvar("MISSING_APP_CACHE_INVALIDATE_TTL").Default("0s").DurationVar(&c.MissingAppCacheTTL) kingpin.Flag("app-cache-invalidate-ttl", "How frequently the app info local cache invalidates"). OverrideDefaultFromEnvar("APP_CACHE_INVALIDATE_TTL").Default("0s").DurationVar(&c.AppCacheTTL) + kingpin.Flag("org-space-cache-invalidate-ttl", "How frequently the org and space cache invalidates"). + OverrideDefaultFromEnvar("ORG_SPACE_CACHE_INVALIDATE_TTL").Default("72h").DurationVar(&c.OrgSpaceCacheTTL) kingpin.Flag("app-limits", "Restrict to APP_LIMITS most updated apps per request when populating the app metadata cache"). OverrideDefaultFromEnvar("APP_LIMITS").Default("0").IntVar(&c.AppLimits) @@ -108,6 +112,8 @@ func NewConfigFromCmdFlags(version, branch, commit, buildos string) *Config { Default("cache.db").OverrideDefaultFromEnvar("BOLTDB_PATH").StringVar(&c.BoltDBPath) kingpin.Flag("events", fmt.Sprintf("Comma separated list of events you would like. Valid options are %s", events.AuthorizedEvents())). OverrideDefaultFromEnvar("EVENTS").Default("ValueMetric,CounterEvent,ContainerMetric").StringVar(&c.WantedEvents) + kingpin.Flag("deployments", fmt.Sprintf("Comma separated list of deployments you would like (bosh deployments")). + OverrideDefaultFromEnvar("DEPLOYMENTS").Default("all").StringVar(&c.WantedDeployments) kingpin.Flag("extra-fields", "Extra fields you want to annotate your events with, example: '--extra-fields=env:dev,something:other "). 
OverrideDefaultFromEnvar("EXTRA_FIELDS").Default("").StringVar(&c.ExtraFields) diff --git a/splunknozzle/config_test.go b/splunknozzle/config_test.go index ea299ece..394bf53c 100644 --- a/splunknozzle/config_test.go +++ b/splunknozzle/config_test.go @@ -53,6 +53,7 @@ var _ = Describe("Config", func() { os.Setenv("BOLTDB_PATH", "foo.db") os.Setenv("EVENTS", "LogMessage") + os.Setenv("DEPLOYMENTS", "cf") os.Setenv("EXTRA_FIELDS", "foo:bar") os.Setenv("FLUSH_INTERVAL", "43s") @@ -92,6 +93,7 @@ var _ = Describe("Config", func() { Expect(c.BoltDBPath).To(Equal("foo.db")) Expect(c.WantedEvents).To(Equal("LogMessage")) + Expect(c.WantedDeployments).To(Equal("cf")) Expect(c.ExtraFields).To(Equal("foo:bar")) Expect(c.FlushInterval).To(Equal(43 * time.Second)) @@ -130,6 +132,7 @@ var _ = Describe("Config", func() { Expect(c.BoltDBPath).To(Equal("cache.db")) Expect(c.WantedEvents).To(Equal("ValueMetric,CounterEvent,ContainerMetric")) + Expect(c.WantedDeployments).To(Equal("all")) Expect(c.ExtraFields).To(Equal("")) Expect(c.FlushInterval).To(Equal(5 * time.Second)) @@ -177,6 +180,7 @@ var _ = Describe("Config", func() { "--app-limits=35", "--boltdb-path=foo.dbc", "--events=LogMessagec", + "--deployments=cfc", "--extra-fields=foo:barc", "--flush-interval=34s", "--consumer-queue-size=2323", @@ -218,6 +222,7 @@ var _ = Describe("Config", func() { Expect(c.BoltDBPath).To(Equal("foo.dbc")) Expect(c.WantedEvents).To(Equal("LogMessagec")) + Expect(c.WantedDeployments).To(Equal("cfc")) Expect(c.ExtraFields).To(Equal("foo:barc")) Expect(c.FlushInterval).To(Equal(34 * time.Second)) diff --git a/splunknozzle/nozzle.go b/splunknozzle/nozzle.go index 8963327e..3a4a4bcc 100644 --- a/splunknozzle/nozzle.go +++ b/splunknozzle/nozzle.go @@ -21,6 +21,7 @@ type SplunkFirehoseNozzle struct { config *Config } +//create new function of type *SplunkFirehoseNozzle func NewSplunkFirehoseNozzle(config *Config) *SplunkFirehoseNozzle { return &SplunkFirehoseNozzle{ config: config, @@ -31,6 +32,7 @@ func NewSplunkFirehoseNozzle(config *Config) *SplunkFirehoseNozzle { func (s *SplunkFirehoseNozzle) EventRouter(cache cache.Cache, eventSink eventsink.Sink) (eventrouter.Router, error) { config := &eventrouter.Config{ SelectedEvents: s.config.WantedEvents, + SelectedDeployments: s.config.WantedDeployments, } return eventrouter.New(cache, eventSink, config) } @@ -55,6 +57,7 @@ func (s *SplunkFirehoseNozzle) AppCache(client cache.AppClient, logger lager.Log IgnoreMissingApps: s.config.IgnoreMissingApps, MissingAppCacheTTL: s.config.MissingAppCacheTTL, AppCacheTTL: s.config.AppCacheTTL, + OrgSpaceCacheTTL: s.config.OrgSpaceCacheTTL, Logger: logger, } return cache.NewBoltdb(client, &c) @@ -78,6 +81,7 @@ func (s *SplunkFirehoseNozzle) EventSink(logger lager.Logger) (eventsink.Sink, e Logger: logger, } + var writers []eventwriter.Writer for i := 0; i < s.config.HecWorkers+1; i++ { splunkWriter := eventwriter.NewSplunk(writerConfig) diff --git a/splunknozzle/nozzle_test.go b/splunknozzle/nozzle_test.go index 79a53aff..d99c8ad2 100644 --- a/splunknozzle/nozzle_test.go +++ b/splunknozzle/nozzle_test.go @@ -41,6 +41,7 @@ func newConfig() *Config { BoltDBPath: "/tmp/boltdb.db", WantedEvents: "LogMessage", + WantedDeployments: "all", ExtraFields: "tag:value", FlushInterval: time.Second * 5, diff --git a/testing/app_client_mock.go b/testing/app_client_mock.go index 654e3390..6c008949 100644 --- a/testing/app_client_mock.go +++ b/testing/app_client_mock.go @@ -10,9 +10,13 @@ import ( ) type AppClientMock struct { - lock sync.RWMutex - apps 
map[string]cfclient.App - n int + lock sync.RWMutex + apps map[string]cfclient.App + n int + listAppsCallCount int + appByGUIDCallCount int + getOrgByGUIDCallCount int + getSpaceByGUIDCallCount int } func NewAppClientMock(n int) *AppClientMock { @@ -27,6 +31,8 @@ func (m *AppClientMock) AppByGuid(guid string) (cfclient.App, error) { m.lock.RLock() defer m.lock.RUnlock() + m.appByGUIDCallCount++ + app, ok := m.apps[guid] if ok { return app, nil @@ -38,6 +44,8 @@ func (m *AppClientMock) ListApps() ([]cfclient.App, error) { m.lock.RLock() defer m.lock.RUnlock() + m.listAppsCallCount++ + var apps []cfclient.App for k := range m.apps { apps = append(apps, m.apps[k]) @@ -49,25 +57,45 @@ func (m *AppClientMock) ListAppsByQueryWithLimits(query url.Values, totalPages i return m.ListApps() } -func (m *AppClientMock) CreateApp(appID, spaceID, orgID string) { +func (m *AppClientMock) GetSpaceByGuid(spaceGUID string) (cfclient.Space, error) { + m.lock.Lock() + defer m.lock.Unlock() + + m.getSpaceByGUIDCallCount++ + + var id int + fmt.Sscanf(spaceGUID, "cf_space_id_%d", &id) + + return cfclient.Space{ + Guid: spaceGUID, + Name: fmt.Sprintf("cf_space_name_%d", id), + OrganizationGuid: fmt.Sprintf("cf_org_id_%d", id), + }, nil +} + +func (m *AppClientMock) GetOrgByGuid(orgGUID string) (cfclient.Org, error) { + m.lock.Lock() + defer m.lock.Unlock() + + m.getOrgByGUIDCallCount++ + + var id int + fmt.Sscanf(orgGUID, "cf_org_id_%d", &id) + + return cfclient.Org{ + Guid: orgGUID, + Name: fmt.Sprintf("cf_org_name_%d", id), + }, nil +} + +func (m *AppClientMock) CreateApp(appID, spaceID string) { m.lock.Lock() defer m.lock.Unlock() app := cfclient.App{ - Guid: appID, - Name: appID, - SpaceData: cfclient.SpaceResource{ - Entity: cfclient.Space{ - Guid: spaceID, - Name: spaceID, - OrgData: cfclient.OrgResource{ - Entity: cfclient.Org{ - Guid: orgID, - Name: orgID, - }, - }, - }, - }, + Guid: appID, + Name: appID, + SpaceGuid: spaceID, } m.apps[appID] = app @@ -77,22 +105,45 @@ func getApps(n int) map[string]cfclient.App { apps := make(map[string]cfclient.App, n) for i := 0; i < n; i++ { app := cfclient.App{ - Guid: fmt.Sprintf("cf_app_id_%d", i), - Name: fmt.Sprintf("cf_app_name_%d", i), - SpaceData: cfclient.SpaceResource{ - Entity: cfclient.Space{ - Guid: fmt.Sprintf("cf_space_id_%d", i%50), - Name: fmt.Sprintf("cf_space_name_%d", i%50), - OrgData: cfclient.OrgResource{ - Entity: cfclient.Org{ - Guid: fmt.Sprintf("cf_org_id_%d", i%100), - Name: fmt.Sprintf("cf_org_name_%d", i%100), - }, - }, - }, - }, + Guid: fmt.Sprintf("cf_app_id_%d", i), + Name: fmt.Sprintf("cf_app_name_%d", i), + SpaceGuid: fmt.Sprintf("cf_space_id_%d", i%50), } apps[app.Guid] = app } return apps } + +func (m *AppClientMock) ListAppsCallCount() int { + m.lock.Lock() + defer m.lock.Unlock() + return m.listAppsCallCount +} + +func (m *AppClientMock) AppByGUIDCallCount() int { + m.lock.Lock() + defer m.lock.Unlock() + return m.appByGUIDCallCount +} + +func (m *AppClientMock) GetOrgByGUIDCallCount() int { + m.lock.Lock() + defer m.lock.Unlock() + return m.getOrgByGUIDCallCount +} + +func (m *AppClientMock) GetSpaceByGUIDCallCount() int { + m.lock.Lock() + defer m.lock.Unlock() + return m.getSpaceByGUIDCallCount +} + +func (m *AppClientMock) ResetCallCounts() { + m.lock.Lock() + defer m.lock.Unlock() + + m.listAppsCallCount = 0 + m.appByGUIDCallCount = 0 + m.getOrgByGUIDCallCount = 0 + m.getSpaceByGUIDCallCount = 0 +} diff --git a/tile/tile-history.yml b/tile/tile-history.yml index bd28d337..d8e296ba 100644 --- a/tile/tile-history.yml +++ 
b/tile/tile-history.yml @@ -2,4 +2,5 @@ history: - 0.2.1 - 1.0.0 -version: 1.0.1 +- 1.0.1 +version: 1.0.2 diff --git a/tile/tile.yml b/tile/tile.yml index 8bd7f240..74effd40 100644 --- a/tile/tile.yml +++ b/tile/tile.yml @@ -8,9 +8,9 @@ apply_open_security_group: true # Apply open security group, default: fals allow_paid_service_plans: true # Allow paid service plans, default: false stemcell_criteria: - os: ubuntu-trusty + os: ubuntu-xenial requires_cpi: false - version: '3421' + version: '97' properties: - name: author @@ -109,16 +109,6 @@ forms: label: Additional Fields description: A set of user defined key:value pairs that are added to all Splunk events that do not occur in the event payload. Expected format - key1:value1, key2:value2, key3:value3 optional: true - - name: add_app_info - type: boolean - default: false - label: Add App Information - description: Enriches raw data with application metadata, such as application name, space name, org name, etc. - - name: enable_event_tracing - type: boolean - label: Enable Event Tracing - default: false - description: Enables data loss tracing. - name: hec_retries type: integer label: HEC Retries @@ -159,6 +149,21 @@ forms: label: App Limits default: 0 description: The number of apps for which metadata is gathered when refreshing the app metadata cache (order based on app creation date). Set to 0 to remove limit. + - name: nozzle_memory + type: string + label: Nozzle Memory + description: Nozzle memory in MB. + default: 256M + - name: add_app_info + type: boolean + default: false + label: Add App Information + description: Enriches raw data with application metadata, such as application name, space name, org name, etc. + - name: enable_event_tracing + type: boolean + label: Enable Event Tracing + default: false + description: Enables data loss tracing. - name: ignore_missing_app type: boolean label: Ignore Missing App @@ -170,7 +175,7 @@ packages: type: app label: Splunk-Firehose-Nozzle manifest: - memory: 256M + memory: (( .properties.nozzle_memory.value )) instances: (( .properties.scale_out_nozzle.value )) buildpack: binary_buildpack health-check-type: process @@ -179,13 +184,3 @@ packages: command: ./splunk-firehose-nozzle env: GOPACKAGENAME: main - - - - - - - - - -
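
The org/space lookups added in cache/boltdb.go follow a TTL-bounded guid-to-name cache pattern: check the in-memory map under a read lock, and only call the CF API (GetSpaceByGuid / GetOrgByGuid) when the entry is missing or older than OrgSpaceCacheTTL. Below is a minimal, self-contained sketch of that pattern; the `nameCache` type and the `fetch` callback are illustrative stand-ins, not code from this patch.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cachedName pairs a looked-up name with the time it was fetched,
// mirroring the LastUpdated field on the Org/Space cache entries.
type cachedName struct {
	Name        string
	LastUpdated time.Time
}

// nameCache is a TTL-bounded guid->name map guarded by a RWMutex,
// the same shape as orgNameCache and spaceNameCache in cache/boltdb.go.
type nameCache struct {
	ttl     time.Duration
	lock    sync.RWMutex
	entries map[string]cachedName
}

// get returns the cached name, or calls fetch and stores the result
// when the entry is missing or older than the TTL.
func (c *nameCache) get(guid string, fetch func(string) (string, error)) (string, error) {
	now := time.Now()

	c.lock.RLock()
	entry, ok := c.entries[guid]
	c.lock.RUnlock()

	if ok && now.Sub(entry.LastUpdated) <= c.ttl {
		return entry.Name, nil // cache hit, no remote call
	}

	name, err := fetch(guid) // cache miss or stale entry: hit the remote API
	if err != nil {
		return "", err
	}

	c.lock.Lock()
	c.entries[guid] = cachedName{Name: name, LastUpdated: now}
	c.lock.Unlock()
	return name, nil
}

func main() {
	calls := 0
	fetch := func(guid string) (string, error) { // stand-in for GetSpaceByGuid / GetOrgByGuid
		calls++
		return "name-for-" + guid, nil
	}

	cache := &nameCache{ttl: time.Hour, entries: make(map[string]cachedName)}
	for i := 0; i < 3; i++ {
		name, _ := cache.get("cf_space_id_1", fetch)
		fmt.Println(name)
	}
	fmt.Println("remote calls:", calls) // 1: the later lookups were served from cache
}
```

This is why the new test asserts exactly one GetSpaceByGUID/GetOrgByGUID call when OrgSpaceCacheTTL is long: app cache refreshes no longer re-fetch org and space names until their own TTL expires.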
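
The new DEPLOYMENTS option is applied in two steps: events.ParseSelectedDeployments turns the flag value into a set (empty or "all" selects everything; otherwise a JSON array or comma-separated list), and the router drops any envelope whose Deployment is not in that set before event-type filtering. The following standalone condensation of those two pieces shows the intended behaviour; function names here are local to the example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// parseSelectedDeployments follows the same rules as
// events.ParseSelectedDeployments: empty or "all" selects everything,
// otherwise the value is a JSON array or a comma-separated list.
func parseSelectedDeployments(wanted string) map[string]bool {
	wanted = strings.TrimSpace(wanted)
	selected := make(map[string]bool)

	if wanted == "" || strings.ToLower(wanted) == "all" {
		selected["all"] = true
		return selected
	}

	var deployments []string
	if err := json.Unmarshal([]byte(wanted), &deployments); err != nil {
		deployments = strings.Split(wanted, ",")
	}
	for _, d := range deployments {
		selected[strings.TrimSpace(d)] = true
	}
	return selected
}

// wantDeployment is the gate the router applies before looking at event types.
func wantDeployment(selected map[string]bool, deployment string) bool {
	return selected["all"] || selected[deployment]
}

func main() {
	selected := parseSelectedDeployments("cf, redis")
	for _, d := range []string{"cf", "redis", "rabbitmq"} {
		fmt.Printf("%-9s forwarded: %v\n", d, wantDeployment(selected, d))
	}
	// cf and redis are forwarded; rabbitmq envelopes are dropped.
}
```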
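
The fixed 5-second sleep between HEC retries is replaced by an exponential backoff. The function below mirrors the new getRetryInterval in eventsink/splunk.go and prints the resulting schedule; with HEC_RETRIES set to 5 (as in the CI manifest) the waits are 5s, 5.5s, 6.5s, 8.5s, 12.5s.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// getRetryInterval computes 5 + 0.5*(2^attempt - 1) seconds,
// the backoff used between HEC retries in eventsink/splunk.go.
func getRetryInterval(attempt int) time.Duration {
	timeInSec := 5 + (0.5 * (math.Exp2(float64(attempt)) - 1.0))
	return time.Millisecond * time.Duration(1000*timeInSec)
}

func main() {
	for attempt := 0; attempt < 5; attempt++ {
		fmt.Printf("attempt %d -> wait %v\n", attempt, getRetryInterval(attempt))
	}
}
```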
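
Finally, eventwriter/splunk.go now lets an app-supplied index override the nozzle-wide SPLUNK_INDEX: when the cached app environment carries SPLUNK_INDEX, it is surfaced on the event as info_splunk_index and used as the HEC index for that event. The sketch below restates that precedence with checked type assertions (the patch itself asserts the payload type directly); `resolveIndex` is an illustrative helper, not part of the nozzle code.

```go
package main

import "fmt"

// resolveIndex shows the precedence added in eventwriter/splunk.go:
// an event-level info_splunk_index wins over the configured index.
func resolveIndex(event map[string]interface{}, configuredIndex string) string {
	if payload, ok := event["event"].(map[string]interface{}); ok {
		if idx, ok := payload["info_splunk_index"].(string); ok && idx != "" {
			return idx
		}
	}
	return configuredIndex
}

func main() {
	withOverride := map[string]interface{}{
		"event": map[string]interface{}{"info_splunk_index": "app_team_index"},
	}
	withoutOverride := map[string]interface{}{
		"event": map[string]interface{}{},
	}

	fmt.Println(resolveIndex(withOverride, "main"))    // app_team_index
	fmt.Println(resolveIndex(withoutOverride, "main")) // main
}
```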