@@ -415,13 +415,29 @@ func (s *SyncController) reconcile(ctx context.Context, federatedName common.Qua
415
415
fedResource .RecordError ("EnsureFinalizerError" , errors .Wrap (err , "Failed to ensure finalizer" ))
416
416
return worker .StatusError
417
417
}
418
-
419
- return s .syncToClusters (ctx , fedResource )
418
+ clustersToSync , selectedClusters , err := s .prepareToSync (ctx , fedResource )
419
+ if err != nil {
420
+ fedResource .RecordError ("PrepareToSyncError" , errors .Wrap (err , "Failed to prepare to sync" ))
421
+ return worker .StatusError
422
+ }
423
+ return s .syncToClusters (ctx , fedResource , clustersToSync , selectedClusters )
420
424
}
421
425
422
- // syncToClusters ensures that the state of the given object is
423
- // synchronized to member clusters.
424
- func (s * SyncController ) syncToClusters (ctx context.Context , fedResource FederatedResource ) worker.Result {
426
+ // prepareToSync performs the following preprocessing steps required to sync federated objects to selected member clusters:
427
+ // 1. Compute the list of selected member clusters from the placement field.
428
+ // 2. Compute the list of member clusters that requires an operation to be dispatched.
429
+ // 3. For newly selected clusters, update the PropagationStatus for these clusters to PendingCreate.
430
+ //
431
+ // The PendingCreate status allows us to safely skip checking of clusters during object deletion when PropagationStatus is
432
+ // empty. If not, it might be that the object was created but we failed to update the federated object's status previously.
433
+ func (s * SyncController ) prepareToSync (
434
+ ctx context.Context ,
435
+ fedResource FederatedResource ,
436
+ ) (
437
+ requireSync []* fedcorev1a1.FederatedCluster ,
438
+ selectedClusters sets.Set [string ],
439
+ err error ,
440
+ ) {
425
441
keyedLogger := klog .FromContext (ctx )
426
442
427
443
clusters , err := s .fedInformerManager .GetJoinedClusters ()
@@ -430,10 +446,81 @@ func (s *SyncController) syncToClusters(ctx context.Context, fedResource Federat
430
446
string (fedcorev1a1 .ClusterRetrievalFailed ),
431
447
errors .Wrap (err , "Failed to retrieve list of clusters" ),
432
448
)
433
- return s .setFederatedStatus (ctx , fedResource , fedcorev1a1 .ClusterRetrievalFailed , nil )
449
+ result := s .setFederatedStatus (ctx , fedResource , fedcorev1a1 .ClusterRetrievalFailed , nil )
450
+ if result != worker .StatusAllOK {
451
+ keyedLogger .Error (nil , "Failed to set federated status" , "result" , result .String ())
452
+ }
453
+ return nil , nil , err
454
+ }
455
+ clusterMap := make (map [string ]* fedcorev1a1.FederatedCluster , len (clusters ))
456
+ for _ , cluster := range clusters {
457
+ clusterMap [cluster .Name ] = cluster
434
458
}
435
459
436
460
selectedClusterNames := fedResource .ComputePlacement (clusters )
461
+ pendingCreateClusters := selectedClusterNames .Clone ()
462
+ status := fedResource .Object ().GetStatus ()
463
+ for _ , cluster := range status .Clusters {
464
+ pendingCreateClusters .Delete (cluster .Cluster )
465
+ if cluster , exist := clusterMap [cluster .Cluster ]; exist {
466
+ requireSync = append (requireSync , cluster )
467
+ }
468
+ }
469
+
470
+ if pendingCreateClusters .Len () == 0 {
471
+ return requireSync , selectedClusterNames , nil
472
+ }
473
+ for cluster := range pendingCreateClusters {
474
+ if cluster , exist := clusterMap [cluster ]; exist && cluster .GetDeletionTimestamp ().IsZero () {
475
+ status .Clusters = append (status .Clusters , fedcorev1a1.PropagationStatus {
476
+ Cluster : cluster .Name ,
477
+ Status : fedcorev1a1 .PendingCreate ,
478
+ })
479
+ requireSync = append (requireSync , cluster )
480
+ }
481
+ }
482
+
483
+ keyedLogger .V (1 ).Info ("Update clusters pending object creation" ,
484
+ "clusters" , strings .Join (sets .List (pendingCreateClusters ), "," ))
485
+ obj := fedResource .Object ()
486
+ objNamespace := obj .GetNamespace ()
487
+ objName := obj .GetName ()
488
+ // If the underlying resource has changed, attempt to retrieve and
489
+ // update it repeatedly.
490
+ err = wait .PollImmediateWithContext (ctx , 1 * time .Second , 5 * time .Second , func (ctx context.Context ) (bool , error ) {
491
+ var err error
492
+ obj .GetStatus ().Clusters = status .Clusters
493
+ obj , err = fedobjectadapters .UpdateStatus (ctx , s .fedClient .CoreV1alpha1 (), obj , metav1.UpdateOptions {})
494
+ if err == nil {
495
+ fedResource .SetObject (obj )
496
+ return true , nil
497
+ }
498
+ if apierrors .IsConflict (err ) {
499
+ obj , err = fedobjectadapters .Get (ctx , s .fedClient .CoreV1alpha1 (), objNamespace , objName , metav1.GetOptions {})
500
+ if err != nil {
501
+ return false , errors .Wrapf (err , "failed to retrieve resource" )
502
+ }
503
+ return false , nil
504
+ }
505
+ return false , errors .Wrapf (err , "failed to update resource" )
506
+ })
507
+ if err != nil {
508
+ keyedLogger .Error (err , "Failed to set propagation status" )
509
+ return nil , nil , err
510
+ }
511
+ return requireSync , selectedClusterNames , nil
512
+ }
513
+
514
+ // syncToClusters ensures that the state of the given object is
515
+ // synchronized to member clusters.
516
+ func (s * SyncController ) syncToClusters (
517
+ ctx context.Context ,
518
+ fedResource FederatedResource ,
519
+ clusters []* fedcorev1a1.FederatedCluster ,
520
+ selectedClusterNames sets.Set [string ],
521
+ ) worker.Result {
522
+ keyedLogger := klog .FromContext (ctx )
523
+ var err error
437
524
keyedLogger .V (2 ).Info ("Ensuring target object in clusters" , "clusters" , strings .Join (sets .List (selectedClusterNames ), "," ))
438
525
439
526
skipAdoptingPreexistingResources := ! adoption .ShouldAdoptPreexistingResources (fedResource .Object ())
@@ -693,9 +780,7 @@ func (s *SyncController) ensureRemovalFromClusters(ctx context.Context, fedResou
693
780
remainingClusters := []string {}
694
781
ok , err := s .handleDeletionInClusters (
695
782
ctx ,
696
- fedResource .TargetGVK (),
697
- fedResource .TargetGVR (),
698
- fedResource .TargetName (),
783
+ fedResource ,
699
784
func (dispatcher dispatch.UnmanagedDispatcher , clusterName string , clusterObj * unstructured.Unstructured ) {
700
785
remainingClusters = append (remainingClusters , clusterName )
701
786
s .removeFromCluster (ctx , dispatcher , clusterName , fedResource , clusterObj , true )
@@ -727,18 +812,20 @@ func (s *SyncController) ensureRemovalFromClusters(ctx context.Context, fedResou
727
812
// the informer to cover the possibility that the resources have not
728
813
// yet been cached.
729
814
func (s * SyncController ) checkObjectRemovedFromAllClusters (ctx context.Context , fedResource FederatedResource ) error {
730
- clusters , err := s .fedInformerManager .GetJoinedClusters ()
815
+ keyedLogger := klog .FromContext (ctx )
816
+ syncedClusters , syncedClusterNames , err := s .getSyncedClusters (fedResource )
731
817
if err != nil {
732
- return errors . Wrap ( err , "failed to get a list of clusters" )
818
+ return err
733
819
}
734
820
821
+ keyedLogger .V (4 ).Info ("Check object removed from clusters" , "clusters" , strings .Join (syncedClusterNames , "," ))
735
822
dispatcher := dispatch .NewCheckUnmanagedDispatcher (
736
823
s .getClusterClient ,
737
824
fedResource .TargetGVR (),
738
825
fedResource .TargetName (),
739
826
)
740
827
unreadyClusters := []string {}
741
- for _ , cluster := range clusters {
828
+ for _ , cluster := range syncedClusters {
742
829
if ! clusterutil .IsClusterReady (& cluster .Status ) {
743
830
unreadyClusters = append (unreadyClusters , cluster .Name )
744
831
continue
@@ -762,22 +849,24 @@ func (s *SyncController) checkObjectRemovedFromAllClusters(ctx context.Context,
762
849
// each managed resource in member clusters.
763
850
func (s * SyncController ) handleDeletionInClusters (
764
851
ctx context.Context ,
765
- targetGVK schema.GroupVersionKind ,
766
- targetGVR schema.GroupVersionResource ,
767
- targetQualifiedName common.QualifiedName ,
852
+ fedResource FederatedResource ,
768
853
deletionFunc func (dispatcher dispatch.UnmanagedDispatcher , clusterName string , clusterObj * unstructured.Unstructured ),
769
854
) (bool , error ) {
770
855
keyedLogger := klog .FromContext (ctx )
856
+ targetGVK := fedResource .TargetGVK ()
857
+ targetGVR := fedResource .TargetGVR ()
858
+ targetQualifiedName := fedResource .TargetName ()
771
859
772
- clusters , err := s .fedInformerManager . GetJoinedClusters ( )
860
+ syncedClusters , syncedClusterNames , err := s .getSyncedClusters ( fedResource )
773
861
if err != nil {
774
- return false , fmt . Errorf ( "failed to get a list of clusters: %w" , err )
862
+ return false , err
775
863
}
776
864
865
+ keyedLogger .V (4 ).Info ("Handle deletion in clusters" , "clusters" , strings .Join (syncedClusterNames , "," ))
777
866
dispatcher := dispatch .NewUnmanagedDispatcher (s .getClusterClient , targetGVR , targetQualifiedName )
778
867
retrievalFailureClusters := []string {}
779
868
unreadyClusters := []string {}
780
- for _ , cluster := range clusters {
869
+ for _ , cluster := range syncedClusters {
781
870
clusterName := cluster .Name
782
871
783
872
if ! clusterutil .IsClusterReady (& cluster .Status ) {
@@ -821,6 +910,30 @@ func (s *SyncController) handleDeletionInClusters(
821
910
return ok , nil
822
911
}
823
912
913
+ func (s * SyncController ) getSyncedClusters (
914
+ fedResource FederatedResource ,
915
+ ) ([]* fedcorev1a1.FederatedCluster , []string , error ) {
916
+ clusters , err := s .fedInformerManager .GetJoinedClusters ()
917
+ if err != nil {
918
+ return nil , nil , fmt .Errorf ("failed to get the list of joined clusters: %w" , err )
919
+ }
920
+ clusterMap := make (map [string ]* fedcorev1a1.FederatedCluster , len (clusters ))
921
+ for _ , cluster := range clusters {
922
+ clusterMap [cluster .Name ] = cluster
923
+ }
924
+
925
+ status := fedResource .Object ().GetStatus ()
926
+ syncedClusters := make ([]* fedcorev1a1.FederatedCluster , 0 , len (status .Clusters ))
927
+ syncedClusterNames := make ([]string , 0 , len (status .Clusters ))
928
+ for _ , cluster := range status .Clusters {
929
+ if cluster , exists := clusterMap [cluster .Cluster ]; exists {
930
+ syncedClusters = append (syncedClusters , cluster )
931
+ syncedClusterNames = append (syncedClusterNames , cluster .Name )
932
+ }
933
+ }
934
+ return syncedClusters , syncedClusterNames , nil
935
+ }
936
+
824
937
func (s * SyncController ) ensureFinalizer (ctx context.Context , fedResource FederatedResource ) error {
825
938
ctx , keyedLogger := logging .InjectLoggerValues (ctx , "finalizer-name" , FinalizerSyncController )
826
939
0 commit comments