Skip to content

Commit 3a28ceb

Browse files
committed
fix(stages): requeue Stages when Freight is soaking in upstream
Auto-promotion silently stalled whenever a downstream Stage's source specified requiredSoakTime: - Regular Stages requeued on a fixed 5 minute cadence, so short soak windows (or soak deadlines that did not land on a 5-minute boundary) could be missed until a later tick happened to fall past the deadline. - Control-flow Stages returned ctrl.Result{} with no RequeueAfter at all, relying entirely on watch events. No watch fires when 'soak time has elapsed', so a control-flow Stage with requiredSoakTime set never woke up on its own to re-evaluate the soak and verify its Freight, blocking every downstream Stage indefinitely. Add a calculateNextSoakCheck helper that lists candidate Freight per upstream source (with the soak filter disabled), computes the remaining soak per upstream Stage according to the configured AvailabilityStrategy, and returns the soonest deadline plus a 1 second buffer. Wire the helper into both reconcilers: - The regular reconciler now requeues at min(5m, soakDeadline). - The control-flow reconciler requeues at the soak deadline when one exists, and otherwise keeps the previous watch-driven behavior. Refs: #4586 Signed-off-by: Justin Chen <mail@justin0u0.com>
1 parent 317a455 commit 3a28ceb

4 files changed

Lines changed: 488 additions & 1 deletion

File tree

pkg/controller/stages/control_flow_stages.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,19 @@ func (r *ControlFlowStageReconciler) Reconcile(ctx context.Context, req ctrl.Req
259259
}
260260
return ctrl.Result{}, fmt.Errorf("failed to update Stage status: %w", err)
261261
}
262+
263+
// Control-flow Stages are normally driven entirely by watches, but when a
264+
// source has a RequiredSoakTime, no watch fires once the soak deadline
265+
// elapses. In that case we must explicitly schedule a requeue so the soak
266+
// can be re-evaluated and the Freight verified for downstream consumers.
267+
// See: https://github.com/akuity/kargo/issues/4586
268+
if reconcileErr == nil {
269+
if soakRequeue, soakErr := calculateNextSoakCheck(ctx, r.client, stage); soakErr != nil {
270+
logger.Error(soakErr, "failed to calculate next soak check interval")
271+
} else if soakRequeue > 0 {
272+
return ctrl.Result{RequeueAfter: soakRequeue}, nil
273+
}
274+
}
262275
return ctrl.Result{}, reconcileErr
263276
}
264277

pkg/controller/stages/regular_stages.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,17 @@ func (r *RegularStageReconciler) Reconcile(ctx context.Context, req ctrl.Request
394394
}
395395
// Otherwise, requeue after a delay.
396396
// TODO: Make the requeue delay configurable.
397-
return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
397+
requeueAfter := 5 * time.Minute
398+
399+
// If any candidate Freight is still soaking in an upstream Stage, requeue
400+
// sooner so auto-promotion can fire as soon as the soak time elapses
401+
// rather than waiting for the next default-interval tick.
402+
if soakRequeue, soakErr := calculateNextSoakCheck(ctx, r.client, stage); soakErr != nil {
403+
logger.Error(soakErr, "failed to calculate next soak check interval")
404+
} else if soakRequeue > 0 && soakRequeue < requeueAfter {
405+
requeueAfter = soakRequeue
406+
}
407+
return ctrl.Result{RequeueAfter: requeueAfter}, nil
398408
}
399409

