Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions agreement/pseudonode.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@

// makeProposals creates a slice of block proposals for the given round and period.
func (n asyncPseudonode) makeProposals(round basics.Round, period period, accounts []account.ParticipationRecordForRound) ([]proposal, []unauthenticatedVote) {
if len(accounts) == 0 {
// If we don't have any participating accounts: no need to call AssembleBlock
return nil, nil

Check warning on line 289 in agreement/pseudonode.go

View check run for this annotation

Codecov / codecov/patch

agreement/pseudonode.go#L289

Added line #L289 was not covered by tests
}

addresses := make([]basics.Address, len(accounts))
for i := range accounts {
addresses[i] = accounts[i].Account
Expand Down
5 changes: 5 additions & 0 deletions components/mocks/mockParticipationRegistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@
return false
}

// NumKeys implements account.ParticipationRegistry
func (m *MockParticipationRegistry) NumKeys() uint32 {
return 0

Check warning on line 84 in components/mocks/mockParticipationRegistry.go

View check run for this annotation

Codecov / codecov/patch

components/mocks/mockParticipationRegistry.go#L83-L84

Added lines #L83 - L84 were not covered by tests
}

// Register updates the EffectiveFirst and EffectiveLast fields. If there are multiple records for the account
// then it is possible for multiple records to be updated.
func (m *MockParticipationRegistry) Register(id account.ParticipationID, on basics.Round) error {
Expand Down
25 changes: 24 additions & 1 deletion data/account/participationRegistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"encoding/base32"
"errors"
"fmt"
"sync/atomic"
"time"

"github.com/algorand/go-algorand/config"
Expand Down Expand Up @@ -258,6 +259,9 @@
// HasLiveKeys quickly tests to see if there is a valid participation key over some range of rounds
HasLiveKeys(from, to basics.Round) bool

// NumKeys quickly returns the number of participation keys in the DB.
NumKeys() uint32

// Register updates the EffectiveFirst and EffectiveLast fields. If there are multiple records for the account
// then it is possible for multiple records to be updated.
Register(id ParticipationID, on basics.Round) error
Expand Down Expand Up @@ -405,7 +409,10 @@

// participationDB provides a concrete implementation of the ParticipationRegistry interface.
type participationDB struct {
cache map[ParticipationID]ParticipationRecord
// This cache contains all ParticipationRecords. It must always contain all available keys
// because Get(), GetAll(), and HasLiveKeys() use this cache and never query the DB.
cache map[ParticipationID]ParticipationRecord
cacheSize uint32 // atomic uint32 to quickly check if participation DB has any keys in it

// dirty marked on Record(), DeleteExpired(), cleared on Register(), Delete(), Flush()
dirty map[ParticipationID]struct{}
Expand Down Expand Up @@ -442,6 +449,8 @@
required bool
}

// initializeCache loads all records from the database and puts them
// all in the cache.
func (db *participationDB) initializeCache() error {
db.mutex.Lock()
defer db.mutex.Unlock()
Expand All @@ -461,10 +470,19 @@
}

db.cache = cache
db.updateCacheSize()
db.dirty = make(map[ParticipationID]struct{})
return nil
}

func (db *participationDB) updateCacheSize() {
atomic.StoreUint32(&db.cacheSize, uint32(len(db.cache)))
}

func (db *participationDB) NumKeys() uint32 {
return atomic.LoadUint32(&db.cacheSize)

Check warning on line 483 in data/account/participationRegistry.go

View check run for this annotation

Codecov / codecov/patch

data/account/participationRegistry.go#L482-L483

Added lines #L482 - L483 were not covered by tests
}

func (db *participationDB) writeThread() {
defer close(db.writeQueueDone)
var lastErr error
Expand Down Expand Up @@ -549,6 +567,7 @@
Voting: voting,
VRF: vrf,
}
db.updateCacheSize()

return
}
Expand Down Expand Up @@ -580,6 +599,7 @@
}
delete(db.dirty, id)
delete(db.cache, id)
db.updateCacheSize()

