Skip to content

Commit 55cba2d

Browse files
Add resourceVersion persistence feature to k8s receivers
This commit implements resourceVersion persistence for k8sobjects and k8sevents receivers to prevent duplicate events on collector restart. Key changes: - Add checkpointer component for storing/retrieving resourceVersions - Modify observer to compare list vs persisted versions and use highest - Add storage client initialization in receivers - Update config schemas with new persist_resource_version and storage fields - Add comprehensive unit tests for persistence logic - Consolidate test files into main test files The feature is opt-in via persist_resource_version config flag and requires a storage extension to be configured. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 39fe68a commit 55cba2d

20 files changed

Lines changed: 1752 additions & 232 deletions

File tree

internal/k8sinventory/go.mod

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,28 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sinv
33
go 1.25.0
44

55
require (
6+
github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.147.0
67
github.com/stretchr/testify v1.11.1
8+
go.opentelemetry.io/collector/extension/xextension v0.147.0
79
go.uber.org/zap v1.27.1
810
k8s.io/api v0.35.2
911
k8s.io/apimachinery v0.35.2
1012
k8s.io/client-go v0.35.2
1113
)
1214

1315
require (
16+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
1417
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
1518
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
1619
github.com/go-logr/logr v1.4.3 // indirect
20+
github.com/go-logr/stdr v1.2.2 // indirect
1721
github.com/go-openapi/jsonpointer v0.21.0 // indirect
1822
github.com/go-openapi/jsonreference v0.20.2 // indirect
1923
github.com/go-openapi/swag v0.23.0 // indirect
2024
github.com/google/gnostic-models v0.7.0 // indirect
2125
github.com/google/go-cmp v0.7.0 // indirect
26+
github.com/google/uuid v1.6.0 // indirect
27+
github.com/hashicorp/go-version v1.8.0 // indirect
2228
github.com/josharian/intern v1.0.0 // indirect
2329
github.com/json-iterator/go v1.1.12 // indirect
2430
github.com/mailru/easyjson v0.7.7 // indirect
@@ -27,6 +33,18 @@ require (
2733
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
2834
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
2935
github.com/x448/float16 v0.8.4 // indirect
36+
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
37+
go.opentelemetry.io/collector/component v1.53.0 // indirect
38+
go.opentelemetry.io/collector/component/componenttest v0.147.0 // indirect
39+
go.opentelemetry.io/collector/extension v1.53.0 // indirect
40+
go.opentelemetry.io/collector/featuregate v1.53.0 // indirect
41+
go.opentelemetry.io/collector/internal/componentalias v0.147.0 // indirect
42+
go.opentelemetry.io/collector/pdata v1.53.0 // indirect
43+
go.opentelemetry.io/otel v1.40.0 // indirect
44+
go.opentelemetry.io/otel/metric v1.40.0 // indirect
45+
go.opentelemetry.io/otel/sdk v1.40.0 // indirect
46+
go.opentelemetry.io/otel/sdk/metric v1.40.0 // indirect
47+
go.opentelemetry.io/otel/trace v1.40.0 // indirect
3048
go.uber.org/multierr v1.11.0 // indirect
3149
go.yaml.in/yaml/v2 v2.4.3 // indirect
3250
go.yaml.in/yaml/v3 v3.0.4 // indirect
@@ -36,7 +54,7 @@ require (
3654
golang.org/x/term v0.40.0 // indirect
3755
golang.org/x/text v0.34.0 // indirect
3856
golang.org/x/time v0.9.0 // indirect
39-
google.golang.org/protobuf v1.36.10 // indirect
57+
google.golang.org/protobuf v1.36.11 // indirect
4058
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect
4159
gopkg.in/inf.v0 v0.9.1 // indirect
4260
gopkg.in/yaml.v3 v3.0.1 // indirect

internal/k8sinventory/go.sum

Lines changed: 45 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package watch
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
"go.opentelemetry.io/collector/extension/xextension/storage"
9+
"go.uber.org/zap"
10+
)
11+
12+
type checkpointer struct {
13+
client storage.Client
14+
logger *zap.Logger
15+
}
16+
17+
const checkpointKeyFormat = "latestResourceVersion/%s"
18+
19+
func newCheckpointer(client storage.Client, logger *zap.Logger) *checkpointer {
20+
return &checkpointer{
21+
client: client,
22+
logger: logger,
23+
}
24+
}
25+
26+
func (c *checkpointer) GetResourceVersion(ctx context.Context,
27+
namespace, objectType string) (string, error) {
28+
if c.client == nil {
29+
return "", errors.New("storage client is nil")
30+
}
31+
32+
checkPointKey := c.getCheckpointKey(namespace, objectType)
33+
data, err := c.client.Get(ctx, checkPointKey)
34+
if err != nil {
35+
c.logger.Warn("Error retrieving checkpoint",
36+
zap.String("namespace", namespace),
37+
zap.String("objectType", objectType),
38+
zap.Error(err))
39+
return "", fmt.Errorf("failed to retrieve checkpoint: %w", err)
40+
}
41+
42+
// If key is not found, data and error is nil
43+
if len(data) == 0 {
44+
c.logger.Debug("No checkpoint found, starting from the beginning",
45+
zap.String("key", checkPointKey))
46+
return "", nil
47+
}
48+
return string(data), nil
49+
}
50+
51+
func (c *checkpointer) SetResourceVersion(
52+
ctx context.Context,
53+
namespace, objectType, resourceVersion string) error {
54+
if c.client == nil {
55+
return errors.New("storage client is nil")
56+
}
57+
58+
key := c.getCheckpointKey(namespace, objectType)
59+
if key == "" {
60+
return fmt.Errorf("checkpoint key is empty: %s, %s", namespace, objectType)
61+
}
62+
63+
if err := c.client.Set(ctx, key, []byte(resourceVersion)); err != nil {
64+
return fmt.Errorf("failed to store resourceVersion with key %s: %w", key, err)
65+
}
66+
67+
c.logger.Debug("Checkpoint saved",
68+
zap.String("namespace", namespace),
69+
zap.String("objectType", objectType))
70+
71+
return nil
72+
}
73+
74+
// getCheckpointKey generates a unique storage key
75+
// returns resourceVersion key for global watch stream (without namespace) or
76+
// per namespace watch stream.
77+
func (*checkpointer) getCheckpointKey(namespace, objectType string) string {
78+
// when watch stream is cluster-wide or cluster-scoped resource (no namespace),
79+
// the resource version is persisted per object type only.
80+
if namespace == "" {
81+
// example: latestResourceVersion/nodes, latestResourceVersion/namespaces
82+
return fmt.Sprintf(checkpointKeyFormat, objectType)
83+
}
84+
85+
// when watch stream is created per namespace, the resource version is persisted
86+
// per object type per namespace.
87+
// example: latestResourceVersion/pods.default, latestResourceVersion/configmaps.kube-system
88+
return fmt.Sprintf("%s.%s", fmt.Sprintf(checkpointKeyFormat, objectType), namespace)
89+
}

0 commit comments

Comments
 (0)