Skip to content

Commit 3128f0c

Browse files
committed
Allow reconciling from multiple replicas
Signed-off-by: Julien Semaan <jul.semaan@gmail.com>
1 parent fbc45f2 commit 3128f0c

File tree

3 files changed

+208
-5
lines changed

3 files changed

+208
-5
lines changed

cmd/provider/main.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package main
1818

1919
import (
20+
"context"
2021
"os"
2122
"path/filepath"
2223

@@ -50,6 +51,7 @@ func main() {
5051
maxReconcileRate = app.Flag("max-reconcile-rate", "The maximum number of concurrent reconciliation operations.").Default("1").Int()
5152
artifactsHistoryLimit = app.Flag("artifacts-history-limit", "Each attempt to run the playbook/role generates a set of artifacts on disk. This settings limits how many of these to keep.").Default("10").Int()
5253
pollStateMetricInterval = app.Flag("poll-state-metric", "State metric recording interval").Default("5s").Duration()
54+
replicasCount = app.Flag("replicas", "Amount of replicas configured for the provider. When using more than 1 replica, reconciles will be sharded across them based on a modular hash.").Default("1").Int()
5355
)
5456
kingpin.MustParse(app.Parse(os.Args[1:]))
5557

@@ -95,12 +97,16 @@ func main() {
9597
},
9698
}
9799

100+
providerCtx, cancel := context.WithCancel(context.Background())
98101
ansibleOpts := ansiblerun.SetupOptions{
99102
AnsibleCollectionsPath: *ansibleCollectionsPath,
100103
AnsibleRolesPath: *ansibleRolesPath,
101104
Timeout: *timeout,
102105
ArtifactsHistoryLimit: *artifactsHistoryLimit,
106+
ReplicasCount: *replicasCount,
107+
ProviderCtx: providerCtx,
108+
ProviderCancel: cancel,
103109
}
104110
kingpin.FatalIfError(ansible.Setup(mgr, o, ansibleOpts), "Cannot setup Ansible controllers")
105-
kingpin.FatalIfError(mgr.Start(ctrl.SetupSignalHandler()), "Cannot start controller manager")
111+
kingpin.FatalIfError(mgr.Start(providerCtx), "Cannot start controller manager")
106112
}

internal/controller/ansibleRun/ansibleRun.go

Lines changed: 157 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,24 +32,30 @@ import (
3232
xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
3333
"github.com/crossplane/crossplane-runtime/pkg/controller"
3434
"github.com/crossplane/crossplane-runtime/pkg/event"
35+
"github.com/crossplane/crossplane-runtime/pkg/logging"
3536
"github.com/crossplane/crossplane-runtime/pkg/meta"
3637
"github.com/crossplane/crossplane-runtime/pkg/ratelimiter"
3738
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
3839
"github.com/crossplane/crossplane-runtime/pkg/resource"
3940
"github.com/crossplane/crossplane-runtime/pkg/statemetrics"
41+
"github.com/google/uuid"
4042
"github.com/spf13/afero"
4143
"gopkg.in/yaml.v2"
44+
coordinationv1 "k8s.io/api/coordination/v1"
4245
v1 "k8s.io/api/core/v1"
4346
"k8s.io/apimachinery/pkg/api/equality"
4447
kerrors "k8s.io/apimachinery/pkg/api/errors"
48+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4549
"k8s.io/apimachinery/pkg/types"
50+
"k8s.io/utils/pointer"
4651
ctrl "sigs.k8s.io/controller-runtime"
4752
"sigs.k8s.io/controller-runtime/pkg/client"
4853

4954
"github.com/crossplane-contrib/provider-ansible/apis/v1alpha1"
5055
"github.com/crossplane-contrib/provider-ansible/internal/ansible"
5156
"github.com/crossplane-contrib/provider-ansible/pkg/galaxyutil"
5257
"github.com/crossplane-contrib/provider-ansible/pkg/runnerutil"
58+
"github.com/crossplane-contrib/provider-ansible/pkg/shardutil"
5359
)
5460

