diff --git a/config/config.go b/config/config.go index 6b5b4faa963..27184a3c439 100644 --- a/config/config.go +++ b/config/config.go @@ -1030,6 +1030,26 @@ type ConsensusConfig struct { // BlockTimeTolerance is the maximum allowed difference between the proposed block time and wall-clock time. BlockTimeTolerance time.Duration `mapstructure:"block_time_tolerance"` + + // CatchupLagThreshold is the lead, in blocks, that a peer must exceed to count as + // reporting us behind; catching_up=true requires a majority of connected peers to + // be that far ahead. The reactor's WaitSync latch only reflects the initial + // block-sync at startup and stays false for the rest of the process, so without + // this check a node that later stops keeping up with its peers still reports + // catching_up=false. The comparison is against peer-reported heights rather than + // block-time staleness, so a network where every node has legitimately stopped at + // the same height is not misreported as catching up; peer heights are unverified + // gossip, so the lag must be corroborated by a majority of at least two connected + // peers, which keeps a single peer from driving the signal. + // 0 disables peer-height lag detection only; must be >=2 when enabled to absorb + // the normal one-height round skew between synced peers. The separate zero-peer + // rule (a node with no peers that is not the sole validator reports catching_up) + // always applies, independent of this threshold. + CatchupLagThreshold int64 `mapstructure:"catchup_lag_threshold"` + // CatchupDebounceDuration is how long the behind condition (peer-height lag or the + // zero-peer rule) must keep being observed before catching_up flips to true, damping + // flapping at the threshold boundary. The transition back to false is immediate. 
+ CatchupDebounceDuration time.Duration `mapstructure:"catchup_debounce_duration"` } // DefaultConsensusConfig returns a default configuration for the consensus service @@ -1050,6 +1070,8 @@ func DefaultConsensusConfig() *ConsensusConfig { PeerQueryMaj23SleepDuration: 2000 * time.Millisecond, DoubleSignCheckHeight: int64(0), BlockTimeTolerance: 60 * time.Second, + CatchupLagThreshold: 5, + CatchupDebounceDuration: 10 * time.Second, } } @@ -1155,6 +1177,15 @@ func (cfg *ConsensusConfig) ValidateBasic() error { if cfg.BlockTimeTolerance <= 0 { return errors.New("block_time_tolerance must be positive") } + if cfg.CatchupLagThreshold < 0 { + return errors.New("catchup_lag_threshold can't be negative") + } + if cfg.CatchupLagThreshold == 1 { + return errors.New("catchup_lag_threshold must be 0 (disabled) or >=2 (margin beyond the one-height round skew)") + } + if cfg.CatchupDebounceDuration < 0 { + return errors.New("catchup_debounce_duration can't be negative") + } return nil } diff --git a/config/config_test.go b/config/config_test.go index 442a7d1898a..57d5e146cde 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -179,6 +179,12 @@ func TestConsensusConfig_ValidateBasic(t *testing.T) { "BlockTimeTolerance": {func(c *config.ConsensusConfig) { c.BlockTimeTolerance = time.Second }, false}, "BlockTimeTolerance zero": {func(c *config.ConsensusConfig) { c.BlockTimeTolerance = 0 }, true}, "BlockTimeTolerance negative": {func(c *config.ConsensusConfig) { c.BlockTimeTolerance = -1 }, true}, + "CatchupLagThreshold disabled": {func(c *config.ConsensusConfig) { c.CatchupLagThreshold = 0 }, false}, + "CatchupLagThreshold one": {func(c *config.ConsensusConfig) { c.CatchupLagThreshold = 1 }, true}, + "CatchupLagThreshold two": {func(c *config.ConsensusConfig) { c.CatchupLagThreshold = 2 }, false}, + "CatchupLagThreshold negative": {func(c *config.ConsensusConfig) { c.CatchupLagThreshold = -1 }, true}, + "CatchupDebounceDuration zero": {func(c *config.ConsensusConfig) 
{ c.CatchupDebounceDuration = 0 }, false}, + "CatchupDebounceDuration negative": {func(c *config.ConsensusConfig) { c.CatchupDebounceDuration = -1 }, true}, } for desc, tc := range testcases { t.Run(desc, func(t *testing.T) { @@ -195,6 +201,12 @@ func TestConsensusConfig_ValidateBasic(t *testing.T) { } } +func TestDefaultConsensusConfigCatchupDefaults(t *testing.T) { + cfg := config.DefaultConsensusConfig() + assert.Equal(t, int64(5), cfg.CatchupLagThreshold) + assert.Equal(t, 10*time.Second, cfg.CatchupDebounceDuration) +} + func TestInstrumentationConfigValidateBasic(t *testing.T) { cfg := config.TestInstrumentationConfig() assert.NoError(t, cfg.ValidateBasic()) diff --git a/config/toml.go b/config/toml.go index 6bb6fb92a77..7641b3b9eb4 100644 --- a/config/toml.go +++ b/config/toml.go @@ -519,6 +519,17 @@ peer_query_maj23_sleep_duration = "{{ .Consensus.PeerQueryMaj23SleepDuration }}" # Maximum allowed difference between proposed block time and wall-clock time. block_time_tolerance = "{{ .Consensus.BlockTimeTolerance }}" +# After initial block-sync completes, report catching_up=true when a majority of the +# connected peers (at least two) are more than this many blocks ahead of us (i.e. this +# node has stopped keeping up). Set to 0 to disable peer-height lag detection only; +# must be >=2 when enabled to absorb normal round skew. A node with no peers that is +# not the sole validator still reports catching_up regardless of this setting. +catchup_lag_threshold = {{ .Consensus.CatchupLagThreshold }} + +# How long the behind condition (peer-height lag or the no-peer rule above) must hold +# before catching_up flips to true (flap damping). The transition back to false is immediate. 
+catchup_debounce_duration = "{{ .Consensus.CatchupDebounceDuration }}" + ####################################################### ### Storage Configuration Options ### ####################################################### diff --git a/consensus/reactor.go b/consensus/reactor.go index ac4c9bc95f9..aad05fc5f8f 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -46,6 +46,14 @@ type Reactor struct { eventBus *types.EventBus rs *cstypes.RoundState + // catchUpLagThreshold and catchUpDebounce drive IsBehind, which lets /status + // report catching_up=true after initial sync when this node has fallen behind + // live peers. behindSince is when the behind condition was first observed in the + // current unbroken run (zero while it does not hold), for debouncing. + catchUpLagThreshold int64 + catchUpDebounce time.Duration + behindSince time.Time + Metrics *Metrics } @@ -422,6 +430,97 @@ func (conR *Reactor) WaitSync() bool { return conR.waitSync } +// IsBehind reports whether this node has stopped keeping up with its peers after +// the initial block-sync has completed. WaitSync only reflects startup block-sync +// and stays false for the rest of the process, so on its own catching_up never +// reflects a node that later falls behind. IsBehind closes that gap from +// peer-reported heights, debounced by applyDebounceLocked. The debounce also covers +// the zero-peer rule and only advances when IsBehind is called, so with a positive +// catchUpDebounce the first call that observes the condition still returns false. +// +// Heights are compared rather than block-time staleness on purpose: a stale block +// time can't tell a node behind its peers (some peer is ahead) from a network that +// has legitimately stopped at one height (no peer is ahead); only the former is "catching up". +func (conR *Reactor) IsBehind() bool { + raw := conR.isBehindRaw() + + conR.mtx.Lock() + defer conR.mtx.Unlock() + return conR.applyDebounceLocked(raw, time.Now()) +} + +// isBehindRaw gathers the local height and peer heights and applies the lag +// decision, without debouncing. It holds no lock while reading peers / round state. 
+func (conR *Reactor) isBehindRaw() bool { + peerHeights := collectPeerHeights(conR.Switch.Peers().List()) + + // Sole-validator status is read live from consensus state on every call, so it + // reflects validator-set changes this node has committed into local round state + // without any cached value to update. (A node partitioned before a set change + // can't observe the other side's update, but that only makes it report behind, + // which is correct.) + return conR.evaluateBehind(conR.getRoundState().Height, peerHeights, conR.conS.isLocalSoleValidator()) +} + +// collectPeerHeights returns the gossiped consensus height of every peer that +// carries a consensus PeerState. Peers without one (key absent or wrong type) are +// skipped rather than counted as height 0, so they don't dilute the majority in +// evaluateBehind; a peer whose PeerState still reports height 0 is kept and does. +func collectPeerHeights(peers []p2p.Peer) []int64 { + heights := make([]int64, 0, len(peers)) + for _, peer := range peers { + ps, ok := peer.Get(types.PeerStateKey).(*PeerState) + if !ok { + continue + } + heights = append(heights, ps.GetHeight()) + } + return heights +} + +// minCorroboratingPeers is the fewest connected peers required before peer-height +// lag is trusted. Peer heights come from unverified gossip (NewRoundStepMessage), and +// with a single peer that peer is a trivial "majority"; requiring at least two means +// no single peer can drive catching_up on its own. Below this we can't corroborate, so +// the height-lag check abstains (the zero-peer rule still covers the no-peer case). +const minCorroboratingPeers = 2 + +// evaluateBehind is the pure lag decision. With no peers we can't observe progress, +// so we report behind unless this node can finalize on its own (it's the sole +// validator); a non-validator or a validator in a larger set genuinely needs peers. 
+// Otherwise we report behind only when a strict majority of at least minCorroboratingPeers +// connected peers report a height more than catchUpLagThreshold ahead of ours, so an +// inflated height from a single peer (or a minority) can't drive the signal. A zero +// threshold disables the height-lag check; the zero-peer rule still applies. +func (conR *Reactor) evaluateBehind(myHeight int64, peerHeights []int64, isSoleValidator bool) bool { + if len(peerHeights) == 0 { + return !isSoleValidator + } + if conR.catchUpLagThreshold <= 0 || len(peerHeights) < minCorroboratingPeers { + return false + } + ahead := 0 + for _, h := range peerHeights { + if h-myHeight > conR.catchUpLagThreshold { + ahead++ + } + } + return 2*ahead > len(peerHeights) +} + +// applyDebounceLocked returns true once raw has been true on every call for at least +// catchUpDebounce; a false raw clears behindSince immediately. conR.mtx must be held. +func (conR *Reactor) applyDebounceLocked(raw bool, now time.Time) bool { + if !raw { + conR.behindSince = time.Time{} + return false + } + if conR.behindSince.IsZero() { + conR.behindSince = now + } + return now.Sub(conR.behindSince) >= conR.catchUpDebounce +} + //-------------------------------------- // subscribeToBroadcastEvents subscribes for new round steps and votes @@ -1057,6 +1156,14 @@ func ReactorMetrics(metrics *Metrics) ReactorOption { return func(conR *Reactor) { conR.Metrics = metrics } } +// ReactorCatchupConfig sets the lag threshold and debounce used by IsBehind, which backs catching_up. 
+func ReactorCatchupConfig(lagThreshold int64, debounce time.Duration) ReactorOption { + return func(conR *Reactor) { + conR.catchUpLagThreshold = lagThreshold + conR.catchUpDebounce = debounce + } +} + //----------------------------------------------------------------------------- var ( diff --git a/consensus/reactor_catchup_test.go b/consensus/reactor_catchup_test.go new file mode 100644 index 00000000000..4b7764c9d5a --- /dev/null +++ b/consensus/reactor_catchup_test.go @@ -0,0 +1,205 @@ +package consensus + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + cfg "github.com/cometbft/cometbft/config" + cstypes "github.com/cometbft/cometbft/consensus/types" + "github.com/cometbft/cometbft/crypto" + "github.com/cometbft/cometbft/p2p" + p2pmock "github.com/cometbft/cometbft/p2p/mock" + "github.com/cometbft/cometbft/types" +) + +func TestEvaluateBehind(t *testing.T) { + tests := []struct { + name string + lagThreshold int64 + myHeight int64 + peerHeights []int64 + isSoleValidator bool + want bool + }{ + {"majority far ahead", 5, 100, []int64{110, 110, 110}, false, true}, + {"single peer ahead is minority", 5, 100, []int64{110, 100, 100}, false, false}, + {"majority of three ahead", 5, 100, []int64{110, 110, 100}, false, true}, + {"two peers split is not majority", 5, 100, []int64{110, 100}, false, false}, + {"both peers ahead", 5, 100, []int64{110, 110}, false, true}, + {"lone peer cannot corroborate", 5, 100, []int64{110}, false, false}, + {"lone peer far ahead still cannot corroborate", 5, 100, []int64{100000}, false, false}, + {"peers within threshold", 5, 100, []int64{105, 105, 105}, false, false}, + {"majority one over threshold", 5, 100, []int64{106, 106, 106}, false, true}, + {"equal height network halt", 5, 100, []int64{100, 100, 100}, false, false}, + {"round skew one ahead", 5, 100, []int64{101, 101, 101}, false, false}, + {"no peer height learned", 5, 100, []int64{0, 0, 0}, false, false}, + 
{"threshold disabled ignores lag", 0, 100, []int64{999, 999, 999}, false, false}, + {"sole validator zero peers healthy", 5, 100, nil, true, false}, + {"non-sole zero peers behind", 5, 100, nil, false, true}, + {"zero peers behind even when lag disabled", 0, 100, nil, false, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conR := &Reactor{ + catchUpLagThreshold: tt.lagThreshold, + } + got := conR.evaluateBehind(tt.myHeight, tt.peerHeights, tt.isSoleValidator) + assert.Equal(t, tt.want, got) + }) + } +} + +// peerWithHeight returns a mock peer whose consensus PeerState has PRS.Height set directly to h (no gossip involved). +func peerWithHeight(h int64) *p2pmock.Peer { + p := p2pmock.NewPeer(nil) + ps := NewPeerState(p) + ps.PRS.Height = h + p.Set(types.PeerStateKey, ps) + return p +} + +func TestCollectPeerHeights(t *testing.T) { + t.Run("no peers yields empty slice", func(t *testing.T) { + assert.Empty(t, collectPeerHeights(nil)) + }) + + t.Run("collects every peer height in order", func(t *testing.T) { + peers := []p2p.Peer{peerWithHeight(100), peerWithHeight(110), peerWithHeight(90)} + assert.Equal(t, []int64{100, 110, 90}, collectPeerHeights(peers)) + }) + + t.Run("skips peer without a PeerState", func(t *testing.T) { + peers := []p2p.Peer{peerWithHeight(100), p2pmock.NewPeer(nil), peerWithHeight(110)} + assert.Equal(t, []int64{100, 110}, collectPeerHeights(peers)) + }) + + t.Run("skips peer with a non-PeerState value at the key", func(t *testing.T) { + bad := p2pmock.NewPeer(nil) + bad.Set(types.PeerStateKey, "not a peer state") + peers := []p2p.Peer{peerWithHeight(100), bad} + assert.Equal(t, []int64{100}, collectPeerHeights(peers)) + }) +} + +func TestIsLocalSoleValidator(t *testing.T) { + val, _ := types.RandValidator(false, 10) + other, _ := types.RandValidator(false, 10) + + tests := []struct { + name string + pubKey crypto.PubKey + valSet *types.ValidatorSet + want bool + }{ + {"sole validator and we are it", val.PubKey, 
types.NewValidatorSet([]*types.Validator{val}), true}, + {"sole validator but it is not us", other.PubKey, types.NewValidatorSet([]*types.Validator{val}), false}, + {"we are in a larger set", val.PubKey, types.NewValidatorSet([]*types.Validator{val, other}), false}, + {"no local validator key", nil, types.NewValidatorSet([]*types.Validator{val}), false}, + {"no validator set", val.PubKey, nil, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cs := &State{} + cs.privValidatorPubKey = tt.pubKey + cs.Validators = tt.valSet + assert.Equal(t, tt.want, cs.isLocalSoleValidator()) + }) + } +} + +func TestReactorCatchupConfig(t *testing.T) { + conR := &Reactor{} + ReactorCatchupConfig(7, 3*time.Second)(conR) + assert.Equal(t, int64(7), conR.catchUpLagThreshold) + assert.Equal(t, 3*time.Second, conR.catchUpDebounce) +} + +// newCatchupReactor wires a Reactor with a non-started Switch carrying mock peers +// at the given heights, a round state at myHeight, and (when sole is true) a +// single-validator set that this node belongs to. The lag threshold is fixed at 5 and +// catchUpDebounce is 0, so IsBehind reflects the raw decision with no timing involved. 
+func newCatchupReactor(t *testing.T, myHeight int64, sole bool, peerHeights ...int64) *Reactor { + t.Helper() + cs := &State{} + if sole { + val, _ := types.RandValidator(false, 10) + cs.Validators = types.NewValidatorSet([]*types.Validator{val}) + cs.privValidatorPubKey = val.PubKey + } + conR := &Reactor{ + conS: cs, + rs: &cstypes.RoundState{Height: myHeight}, + catchUpLagThreshold: 5, + catchUpDebounce: 0, + } + conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR) + + sw := p2p.NewSwitch(cfg.DefaultP2PConfig(), nil) + peerSet := sw.Peers().(*p2p.PeerSet) + for _, h := range peerHeights { + require.NoError(t, peerSet.Add(peerWithHeight(h))) + } + conR.SetSwitch(sw) + return conR +} + +// TestIsBehindIntegration exercises the full IsBehind path (Switch peers -> +// collectPeerHeights -> evaluateBehind, plus the sole-validator check and a zero-debounce +// pass through applyDebounceLocked) rather than the pure decision helpers in isolation. +func TestIsBehindIntegration(t *testing.T) { + t.Run("majority of peers ahead reports behind", func(t *testing.T) { + assert.True(t, newCatchupReactor(t, 100, false, 110, 110, 110).IsBehind()) + }) + t.Run("synced multi-peer network is not behind", func(t *testing.T) { + assert.False(t, newCatchupReactor(t, 100, false, 100, 100, 100).IsBehind()) + }) + t.Run("lone peer ahead cannot corroborate", func(t *testing.T) { + assert.False(t, newCatchupReactor(t, 100, false, 110).IsBehind()) + }) + t.Run("no peers and not sole validator is behind", func(t *testing.T) { + assert.True(t, newCatchupReactor(t, 100, false).IsBehind()) + }) + t.Run("no peers but sole validator is not behind", func(t *testing.T) { + assert.False(t, newCatchupReactor(t, 100, true).IsBehind()) + }) +} + +func TestApplyDebounce(t *testing.T) { + debounce := 10 * time.Second + t0 := time.Now() + + t.Run("not behind resets and returns false", func(t *testing.T) { + conR := &Reactor{catchUpDebounce: debounce, behindSince: t0} + assert.False(t, conR.applyDebounceLocked(false, t0.Add(time.Hour))) + 
assert.True(t, conR.behindSince.IsZero()) + }) + + t.Run("behind but within debounce stays false", func(t *testing.T) { + conR := &Reactor{catchUpDebounce: debounce} + assert.False(t, conR.applyDebounceLocked(true, t0)) + assert.False(t, conR.applyDebounceLocked(true, t0.Add(5*time.Second))) + }) + + t.Run("behind past debounce flips true", func(t *testing.T) { + conR := &Reactor{catchUpDebounce: debounce} + assert.False(t, conR.applyDebounceLocked(true, t0)) + assert.True(t, conR.applyDebounceLocked(true, t0.Add(debounce))) + }) + + t.Run("transient blip resets the timer", func(t *testing.T) { + conR := &Reactor{catchUpDebounce: debounce} + assert.False(t, conR.applyDebounceLocked(true, t0)) + assert.False(t, conR.applyDebounceLocked(false, t0.Add(5*time.Second))) + // timer restarts; not yet past debounce from the new start. + assert.False(t, conR.applyDebounceLocked(true, t0.Add(6*time.Second))) + assert.True(t, conR.applyDebounceLocked(true, t0.Add(16*time.Second))) + }) + + t.Run("zero debounce flips immediately", func(t *testing.T) { + conR := &Reactor{catchUpDebounce: 0} + assert.True(t, conR.applyDebounceLocked(true, t0)) + }) +} diff --git a/consensus/state.go b/consensus/state.go index daadbd17e5a..d530ec88bd3 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -2727,6 +2727,20 @@ func (cs *State) updatePrivValidatorPubKey() error { return nil } +// isLocalSoleValidator reports whether this node is the only validator in the +// current set, i.e. it can finalize blocks on its own and so does not need peers +// to make progress. Takes cs.mtx for reading; a nil local pubkey or validator set +// yields false. Uses the same local-validator membership test as the signing path +// (Validators.HasAddress on the private validator's address), so a non-validator full node always returns false. 
+func (cs *State) isLocalSoleValidator() bool { + cs.mtx.RLock() + defer cs.mtx.RUnlock() + if cs.privValidatorPubKey == nil || cs.Validators == nil { + return false + } + return cs.Validators.Size() == 1 && cs.Validators.HasAddress(cs.privValidatorPubKey.Address()) +} + // look back to check existence of the node's consensus votes before joining consensus func (cs *State) checkDoubleSigningRisk(height int64) error { if cs.privValidator != nil && cs.privValidatorPubKey != nil && cs.config.DoubleSignCheckHeight > 0 && height > 0 { diff --git a/inspect/rpc/rpc.go b/inspect/rpc/rpc.go index 4367ade2d59..85bfa57cbae 100644 --- a/inspect/rpc/rpc.go +++ b/inspect/rpc/rpc.go @@ -86,6 +86,10 @@ func (waitSyncCheckerImpl) WaitSync() bool { return false } +func (waitSyncCheckerImpl) IsBehind() bool { + return false +} + // ListenAndServe listens on the address specified in srv.Addr and handles any // incoming requests over HTTP using the Inspector rpc handler specified on the server. func (srv *Server) ListenAndServe(ctx context.Context) error { diff --git a/inspect/rpc/rpc_test.go b/inspect/rpc/rpc_test.go new file mode 100644 index 00000000000..5929392313f --- /dev/null +++ b/inspect/rpc/rpc_test.go @@ -0,0 +1,15 @@ +package rpc + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +// TestWaitSyncCheckerReportsSynced pins that the offline inspect server, which has no live consensus +// reactor, always reports fully synced: never waiting on block-sync and never behind its peers. 
+func TestWaitSyncCheckerReportsSynced(t *testing.T) { + checker := waitSyncCheckerImpl{} + assert.False(t, checker.WaitSync()) + assert.False(t, checker.IsBehind()) +} diff --git a/node/setup.go b/node/setup.go index df5e316254c..558f20ee7b2 100644 --- a/node/setup.go +++ b/node/setup.go @@ -331,7 +331,11 @@ func createConsensusReactor(config *cfg.Config, if privValidator != nil { consensusState.SetPrivValidator(privValidator) } - consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics)) + consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics), + cs.ReactorCatchupConfig( + config.Consensus.CatchupLagThreshold, + config.Consensus.CatchupDebounceDuration, + )) consensusReactor.SetLogger(consensusLogger) // services which will be publishing and/or subscribing for messages (events) // consensusReactor will set it on consensusState and blockExecutor diff --git a/rpc/core/env.go b/rpc/core/env.go index 9dc27b7c131..cbad670ee55 100644 --- a/rpc/core/env.go +++ b/rpc/core/env.go @@ -59,6 +59,7 @@ type peers interface { type consensusReactor interface { WaitSync() bool + IsBehind() bool } // ---------------------------------------------- diff --git a/rpc/core/status.go b/rpc/core/status.go index 5e3d6d1892e..78ffa7e6c7f 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -62,7 +62,7 @@ func (env *Environment) Status(*rpctypes.Context) (*ctypes.ResultStatus, error) EarliestAppHash: earliestAppHash, EarliestBlockHeight: earliestBlockHeight, EarliestBlockTime: time.Unix(0, earliestBlockTimeNano), - CatchingUp: env.ConsensusReactor.WaitSync() || env.ConsensusReactor.IsBehind(), }, ValidatorInfo: ctypes.ValidatorInfo{ Address: env.PubKey.Address(),