Commit b23b74a
Adds resourceVersion support to k8sObject receiver (#46543)
#### Description

Implements optional resourceVersion checkpointing to prevent duplicate events on collector restart. Resource-version persistence is implemented in the shared watch mechanism used by both the k8sobjects and k8sevents receivers. The feature is not enabled for `pull` mode in either receiver.

Features:
- Opt-in: enabled by declaring a storage extension for persistence.
- Namespace-aware checkpointing: when separate namespaces are specified, a per-stream checkpoint is created, with one key maintained in storage per namespace, e.g. `latestResourceVersion/configmaps.kube-system`. When no namespaces are specified, a single cluster-wide watch stream is created, with one key per object type, e.g. `latestResourceVersion/pods`, `latestResourceVersion/nodes`.
- Watch mode only (validated at config time).
- If persistence is enabled, the `resource_version` provided in the config is ignored; the persisted version is used to kick off the watch stream.
- If the persisted version is stale, the List() API is called to get the latest available resource version, which is the existing way of handling a stale resource version in the k8s inventory code.

Configuration:

```yaml
receivers:
  k8sobjects:
    storage: file_storage
    objects:
      - name: pods
        mode: watch
```

How it works:
1. At the start of each watch stream, `getResourceVersion()` checks whether persistence is enabled and, if so, retrieves the persisted version.
   - If the persisted version is empty, it calls the List() API to get the list of available objects, picks the latest resource version, and persists it.
   - If the persisted version is non-empty, it is supplied to kick off the watch stream.
   - If the persisted version supplied to the watch stream turns out to be stale, the checkpoint is deleted, which prompts the List() API call again to get the latest available resource version for the stream.
2. After processing each watch event, the latest resourceVersion is saved to storage.
3. If a persistence operation fails for any reason, the watch stream continues.

#### Link to tracking issue
Fixes #46017

#### Testing
- Unit tests.
- Manually built the updated image using `make docker-otelcontribcol` and deployed it locally. Tested various restart and 410 handling scenarios for both global and per-namespace watch streams.

#### Documentation
- Updated the schema YAML files for the k8sobjects and k8sevents receivers.

Example of the new configuration:

```yaml
extensions:
  file_storage:
    directory: /var/lib/otelcol/storage
    timeout: 10s

receivers:
  k8sobjects:
    auth_type: serviceAccount
    storage: file_storage
    include_initial_state: true
    objects:
      - name: pods
        mode: watch
        namespaces: [default]
      - name: events
        mode: watch
        namespaces: [default]

exporters:
  nop:

service:
  telemetry:
    logs:
      level: debug
  extensions: [file_storage]
  pipelines:
    logs:
      receivers: [k8sobjects]
      exporters: [nop]
```

Signed-off-by: Dhruv Shah <dhruv.shah@sumologic.com>
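The startup flow in step 1 can be sketched as follows. This is a hedged illustration only: the checkpoint map and `listLatestResourceVersion` are hypothetical stand-ins for the storage extension and the real List() call, not the receiver's actual code.

```go
package main

import "fmt"

// store stands in for the storage extension: checkpoint key -> persisted resourceVersion.
var store = map[string]string{"latestResourceVersion/pods.default": "12345"}

// listLatestResourceVersion stands in for a List() API call that returns the
// newest resourceVersion for the stream (also used to re-sync after 410 Gone).
func listLatestResourceVersion() string { return "99999" }

// getResourceVersion mirrors the documented startup logic: prefer the persisted
// checkpoint; fall back to List() when no checkpoint exists, and persist the result.
func getResourceVersion(key string, persistenceEnabled bool) string {
	if !persistenceEnabled {
		return "" // the config-provided resource_version path, not shown here
	}
	if rv, ok := store[key]; ok && rv != "" {
		return rv // resume the watch from the persisted checkpoint
	}
	rv := listLatestResourceVersion()
	store[key] = rv
	return rv
}

func main() {
	// Existing checkpoint: resume from it.
	fmt.Println(getResourceVersion("latestResourceVersion/pods.default", true)) // 12345
	// No checkpoint yet: List() supplies the starting version, which is persisted.
	fmt.Println(getResourceVersion("latestResourceVersion/nodes", true)) // 99999
}
```

On a stale version, the real implementation deletes the checkpoint and re-enters the List() branch above.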
1 parent 4aa4ab4 commit b23b74a

17 files changed

Lines changed: 1759 additions & 65 deletions


Lines changed: 11 additions & 0 deletions
```yaml
change_type: enhancement
component: receiver/k8sobjects
note: When `storage` is configured, watch-mode objects automatically resume from the last seen resourceVersion across restarts, preventing event duplication.
issues: [46543]
subtext:
change_logs: [user]
```

internal/k8sinventory/go.mod

Lines changed: 22 additions & 2 deletions
```diff
@@ -3,22 +3,29 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sinv
 go 1.25.0

 require (
+	github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.147.0
 	github.com/stretchr/testify v1.11.1
+	go.opentelemetry.io/collector/component v1.56.1-0.20260415114935-307e3abdbae9
+	go.opentelemetry.io/collector/extension/xextension v0.150.1-0.20260415114935-307e3abdbae9
 	go.uber.org/zap v1.27.1
 	k8s.io/api v0.35.3
 	k8s.io/apimachinery v0.35.3
 	k8s.io/client-go v0.35.3
 )

 require (
+	github.com/cespare/xxhash/v2 v2.3.0 // indirect
 	github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
 	github.com/fxamacker/cbor/v2 v2.9.0 // indirect
 	github.com/go-logr/logr v1.4.3 // indirect
+	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/go-openapi/jsonpointer v0.21.0 // indirect
 	github.com/go-openapi/jsonreference v0.20.2 // indirect
 	github.com/go-openapi/swag v0.23.0 // indirect
 	github.com/google/gnostic-models v0.7.0 // indirect
 	github.com/google/go-cmp v0.7.0 // indirect
+	github.com/google/uuid v1.6.0 // indirect
+	github.com/hashicorp/go-version v1.9.0 // indirect
 	github.com/josharian/intern v1.0.0 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
 	github.com/mailru/easyjson v0.7.7 // indirect
@@ -27,16 +34,27 @@ require (
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
 	github.com/x448/float16 v0.8.4 // indirect
+	go.opentelemetry.io/auto/sdk v1.2.1 // indirect
+	go.opentelemetry.io/collector/component/componenttest v0.150.1-0.20260415114935-307e3abdbae9 // indirect
+	go.opentelemetry.io/collector/extension v1.56.1-0.20260415114935-307e3abdbae9 // indirect
+	go.opentelemetry.io/collector/featuregate v1.56.1-0.20260415114935-307e3abdbae9 // indirect
+	go.opentelemetry.io/collector/internal/componentalias v0.150.1-0.20260415114935-307e3abdbae9 // indirect
+	go.opentelemetry.io/collector/pdata v1.56.1-0.20260415114935-307e3abdbae9 // indirect
+	go.opentelemetry.io/otel v1.43.0 // indirect
+	go.opentelemetry.io/otel/metric v1.43.0 // indirect
+	go.opentelemetry.io/otel/sdk v1.43.0 // indirect
+	go.opentelemetry.io/otel/sdk/metric v1.43.0 // indirect
+	go.opentelemetry.io/otel/trace v1.43.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	go.yaml.in/yaml/v2 v2.4.3 // indirect
 	go.yaml.in/yaml/v3 v3.0.4 // indirect
 	golang.org/x/net v0.51.0 // indirect
 	golang.org/x/oauth2 v0.30.0 // indirect
-	golang.org/x/sys v0.41.0 // indirect
+	golang.org/x/sys v0.42.0 // indirect
 	golang.org/x/term v0.40.0 // indirect
 	golang.org/x/text v0.34.0 // indirect
 	golang.org/x/time v0.9.0 // indirect
-	google.golang.org/protobuf v1.36.10 // indirect
+	google.golang.org/protobuf v1.36.11 // indirect
 	gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
@@ -48,3 +66,5 @@ require (
 	sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect
 	sigs.k8s.io/yaml v1.6.0 // indirect
 )
+
+replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage => ../../extension/storage
```

internal/k8sinventory/go.sum

Lines changed: 45 additions & 4 deletions
(generated file; diff not rendered)
Lines changed: 181 additions & 0 deletions
```go
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package watch // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sinventory/watch"

import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"sync"

	"go.opentelemetry.io/collector/extension/xextension/storage"
	"go.uber.org/zap"
)

type checkpointer struct {
	client storage.Client
	logger *zap.Logger

	// pending holds the latest resourceVersion per storage key, buffered in
	// memory across all watch streams. Flush() drains it to persistent storage.
	mu      sync.Mutex
	pending map[string]string
}

const checkpointKeyFormat = "latestResourceVersion/%s"

func newCheckpointer(client storage.Client, logger *zap.Logger) *checkpointer {
	return &checkpointer{
		client:  client,
		logger:  logger,
		pending: make(map[string]string),
	}
}

func (c *checkpointer) GetCheckpoint(ctx context.Context, namespace, objectType string) (string, error) {
	if c.client == nil {
		return "", errors.New("storage client is nil")
	}

	checkPointKey := c.getCheckpointKey(namespace, objectType)
	c.logger.Debug("Retrieving checkpoint, key: "+checkPointKey,
		zap.String("namespace", namespace),
		zap.String("objectType", objectType))
	data, err := c.client.Get(ctx, checkPointKey)
	if err != nil {
		c.logger.Warn("Error retrieving checkpoint",
			zap.String("namespace", namespace),
			zap.String("objectType", objectType),
			zap.Error(err))
		return "", fmt.Errorf("failed to retrieve checkpoint: %w", err)
	}

	// If the key is not found, both data and err are nil.
	if len(data) == 0 {
		c.logger.Debug("No checkpoint found, starting from the beginning",
			zap.String("key", checkPointKey))
		return "", nil
	}
	return string(data), nil
}

// SetCheckpoint buffers the latest resourceVersion for the given namespace and
// objectType in memory. Call Flush to persist all buffered values to storage.
// It only updates the in-memory value if the new resourceVersion is numerically
// greater than the current one, acting as a high-watermark. This guards against
// out-of-order resourceVersions from List() responses (which are ordered by
// object key, not by resourceVersion).
func (c *checkpointer) SetCheckpoint(
	_ context.Context,
	namespace, objectType, resourceVersion string,
) error {
	key := c.getCheckpointKey(namespace, objectType)
	if key == "" {
		return fmt.Errorf("checkpoint key is empty: %s, %s", namespace, objectType)
	}

	newRV, err := strconv.ParseInt(resourceVersion, 10, 64)
	if err != nil {
		return fmt.Errorf("invalid resourceVersion %q: %w", resourceVersion, err)
	}

	c.mu.Lock()
	if existing, ok := c.pending[key]; ok {
		if existingRV, err := strconv.ParseInt(existing, 10, 64); err == nil && newRV <= existingRV {
			c.mu.Unlock()
			return nil
		}
	}
	c.pending[key] = resourceVersion
	c.mu.Unlock()

	c.logger.Debug("buffered resourceVersion checkpoint",
		zap.String("key", key),
		zap.String("resourceVersion", resourceVersion))

	return nil
}

// Flush writes all buffered checkpoints to persistent storage. Only the latest
// value per key is written, discarding any intermediate updates since the last
// flush. It is safe to call concurrently from multiple goroutines.
func (c *checkpointer) Flush(ctx context.Context) error {
	if c.client == nil {
		return errors.New("storage client is nil")
	}

	c.mu.Lock()
	if len(c.pending) == 0 {
		c.mu.Unlock()
		return nil
	}
	snapshot := c.pending
	// Swap in a fresh map so the storage writes happen outside the lock and
	// already-flushed values are not written again on the next Flush.
	c.pending = make(map[string]string)
	c.mu.Unlock()

	failed := false
	for key, rv := range snapshot {
		if err := c.client.Set(ctx, key, []byte(rv)); err != nil {
			c.logger.Error("failed to flush checkpoint",
				zap.String("key", key),
				zap.String("resourceVersion", rv),
				zap.Error(err))
			failed = true
			continue
		}
		c.logger.Debug("flushed resourceVersion checkpoint",
			zap.String("key", key),
			zap.String("resourceVersion", rv))
	}
	if failed {
		return errors.New("one or more checkpoints failed to be stored")
	}
	return nil
}

// DeleteCheckpoint deletes the persisted checkpoint for a given namespace and object type.
// This is used when the persisted resourceVersion is no longer valid (e.g., after a 410 Gone error).
func (c *checkpointer) DeleteCheckpoint(
	ctx context.Context,
	namespace, objectType string,
) error {
	if c.client == nil {
		return errors.New("storage client is nil")
	}

	key := c.getCheckpointKey(namespace, objectType)
	if key == "" {
		return fmt.Errorf("checkpoint key is empty: %s, %s", namespace, objectType)
	}

	if err := c.client.Delete(ctx, key); err != nil {
		return fmt.Errorf("failed to delete resourceVersion with key %s: %w", key, err)
	}

	c.logger.Debug("Checkpoint deleted with key: "+key,
		zap.String("namespace", namespace),
		zap.String("objectType", objectType))

	return nil
}

// getCheckpointKey generates a unique storage key: the resourceVersion key for
// a global (cluster-wide) watch stream, or for a per-namespace watch stream.
func (*checkpointer) getCheckpointKey(namespace, objectType string) string {
	// When the watch stream is cluster-wide, or the resource is cluster-scoped
	// (no namespace), the resource version is persisted per object type only.
	if namespace == "" {
		// example: latestResourceVersion/nodes, latestResourceVersion/namespaces
		return fmt.Sprintf(checkpointKeyFormat, objectType)
	}

	// When a watch stream is created per namespace, the resource version is
	// persisted per object type per namespace.
	// example: latestResourceVersion/pods.default, latestResourceVersion/configmaps.kube-system
	return fmt.Sprintf("%s.%s", fmt.Sprintf(checkpointKeyFormat, objectType), namespace)
}
```
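The buffered high-watermark semantics of `SetCheckpoint` and `Flush` can be exercised in isolation. The following is a self-contained sketch with a plain map standing in for the `storage.Client`; `memCheckpointer` is a hypothetical demo type, not part of the receiver:

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// memCheckpointer mirrors the buffering semantics of the checkpointer above,
// with an in-memory "stored" map standing in for persistent storage.
type memCheckpointer struct {
	mu      sync.Mutex
	pending map[string]string // latest buffered resourceVersion per key
	stored  map[string]string // values that have been "flushed"
}

func (c *memCheckpointer) set(key, rv string) {
	newRV, err := strconv.ParseInt(rv, 10, 64)
	if err != nil {
		return
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if existing, ok := c.pending[key]; ok {
		if existingRV, err := strconv.ParseInt(existing, 10, 64); err == nil && newRV <= existingRV {
			return // high-watermark: older or equal versions are ignored
		}
	}
	c.pending[key] = rv
}

func (c *memCheckpointer) flush() {
	c.mu.Lock()
	snapshot := c.pending
	c.pending = make(map[string]string) // swap so writes happen outside the lock
	c.mu.Unlock()
	for k, v := range snapshot {
		c.stored[k] = v // one write per key, latest value only
	}
}

func main() {
	c := &memCheckpointer{pending: map[string]string{}, stored: map[string]string{}}
	c.set("latestResourceVersion/pods.default", "100")
	c.set("latestResourceVersion/pods.default", "97") // out of order: dropped
	c.set("latestResourceVersion/pods.default", "103")
	c.flush()
	fmt.Println(c.stored["latestResourceVersion/pods.default"]) // 103
}
```

Note how the intermediate versions never reach storage: only the highest version seen since the last flush is written, which is what keeps flush I/O bounded per key.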
