Skip to content

Commit e4ce1ac

Browse files
authored
Moved several consensus fields to state struct (#4825)
* Moved consensus fields to state struct. * Proper comment.
1 parent 350573a commit e4ce1ac

22 files changed

+323
-176
lines changed

consensus/checks_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package consensus
2+
3+
import (
4+
"testing"
5+
6+
msg_pb "github.com/harmony-one/harmony/api/proto/message"
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
// verifyMessageSig modifies the message signature when returns error
11+
func TestVerifyMessageSig(t *testing.T) {
12+
message := &msg_pb.Message{
13+
Signature: []byte("signature"),
14+
}
15+
16+
err := verifyMessageSig(nil, message)
17+
require.Error(t, err)
18+
require.Empty(t, message.Signature)
19+
}

consensus/consensus.go

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"sync"
66
"sync/atomic"
77
"time"
8-
"unsafe"
98

109
"github.com/harmony-one/abool"
1110
bls_core "github.com/harmony-one/bls/ffi/go/bls"
@@ -65,8 +64,6 @@ type Consensus struct {
6564
decider quorum.Decider
6665
// FBFTLog stores the pbft messages and blocks during FBFT process
6766
fBFTLog *FBFTLog
68-
// phase: different phase of FBFT protocol: pre-prepare, prepare, commit, finish etc
69-
phase FBFTPhase
7067
// current indicates what state a node is in
7168
current State
7269
// isBackup declarative the node is in backup mode
@@ -89,15 +86,7 @@ type Consensus struct {
8986
MinPeers int
9087
// private/public keys of current node
9188
priKey multibls.PrivateKeys
92-
// the publickey of leader
93-
leaderPubKey unsafe.Pointer //*bls.PublicKeyWrapper
94-
// blockNum: the next blockNumber that FBFT is going to agree on,
95-
// should be equal to the blockNumber of next block
96-
blockNum uint64
97-
// Blockhash - 32 byte
98-
blockHash [32]byte
99-
// Block to run consensus on
100-
block []byte
89+
10190
// Shard Id which this node belongs to
10291
ShardID uint32
10392
// IgnoreViewIDCheck determines whether to ignore viewID check
@@ -241,21 +230,19 @@ func (consensus *Consensus) getPublicKeys() multibls.PublicKeys {
241230
}
242231

243232
func (consensus *Consensus) GetLeaderPubKey() *bls_cosi.PublicKeyWrapper {
244-
consensus.mutex.RLock()
245-
defer consensus.mutex.RUnlock()
246233
return consensus.getLeaderPubKey()
247234
}
248235

249236
func (consensus *Consensus) getLeaderPubKey() *bls_cosi.PublicKeyWrapper {
250-
return (*bls_cosi.PublicKeyWrapper)(atomic.LoadPointer(&consensus.leaderPubKey))
237+
return consensus.current.getLeaderPubKey()
251238
}
252239

253240
func (consensus *Consensus) SetLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) {
254241
consensus.setLeaderPubKey(pub)
255242
}
256243

257244
func (consensus *Consensus) setLeaderPubKey(pub *bls_cosi.PublicKeyWrapper) {
258-
atomic.StorePointer(&consensus.leaderPubKey, unsafe.Pointer(pub))
245+
consensus.current.setLeaderPubKey(pub)
259246
}
260247

261248
func (consensus *Consensus) GetPrivateKeys() multibls.PrivateKeys {
@@ -284,11 +271,11 @@ func (consensus *Consensus) IsBackup() bool {
284271
}
285272

286273
func (consensus *Consensus) BlockNum() uint64 {
287-
return atomic.LoadUint64(&consensus.blockNum)
274+
return consensus.getBlockNum()
288275
}
289276

290277
func (consensus *Consensus) getBlockNum() uint64 {
291-
return atomic.LoadUint64(&consensus.blockNum)
278+
return atomic.LoadUint64(&consensus.current.blockNum)
292279
}
293280

294281
// New create a new Consensus record
@@ -301,8 +288,7 @@ func New(
301288
mutex: &sync.RWMutex{},
302289
ShardID: shard,
303290
fBFTLog: NewFBFTLog(),
304-
phase: FBFTAnnounce,
305-
current: NewState(Normal),
291+
current: NewState(Normal, shard),
306292
decider: Decider,
307293
registry: registry,
308294
MinPeers: minPeers,

consensus/consensus_service.go

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package consensus
22

33
import (
44
"math/big"
5-
"sync/atomic"
65
"time"
76

87
"github.com/ethereum/go-ethereum/common"
@@ -65,7 +64,7 @@ var (
6564
// Signs the consensus message and returns the marshaled message.
6665
func (consensus *Consensus) signAndMarshalConsensusMessage(message *msg_pb.Message,
6766
priKey *bls_core.SecretKey) ([]byte, error) {
68-
if err := consensus.signConsensusMessage(message, priKey); err != nil {
67+
if err := signConsensusMessage(message, priKey); err != nil {
6968
return empty, err
7069
}
7170
marshaledMessage, err := protobuf.Marshal(message)
@@ -113,30 +112,30 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi
113112
}
114113

115114
// Sign on the hash of the message
116-
func (consensus *Consensus) signMessage(message []byte, priKey *bls_core.SecretKey) []byte {
115+
func signMessage(message []byte, priKey *bls_core.SecretKey) []byte {
117116
hash := hash.Keccak256(message)
118117
signature := priKey.SignHash(hash[:])
119118
return signature.Serialize()
120119
}
121120

122121
// Sign on the consensus message signature field.
123-
func (consensus *Consensus) signConsensusMessage(message *msg_pb.Message,
122+
func signConsensusMessage(message *msg_pb.Message,
124123
priKey *bls_core.SecretKey) error {
125124
message.Signature = nil
126125
marshaledMessage, err := protobuf.Marshal(message)
127126
if err != nil {
128127
return err
129128
}
130129
// 64 byte of signature on previous data
131-
signature := consensus.signMessage(marshaledMessage, priKey)
130+
signature := signMessage(marshaledMessage, priKey)
132131
message.Signature = signature
133132
return nil
134133
}
135134

136135
// UpdateBitmaps update the bitmaps for prepare and commit phase
137136
func (consensus *Consensus) updateBitmaps() {
138137
consensus.getLogger().Debug().
139-
Str("MessageType", consensus.phase.String()).
138+
Str("MessageType", consensus.current.phase.String()).
140139
Msg("[UpdateBitmaps] Updating consensus bitmaps")
141140
members := consensus.decider.Participants()
142141
prepareBitmap := bls_cosi.NewMask(members)
@@ -199,8 +198,8 @@ func (consensus *Consensus) sendLastSignPower() {
199198
func (consensus *Consensus) resetState() {
200199
consensus.switchPhase("ResetState", FBFTAnnounce)
201200

202-
consensus.blockHash = [32]byte{}
203-
consensus.block = []byte{}
201+
consensus.current.blockHash = [32]byte{}
202+
consensus.current.block = []byte{}
204203
consensus.decider.ResetPrepareAndCommitVotes()
205204
if consensus.prepareBitmap != nil {
206205
consensus.prepareBitmap.Clear()
@@ -295,23 +294,23 @@ func (consensus *Consensus) checkViewID(msg *FBFTMessage) error {
295294

296295
// SetBlockNum sets the blockNum in consensus object, called at node bootstrap
297296
func (consensus *Consensus) SetBlockNum(blockNum uint64) {
298-
atomic.StoreUint64(&consensus.blockNum, blockNum)
297+
consensus.setBlockNum(blockNum)
299298
}
300299

301300
// SetBlockNum sets the blockNum in consensus object, called at node bootstrap
302301
func (consensus *Consensus) setBlockNum(blockNum uint64) {
303-
atomic.StoreUint64(&consensus.blockNum, blockNum)
302+
consensus.current.setBlockNum(blockNum)
304303
}
305304

306305
// ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading
307306
func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offset int) (*bls_core.Sign, *bls_cosi.Mask, error) {
308307
consensus.mutex.RLock()
309308
members := consensus.decider.Participants()
310309
consensus.mutex.RUnlock()
311-
return consensus.readSignatureBitmapPayload(recvPayload, offset, members)
310+
return readSignatureBitmapPayload(recvPayload, offset, members)
312311
}
313312

314-
func (consensus *Consensus) readSignatureBitmapPayload(recvPayload []byte, offset int, members multibls.PublicKeys) (*bls_core.Sign, *bls_cosi.Mask, error) {
313+
func readSignatureBitmapPayload(recvPayload []byte, offset int, members multibls.PublicKeys) (*bls_core.Sign, *bls_cosi.Mask, error) {
315314
if offset+bls.BLSSignatureSizeInBytes > len(recvPayload) {
316315
return nil, nil, errors.New("payload not have enough length")
317316
}
@@ -596,12 +595,7 @@ func (consensus *Consensus) GetFinality() int64 {
596595

597596
// switchPhase will switch FBFTPhase to desired phase.
598597
func (consensus *Consensus) switchPhase(subject string, desired FBFTPhase) {
599-
consensus.getLogger().Info().
600-
Str("from:", consensus.phase.String()).
601-
Str("to:", desired.String()).
602-
Str("switchPhase:", subject)
603-
604-
consensus.phase = desired
598+
consensus.current.switchPhase(subject, desired)
605599
}
606600

607601
var (
@@ -623,15 +617,15 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
623617
return errGetPreparedBlock
624618
}
625619

626-
aggSig, mask, err := consensus.readSignatureBitmapPayload(payload, 32, consensus.decider.Participants())
620+
aggSig, mask, err := readSignatureBitmapPayload(payload, 32, consensus.decider.Participants())
627621
if err != nil {
628622
return errReadBitmapPayload
629623
}
630624

631625
// Have to keep the block hash so the leader can finish the commit phase of prepared block
632626
consensus.resetState()
633627

634-
copy(consensus.blockHash[:], blockHash[:])
628+
copy(consensus.current.blockHash[:], blockHash[:])
635629
consensus.switchPhase("selfCommit", FBFTCommit)
636630
consensus.aggregatedPrepareSig = aggSig
637631
consensus.prepareBitmap = mask
@@ -651,7 +645,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
651645
quorum.Commit,
652646
[]*bls_cosi.PublicKeyWrapper{key.Pub},
653647
key.Pri.SignHash(commitPayload),
654-
common.BytesToHash(consensus.blockHash[:]),
648+
common.BytesToHash(consensus.current.blockHash[:]),
655649
block.NumberU64(),
656650
block.Header().ViewID().Uint64(),
657651
); err != nil {
@@ -697,9 +691,9 @@ func (consensus *Consensus) GetLogger() *zerolog.Logger {
697691
func (consensus *Consensus) getLogger() *zerolog.Logger {
698692
logger := utils.Logger().With().
699693
Uint32("shardID", consensus.ShardID).
700-
Uint64("myBlock", consensus.blockNum).
701-
Uint64("myViewID", consensus.getCurBlockViewID()).
702-
Str("phase", consensus.phase.String()).
694+
Uint64("myBlock", consensus.current.getBlockNum()).
695+
Uint64("myViewID", consensus.current.getCurBlockViewID()).
696+
Str("phase", consensus.current.phase.String()).
703697
Str("mode", consensus.current.Mode().String()).
704698
Logger()
705699
return &logger

consensus/consensus_service_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestSignAndMarshalConsensusMessage(t *testing.T) {
3434
t.Fatalf("Cannot craeate consensus: %v", err)
3535
}
3636
consensus.SetCurBlockViewID(2)
37-
consensus.blockHash = [32]byte{}
37+
consensus.current.blockHash = [32]byte{}
3838

3939
msg := &msg_pb.Message{}
4040
marshaledMessage, err := consensus.signAndMarshalConsensusMessage(msg, blsPriKey)

consensus/consensus_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestConsensusInitialization(t *testing.T) {
2323
assert.NoError(t, err)
2424

2525
messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec}
26-
state := NewState(Normal)
26+
state := NewState(Normal, consensus.ShardID)
2727

2828
timeouts := createTimeout()
2929
expectedTimeouts := make(map[TimeoutType]time.Duration)
@@ -37,7 +37,7 @@ func TestConsensusInitialization(t *testing.T) {
3737
// FBFTLog
3838
assert.NotNil(t, consensus.FBFTLog())
3939

40-
assert.Equal(t, FBFTAnnounce, consensus.phase)
40+
assert.Equal(t, FBFTAnnounce, consensus.current.phase)
4141

4242
// State / consensus.current
4343
assert.Equal(t, state.mode, consensus.current.mode)

consensus/consensus_v2.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"context"
66
"encoding/hex"
77
"math/big"
8-
"sync/atomic"
98
"time"
109

1110
"github.com/ethereum/go-ethereum/common"
@@ -179,7 +178,7 @@ func (consensus *Consensus) _finalCommit(isLeader bool) {
179178
)
180179
consensus.fBFTLog.AddVerifiedMessage(FBFTMsg)
181180
// find correct block content
182-
curBlockHash := consensus.blockHash
181+
curBlockHash := consensus.current.blockHash
183182
block := consensus.fBFTLog.GetBlockByHash(curBlockHash)
184183
if block == nil {
185184
consensus.getLogger().Warn().
@@ -843,7 +842,7 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int, defaultKey *bls.PublicK
843842

844843
// SetupForNewConsensus sets the state for new consensus
845844
func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) {
846-
atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1)
845+
consensus.setBlockNum(blk.NumberU64() + 1)
847846
consensus.setCurBlockViewID(committedMsg.ViewID + 1)
848847
var epoch *big.Int
849848
if blk.IsLastBlockInEpoch() {

consensus/construct.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,32 +24,32 @@ type NetworkMessage struct {
2424
}
2525

2626
// Populates the common basic fields for all consensus message.
27-
func (consensus *Consensus) populateMessageFields(
27+
func (pm *State) populateMessageFields(
2828
request *msg_pb.ConsensusRequest, blockHash []byte,
2929
) *msg_pb.ConsensusRequest {
30-
request.ViewId = consensus.getCurBlockViewID()
31-
request.BlockNum = consensus.getBlockNum()
32-
request.ShardId = consensus.ShardID
30+
request.ViewId = pm.getCurBlockViewID()
31+
request.BlockNum = pm.getBlockNum()
32+
request.ShardId = pm.ShardID
3333
// 32 byte block hash
3434
request.BlockHash = blockHash
3535
return request
3636
}
3737

3838
// Populates the common basic fields for the consensus message and senders bitmap.
39-
func (consensus *Consensus) populateMessageFieldsAndSendersBitmap(
39+
func (pm *State) populateMessageFieldsAndSendersBitmap(
4040
request *msg_pb.ConsensusRequest, blockHash []byte, bitmap []byte,
4141
) *msg_pb.ConsensusRequest {
42-
consensus.populateMessageFields(request, blockHash)
42+
pm.populateMessageFields(request, blockHash)
4343
// sender address
4444
request.SenderPubkeyBitmap = bitmap
4545
return request
4646
}
4747

4848
// Populates the common basic fields for the consensus message and single sender.
49-
func (consensus *Consensus) populateMessageFieldsAndSender(
49+
func (pm *State) populateMessageFieldsAndSender(
5050
request *msg_pb.ConsensusRequest, blockHash []byte, pubKey bls.SerializedPublicKey,
5151
) *msg_pb.ConsensusRequest {
52-
consensus.populateMessageFields(request, blockHash)
52+
pm.populateMessageFields(request, blockHash)
5353
// sender address
5454
request.SenderPubkey = pubKey[:]
5555
return request
@@ -75,26 +75,26 @@ func (consensus *Consensus) construct(
7575
)
7676

7777
if len(priKeys) == 1 {
78-
consensusMsg = consensus.populateMessageFieldsAndSender(
79-
message.GetConsensus(), consensus.blockHash[:], priKeys[0].Pub.Bytes,
78+
consensusMsg = consensus.current.populateMessageFieldsAndSender(
79+
message.GetConsensus(), consensus.current.blockHash[:], priKeys[0].Pub.Bytes,
8080
)
8181
} else {
8282
// TODO: use a persistent bitmap to report bitmap
8383
mask := bls.NewMask(consensus.decider.Participants())
8484
for _, key := range priKeys {
8585
mask.SetKey(key.Pub.Bytes, true)
8686
}
87-
consensusMsg = consensus.populateMessageFieldsAndSendersBitmap(
88-
message.GetConsensus(), consensus.blockHash[:], mask.Bitmap,
87+
consensusMsg = consensus.current.populateMessageFieldsAndSendersBitmap(
88+
message.GetConsensus(), consensus.current.blockHash[:], mask.Bitmap,
8989
)
9090
}
9191

9292
// Do the signing, 96 byte of bls signature
9393
needMsgSig := true
9494
switch p {
9595
case msg_pb.MessageType_ANNOUNCE:
96-
consensusMsg.Block = consensus.block
97-
consensusMsg.Payload = consensus.blockHash[:]
96+
consensusMsg.Block = consensus.current.block
97+
consensusMsg.Payload = consensus.current.blockHash[:]
9898
case msg_pb.MessageType_PREPARE:
9999
needMsgSig = false
100100
sig := bls_core.Sign{}
@@ -114,7 +114,7 @@ func (consensus *Consensus) construct(
114114
}
115115
consensusMsg.Payload = sig.Serialize()
116116
case msg_pb.MessageType_PREPARED:
117-
consensusMsg.Block = consensus.block
117+
consensusMsg.Block = consensus.current.block
118118
consensusMsg.Payload = consensus.constructQuorumSigAndBitmap(quorum.Prepare)
119119
case msg_pb.MessageType_COMMITTED:
120120
consensusMsg.Payload = consensus.constructQuorumSigAndBitmap(quorum.Commit)

0 commit comments

Comments
 (0)