Skip to content

Commit 0d46e5e

Browse files
authored
[receiver/k8sobjectsreceiver] Switch to standby mode when leader lease is lost (#43054)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Similar to what we did in #42330, this PR ensures similarity and puts the receiver in stand-by instead of shutdown when `k8sleaderelector` is used. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #42706 <!--Describe what testing was performed and which tests were added.--> #### Testing Tested locally, and tests were not changed to ensure the current receiver is not affected at all. <!--Please delete paragraphs that you did not use before submitting.--> --------- Signed-off-by: Paulo Dias <[email protected]>
1 parent e143a2d commit 0d46e5e

File tree

3 files changed

+324
-17
lines changed

3 files changed

+324
-17
lines changed

.chloggen/feat_42706_wg.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: "enhancement"
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: "receiver/k8sobjectsreceiver"
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Switch to standby mode when leader lease is lost instead of shutdown"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [42706]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/k8sobjectsreceiver/receiver.go

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type k8sobjectsreceiver struct {
3838
obsrecv *receiverhelper.ObsReport
3939
mu sync.Mutex
4040
cancel context.CancelFunc
41+
wg sync.WaitGroup
4142
}
4243

4344
func newReceiver(params receiver.Settings, config *Config, consumer consumer.Logs) (receiver.Logs, error) {
@@ -141,13 +142,12 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) er
141142
}
142143
kr.setting.Logger.Info("Object Receiver started as leader")
143144
},
145+
// onStoppedLeading: stop watches, but DO NOT shut the whole receiver down
144146
func() {
145-
kr.setting.Logger.Info("no longer leader, stopping")
146-
err = kr.Shutdown(context.Background())
147-
if err != nil {
148-
kr.setting.Logger.Error("shutdown receiver error:", zap.Error(err))
149-
}
150-
})
147+
kr.setting.Logger.Info("no longer leader, stopping watches")
148+
kr.stopWatches()
149+
},
150+
)
151151
} else {
152152
cctx, cancel := context.WithCancel(ctx)
153153
kr.cancel = cancel
@@ -160,17 +160,37 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) er
160160
}
161161

162162
func (kr *k8sobjectsreceiver) Shutdown(context.Context) error {
163+
// Stop informers and wait for them to exit.
163164
kr.setting.Logger.Info("Object Receiver stopped")
165+
kr.stopWatches()
166+
164167
if kr.cancel != nil {
165168
kr.cancel()
169+
kr.cancel = nil
166170
}
171+
return nil
172+
}
167173

174+
// stopWatches closes all informer stop channels (idempotently) and waits for their goroutines to exit.
175+
func (kr *k8sobjectsreceiver) stopWatches() {
168176
kr.mu.Lock()
169-
for _, stopperChan := range kr.stopperChanList {
170-
close(stopperChan)
171-
}
177+
// Copy and clear the list under lock to avoid races on restart
178+
chans := kr.stopperChanList
179+
kr.stopperChanList = nil
172180
kr.mu.Unlock()
173-
return nil
181+
182+
if len(chans) == 0 {
183+
return
184+
}
185+
for _, ch := range chans {
186+
select {
187+
case <-ch: // already closed
188+
default:
189+
close(ch)
190+
}
191+
}
192+
// Now wait for all WG-tracked loops (both pull & watch) to exit
193+
kr.wg.Wait()
174194
}
175195

176196
func (kr *k8sobjectsreceiver) start(ctx context.Context, object *K8sObjectsConfig) {
@@ -205,7 +225,9 @@ func (kr *k8sobjectsreceiver) startPull(ctx context.Context, config *K8sObjectsC
205225
stopperChan := make(chan struct{})
206226
kr.mu.Lock()
207227
kr.stopperChanList = append(kr.stopperChanList, stopperChan)
228+
kr.wg.Add(1)
208229
kr.mu.Unlock()
230+
defer kr.wg.Done()
209231
ticker := newTicker(ctx, config.Interval)
210232
listOption := metav1.ListOptions{
211233
FieldSelector: config.FieldSelector,
@@ -226,15 +248,21 @@ func (kr *k8sobjectsreceiver) startPull(ctx context.Context, config *K8sObjectsC
226248
kr.setting.Logger.Error("error in pulling object",
227249
zap.String("resource", config.gvr.String()),
228250
zap.Error(err))
229-
} else if len(objects.Items) > 0 {
230-
logs := pullObjectsToLogData(objects, time.Now(), config, kr.setting.BuildInfo.Version)
231-
obsCtx := kr.obsrecv.StartLogsOp(ctx)
232-
logRecordCount := logs.LogRecordCount()
233-
err = kr.consumer.ConsumeLogs(obsCtx, logs)
234-
kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), logRecordCount, err)
251+
continue
235252
}
253+
if len(objects.Items) == 0 {
254+
continue
255+
}
256+
logs := pullObjectsToLogData(objects, time.Now(), config, kr.setting.BuildInfo.Version)
257+
obsCtx := kr.obsrecv.StartLogsOp(ctx)
258+
logRecordCount := logs.LogRecordCount()
259+
err = kr.consumer.ConsumeLogs(obsCtx, logs)
260+
kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), logRecordCount, err)
261+
236262
case <-stopperChan:
237263
return
264+
case <-ctx.Done():
265+
return
238266
}
239267
}
240268
}
@@ -243,7 +271,9 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
243271
stopperChan := make(chan struct{})
244272
kr.mu.Lock()
245273
kr.stopperChanList = append(kr.stopperChanList, stopperChan)
274+
kr.wg.Add(1)
246275
kr.mu.Unlock()
276+
defer kr.wg.Done()
247277

248278
if kr.config.IncludeInitialState {
249279
kr.sendInitialState(ctx, config, resource)
@@ -256,6 +286,7 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
256286
}
257287

258288
cancelCtx, cancel := context.WithCancel(ctx)
289+
defer cancel()
259290
cfgCopy := *config
260291
wait.UntilWithContext(cancelCtx, func(newCtx context.Context) {
261292
resourceVersion, err := getResourceVersion(newCtx, &cfgCopy, resource)
@@ -343,6 +374,10 @@ func (kr *k8sobjectsreceiver) doWatch(ctx context.Context, config *K8sObjectsCon
343374
res := watcher.ResultChan()
344375
for {
345376
select {
377+
case <-ctx.Done():
378+
kr.setting.Logger.Info("context canceled, stopping watch",
379+
zap.String("resource", config.gvr.String()))
380+
return true
346381
case data, ok := <-res:
347382
if data.Type == apiWatch.Error {
348383
errObject := apierrors.FromObject(data.Object)
@@ -372,8 +407,9 @@ func (kr *k8sobjectsreceiver) doWatch(ctx context.Context, config *K8sObjectsCon
372407
kr.setting.Logger.Error("error converting objects to log data", zap.Error(err))
373408
} else {
374409
obsCtx := kr.obsrecv.StartLogsOp(ctx)
410+
cnt := logs.LogRecordCount()
375411
err := kr.consumer.ConsumeLogs(obsCtx, logs)
376-
kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), 1, err)
412+
kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), cnt, err)
377413
}
378414
case <-stopperChan:
379415
watcher.Stop()

0 commit comments

Comments
 (0)