Skip to content

WIP: 🐛 cache/replication: fix comparison whether update is needed by removing meta fields #3171

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions pkg/reconciler/cache/replication/replication_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ func (r *reconciler) reconcile(ctx context.Context, key string) error {
}

// update global copy and compare
metaChanged, err := ensureMeta(globalCopy, localCopy)
metaChanged, metaReason, err := ensureMeta(globalCopy, localCopy)
if err != nil {
return err
}
remainingChanged, err := ensureRemaining(globalCopy, localCopy)
remainingChanged, remainingReason, err := ensureRemaining(globalCopy, localCopy)
if err != nil {
return err
}
Expand All @@ -197,7 +197,14 @@ func (r *reconciler) reconcile(ctx context.Context, key string) error {
return nil
}

logger.V(2).WithValues("kind", globalCopy.GetKind(), "namespace", globalCopy.GetNamespace(), "name", globalCopy.GetName()).Info("Updating object in global cache")
reason := metaReason
if metaChanged && remainingChanged {
reason = fmt.Sprintf("%s; %s", metaReason, remainingReason)
} else if remainingChanged {
reason = remainingReason
}

logger.V(2).WithValues("kind", globalCopy.GetKind(), "namespace", globalCopy.GetNamespace(), "name", globalCopy.GetName(), "reason", reason).Info("Updating object in global cache")
_, err = r.updateObject(ctx, clusterName, globalCopy) // no need for patch because there is only this actor
return err
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,57 +17,72 @@ limitations under the License.
package replication

import (
"context"
"fmt"
"reflect"

"github.com/google/go-cmp/cmp"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog/v2"
)

