Skip to content

Commit d8f9a96

Browse files
authored
Merge pull request #201 from red-hat-storage/sync_us--main
Syncing latest changes from upstream main for ramen
2 parents 5e1a7ed + 0e0db36 commit d8f9a96

9 files changed

+385
-140
lines changed

controllers/drplacementcontrol.go

+8-30
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,11 @@ import (
1111

1212
"github.com/go-logr/logr"
1313
clrapiv1beta1 "github.com/open-cluster-management-io/api/cluster/v1beta1"
14-
ocmworkv1 "github.com/open-cluster-management/api/work/v1"
1514
errorswrapper "github.com/pkg/errors"
1615
plrv1 "github.com/stolostron/multicloud-operators-placementrule/pkg/apis/apps/v1"
1716
corev1 "k8s.io/api/core/v1"
1817
"k8s.io/apimachinery/pkg/api/errors"
1918
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20-
"k8s.io/apimachinery/pkg/types"
21-
"sigs.k8s.io/yaml"
2219

2320
rmn "github.com/ramendr/ramen/api/v1alpha1"
2421
rmnutil "github.com/ramendr/ramen/controllers/util"
@@ -31,6 +28,9 @@ const (
3128
DRPCNameAnnotation = "drplacementcontrol.ramendr.openshift.io/drpc-name"
3229
DRPCNamespaceAnnotation = "drplacementcontrol.ramendr.openshift.io/drpc-namespace"
3330

31+
// Annotation that stores the UID of DRPC that created the resource on the managed cluster using a ManifestWork
32+
DRPCUIDAnnotation = "drplacementcontrol.ramendr.openshift.io/drpc-uid"
33+
3434
// Annotation for the last cluster on which the application was running
3535
LastAppDeploymentCluster = "drplacementcontrol.ramendr.openshift.io/last-app-deployment-cluster"
3636

@@ -865,9 +865,7 @@ func (d *DRPCInstance) ensureCleanupAndVolSyncReplicationSetup(srcCluster string
865865
srcCluster, ok))
866866
}
867867

868-
clusterToSkip := srcCluster
869-
870-
err = d.EnsureCleanup(clusterToSkip)
868+
err = d.EnsureCleanup(srcCluster)
871869
if err != nil {
872870
return err
873871
}
@@ -1261,7 +1259,7 @@ func (d *DRPCInstance) getVRGFromManifestWork(clusterName string) (*rmn.VolumeRe
12611259
return nil, fmt.Errorf("%w", err)
12621260
}
12631261

1264-
vrg, err := d.extractVRGFromManifestWork(mw)
1262+
vrg, err := rmnutil.ExtractVRGFromManifestWork(mw)
12651263
if err != nil {
12661264
return nil, err
12671265
}
@@ -1285,9 +1283,7 @@ func (d *DRPCInstance) vrgExistsAndPrimary(targetCluster string) bool {
12851283
}
12861284

