Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions charts/fleet-crd/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7799,6 +7799,19 @@ spec:
description: WebhookCommit is the latest Git commit hash received
from a webhook
type: string
webhookCommitStaleSince:
description: 'WebhookCommitStaleSince records when the reconciler
first detected that

the git API was still returning the previously-deployed commit
while a

webhook had announced a newer one. Cleared once the API returns
the

expected commit or when LatestCommit fails for a different reason.'
format: date-time
type: string
type: object
type: object
served: true
Expand Down
140 changes: 140 additions & 0 deletions integrationtests/gitjob/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1966,4 +1966,144 @@ var _ = Describe("GitJob controller", func() {
})
})
})

// The sub-cases below share the same initial setup (create GitRepo, wait for the first job
// to succeed and be deleted) and diverge only at the point where the webhook commit and
// API commit are set. Each inner BeforeEach runs before the outer JustBeforeEach, so
// gitRepoName and expectedCommit are set before the shared setup executes.
// Covers how the reconciler reacts when Status.WebhookCommit is set on a GitRepo created
// with createGitRepoWithDisablePolling after its first job has completed and been removed.
When("a GitRepo with DisablePolling receives a new webhook after initial deployment", func() {
var (
// gitRepo is the resource under test; assertions below re-read it in place.
gitRepo v1alpha1.GitRepo
// gitRepoName is assigned by each nested BeforeEach before the shared setup uses it.
gitRepoName string
// job is the scratch object that receives every k8sClient.Get of a Job.
job batchv1.Job
)

// Arbitrary 40-character hex strings standing in for commit hashes.
const (
// webhookRaceCommit is written to Status.WebhookCommit in the "not yet propagated"
// and "permanently stale" cases.
webhookRaceCommit = "deadbeef1deadbeef1deadbeef1deadbeef1aaaa"
// staleWebhookCommit is written to Status.WebhookCommit in the "already advanced" case.
staleWebhookCommit = "cafebabe1cafebabe1cafebabe1cafebabe1bbbb"
// apiAdvancedCommit is the value assigned to expectedCommit in the "already advanced" case.
apiAdvancedCommit = "deadf00d1deadf00d1deadf00d1deadf00d1cccc"
)

// Shared setup: create the GitRepo, then drive the job derived from stableCommit
// through "exists" -> "succeeded" -> "deleted" before any sub-case acts.
JustBeforeEach(func() {
gitRepo = createGitRepoWithDisablePolling(gitRepoName)
Expect(k8sClient.Create(ctx, &gitRepo)).To(Succeed())

// Wait for the initial job from stableCommit.
// The job name is built from gitRepoName and a short hash of repo+stableCommit.
Eventually(func() error {
jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+stableCommit, 5))
return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
}).Should(Succeed())

// Mark it succeeded so it gets cleaned up.
// Get and status Update are retried together inside Eventually, so a failed
// update is followed by a fresh read of the job.
Eventually(func() error {
jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+stableCommit, 5))
if err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job); err != nil {
return err
}
job.Status = succeeded(job.Status)
return k8sClient.Status().Update(ctx, &job)
}).Should(Succeed())

// Wait until the job can no longer be found, i.e. it has been deleted.
Eventually(func() bool {
jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+stableCommit, 5))
err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
return errors.IsNotFound(err)
}).Should(BeTrue())
})

// Remove the GitRepo and reset the captured log buffer after each spec.
AfterEach(func() {
waitDeleteGitrepo(gitRepo)
logsBuffer.Reset()
})

When("the git API has not yet propagated the push", func() {
BeforeEach(func() {
gitRepoName = "disable-polling-api-race"
// presumably expectedCommit is the commit the suite's fetcher mock returns — confirm in the suite setup.
expectedCommit = stableCommit
})

JustBeforeEach(func() {
// The fetcherMock still returns stableCommit, reproducing the race where the
// git server API has not yet propagated the push.
Expect(setGitRepoWebhookCommit(gitRepo, webhookRaceCommit)).To(Succeed())
})

It("requeues until the API catches up, then creates a job", func() {
// The reconciler should requeue and keep Status.Commit at stableCommit while
// the API is still returning the old value.
// Checked every 500ms over a 2s window.
Consistently(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: gitRepoName, Namespace: gitRepoNamespace}, &gitRepo)).To(Succeed())
g.Expect(gitRepo.Status.Commit).To(Equal(stableCommit))
}, 2*time.Second, 500*time.Millisecond).Should(Succeed())

// Simulate the git API catching up.
expectedCommit = webhookRaceCommit

// The reconciler picks up the correct commit and creates a job.
Eventually(func() error {
jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+webhookRaceCommit, 5))
return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
}).Should(Succeed())
})
})

