Skip to content

Commit 69f262a

Browse files
committed
fix(consensus,sharding,libp2p): address PR #48 review feedback
- nodesCoordinator: guard EpochStartPrepare display block against a nil nodesConfig[newEpoch] when SetNodes returns an error, and acquire mutNodesMaps.RLock() for consistency with other readers.
- slotConsensus: rewrite the SetConsensusGroup doc comment so it matches the body - the two writes are intentionally non-atomic to keep the JobDone hot path off the ConsensusGroup lock; the brief inconsistency self-heals on the next poll.
- consensusState: add SetWaitingAllSignaturesTimeOutIfSlot to perform the slot-equality check and the flag write under a single mutSlotState write lock.
- subslotSignature: use the new helper in waitAllSignatures so a stale goroutine cannot set WaitingAllSignaturesTimeOut after BeginNewSlot has cleared state.
- libp2p netMessenger: replace per-iteration time.After with a reusable time.Ticker so the connection-monitor sweep loop no longer allocates a fresh timer on every iteration.
1 parent 15f04c5 commit 69f262a

5 files changed

Lines changed: 36 additions & 23 deletions

File tree

core/consensus/slot/bls/subslotSignature.go

Lines changed: 3 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -327,21 +327,14 @@ func (sr *subslotSignature) waitAllSignatures(spawnSlot int64) {
327327
// All signatures collected (100%), exit immediately
328328
return
329329
case <-timeout.C:
330-
sr.RLockSlotState()
331-
currentSlot := sr.SlotIndex
332-
sr.RUnlockSlotState()
333-
if currentSlot != spawnSlot {
334-
// Slot has advanced — this goroutine belongs to a stale slot. Do
335-
// not touch shared state; the current slot has its own goroutine.
336-
return
337-
}
338-
339330
// Timeout reached, check if subslot is already finished by threshold signatures
340331
if sr.IsSubslotFinished(sr.Current()) {
341332
return
342333
}
343334

344-
sr.SetWaitingAllSignaturesTimeOut(true)
335+
if !sr.SetWaitingAllSignaturesTimeOutIfSlot(spawnSlot, true) {
336+
return
337+
}
345338
select {
346339
case sr.ConsensusChannel() <- true:
347340
default:

core/consensus/slot/consensusState.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,20 @@ func (cns *ConsensusState) SetWaitingAllSignaturesTimeOut(timedOut bool) {
7575
cns.mutSlotState.Unlock()
7676
}
7777

78+
// SetWaitingAllSignaturesTimeOutIfSlot atomically sets WaitingAllSignaturesTimeOut
79+
// only when SlotIndex still equals spawnSlot. Returns true when the write was
80+
// applied. Used by waitAllSignatures to ensure a stale goroutine cannot set the
81+
// flag after BeginNewSlot has already advanced the slot and cleared state.
82+
func (cns *ConsensusState) SetWaitingAllSignaturesTimeOutIfSlot(spawnSlot int64, timedOut bool) bool {
83+
cns.mutSlotState.Lock()
84+
defer cns.mutSlotState.Unlock()
85+
if cns.SlotIndex != spawnSlot {
86+
return false
87+
}
88+
cns.WaitingAllSignaturesTimeOut = timedOut
89+
return true
90+
}
91+
7892
// NewConsensusState creates a new ConsensusState object
7993
func NewConsensusState(
8094
slotConsensus *slotConsensus,

core/consensus/slot/slotConsensus.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@ import (
1313
// - mutConsensusGroup protects consensusGroup
1414
// - mut protects validatorSlotStates
1515
//
16-
// SetConsensusGroup acquires mutConsensusGroup and mut in that order to swap
17-
// both atomically with respect to combined readers; never the reverse order.
16+
// SetConsensusGroup installs consensusGroup and validatorSlotStates in two
17+
// separate critical sections (mutConsensusGroup, then mut); the writes are
18+
// intentionally not atomic against combined readers, and a concurrent
19+
// ComputeSize can undercount for one poll cycle before self-healing on the
20+
// next poll. Lock order is fixed: mutConsensusGroup before mut, never the
21+
// reverse, so readers holding mut.RLock can safely call ConsensusGroup().
1822
type slotConsensus struct {
1923
electedNodes map[string]struct{}
2024
mutElected sync.RWMutex

network/p2p/libp2p/netMessenger.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,12 +439,14 @@ func (netMes *networkMessenger) createConnectionMonitor(p2pConfig config.P2PConf
439439
// goroutine had no termination signal and leaked on Close(), which
440440
// accumulated under test rigs that spin clusters up and down in the
441441
// same process (-count=N integration tests).
442+
ticker := time.NewTicker(durationCheckConnections)
443+
defer ticker.Stop()
442444
for {
443445
cmw.CheckConnectionsBlocking()
444446
select {
445447
case <-netMes.ctx.Done():
446448
return
447-
case <-time.After(durationCheckConnections):
449+
case <-ticker.C:
448450
}
449451
}
450452
}()

sharding/nodesCoordinator.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -786,18 +786,18 @@ func (ihgs *indexHashedNodesCoordinator) EpochStartPrepare(metaHdr data.HeaderHa
786786

787787
ihgs.mutNodesConfig.RLock()
788788
displayCfg := ihgs.nodesConfig[newEpoch]
789-
displayElected := displayCfg.electedList
790-
displayEligible := displayCfg.eligibleList
791-
displayWaiting := displayCfg.waitingList
792-
displayLeaving := displayCfg.leavingList
793789
ihgs.mutNodesConfig.RUnlock()
794790

795-
displayNodesConfiguration(
796-
displayElected,
797-
displayEligible,
798-
displayWaiting,
799-
displayLeaving,
800-
)
791+
if displayCfg != nil {
792+
displayCfg.mutNodesMaps.RLock()
793+
elected := displayCfg.electedList
794+
eligible := displayCfg.eligibleList
795+
waiting := displayCfg.waitingList
796+
leaving := displayCfg.leavingList
797+
displayCfg.mutNodesMaps.RUnlock()
798+
799+
displayNodesConfiguration(elected, eligible, waiting, leaving)
800+
}
801801

802802
ihgs.mutSavedStateKey.Lock()
803803
ihgs.savedStateKey = randomness

0 commit comments

Comments
 (0)