Skip to content

Commit e235588

Browse files
Refactor resourceVersion persistence with prioritized checkpoint loading
- Refactored getResourceVersion() to prioritize persisted checkpoints over config - Added DeleteCheckpoint() method to handle 410 Gone errors gracefully - Renamed methods from *ResourceVersion to *Checkpoint for consistency - Extracted fetchListResourceVersion() helper to reduce code duplication - Added comprehensive test coverage for checkpoint deletion - Removed unused compareResourceVersions test Priority order: 1. Persisted checkpoint (if checkpointer exists) 2. Config resourceVersion (if no checkpointer) 3. List resourceVersion (fallback) Co-Authored-By: Dhruv Shah <dhruv.shah@sumologic.com>
1 parent a88975c commit e235588

4 files changed

Lines changed: 265 additions & 176 deletions

File tree

internal/k8sinventory/watch/checkpointer.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@ func newCheckpointer(client storage.Client, logger *zap.Logger) *checkpointer {
2323
}
2424
}
2525

26-
func (c *checkpointer) GetResourceVersion(ctx context.Context,
26+
func (c *checkpointer) GetCheckpoint(ctx context.Context,
2727
namespace, objectType string) (string, error) {
2828
if c.client == nil {
2929
return "", errors.New("storage client is nil")
3030
}
3131

3232
checkPointKey := c.getCheckpointKey(namespace, objectType)
33+
c.logger.Debug("Retrieving checkpoint, key: " + checkPointKey,
34+
zap.String("namespace", namespace),
35+
zap.String("objectType", objectType))
3336
data, err := c.client.Get(ctx, checkPointKey)
3437
if err != nil {
3538
c.logger.Warn("Error retrieving checkpoint",
@@ -48,7 +51,7 @@ func (c *checkpointer) GetResourceVersion(ctx context.Context,
4851
return string(data), nil
4952
}
5053

51-
func (c *checkpointer) SetResourceVersion(
54+
func (c *checkpointer) SetCheckpoint(
5255
ctx context.Context,
5356
namespace, objectType, resourceVersion string) error {
5457
if c.client == nil {
@@ -64,7 +67,32 @@ func (c *checkpointer) SetResourceVersion(
6467
return fmt.Errorf("failed to store resourceVersion with key %s: %w", key, err)
6568
}
6669

67-
c.logger.Debug("Checkpoint saved",
70+
c.logger.Debug("Checkpoint saved with key: " + key + " value: " + resourceVersion,
71+
zap.String("namespace", namespace),
72+
zap.String("objectType", objectType))
73+
74+
return nil
75+
}
76+
77+
// DeleteCheckpoint deletes the persisted checkpoint for a given namespace and object type.
78+
// This is used when the persisted resourceVersion is no longer valid (e.g., after a 410 Gone error).
79+
func (c *checkpointer) DeleteCheckpoint(
80+
ctx context.Context,
81+
namespace, objectType string) error {
82+
if c.client == nil {
83+
return errors.New("storage client is nil")
84+
}
85+
86+
key := c.getCheckpointKey(namespace, objectType)
87+
if key == "" {
88+
return fmt.Errorf("checkpoint key is empty: %s, %s", namespace, objectType)
89+
}
90+
91+
if err := c.client.Delete(ctx, key); err != nil {
92+
return fmt.Errorf("failed to delete resourceVersion with key %s: %w", key, err)
93+
}
94+
95+
c.logger.Debug("Checkpoint deleted with key: " + key,
6896
zap.String("namespace", namespace),
6997
zap.String("objectType", objectType))
7098

internal/k8sinventory/watch/checkpointer_test.go

Lines changed: 107 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ func TestCheckpointerGetAndSet(t *testing.T) {
2222
ctx := context.Background()
2323

2424
// Test Set
25-
err := checkpointer.SetResourceVersion(ctx, "default", "pods", "12345")
25+
err := checkpointer.SetCheckpoint(ctx, "default", "pods", "12345")
2626
require.NoError(t, err)
2727

2828
// Test Get
29-
rv, err := checkpointer.GetResourceVersion(ctx, "default", "pods")
29+
rv, err := checkpointer.GetCheckpoint(ctx, "default", "pods")
3030
require.NoError(t, err)
3131
assert.Equal(t, "12345", rv)
3232
}
@@ -84,11 +84,11 @@ func TestCheckpointerKeyFormat(t *testing.T) {
8484
for _, tt := range tests {
8585
t.Run(tt.name, func(t *testing.T) {
8686
// Set the resourceVersion
87-
err := checkpointer.SetResourceVersion(ctx, tt.namespace, tt.objectType, tt.resourceVersion)
87+
err := checkpointer.SetCheckpoint(ctx, tt.namespace, tt.objectType, tt.resourceVersion)
8888
require.NoError(t, err)
8989

9090
// Verify the key format by getting it back
91-
rv, err := checkpointer.GetResourceVersion(ctx, tt.namespace, tt.objectType)
91+
rv, err := checkpointer.GetCheckpoint(ctx, tt.namespace, tt.objectType)
9292
require.NoError(t, err)
9393
assert.Equal(t, tt.resourceVersion, rv)
9494

@@ -106,7 +106,7 @@ func TestCheckpointerGetNonExistent(t *testing.T) {
106106
ctx := context.Background()
107107

108108
// Get non-existent key
109-
rv, err := checkpointer.GetResourceVersion(ctx, "default", "pods")
109+
rv, err := checkpointer.GetCheckpoint(ctx, "default", "pods")
110110
require.NoError(t, err)
111111
assert.Equal(t, "", rv)
112112
}
@@ -118,15 +118,15 @@ func TestCheckpointerUpdate(t *testing.T) {
118118
ctx := context.Background()
119119

120120
// Set initial value
121-
err := checkpointer.SetResourceVersion(ctx, "default", "pods", "100")
121+
err := checkpointer.SetCheckpoint(ctx, "default", "pods", "100")
122122
require.NoError(t, err)
123123

124124
// Update to new value
125-
err = checkpointer.SetResourceVersion(ctx, "default", "pods", "200")
125+
err = checkpointer.SetCheckpoint(ctx, "default", "pods", "200")
126126
require.NoError(t, err)
127127

128128
// Verify updated value
129-
rv, err := checkpointer.GetResourceVersion(ctx, "default", "pods")
129+
rv, err := checkpointer.GetCheckpoint(ctx, "default", "pods")
130130
require.NoError(t, err)
131131
assert.Equal(t, "200", rv)
132132
}
@@ -138,18 +138,18 @@ func TestCheckpointerMultipleNamespaces(t *testing.T) {
138138
ctx := context.Background()
139139

140140
// Set different versions for different namespaces
141-
err := checkpointer.SetResourceVersion(ctx, "default", "pods", "100")
141+
err := checkpointer.SetCheckpoint(ctx, "default", "pods", "100")
142142
require.NoError(t, err)
143143

144-
err = checkpointer.SetResourceVersion(ctx, "kube-system", "pods", "200")
144+
err = checkpointer.SetCheckpoint(ctx, "kube-system", "pods", "200")
145145
require.NoError(t, err)
146146

147147
// Verify they are independent
148-
rv1, err := checkpointer.GetResourceVersion(ctx, "default", "pods")
148+
rv1, err := checkpointer.GetCheckpoint(ctx, "default", "pods")
149149
require.NoError(t, err)
150150
assert.Equal(t, "100", rv1)
151151

152-
rv2, err := checkpointer.GetResourceVersion(ctx, "kube-system", "pods")
152+
rv2, err := checkpointer.GetCheckpoint(ctx, "kube-system", "pods")
153153
require.NoError(t, err)
154154
assert.Equal(t, "200", rv2)
155155
}
@@ -160,12 +160,105 @@ func TestCheckpointerNilClient(t *testing.T) {
160160
ctx := context.Background()
161161

162162
// Get with nil client should return error
163-
_, err := checkpointer.GetResourceVersion(ctx, "default", "pods")
163+
_, err := checkpointer.GetCheckpoint(ctx, "default", "pods")
164164
assert.Error(t, err)
165165
assert.Contains(t, err.Error(), "storage client is nil")
166166

167167
// Set with nil client should return error
168-
err = checkpointer.SetResourceVersion(ctx, "default", "pods", "100")
168+
err = checkpointer.SetCheckpoint(ctx, "default", "pods", "100")
169169
assert.Error(t, err)
170170
assert.Contains(t, err.Error(), "storage client is nil")
171+
172+
// Delete with nil client should return error
173+
err = checkpointer.DeleteCheckpoint(ctx, "default", "pods")
174+
assert.Error(t, err)
175+
assert.Contains(t, err.Error(), "storage client is nil")
176+
}
177+
178+
func TestCheckpointerDelete(t *testing.T) {
179+
client := storagetest.NewInMemoryClient(component.KindReceiver, component.MustNewID("test"), "test")
180+
checkpointer := newCheckpointer(client, zap.NewNop())
181+
182+
ctx := context.Background()
183+
184+
// Set a value
185+
err := checkpointer.SetCheckpoint(ctx, "default", "pods", "12345")
186+
require.NoError(t, err)
187+
188+
// Verify it exists
189+
rv, err := checkpointer.GetCheckpoint(ctx, "default", "pods")
190+
require.NoError(t, err)
191+
assert.Equal(t, "12345", rv)
192+
193+
// Delete it
194+
err = checkpointer.DeleteCheckpoint(ctx, "default", "pods")
195+
require.NoError(t, err)
196+
197+
// Verify it's gone (Get should return empty string)
198+
rv, err = checkpointer.GetCheckpoint(ctx, "default", "pods")
199+
require.NoError(t, err)
200+
assert.Equal(t, "", rv)
201+
}
202+
203+
func TestCheckpointerDeleteNonExistent(t *testing.T) {
204+
client := storagetest.NewInMemoryClient(component.KindReceiver, component.MustNewID("test"), "test")
205+
checkpointer := newCheckpointer(client, zap.NewNop())
206+
207+
ctx := context.Background()
208+
209+
// Delete non-existent key should not error
210+
err := checkpointer.DeleteCheckpoint(ctx, "default", "pods")
211+
require.NoError(t, err)
212+
}
213+
214+
func TestCheckpointerDeleteMultipleNamespaces(t *testing.T) {
215+
client := storagetest.NewInMemoryClient(component.KindReceiver, component.MustNewID("test"), "test")
216+
checkpointer := newCheckpointer(client, zap.NewNop())
217+
218+
ctx := context.Background()
219+
220+
// Set values for different namespaces
221+
err := checkpointer.SetCheckpoint(ctx, "default", "pods", "100")
222+
require.NoError(t, err)
223+
224+
err = checkpointer.SetCheckpoint(ctx, "kube-system", "pods", "200")
225+
require.NoError(t, err)
226+
227+
// Delete only one namespace
228+
err = checkpointer.DeleteCheckpoint(ctx, "default", "pods")
229+
require.NoError(t, err)
230+
231+
// Verify only default was deleted
232+
rv1, err := checkpointer.GetCheckpoint(ctx, "default", "pods")
233+
require.NoError(t, err)
234+
assert.Equal(t, "", rv1) // Should be empty
235+
236+
rv2, err := checkpointer.GetCheckpoint(ctx, "kube-system", "pods")
237+
require.NoError(t, err)
238+
assert.Equal(t, "200", rv2) // Should still exist
239+
}
240+
241+
func TestCheckpointerDeleteClusterWide(t *testing.T) {
242+
client := storagetest.NewInMemoryClient(component.KindReceiver, component.MustNewID("test"), "test")
243+
checkpointer := newCheckpointer(client, zap.NewNop())
244+
245+
ctx := context.Background()
246+
247+
// Set cluster-wide resource (empty namespace)
248+
err := checkpointer.SetCheckpoint(ctx, "", "nodes", "500")
249+
require.NoError(t, err)
250+
251+
// Verify it exists
252+
rv, err := checkpointer.GetCheckpoint(ctx, "", "nodes")
253+
require.NoError(t, err)
254+
assert.Equal(t, "500", rv)
255+
256+
// Delete it
257+
err = checkpointer.DeleteCheckpoint(ctx, "", "nodes")
258+
require.NoError(t, err)
259+
260+
// Verify it's gone
261+
rv, err = checkpointer.GetCheckpoint(ctx, "", "nodes")
262+
require.NoError(t, err)
263+
assert.Equal(t, "", rv)
171264
}

0 commit comments

Comments
 (0)