Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ebf689f
[receiver/k8seventsreceiver] support leader election using k8sleadere…
paulojmdias Aug 28, 2025
6742a32
Merge branch 'main' into feat/42266
paulojmdias Aug 28, 2025
6e981b4
feat: improve tests with TestReceiverWithLeaderElection
paulojmdias Aug 30, 2025
90f5fa3
Merge branch 'main' into feat/42266
paulojmdias Aug 30, 2025
496743e
chore: make gotidy
paulojmdias Aug 30, 2025
7b5bb21
Merge branch 'main' into feat/42266
paulojmdias Aug 31, 2025
0ea1a68
Merge branch 'main' of github.com:paulojmdias/opentelemetry-collector…
paulojmdias Sep 8, 2025
d60ede1
Merge branch 'main' into feat/42266
paulojmdias Sep 8, 2025
1c07a34
chore: make gotidy
paulojmdias Sep 8, 2025
239b13e
Merge branch 'main' into feat/42266
ChrsMark Sep 10, 2025
3fa9e50
Merge branch 'main' into feat/42266
ChrsMark Sep 10, 2025
51b4a0d
Merge branch 'main' of github.com:paulojmdias/opentelemetry-collector…
paulojmdias Sep 14, 2025
d73e380
chore: make gotidy
paulojmdias Sep 14, 2025
746f0ad
Merge branch 'feat/42266' of github.com:paulojmdias/opentelemetry-col…
paulojmdias Sep 14, 2025
8a1b66d
Merge branch 'main' into feat/42266
paulojmdias Sep 15, 2025
81dcde7
Merge branch 'main' into feat/42266
ChrsMark Sep 16, 2025
db60bbf
Merge branch 'main' into feat/42266
paulojmdias Sep 19, 2025
22101dc
chore: make gotidy
paulojmdias Sep 19, 2025
770366d
Merge branch 'main' into feat/42266
paulojmdias Sep 23, 2025
1b06b50
chore: make gotidy
paulojmdias Sep 23, 2025
803ddad
Merge branch 'main' into feat/42266
paulojmdias Sep 25, 2025
1d8abab
Merge branch 'main' of github.com:paulojmdias/opentelemetry-collector…
paulojmdias Sep 26, 2025
36a6cd6
Merge branch 'main' of github.com:paulojmdias/opentelemetry-collector…
paulojmdias Sep 26, 2025
41db6d5
fix: revert event handler
paulojmdias Sep 26, 2025
edfce9b
Merge branch 'feat/42266' of github.com:paulojmdias/opentelemetry-col…
paulojmdias Sep 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/feat_42266.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/k8seventsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Added support for leader election to `k8seventsreceiver` using the `k8sleaderelector` extension."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [42266]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 2 additions & 0 deletions receiver/k8seventsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The following settings are optional:
the K8s API server. This can be one of `none` (for no auth), `serviceAccount`
(to use the standard service account token provided to the agent pod), or
`kubeConfig` to use credentials from `~/.kube/config`.
- `k8s_leader_elector` (default = none): The ID of a `k8sleaderelector` extension. If specified, leader election is enabled and the receiver watches for events only while it is the leader.
- `namespaces` (default = `all`): An array of `namespaces` to collect events from.
This receiver will continuously watch all the `namespaces` mentioned in the array for
new events.
Expand All @@ -37,6 +38,7 @@ Examples:
```yaml
k8s_events:
auth_type: kubeConfig
k8s_leader_elector: k8s_leader_elector
namespaces: [default, my_namespace]
```

Expand Down
3 changes: 3 additions & 0 deletions receiver/k8seventsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package k8seventsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8seventsreceiver"

