Skip to content

Commit 28a7735

Browse files
Merge branch 'main' into alex/mdbx_lag_34
2 parents a709cd7 + 3178fda commit 28a7735

File tree

83 files changed

+2832
-494
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+2832
-494
lines changed

cl/beacon/handler/block_production.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"github.com/erigontech/erigon/cl/gossip"
4848
"github.com/erigontech/erigon/cl/persistence/beacon_indicies"
4949
"github.com/erigontech/erigon/cl/phase1/core/state"
50+
"github.com/erigontech/erigon/cl/phase1/network/subnets"
5051
"github.com/erigontech/erigon/cl/transition"
5152
"github.com/erigontech/erigon/cl/transition/impl/eth2"
5253
"github.com/erigontech/erigon/cl/transition/machine"
@@ -143,8 +144,13 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData(
143144
}
144145

145146
defer func() {
147+
epoch := *slot / a.beaconChainCfg.SlotsPerEpoch
148+
committeesPerSlot := a.syncedData.CommitteeCount(epoch)
149+
subnet := subnets.ComputeSubnetForAttestation(
150+
committeesPerSlot, *slot, *committeeIndex,
151+
a.beaconChainCfg.SlotsPerEpoch, 64)
146152
a.logger.Debug("Produced Attestation", "slot", *slot,
147-
"committee_index", *committeeIndex, "cached", ok, "beacon_block_root",
153+
"committee_index", *committeeIndex, "subnet", subnet, "cached", ok, "beacon_block_root",
148154
attestationData.BeaconBlockRoot, "duration", time.Since(start))
149155
}()
150156

cl/p2p/mock_services/p2p_manager_mock.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cl/phase1/network/gossip/gossip_manager.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,22 @@ func (g *GossipManager) Close() error {
101101
}
102102

103103
func (g *GossipManager) newPubsubValidator(service serviceintf.Service[any], conditions ...ConditionFunc) pubsub.ValidatorEx {
104+
var selfID peer.ID
105+
if h := g.p2p.Host(); h != nil {
106+
selfID = h.ID()
107+
}
104108
return func(ctx context.Context, pid peer.ID, msg *pubsub.Message) (result pubsub.ValidationResult) {
105109
defer func() {
106110
if r := recover(); r != nil {
107111
log.Error("[GossipManager] panic in validator, rejecting message", "err", r, "topic", msg.GetTopic())
108112
result = pubsub.ValidationReject
109113
}
110114
}()
115+
// Skip validation for self-published messages: they were already validated
116+
// by ProcessMessage before Publish was called.
117+
if selfID != "" && pid == selfID {
118+
return pubsub.ValidationAccept
119+
}
111120
curVersion := g.beaconConfig.GetCurrentStateVersion(g.ethClock.GetCurrentEpoch())
112121
// parse the topic and subnet
113122
topic := msg.GetTopic()
@@ -252,8 +261,18 @@ func (g *GossipManager) Publish(ctx context.Context, name string, data []byte) e
252261
if topicHandle == nil {
253262
return fmt.Errorf("topic not found: %s", topic)
254263
}
264+
// Log peer count for attestation topics to help diagnose propagation issues
265+
if gossip.IsTopicBeaconAttestation(name) {
266+
peerCount := len(g.p2p.Pubsub().ListPeers(topic))
267+
if peerCount == 0 {
268+
log.Warn("[Gossip] Publishing attestation with NO peers on subnet", "topic", name, "peerCount", peerCount)
269+
} else if peerCount < 3 {
270+
log.Debug("[Gossip] Publishing attestation with low peer count", "topic", name, "peerCount", peerCount)
271+
}
272+
}
255273
// Note: before publishing the message to the network, Publish() internally runs the validator function.
256-
return topicHandle.topic.Publish(ctx, compressedData, pubsub.WithReadiness(pubsub.MinTopicSize(1)))
274+
// Removed MinTopicSize(1) - don't fail if no peers on subnet, message will propagate when peers join
275+
return topicHandle.topic.Publish(ctx, compressedData)
257276
}
258277

259278
func (g *GossipManager) goCheckForkAndResubscribe(ctx context.Context) {

cl/phase1/network/services/aggregate_and_proof_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage(
247247
index: aggregateAndProof.SignedAggregateAndProof.Message.AggregatorIndex,
248248
}
249249
if a.seenAggreatorIndexes.Contains(seenIndex) {
250-
return nil
250+
return fmt.Errorf("%w: aggregator already seen", ErrIgnore)
251251
}
252252

253253
committee, err := headState.GetBeaconCommitee(slot, committeeIndex)

cl/phase1/network/services/attestation_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
263263
// mark the validator as seen
264264
epochLastTime, ok := s.validatorAttestationSeen.Get(vIndex)
265265
if ok && epochLastTime == targetEpoch {
266-
return nil
266+
return fmt.Errorf("validator already seen in target epoch %w", ErrIgnore)
267267
}
268268
s.validatorAttestationSeen.Add(vIndex, targetEpoch)
269269

0 commit comments

Comments
 (0)