diff --git a/agreement/pseudonode.go b/agreement/pseudonode.go index 54c9fde7df..8c1c600b48 100644 --- a/agreement/pseudonode.go +++ b/agreement/pseudonode.go @@ -284,6 +284,11 @@ func (n asyncPseudonode) makePseudonodeVerifier(voteVerifier *AsyncVoteVerifier) // makeProposals creates a slice of block proposals for the given round and period. func (n asyncPseudonode) makeProposals(round basics.Round, period period, accounts []account.ParticipationRecordForRound) ([]proposal, []unauthenticatedVote) { + if len(accounts) == 0 { + // If we don't have any participating accounts: no need to call AssembleBlock + return nil, nil + } + addresses := make([]basics.Address, len(accounts)) for i := range accounts { addresses[i] = accounts[i].Account diff --git a/components/mocks/mockParticipationRegistry.go b/components/mocks/mockParticipationRegistry.go index 3e4cfaf5ca..9dc7c7952b 100644 --- a/components/mocks/mockParticipationRegistry.go +++ b/components/mocks/mockParticipationRegistry.go @@ -79,6 +79,11 @@ func (m *MockParticipationRegistry) HasLiveKeys(from, to basics.Round) bool { return false } +// NumKeys implements account.ParticipationRegistry +func (m *MockParticipationRegistry) NumKeys() uint32 { + return 0 +} + // Register updates the EffectiveFirst and EffectiveLast fields. If there are multiple records for the account // then it is possible for multiple records to be updated. func (m *MockParticipationRegistry) Register(id account.ParticipationID, on basics.Round) error { diff --git a/data/account/participationRegistry.go b/data/account/participationRegistry.go index a4ebcb7001..7a46c7f41a 100644 --- a/data/account/participationRegistry.go +++ b/data/account/participationRegistry.go @@ -22,6 +22,7 @@ import ( "encoding/base32" "errors" "fmt" + "sync/atomic" "time" "github.com/algorand/go-algorand/config" @@ -258,6 +259,9 @@ type ParticipationRegistry interface { // HasLiveKeys quickly tests to see if there is a valid participation key over some range of rounds HasLiveKeys(from, to basics.Round) bool + // NumKeys quickly returns the number of participation keys in the DB. + NumKeys() uint32 + // Register updates the EffectiveFirst and EffectiveLast fields. If there are multiple records for the account // then it is possible for multiple records to be updated. Register(id ParticipationID, on basics.Round) error @@ -405,7 +409,10 @@ func dbSchemaUpgrade0(ctx context.Context, tx *sql.Tx, newDatabase bool) error { // participationDB provides a concrete implementation of the ParticipationRegistry interface. type participationDB struct { - cache map[ParticipationID]ParticipationRecord + // This cache contains all ParticipationRecords. It must always contain all available keys + // because Get(), GetAll(), and HasLiveKeys() use this cache and never query the DB. + cache map[ParticipationID]ParticipationRecord + cacheSize uint32 // atomic uint32 to quickly check if participation DB has any keys in it // dirty marked on Record(), DeleteExpired(), cleared on Register(), Delete(), Flush() dirty map[ParticipationID]struct{} @@ -442,6 +449,8 @@ type updatingParticipationRecord struct { required bool } +// initializeCache loads all records from the database and puts them +// all in the cache. func (db *participationDB) initializeCache() error { db.mutex.Lock() defer db.mutex.Unlock() @@ -461,10 +470,19 @@ func (db *participationDB) initializeCache() error { } db.cache = cache + db.updateCacheSize() db.dirty = make(map[ParticipationID]struct{}) return nil } +func (db *participationDB) updateCacheSize() { + atomic.StoreUint32(&db.cacheSize, uint32(len(db.cache))) +} + +func (db *participationDB) NumKeys() uint32 { + return atomic.LoadUint32(&db.cacheSize) +} + func (db *participationDB) writeThread() { defer close(db.writeQueueDone) var lastErr error @@ -549,6 +567,7 @@ func (db *participationDB) Insert(record Participation) (id ParticipationID, err Voting: voting, VRF: vrf, } + db.updateCacheSize() return } @@ -580,6 +599,7 @@ func (db *participationDB) Delete(id ParticipationID) error { } delete(db.dirty, id) delete(db.cache, id) + db.updateCacheSize() // do the db part async db.writeQueue <- makeOpRequest(&deleteOp{id}) @@ -615,6 +635,7 @@ func (db *participationDB) DeleteExpired(latestRound basics.Round, agreementProt db.dirty[r.ParticipationID] = struct{}{} db.cache[r.ParticipationID] = r } + db.updateCacheSize() db.mutex.Unlock() return nil } @@ -944,6 +965,7 @@ func (db *participationDB) Register(id ParticipationID, on basics.Round) error { delete(db.dirty, id) db.cache[id] = record.ParticipationRecord } + db.updateCacheSize() db.mutex.Unlock() } @@ -989,6 +1011,7 @@ func (db *participationDB) Record(account basics.Address, round basics.Round, pa db.dirty[record.ParticipationID] = struct{}{} db.cache[record.ParticipationID] = record + db.updateCacheSize() return nil } diff --git a/data/accountManager.go b/data/accountManager.go index 83a51f4c28..7c95eb65d2 100644 --- a/data/accountManager.go +++ b/data/accountManager.go @@ -95,9 +95,6 @@ func (manager *AccountManager) StateProofKeys(rnd basics.Round) (out []account.S // HasLiveKeys returns true if we have any Participation // keys valid for the specified round range (inclusive) func (manager *AccountManager) HasLiveKeys(from, to basics.Round) bool { - manager.mu.Lock() - defer manager.mu.Unlock() - return manager.registry.HasLiveKeys(from, to) } diff --git a/data/pools/transactionPool.go b/data/pools/transactionPool.go index dfdc633262..61bc567f67 100644 --- a/data/pools/transactionPool.go +++ b/data/pools/transactionPool.go @@ -660,14 +660,16 @@ func (pool *TransactionPool) addToPendingBlockEvaluatorOnce(txgroup []transactio stats.ProcessingTime.AddTransaction(transactionGroupDuration) } - blockGenerationStarts := time.Now() - lvb, gerr := pool.pendingBlockEvaluator.GenerateBlock(pool.getVotingAccountsForRound(evalRnd)) - if gerr != nil { - pool.assemblyResults.err = fmt.Errorf("could not generate block for %d: %v", pool.assemblyResults.roundStartedEvaluating, gerr) - } else { - pool.assemblyResults.blk = lvb + if votingAccts := pool.getVotingAccountsForRound(evalRnd); len(votingAccts) > 0 { + blockGenerationStarts := time.Now() + lvb, gerr := pool.pendingBlockEvaluator.GenerateBlock(votingAccts) + if gerr != nil { + pool.assemblyResults.err = fmt.Errorf("could not generate block for %d: %v", pool.assemblyResults.roundStartedEvaluating, gerr) + } else { + pool.assemblyResults.blk = lvb + } + stats.BlockGenerationDuration = uint64(time.Since(blockGenerationStarts)) } - stats.BlockGenerationDuration = uint64(time.Since(blockGenerationStarts)) pool.assemblyResults.stats = *stats pool.assemblyCond.Broadcast() } else { diff --git a/node/node.go b/node/node.go index 15b2bea5e5..7db8231173 100644 --- a/node/node.go +++ b/node/node.go @@ -1431,20 +1431,11 @@ func (node *AlgorandFullNode) Record(account basics.Address, round basics.Round, } // IsParticipating implements network.NodeInfo -// -// This function is not fully precise. node.ledger and -// node.accountManager might move relative to each other and there is -// no synchronization. This is good-enough for current uses of -// IsParticipating() which is used in networking code to determine if -// the node should ask for transaction gossip (or skip it to save -// bandwidth). The current transaction pool size is about 3 -// rounds. Starting to receive transaction gossip 10 rounds in the -// future when we might propose or vote on blocks in that future is a -// little extra buffer but seems reasonable at this time. -- bolson -// 2022-05-18 +// It makes the conservative assumption that if any participation keys exist that have not yet +// been deleted by the participationDB (as part of DeleteExpired(), where it cleans up and calls +// Delete() on expired keys) then we can consider the node as participating. func (node *AlgorandFullNode) IsParticipating() bool { - round := node.ledger.Latest() + 1 - return node.accountManager.HasLiveKeys(round, round+10) + return node.accountManager.Registry().NumKeys() > 0 } // SetSyncRound no-ops