Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,25 @@

// BlockTimeTolerance is the maximum allowed difference between the proposed block time and wall-clock time.
BlockTimeTolerance time.Duration `mapstructure:"block_time_tolerance"`

// CatchupLagThreshold is how many blocks a peer may be ahead of us before we
// report catching_up=true. The reactor's WaitSync latch only reflects the
// initial block-sync at startup and stays false for the rest of the process, so
// without this check a node that later stops keeping up with its peers still
// reports catching_up=false. The comparison is against peer-reported heights
// rather than block-time staleness, so a network where every node has
// legitimately stopped at the same height is not misreported as catching up.
// 0 disables the check; must be >=2 when enabled to absorb the normal one-height
// round skew between synced peers.
CatchupLagThreshold int64 `mapstructure:"catchup_lag_threshold"`
Comment thread
marcello33 marked this conversation as resolved.
// MinExpectedPeers, when >0, reports catching_up=true while connected peers are
// below this count, since a node that cannot reach enough peers cannot establish
// that it is current. 0 keeps single-node deployments healthy.
MinExpectedPeers int `mapstructure:"min_expected_peers"`
// CatchupDebounceDuration is how long the peer-lag condition must hold
// continuously before catching_up flips to true, damping flapping at the
// threshold boundary. The transition back to false is immediate.
CatchupDebounceDuration time.Duration `mapstructure:"catchup_debounce_duration"`
}

// DefaultConsensusConfig returns a default configuration for the consensus service
Expand All @@ -1050,6 +1069,9 @@
PeerQueryMaj23SleepDuration: 2000 * time.Millisecond,
DoubleSignCheckHeight: int64(0),
BlockTimeTolerance: 60 * time.Second,
CatchupLagThreshold: 5,
MinExpectedPeers: 0,
Comment thread
pratikspatil024 marked this conversation as resolved.
Outdated
CatchupDebounceDuration: 10 * time.Second,
}
}

Expand Down Expand Up @@ -1118,7 +1140,7 @@

// ValidateBasic performs basic validation (checking param bounds, etc.) and
// returns an error if any check fails.
func (cfg *ConsensusConfig) ValidateBasic() error {

Check failure on line 1143 in config/config.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 16 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=0xPolygon_cometbft&issues=AZ6IZUd9p-V3yQeSlepz&open=AZ6IZUd9p-V3yQeSlepz&pullRequest=40
if cfg.TimeoutPropose < 0 {
return errors.New("timeout_propose can't be negative")
}
Expand Down Expand Up @@ -1155,6 +1177,18 @@
if cfg.BlockTimeTolerance <= 0 {
return errors.New("block_time_tolerance must be positive")
}
if cfg.CatchupLagThreshold < 0 {
return errors.New("catchup_lag_threshold can't be negative")
}
if cfg.CatchupLagThreshold == 1 {
return errors.New("catchup_lag_threshold must be 0 (disabled) or >=2 to absorb round skew")
Comment thread
marcello33 marked this conversation as resolved.
Outdated
}
if cfg.MinExpectedPeers < 0 {
return errors.New("min_expected_peers can't be negative")
}
if cfg.CatchupDebounceDuration < 0 {
return errors.New("catchup_debounce_duration can't be negative")
}
return nil
}

Expand Down
13 changes: 13 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,19 @@ peer_query_maj23_sleep_duration = "{{ .Consensus.PeerQueryMaj23SleepDuration }}"
# Maximum allowed difference between proposed block time and wall-clock time.
block_time_tolerance = "{{ .Consensus.BlockTimeTolerance }}"

# After initial block-sync completes, report catching_up=true when a peer is more
# than this many blocks ahead of us (i.e. this node has stopped keeping up). Set to
# 0 to disable; must be >=2 when enabled to absorb normal round skew.
catchup_lag_threshold = {{ .Consensus.CatchupLagThreshold }}

# When >0, report catching_up=true while connected peers are below this count, since
# the node cannot establish that it is current. Keep at 0 for single-node deployments.
min_expected_peers = {{ .Consensus.MinExpectedPeers }}

# How long the peer-lag condition must hold before catching_up flips to true
# (flap damping). The transition back to false is immediate.
catchup_debounce_duration = "{{ .Consensus.CatchupDebounceDuration }}"