// Verify the fix does not loop when the API returns a commit newer than both the
// deployed commit and the (stale) WebhookCommit — i.e. out-of-order webhook arrival.
When("the git API has already advanced beyond the webhook commit", func() {
BeforeEach(func() {
gitRepoName = "disable-polling-no-loop"
// Start from stableCommit so the shared setup sees the initial job.
expectedCommit = stableCommit
})

JustBeforeEach(func() {
// NOTE(review): the webhook commit is written before expectedCommit is advanced, so a
// reconcile triggered in between may still observe stableCommit — confirm this ordering is intended.
Expect(setGitRepoWebhookCommit(gitRepo, staleWebhookCommit)).To(Succeed())
expectedCommit = apiAdvancedCommit
})

It("creates a job for the API commit without looping", func() {
// commit != oldCommit, so the fix must not requeue — trust the API value.
// The expected job is named after apiAdvancedCommit, not staleWebhookCommit.
Eventually(func() error {
jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+apiAdvancedCommit, 5))
return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
}).Should(Succeed())
})
})

// Verify that a permanently broken git API (always returning the old commit) surfaces
// an error on the GitPolling condition instead of looping silently forever.
When("the git API permanently returns the previously-deployed commit", func() {
BeforeEach(func() {
gitRepoName = "disable-polling-permanent-stale"
expectedCommit = stableCommit // never updated — simulates a broken API
})

JustBeforeEach(func() {
// Announce a commit that expectedCommit is never changed to in this case.
Expect(setGitRepoWebhookCommit(gitRepo, webhookRaceCommit)).To(Succeed())
})

It("sets the GitPolling condition to an error after the stale timeout", func() {
// The fix keeps Status.Commit at the previously-deployed value while waiting.
// Checked every 500ms over a 2s window.
Consistently(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: gitRepoName, Namespace: gitRepoNamespace}, &gitRepo)).To(Succeed())
g.Expect(gitRepo.Status.Commit).To(Equal(stableCommit))
}, 2*time.Second, 500*time.Millisecond).Should(Succeed())

// After the stale timeout (2s in tests) the GitPolling condition must turn
// False with reason StaleAPI so the user can see the API problem.
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: gitRepoName, Namespace: gitRepoNamespace}, &gitRepo)).To(Succeed())
cond, found := getCondition(&gitRepo, "GitPolling")
g.Expect(found).To(BeTrue())
g.Expect(string(cond.Status)).To(Equal(string(corev1.ConditionFalse)))
g.Expect(cond.Reason).To(Equal("StaleAPI"))
}).Should(Succeed())

// No deployment job should have been created for the webhook commit.
// The Get must keep failing for the whole 2s window.
jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+webhookRaceCommit, 5))
Consistently(func() error {
return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job)
}, 2*time.Second, 500*time.Millisecond).ShouldNot(Succeed())
})
})
})
})
2 changes: 2 additions & 0 deletions integrationtests/gitjob/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ var _ = BeforeSuite(func() {
Workers: 50,
SystemNamespace: "default",
KnownHosts: ssh.KnownHosts{},
// Short timeout so the "permanently stale API" integration test runs quickly.
WebhookCommitStaleTimeout: 2 * time.Second,
}).SetupWithManager(mgr)
Expect(err).ToNot(HaveOccurred())

Expand Down
28 changes: 15 additions & 13 deletions internal/cmd/controller/gitops/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/rancher/fleet/internal/metrics"
"github.com/rancher/fleet/internal/ssh"
fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1"
"github.com/rancher/fleet/pkg/durations"
"github.com/rancher/fleet/pkg/git"
"github.com/rancher/fleet/pkg/version"
"github.com/rancher/fleet/pkg/webhook"
Expand Down Expand Up @@ -177,19 +178,20 @@ func (g *GitOperator) Run(cmd *cobra.Command, args []string) error {
}

gitJobReconciler := &reconciler.GitJobReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Image: g.Image,
Scheduler: sched,
Workers: workers,
ShardID: g.ShardID,
JobNodeSelector: g.ShardNodeSelector,
GitFetcher: &git.Fetch{KnownHosts: kh},
Clock: reconciler.RealClock{},
Recorder: mgr.GetEventRecorder(fmt.Sprintf("fleet-gitops%s", shardIDSuffix)),
SystemNamespace: namespace,
KnownHosts: kh,
WithImagescan: imagescanEnabled,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Image: g.Image,
Scheduler: sched,
Workers: workers,
ShardID: g.ShardID,
JobNodeSelector: g.ShardNodeSelector,
GitFetcher: &git.Fetch{KnownHosts: kh},
Clock: reconciler.RealClock{},
Recorder: mgr.GetEventRecorder(fmt.Sprintf("fleet-gitops%s", shardIDSuffix)),
SystemNamespace: namespace,
KnownHosts: kh,
WithImagescan: imagescanEnabled,
WebhookCommitStaleTimeout: durations.GitPollingStaleTimeout,
}