400410
func (r *RegularStageReconciler) reconcile(

pkg/controller/stages/soak.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package stages
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"sigs.k8s.io/controller-runtime/pkg/client"
9+
10+
kargoapi "github.com/akuity/kargo/api/v1alpha1"
11+
"github.com/akuity/kargo/pkg/api"
12+
)
13+
14+
// soakRequeueBuffer adds a small delay to avoid requeuing immediately after
15+
// the soak time elapses to avoid getting no available Freight.
16+
const soakRequeueBuffer = time.Second
17+
18+
// calculateNextSoakCheck computes how long the reconciler should wait before
19+
// re-evaluating the given Stage for auto-promotion when one or more pieces of
20+
// candidate Freight are still soaking in upstream Stages.
21+
//
22+
// It returns 0 when no soak deadline is pending.
23+
func calculateNextSoakCheck(
24+
ctx context.Context,
25+
c client.Client,
26+
stage *kargoapi.Stage,
27+
) (time.Duration, error) {
28+
var soonest time.Duration
29+
30+
for _, req := range stage.Spec.RequestedFreight {
31+
if req.Sources.RequiredSoakTime == nil ||
32+
req.Sources.RequiredSoakTime.Duration <= 0 ||
33+
len(req.Sources.Stages) == 0 {
34+
continue
35+
}
36+
37+
warehouse, err := api.GetWarehouse(ctx, c, client.ObjectKey{
38+
Namespace: stage.Namespace,
39+
Name: req.Origin.Name,
40+
})
41+
if err != nil {
42+
return 0, fmt.Errorf("get Warehouse %q: %w", req.Origin.Name, err)
43+
}
44+
if warehouse == nil {
45+
continue
46+
}
47+
48+
// List candidates with the soak filter disabled so we can see Freight
49+
// that is verified in upstream Stages but still soaking.
50+
candidates, err := api.ListFreightFromWarehouse(
51+
ctx,
52+
c,
53+
warehouse,
54+
&api.ListWarehouseFreightOptions{
55+
ApprovedFor: stage.Name,
56+
VerifiedIn: req.Sources.Stages,
57+
AvailabilityStrategy: req.Sources.AvailabilityStrategy,
58+
},
59+
)
60+
if err != nil {
61+
return 0, fmt.Errorf("list freight from Warehouse %q: %w", warehouse.Name, err)
62+
}
63+
64+
for i := range candidates {
65+
remaining := remainingSoakForFreight(
66+
&candidates[i],
67+
req.Sources.Stages,
68+
req.Sources.RequiredSoakTime.Duration,
69+
req.Sources.AvailabilityStrategy,
70+
)
71+
if remaining <= 0 {
72+
continue
73+
}
74+
if soonest == 0 || remaining < soonest {
75+
soonest = remaining
76+
}
77+
}
78+
}
79+
80+
if soonest == 0 {
81+
return 0, nil
82+
}
83+
return soonest + soakRequeueBuffer, nil
84+
}
85+
86+
// remainingSoakForFreight returns how much longer the given Freight must soak
87+
// before it satisfies the requirement of being soaked in the configured
88+
// upstream Stages according to the given AvailabilityStrategy. A return value
89+
// of 0 means the Freight either already satisfies the requirement or is not
90+
// eligible to satisfy it (e.g. with the All strategy when the Freight has not
91+
// been verified in every upstream Stage).
92+
func remainingSoakForFreight(
93+
freight *kargoapi.Freight,
94+
upstreamStages []string,
95+
required time.Duration,
96+
strategy kargoapi.FreightAvailabilityStrategy,
97+
) time.Duration {
98+
switch strategy {
99+
case kargoapi.FreightAvailabilityStrategyAll:
100+
// Freight must satisfy the soak in EVERY upstream Stage. The remaining
101+
// wait is the longest remaining wait across those Stages. If any
102+
// upstream Stage has not verified the Freight, the Freight will never
103+
// satisfy this requirement on its own; return 0 to indicate "no
104+
// deadline".
105+
var longest time.Duration
106+
for _, stage := range upstreamStages {
107+
if !freight.IsVerifiedIn(stage) {
108+
return 0
109+
}
110+
elapsed := freight.GetLongestSoak(stage)
111+
if elapsed >= required {
112+
continue
113+
}
114+
wait := required - elapsed
115+
if wait > longest {
116+
longest = wait
117+
}
118+
}
119+
return longest
120+
121+
default:
122+
// OneOf (and empty/unset, which is treated as OneOf): the Freight
123+
// becomes eligible as soon as ANY upstream Stage's soak completes.
124+
// Return the smallest positive remaining wait across the upstream
125+
// Stages where the Freight is verified.
126+
var shortest time.Duration
127+
for _, stage := range upstreamStages {
128+
if !freight.IsVerifiedIn(stage) {
129+
continue
130+
}
131+
elapsed := freight.GetLongestSoak(stage)
132+
if elapsed >= required {
133+
// Already soaked somewhere — no deadline needed.
134+
return 0
135+
}
136+
wait := required - elapsed
137+
if shortest == 0 || wait < shortest {
138+
shortest = wait
139+
}
140+
}
141+
return shortest
142+
}
143+
}

0 commit comments

Comments
 (0)