Skip to content

Commit bad89c8

Browse files
committed
Fix IsBenched / notification inconsistency
1 parent be56189 commit bad89c8

File tree

2 files changed

+25
-99
lines changed

2 files changed

+25
-99
lines changed

snow/networking/benchlist/benchlist.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,6 @@ func (b *benchlist) observe(nodeID ids.NodeID, v float64) {
158158
b.nodes[nodeID] = n
159159
}
160160

161-
// If the bench duration has expired, clear the benched state and reset
162-
// the failure probability so that the node gets a clean slate rather
163-
// than being immediately re-benched due to stale EWMA history.
164-
if n.isBenched && b.benchExpired(n) {
165-
n.isBenched = false
166-
n.failureProbability = math.NewUninitializedAverager(b.halflife)
167-
}
168-
169161
n.failureProbability.Observe(v, b.clock.Time())
170162

171163
p := n.failureProbability.Read()
@@ -183,18 +175,13 @@ func (b *benchlist) observe(nodeID ids.NodeID, v float64) {
183175
}
184176
}
185177

186-
// benchExpired reports whether n's bench duration has elapsed.
187-
func (b *benchlist) benchExpired(n *node) bool {
188-
return !n.benchedAt.IsZero() && b.clock.Time().After(n.benchedAt.Add(b.benchDuration))
189-
}
190-
191178
// IsBenched returns true if messages to nodeID should immediately fail.
192179
func (b *benchlist) IsBenched(nodeID ids.NodeID) bool {
193180
b.lock.RLock()
194181
defer b.lock.RUnlock()
195182

196183
n, ok := b.nodes[nodeID]
197-
return ok && n.isBenched && !b.benchExpired(n)
184+
return ok && n.isBenched
198185
}
199186

200187
// TODO: Close goroutine within run during shutdown.
@@ -218,17 +205,23 @@ func (b *benchlist) run() {
218205
)
219206