// ensureMeta changes unstructuredCacheObject's metadata to match unstructuredLocalObject's metadata except the ResourceVersion and the shard annotation fields.
func ensureMeta(cacheObject *unstructured.Unstructured, localObject *unstructured.Unstructured) (changed bool, err error) {
func ensureMeta(cacheObject *unstructured.Unstructured, localObject *unstructured.Unstructured) (changed bool, reason string, err error) {
cacheObjMetaRaw, hasCacheObjMetaRaw, err := unstructured.NestedFieldNoCopy(cacheObject.Object, "metadata")
if err != nil {
return false, err
return false, "", err
}
cacheObjMeta, ok := cacheObjMetaRaw.(map[string]interface{})
if !ok {
return false, fmt.Errorf("metadata field of unstructuredCacheObject is of the type %T, expected map[string]interface{}", cacheObjMetaRaw)
return false, "", fmt.Errorf("metadata field of unstructuredCacheObject is of the type %T, expected map[string]interface{}", cacheObjMetaRaw)
}
localObjMetaRaw, hasLocalObjMetaRaw, err := unstructured.NestedFieldNoCopy(localObject.Object, "metadata")
if err != nil {
return false, err
return false, "", err
}
localObjMeta, ok := localObjMetaRaw.(map[string]interface{})
if !ok {
return false, fmt.Errorf("metadata field of unstructuredLocalObjectMeta is of the type %T, expected map[string]interface{}", localObjMetaRaw)
return false, "", fmt.Errorf("metadata field of unstructuredLocalObjectMeta is of the type %T, expected map[string]interface{}", localObjMetaRaw)
}
if !hasLocalObjMetaRaw && !hasCacheObjMetaRaw {
return false, nil // no-op
return false, "", nil // no-op
}
if !hasLocalObjMetaRaw {
unstructured.RemoveNestedField(cacheObject.Object, "metadata")
return true, nil
return true, "local .metadata disappeared", nil
}

// before we can compare the cache object we need to
// store, remove and then bring back fields that are unique only to the cache object
if cacheObjRV, found := cacheObjMeta["resourceVersion"]; found {
unstructured.RemoveNestedField(cacheObjMeta, "resourceVersion")
defer func() {
if err == nil {
err = unstructured.SetNestedField(cacheObject.Object, cacheObjRV, "metadata", "resourceVersion")
}
}()
// before we can compare the local and cache objects we need to remove certain
// field we know will be different, and bring them back after the comparison.
for _, pth := range []string{"resourceVersion", "generation", "managedFields"} {
pth := pth
if v, found := cacheObjMeta[pth]; found {
delete(cacheObjMeta, pth)
defer func() { //nolint:gocritic
if err == nil {
err = unstructured.SetNestedField(cacheObject.Object, v, "metadata", pth)
}
}()
}
if v, found := localObjMeta[pth]; found {
delete(localObjMeta, pth)
defer func() { //nolint:gocritic
if err == nil {
err = unstructured.SetNestedField(localObject.Object, v, "metadata", pth)
}
}()
}
}
if cacheObjAnnotationsRaw, found := cacheObjMeta["annotations"]; found {
cacheObjAnnotations, ok := cacheObjAnnotationsRaw.(map[string]interface{})
if !ok {
return false, fmt.Errorf("metadata.annotations field of unstructuredCacheObject is of the type %T, expected map[string]interface{}", cacheObjAnnotationsRaw)
return false, "", fmt.Errorf("metadata.annotations field of unstructuredCacheObject is of the type %T, expected map[string]interface{}", cacheObjAnnotationsRaw)
}
if shard, hasShard := cacheObjAnnotations[genericrequest.ShardAnnotationKey]; hasShard {
unstructured.RemoveNestedField(cacheObjAnnotations, genericrequest.ShardAnnotationKey)
delete(cacheObjAnnotations, genericrequest.ShardAnnotationKey)
defer func() {
if err == nil {
err = unstructured.SetNestedField(cacheObject.Object, shard, "metadata", "annotations", genericrequest.ShardAnnotationKey)
Expand All @@ -77,35 +92,29 @@ func ensureMeta(cacheObject *unstructured.Unstructured, localObject *unstructure
// TODO: in the future the original RV will be stored in an annotation
}

// before we can compare with the local object we need to
// store, remove and then bring back the ResourceVersion on the local object
if localObjRV, found := localObjMeta["resourceVersion"]; found {
unstructured.RemoveNestedField(localObjMeta, "resourceVersion")
defer func() {
if err == nil {
localObjMeta["resourceVersion"] = localObjRV
}
}()
}

changed = !reflect.DeepEqual(cacheObjMeta, localObjMeta)
if !changed {
return false, nil
return false, "", nil
}

reason = ".metadata changed"
if logger := klog.FromContext(context.Background()).V(5); logger.Enabled() {
reason += " " + cmp.Diff(localObjMeta, cacheObjMeta)
}

newCacheObjMeta := map[string]interface{}{}
for k, v := range localObjMeta {
newCacheObjMeta[k] = v
}
return true, unstructured.SetNestedMap(cacheObject.Object, newCacheObjMeta, "metadata")
return true, reason, unstructured.SetNestedMap(cacheObject.Object, newCacheObjMeta, "metadata")
}

// ensureRemaining changes unstructuredCacheObject to match unstructuredLocalObject except for the metadata field
// returns true when the unstructuredCacheObject was updated.
func ensureRemaining(cacheObject *unstructured.Unstructured, localObject *unstructured.Unstructured) (bool, error) {
func ensureRemaining(cacheObject *unstructured.Unstructured, localObject *unstructured.Unstructured) (bool, string, error) {
cacheObjMeta, found, err := unstructured.NestedFieldNoCopy(cacheObject.Object, "metadata")
if err != nil {
return false, err
return false, "", err
}
if found {
unstructured.RemoveNestedField(cacheObject.Object, "metadata")
Expand All @@ -116,7 +125,7 @@ func ensureRemaining(cacheObject *unstructured.Unstructured, localObject *unstru

localObjMeta, found, err := unstructured.NestedFieldNoCopy(localObject.Object, "metadata")
if err != nil {
return false, err
return false, "", err
}
if found {
unstructured.RemoveNestedField(localObject.Object, "metadata")
Expand All @@ -127,15 +136,20 @@ func ensureRemaining(cacheObject *unstructured.Unstructured, localObject *unstru

changed := !reflect.DeepEqual(cacheObject.Object, localObject.Object)
if !changed {
return false, nil
return false, "", nil
}

reason := "object changed"
if logger := klog.FromContext(context.Background()).V(5); logger.Enabled() {
reason += " " + cmp.Diff(localObject.Object, cacheObject.Object)
}

newCacheObj := map[string]interface{}{}
for k, v := range localObject.Object {
newCacheObj[k] = v
}
cacheObject.Object = newCacheObj
return true, nil
return true, reason, nil
}

func toUnstructured(obj interface{}) (*unstructured.Unstructured, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,34 @@ func TestEnsureUnstructuredSpec(t *testing.T) {
}
},
},
{
name: "generation on local different than cache",
cacheObject: &unstructured.Unstructured{Object: map[string]interface{}{
"spec": map[string]interface{}{"fieldA": "a"},
"metadata": map[string]interface{}{"generation": int64(1)},
}},
localObject: &unstructured.Unstructured{Object: map[string]interface{}{
"spec": map[string]interface{}{"fieldA": "a"},
"metadata": map[string]interface{}{"generation": int64(2)},
}},
expectSpecChanged: false,
},
{
name: "managedFields on local different than cache",
cacheObject: &unstructured.Unstructured{Object: map[string]interface{}{
"spec": map[string]interface{}{"fieldA": "a"},
"metadata": map[string]interface{}{"managedFields": []interface{}{"a"}},
}},
localObject: &unstructured.Unstructured{Object: map[string]interface{}{
"spec": map[string]interface{}{"fieldA": "a"},
"metadata": map[string]interface{}{"managedFields": []interface{}{"a", "b"}},
}},
expectSpecChanged: false,
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(tt *testing.T) {
specChanged, err := ensureRemaining(scenario.cacheObject, scenario.localObject)
specChanged, _, err := ensureRemaining(scenario.cacheObject, scenario.localObject)
if specChanged != scenario.expectSpecChanged {
tt.Fatalf("spec changed = %v, expected spec to be changed = %v", specChanged, scenario.expectSpecChanged)
}
Expand Down Expand Up @@ -232,7 +256,7 @@ func TestEnsureUnstructuredStatus(t *testing.T) {
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(tt *testing.T) {
statusChanged, err := ensureRemaining(scenario.cacheObject, scenario.localObject)
statusChanged, _, err := ensureRemaining(scenario.cacheObject, scenario.localObject)
if statusChanged != scenario.expectStatusChanged {
tt.Fatalf("status changed = %v, expected spec to be changed = %v", statusChanged, scenario.expectStatusChanged)
}
Expand Down Expand Up @@ -392,7 +416,7 @@ func TestEnsureUnstructuredMeta(t *testing.T) {
if err != nil {
tt.Fatal(err)
}
metaChanged, err := ensureMeta(unstructuredCacheApiExport, unstructuredLocalApiExport)
metaChanged, _, err := ensureMeta(unstructuredCacheApiExport, unstructuredLocalApiExport)
if err != nil {
tt.Fatal(err)
}
Expand Down
18 changes: 5 additions & 13 deletions sdk/apis/core/helper/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,11 @@ func DontReplicateForValue(replicateValue, controller string) (result string, ch
// ReplicateFor ensures the controller string is part of the separated list of controller names
// in the internal.kcp.io/replicate label. This function changes the annotations in-place.
func ReplicateFor(annotations map[string]string, controller string) (result map[string]string, changed bool) {
for k, v := range annotations {
if k != core.ReplicateAnnotationKey {
continue
}

if v := annotations[core.ReplicateAnnotationKey]; v != "" {
existing := sets.New[string](strings.Split(v, ",")...)
if !existing.Has(controller) {
existing.Insert(controller)
annotations[k] = strings.Join(sets.List[string](existing), ",")
annotations[core.ReplicateAnnotationKey] = strings.Join(sets.List[string](existing), ",")
return annotations, true
}
return annotations, false
Expand All @@ -77,19 +73,15 @@ func ReplicateFor(annotations map[string]string, controller string) (result map[
// DontReplicateFor ensures the controller string is not part of the separated list of controller names
// in the internal.kcp.io/replicate label. This function changes the annotations in-place.
func DontReplicateFor(annotations map[string]string, controller string) (result map[string]string, changed bool) {
for k, v := range annotations {
if k != core.ReplicateAnnotationKey {
continue
}

if v := annotations[core.ReplicateAnnotationKey]; v != "" {
if v == controller {
delete(annotations, k)
delete(annotations, core.ReplicateAnnotationKey)
return annotations, true
}
existing := sets.New[string](strings.Split(v, ",")...)
if existing.Has(controller) {
existing.Delete(controller)
annotations[k] = strings.Join(sets.List[string](existing), ",")
annotations[core.ReplicateAnnotationKey] = strings.Join(sets.List[string](existing), ",")
return annotations, true
}
return annotations, false
Expand Down
Loading