diff --git a/charts/fleet-crd/templates/crds.yaml b/charts/fleet-crd/templates/crds.yaml index 4280f0a2a2..59eba68f29 100644 --- a/charts/fleet-crd/templates/crds.yaml +++ b/charts/fleet-crd/templates/crds.yaml @@ -7799,6 +7799,19 @@ spec: description: WebhookCommit is the latest Git commit hash received from a webhook type: string + webhookCommitStaleSince: + description: 'WebhookCommitStaleSince records when the reconciler + first detected that + + the git API was still returning the previously-deployed commit + while a + + webhook had announced a newer one. Cleared once the API returns + the + + expected commit or when LatestCommit fails for a different reason.' + format: date-time + type: string type: object type: object served: true diff --git a/integrationtests/gitjob/controller/controller_test.go b/integrationtests/gitjob/controller/controller_test.go index 8964b90115..27ed95c69d 100644 --- a/integrationtests/gitjob/controller/controller_test.go +++ b/integrationtests/gitjob/controller/controller_test.go @@ -1966,4 +1966,144 @@ var _ = Describe("GitJob controller", func() { }) }) }) + + // These sub-cases share the same initial setup (create GitRepo, wait for the first job + // to succeed and be deleted) and diverge only at the point where the webhook commit and + // API commit are set. The inner BeforeEach runs before the outer JustBeforeEach so that + // gitRepoName and expectedCommit are set before the shared setup executes. 
+ When("a GitRepo with DisablePolling receives a new webhook after initial deployment", func() { + var ( + gitRepo v1alpha1.GitRepo + gitRepoName string + job batchv1.Job + ) + + const ( + webhookRaceCommit = "deadbeef1deadbeef1deadbeef1deadbeef1aaaa" + staleWebhookCommit = "cafebabe1cafebabe1cafebabe1cafebabe1bbbb" + apiAdvancedCommit = "deadf00d1deadf00d1deadf00d1deadf00d1cccc" + ) + + JustBeforeEach(func() { + gitRepo = createGitRepoWithDisablePolling(gitRepoName) + Expect(k8sClient.Create(ctx, &gitRepo)).To(Succeed()) + + // Wait for the initial job from stableCommit. + Eventually(func() error { + jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+stableCommit, 5)) + return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + }).Should(Succeed()) + + // Mark it succeeded so it gets cleaned up. + Eventually(func() error { + jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+stableCommit, 5)) + if err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job); err != nil { + return err + } + job.Status = succeeded(job.Status) + return k8sClient.Status().Update(ctx, &job) + }).Should(Succeed()) + + Eventually(func() bool { + jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+stableCommit, 5)) + err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + return errors.IsNotFound(err) + }).Should(BeTrue()) + }) + + AfterEach(func() { + waitDeleteGitrepo(gitRepo) + logsBuffer.Reset() + }) + + When("the git API has not yet propagated the push", func() { + BeforeEach(func() { + gitRepoName = "disable-polling-api-race" + expectedCommit = stableCommit + }) + + JustBeforeEach(func() { + // The fetcherMock still returns stableCommit, reproducing the race where the + // git server API has not yet propagated the push. 
+ Expect(setGitRepoWebhookCommit(gitRepo, webhookRaceCommit)).To(Succeed()) + }) + + It("requeues until the API catches up, then creates a job", func() { + // The reconciler should requeue and keep Status.Commit at stableCommit while + // the API is still returning the old value. + Consistently(func(g Gomega) { + g.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: gitRepoName, Namespace: gitRepoNamespace}, &gitRepo)).To(Succeed()) + g.Expect(gitRepo.Status.Commit).To(Equal(stableCommit)) + }, 2*time.Second, 500*time.Millisecond).Should(Succeed()) + + // Simulate the git API catching up. + expectedCommit = webhookRaceCommit + + // The reconciler picks up the correct commit and creates a job. + Eventually(func() error { + jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+webhookRaceCommit, 5)) + return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + }).Should(Succeed()) + }) + }) + + // Verify the fix does not loop when the API returns a commit newer than both the + // deployed commit and the (stale) WebhookCommit — i.e. out-of-order webhook arrival. + When("the git API has already advanced beyond the webhook commit", func() { + BeforeEach(func() { + gitRepoName = "disable-polling-no-loop" + expectedCommit = stableCommit + }) + + JustBeforeEach(func() { + Expect(setGitRepoWebhookCommit(gitRepo, staleWebhookCommit)).To(Succeed()) + expectedCommit = apiAdvancedCommit + }) + + It("creates a job for the API commit without looping", func() { + // commit != oldCommit, so the fix must not requeue — trust the API value. 
+ Eventually(func() error { + jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+apiAdvancedCommit, 5)) + return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + }).Should(Succeed()) + }) + }) + + // Verify that a permanently broken git API (always returning the old commit) surfaces + // an error on the GitPolling condition instead of looping silently forever. + When("the git API permanently returns the previously-deployed commit", func() { + BeforeEach(func() { + gitRepoName = "disable-polling-permanent-stale" + expectedCommit = stableCommit // never updated — simulates a broken API + }) + + JustBeforeEach(func() { + Expect(setGitRepoWebhookCommit(gitRepo, webhookRaceCommit)).To(Succeed()) + }) + + It("sets the GitPolling condition to an error after the stale timeout", func() { + // The fix keeps Status.Commit at the previously-deployed value while waiting. + Consistently(func(g Gomega) { + g.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: gitRepoName, Namespace: gitRepoNamespace}, &gitRepo)).To(Succeed()) + g.Expect(gitRepo.Status.Commit).To(Equal(stableCommit)) + }, 2*time.Second, 500*time.Millisecond).Should(Succeed()) + + // After the stale timeout (2s in tests) the GitPolling condition must turn + // False with reason StaleAPI so the user can see the API problem. + Eventually(func(g Gomega) { + g.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: gitRepoName, Namespace: gitRepoNamespace}, &gitRepo)).To(Succeed()) + cond, found := getCondition(&gitRepo, "GitPolling") + g.Expect(found).To(BeTrue()) + g.Expect(string(cond.Status)).To(Equal(string(corev1.ConditionFalse))) + g.Expect(cond.Reason).To(Equal("StaleAPI")) + }).Should(Succeed()) + + // No deployment job should have been created for the webhook commit. 
+ jobName := names.SafeConcatName(gitRepoName, names.Hex(repo+webhookRaceCommit, 5)) + Consistently(func() error { + return k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: gitRepoNamespace}, &job) + }, 2*time.Second, 500*time.Millisecond).ShouldNot(Succeed()) + }) + }) + }) }) diff --git a/integrationtests/gitjob/controller/suite_test.go b/integrationtests/gitjob/controller/suite_test.go index 06674e5d8c..112e02e218 100644 --- a/integrationtests/gitjob/controller/suite_test.go +++ b/integrationtests/gitjob/controller/suite_test.go @@ -170,6 +170,8 @@ var _ = BeforeSuite(func() { Workers: 50, SystemNamespace: "default", KnownHosts: ssh.KnownHosts{}, + // Short timeout so the "permanently stale API" integration test runs quickly. + WebhookCommitStaleTimeout: 2 * time.Second, }).SetupWithManager(mgr) Expect(err).ToNot(HaveOccurred()) diff --git a/internal/cmd/controller/gitops/operator.go b/internal/cmd/controller/gitops/operator.go index 691659519d..af0cf9e105 100644 --- a/internal/cmd/controller/gitops/operator.go +++ b/internal/cmd/controller/gitops/operator.go @@ -30,6 +30,7 @@ import ( "github.com/rancher/fleet/internal/metrics" "github.com/rancher/fleet/internal/ssh" fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" + "github.com/rancher/fleet/pkg/durations" "github.com/rancher/fleet/pkg/git" "github.com/rancher/fleet/pkg/version" "github.com/rancher/fleet/pkg/webhook" @@ -177,19 +178,20 @@ func (g *GitOperator) Run(cmd *cobra.Command, args []string) error { } gitJobReconciler := &reconciler.GitJobReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Image: g.Image, - Scheduler: sched, - Workers: workers, - ShardID: g.ShardID, - JobNodeSelector: g.ShardNodeSelector, - GitFetcher: &git.Fetch{KnownHosts: kh}, - Clock: reconciler.RealClock{}, - Recorder: mgr.GetEventRecorder(fmt.Sprintf("fleet-gitops%s", shardIDSuffix)), - SystemNamespace: namespace, - KnownHosts: kh, - WithImagescan: imagescanEnabled, + Client: 
mgr.GetClient(), + Scheme: mgr.GetScheme(), + Image: g.Image, + Scheduler: sched, + Workers: workers, + ShardID: g.ShardID, + JobNodeSelector: g.ShardNodeSelector, + GitFetcher: &git.Fetch{KnownHosts: kh}, + Clock: reconciler.RealClock{}, + Recorder: mgr.GetEventRecorder(fmt.Sprintf("fleet-gitops%s", shardIDSuffix)), + SystemNamespace: namespace, + KnownHosts: kh, + WithImagescan: imagescanEnabled, + WebhookCommitStaleTimeout: durations.GitPollingStaleTimeout, } statusReconciler := &reconciler.StatusReconciler{ diff --git a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go index 942eb3eb80..c26b0ea0bc 100644 --- a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go +++ b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go @@ -144,6 +144,11 @@ type GitJobReconciler struct { SystemNamespace string KnownHosts KnownHostsGetter WithImagescan bool + // WebhookCommitStaleTimeout controls after how long a persistent mismatch + // between the git API commit and the webhook commit is surfaced as an error + // on the GitPolling condition. Defaults to durations.GitPollingStaleTimeout + // when zero. + WebhookCommitStaleTimeout time.Duration } func (r *GitJobReconciler) SetupWithManager(mgr ctrl.Manager) error { @@ -348,15 +353,52 @@ func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger, // If the client secret has changed, we now retrieve the latest commit. // If the secret is still incorrect, we will not need to create // the gitJob (which is more expensive) and we will return an error earlier. + + // If polling is enabled or there is no pending webhook commit, any previous + // webhook-vs-API stale tracker is no longer meaningful and should be cleared. 
+ if !gitrepo.Spec.DisablePolling || gitrepo.Status.WebhookCommit == "" { + gitrepo.Status.WebhookCommitStaleSince = nil + } + if gitrepo.Spec.DisablePolling || clientSecretChanged { commit, err := monitorLatestCommit(gitrepo, func() (string, error) { return r.GitFetcher.LatestCommit(ctx, gitrepo, r.Client) }) condition.Cond(gitPollingCondition).SetError(&gitrepo.Status, "", err) if err == nil && commit != "" { + // Polling disabled + pending webhook commit: the git API may lag the + // webhook. While it still returns oldCommit, keep Status.Commit at + // oldCommit and requeue so a later reconcile can detect the change and + // create a job. WebhookCommitStaleSince marks when the lag was first seen; + // past staleTimeout a StaleAPI error is set on the polling condition. + // NOTE(review): time.Since below bypasses r.Clock — confirm that is intended. + if gitrepo.Spec.DisablePolling && gitrepo.Status.WebhookCommit != "" && commit == oldCommit && commit != gitrepo.Status.WebhookCommit { + staleTimeout := r.WebhookCommitStaleTimeout + if staleTimeout == 0 { + staleTimeout = durations.GitPollingStaleTimeout + } + if gitrepo.Status.WebhookCommitStaleSince == nil { + now := metav1.Now() + gitrepo.Status.WebhookCommitStaleSince = &now + } + if time.Since(gitrepo.Status.WebhookCommitStaleSince.Time) >= staleTimeout { + condition.Cond(gitPollingCondition).SetError(&gitrepo.Status, "StaleAPI", + fmt.Errorf("git server API still returns the previously-deployed commit after %s: "+ + "expected %s, got %s", staleTimeout, gitrepo.Status.WebhookCommit, commit)) + } else { + logger.V(1).Info("Git server commit does not match webhook commit, requeuing", + "apiCommit", commit, "webhookCommit", gitrepo.Status.WebhookCommit) + } + gitrepo.Status.Commit = oldCommit + return ctrl.Result{RequeueAfter: durations.DefaultRequeueAfter}, nil + } + // No stale-API mismatch (expected commit, a different commit, or no webhook pending) — clear any stale tracker. 
+ gitrepo.Status.WebhookCommitStaleSince = nil gitrepo.Status.Commit = commit } if err != nil { + // Clear any stale tracker: this is a different failure mode. + gitrepo.Status.WebhookCommitStaleSince = nil r.Recorder.Eventf( gitrepo, nil, @@ -1137,6 +1179,7 @@ func updateStatus(ctx context.Context, c client.Client, req types.NamespacedName t.Status.LastPollingTime = status.LastPollingTime t.Status.ObservedGeneration = status.ObservedGeneration t.Status.UpdateGeneration = status.UpdateGeneration + t.Status.WebhookCommitStaleSince = status.WebhookCommitStaleSince // only keep the Ready condition from live status, it's calculated by the status reconciler conds := []genericcondition.GenericCondition{} diff --git a/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go b/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go index 7cb8d8cfab..b03748c088 100644 --- a/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go +++ b/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go @@ -196,6 +196,12 @@ type GitRepoStatus struct { LastSyncedImageScanTime metav1.Time `json:"lastSyncedImageScanTime,omitempty"` // LastPollingTime is the last time the polling check was triggered LastPollingTime metav1.Time `json:"lastPollingTriggered,omitempty"` + // WebhookCommitStaleSince records when the reconciler first detected that + // the git API was still returning the previously-deployed commit while a + // webhook had announced a newer one. Cleared once the API returns the + // expected commit or when LatestCommit fails for a different reason. 
+ // +optional + WebhookCommitStaleSince *metav1.Time `json:"webhookCommitStaleSince,omitempty"` } type GitRepoDisplay struct { diff --git a/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go index 33c0b535c4..e9218094d9 100644 --- a/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go @@ -1689,6 +1689,10 @@ func (in *GitRepoStatus) DeepCopyInto(out *GitRepoStatus) { in.StatusBase.DeepCopyInto(&out.StatusBase) in.LastSyncedImageScanTime.DeepCopyInto(&out.LastSyncedImageScanTime) in.LastPollingTime.DeepCopyInto(&out.LastPollingTime) + if in.WebhookCommitStaleSince != nil { + in, out := &in.WebhookCommitStaleSince, &out.WebhookCommitStaleSince + *out = (*in).DeepCopy() + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GitRepoStatus. diff --git a/pkg/durations/durations.go b/pkg/durations/durations.go index 5178133769..3cf9b16378 100644 --- a/pkg/durations/durations.go +++ b/pkg/durations/durations.go @@ -38,6 +38,10 @@ const ( // the helmop status first, before the status controller looks at // bundledeployments. HelmOpStatusDelay = time.Second * 5 + // GitPollingStaleTimeout is how long the reconciler waits for the git API + // to return the webhook-announced commit before surfacing an error on the + // GitPolling condition. + GitPollingStaleTimeout = time.Minute * 5 // WaitForDependenciesReadyRequeueInterval is the wait time after the Fleet agent finds a BundleDeployment has non-ready dependencies WaitForDependenciesReadyRequeueInterval = time.Second * 15 )