Skip to content

Commit 9744eea

Browse files
receiver/k8sobjects: add persist_resource_version support for watch mode
Adds periodic checkpoint flushing to persist the latest resourceVersion to storage every 5 seconds instead of writing on every event, reducing storage I/O overhead. Key changes: - Add buffered checkpointer with in-memory pending map; Flush() drains to persistent storage and is a no-op when nothing has changed - Enforce high-watermark in SetCheckpoint: only update if new rv is numerically greater, guarding against out-of-order List() responses - Add startCheckpointFlusher() to encapsulate ticker + setLatestRV + force-flush after sendInitialState to prevent 410 race window - Filter already-seen objects in sendInitialState using persisted rv - Delete persisted rv on 410 Gone to avoid replaying a stale checkpoint - Validate that resource_version and persist_resource_version are not set together in config - Fix README example to use persistentVolumeClaim instead of emptyDir Co-authored-by: Dhruv Shah <dhruv.shah@sumologic.com>
1 parent 04802a4 commit 9744eea

11 files changed

Lines changed: 462 additions & 191 deletions

File tree

internal/k8sinventory/go.sum

Lines changed: 20 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/k8sinventory/watch/checkpointer.go

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"context"
88
"errors"
99
"fmt"
10+
"strconv"
11+
"sync"
1012

1113
"go.opentelemetry.io/collector/extension/xextension/storage"
1214
"go.uber.org/zap"
@@ -15,14 +17,20 @@ import (
1517
type checkpointer struct {
1618
client storage.Client
1719
logger *zap.Logger
20+
21+
// pending holds the latest resourceVersion per storage key, buffered in
22+
// memory across all watch streams. Flush() drains it to persistent storage.
23+
mu sync.Mutex
24+
pending map[string]string
1825
}
1926

2027
const checkpointKeyFormat = "latestResourceVersion/%s"
2128

2229
func newCheckpointer(client storage.Client, logger *zap.Logger) *checkpointer {
2330
return &checkpointer{
24-
client: client,
25-
logger: logger,
31+
client: client,
32+
logger: logger,
33+
pending: make(map[string]string),
2634
}
2735
}
2836

@@ -53,27 +61,70 @@ func (c *checkpointer) GetCheckpoint(ctx context.Context, namespace, objectType
5361
return string(data), nil
5462
}
5563

64+
// SetCheckpoint buffers the latest resourceVersion for the given namespace and
65+
// objectType in memory. Call Flush to persist all buffered values to storage.
66+
// Only updates the in-memory value if the new resourceVersion is numerically
67+
// greater than the current one, acting as a high-watermark. This guards against
68+
// out-of-order resourceVersions from List() responses (which are ordered by
69+
// object key, not by resourceVersion).
5670
func (c *checkpointer) SetCheckpoint(
57-
ctx context.Context,
71+
_ context.Context,
5872
namespace, objectType, resourceVersion string,
5973
) error {
60-
if c.client == nil {
61-
return errors.New("storage client is nil")
62-
}
63-
6474
key := c.getCheckpointKey(namespace, objectType)
6575
if key == "" {
6676
return fmt.Errorf("checkpoint key is empty: %s, %s", namespace, objectType)
6777
}
6878

69-
if err := c.client.Set(ctx, key, []byte(resourceVersion)); err != nil {
70-
return fmt.Errorf("failed to store resourceVersion with key %s: %w", key, err)
79+
newRV, err := strconv.ParseInt(resourceVersion, 10, 64)
80+
if err != nil {
81+
return fmt.Errorf("invalid resourceVersion %q: %w", resourceVersion, err)
7182
}
7283

73-
c.logger.Debug("Checkpoint saved with key: "+key+" value: "+resourceVersion,
74-
zap.String("namespace", namespace),
75-
zap.String("objectType", objectType))
84+
c.mu.Lock()
85+
if existing, ok := c.pending[key]; ok {
86+
if existingRV, err := strconv.ParseInt(existing, 10, 64); err == nil && newRV <= existingRV {
87+
c.mu.Unlock()
88+
return nil
89+
}
90+
}
91+
c.pending[key] = resourceVersion
92+
c.mu.Unlock()
7693

94+
c.logger.Debug("buffered resourceVersion checkpoint",
95+
zap.String("key", key),
96+
zap.String("resourceVersion", resourceVersion))
97+
98+
return nil
99+
}
100+
101+
// Flush writes all buffered checkpoints to persistent storage. Only the latest
102+
// value per key is written, discarding any intermediate updates since the last
103+
// flush. It is safe to call concurrently from multiple goroutines.
104+
func (c *checkpointer) Flush(ctx context.Context) error {
105+
if c.client == nil {
106+
return errors.New("storage client is nil")
107+
}
108+
109+
c.mu.Lock()
110+
if len(c.pending) == 0 {
111+
c.mu.Unlock()
112+
return nil
113+
}
114+
snapshot := c.pending
115+
// Setting c.pending to an empty map to avoid unnecessary writes when there are no pending updates
116+
// to be flushed to the disk.
117+
c.pending = make(map[string]string)
118+
c.mu.Unlock()
119+
120+
for key, rv := range snapshot {
121+
if err := c.client.Set(ctx, key, []byte(rv)); err != nil {
122+
return fmt.Errorf("failed to flush checkpoint with key %s: %w", key, err)
123+
}
124+
c.logger.Debug("flushed resourceVersion checkpoint",
125+
zap.String("key", key),
126+
zap.String("resourceVersion", rv))
127+
}
77128
return nil
78129
}
79130

0 commit comments

Comments
 (0)