Skip to content

Commit 38e9bd6

Browse files
[receiver/k8seventsreceiver] support leader election using k8sleaderelector extension (#42330)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Adding support for leader election using `k8sleaderelector` extension. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #42266 <!--Describe what testing was performed and which tests were added.--> #### Testing Added the test `TestReceiverWithLeaderElection`. I'm just missing testing it in a real scenario, but I will do it next week. However, I marked it as ready for review for discussing the logic and structure 🙏 <!--Describe the documentation added.--> #### Documentation Updated README.md with the example of how it can be used. <!--Please delete paragraphs that you did not use before submitting.--> --------- Signed-off-by: Paulo Dias <[email protected]> Co-authored-by: Christos Markou <[email protected]>
1 parent 0c5e3fc commit 38e9bd6

File tree

7 files changed

+164
-14
lines changed

7 files changed

+164
-14
lines changed

.chloggen/feat_42266.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: "enhancement"
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: receiver/k8seventsreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Add support for leader election to `k8seventsreceiver` using the `k8sleaderelector` extension."
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [42266]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/k8seventsreceiver/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ The following settings are optional:
2828
the K8s API server. This can be one of `none` (for no auth), `serviceAccount`
2929
(to use the standard service account token provided to the agent pod), or
3030
`kubeConfig` to use credentials from `~/.kube/config`.
31+
- `k8s_leader_elector` (default = none): the ID of a `k8sleaderelector` extension. If specified, leader election is enabled and the receiver only watches for events while its instance is the elected leader.
3132
- `namespaces` (default = `all`): An array of `namespaces` to collect events from.
3233
This receiver will continuously watch all the `namespaces` mentioned in the array for
3334
new events.
@@ -37,6 +38,7 @@ Examples:
3738
```yaml
3839
k8s_events:
3940
auth_type: kubeConfig
41+
k8s_leader_elector: k8s_leader_elector
4042
namespaces: [default, my_namespace]
4143
```
4244

receiver/k8seventsreceiver/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package k8seventsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8seventsreceiver"
55

66
import (
7+
"go.opentelemetry.io/collector/component"
78
k8s "k8s.io/client-go/kubernetes"
89

910
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
@@ -16,6 +17,8 @@ type Config struct {
1617
// List of ‘namespaces’ to collect events from.
1718
Namespaces []string `mapstructure:"namespaces"`
1819

20+
K8sLeaderElector *component.ID `mapstructure:"k8s_leader_elector"`
21+
1922
// For mocking
2023
makeClient func(apiConf k8sconfig.APIConfig) (k8s.Interface, error)
2124
}

receiver/k8seventsreceiver/go.mod

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8seve
33
go 1.24.0
44

55
require (
6+
github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector v0.136.0
67
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.136.0
78
github.com/stretchr/testify v1.11.1
89
go.opentelemetry.io/collector/component v1.42.1-0.20250925151503-069408608b28
@@ -61,6 +62,7 @@ require (
6162
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
6263
go.opentelemetry.io/collector/consumer/consumererror v0.136.1-0.20250925151503-069408608b28 // indirect
6364
go.opentelemetry.io/collector/consumer/xconsumer v0.136.1-0.20250925151503-069408608b28 // indirect
65+
go.opentelemetry.io/collector/extension v1.42.1-0.20250925151503-069408608b28 // indirect
6466
go.opentelemetry.io/collector/featuregate v1.42.1-0.20250925151503-069408608b28 // indirect
6567
go.opentelemetry.io/collector/internal/telemetry v0.136.1-0.20250925151503-069408608b28 // indirect
6668
go.opentelemetry.io/collector/pdata/pprofile v0.136.1-0.20250925151503-069408608b28 // indirect
@@ -91,7 +93,7 @@ require (
9193
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
9294
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect
9395
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
94-
sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect
96+
sigs.k8s.io/structured-merge-diff/v4 v4.4.3 // indirect
9597
sigs.k8s.io/yaml v1.5.0 // indirect
9698
)
9799

@@ -100,6 +102,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sco
100102
// openshift removed all tags from their repo, use the pseudoversion from the release-3.9 branch HEAD
101103
replace github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api v0.0.0-20180801171038-322a19404e37
102104

105+
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector => ../../extension/k8sleaderelector
106+
103107
retract (
104108
v0.76.2
105109
v0.76.1

receiver/k8seventsreceiver/go.sum

Lines changed: 6 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/k8seventsreceiver/receiver.go

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ package k8seventsreceiver // import "github.com/open-telemetry/opentelemetry-col
55

66
import (
77
"context"
8+
"fmt"
9+
"sync"
810
"time"
911

1012
"go.opentelemetry.io/collector/component"
@@ -16,6 +18,7 @@ import (
1618
k8s "k8s.io/client-go/kubernetes"
1719
"k8s.io/client-go/tools/cache"
1820

21+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector"
1922
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8seventsreceiver/internal/metadata"
2023
)
2124

@@ -28,6 +31,7 @@ type k8seventsReceiver struct {
2831
ctx context.Context
2932
cancel context.CancelFunc
3033
obsrecv *receiverhelper.ObsReport
34+
wg sync.WaitGroup
3135
}
3236

3337
// newReceiver creates the Kubernetes events receiver with the given configuration.
@@ -56,14 +60,48 @@ func newReceiver(
5660
}, nil
5761
}
5862

59-
func (kr *k8seventsReceiver) Start(ctx context.Context, _ component.Host) error {
63+
func (kr *k8seventsReceiver) Start(ctx context.Context, host component.Host) error {
6064
kr.ctx, kr.cancel = context.WithCancel(ctx)
6165

6266
k8sInterface, err := kr.config.getK8sClient()
6367
if err != nil {
6468
return err
6569
}
6670

71+
if kr.config.K8sLeaderElector != nil {
72+
k8sLeaderElector := host.GetExtensions()[*kr.config.K8sLeaderElector]
73+
if k8sLeaderElector == nil {
74+
return fmt.Errorf("unknown k8s leader elector %q", kr.config.K8sLeaderElector)
75+
}
76+
77+
elector, ok := k8sLeaderElector.(k8sleaderelector.LeaderElection)
78+
if !ok {
79+
return fmt.Errorf("the extension %T does not implement k8sleaderelector.LeaderElection", k8sLeaderElector)
80+
}
81+
82+
kr.settings.Logger.Info("registering the receiver in leader election")
83+
84+
elector.SetCallBackFuncs(
85+
func(_ context.Context) {
86+
kr.settings.Logger.Info("Events Receiver started as leader")
87+
if len(kr.config.Namespaces) == 0 {
88+
kr.startWatch(corev1.NamespaceAll, k8sInterface)
89+
} else {
90+
for _, ns := range kr.config.Namespaces {
91+
kr.startWatch(ns, k8sInterface)
92+
}
93+
}
94+
},
95+
// onStoppedLeading: stop watches, but DO NOT shut the whole receiver down
96+
func() {
97+
kr.settings.Logger.Info("no longer leader, stopping watches")
98+
kr.stopWatches()
99+
},
100+
)
101+
return nil
102+
}
103+
104+
// No leader election: start immediately.
67105
kr.settings.Logger.Info("starting to watch namespaces for the events.")
68106
if len(kr.config.Namespaces) == 0 {
69107
kr.startWatch(corev1.NamespaceAll, k8sInterface)
@@ -72,22 +110,38 @@ func (kr *k8seventsReceiver) Start(ctx context.Context, _ component.Host) error
72110
kr.startWatch(ns, k8sInterface)
73111
}
74112
}
75-
76113
return nil
77114
}
78115

79116
func (kr *k8seventsReceiver) Shutdown(context.Context) error {
80-
if kr.cancel == nil {
81-
return nil
82-
}
83-
// Stop watching all the namespaces by closing all the stopper channels.
84-
for _, stopperChan := range kr.stopperChanList {
85-
close(stopperChan)
117+
// Stop informers and wait for them to exit.
118+
kr.stopWatches()
119+
120+
if kr.cancel != nil {
121+
kr.cancel()
122+
kr.cancel = nil
86123
}
87-
kr.cancel()
88124
return nil
89125
}
90126

127+
// stopWatches closes all informer stop channels (idempotently) and waits for their goroutines to exit.
128+
func (kr *k8seventsReceiver) stopWatches() {
129+
if len(kr.stopperChanList) == 0 {
130+
return
131+
}
132+
for _, ch := range kr.stopperChanList {
133+
select {
134+
case <-ch: // already closed
135+
default:
136+
close(ch)
137+
}
138+
}
139+
// Wait for all controller.Run goroutines to finish.
140+
kr.wg.Wait()
141+
// Reset slice so we can start again on leadership regain.
142+
kr.stopperChanList = nil
143+
}
144+
91145
// Add the 'Event' handler and trigger the watch for a specific namespace.
92146
// For new and updated events, the code is relying on the following k8s code implementation:
93147
// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/record/events_cache.go#L327
@@ -118,7 +172,7 @@ func (kr *k8seventsReceiver) handleEvent(ev *corev1.Event) {
118172

119173
// startWatchingNamespace creates an informer and starts
120174
// watching a specific namespace for the events.
121-
func (*k8seventsReceiver) startWatchingNamespace(
175+
func (kr *k8seventsReceiver) startWatchingNamespace(
122176
clientset k8s.Interface,
123177
handlers cache.ResourceEventHandlerFuncs,
124178
ns string,
@@ -132,7 +186,11 @@ func (*k8seventsReceiver) startWatchingNamespace(
132186
ResyncPeriod: 0,
133187
Handler: handlers,
134188
})
135-
go controller.Run(stopper)
189+
kr.wg.Add(1)
190+
go func() {
191+
defer kr.wg.Done()
192+
controller.Run(stopper)
193+
}()
136194
}
137195

138196
// Allow events with eventTimestamp(EventTime/LastTimestamp/FirstTimestamp)

receiver/k8seventsreceiver/receiver_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/stretchr/testify/assert"
1111
"github.com/stretchr/testify/require"
12+
"go.opentelemetry.io/collector/component"
1213
"go.opentelemetry.io/collector/component/componenttest"
1314
"go.opentelemetry.io/collector/consumer/consumertest"
1415
"go.opentelemetry.io/collector/receiver/receivertest"
@@ -18,6 +19,7 @@ import (
1819
k8s "k8s.io/client-go/kubernetes"
1920
"k8s.io/client-go/kubernetes/fake"
2021

22+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector/k8sleaderelectortest"
2123
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
2224
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8seventsreceiver/internal/metadata"
2325
)
@@ -129,6 +131,56 @@ func TestAllowEvent(t *testing.T) {
129131
assert.False(t, shouldAllowEvent)
130132
}
131133

134+
func TestReceiverWithLeaderElection(t *testing.T) {
135+
le := &k8sleaderelectortest.FakeLeaderElection{}
136+
host := &k8sleaderelectortest.FakeHost{FakeLeaderElection: le}
137+
leaderID := component.MustNewID("k8s_leader_elector")
138+
139+
cfg := createDefaultConfig().(*Config)
140+
cfg.K8sLeaderElector = &leaderID
141+
cfg.makeClient = func(_ k8sconfig.APIConfig) (k8s.Interface, error) {
142+
return fake.NewSimpleClientset(), nil
143+
}
144+
145+
sink := new(consumertest.LogsSink)
146+
r, err := newReceiver(
147+
receivertest.NewNopSettings(metadata.Type),
148+
cfg,
149+
sink,
150+
)
151+
require.NoError(t, err)
152+
recv := r.(*k8seventsReceiver)
153+
154+
require.NoError(t, r.Start(t.Context(), host))
155+
t.Cleanup(func() {
156+
assert.NoError(t, r.Shutdown(t.Context()))
157+
})
158+
159+
// Become leader: start processing events
160+
le.InvokeOnLeading()
161+
recv.handleEvent(getEvent())
162+
163+
require.Eventually(t, func() bool {
164+
return sink.LogRecordCount() == 1
165+
}, 5*time.Second, 100*time.Millisecond, "logs not collected while leader")
166+
167+
// lose leadership
168+
le.InvokeOnStopping()
169+
170+
// DO NOT call recv.handleEvent(...) here; informer wouldn't deliver to this instance.
171+
// Give a tiny moment and ensure count stays 1.
172+
time.Sleep(100 * time.Millisecond)
173+
assert.Equal(t, 1, sink.LogRecordCount(), "event should be ignored after losing leadership")
174+
175+
// regain leadership and inject again
176+
le.InvokeOnLeading()
177+
recv.handleEvent(getEvent())
178+
179+
require.Eventually(t, func() bool {
180+
return sink.LogRecordCount() == 2
181+
}, 5*time.Second, 100*time.Millisecond, "logs not collected after regaining leadership")
182+
}
183+
132184
func getEvent() *corev1.Event {
133185
return &corev1.Event{
134186
InvolvedObject: corev1.ObjectReference{

0 commit comments

Comments
 (0)