// do the db part async
db.writeQueue <- makeOpRequest(&deleteOp{id})
Expand Down Expand Up @@ -615,6 +635,7 @@
db.dirty[r.ParticipationID] = struct{}{}
db.cache[r.ParticipationID] = r
}
db.updateCacheSize()
db.mutex.Unlock()
return nil
}
Expand Down Expand Up @@ -944,6 +965,7 @@
delete(db.dirty, id)
db.cache[id] = record.ParticipationRecord
}
db.updateCacheSize()
db.mutex.Unlock()
}

Expand Down Expand Up @@ -989,6 +1011,7 @@

db.dirty[record.ParticipationID] = struct{}{}
db.cache[record.ParticipationID] = record
db.updateCacheSize()
return nil
}

Expand Down
3 changes: 0 additions & 3 deletions data/accountManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,6 @@ func (manager *AccountManager) StateProofKeys(rnd basics.Round) (out []account.S
// HasLiveKeys returns true if we have any Participation
// keys valid for the specified round range (inclusive)
func (manager *AccountManager) HasLiveKeys(from, to basics.Round) bool {
manager.mu.Lock()
defer manager.mu.Unlock()

return manager.registry.HasLiveKeys(from, to)
}

Expand Down
16 changes: 9 additions & 7 deletions data/pools/transactionPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,14 +660,16 @@
stats.ProcessingTime.AddTransaction(transactionGroupDuration)
}

blockGenerationStarts := time.Now()
lvb, gerr := pool.pendingBlockEvaluator.GenerateBlock(pool.getVotingAccountsForRound(evalRnd))
if gerr != nil {
pool.assemblyResults.err = fmt.Errorf("could not generate block for %d: %v", pool.assemblyResults.roundStartedEvaluating, gerr)
} else {
pool.assemblyResults.blk = lvb
if votingAccts := pool.getVotingAccountsForRound(evalRnd); len(votingAccts) > 0 {
blockGenerationStarts := time.Now()
lvb, gerr := pool.pendingBlockEvaluator.GenerateBlock(votingAccts)
if gerr != nil {
pool.assemblyResults.err = fmt.Errorf("could not generate block for %d: %v", pool.assemblyResults.roundStartedEvaluating, gerr)
} else {
pool.assemblyResults.blk = lvb

Check warning on line 669 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L663-L669

Added lines #L663 - L669 were not covered by tests
}
stats.BlockGenerationDuration = uint64(time.Since(blockGenerationStarts))

Check warning on line 671 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L671

Added line #L671 was not covered by tests
}
stats.BlockGenerationDuration = uint64(time.Since(blockGenerationStarts))
pool.assemblyResults.stats = *stats
pool.assemblyCond.Broadcast()
} else {
Expand Down
17 changes: 4 additions & 13 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1431,20 +1431,11 @@ func (node *AlgorandFullNode) Record(account basics.Address, round basics.Round,
}

// IsParticipating implements network.NodeInfo
//
// This function is not fully precise. node.ledger and
// node.accountManager might move relative to each other and there is
// no synchronization. This is good-enough for current uses of
// IsParticipating() which is used in networking code to determine if
// the node should ask for transaction gossip (or skip it to save
// bandwidth). The current transaction pool size is about 3
// rounds. Starting to receive transaction gossip 10 rounds in the
// future when we might propose or vote on blocks in that future is a
// little extra buffer but seems reasonable at this time. -- bolson
// 2022-05-18
// It makes the conservative assumption that if any participation keys exist that have not yet
// been deleted by the participationDB (as part of DeleteExpired(), where it cleans up and calls
// Delete() on expired keys) then we can consider the node as participating.
func (node *AlgorandFullNode) IsParticipating() bool {
round := node.ledger.Latest() + 1
return node.accountManager.HasLiveKeys(round, round+10)
return node.accountManager.Registry().NumKeys() > 0
}

// SetSyncRound no-ops
Expand Down
Loading