statusReconciler := &reconciler.StatusReconciler{
Expand Down
36 changes: 36 additions & 0 deletions internal/cmd/controller/gitops/reconciler/gitjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ type GitJobReconciler struct {
SystemNamespace string
KnownHosts KnownHostsGetter
WithImagescan bool
// WebhookCommitStaleTimeout is how long the git API may keep returning the
// previously-deployed commit while a webhook has announced a different one
// before the mismatch is surfaced as an error on the GitPolling condition.
// Defaults to durations.GitPollingStaleTimeout when zero.
WebhookCommitStaleTimeout time.Duration
}

func (r *GitJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
Expand Down Expand Up @@ -354,9 +359,39 @@ func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger,
})
condition.Cond(gitPollingCondition).SetError(&gitrepo.Status, "", err)
if err == nil && commit != "" {
// When polling is disabled and a webhook commit is pending, the
// git server API may not have propagated the new commit yet (race
// condition between webhook delivery and API consistency). Restore
// the pre-reconcile commit so the next reconcile (via RequeueAfter)
// can still detect the commit change and create a job once the API
// returns the expected commit.
if gitrepo.Spec.DisablePolling && gitrepo.Status.WebhookCommit != "" && commit == oldCommit && commit != gitrepo.Status.WebhookCommit {
staleTimeout := r.WebhookCommitStaleTimeout
if staleTimeout == 0 {
staleTimeout = durations.GitPollingStaleTimeout
}
if gitrepo.Status.WebhookCommitStaleSince == nil {
now := metav1.Now()
gitrepo.Status.WebhookCommitStaleSince = &now
}
if time.Since(gitrepo.Status.WebhookCommitStaleSince.Time) >= staleTimeout {
condition.Cond(gitPollingCondition).SetError(&gitrepo.Status, "StaleAPI",
fmt.Errorf("git server API still returns the previously-deployed commit after %s: "+
"expected %s, got %s", staleTimeout, gitrepo.Status.WebhookCommit, commit))
} else {
logger.V(1).Info("Git server commit does not match webhook commit, requeuing",
"apiCommit", commit, "webhookCommit", gitrepo.Status.WebhookCommit)
}
gitrepo.Status.Commit = oldCommit
return ctrl.Result{RequeueAfter: durations.DefaultRequeueAfter}, nil
}
// API returned the expected commit or a newer one — clear any stale tracker.
gitrepo.Status.WebhookCommitStaleSince = nil
gitrepo.Status.Commit = commit
Comment thread
thardeck marked this conversation as resolved.
}
if err != nil {
// Clear any stale tracker: this is a different failure mode.
gitrepo.Status.WebhookCommitStaleSince = nil
r.Recorder.Eventf(
gitrepo,
nil,
Expand Down Expand Up @@ -1137,6 +1172,7 @@ func updateStatus(ctx context.Context, c client.Client, req types.NamespacedName
t.Status.LastPollingTime = status.LastPollingTime
t.Status.ObservedGeneration = status.ObservedGeneration
t.Status.UpdateGeneration = status.UpdateGeneration
t.Status.WebhookCommitStaleSince = status.WebhookCommitStaleSince

// only keep the Ready condition from live status, it's calculated by the status reconciler
conds := []genericcondition.GenericCondition{}
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ type GitRepoStatus struct {
LastSyncedImageScanTime metav1.Time `json:"lastSyncedImageScanTime,omitempty"`
// LastPollingTime is the last time the polling check was triggered
LastPollingTime metav1.Time `json:"lastPollingTriggered,omitempty"`
// WebhookCommitStaleSince records when the reconciler first detected that
// the git API was still returning the previously-deployed commit while a
// webhook had announced a newer one. Cleared once the API returns the
// expected commit or when LatestCommit fails for a different reason.
// +optional
WebhookCommitStaleSince *metav1.Time `json:"webhookCommitStaleSince,omitempty"`
}

type GitRepoDisplay struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/durations/durations.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ const (
// the helmop status first, before the status controller looks at
// bundledeployments.
HelmOpStatusDelay = time.Second * 5
// GitPollingStaleTimeout is how long the reconciler waits for the git API
// to return the webhook-announced commit before surfacing an error on the
// GitPolling condition.
GitPollingStaleTimeout = time.Minute * 5
// WaitForDependenciesReadyRequeueInterval is the wait time after the Fleet agent finds a BundleDeployment has non-ready dependencies
WaitForDependenciesReadyRequeueInterval = time.Second * 15
)
Expand Down
Loading