Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 10 additions & 32 deletions internal/consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func (conR *Reactor) OnStart() error {
go conR.peerStatsRoutine()

conR.subscribeToBroadcastEvents()
go conR.updateRoundStateRoutine()

if !conR.WaitSync() {
err := conR.conS.Start()
Expand Down Expand Up @@ -285,9 +284,10 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
case *HasProposalBlockPartMessage:
ps.ApplyHasProposalBlockPartMessage(msg)
case *VoteSetMaj23Message:
conR.rsMtx.RLock()
conR.rsMtx.Lock()
conR.rs = conR.conS.GetRoundState()
height, votes := conR.rs.Height, conR.rs.Votes
conR.rsMtx.RUnlock()
conR.rsMtx.Unlock()
if height != msg.Height {
return
}
Expand Down Expand Up @@ -354,9 +354,10 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
case *VoteMessage:
cs := conR.conS

conR.rsMtx.RLock()
conR.rsMtx.Lock()
conR.rs = conR.conS.GetRoundState()
height, valSize, lastCommitSize := conR.rs.Height, conR.rs.Validators.Size(), conR.rs.LastCommit.Size()
conR.rsMtx.RUnlock()
conR.rsMtx.Unlock()
ps.SetHasVoteFromPeer(msg.Vote, height, valSize, lastCommitSize)

cs.peerMsgQueue <- msgInfo{msg, e.Src.ID(), time.Time{}}
Expand All @@ -376,9 +377,10 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
}
switch msg := msg.(type) {
case *VoteSetBitsMessage:
conR.rsMtx.RLock()
conR.rsMtx.Lock()
conR.rs = conR.conS.GetRoundState()
height, votes := conR.rs.Height, conR.rs.Votes
conR.rsMtx.RUnlock()
conR.rsMtx.Unlock()

if height == msg.Height {
var ourVotes *bits.BitArray
Expand Down Expand Up @@ -426,7 +428,6 @@ func (conR *Reactor) subscribeToBroadcastEvents() {
if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep,
func(data cmtevents.EventData) {
conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState))
conR.updateRoundStateNoCsLock()
}); err != nil {
conR.Logger.Error("Error adding listener for events (NewRoundStep)", "err", err)
}
Expand All @@ -441,15 +442,13 @@ func (conR *Reactor) subscribeToBroadcastEvents() {
if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote,
func(data cmtevents.EventData) {
conR.broadcastHasVoteMessage(data.(*types.Vote))
conR.updateRoundStateNoCsLock()
}); err != nil {
conR.Logger.Error("Error adding listener for events (Vote)", "err", err)
}

if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventProposalBlockPart,
func(data cmtevents.EventData) {
conR.broadcastHasProposalBlockPartMessage(data.(*BlockPartMessage))
conR.updateRoundStateNoCsLock()
}); err != nil {
conR.Logger.Error("Error adding listener for events (ProposalBlockPart)", "err", err)
}
Expand Down Expand Up @@ -561,31 +560,10 @@ func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
})
}

func (conR *Reactor) updateRoundStateRoutine() {
t := time.NewTicker(100 * time.Microsecond)
defer t.Stop()
for range t.C {
if !conR.IsRunning() {
return
}
rs := conR.conS.GetRoundState()
conR.rsMtx.Lock()
conR.rs = rs
conR.rsMtx.Unlock()
}
}
Comment on lines -564 to -576
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loops does very frequent and short lived allocations.
Can it be replaced with just a get over round state wherever is necessary?
This is what this PR tries and do


func (conR *Reactor) updateRoundStateNoCsLock() {
rs := conR.conS.getRoundState()
conR.rsMtx.Lock()
conR.rs = rs
conR.initialHeight = conR.conS.state.InitialHeight
conR.rsMtx.Unlock()
}

func (conR *Reactor) getRoundState() *cstypes.RoundState {
conR.rsMtx.Lock()
defer conR.rsMtx.Unlock()
conR.rs = conR.conS.GetRoundState()
return conR.rs
}

Expand Down
Loading