5561
const (
@@ -75,6 +81,13 @@ const (
7581
errUnmarshalTemplate = "cannot unmarshal template"
7682
)
7783

84+
// Lease-based sharding settings. Each replica claims one coordination Lease;
// the holder must renew it more often than it expires so other replicas do
// not take over a live shard.
const (
	// leaseNameTemplate is the name pattern of the Lease object backing a
	// shard; the single %d placeholder is the shard index.
	leaseNameTemplate = "provider-ansible-lease-%d"
	// leaseDurationSeconds is how long an unrenewed lease stays valid before
	// another replica may take it over.
	leaseDurationSeconds = 30
	// leaseRenewalInterval is how often the holder renews its lease; it must
	// stay well below leaseDurationSeconds.
	leaseRenewalInterval = 5 * time.Second
	// leaseAcquireAttemptInterval is the wait between failed acquire attempts.
	leaseAcquireAttemptInterval = 5 * time.Second
)
90+
7891
const (
7992
baseWorkingDir = "/ansibleDir"
8093
)
@@ -97,6 +110,9 @@ type SetupOptions struct {
97110
AnsibleRolesPath string
98111
Timeout time.Duration
99112
ArtifactsHistoryLimit int
113+
ReplicasCount int
114+
ProviderCtx context.Context
115+
ProviderCancel context.CancelFunc
100116
}
101117

102118
// Setup adds a controller that reconciles AnsibleRun managed resources.
@@ -128,6 +144,8 @@ func Setup(mgr ctrl.Manager, o controller.Options, s SetupOptions) error {
128144
ArtifactsHistoryLimit: s.ArtifactsHistoryLimit,
129145
}
130146
},
147+
replicaID: uuid.New().String(),
148+
logger: o.Logger,
131149
}
132150

133151
opts := []managed.ReconcilerOption{
@@ -149,20 +167,28 @@ func Setup(mgr ctrl.Manager, o controller.Options, s SetupOptions) error {
149167

150168
r := managed.NewReconciler(mgr, resource.ManagedKind(v1alpha1.AnsibleRunGroupVersionKind), opts...)
151169

170+
currentShard, err := c.acquireAndHoldShard(o, s)
171+
if err != nil {
172+
return fmt.Errorf("cannot acquire and hold shard: %w", err)
173+
}
174+
152175
return ctrl.NewControllerManagedBy(mgr).
153176
Named(name).
154177
WithOptions(o.ForControllerRuntime()).
155178
For(&v1alpha1.AnsibleRun{}).
179+
WithEventFilter(shardutil.IsResourceForShard(currentShard, s.ReplicasCount)).
156180
Complete(ratelimiter.NewReconciler(name, r, o.GlobalRateLimiter))
157181
}
158182

159183
// A connector is expected to produce an ExternalClient when its Connect method
160184
// is called.
161185
type connector struct {
162-
kube client.Client
163-
usage resource.Tracker
164-
fs afero.Afero
165-
ansible func(dir string) params
186+
kube client.Client
187+
usage resource.Tracker
188+
fs afero.Afero
189+
ansible func(dir string) params
190+
replicaID string
191+
logger logging.Logger
166192
}
167193

168194
func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.ExternalClient, error) { //nolint:gocyclo
@@ -527,3 +553,130 @@ func addBehaviorVars(pc *v1alpha1.ProviderConfig) map[string]string {
527553
}
528554
return behaviorVars
529555
}
556+
557+
// generateLeaseName returns the name of the coordination Lease object that
// backs the shard with the given index (e.g. "provider-ansible-lease-0").
func (c *connector) generateLeaseName(index int) string {
	return fmt.Sprintf(leaseNameTemplate, index)
}
560+
561+
func (c *connector) releaseLease(ctx context.Context, kube client.Client, index int) error {
562+
leaseName := c.generateLeaseName(index)
563+
ns := "upbound-system"
564+
565+
lease := &coordinationv1.Lease{
566+
ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: leaseName},
567+
}
568+
569+
return kube.Delete(ctx, lease)
570+
}
571+
572+
// Attempts to acquire or renew a lease for the current replica ID
573+
// Returns an error when unable to obtain the lease
574+
func (c *connector) acquireLease(ctx context.Context, kube client.Client, index int) error {
575+
lease := &coordinationv1.Lease{}
576+
leaseName := c.generateLeaseName(index)
577+
leaseDurationSeconds := pointer.Int32(leaseDurationSeconds)
578+
579+
ns := "upbound-system"
580+
581+
if err := kube.Get(ctx, client.ObjectKey{Namespace: ns, Name: leaseName}, lease); err != nil {
582+
if !kerrors.IsNotFound(err) {
583+
return err
584+
}
585+
586+
// Create a new Lease
587+
lease = &coordinationv1.Lease{
588+
ObjectMeta: metav1.ObjectMeta{
589+
Name: leaseName,
590+
Namespace: ns,
591+
},
592+
Spec: coordinationv1.LeaseSpec{
593+
HolderIdentity: &c.replicaID,
594+
RenewTime: &metav1.MicroTime{Time: time.Now()},
595+
LeaseDurationSeconds: leaseDurationSeconds,
596+
},
597+
}
598+
if err := kube.Create(ctx, lease); err != nil {
599+
return err
600+
}
601+
c.logger.Debug("created lease", "lease", lease)
602+
return nil
603+
}
604+
605+
// Check if the lease is held by another replica and is not expired
606+
if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity != c.replicaID {
607+
if lease.Spec.RenewTime != nil && time.Since(lease.Spec.RenewTime.Time) < time.Duration(*lease.Spec.LeaseDurationSeconds)*time.Second {
608+
// Lease is held by another replica and is not expired
609+
return fmt.Errorf("lease is still held by %s", *lease.Spec.HolderIdentity)
610+
}
611+
}
612+
613+
// Update the lease to acquire it
614+
lease.Spec.HolderIdentity = pointer.String(c.replicaID)
615+
lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
616+
lease.Spec.LeaseDurationSeconds = leaseDurationSeconds
617+
if err := kube.Update(ctx, lease); err != nil {
618+
if kerrors.IsConflict(err) {
619+
// Another replica updated the lease concurrently, retry
620+
return err
621+
}
622+
return fmt.Errorf("failed to update lease: %w", err)
623+
}
624+
625+
c.logger.Debug("updated lease", "lease", lease)
626+
return nil
627+
}
628+
629+
// Finds an available shard and acquires a lease for it. Will attempt to obtain one indefinitely.
630+
// This will also start a background go-routine to renew the lease continiously and release it when the process receives a shutdown signal
631+
func (c *connector) acquireAndHoldShard(o controller.Options, s SetupOptions) (int, error) {
632+
ctx := s.ProviderCtx
633+
acquiredLease := false
634+
currentShard := -1
635+
636+
cfg := ctrl.GetConfigOrDie()
637+
kube, err := client.New(cfg, client.Options{})
638+
if err != nil {
639+
return currentShard, err
640+
}
641+
642+
AcquireLease:
643+
for !acquiredLease {
644+
for i := 0; i < s.ReplicasCount; i++ {
645+
if err := c.acquireLease(ctx, kube, i); err == nil {
646+
currentShard = i
647+
o.Logger.Debug("acquired lease", "id", i)
648+
acquiredLease = true
649+
go func() {
650+
sigHandler := ctrl.SetupSignalHandler()
651+
652+
for {
653+
select {
654+
case <-time.After(leaseRenewalInterval):
655+
if err := c.acquireLease(ctx, kube, i); err != nil {
656+
o.Logger.Info("failed to renew lease", "id", i, "err", err)
657+
s.ProviderCancel()
658+
} else {
659+
o.Logger.Debug("renewed lease", "id", i)
660+
}
661+
case <-sigHandler.Done():
662+
o.Logger.Info("controller is shutting down, releasing lease")
663+
if err := c.releaseLease(ctx, kube, i); err != nil {
664+
o.Logger.Info("failed to release lease", "lease", err)
665+
}
666+
o.Logger.Debug("released lease")
667+
s.ProviderCancel()
668+
return
669+
}
670+
}
671+
}()
672+
// Lease is acquired and background goroutine started for renewal, we can safely break to return the current shard
673+
break AcquireLease
674+
} else {
675+
o.Logger.Debug("cannot acquire lease", "id", i, "err", err)
676+
time.Sleep(leaseAcquireAttemptInterval)
677+
}
678+
}
679+
}
680+
681+
return currentShard, nil
682+
}

