Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8693981
Adds resourceVersion support to k8sObject receiver
dhruv-shah-sumo Mar 4, 2026
0dbdd04
Add resourceVersion persistence feature to k8s receivers
dhruv-shah-sumo Mar 9, 2026
0705e53
Refactor resourceVersion persistence with prioritized checkpoint loading
dhruv-shah-sumo Mar 10, 2026
5a07726
Add persist_resource_version documentation to k8s receivers
dhruv-shah-sumo Mar 11, 2026
dab4bdd
Add debug logging and fix test context usage
dhruv-shah-sumo Mar 17, 2026
74a3d2b
Fix CI failures: lint, porto, changelog, and module versions
dhruv-shah-sumo Mar 28, 2026
9e92f03
receiver/k8sobjects: add persist_resource_version support for watch mode
dhruv-shah-sumo Mar 29, 2026
93a8187
receiver/k8sobjects: fix lint errors in observer.go
dhruv-shah-sumo Mar 29, 2026
85edef4
receiver/k8sobjects: close race window between sendInitialState and w…
dhruv-shah-sumo Mar 31, 2026
3fd1164
receiver/k8sobjects: improve checkpointer reliability and initial sta…
dhruv-shah-sumo Apr 1, 2026
84ee801
internal/k8sinventory/watch: remove observer tests invalidated by con…
dhruv-shah-sumo Apr 1, 2026
4538dce
chore: fix go.mod deps, crosslink replace directives, and gci formatting
dhruv-shah-sumo Apr 5, 2026
e20ade7
chore: go mod tidy after rebase
dhruv-shah-sumo Apr 7, 2026
09ef108
Move persist_resource_version to top-level config in k8sobjectsreceiver
dhruv-shah-sumo Apr 7, 2026
f27a8f0
Fix config.schema.yaml: move persist_resource_version to top-level
dhruv-shah-sumo Apr 7, 2026
ec2223c
receiver/k8sobjects: remove persist_resource_version config, auto-per…
dhruv-shah-sumo Apr 8, 2026
1c166c0
receiver/k8sobjects: fix deployment example to use PersistentVolumeCl…
dhruv-shah-sumo Apr 8, 2026
a15da3a
docs: add storage caveats and fix watchobserver.New signature
dhruv-shah-sumo Apr 13, 2026
0ad8d4b
receiver/k8sobjects: inline getStorageClient to remove pkg/stanza dep…
dhruv-shah-sumo Apr 13, 2026
3cbe3f0
chore: run crosslink to prune stale replace directives
dhruv-shah-sumo Apr 13, 2026
4c19e4b
rebase
dhruv-shah-sumo Apr 20, 2026
327e60f
fix: go mod tidy for missing go.sum entries
dhruv-shah-sumo Apr 20, 2026
809b419
fix: crosslink and go.sum
dhruv-shah-sumo Apr 20, 2026
e5fa5a6
Merge branch 'main' into add-resourceversion-docs
dhruv-shah-sumo Apr 20, 2026
573b40c
fix: bump drainprocessor collector deps
dhruv-shah-sumo Apr 20, 2026
6828119
Merge branch 'main' into add-resourceversion-docs
dhruv-shah-sumo Apr 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .chloggen/k8sobjects-persist-resourceversion.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
change_type: enhancement

component: receiver/k8sobjects

note: When `storage` is configured, watch-mode objects automatically resume from the last seen resourceVersion across restarts, preventing event duplication.

issues: [46543]

subtext:

change_logs: [user]
24 changes: 22 additions & 2 deletions internal/k8sinventory/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,29 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sinv
go 1.25.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.147.0
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/collector/component v1.56.1-0.20260415114935-307e3abdbae9
go.opentelemetry.io/collector/extension/xextension v0.150.1-0.20260415114935-307e3abdbae9
go.uber.org/zap v1.27.1
k8s.io/api v0.35.3
k8s.io/apimachinery v0.35.3
k8s.io/client-go v0.35.3
)

require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/google/gnostic-models v0.7.0 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.9.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand All @@ -27,16 +34,27 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/collector/component/componenttest v0.150.1-0.20260415114935-307e3abdbae9 // indirect
go.opentelemetry.io/collector/extension v1.56.1-0.20260415114935-307e3abdbae9 // indirect
go.opentelemetry.io/collector/featuregate v1.56.1-0.20260415114935-307e3abdbae9 // indirect
go.opentelemetry.io/collector/internal/componentalias v0.150.1-0.20260415114935-307e3abdbae9 // indirect
go.opentelemetry.io/collector/pdata v1.56.1-0.20260415114935-307e3abdbae9 // indirect
go.opentelemetry.io/otel v1.43.0 // indirect
go.opentelemetry.io/otel/metric v1.43.0 // indirect
go.opentelemetry.io/otel/sdk v1.43.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.43.0 // indirect
go.opentelemetry.io/otel/trace v1.43.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/net v0.51.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/term v0.40.0 // indirect
golang.org/x/text v0.34.0 // indirect
golang.org/x/time v0.9.0 // indirect
google.golang.org/protobuf v1.36.10 // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand All @@ -48,3 +66,5 @@ require (
sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect
sigs.k8s.io/yaml v1.6.0 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage => ../../extension/storage
49 changes: 45 additions & 4 deletions internal/k8sinventory/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

181 changes: 181 additions & 0 deletions internal/k8sinventory/watch/checkpointer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package watch // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sinventory/watch"

import (
"context"
"errors"
"fmt"
"strconv"
"sync"

"go.opentelemetry.io/collector/extension/xextension/storage"
"go.uber.org/zap"
)