#######################################################
### Storage Configuration Options ###
#######################################################
Expand Down
82 changes: 82 additions & 0 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ type Reactor struct {
eventBus *types.EventBus
rs *cstypes.RoundState

// catchUpLagThreshold, minExpectedPeers and catchUpDebounce drive IsBehind,
// which lets /status report catching_up=true after initial sync when this node
// has fallen behind live peers. behindSince tracks how long the lag condition
// has held continuously, for debouncing.
catchUpLagThreshold int64
minExpectedPeers int
catchUpDebounce time.Duration
behindSince time.Time

Metrics *Metrics
}

Expand Down Expand Up @@ -422,6 +431,70 @@ func (conR *Reactor) WaitSync() bool {
return conR.waitSync
}

// IsBehind reports whether this node has stopped keeping up with its peers after
// the initial block-sync has completed. WaitSync only reflects startup block-sync
// and stays false for the rest of the process, so on its own catching_up never
// reflects a node that later falls behind. IsBehind closes that gap from
// peer-reported heights.
//
// It compares heights rather than block-time staleness on purpose: a stale local
// block time can't distinguish a node that is behind its peers (some peer reports a
// greater height) from a network where every node has legitimately stopped at the
// same height (no peer is ahead). Only the former is "catching up"; reporting the
// latter as catching up would be a false positive.
func (conR *Reactor) IsBehind() bool {
raw := conR.isBehindRaw()

conR.mtx.Lock()
Comment thread
pratikspatil024 marked this conversation as resolved.
defer conR.mtx.Unlock()
return conR.applyDebounceLocked(raw, time.Now())
}

// isBehindRaw gathers the local and max peer heights and applies the lag decision,
Comment thread
marcello33 marked this conversation as resolved.
Outdated
// without debouncing. It holds no lock while reading peers / round state.
func (conR *Reactor) isBehindRaw() bool {
peers := conR.Switch.Peers().List()

var maxPeerHeight int64
for _, peer := range peers {
ps, ok := peer.Get(types.PeerStateKey).(*PeerState)
if !ok {
continue
}
if h := ps.GetHeight(); h > maxPeerHeight {
maxPeerHeight = h
}
}

return conR.evaluateBehind(conR.getRoundState().Height, maxPeerHeight, len(peers))
}

// evaluateBehind is the pure lag decision. Too few peers means we can't prove we're
// current (off by default so single-node stays healthy). A zero threshold disables
// the height check. maxPeerHeight == 0 means no peer height learned yet.
func (conR *Reactor) evaluateBehind(myHeight, maxPeerHeight int64, nPeers int) bool {
if conR.minExpectedPeers > 0 && nPeers < conR.minExpectedPeers {
return true
}
if conR.catchUpLagThreshold <= 0 || maxPeerHeight == 0 {
return false
}
return maxPeerHeight-myHeight > conR.catchUpLagThreshold
}

// applyDebounceLocked requires the lag condition to hold for catchUpDebounce before
// returning true; recovery to false is immediate. conR.mtx must be held.
func (conR *Reactor) applyDebounceLocked(raw bool, now time.Time) bool {
if !raw {
conR.behindSince = time.Time{}
return false
}
if conR.behindSince.IsZero() {
conR.behindSince = now
}
return now.Sub(conR.behindSince) >= conR.catchUpDebounce
}

//--------------------------------------

// subscribeToBroadcastEvents subscribes for new round steps and votes
Expand Down Expand Up @@ -1057,6 +1130,15 @@ func ReactorMetrics(metrics *Metrics) ReactorOption {
return func(conR *Reactor) { conR.Metrics = metrics }
}

// ReactorCatchupConfig configures the IsBehind heuristic that backs catching_up.
func ReactorCatchupConfig(lagThreshold int64, minPeers int, debounce time.Duration) ReactorOption {
return func(conR *Reactor) {
conR.catchUpLagThreshold = lagThreshold
conR.minExpectedPeers = minPeers
conR.catchUpDebounce = debounce
}
}

//-----------------------------------------------------------------------------