12871285
func (d *DRPCInstance) mwExistsAndPlacementUpdated(targetCluster string) (bool, error) {
1288-
vrgMWName := d.mwu.BuildManifestWorkName(rmnutil.MWTypeVRG)
1289-
1290-
_, err := d.mwu.FindManifestWork(vrgMWName, targetCluster)
1286+
_, err := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, targetCluster)
12911287
if err != nil {
12921288
if errors.IsNotFound(err) {
12931289
return false, nil
@@ -1525,6 +1521,7 @@ func (d *DRPCInstance) generateVRG(dstCluster string, repState rmn.ReplicationSt
15251521
Annotations: map[string]string{
15261522
DestinationClusterAnnotationKey: dstCluster,
15271523
DoNotDeletePVCAnnotation: d.instance.GetAnnotations()[DoNotDeletePVCAnnotation],
1524+
DRPCUIDAnnotation: string(d.instance.UID),
15281525
},
15291526
},
15301527
Spec: rmn.VolumeReplicationGroupSpec{
@@ -1792,10 +1789,7 @@ func (d *DRPCInstance) ensureVRGManifestWorkOnClusterDeleted(clusterName string)
17921789

17931790
const done = true
17941791

1795-
mwName := d.mwu.BuildManifestWorkName(rmnutil.MWTypeVRG)
1796-
mw := &ocmworkv1.ManifestWork{}
1797-
1798-
err := d.reconciler.Get(d.ctx, types.NamespacedName{Name: mwName, Namespace: clusterName}, mw)
1792+
mw, err := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, clusterName)
17991793
if err != nil {
18001794
if errors.IsNotFound(err) {
18011795
return done, nil
@@ -2082,22 +2076,6 @@ func (d *DRPCInstance) updateVRGToRunFinalSync(clusterName string) error {
20822076
return nil
20832077
}
20842078

2085-
func (d *DRPCInstance) extractVRGFromManifestWork(mw *ocmworkv1.ManifestWork) (*rmn.VolumeReplicationGroup, error) {
2086-
if len(mw.Spec.Workload.Manifests) == 0 {
2087-
return nil, fmt.Errorf("invalid VRG ManifestWork for type: %s", mw.Name)
2088-
}
2089-
2090-
vrgClientManifest := &mw.Spec.Workload.Manifests[0]
2091-
vrg := &rmn.VolumeReplicationGroup{}
2092-
2093-
err := yaml.Unmarshal(vrgClientManifest.RawExtension.Raw, &vrg)
2094-
if err != nil {
2095-
return nil, fmt.Errorf("unable to unmarshal VRG object (%w)", err)
2096-
}
2097-
2098-
return vrg, nil
2099-
}
2100-
21012079
func (d *DRPCInstance) updateManifestWork(clusterName string, vrg *rmn.VolumeReplicationGroup) error {
21022080
mw, err := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, clusterName)
21032081
if err != nil {

controllers/drplacementcontrol_controller.go

+213-8
Original file line numberDiff line numberDiff line change
@@ -953,6 +953,12 @@ func (r *DRPlacementControlReconciler) reconcileDRPCInstance(d *DRPCInstance, lo
953953
beforeProcessing = *d.instance.Status.LastUpdateTime
954954
}
955955

956+
if !ensureVRGsManagedByDRPC(d.log, d.mwu, d.vrgs, d.instance, d.vrgNamespace) {
957+
log.Info("Requeing... VRG adoption in progress")
958+
959+
return ctrl.Result{Requeue: true}, nil
960+
}
961+
956962
requeue := d.startProcessing()
957963
log.Info("Finished processing", "Requeue?", requeue)
958964

@@ -1158,14 +1164,6 @@ func (r *DRPlacementControlReconciler) finalizeDRPC(ctx context.Context, drpc *r
11581164
return fmt.Errorf("failed to get DRPolicy while finalizing DRPC (%w)", err)
11591165
}
11601166

1161-
// delete manifestworks (VRGs)
1162-
for _, drClusterName := range rmnutil.DrpolicyClusterNames(drPolicy) {
1163-
err := mwu.DeleteManifestWorksForCluster(drClusterName)
1164-
if err != nil {
1165-
return fmt.Errorf("%w", err)
1166-
}
1167-
}
1168-
11691167
drClusters, err := getDRClusters(ctx, r.Client, drPolicy)
11701168
if err != nil {
11711169
return fmt.Errorf("failed to get drclusters. Error (%w)", err)
@@ -1177,6 +1175,18 @@ func (r *DRPlacementControlReconciler) finalizeDRPC(ctx context.Context, drpc *r
11771175
return fmt.Errorf("failed to retrieve VRGs. We'll retry later. Error (%w)", err)
11781176
}
11791177

1178+
if !ensureVRGsManagedByDRPC(r.Log, mwu, vrgs, drpc, vrgNamespace) {
1179+
return fmt.Errorf("VRG adoption in progress")
1180+
}
1181+
1182+
// delete manifestworks (VRGs)
1183+
for _, drClusterName := range rmnutil.DrpolicyClusterNames(drPolicy) {
1184+
err := mwu.DeleteManifestWorksForCluster(drClusterName)
1185+
if err != nil {
1186+
return fmt.Errorf("%w", err)
1187+
}
1188+
}
1189+
11801190
if len(vrgs) != 0 {
11811191
return fmt.Errorf("waiting for VRGs count to go to zero")
11821192
}
@@ -2286,6 +2296,21 @@ func (r *DRPlacementControlReconciler) determineDRPCState(
22862296
return Stop, "", err
22872297
}
22882298

2299+
mwu := rmnutil.MWUtil{
2300+
Client: r.Client,
2301+
APIReader: r.APIReader,
2302+
Ctx: ctx,
2303+
Log: log,
2304+
InstName: drpc.Name,
2305+
TargetNamespace: vrgNamespace,
2306+
}
2307+
2308+
if !ensureVRGsManagedByDRPC(log, mwu, vrgs, drpc, vrgNamespace) {
2309+
msg := "VRG adoption in progress"
2310+
2311+
return Stop, msg, nil
2312+
}
2313+
22892314
// IF 2 clusters queried, and both queries failed, then STOP
22902315
if successfullyQueriedClusterCount == 0 {
22912316
msg := "Stop - Number of clusters queried is 0"
@@ -2426,3 +2451,183 @@ func (r *DRPlacementControlReconciler) determineDRPCState(
24262451

24272452
return AllowFailover, msg, nil
24282453
}
2454+
2455+
// ensureVRGsManagedByDRPC ensures that VRGs reported by ManagedClusterView are managed by the current instance of
2456+
// DRPC. This is done using the DRPC UID annotation on the viewed VRG matching the current DRPC UID and if not
2457+
// creating or updating the exisiting ManifestWork for the VRG.
2458+
// Returns a bool indicating true if VRGs are managed by the current DRPC resource
2459+
func ensureVRGsManagedByDRPC(
2460+
log logr.Logger,
2461+
mwu rmnutil.MWUtil,
2462+
vrgs map[string]*rmn.VolumeReplicationGroup,
2463+
drpc *rmn.DRPlacementControl,
2464+
vrgNamespace string,
2465+
) bool {
2466+
ensured := true
2467+
2468+
for cluster, viewVRG := range vrgs {
2469+
if rmnutil.ResourceIsDeleted(viewVRG) {
2470+
log.Info("VRG reported by view undergoing deletion, during adoption",
2471+
"cluster", cluster, "namespace", viewVRG.Namespace, "name", viewVRG.Name)
2472+
2473+
continue
2474+
}
2475+
2476+
if viewVRG.GetAnnotations() != nil {
2477+
if v, ok := viewVRG.Annotations[DRPCUIDAnnotation]; ok && v == string(drpc.UID) {
2478+
continue
2479+
}
2480+
}
2481+
2482+
adopted := adoptVRG(log, mwu, viewVRG, cluster, drpc, vrgNamespace)
2483+
2484+
ensured = ensured && adopted
2485+
}
2486+
2487+
return ensured
2488+
}
2489+
2490+
// adoptVRG creates or updates the VRG ManifestWork to ensure that the current DRPC is managing the VRG resource
2491+
// Returns a bool indicating if adoption was completed (which is mostly false except when VRG MW is deleted)
2492+
func adoptVRG(
2493+
log logr.Logger,
2494+
mwu rmnutil.MWUtil,
2495+
viewVRG *rmn.VolumeReplicationGroup,
2496+
cluster string,
2497+
drpc *rmn.DRPlacementControl,
2498+
vrgNamespace string,
2499+
) bool {
2500+
adopted := true
2501+
2502+
mw, err := mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, cluster)
2503+
if err != nil {
2504+
if !errors.IsNotFound(err) {
2505+
log.Info("error fetching VRG ManifestWork during adoption", "error", err, "cluster", cluster)
2506+
2507+
return !adopted
2508+
}
2509+
2510+
adoptOrphanVRG(log, mwu, viewVRG, cluster, drpc, vrgNamespace)
2511+
2512+
return !adopted
2513+
}
2514+
2515+
if rmnutil.ResourceIsDeleted(mw) {
2516+
log.Info("VRG ManifestWork found deleted during adoption", "cluster", cluster)
2517+
2518+
return adopted
2519+
}
2520+
2521+
vrg, err := rmnutil.ExtractVRGFromManifestWork(mw)
2522+
if err != nil {
2523+
log.Info("error extracting VRG from ManifestWork during adoption", "error", err, "cluster", cluster)
2524+
2525+
return !adopted
2526+
}
2527+
2528+
// NOTE: upgrade use case, to add DRPC UID for existing VRG MW
2529+
adoptExistingVRGManifestWork(log, mwu, vrg, cluster, drpc, vrgNamespace)
2530+
2531+
return !adopted
2532+
}
2533+
2534+
// adoptExistingVRGManifestWork updates an existing VRG ManifestWork as managed by the current DRPC resource
2535+
func adoptExistingVRGManifestWork(
2536+
log logr.Logger,
2537+
mwu rmnutil.MWUtil,
2538+
vrg *rmn.VolumeReplicationGroup,
2539+
cluster string,
2540+
drpc *rmn.DRPlacementControl,
2541+
vrgNamespace string,
2542+
) {
2543+
log.Info("adopting existing VRG ManifestWork", "cluster", cluster, "namespace", vrg.Namespace, "name", vrg.Name)
2544+
2545+
if vrg.GetAnnotations() == nil {
2546+
vrg.Annotations = make(map[string]string)
2547+
}
2548+
2549+
if v, ok := vrg.Annotations[DRPCUIDAnnotation]; ok && v == string(drpc.UID) {
2550+
// Annotation may already be set but not reflected on the resource view yet
2551+
log.Info("detected VRGs DRPC UID annotation as existing",
2552+
"cluster", cluster, "namespace", vrg.Namespace, "name", vrg.Name)
2553+
2554+
return
2555+
}
2556+
2557+
vrg.Annotations[DRPCUIDAnnotation] = string(drpc.UID)
2558+
2559+
annotations := make(map[string]string)
2560+
annotations[DRPCNameAnnotation] = drpc.Name
2561+
annotations[DRPCNamespaceAnnotation] = drpc.Namespace
2562+
2563+
err := mwu.CreateOrUpdateVRGManifestWork(drpc.Name, vrgNamespace, cluster, *vrg, annotations)
2564+
if err != nil {
2565+
log.Info("error updating VRG via ManifestWork during adoption", "error", err, "cluster", cluster)
2566+
}
2567+
}
2568+
2569+
// adoptOpphanVRG creates a missing ManifestWork for a VRG found via a ManagedClusterView
2570+
func adoptOrphanVRG(
2571+
log logr.Logger,
2572+
mwu rmnutil.MWUtil,
2573+
viewVRG *rmn.VolumeReplicationGroup,
2574+
cluster string,
2575+
drpc *rmn.DRPlacementControl,
2576+
vrgNamespace string,
2577+
) {
2578+
log.Info("adopting orphaned VRG ManifestWork",
2579+
"cluster", cluster, "namespace", viewVRG.Namespace, "name", viewVRG.Name)
2580+
2581+
annotations := make(map[string]string)
2582+
annotations[DRPCNameAnnotation] = drpc.Name
2583+
annotations[DRPCNamespaceAnnotation] = drpc.Namespace
2584+
2585+
// Adopt the namespace as well
2586+
err := mwu.CreateOrUpdateNamespaceManifest(drpc.Name, vrgNamespace, cluster, annotations)
2587+
if err != nil {
2588+
log.Info("error creating namespace via ManifestWork during adoption", "error", err, "cluster", cluster)
2589+
2590+
return
2591+
}
2592+
2593+
vrg := constructVRGFromView(viewVRG)
2594+
if vrg.GetAnnotations() == nil {
2595+
vrg.Annotations = make(map[string]string)
2596+
}
2597+
2598+
vrg.Annotations[DRPCUIDAnnotation] = string(drpc.UID)
2599+
2600+
if err := mwu.CreateOrUpdateVRGManifestWork(
2601+
drpc.Name, vrgNamespace,
2602+
cluster, *vrg, annotations); err != nil {
2603+
log.Info("error creating VRG via ManifestWork during adoption", "error", err, "cluster", cluster)
2604+
}
2605+
}
2606+
2607+
// constructVRGFromView selectively constructs a VRG from a view, using its spec and only those annotations that
2608+
// would be set by the hub on the ManifestWork
2609+
func constructVRGFromView(viewVRG *rmn.VolumeReplicationGroup) *rmn.VolumeReplicationGroup {
2610+
vrg := &rmn.VolumeReplicationGroup{
2611+
TypeMeta: metav1.TypeMeta{Kind: "VolumeReplicationGroup", APIVersion: "ramendr.openshift.io/v1alpha1"},
2612+
ObjectMeta: metav1.ObjectMeta{
2613+
Name: viewVRG.Name,
2614+
Namespace: viewVRG.Namespace,
2615+
},
2616+
}
2617+
2618+
viewVRG.Spec.DeepCopyInto(&vrg.Spec)
2619+
2620+
for k, v := range viewVRG.GetAnnotations() {
2621+
switch k {
2622+
case DestinationClusterAnnotationKey:
2623+
fallthrough
2624+
case DoNotDeletePVCAnnotation:
2625+
fallthrough
2626+
case DRPCUIDAnnotation:
2627+
rmnutil.AddAnnotation(vrg, k, v)
2628+
default:
2629+
}
2630+
}
2631+
2632+
return vrg
2633+
}

0 commit comments

Comments
 (0)