Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"os"
"path/filepath"

Expand Down Expand Up @@ -50,6 +51,7 @@ func main() {
maxReconcileRate = app.Flag("max-reconcile-rate", "The maximum number of concurrent reconciliation operations.").Default("1").Int()
artifactsHistoryLimit = app.Flag("artifacts-history-limit", "Each attempt to run the playbook/role generates a set of artifacts on disk. This settings limits how many of these to keep.").Default("10").Int()
pollStateMetricInterval = app.Flag("poll-state-metric", "State metric recording interval").Default("5s").Duration()
replicasCount = app.Flag("replicas", "Amount of replicas configured for the provider. When using more than 1 replica, reconciles will be sharded across them based on a modular hash.").Default("1").Int()
)
kingpin.MustParse(app.Parse(os.Args[1:]))

Expand Down Expand Up @@ -95,12 +97,16 @@ func main() {
},
}

providerCtx, cancel := context.WithCancel(context.Background())
ansibleOpts := ansiblerun.SetupOptions{
AnsibleCollectionsPath: *ansibleCollectionsPath,
AnsibleRolesPath: *ansibleRolesPath,
Timeout: *timeout,
ArtifactsHistoryLimit: *artifactsHistoryLimit,
ReplicasCount: *replicasCount,
ProviderCtx: providerCtx,
ProviderCancel: cancel,
}
kingpin.FatalIfError(ansible.Setup(mgr, o, ansibleOpts), "Cannot setup Ansible controllers")
kingpin.FatalIfError(mgr.Start(ctrl.SetupSignalHandler()), "Cannot start controller manager")
kingpin.FatalIfError(mgr.Start(providerCtx), "Cannot start controller manager")
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
gotest.tools/v3 v3.5.1
k8s.io/api v0.29.4
k8s.io/apimachinery v0.29.4
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
sigs.k8s.io/controller-runtime v0.17.3
sigs.k8s.io/controller-tools v0.14.0
)
Expand Down Expand Up @@ -85,7 +86,6 @@ require (
k8s.io/component-base v0.29.2 // indirect
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
Expand Down
159 changes: 155 additions & 4 deletions internal/controller/ansibleRun/ansibleRun.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,30 @@ import (
xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/controller"
"github.com/crossplane/crossplane-runtime/pkg/event"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/crossplane/crossplane-runtime/pkg/ratelimiter"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
"github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/crossplane/crossplane-runtime/pkg/statemetrics"
"github.com/google/uuid"
"github.com/spf13/afero"
"gopkg.in/yaml.v2"
coordinationv1 "k8s.io/api/coordination/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/crossplane-contrib/provider-ansible/apis/v1alpha1"
"github.com/crossplane-contrib/provider-ansible/internal/ansible"
"github.com/crossplane-contrib/provider-ansible/pkg/galaxyutil"
"github.com/crossplane-contrib/provider-ansible/pkg/runnerutil"
"github.com/crossplane-contrib/provider-ansible/pkg/shardutil"
)

const (
Expand All @@ -75,6 +81,13 @@ const (
errUnmarshalTemplate = "cannot unmarshal template"
)

const (
leaseNameTemplate = "provider-ansible-lease-%d"
leaseDurationSeconds = 30
leaseRenewalInterval = 5 * time.Second
leaseAcquireAttemptInterval = 5 * time.Second
)

const (
baseWorkingDir = "/ansibleDir"
)
Expand All @@ -97,6 +110,9 @@ type SetupOptions struct {
AnsibleRolesPath string
Timeout time.Duration
ArtifactsHistoryLimit int
ReplicasCount int
ProviderCtx context.Context
ProviderCancel context.CancelFunc
}

// Setup adds a controller that reconciles AnsibleRun managed resources.
Expand Down Expand Up @@ -128,6 +144,8 @@ func Setup(mgr ctrl.Manager, o controller.Options, s SetupOptions) error {
ArtifactsHistoryLimit: s.ArtifactsHistoryLimit,
}
},
replicaID: uuid.New().String(),
logger: o.Logger,
}