220207
if j.bench {
208+
// The producer path (observe) is the source of truth for
209+
// b.nodes transitions triggered by EWMA. By the time a bench
210+
// job is emitted, n.isBenched was already set under b.lock.
211+
// This consumer branch only drives callbacks/metrics and tracks
212+
// callback-visible deadlines.
221213
// Always update the timeout deadline. If the node is
222-
// already benched from the consumer's perspective
223-
// (e.g., observe detected expiration and immediately
224-
// re-benched before the timer fired), this extends
225-
// the deadline without a duplicate Benched notification.
214+
// already benched from the consumer's perspective, this
215+
// extends the deadline without a duplicate Benched
216+
// notification.
226217
timeoutHeap.Push(j.nodeID, time.Now().Add(b.benchDuration))
227218
if !benched.Contains(j.nodeID) {
228219
benched.Add(j.nodeID)
229220
b.benchable.Benched(b.ctx.ChainID, j.nodeID)
230221
}
231222
} else if benched.Contains(j.nodeID) {
223+
// Same as above: EWMA unbench jobs don't update b.nodes here
224+
// because observe already flipped n.isBenched before enqueuing.
232225
// Guard: only unbench if the consumer still considers
233226
// the node benched, avoiding duplicate Unbenched calls
234227
// when both EWMA and timeout race to unbench.
@@ -246,6 +239,16 @@ func (b *benchlist) run() {
246239
}
247240
timeoutHeap.Pop()
248241
benched.Remove(nodeID)
242+
b.lock.Lock()
243+
// Timeout unbenching originates in this consumer goroutine
244+
// (not in observe), so we must synchronize b.nodes here to
245+
// keep IsBenched consistent with callback-visible state.
246+
// Timeout-based unbench gives the node a clean slate.
247+
if n, exists := b.nodes[nodeID]; exists {
248+
n.isBenched = false
249+
n.failureProbability = math.NewUninitializedAverager(b.halflife)
250+
}
251+
b.lock.Unlock()
249252

250253
b.ctx.Log.Debug("unbenching node due to timeout",
251254
zap.Stringer("nodeID", nodeID),

snow/networking/benchlist/benchlist_test.go

Lines changed: 4 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -122,77 +122,6 @@ func TestBenchlist(t *testing.T) {
122122
requireBenched()
123123
}
124124

125-
// Regression test: when observe() detects that a bench has expired via the
126-
// mock clock (before the consumer's real-time timer fires) and the EWMA
127-
// immediately re-benches the node, the consumer must not send a duplicate
128-
// Benched notification. The benchable.Benched() assertion
129-
// (require.NotContains) would fail if the guard were missing.
130-
func TestBenchlistNoDuplicateBench(t *testing.T) {
131-
require := require.New(t)
132-
133-
snowCtx := snowtest.Context(t, snowtest.CChainID)
134-
ctx := snowtest.ConsensusContext(snowCtx)
135-
vdrs := validators.NewManager()
136-
vdrID := ids.GenerateTestNodeID()
137-
138-
require.NoError(vdrs.AddStaker(ctx.SubnetID, vdrID, nil, ids.Empty, 1))
139-
140-
benchable := &benchable{
141-
t: t,
142-
wantChainID: ctx.ChainID,
143-
updated: make(chan struct{}, 1),
144-
}
145-
b, err := newBenchlist(
146-
ctx,
147-
benchable,
148-
vdrs,
149-
Config{
150-
Halflife: DefaultHalflife,
151-
UnbenchProbability: DefaultUnbenchProbability,
152-
BenchProbability: DefaultBenchProbability,
153-
BenchDuration: 10 * time.Second,
154-
},
155-
prometheus.NewRegistry(),
156-
)
157-
require.NoError(err)
158-
159-
now := time.Now()
160-
b.clock.Set(now)
161-
162-
// Bench the node: p = 2/3 > 0.5
163-
b.RegisterResponse(vdrID)
164-
b.RegisterFailure(vdrID)
165-
b.RegisterFailure(vdrID)
166-
<-benchable.updated
167-
require.True(b.IsBenched(vdrID))
168-
169-
// Advance the mock clock past the bench duration. The real-time timer
170-
// has NOT fired, so the consumer still considers the node benched.
171-
now = now.Add(11 * time.Second)
172-
b.clock.Set(now)
173-
require.False(b.IsBenched(vdrID))
174-
175-
// Register a failure. observe() detects the bench expired, clears
176-
// isBenched, resets the EWMA to a clean slate, then observes the
177-
// failure (p = 1.0 > 0.5) and immediately re-benches.
178-
// This sends a bench job for a node the consumer already considers
179-
// benched. Without the duplicate-bench guard, the consumer would call
180-
// benchable.Benched() again, failing the require.NotContains assertion.
181-
b.RegisterFailure(vdrID)
182-
require.True(b.IsBenched(vdrID))
183-
184-
// Drive the probability below unbenchProbability to trigger an EWMA
185-
// unbench. Since jobs are FIFO, when we receive the unbench
186-
// notification we know the re-bench job above was already processed
187-
// (without a duplicate Benched call).
188-
for range 20 {
189-
b.RegisterResponse(vdrID)
190-
}
191-
<-benchable.updated
192-
require.False(b.IsBenched(vdrID))
193-
require.Empty(benchable.benched)
194-
}
195-
196125
func TestBenchlistTimeout(t *testing.T) {
197126
require := require.New(t)
198127

@@ -232,15 +161,10 @@ func TestBenchlistTimeout(t *testing.T) {
232161
<-benchable.updated
233162
require.True(b.IsBenched(vdrID))
234163

235-
// Advance the mock clock past the bench duration so IsBenched returns
236-
// false immediately.
237-
now = now.Add(100 * time.Millisecond)
238-
b.clock.Set(now)
239-
require.False(b.IsBenched(vdrID))
240-
241164
// The consumer goroutine's timer also fires (real time ≥ 50ms) and calls
242165
// Unbenched on the benchable.
243166
<-benchable.updated
167+
require.False(b.IsBenched(vdrID))
244168
require.Empty(benchable.benched)
245169
}
246170

@@ -271,7 +195,7 @@ func TestBenchlistTimeoutCleansSlate(t *testing.T) {
271195
Halflife: DefaultHalflife,
272196
UnbenchProbability: DefaultUnbenchProbability,
273197
BenchProbability: DefaultBenchProbability,
274-
BenchDuration: 10 * time.Second,
198+
BenchDuration: 50 * time.Millisecond,
275199
},
276200
prometheus.NewRegistry(),
277201
)
@@ -287,9 +211,8 @@ func TestBenchlistTimeoutCleansSlate(t *testing.T) {
287211
<-benchable.updated
288212
require.True(b.IsBenched(vdrID))
289213

290-
// Advance past the bench duration so the bench expires.
291-
now = now.Add(11 * time.Second)
292-
b.clock.Set(now)
214+
// Wait for timeout-based unbench.
215+
<-benchable.updated
293216
require.False(b.IsBenched(vdrID))
294217

295218
// Register a response followed by a failure. With a clean slate, p = 1/2

0 commit comments

Comments
 (0)