Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 inmemory: fix watch to continue serving based on resourceVersion parameter #11695

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
8 changes: 5 additions & 3 deletions test/infrastructure/inmemory/pkg/runtime/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ type resourceGroupTracker struct {
lock sync.RWMutex
objects map[schema.GroupVersionKind]map[types.NamespacedName]client.Object
// ownedObjects tracks ownership. Key is the owner, values are the owned objects.
ownedObjects map[ownReference]map[ownReference]struct{}
ownedObjects map[ownReference]map[ownReference]struct{}
nextResourceVersion uint64
}

type ownReference struct {
Expand Down Expand Up @@ -159,8 +160,9 @@ func (c *cache) AddResourceGroup(name string) {
return
}
c.resourceGroups[name] = &resourceGroupTracker{
objects: map[schema.GroupVersionKind]map[types.NamespacedName]client.Object{},
ownedObjects: map[ownReference]map[ownReference]struct{}{},
objects: map[schema.GroupVersionKind]map[types.NamespacedName]client.Object{},
ownedObjects: map[ownReference]map[ownReference]struct{}{},
nextResourceVersion: 1,
}
}

Expand Down
6 changes: 3 additions & 3 deletions test/infrastructure/inmemory/pkg/runtime/cache/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (c *cache) store(resourceGroup string, obj client.Object, replaceExisting b
return apierrors.NewConflict(unsafeGuessGroupVersionResource(objGVK).GroupResource(), objKey.String(), fmt.Errorf("object has been modified"))
}

c.beforeUpdate(resourceGroup, trackedObj, obj)
c.beforeUpdate(resourceGroup, trackedObj, obj, &tracker.nextResourceVersion)

tracker.objects[objGVK][objKey] = obj.DeepCopyObject().(client.Object)
updateTrackerOwnerReferences(tracker, trackedObj, obj, objRef)
Expand All @@ -226,7 +226,7 @@ func (c *cache) store(resourceGroup string, obj client.Object, replaceExisting b
return apierrors.NewNotFound(unsafeGuessGroupVersionResource(objGVK).GroupResource(), objKey.String())
}

c.beforeCreate(resourceGroup, obj)
c.beforeCreate(resourceGroup, obj, &tracker.nextResourceVersion)