import (
"go.opentelemetry.io/collector/component"
k8s "k8s.io/client-go/kubernetes"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
Expand All @@ -16,6 +17,8 @@ type Config struct {
// List of ‘namespaces’ to collect events from.
Namespaces []string `mapstructure:"namespaces"`

K8sLeaderElector *component.ID `mapstructure:"k8s_leader_elector"`

// For mocking
makeClient func(apiConf k8sconfig.APIConfig) (k8s.Interface, error)
}
Expand Down
6 changes: 5 additions & 1 deletion receiver/k8seventsreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8seve
go 1.24.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector v0.135.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.135.0
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/collector/component v1.41.1-0.20250911155607-37a3ace6274c
Expand Down Expand Up @@ -61,6 +62,7 @@ require (
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/extension v1.41.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/featuregate v1.41.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/internal/telemetry v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.135.1-0.20250911155607-37a3ace6274c // indirect
Expand Down Expand Up @@ -91,7 +93,7 @@ require (
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.3 // indirect
sigs.k8s.io/yaml v1.5.0 // indirect
)

Expand All @@ -100,6 +102,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sco
// openshift removed all tags from their repo, use the pseudoversion from the release-3.9 branch HEAD
replace github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api v0.0.0-20180801171038-322a19404e37

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector => ../../extension/k8sleaderelector

retract (
v0.76.2
v0.76.1
Expand Down
8 changes: 6 additions & 2 deletions receiver/k8seventsreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 75 additions & 15 deletions receiver/k8seventsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package k8seventsreceiver // import "github.com/open-telemetry/opentelemetry-col

import (
"context"
"fmt"
"sync"
"time"

"go.opentelemetry.io/collector/component"
Expand All @@ -16,6 +18,7 @@ import (
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8seventsreceiver/internal/metadata"
)

Expand All @@ -28,6 +31,7 @@ type k8seventsReceiver struct {
ctx context.Context
cancel context.CancelFunc
obsrecv *receiverhelper.ObsReport
wg sync.WaitGroup
}

// newReceiver creates the Kubernetes events receiver with the given configuration.
Expand Down Expand Up @@ -56,14 +60,48 @@ func newReceiver(
}, nil
}

func (kr *k8seventsReceiver) Start(ctx context.Context, _ component.Host) error {
func (kr *k8seventsReceiver) Start(ctx context.Context, host component.Host) error {
kr.ctx, kr.cancel = context.WithCancel(ctx)

k8sInterface, err := kr.config.getK8sClient()
if err != nil {
return err
}

if kr.config.K8sLeaderElector != nil {
k8sLeaderElector := host.GetExtensions()[*kr.config.K8sLeaderElector]
if k8sLeaderElector == nil {
return fmt.Errorf("unknown k8s leader elector %q", kr.config.K8sLeaderElector)
}

elector, ok := k8sLeaderElector.(k8sleaderelector.LeaderElection)
if !ok {
return fmt.Errorf("the extension %T does not implement k8sleaderelector.LeaderElection", k8sLeaderElector)
}

kr.settings.Logger.Info("registering the receiver in leader election")

elector.SetCallBackFuncs(
func(_ context.Context) {
kr.settings.Logger.Info("Events Receiver started as leader")
if len(kr.config.Namespaces) == 0 {
kr.startWatch(corev1.NamespaceAll, k8sInterface)
} else {
for _, ns := range kr.config.Namespaces {
kr.startWatch(ns, k8sInterface)
}
}
},
// onStoppedLeading: stop watches, but DO NOT shut the whole receiver down
func() {
kr.settings.Logger.Info("no longer leader, stopping watches")
kr.stopWatches()
},
)
return nil
}

// No leader election: start immediately.
kr.settings.Logger.Info("starting to watch namespaces for the events.")
if len(kr.config.Namespaces) == 0 {
kr.startWatch(corev1.NamespaceAll, k8sInterface)
Expand All @@ -72,22 +110,38 @@ func (kr *k8seventsReceiver) Start(ctx context.Context, _ component.Host) error
kr.startWatch(ns, k8sInterface)
}
}

return nil
}

func (kr *k8seventsReceiver) Shutdown(context.Context) error {
if kr.cancel == nil {
return nil
}
// Stop watching all the namespaces by closing all the stopper channels.
for _, stopperChan := range kr.stopperChanList {
close(stopperChan)
// Stop informers and wait for them to exit.
kr.stopWatches()

if kr.cancel != nil {
kr.cancel()
kr.cancel = nil
}
kr.cancel()
return nil
}

// stopWatches signals every registered informer stop channel and then blocks until the
// goroutines tracked by kr.wg have returned. Channels that are already closed are left
// untouched, and the channel list is cleared afterwards, so a repeated call is a no-op.
func (kr *k8seventsReceiver) stopWatches() {
	if len(kr.stopperChanList) == 0 {
		// Nothing is currently registered; there is nothing to stop or wait for.
		return
	}

	for i := range kr.stopperChanList {
		stopper := kr.stopperChanList[i]
		select {
		case <-stopper:
			// A successful receive is taken to mean the channel was closed earlier;
			// closing it a second time would panic.
		default:
			close(stopper)
		}
	}

	// Block until every goroutine registered on the wait group has finished.
	kr.wg.Wait()

	// Drop the old channels so that watches can be started afresh when leadership is regained.
	kr.stopperChanList = nil
}

// Add the 'Event' handler and trigger the watch for a specific namespace.
// For new and updated events, the code is relying on the following k8s code implementation:
// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/record/events_cache.go#L327
Expand All @@ -96,12 +150,14 @@ func (kr *k8seventsReceiver) startWatch(ns string, client k8s.Interface) {
kr.stopperChanList = append(kr.stopperChanList, stopperChan)
kr.startWatchingNamespace(client, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
ev := obj.(*corev1.Event)
kr.handleEvent(ev)
if ev, ok := obj.(*corev1.Event); ok {
kr.handleEvent(ev)
}
},
UpdateFunc: func(_, obj any) {
ev := obj.(*corev1.Event)
kr.handleEvent(ev)
if ev, ok := obj.(*corev1.Event); ok {
kr.handleEvent(ev)
}
},
}, ns, stopperChan)
}
Expand All @@ -118,7 +174,7 @@ func (kr *k8seventsReceiver) handleEvent(ev *corev1.Event) {

// startWatchingNamespace creates an informer and starts
// watching a specific namespace for the events.
func (*k8seventsReceiver) startWatchingNamespace(
func (kr *k8seventsReceiver) startWatchingNamespace(
clientset k8s.Interface,
handlers cache.ResourceEventHandlerFuncs,
ns string,
Expand All @@ -132,7 +188,11 @@ func (*k8seventsReceiver) startWatchingNamespace(
ResyncPeriod: 0,
Handler: handlers,
})
go controller.Run(stopper)
kr.wg.Add(1)
go func() {
defer kr.wg.Done()
controller.Run(stopper)
}()
}

// Allow events with eventTimestamp(EventTime/LastTimestamp/FirstTimestamp)
Expand Down
52 changes: 52 additions & 0 deletions receiver/k8seventsreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"
Expand All @@ -18,6 +19,7 @@ import (
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector/k8sleaderelectortest"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8seventsreceiver/internal/metadata"
)
Expand Down Expand Up @@ -129,6 +131,56 @@ func TestAllowEvent(t *testing.T) {
assert.False(t, shouldAllowEvent)
}

// TestReceiverWithLeaderElection starts the receiver against a fake leader elector and
// checks that an injected event is recorded while leading, that the record count does not
// change after leadership is lost, and that a second injected event is recorded once
// leadership is regained.
func TestReceiverWithLeaderElection(t *testing.T) {
	fakeElection := &k8sleaderelectortest.FakeLeaderElection{}
	fakeHost := &k8sleaderelectortest.FakeHost{FakeLeaderElection: fakeElection}
	electorID := component.MustNewID("k8s_leader_elector")

	cfg := createDefaultConfig().(*Config)
	cfg.K8sLeaderElector = &electorID
	cfg.makeClient = func(k8sconfig.APIConfig) (k8s.Interface, error) {
		return fake.NewSimpleClientset(), nil
	}

	logsSink := new(consumertest.LogsSink)
	rcvr, err := newReceiver(receivertest.NewNopSettings(metadata.Type), cfg, logsSink)
	require.NoError(t, err)
	eventsRecv := rcvr.(*k8seventsReceiver)

	require.NoError(t, rcvr.Start(t.Context(), fakeHost))
	t.Cleanup(func() {
		assert.NoError(t, rcvr.Shutdown(t.Context()))
	})

	// leadAndExpect triggers the "started leading" callback, injects one event directly
	// into the receiver, and waits until the sink holds exactly want log records.
	leadAndExpect := func(want int, failMsg string) {
		fakeElection.InvokeOnLeading()
		eventsRecv.handleEvent(getEvent())

		require.Eventually(t, func() bool {
			return logsSink.LogRecordCount() == want
		}, 5*time.Second, 100*time.Millisecond, failMsg)
	}

	// Gain leadership: the first event must be collected.
	leadAndExpect(1, "logs not collected while leader")

	// Lose leadership.
	fakeElection.InvokeOnStopping()

	// No event is injected here on purpose: after losing leadership the informer would not
	// deliver anything to this instance. Pause briefly and confirm the count is unchanged.
	time.Sleep(100 * time.Millisecond)
	assert.Equal(t, 1, logsSink.LogRecordCount(), "event should be ignored after losing leadership")

	// Regain leadership: a second event must be collected.
	leadAndExpect(2, "logs not collected after regaining leadership")
}

func getEvent() *corev1.Event {
return &corev1.Event{
InvolvedObject: corev1.ObjectReference{
Expand Down
Loading