opts := []managed.ReconcilerOption{
Expand All @@ -149,20 +167,28 @@ func Setup(mgr ctrl.Manager, o controller.Options, s SetupOptions) error {

r := managed.NewReconciler(mgr, resource.ManagedKind(v1alpha1.AnsibleRunGroupVersionKind), opts...)

currentShard, err := c.acquireAndHoldShard(o, s)
if err != nil {
return fmt.Errorf("cannot acquire and hold shard: %w", err)
}

return ctrl.NewControllerManagedBy(mgr).
Named(name).
WithOptions(o.ForControllerRuntime()).
For(&v1alpha1.AnsibleRun{}).
WithEventFilter(shardutil.IsResourceForShard(currentShard, s.ReplicasCount)).
Complete(ratelimiter.NewReconciler(name, r, o.GlobalRateLimiter))
}

// A connector is expected to produce an ExternalClient when its Connect method
// is called.
type connector struct {
kube client.Client
usage resource.Tracker
fs afero.Afero
ansible func(dir string) params
kube client.Client
usage resource.Tracker
fs afero.Afero
ansible func(dir string) params
replicaID string
logger logging.Logger
}

func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.ExternalClient, error) { //nolint:gocyclo
Expand Down Expand Up @@ -527,3 +553,128 @@ func addBehaviorVars(pc *v1alpha1.ProviderConfig) map[string]string {
}
return behaviorVars
}

// generateLeaseName returns the deterministic Lease object name for the
// given shard index, derived from leaseNameTemplate.
func (c *connector) generateLeaseName(index int) string {
	name := fmt.Sprintf(leaseNameTemplate, index)
	return name
}

// releaseLease deletes the Lease object backing the given shard index so
// that another replica can claim it.
// NOTE(review): the namespace is hardcoded; this assumes the provider is
// deployed in "upbound-system" — TODO confirm / make configurable.
func (c *connector) releaseLease(ctx context.Context, kube client.Client, index int) error {
	const ns = "upbound-system"

	lease := &coordinationv1.Lease{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: ns,
			Name:      c.generateLeaseName(index),
		},
	}
	return kube.Delete(ctx, lease)
}

// acquireLease attempts to acquire or renew the Lease for shard `index` on
// behalf of this replica (identified by c.replicaID).
// It returns an error when the lease is currently held — and not yet
// expired — by another replica, or when the API calls fail.
func (c *connector) acquireLease(ctx context.Context, kube client.Client, index int) error {
	lease := &coordinationv1.Lease{}
	leaseName := c.generateLeaseName(index)
	// Renamed from leaseDurationSeconds to avoid shadowing the package
	// constant of the same name.
	durationSeconds := ptr.To(int32(leaseDurationSeconds))

	// NOTE(review): namespace is hardcoded; assumes the provider runs in
	// "upbound-system" — TODO confirm / make configurable.
	ns := "upbound-system"

	if err := kube.Get(ctx, client.ObjectKey{Namespace: ns, Name: leaseName}, lease); err != nil {
		if !kerrors.IsNotFound(err) {
			return err
		}

		// Lease does not exist yet: create it with this replica as holder.
		lease = &coordinationv1.Lease{
			ObjectMeta: metav1.ObjectMeta{
				Name:      leaseName,
				Namespace: ns,
			},
			Spec: coordinationv1.LeaseSpec{
				HolderIdentity:       &c.replicaID,
				RenewTime:            &metav1.MicroTime{Time: time.Now()},
				LeaseDurationSeconds: durationSeconds,
			},
		}
		if err := kube.Create(ctx, lease); err != nil {
			return err
		}
		c.logger.Debug("created lease", "lease", lease)
		return nil
	}

	// The lease exists. If it is held by another replica, only take it over
	// when it has expired (or lacks the fields needed to judge expiry).
	if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity != c.replicaID {
		// BUGFIX: also guard LeaseDurationSeconds against nil before
		// dereferencing; a lease written by another client may omit it and
		// the original code would panic here.
		if lease.Spec.RenewTime != nil && lease.Spec.LeaseDurationSeconds != nil &&
			time.Since(lease.Spec.RenewTime.Time) < time.Duration(*lease.Spec.LeaseDurationSeconds)*time.Second {
			// Lease is held by another replica and has not expired yet.
			return fmt.Errorf("lease is still held by %s", *lease.Spec.HolderIdentity)
		}
	}

	// Update the lease to acquire (or renew) it.
	lease.Spec.HolderIdentity = ptr.To(c.replicaID)
	lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
	lease.Spec.LeaseDurationSeconds = durationSeconds
	if err := kube.Update(ctx, lease); err != nil {
		if kerrors.IsConflict(err) {
			// Another replica updated the lease concurrently; surface the
			// conflict unchanged so the caller can retry.
			return err
		}
		return fmt.Errorf("failed to update lease: %w", err)
	}

	c.logger.Debug("updated lease", "lease", lease)
	return nil
}