tracker.objects[objGVK][objKey] = obj.DeepCopyObject().(client.Object)
updateTrackerOwnerReferences(tracker, nil, obj, objRef)
Expand Down Expand Up @@ -422,7 +422,7 @@ func (c *cache) doTryDeleteLocked(resourceGroup string, tracker *resourceGroupTr
oldObj := obj.DeepCopyObject().(client.Object)
now := metav1.Time{Time: time.Now().UTC()}
obj.SetDeletionTimestamp(&now)
c.beforeUpdate(resourceGroup, oldObj, obj)
c.beforeUpdate(resourceGroup, oldObj, obj, &tracker.nextResourceVersion)

objects[objKey] = obj
c.afterUpdate(resourceGroup, oldObj, obj)
Expand Down
8 changes: 6 additions & 2 deletions test/infrastructure/inmemory/pkg/runtime/cache/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func Test_cache_client(t *testing.T) {
r := c.resourceGroups["foo"].objects[cloudv1.GroupVersion.WithKind(cloudv1.CloudMachineKind)][key]
g.Expect(r.GetObjectKind().GroupVersionKind()).To(BeComparableTo(cloudv1.GroupVersion.WithKind(cloudv1.CloudMachineKind)), "gvk must be set")
g.Expect(r.GetName()).To(Equal("bar"), "name must be equal to object tracker key")
g.Expect(r.GetResourceVersion()).To(Equal("v1"), "resourceVersion must be set")
g.Expect(r.GetResourceVersion()).To(Equal("0"), "resourceVersion must be set")
g.Expect(r.GetCreationTimestamp()).ToNot(BeZero(), "creation timestamp must be set")
g.Expect(r.GetAnnotations()).To(HaveKey(lastSyncTimeAnnotation), "last sync annotation must exists")

Expand Down Expand Up @@ -271,7 +271,7 @@ func Test_cache_client(t *testing.T) {
// Check all the computed fields are as expected.
g.Expect(obj.GetObjectKind().GroupVersionKind()).To(BeComparableTo(cloudv1.GroupVersion.WithKind(cloudv1.CloudMachineKind)), "gvk must be set")
g.Expect(obj.GetName()).To(Equal("bar"), "name must be equal to object tracker key")
g.Expect(obj.GetResourceVersion()).To(Equal("v1"), "resourceVersion must be set")
g.Expect(obj.GetResourceVersion()).To(Equal("1"), "resourceVersion must be set")
g.Expect(obj.GetCreationTimestamp()).ToNot(BeZero(), "creation timestamp must be set")
g.Expect(obj.GetAnnotations()).To(HaveKey(lastSyncTimeAnnotation), "last sync annotation must be set")
})
Expand Down Expand Up @@ -435,6 +435,8 @@ func Test_cache_client(t *testing.T) {
g.Expect(objBefore.GetResourceVersion()).ToNot(Equal(objUpdate.GetResourceVersion()), "Object version must be changed")
objBefore.SetResourceVersion(objUpdate.GetResourceVersion())
objBefore.Labels = objUpdate.Labels
g.Expect(objUpdate.GetGeneration()).To(Equal(objBefore.GetGeneration()+1), "Object Generation must increment")
objBefore.Generation = objUpdate.GetGeneration()
g.Expect(objBefore).To(BeComparableTo(objUpdate), "everything else must be the same")

g.Expect(h.Events()).To(ContainElement("foo, CloudMachine=baz, Updated"))
Expand Down Expand Up @@ -670,6 +672,8 @@ func Test_cache_client(t *testing.T) {
g.Expect(objBefore.GetResourceVersion()).ToNot(Equal(objAfterUpdate.GetResourceVersion()), "Object version must be changed")
objBefore.SetResourceVersion(objAfterUpdate.GetResourceVersion())
objBefore.Labels = objAfterUpdate.Labels
g.Expect(objAfterUpdate.GetGeneration()).To(Equal(objBefore.GetGeneration()+1), "Object Generation must increment")
objBefore.Generation = objAfterUpdate.GetGeneration()
g.Expect(objBefore).To(BeComparableTo(objAfterUpdate), "everything else must be the same")

g.Expect(h.Events()).To(ContainElement("foo, CloudMachine=baz, Deleted"))
Expand Down
16 changes: 9 additions & 7 deletions test/infrastructure/inmemory/pkg/runtime/cache/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,30 @@ package cache
import (
"fmt"
"reflect"
"strconv"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (c *cache) beforeCreate(_ string, obj client.Object) {
func (c *cache) beforeCreate(_ string, obj client.Object, resourceVersion *uint64) {
now := time.Now().UTC()
obj.SetCreationTimestamp(metav1.Time{Time: now})
// TODO: UID
obj.SetAnnotations(appendAnnotations(obj, lastSyncTimeAnnotation, now.Format(time.RFC3339)))
obj.SetResourceVersion(fmt.Sprintf("v%d", 1))
obj.SetResourceVersion(fmt.Sprintf("%d", *resourceVersion))
obj.SetGeneration(1)
*resourceVersion++
}

func (c *cache) afterCreate(resourceGroup string, obj client.Object) {
c.informCreate(resourceGroup, obj)
}

func (c *cache) beforeUpdate(_ string, oldObj, newObj client.Object) {
func (c *cache) beforeUpdate(_ string, oldObj, newObj client.Object, resourceVersion *uint64) {
newObj.SetCreationTimestamp(oldObj.GetCreationTimestamp())
newObj.SetResourceVersion(oldObj.GetResourceVersion())
newObj.SetGeneration(oldObj.GetGeneration())
// TODO: UID
newObj.SetAnnotations(appendAnnotations(newObj, lastSyncTimeAnnotation, oldObj.GetAnnotations()[lastSyncTimeAnnotation]))
if !oldObj.GetDeletionTimestamp().IsZero() {
Expand All @@ -51,8 +52,9 @@ func (c *cache) beforeUpdate(_ string, oldObj, newObj client.Object) {
now := time.Now().UTC()
newObj.SetAnnotations(appendAnnotations(newObj, lastSyncTimeAnnotation, now.Format(time.RFC3339)))

oldResourceVersion, _ := strconv.Atoi(strings.TrimPrefix(oldObj.GetResourceVersion(), "v"))
newObj.SetResourceVersion(fmt.Sprintf("v%d", oldResourceVersion+1))
newObj.SetResourceVersion(fmt.Sprintf("%d", *resourceVersion))
newObj.SetGeneration(oldObj.GetGeneration() + 1)
*resourceVersion++
}
}

Expand Down
40 changes: 26 additions & 14 deletions test/infrastructure/inmemory/pkg/server/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

inmemoryruntime "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/pkg/runtime"
inmemoryclient "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/pkg/runtime/client"
inmemoryportforward "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/pkg/server/api/portforward"
)

Expand Down Expand Up @@ -315,6 +316,25 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
return
}

h.log.V(3).Info(fmt.Sprintf("Serving List for %v", req.Request.URL), "resourceGroup", resourceGroup)

list, err := h.apiV1list(ctx, req, *gvk, inmemoryClient)
if err != nil {
if status, ok := err.(apierrors.APIStatus); ok || errors.As(err, &status) {
_ = resp.WriteHeaderAndEntity(int(status.Status().Code), status)
return
}
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
}

if err := resp.WriteEntity(list); err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
}
}

func (h *apiServerHandler) apiV1list(ctx context.Context, req *restful.Request, gvk schema.GroupVersionKind, inmemoryClient inmemoryclient.Client) (*unstructured.UnstructuredList, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be

Suggested change
func (h *apiServerHandler) apiV1list(ctx context.Context, req *restful.Request, gvk schema.GroupVersionKind, inmemoryClient inmemoryclient.Client) (*unstructured.UnstructuredList, error) {
func (h *apiServerHandler) v1List(ctx context.Context, req *restful.Request, gvk schema.GroupVersionKind, inmemoryClient inmemoryclient.Client) (*unstructured.UnstructuredList, error) {

(easier to distinguish from apiV1List with upper L)

// Reads and returns the requested data.
list := &unstructured.UnstructuredList{}
list.SetAPIVersion(gvk.GroupVersion().String())
Expand All @@ -328,33 +348,23 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
// TODO: The only field Selector which works is for `spec.nodeName` on pods.
fieldSelector, err := fields.ParseSelector(req.QueryParameter("fieldSelector"))
if err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
return nil, err
}
if fieldSelector != nil {
listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: fieldSelector})
}

labelSelector, err := labels.Parse(req.QueryParameter("labelSelector"))
if err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
return nil, err
}
if labelSelector != nil {
listOpts = append(listOpts, client.MatchingLabelsSelector{Selector: labelSelector})
}
if err := inmemoryClient.List(ctx, list, listOpts...); err != nil {
if status, ok := err.(apierrors.APIStatus); ok || errors.As(err, &status) {
_ = resp.WriteHeaderAndEntity(int(status.Status().Code), status)
return
}
_ = resp.WriteHeaderAndEntity(http.StatusInternalServerError, err.Error())
return
}
if err := resp.WriteEntity(list); err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
return nil, err
}
return list, nil
}

func (h *apiServerHandler) apiV1Watch(req *restful.Request, resp *restful.Response) {
Expand All @@ -372,6 +382,8 @@ func (h *apiServerHandler) apiV1Watch(req *restful.Request, resp *restful.Respon
return
}

h.log.V(3).Info(fmt.Sprintf("Serving Watch for %v", req.Request.URL), "resourceGroup", resourceGroup)

// If the request is a Watch handle it using watchForResource.
err = h.watchForResource(req, resp, resourceGroup, *gvk)
if err != nil {
Expand Down
Loading
Loading