type checkpointer struct {
client storage.Client
logger *zap.Logger

// pending holds the latest resourceVersion per storage key, buffered in
// memory across all watch streams. Flush() drains it to persistent storage.
mu sync.Mutex
pending map[string]string
}

const checkpointKeyFormat = "latestResourceVersion/%s"

func newCheckpointer(client storage.Client, logger *zap.Logger) *checkpointer {
return &checkpointer{
client: client,
logger: logger,
pending: make(map[string]string),
}
}

func (c *checkpointer) GetCheckpoint(ctx context.Context, namespace, objectType string) (string, error) {
if c.client == nil {
return "", errors.New("storage client is nil")
}

checkPointKey := c.getCheckpointKey(namespace, objectType)
c.logger.Debug("Retrieving checkpoint, key: "+checkPointKey,
zap.String("namespace", namespace),
zap.String("objectType", objectType))
data, err := c.client.Get(ctx, checkPointKey)
if err != nil {
c.logger.Warn("Error retrieving checkpoint",
zap.String("namespace", namespace),
zap.String("objectType", objectType),
zap.Error(err))
return "", fmt.Errorf("failed to retrieve checkpoint: %w", err)
}

// If key is not found, data and error is nil
Comment thread
dhruv-shah-sumo marked this conversation as resolved.
if len(data) == 0 {
c.logger.Debug("No checkpoint found, starting from the beginning",
zap.String("key", checkPointKey))
return "", nil
}
return string(data), nil
}

// SetCheckpoint buffers the latest resourceVersion for the given namespace and
// objectType in memory. Call Flush to persist all buffered values to storage.
// Only updates the in-memory value if the new resourceVersion is numerically
// greater than the current one, acting as a high-watermark. This guards against
// out-of-order resourceVersions from List() responses (which are ordered by
// object key, not by resourceVersion).
func (c *checkpointer) SetCheckpoint(
Comment thread
ChrsMark marked this conversation as resolved.
_ context.Context,
namespace, objectType, resourceVersion string,
) error {
key := c.getCheckpointKey(namespace, objectType)
if key == "" {
return fmt.Errorf("checkpoint key is empty: %s, %s", namespace, objectType)
}

newRV, err := strconv.ParseInt(resourceVersion, 10, 64)
if err != nil {
return fmt.Errorf("invalid resourceVersion %q: %w", resourceVersion, err)
}

c.mu.Lock()
if existing, ok := c.pending[key]; ok {
if existingRV, err := strconv.ParseInt(existing, 10, 64); err == nil && newRV <= existingRV {
c.mu.Unlock()
return nil
}
}
c.pending[key] = resourceVersion
c.mu.Unlock()

c.logger.Debug("buffered resourceVersion checkpoint",
zap.String("key", key),
zap.String("resourceVersion", resourceVersion))

return nil
}

// Flush writes all buffered checkpoints to persistent storage. Only the latest
// value per key is written, discarding any intermediate updates since the last
// flush. It is safe to call concurrently from multiple goroutines.
func (c *checkpointer) Flush(ctx context.Context) error {
if c.client == nil {
return errors.New("storage client is nil")
}

c.mu.Lock()
if len(c.pending) == 0 {
c.mu.Unlock()
return nil
}
snapshot := c.pending
// Setting c.pending to an empty map to avoid unnecessary writes when there are no pending updates
// to be flushed to the disk.
c.pending = make(map[string]string)
c.mu.Unlock()

failed := false
for key, rv := range snapshot {
if err := c.client.Set(ctx, key, []byte(rv)); err != nil {
c.logger.Error("failed to flush checkpoint",
zap.String("key", key),
zap.String("resourceVersion", rv),
zap.Error(err))
failed = true
continue
}
c.logger.Debug("flushed resourceVersion checkpoint",
zap.String("key", key),
zap.String("resourceVersion", rv))
}
Comment thread
dhruv-shah-sumo marked this conversation as resolved.
if failed {
return errors.New("one or more checkpoints failed to be stored")
Comment thread
dhruv-shah-sumo marked this conversation as resolved.
}
return nil
}

// DeleteCheckpoint deletes the persisted checkpoint for a given namespace and object type.
// This is used when the persisted resourceVersion is no longer valid (e.g., after a 410 Gone error).
func (c *checkpointer) DeleteCheckpoint(
Comment thread
ChrsMark marked this conversation as resolved.
ctx context.Context,
namespace, objectType string,
) error {
if c.client == nil {
return errors.New("storage client is nil")
}

key := c.getCheckpointKey(namespace, objectType)
if key == "" {
return fmt.Errorf("checkpoint key is empty: %s, %s", namespace, objectType)
}

if err := c.client.Delete(ctx, key); err != nil {
return fmt.Errorf("failed to delete resourceVersion with key %s: %w", key, err)
}

c.logger.Debug("Checkpoint deleted with key: "+key,
zap.String("namespace", namespace),
zap.String("objectType", objectType))

return nil
}

// getCheckpointKey generates a unique storage key
// returns resourceVersion key for global watch stream (without namespace) or
// per namespace watch stream.
func (*checkpointer) getCheckpointKey(namespace, objectType string) string {
// when watch stream is cluster-wide or cluster-scoped resource (no namespace),
// the resource version is persisted per object type only.
if namespace == "" {
// example: latestResourceVersion/nodes, latestResourceVersion/namespaces
return fmt.Sprintf(checkpointKeyFormat, objectType)
}

// when watch stream is created per namespace, the resource version is persisted
// per object type per namespace.
// example: latestResourceVersion/pods.default, latestResourceVersion/configmaps.kube-system
return fmt.Sprintf("%s.%s", fmt.Sprintf(checkpointKeyFormat, objectType), namespace)
}
Loading
Loading