// Finds an available shard and acquires a lease for it. Will attempt to obtain one indefinitely.
// This will also start a background go-routine to renew the lease continuously and release it when the process receives a shutdown signal
func (c *connector) acquireAndHoldShard(o controller.Options, s SetupOptions) (int, error) {
ctx := s.ProviderCtx
currentShard := -1

cfg := ctrl.GetConfigOrDie()
kube, err := client.New(cfg, client.Options{})
Comment thread
fahedouch marked this conversation as resolved.
if err != nil {
return currentShard, err
}

AcquireLease:
for {
for i := 0; i < s.ReplicasCount; i++ {
if err := c.acquireLease(ctx, kube, i); err == nil {
Comment thread
fahedouch marked this conversation as resolved.
currentShard = i
o.Logger.Debug("acquired lease", "id", i)
go func() {
sigHandler := ctrl.SetupSignalHandler()

for {
select {
case <-time.After(leaseRenewalInterval):
if err := c.acquireLease(ctx, kube, i); err != nil {
o.Logger.Info("failed to renew lease", "id", i, "err", err)
Comment thread
julsemaan marked this conversation as resolved.
s.ProviderCancel()
} else {
o.Logger.Debug("renewed lease", "id", i)
}
case <-sigHandler.Done():
o.Logger.Info("controller is shutting down, releasing lease")
if err := c.releaseLease(ctx, kube, i); err != nil {
o.Logger.Info("failed to release lease", "lease", err)
Comment thread
julsemaan marked this conversation as resolved.
}
o.Logger.Debug("released lease")
s.ProviderCancel()
return
}
}
}()
// Lease is acquired and background goroutine started for renewal, we can safely break to return the current shard
break AcquireLease
} else {
o.Logger.Debug("cannot acquire lease", "id", i, "err", err)
Comment thread
fahedouch marked this conversation as resolved.
time.Sleep(leaseAcquireAttemptInterval)
}
}
}

return currentShard, nil
}
44 changes: 44 additions & 0 deletions pkg/shardutil/shardutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package shardutil

import (
"hash/fnv"

"sigs.k8s.io/controller-runtime/pkg/client"
event "sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// IsResourceForShard returns a predicate that admits only events whose
// object hashes onto targetShard out of totalShards (consistent hashing on
// the object name).
func IsResourceForShard(targetShard, totalShards int) predicate.Predicate {
	// Single shared closure instead of repeating the call in each handler.
	matches := func(obj client.Object) bool {
		return isResourceForShardHelper(obj, targetShard, totalShards)
	}
	return predicate.Funcs{
		CreateFunc:  func(e event.CreateEvent) bool { return matches(e.Object) },
		UpdateFunc:  func(e event.UpdateEvent) bool { return matches(e.ObjectNew) },
		DeleteFunc:  func(e event.DeleteEvent) bool { return matches(e.Object) },
		GenericFunc: func(e event.GenericEvent) bool { return matches(e.Object) },
	}
}

// isResourceForShardHelper reports whether obj belongs to targetShard by
// hashing the object's name (FNV-1a) and reducing it modulo totalShards.
func isResourceForShardHelper(obj client.Object, targetShard, totalShards int) bool {
	// BUGFIX: guard against a zero or negative shard count, which would
	// otherwise panic with a divide-by-zero or wrap around on the uint32
	// conversion. Treat an invalid count as "this replica handles it".
	if totalShards <= 0 {
		return true
	}
	// Calculate a hash of the resource name and map it onto a shard.
	shard := hashString(obj.GetName()) % uint32(totalShards)
	// Check if the shard matches the target shard.
	return int(shard) == targetShard
}

// hashString computes the 32-bit FNV-1a hash of s.
func hashString(s string) uint32 {
	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(s)) // hash.Hash.Write is documented to never return an error
	return hasher.Sum32()
}
Loading