Skip to content

Commit 75ed7ba

Browse files
Merge pull request #1082 from fabriziosestito/fix/storage-watcher-race-condition
fix(storage): avoid dropping ADD events after a concurrent store delete
2 parents 98fd3ae + 68934ad commit 75ed7ba

2 files changed

Lines changed: 153 additions & 20 deletions

File tree

internal/storage/store_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package storage
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"fmt"
78
"log/slog"
@@ -466,6 +467,115 @@ func (suite *storeTestSuite) TestWatchList() {
466467
suite.NotEmpty(bookmarkObj.ResourceVersion)
467468
}
468469

470+
// When a non-Deleted event arrives for an object the store no longer holds, the watcher must still broadcast the payload.
471+
func (suite *storeTestSuite) TestHandleMessageNotFoundStillBroadcasts() {
472+
ctx, cancel := context.WithCancel(suite.T().Context())
473+
defer cancel()
474+
475+
w, err := suite.broadcaster.Watch()
476+
suite.Require().NoError(err)
477+
defer w.Stop()
478+
479+
go suite.watcher.Start(ctx)
480+
suite.Require().NoError(suite.nc.Flush())
481+
482+
sbom := &storagev1alpha1.SBOM{
483+
ObjectMeta: metav1.ObjectMeta{
484+
Name: "ghost",
485+
Namespace: "default",
486+
},
487+
}
488+
sbomBytes, err := json.Marshal(sbom)
489+
suite.Require().NoError(err)
490+
491+
payload, err := json.Marshal(event{
492+
EventType: watch.Added,
493+
Object: runtime.RawExtension{Raw: sbomBytes},
494+
})
495+
suite.Require().NoError(err)
496+
497+
suite.Require().NoError(suite.nc.Publish("watch.sboms", payload))
498+
suite.Require().NoError(suite.nc.Flush())
499+
500+
events := mustReadEvents(suite.T(), w, 1)
501+
suite.Equal(watch.Added, events[0].Type)
502+
503+
got, ok := events[0].Object.(*storagev1alpha1.SBOM)
504+
suite.Require().True(ok)
505+
suite.Equal("ghost", got.Name)
506+
suite.Equal("default", got.Namespace)
507+
}
508+
509+
// When the store holds a different object at the same namespace/name, the watcher must broadcast the payload, not the refetched object.
510+
func (suite *storeTestSuite) TestHandleMessageUIDMismatchBroadcastsPayload() {
511+
ctx, cancel := context.WithCancel(suite.T().Context())
512+
defer cancel()
513+
514+
w, err := suite.broadcaster.Watch()
515+
suite.Require().NoError(err)
516+
defer w.Stop()
517+
518+
go suite.watcher.Start(ctx)
519+
suite.Require().NoError(suite.nc.Flush())
520+
521+
// Populate the store at default/collide. The stale event published next will carry a different UID for the same namespace/name.
522+
storedUID := types.UID("stored-uid")
523+
stored := &storagev1alpha1.SBOM{
524+
ObjectMeta: metav1.ObjectMeta{
525+
Name: "collide",
526+
Namespace: "default",
527+
UID: storedUID,
528+
},
529+
SPDX: runtime.RawExtension{Raw: []byte(`{"stored": true}`)},
530+
}
531+
key := keyPrefix + "/default/collide"
532+
err = suite.store.Create(ctx, key, stored, &storagev1alpha1.SBOM{}, 0)
533+
suite.Require().NoError(err)
534+
535+
// Drain the ADDED event produced by the Create above
536+
created := mustReadEvents(suite.T(), w, 1)
537+
suite.Equal(watch.Added, created[0].Type)
538+
createdSBOM, ok := created[0].Object.(*storagev1alpha1.SBOM)
539+
suite.Require().True(ok)
540+
suite.Equal(storedUID, createdSBOM.UID)
541+
542+
// Publish a stale ADDED carrying a different UID at the same namespace/name
543+
staleUID := types.UID("stale-uid")
544+
suite.Require().NotEqual(storedUID, staleUID)
545+
546+
stale := &storagev1alpha1.SBOM{
547+
ObjectMeta: metav1.ObjectMeta{
548+
Name: "collide",
549+
Namespace: "default",
550+
UID: staleUID,
551+
},
552+
}
553+
staleBytes, err := json.Marshal(stale)
554+
suite.Require().NoError(err)
555+
556+
payload, err := json.Marshal(event{
557+
EventType: watch.Added,
558+
Object: runtime.RawExtension{Raw: staleBytes},
559+
})
560+
suite.Require().NoError(err)
561+
562+
suite.Require().NoError(suite.nc.Publish("watch.sboms", payload))
563+
suite.Require().NoError(suite.nc.Flush())
564+
565+
events := mustReadEvents(suite.T(), w, 1)
566+
suite.Equal(watch.Added, events[0].Type)
567+
568+
got, ok := events[0].Object.(*storagev1alpha1.SBOM)
569+
suite.Require().True(ok)
570+
suite.Equal(staleUID, got.UID, "expected payload UID, not stored UID")
571+
572+
// Stored object should still be intact and retrievable with its own UID
573+
fetched := &storagev1alpha1.SBOM{}
574+
err = suite.store.Get(ctx, key, k8sstorage.GetOptions{}, fetched)
575+
suite.Require().NoError(err)
576+
suite.Equal(storedUID, fetched.UID)
577+
}
578+
469579
func (suite *storeTestSuite) TestGetList() {
470580
key := keyPrefix + "/default"
471581
sbom1 := storagev1alpha1.SBOM{

internal/storage/watcher.go

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -90,32 +90,18 @@ func (w *natsWatcher) handleMessage(ctx context.Context, msg *nats.Msg) error {
9090
}
9191

9292
obj := w.store.newFunc()
93-
err := json.Unmarshal(payload.Object.Raw, obj)
94-
if err != nil {
93+
if err := json.Unmarshal(payload.Object.Raw, obj); err != nil {
9594
return fmt.Errorf("failed to decode object: %w", err)
9695
}
9796

98-
// NOTE: For deleted events, we broadcast the event without re-fetching the object from the store.
99-
// This is because the object was deleted, and we are not able to fetch it again.
100-
// The object might not be the complete object, as the transforum function may have been applied.
97+
// For deleted events broadcast the payload directly since the store no longer has it.
98+
// Otherwise rehydrate fields stripped by the publish-side transform (e.g. SBOM.SPDX) from the store's current state.
10199
if payload.EventType != watch.Deleted {
102-
metaAccessor, err := meta.Accessor(obj)
100+
rehydrated, err := w.rehydrate(ctx, obj)
103101
if err != nil {
104-
return fmt.Errorf("failed to get meta accessor: %w", err)
105-
}
106-
key := fmt.Sprintf("%s/%s/%s/%s", storagev1alpha1.GroupName, w.resource, metaAccessor.GetNamespace(), metaAccessor.GetName())
107-
108-
if err := w.store.Get(ctx, key, storage.GetOptions{}, obj); err != nil {
109-
if storage.IsNotFound(err) {
110-
// Object not found, possibly deleted after the event was sent.
111-
w.logger.DebugContext(ctx, "Object not found in store while handling message, skipping",
112-
"key", key,
113-
)
114-
115-
return nil
116-
}
117-
return fmt.Errorf("failed to get object from store while handling message: %w", err)
102+
return err
118103
}
104+
obj = rehydrated
119105
}
120106

121107
if err := w.watchBroadcaster.Action(payload.EventType, obj); err != nil {
@@ -129,6 +115,43 @@ func (w *natsWatcher) handleMessage(ctx context.Context, msg *nats.Msg) error {
129115
return nil
130116
}
131117

118+
// rehydrate returns the stored object matching the payload, or the payload itself when the store does not hold a matching object.
119+
// Falling back to the payload (rather than dropping the event) keeps downstream watchers like the GC dependency graph builder consistent. The following DELETED event reconciles client state.
120+
func (w *natsWatcher) rehydrate(ctx context.Context, payloadObj runtime.Object) (runtime.Object, error) {
121+
payloadAccessor, err := meta.Accessor(payloadObj)
122+
if err != nil {
123+
return nil, fmt.Errorf("failed to get meta accessor: %w", err)
124+
}
125+
key := fmt.Sprintf("%s/%s/%s/%s", storagev1alpha1.GroupName, w.resource, payloadAccessor.GetNamespace(), payloadAccessor.GetName())
126+
127+
fetched := w.store.newFunc()
128+
if err := w.store.Get(ctx, key, storage.GetOptions{}, fetched); err != nil {
129+
if !storage.IsNotFound(err) {
130+
return nil, fmt.Errorf("failed to get object from store while handling message: %w", err)
131+
}
132+
w.logger.DebugContext(ctx, "Object not found in store while handling message; broadcasting payload",
133+
"key", key,
134+
)
135+
return payloadObj, nil
136+
}
137+
138+
fetchedAccessor, err := meta.Accessor(fetched)
139+
if err != nil {
140+
return nil, fmt.Errorf("failed to get meta accessor for fetched object: %w", err)
141+
}
142+
if fetchedAccessor.GetUID() != payloadAccessor.GetUID() {
143+
// Same namespace/name, different object: it was deleted and recreated before we got here.
144+
w.logger.DebugContext(ctx, "Stored object UID does not match payload; broadcasting payload",
145+
"key", key,
146+
"payloadUID", payloadAccessor.GetUID(),
147+
"storedUID", fetchedAccessor.GetUID(),
148+
)
149+
return payloadObj, nil
150+
}
151+
152+
return fetched, nil
153+
}
154+
132155
// natsBroadcaster broadcasts watch events using NATS.
133156
type natsBroadcaster struct {
134157
nc *nats.Conn

0 commit comments

Comments
 (0)