var (
Expand Down
81 changes: 81 additions & 0 deletions consensus/reactor_catchup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package consensus

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestEvaluateBehind(t *testing.T) {
tests := []struct {
name string
lagThreshold int64
minPeers int
myHeight int64
maxPeerHeight int64
nPeers int
want bool
}{
{"peer far ahead", 5, 0, 100, 110, 3, true},
{"peer ahead within threshold", 5, 0, 100, 105, 3, false},
{"peer exactly at threshold", 5, 0, 100, 105, 3, false},
{"peer one over threshold", 5, 0, 100, 106, 3, true},
{"equal height network halt", 5, 0, 100, 100, 3, false},
{"round skew peer one ahead", 5, 0, 100, 101, 3, false},
{"no peer height learned", 5, 0, 100, 0, 3, false},
{"threshold disabled ignores lag", 0, 0, 100, 999, 3, false},
{"single node zero peers healthy", 5, 0, 100, 0, 0, false},
{"min peers guard off with zero peers", 5, 0, 100, 100, 0, false},
{"min peers guard trips below min", 5, 2, 100, 100, 1, true},
{"min peers guard satisfied", 5, 2, 100, 100, 2, false},
{"min peers guard fires even when lag disabled", 0, 2, 100, 0, 0, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conR := &Reactor{
catchUpLagThreshold: tt.lagThreshold,
minExpectedPeers: tt.minPeers,
}
got := conR.evaluateBehind(tt.myHeight, tt.maxPeerHeight, tt.nPeers)
assert.Equal(t, tt.want, got)
})
}
}

func TestApplyDebounce(t *testing.T) {
debounce := 10 * time.Second
t0 := time.Now()

t.Run("not behind resets and returns false", func(t *testing.T) {
conR := &Reactor{catchUpDebounce: debounce, behindSince: t0}
assert.False(t, conR.applyDebounceLocked(false, t0.Add(time.Hour)))
assert.True(t, conR.behindSince.IsZero())
})

t.Run("behind but within debounce stays false", func(t *testing.T) {
conR := &Reactor{catchUpDebounce: debounce}
assert.False(t, conR.applyDebounceLocked(true, t0))
assert.False(t, conR.applyDebounceLocked(true, t0.Add(5*time.Second)))
})

t.Run("behind past debounce flips true", func(t *testing.T) {
conR := &Reactor{catchUpDebounce: debounce}
assert.False(t, conR.applyDebounceLocked(true, t0))
assert.True(t, conR.applyDebounceLocked(true, t0.Add(debounce)))
})

t.Run("transient blip resets the timer", func(t *testing.T) {
conR := &Reactor{catchUpDebounce: debounce}
assert.False(t, conR.applyDebounceLocked(true, t0))
assert.False(t, conR.applyDebounceLocked(false, t0.Add(5*time.Second)))
// timer restarts; not yet past debounce from the new start.
assert.False(t, conR.applyDebounceLocked(true, t0.Add(6*time.Second)))
assert.True(t, conR.applyDebounceLocked(true, t0.Add(16*time.Second)))
})

t.Run("zero debounce flips immediately", func(t *testing.T) {
conR := &Reactor{catchUpDebounce: 0}
assert.True(t, conR.applyDebounceLocked(true, t0))
})
}
4 changes: 4 additions & 0 deletions inspect/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (waitSyncCheckerImpl) WaitSync() bool {
return false
}

func (waitSyncCheckerImpl) IsBehind() bool {
return false
Comment thread
marcello33 marked this conversation as resolved.
}

// ListenAndServe listens on the address specified in srv.Addr and handles any
// incoming requests over HTTP using the Inspector rpc handler specified on the server.
func (srv *Server) ListenAndServe(ctx context.Context) error {
Expand Down
7 changes: 6 additions & 1 deletion node/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,12 @@ func createConsensusReactor(config *cfg.Config,
if privValidator != nil {
consensusState.SetPrivValidator(privValidator)
}
consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics))
consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics),
cs.ReactorCatchupConfig(
config.Consensus.CatchupLagThreshold,
config.Consensus.MinExpectedPeers,
config.Consensus.CatchupDebounceDuration,
))
consensusReactor.SetLogger(consensusLogger)
// services which will be publishing and/or subscribing for messages (events)
// consensusReactor will set it on consensusState and blockExecutor
Expand Down
1 change: 1 addition & 0 deletions rpc/core/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type peers interface {

type consensusReactor interface {
WaitSync() bool
IsBehind() bool
}

// ----------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion rpc/core/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (env *Environment) Status(*rpctypes.Context) (*ctypes.ResultStatus, error)
EarliestAppHash: earliestAppHash,
EarliestBlockHeight: earliestBlockHeight,
EarliestBlockTime: time.Unix(0, earliestBlockTimeNano),
CatchingUp: env.ConsensusReactor.WaitSync(),
CatchingUp: env.ConsensusReactor.WaitSync() || env.ConsensusReactor.IsBehind(),
Comment thread
marcello33 marked this conversation as resolved.
},
ValidatorInfo: ctypes.ValidatorInfo{
Address: env.PubKey.Address(),
Expand Down
Loading