Skip to content

Commit c34c278

Browse files
receiver/k8sobjects: close race window between sendInitialState and watch
Use the list ResourceVersion returned by sendInitialState directly as the watch starting point on the first iteration, eliminating a redundant second List() call and the race window it created. Previously, sendInitialState() did a List() and then getResourceVersion() did another List() to determine where to start watching. Events occurring between the two calls could be missed or duplicated. Now sendInitialState() returns the list's own ResourceVersion (always >= any individual object RV), which is used directly on the first watch iteration. Subsequent iterations (e.g. after 410 Gone) fall back to getResourceVersion() as before. Also update the checkpoint with the list RV after sendInitialState so the persisted value accurately reflects the snapshot point of the listing, not just the highest individual object RV. Co-authored-by: Dhruv Shah <dhruv.shah@sumologic.com>
1 parent 2f017a2 commit c34c278

10 files changed

Lines changed: 140 additions & 45 deletions

File tree

internal/k8sinventory/go.mod

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ go 1.25.0
55
require (
66
github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.147.0
77
github.com/stretchr/testify v1.11.1
8-
go.opentelemetry.io/collector/component v1.54.1-0.20260326211300-c04f6776a74c
9-
go.opentelemetry.io/collector/extension/xextension v0.148.1-0.20260326211300-c04f6776a74c
8+
go.opentelemetry.io/collector/component v1.54.1-0.20260330144813-4d17eb8959de
9+
go.opentelemetry.io/collector/extension/xextension v0.148.1-0.20260330144813-4d17eb8959de
1010
go.uber.org/zap v1.27.1
1111
k8s.io/api v0.35.3
1212
k8s.io/apimachinery v0.35.3
@@ -35,11 +35,11 @@ require (
3535
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
3636
github.com/x448/float16 v0.8.4 // indirect
3737
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
38-
go.opentelemetry.io/collector/component/componenttest v0.148.1-0.20260326211300-c04f6776a74c // indirect
39-
go.opentelemetry.io/collector/extension v1.54.1-0.20260326211300-c04f6776a74c // indirect
40-
go.opentelemetry.io/collector/featuregate v1.54.1-0.20260326211300-c04f6776a74c // indirect
41-
go.opentelemetry.io/collector/internal/componentalias v0.148.1-0.20260326211300-c04f6776a74c // indirect
42-
go.opentelemetry.io/collector/pdata v1.54.1-0.20260326211300-c04f6776a74c // indirect
38+
go.opentelemetry.io/collector/component/componenttest v0.148.1-0.20260330144813-4d17eb8959de // indirect
39+
go.opentelemetry.io/collector/extension v1.54.1-0.20260330144813-4d17eb8959de // indirect
40+
go.opentelemetry.io/collector/featuregate v1.54.1-0.20260330144813-4d17eb8959de // indirect
41+
go.opentelemetry.io/collector/internal/componentalias v0.148.1-0.20260330144813-4d17eb8959de // indirect
42+
go.opentelemetry.io/collector/pdata v1.54.1-0.20260330144813-4d17eb8959de // indirect
4343
go.opentelemetry.io/otel v1.42.0 // indirect
4444
go.opentelemetry.io/otel/metric v1.42.0 // indirect
4545
go.opentelemetry.io/otel/sdk v1.42.0 // indirect

internal/k8sinventory/go.sum

Lines changed: 14 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/k8sinventory/watch/observer.go

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (o *Observer) startCheckpointFlusher(ctx context.Context, namespace string)
101101
setLatestRV = func(rv string) {
102102
// Buffer the latest rv in the checkpointer; Flush will write it to storage.
103103
if err := o.checkpointer.SetCheckpoint(context.Background(), namespace, o.config.Gvr.Resource, rv); err != nil {
104-
o.logger.Warn("failed to buffer resourceVersion",
104+
o.logger.Error("failed to buffer resourceVersion",
105105
zap.String("namespace", namespace),
106106
zap.String("resource", o.config.Gvr.Resource),
107107
zap.Error(err))
@@ -110,7 +110,7 @@ func (o *Observer) startCheckpointFlusher(ctx context.Context, namespace string)
110110

111111
flush = func() {
112112
if err := o.checkpointer.Flush(context.Background()); err != nil {
113-
o.logger.Warn("failed to flush checkpoints",
113+
o.logger.Error("failed to flush checkpoints",
114114
zap.String("namespace", namespace),
115115
zap.String("resource", o.config.Gvr.Resource),
116116
zap.Error(err))
@@ -154,22 +154,40 @@ func (o *Observer) startWatch(ctx context.Context, resource dynamic.ResourceInte
154154
setLatestRV, flushCheckpoint, stopFlusher := o.startCheckpointFlusher(cancelCtx, namespace)
155155
defer stopFlusher()
156156

157+
// initialListRV holds the list resourceVersion returned by sendInitialState.
158+
// It is used as the watch starting point on the first iteration, eliminating
159+
// a second List() call and closing the race window between the two listings.
160+
// It is cleared after the first iteration so subsequent restarts (e.g. after
161+
// a 410 Gone) fall back to getResourceVersion() as normal.
162+
var initialListRV string
157163
if o.config.IncludeInitialState {
158-
o.sendInitialState(ctx, resource, namespace, setLatestRV)
159-
// Force-flush immediately so the rv from the initial state is durable before
160-
// the watch loop starts. Without this, a 410 arriving within the first 5s
161-
// would delete a checkpoint that was never written due to the flush interval delay.
164+
initialListRV = o.sendInitialState(ctx, resource, namespace, setLatestRV)
165+
// Update the checkpoint with the list's own RV, which is >= any individual
166+
// object RV and represents the precise snapshot point of the listing.
167+
if initialListRV != "" {
168+
setLatestRV(initialListRV)
169+
}
170+
// Force-flush immediately so the rv is durable before the watch loop starts.
162171
flushCheckpoint()
163172
}
164173

165174
wait.UntilWithContext(cancelCtx, func(newCtx context.Context) {
166-
resourceVersion, err := o.getResourceVersion(newCtx, resource, namespace)
167-
if err != nil {
168-
o.logger.Error("could not retrieve a resourceVersion",
169-
zap.String("resource", o.config.Gvr.String()),
170-
zap.Error(err))
171-
cancel()
172-
return
175+
var resourceVersion string
176+
if initialListRV != "" {
177+
// First iteration: reuse the list RV from sendInitialState directly,
178+
// avoiding a redundant List() call and the race window it creates.
179+
resourceVersion = initialListRV
180+
initialListRV = ""
181+
} else {
182+
var err error
183+
resourceVersion, err = o.getResourceVersion(newCtx, resource, namespace)
184+
if err != nil {
185+
o.logger.Error("could not retrieve a resourceVersion",
186+
zap.String("resource", o.config.Gvr.String()),
187+
zap.Error(err))
188+
cancel()
189+
return
190+
}
173191
}
174192

175193
done := o.doWatch(ctx, resourceVersion, namespace, watchFunc, stopperChan, setLatestRV)
@@ -186,7 +204,7 @@ func (o *Observer) startWatch(ctx context.Context, resource dynamic.ResourceInte
186204
// This handles 410 Gone errors where the persisted resourceVersion is too old
187205
if o.checkpointer != nil {
188206
if err := o.checkpointer.DeleteCheckpoint(context.Background(), namespace, o.config.Gvr.Resource); err != nil {
189-
o.logger.Warn("failed to delete persisted resourceVersion after watch restart",
207+
o.logger.Error("failed to delete persisted resourceVersion after watch restart",
190208
zap.String("namespace", namespace),
191209
zap.String("resource", o.config.Gvr.Resource),
192210
zap.Error(err))
@@ -201,7 +219,9 @@ func (o *Observer) startWatch(ctx context.Context, resource dynamic.ResourceInte
201219
// those objects were already seen before the last checkpoint. setLatestRV is
202220
// called for every event that is emitted so the flusher tracks the high-water
203221
// mark across the initial listing as well as the watch loop.
204-
func (o *Observer) sendInitialState(ctx context.Context, resource dynamic.ResourceInterface, namespace string, setLatestRV func(string)) {
222+
// It returns the list's own ResourceVersion, which the caller should use as
223+
// the watch starting point to avoid a redundant List() call.
224+
func (o *Observer) sendInitialState(ctx context.Context, resource dynamic.ResourceInterface, namespace string, setLatestRV func(string)) string {
205225
o.logger.Info("sending initial state",
206226
zap.String("resource", o.config.Gvr.String()),
207227
zap.Strings("namespaces", o.config.Namespaces))
@@ -231,13 +251,15 @@ func (o *Observer) sendInitialState(ctx context.Context, resource dynamic.Resour
231251
o.logger.Error("error in listing objects for initial state",
232252
zap.String("resource", o.config.Gvr.String()),
233253
zap.Error(err))
234-
return
254+
return ""
235255
}
236256

257+
listRV := objects.GetResourceVersion()
258+
237259
if len(objects.Items) == 0 {
238260
o.logger.Debug("no objects found for initial state",
239261
zap.String("resource", o.config.Gvr.String()))
240-
return
262+
return listRV
241263
}
242264

243265
emitted := 0
@@ -271,8 +293,11 @@ func (o *Observer) sendInitialState(ctx context.Context, resource dynamic.Resour
271293
}
272294

273295
o.logger.Info("initial state sent",
296+
zap.String("namespace", namespace),
297+
zap.String("list_rv", listRV),
274298
zap.String("resource", o.config.Gvr.String()),
275299
zap.Int("object_count", emitted))
300+
return listRV
276301
}
277302

278303
// doWatch returns true when watching is done, false when watching should be restarted.

internal/k8sinventory/watch/observer_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,76 @@ func TestObserverInitialStateNoObjects(t *testing.T) {
367367
wg.Wait()
368368
}
369369

370+
// TestSendInitialStateReturnsListRV verifies that sendInitialState returns the
371+
// list's own ResourceVersion, not just the highest individual object RV.
372+
// The list RV is always >= any individual object RV and is the correct starting
373+
// point for the subsequent watch to avoid a race window between two List calls.
374+
func TestSendInitialStateReturnsListRV(t *testing.T) {
375+
mockClient := newMockDynamicClient()
376+
// Set list RV to "999", higher than any individual object RV below.
377+
mockClient.setListResourceVersion("999")
378+
mockClient.createPods(
379+
generatePod("pod1", "default", nil, "100"),
380+
generatePod("pod2", "default", nil, "200"),
381+
)
382+
383+
cfg := Config{
384+
Config: k8sinventory.Config{
385+
Gvr: schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"},
386+
Namespaces: []string{"default"},
387+
},
388+
IncludeInitialState: true,
389+
}
390+
391+
obs, err := New(mockClient, cfg, zap.NewNop(), nil, nil)
392+
require.NoError(t, err)
393+
394+
resource := mockClient.Resource(cfg.Gvr)
395+
listRV := obs.sendInitialState(t.Context(), resource.Namespace("default"), "default", func(string) {})
396+
assert.Equal(t, "999", listRV, "sendInitialState should return the list's own ResourceVersion")
397+
}
398+
399+
// TestInitialStateListRVPersistedAsCheckpoint verifies that after sendInitialState
400+
// the checkpoint is updated with the list's RV (via setLatestRV in startWatch),
401+
// which is more accurate than the highest individual object RV.
402+
func TestInitialStateListRVPersistedAsCheckpoint(t *testing.T) {
403+
mockClient := newMockDynamicClient()
404+
storageClient := storagetest.NewInMemoryClient(component.KindReceiver, component.MustNewID("test"), "test")
405+
406+
// List RV "999" is higher than any individual pod RV — after startup the
407+
// checkpoint should hold "999", not "200".
408+
mockClient.setListResourceVersion("999")
409+
mockClient.createPods(
410+
generatePod("pod1", "default", nil, "100"),
411+
generatePod("pod2", "default", nil, "200"),
412+
)
413+
414+
cfg := Config{
415+
Config: k8sinventory.Config{
416+
Gvr: schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"},
417+
Namespaces: []string{"default"},
418+
},
419+
IncludeInitialState: true,
420+
PersistResourceVersion: true,
421+
}
422+
423+
obs, err := New(mockClient, cfg, zap.NewNop(), storageClient, nil)
424+
require.NoError(t, err)
425+
426+
wg := sync.WaitGroup{}
427+
stopChan := obs.Start(t.Context(), &wg)
428+
429+
time.Sleep(200 * time.Millisecond)
430+
431+
close(stopChan)
432+
wg.Wait()
433+
434+
cp := newCheckpointer(storageClient, zap.NewNop())
435+
rv, err := cp.GetCheckpoint(t.Context(), "default", "pods")
436+
require.NoError(t, err)
437+
assert.Equal(t, "999", rv, "checkpoint should hold the list RV, not a lower individual object RV")
438+
}
439+
370440
func verifyReceivedEvents(t *testing.T, numEvents int, receivedEventsChan chan *apiWatch.Event, stopChan chan struct{}) {
371441
receivedEvents := 0
372442

receiver/k8seventsreceiver/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ the K8s API server. This can be one of `none` (for no auth), `serviceAccount`
3333
- `namespaces` (default = `all`): An array of `namespaces` to collect events from.
3434
This receiver will continuously watch all the `namespaces` mentioned in the array for
3535
new events.
36-
- `storage` (default: none): specifies the storage extension to use for persisting resourceVersions. Required when `persist_resource_version` is enabled.
36+
- `storage` (default: none): specifies the storage extension to use for persisting resourceVersions. Required when `persist_resource_version` is enabled. The attached storage should be a persistent volume that is accessible from all nodes in the cluster. If a local volume is used, persistence will break when the pod is moved to another node.
3737
- `persist_resource_version` (default: `false`): when set to `true`, the receiver persists the resourceVersion after processing each event. On restart, the receiver resumes watching from the persisted resourceVersion, preventing duplicate events. Requires a `storage` extension to be configured.
3838

3939
Examples:

receiver/k8sobjectsreceiver/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ use this config to specify the group to select. By default, it will select the f
6969
For example, `events` resource is available in both `v1` and `events.k8s.io/v1` APIGroup. In
7070
this case, it will select `v1` by default.
7171
- `k8s_leader_elector` (default: none): if specified, will enable Leader Election by using `k8sleaderelector` extension
72-
- `storage` (default: none): specifies the storage extension to use for persisting resourceVersions. Required when `persist_resource_version` is enabled.
72+
- `storage` (default: none): specifies the storage extension to use for persisting resourceVersions. Required when `persist_resource_version` is enabled. The attached storage should be a persistent volume that is accessible from all nodes in the cluster. If a local volume is used, persistence will break when the pod is moved to another node.
7373
- `persist_resource_version` (default: `false`): when set to `true` (watch-mode only), the receiver persists the resourceVersion after processing each event. On restart, the receiver resumes watching from the persisted resourceVersion, preventing duplicate events. Requires a `storage` extension to be configured.
7474

7575

receiver/k8sobjectsreceiver/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func (c *Config) Validate() error {
102102
}
103103

104104
if object.PersistResourceVersion && object.ResourceVersion != "" {
105-
return errors.New("resource_version cannot be set when persist_resource_version is enabled; the persisted version takes priority")
105+
return errors.New("resource_version cannot be set when persist_resource_version is enabled")
106106
}
107107

108108
if len(object.ExcludeNamespaces) != 0 && len(object.Namespaces) != 0 {

receiver/k8sobjectsreceiver/config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func TestConfigValidationPersistResourceVersion(t *testing.T) {
246246
},
247247
},
248248
},
249-
expectedErr: "resource_version cannot be set when persist_resource_version is enabled; the persisted version takes priority",
249+
expectedErr: "resource_version cannot be set when persist_resource_version is enabled",
250250
},
251251
{
252252
desc: "resource_version set without persist_resource_version is valid",

receiver/k8sobjectsreceiver/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ require (
2020
go.opentelemetry.io/collector/confmap v1.54.1-0.20260330144813-4d17eb8959de
2121
go.opentelemetry.io/collector/consumer v1.54.1-0.20260330144813-4d17eb8959de
2222
go.opentelemetry.io/collector/consumer/consumertest v0.148.1-0.20260330144813-4d17eb8959de
23-
go.opentelemetry.io/collector/extension/xextension v0.148.1-0.20260326211300-c04f6776a74c
23+
go.opentelemetry.io/collector/extension/xextension v0.148.1-0.20260330144813-4d17eb8959de
2424
go.opentelemetry.io/collector/filter v0.148.1-0.20260330144813-4d17eb8959de
2525
go.opentelemetry.io/collector/pdata v1.54.1-0.20260330144813-4d17eb8959de
2626
go.opentelemetry.io/collector/receiver v1.54.1-0.20260330144813-4d17eb8959de

0 commit comments

Comments
 (0)