Skip to content
This repository was archived by the owner on Mar 15, 2022. It is now read-only.

Keep last resource version position #101

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ _testmain.go
*.prof

eventrouter

.idea
31 changes: 24 additions & 7 deletions eventrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/golang/glog"
"github.com/heptiolabs/eventrouter/sinks"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cast"
"github.com/spf13/viper"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -90,10 +91,14 @@ type EventRouter struct {
// event sink
// TODO: Determine if we want to support multiple sinks.
eSink sinks.EventSinkInterface

lastSeenResourceVersion string
lastResourceVersionPosition func(string)
}

// NewEventRouter will create a new event router using the input params
func NewEventRouter(kubeClient kubernetes.Interface, eventsInformer coreinformers.EventInformer) *EventRouter {
func NewEventRouter(kubeClient kubernetes.Interface, eventsInformer coreinformers.EventInformer,
lastSeenResourceVersion string, lastResourceVersionPosition func(rv string)) *EventRouter {
if viper.GetBool("enable-prometheus") {
prometheus.MustRegister(kubernetesWarningEventCounterVec)
prometheus.MustRegister(kubernetesNormalEventCounterVec)
Expand All @@ -102,8 +107,10 @@ func NewEventRouter(kubeClient kubernetes.Interface, eventsInformer coreinformer
}

er := &EventRouter{
kubeClient: kubeClient,
eSink: sinks.ManufactureSink(),
kubeClient: kubeClient,
eSink: sinks.ManufactureSink(),
lastSeenResourceVersion: lastSeenResourceVersion,
lastResourceVersionPosition: lastResourceVersionPosition,
}
eventsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: er.addEvent,
Expand Down Expand Up @@ -133,16 +140,26 @@ func (er *EventRouter) Run(stopCh <-chan struct{}) {
// addEvent is called when an event is created, or during the initial list
func (er *EventRouter) addEvent(obj interface{}) {
e := obj.(*v1.Event)
prometheusEvent(e)
er.eSink.UpdateEvents(e, nil)
if cast.ToInt(er.lastSeenResourceVersion) < cast.ToInt(e.ResourceVersion) {
prometheusEvent(e)
er.eSink.UpdateEvents(e, nil)
er.lastResourceVersionPosition(e.ResourceVersion)
} else {
glog.V(5).Infof("Event had already been processed:\n%v", e)
}
}

// updateEvent is called any time there is an update to an existing event
func (er *EventRouter) updateEvent(objOld interface{}, objNew interface{}) {
eOld := objOld.(*v1.Event)
eNew := objNew.(*v1.Event)
prometheusEvent(eNew)
er.eSink.UpdateEvents(eNew, eOld)
if cast.ToInt(er.lastSeenResourceVersion) < cast.ToInt(eNew.ResourceVersion) {
prometheusEvent(eNew)
er.eSink.UpdateEvents(eNew, eOld)
er.lastResourceVersionPosition(eNew.ResourceVersion)
} else {
glog.V(5).Infof("Event had already been processed:\n%v", eNew)
}
}

// prometheusEvent is called when an event is added or updated
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ require (
github.com/prometheus/client_golang v1.1.0
github.com/rockset/rockset-go-client v0.6.0
github.com/sethgrid/pester v0.0.0-20190127155807-68a33a018ad0
github.com/spf13/cast v1.3.0
github.com/spf13/viper v1.4.0
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
k8s.io/api v0.0.0-20190814101207-0772a1bdf941
k8s.io/apimachinery v0.0.0-20190814100815-533d101be9a6
k8s.io/client-go v12.0.0+incompatible
k8s.io/klog v0.4.0
k8s.io/utils v0.0.0-20190809000727-6c36bc71fc4a // indirect
)
36 changes: 34 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"flag"
"io/ioutil"
"net/http"
"os"
"os/signal"
Expand All @@ -27,6 +28,7 @@ import (

"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cast"
"github.com/spf13/viper"

"k8s.io/client-go/informers"
Expand All @@ -49,7 +51,7 @@ func sigHandler() <-chan struct{} {
syscall.SIGSEGV, // FullDerp
syscall.SIGABRT, // Abnormal termination
syscall.SIGILL, // illegal instruction
syscall.SIGFPE) // floating point - this is why we can't have nice things
syscall.SIGFPE) // floating point - this is why we can't have nice things
sig := <-c
glog.Warningf("Signal (%v) Detected, Shutting Down", sig)
close(stop)
Expand Down Expand Up @@ -107,11 +109,41 @@ func main() {
var wg sync.WaitGroup

clientset := loadConfig()

var lastResourceVersionPosition string
var mostRecentResourceVersion *string

resourceVersionPositionPath := viper.GetString("lastResourceVersionPositionPath")
resourceVersionPositionFunc := func(resourceVersion string) {
if resourceVersionPositionPath != "" {
if cast.ToInt(resourceVersion) > cast.ToInt(mostRecentResourceVersion) {
err := ioutil.WriteFile(resourceVersionPositionPath, []byte(resourceVersion), 0644)
if err != nil {
glog.Errorf("failed to write lastResourceVersionPosition")
} else {
mostRecentResourceVersion = &resourceVersion
}
}
}
}

if resourceVersionPositionPath != "" {
_, err := os.Stat(resourceVersionPositionPath)
if !os.IsNotExist(err) {
resourceVersionBytes, err := ioutil.ReadFile(resourceVersionPositionPath)
if err != nil {
glog.Errorf("failed to read resource version bookmark from %s", resourceVersionPositionPath)
} else {
lastResourceVersionPosition = string(resourceVersionBytes)
}
}
}

sharedInformers := informers.NewSharedInformerFactory(clientset, viper.GetDuration("resync-interval"))
eventsInformer := sharedInformers.Core().V1().Events()

// TODO: Support locking for HA https://github.com/kubernetes/kubernetes/pull/42666
eventRouter := NewEventRouter(clientset, eventsInformer)
eventRouter := NewEventRouter(clientset, eventsInformer, lastResourceVersionPosition, resourceVersionPositionFunc)
stop := sigHandler()

// Startup the http listener for Prometheus Metrics endpoint.
Expand Down