Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 73 additions & 7 deletions consensus/broadcast/delayedBroadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
const prefixHeaderAlarm = "header_"
const prefixDelayDataAlarm = "delay_"
const sizeHeadersCache = 1000 // 1000 hashes in cache
const sizeProcessedMetaHeadersCache = 1000

type shardDataHandler interface {
GetHeaderHash() []byte
Expand All @@ -36,6 +37,9 @@ type shardDataHandler interface {
type ArgsDelayedBlockBroadcaster struct {
InterceptorsContainer process.InterceptorsContainer
HeadersSubscriber consensus.HeadersPoolSubscriber
HeadersPool consensus.HeadersPoolGetter
ProofsPool consensus.EquivalentProofsPool
EnableEpochsHandler common.EnableEpochsHandler
ShardCoordinator sharding.Coordinator
LeaderCacheSize uint32
ValidatorCacheSize uint32
Expand All @@ -60,6 +64,9 @@ type delayedBlockBroadcaster struct {
interceptorsContainer process.InterceptorsContainer
shardCoordinator sharding.Coordinator
headersSubscriber consensus.HeadersPoolSubscriber
headersPool consensus.HeadersPoolGetter
proofsPool consensus.EquivalentProofsPool
enableEpochsHandler common.EnableEpochsHandler
valHeaderBroadcastData []*shared.ValidatorHeaderBroadcastData
valBroadcastData []*shared.DelayedBroadcastData
delayedBroadcastData []*shared.DelayedBroadcastData
Expand All @@ -71,6 +78,7 @@ type delayedBlockBroadcaster struct {
broadcastHeader func(header data.HeaderHandler, pkBytes []byte) error
broadcastConsensusMessage func(message *consensus.Message) error
cacheHeaders storage.Cacher
cacheProcessedMetaHeaders storage.Cacher
mutHeadersCache sync.RWMutex
}

Expand All @@ -85,6 +93,15 @@ func NewDelayedBlockBroadcaster(args *ArgsDelayedBlockBroadcaster) (*delayedBloc
if check.IfNil(args.HeadersSubscriber) {
return nil, spos.ErrNilHeadersSubscriber
}
if check.IfNil(args.HeadersPool) {
return nil, spos.ErrNilHeadersPool
}
if check.IfNil(args.ProofsPool) {
return nil, spos.ErrNilEquivalentProofPool
}
if check.IfNil(args.EnableEpochsHandler) {
return nil, spos.ErrNilEnableEpochsHandler
}
if check.IfNil(args.AlarmScheduler) {
return nil, spos.ErrNilAlarmScheduler
}
Expand All @@ -94,22 +111,32 @@ func NewDelayedBlockBroadcaster(args *ArgsDelayedBlockBroadcaster) (*delayedBloc
return nil, err
}

cacheProcessedMetaHeaders, err := cache.NewLRUCache(sizeProcessedMetaHeadersCache)
if err != nil {
return nil, err
}

dbb := &delayedBlockBroadcaster{
alarm: args.AlarmScheduler,
shardCoordinator: args.ShardCoordinator,
interceptorsContainer: args.InterceptorsContainer,
headersSubscriber: args.HeadersSubscriber,
headersPool: args.HeadersPool,
proofsPool: args.ProofsPool,
enableEpochsHandler: args.EnableEpochsHandler,
valHeaderBroadcastData: make([]*shared.ValidatorHeaderBroadcastData, 0),
valBroadcastData: make([]*shared.DelayedBroadcastData, 0),
delayedBroadcastData: make([]*shared.DelayedBroadcastData, 0),
maxDelayCacheSize: args.LeaderCacheSize,
maxValidatorDelayCacheSize: args.ValidatorCacheSize,
mutDataForBroadcast: sync.RWMutex{},
cacheHeaders: cacheHeaders,
cacheProcessedMetaHeaders: cacheProcessedMetaHeaders,
mutHeadersCache: sync.RWMutex{},
}

dbb.headersSubscriber.RegisterHandler(dbb.headerReceived)
dbb.proofsPool.RegisterHandler(dbb.receivedProof)
err = dbb.registerHeaderInterceptorCallback(dbb.interceptedHeader)
if err != nil {
return nil, err
Expand Down Expand Up @@ -266,36 +293,75 @@ func (dbb *delayedBlockBroadcaster) Close() {
}

func (dbb *delayedBlockBroadcaster) headerReceived(headerHandler data.HeaderHandler, headerHash []byte) {
if headerHandler.GetShardID() != core.MetachainShardId {
return
}

if common.IsProofsFlagEnabledForHeader(dbb.enableEpochsHandler, headerHandler) {
if !dbb.proofsPool.HasProof(headerHandler.GetShardID(), headerHash) {
return
}
}

dbb.processMetachainHeaderBroadcast(headerHandler, headerHash)
}

func (dbb *delayedBlockBroadcaster) receivedProof(proof data.HeaderProofHandler) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a slight chance both methods will trigger processMetachainHeaderBroadcast for the same block, if header and proof are received roughly at the same time, leading to double broadcast.. should we worry about this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, problem would be duplicated goroutines, and some processing done twice. It gets deduplicated further down when dealing with the alarm, but I have added now deduplication at the processMetachainHeaderBroadcast.

if check.IfNil(proof) {
return
}
if proof.GetHeaderShardId() != core.MetachainShardId {
return
}

headerHash := proof.GetHeaderHash()
header, err := dbb.headersPool.GetHeaderByHash(headerHash)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could the header be received and then evicted before the proof comes? You get header H, 100 more headers, evict H, and then you get the proof. The proof waits for the headerReceived, but it already happened.

if err != nil {
log.Trace("delayedBlockBroadcaster.receivedProof: header not found in pool, will be handled by headerReceived",
"headerHash", headerHash,
)
return
}

dbb.processMetachainHeaderBroadcast(header, headerHash)
}

func (dbb *delayedBlockBroadcaster) processMetachainHeaderBroadcast(headerHandler data.HeaderHandler, headerHash []byte) {
has, _ := dbb.cacheProcessedMetaHeaders.HasOrAdd(headerHash, struct{}{}, 0)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems too early to add the hash...
if the if below returns, nothing happened and next time it will return early on the "has"

if len(dbb.delayedBroadcastData) == 0 && len(dbb.valBroadcastData) == 0 {
		return
	}

if has {
log.Trace("delayedBlockBroadcaster.processMetachainHeaderBroadcast: already processed, skipping",
"headerHash", headerHash,
)
return
}

dbb.mutDataForBroadcast.RLock()
defer dbb.mutDataForBroadcast.RUnlock()

if len(dbb.delayedBroadcastData) == 0 && len(dbb.valBroadcastData) == 0 {
return
}
if headerHandler.GetShardID() != core.MetachainShardId {
return
}

headerHashes, dataForValidators, err := getShardDataFromMetaChainBlock(
headerHandler,
dbb.shardCoordinator.SelfId(),
)
if err != nil {
log.Error("delayedBlockBroadcaster.headerReceived", "error", err.Error(),
log.Error("delayedBlockBroadcaster.processMetachainHeaderBroadcast", "error", err.Error(),
"headerHash", headerHash,
)
return
}
if len(headerHashes) == 0 {
log.Trace("delayedBlockBroadcaster.headerReceived: header received with no shardData for current shard",
log.Trace("delayedBlockBroadcaster.processMetachainHeaderBroadcast: no shardData for current shard",
"headerHash", headerHash,
)
return
}

log.Trace("delayedBlockBroadcaster.headerReceived", "nbHeaderHashes", len(headerHashes))
log.Trace("delayedBlockBroadcaster.processMetachainHeaderBroadcast", "nbHeaderHashes", len(headerHashes))
for i := range headerHashes {
log.Trace("delayedBlockBroadcaster.headerReceived", "headerHash", headerHashes[i])
log.Trace("delayedBlockBroadcaster.processMetachainHeaderBroadcast", "headerHash", headerHashes[i])
}

go dbb.scheduleValidatorBroadcast(dataForValidators)
Expand Down
Loading
Loading