Skip to content
This repository was archived by the owner on Mar 15, 2022. It is now read-only.

Commit 93ba501

Browse files
committed
Configuration flag to register last resource version position and start from there next time to avoid duplicating events.
1 parent eec9229 commit 93ba501

File tree

4 files changed

+62
-9
lines changed

4 files changed

+62
-9
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,5 @@ _testmain.go
2424
*.prof
2525

2626
eventrouter
27+
28+
.idea

eventrouter.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/golang/glog"
2323
"github.com/heptiolabs/eventrouter/sinks"
2424
"github.com/prometheus/client_golang/prometheus"
25+
"github.com/spf13/cast"
2526
"github.com/spf13/viper"
2627

2728
v1 "k8s.io/api/core/v1"
@@ -90,10 +91,14 @@ type EventRouter struct {
9091
// event sink
9192
// TODO: Determine if we want to support multiple sinks.
9293
eSink sinks.EventSinkInterface
94+
95+
lastSeenResourceVersion string
96+
lastResourceVersionPosition func(string)
9397
}
9498

9599
// NewEventRouter will create a new event router using the input params
96-
func NewEventRouter(kubeClient kubernetes.Interface, eventsInformer coreinformers.EventInformer) *EventRouter {
100+
func NewEventRouter(kubeClient kubernetes.Interface, eventsInformer coreinformers.EventInformer,
101+
lastSeenResourceVersion string, lastResourceVersionPosition func(rv string)) *EventRouter {
97102
if viper.GetBool("enable-prometheus") {
98103
prometheus.MustRegister(kubernetesWarningEventCounterVec)
99104
prometheus.MustRegister(kubernetesNormalEventCounterVec)
@@ -102,8 +107,10 @@ func NewEventRouter(kubeClient kubernetes.Interface, eventsInformer coreinformer
102107
}
103108

104109
er := &EventRouter{
105-
kubeClient: kubeClient,
106-
eSink: sinks.ManufactureSink(),
110+
kubeClient: kubeClient,
111+
eSink: sinks.ManufactureSink(),
112+
lastSeenResourceVersion: lastSeenResourceVersion,
113+
lastResourceVersionPosition: lastResourceVersionPosition,
107114
}
108115
eventsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
109116
AddFunc: er.addEvent,
@@ -133,16 +140,26 @@ func (er *EventRouter) Run(stopCh <-chan struct{}) {
133140
// addEvent is called when an event is created, or during the initial list
134141
func (er *EventRouter) addEvent(obj interface{}) {
135142
e := obj.(*v1.Event)
136-
prometheusEvent(e)
137-
er.eSink.UpdateEvents(e, nil)
143+
if cast.ToInt(er.lastSeenResourceVersion) < cast.ToInt(e.ResourceVersion) {
144+
prometheusEvent(e)
145+
er.eSink.UpdateEvents(e, nil)
146+
er.lastResourceVersionPosition(e.ResourceVersion)
147+
} else {
148+
glog.V(5).Infof("Event had already been processed:\n%v", e)
149+
}
138150
}
139151

140152
// updateEvent is called any time there is an update to an existing event
141153
func (er *EventRouter) updateEvent(objOld interface{}, objNew interface{}) {
142154
eOld := objOld.(*v1.Event)
143155
eNew := objNew.(*v1.Event)
144-
prometheusEvent(eNew)
145-
er.eSink.UpdateEvents(eNew, eOld)
156+
if cast.ToInt(er.lastSeenResourceVersion) < cast.ToInt(eNew.ResourceVersion) {
157+
prometheusEvent(eNew)
158+
er.eSink.UpdateEvents(eNew, eOld)
159+
er.lastResourceVersionPosition(eNew.ResourceVersion)
160+
} else {
161+
glog.V(5).Infof("Event had already been processed:\n%v", eNew)
162+
}
146163
}
147164

148165
// prometheusEvent is called when an event is added or updated

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ require (
1717
github.com/prometheus/client_golang v1.1.0
1818
github.com/rockset/rockset-go-client v0.6.0
1919
github.com/sethgrid/pester v0.0.0-20190127155807-68a33a018ad0
20+
github.com/spf13/cast v1.3.0
2021
github.com/spf13/viper v1.4.0
2122
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
2223
k8s.io/api v0.0.0-20190814101207-0772a1bdf941
2324
k8s.io/apimachinery v0.0.0-20190814100815-533d101be9a6
2425
k8s.io/client-go v12.0.0+incompatible
26+
k8s.io/klog v0.4.0
2527
k8s.io/utils v0.0.0-20190809000727-6c36bc71fc4a // indirect
2628
)

main.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package main
1818

1919
import (
2020
"flag"
21+
"io/ioutil"
2122
"net/http"
2223
"os"
2324
"os/signal"
@@ -27,6 +28,7 @@ import (
2728

2829
"github.com/golang/glog"
2930
"github.com/prometheus/client_golang/prometheus/promhttp"
31+
"github.com/spf13/cast"
3032
"github.com/spf13/viper"
3133

3234
"k8s.io/client-go/informers"
@@ -49,7 +51,7 @@ func sigHandler() <-chan struct{} {
4951
syscall.SIGSEGV, // FullDerp
5052
syscall.SIGABRT, // Abnormal termination
5153
syscall.SIGILL, // illegal instruction
52-
syscall.SIGFPE) // floating point - this is why we can't have nice things
54+
syscall.SIGFPE) // floating point - this is why we can't have nice things
5355
sig := <-c
5456
glog.Warningf("Signal (%v) Detected, Shutting Down", sig)
5557
close(stop)
@@ -107,11 +109,41 @@ func main() {
107109
var wg sync.WaitGroup
108110

109111
clientset := loadConfig()
112+
113+
var lastResourceVersionPosition string
114+
var mostRecentResourceVersion *string
115+
116+
resourceVersionPositionPath := viper.GetString("lastResourceVersionPositionPath")
117+
resourceVersionPositionFunc := func(resourceVersion string) {
118+
if resourceVersionPositionPath != "" {
119+
if cast.ToInt(resourceVersion) > cast.ToInt(mostRecentResourceVersion) {
120+
err := ioutil.WriteFile(resourceVersionPositionPath, []byte(resourceVersion), 0644)
121+
if err != nil {
122+
glog.Errorf("failed to write lastResourceVersionPosition")
123+
} else {
124+
mostRecentResourceVersion = &resourceVersion
125+
}
126+
}
127+
}
128+
}
129+
130+
if resourceVersionPositionPath != "" {
131+
_, err := os.Stat(resourceVersionPositionPath)
132+
if !os.IsNotExist(err) {
133+
resourceVersionBytes, err := ioutil.ReadFile(resourceVersionPositionPath)
134+
if err != nil {
135+
glog.Errorf("failed to read resource version bookmark from %s", resourceVersionPositionPath)
136+
} else {
137+
lastResourceVersionPosition = string(resourceVersionBytes)
138+
}
139+
}
140+
}
141+
110142
sharedInformers := informers.NewSharedInformerFactory(clientset, viper.GetDuration("resync-interval"))
111143
eventsInformer := sharedInformers.Core().V1().Events()
112144

113145
// TODO: Support locking for HA https://github.com/kubernetes/kubernetes/pull/42666
114-
eventRouter := NewEventRouter(clientset, eventsInformer)
146+
eventRouter := NewEventRouter(clientset, eventsInformer, lastResourceVersionPosition, resourceVersionPositionFunc)
115147
stop := sigHandler()
116148

117149
// Startup the http listener for Prometheus Metrics endpoint.

0 commit comments

Comments
 (0)