Skip to content

Commit 49ce7d3

Browse files
fix: prevent int32 overflow in MonoVertex autoscaler desiredReplicas (#3421)
Signed-off-by: suryapratap-01 <suryapratap.personal@gmail.com>
1 parent 82e480a commit 49ce7d3

2 files changed

Lines changed: 145 additions & 4 deletions

File tree

pkg/reconciler/monovertex/scaling/scaling.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -308,17 +308,31 @@ func (s *Scaler) desiredReplicas(_ context.Context, monoVtx *dfv1.MonoVertex, pr
308308
return int32(monoVtx.Status.Replicas)
309309
}
310310

311-
var desired int32
312311
// We calculate the time of finishing processing the pending messages,
313312
// and then we know how many replicas are needed to get them done in target seconds.
314-
desired = int32(math.Round(((float64(pending) / processingRate) / float64(monoVtx.Spec.Scale.GetTargetProcessingSeconds())) * float64(monoVtx.Status.ReadyReplicas)))
313+
// Clamp the float64 result before casting to int32 to prevent wraparound when the
314+
// intermediate value exceeds math.MaxInt32 (can happen with large pending + near-zero rate).
315+
desiredRaw := math.Round(((float64(pending) / processingRate) / float64(monoVtx.Spec.Scale.GetTargetProcessingSeconds())) * float64(monoVtx.Status.ReadyReplicas))
316+
if desiredRaw > math.MaxInt32 {
317+
desiredRaw = math.MaxInt32
318+
}
319+
desired := int32(desiredRaw)
315320

316321
// we only scale down to zero when the pending and rate are both zero.
317322
if desired == 0 {
318323
desired = 1
319324
}
320-
if desired > int32(pending) && pending > 0 { // For some corner cases, we don't want to scale up to more than pending.
321-
desired = int32(pending)
325+
// For some corner cases, we don't want to scale up to more than pending.
326+
// pending is int64 (matches the daemon's wrapperspb.Int64Value type), but desired
327+
// replicas must fit in int32 per the Kubernetes replica spec, so we guard the cast.
328+
if pending > 0 {
329+
pendingCap := int32(math.MaxInt32)
330+
if pending <= math.MaxInt32 {
331+
pendingCap = int32(pending)
332+
}
333+
if desired > pendingCap {
334+
desired = pendingCap
335+
}
322336
}
323337
return desired
324338
}

pkg/reconciler/monovertex/scaling/scaling_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,130 @@ limitations under the License.
1515
*/
1616

1717
package scaling
18+
19+
import (
20+
"context"
21+
"math"
22+
"testing"
23+
24+
"github.com/stretchr/testify/assert"
25+
26+
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
27+
)
28+
29+
func monoVtxWithScale(targetSec uint32, readyReplicas uint32, currentReplicas uint32) *dfv1.MonoVertex {
30+
mv := &dfv1.MonoVertex{}
31+
mv.Spec.Scale.TargetProcessingSeconds = &targetSec
32+
mv.Status.ReadyReplicas = readyReplicas
33+
mv.Status.Replicas = currentReplicas
34+
return mv
35+
}
36+
37+
func TestDesiredReplicas(t *testing.T) {
38+
s := &Scaler{}
39+
ctx := context.Background()
40+
41+
tests := []struct {
42+
name string
43+
pending int64
44+
processingRate float64
45+
targetSec uint32
46+
readyReplicas uint32
47+
currentReplicas uint32
48+
expected int32
49+
}{
50+
{
51+
name: "bothZero_scaleToZero",
52+
pending: 0,
53+
processingRate: 0,
54+
targetSec: 20,
55+
readyReplicas: 1,
56+
expected: 0,
57+
},
58+
{
59+
name: "rateZero_returnsCurrent",
60+
pending: 100,
61+
processingRate: 0,
62+
targetSec: 20,
63+
readyReplicas: 1,
64+
currentReplicas: 3,
65+
expected: 3,
66+
},
67+
{
68+
name: "normal",
69+
pending: 100,
70+
processingRate: 5,
71+
targetSec: 20,
72+
readyReplicas: 1,
73+
expected: 1,
74+
},
75+
{
76+
name: "desiredZero_clampedToOne",
77+
pending: 1,
78+
processingRate: 1000,
79+
targetSec: 20,
80+
readyReplicas: 1,
81+
expected: 1,
82+
},
83+
{
84+
// desired = round((3/0.5)/20 * 1) = round(0.3) = 0 → clamped to 1.
85+
// pending cap (3) > 1 so no further cap applied.
86+
name: "capByPending_desiredLessThanPending",
87+
pending: 3,
88+
processingRate: 0.5,
89+
targetSec: 20,
90+
readyReplicas: 1,
91+
expected: 1,
92+
},
93+
{
94+
// pending cap path: desired > pending, so cap to pending.
95+
name: "capByPending_desiredGreaterThanPending",
96+
pending: 3,
97+
processingRate: 0.01,
98+
targetSec: 1,
99+
readyReplicas: 5,
100+
expected: 3,
101+
},
102+
{
103+
// Regression test for issue #3415: pending=100,000, rate=0.001 msg/s, targetSec=20,
104+
// readyReplicas=1 → raw float64 = 5,000,000,000 which overflows int32 without the fix.
105+
// After the MaxInt32 float clamp, desired is then capped to pending (100,000) since
106+
// we must never scale to more replicas than there are messages.
107+
name: "overflow_fromIssue3415",
108+
pending: 100_000,
109+
processingRate: 0.001,
110+
targetSec: 20,
111+
readyReplicas: 1,
112+
expected: 100_000,
113+
},
114+
{
115+
name: "extremeOverflow",
116+
pending: 1_000_000,
117+
processingRate: 0.0001,
118+
targetSec: 1,
119+
readyReplicas: 10,
120+
expected: 1_000_000,
121+
},
122+
{
123+
// pending > math.MaxInt32: the pending-cap guard must not wrap to negative.
124+
name: "pendingExceedsMaxInt32",
125+
pending: int64(math.MaxInt32) + 1000,
126+
processingRate: 1e9,
127+
targetSec: 20,
128+
readyReplicas: 1,
129+
expected: 1,
130+
},
131+
}
132+
133+
for _, tc := range tests {
134+
t.Run(tc.name, func(t *testing.T) {
135+
mv := monoVtxWithScale(tc.targetSec, tc.readyReplicas, tc.currentReplicas)
136+
got := s.desiredReplicas(ctx, mv, tc.processingRate, tc.pending)
137+
assert.Equal(t, tc.expected, got)
138+
// Invariant: result must never be negative (except the explicit scale-to-zero case).
139+
if tc.expected != 0 {
140+
assert.True(t, got > 0, "desiredReplicas must not return a non-positive value for non-zero expected")
141+
}
142+
})
143+
}
144+
}

0 commit comments

Comments
 (0)