pkg/shardutil/shardutil.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package shardutil
2+
3+
import (
4+
"hash/fnv"
5+
6+
"sigs.k8s.io/controller-runtime/pkg/client"
7+
event "sigs.k8s.io/controller-runtime/pkg/event"
8+
"sigs.k8s.io/controller-runtime/pkg/predicate"
9+
)
10+
11+
// Define a predicate function to filter resources based on consistent hashing
12+
func IsResourceForShard(targetShard, totalShards int) predicate.Predicate {
13+
return predicate.Funcs{
14+
CreateFunc: func(e event.CreateEvent) bool {
15+
return isResourceForShardHelper(e.Object, targetShard, totalShards)
16+
},
17+
UpdateFunc: func(e event.UpdateEvent) bool {
18+
return isResourceForShardHelper(e.ObjectNew, targetShard, totalShards)
19+
},
20+
DeleteFunc: func(e event.DeleteEvent) bool {
21+
return isResourceForShardHelper(e.Object, targetShard, totalShards)
22+
},
23+
GenericFunc: func(e event.GenericEvent) bool {
24+
return isResourceForShardHelper(e.Object, targetShard, totalShards)
25+
},
26+
}
27+
}
28+
29+
// Helper function to check if the resource belongs to the current shard
30+
func isResourceForShardHelper(obj client.Object, targetShard, totalShards int) bool {
31+
// Calculate a hash of the resource name
32+
hash := hashString(obj.GetName())
33+
// Perform modulo operation to determine the shard
34+
shard := hash % uint32(totalShards)
35+
// Check if the shard matches the target shard
36+
return int(shard) == targetShard
37+
}
38+
39+
// hashString returns the 32-bit FNV-1a digest of s.
func hashString(s string) uint32 {
	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(s)) // fnv hashes never return a write error
	return hasher.Sum32()
}

0 commit comments

Comments
 (0)