Skip to content

Commit e7b3a30

Browse files
committed
Add peer rate limiting and reputation tracking to blob reactor
1 parent 4536507 commit e7b3a30

8 files changed

Lines changed: 645 additions & 34 deletions

File tree

config/template/template.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,10 @@ logging = "{{ .BeaconKit.NodeAPI.Logging }}"
113113
[beacon-kit.blob-reactor]
114114
# Request timeout for blob requests from peers
115115
request-timeout = "{{ .BeaconKit.BlobReactor.RequestTimeout }}"
116+
117+
# Rate limiting for incoming blob requests per peer (messages per second)
118+
max-messages-per-peer-per-second = {{ .BeaconKit.BlobReactor.MaxMessagesPerPeerPerSecond }}
119+
120+
# Global rate limiting for incoming blob requests from all peers (requests per second)
121+
max-global-requests-per-second = {{ .BeaconKit.BlobReactor.MaxGlobalRequestsPerSecond }}
116122
`

da/blobreactor/config.go

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,49 @@ package blobreactor
2222

2323
import "time"
2424

25-
const defaultRequestTimeout = 5 * time.Second
25+
const (
26+
// defaultRequestTimeout is the maximum time to wait for a blob response from a peer
27+
defaultRequestTimeout = 5 * time.Second
28+
// defaultMaxMessagesPerPeerPerSecond limits the rate of incoming blob requests per peer
29+
defaultMaxMessagesPerPeerPerSecond = 10.0 / 60.0
30+
// defaultMaxGlobalRequestsPerSecond limits the total rate of incoming blob requests from all peers
31+
defaultMaxGlobalRequestsPerSecond = 30.0 / 60.0
32+
33+
// BurstMultiplier determines how much burst capacity to allow relative to the rate limit.
34+
BurstMultiplier = 2.0
35+
)
2636

27-
// Config is the configuration for the blob reactor
2837
type Config struct {
29-
// RequestTimeout is the timeout for blob requests
30-
RequestTimeout time.Duration `mapstructure:"request-timeout"`
38+
RequestTimeout time.Duration `mapstructure:"request-timeout"`
39+
MaxMessagesPerPeerPerSecond float64 `mapstructure:"max-messages-per-peer-per-second"`
40+
MaxGlobalRequestsPerSecond float64 `mapstructure:"max-global-requests-per-second"`
41+
Reputation ReputationConfig `mapstructure:"reputation"`
3142
}
3243

33-
// DefaultConfig returns the default configuration
3444
func DefaultConfig() Config {
3545
return Config{
36-
RequestTimeout: defaultRequestTimeout,
46+
RequestTimeout: defaultRequestTimeout,
47+
MaxMessagesPerPeerPerSecond: defaultMaxMessagesPerPeerPerSecond,
48+
MaxGlobalRequestsPerSecond: defaultMaxGlobalRequestsPerSecond,
49+
Reputation: DefaultReputationConfig(),
50+
}
51+
}
52+
53+
// WithDefaults returns a new Config with zero values replaced by defaults
54+
func (c Config) WithDefaults() Config {
55+
defaults := DefaultConfig()
56+
57+
if c.RequestTimeout == 0 {
58+
c.RequestTimeout = defaults.RequestTimeout
59+
}
60+
if c.MaxMessagesPerPeerPerSecond == 0 {
61+
c.MaxMessagesPerPeerPerSecond = defaults.MaxMessagesPerPeerPerSecond
3762
}
63+
if c.MaxGlobalRequestsPerSecond == 0 {
64+
c.MaxGlobalRequestsPerSecond = defaults.MaxGlobalRequestsPerSecond
65+
}
66+
// Apply defaults to reputation config
67+
c.Reputation = c.Reputation.WithDefaults()
68+
69+
return c
3870
}

da/blobreactor/reactor.go

Lines changed: 138 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,24 +37,30 @@ import (
3737
"github.com/berachain/beacon-kit/primitives/math"
3838
"github.com/cometbft/cometbft/libs/service"
3939
"github.com/cometbft/cometbft/p2p"
40+
"golang.org/x/time/rate"
4041
)
4142

4243
const (
4344
// BlobChannel is our custom channel ID for blob requests/responses
4445
BlobChannel = byte(0x70)
45-
4646
// ReactorName is the registered name for the blob reactor in CometBFT's switch
4747
ReactorName = "BLOBREACTOR"
48-
49-
defaultSleepDuration = 100 * time.Millisecond
50-
defaultPriority = 5
51-
defaultSendQueueCapacity = 100
52-
defaultRecvBufferCapacity = 1024 * 1024
48+
// defaultPriority sets the channel priority for blob messages in CometBFT's message scheduler
49+
defaultPriority = 5
50+
// defaultSendQueueCapacity is the maximum number of outgoing messages queued per peer
51+
defaultSendQueueCapacity = 10
52+
// defaultRecvBufferCapacity is the size of the receive buffer for incoming blob messages
53+
defaultRecvBufferCapacity = 10 * 1024 * 1024
54+
// defaultRecvMessageCapacity is the maximum size of a single blob message we'll accept
5355
defaultRecvMessageCapacity = 1024 * 1024
54-
56+
// defaultMaxRequestWorkers limits concurrent goroutines handling blob requests/responses
5557
defaultMaxRequestWorkers = 10
56-
58+
// maxBlobsPerBlock is the protocol limit for blob sidecars per block
5759
maxBlobsPerBlock = 6
60+
// rateLimiterCleanupInterval determines how often to check for stale peer rate limiters
61+
rateLimiterCleanupInterval = 30 * time.Minute
62+
// staleLimiterTimeout determines how long to keep inactive peer rate limiters before deleting them
63+
staleLimiterTimeout = 24 * time.Hour
5864
)
5965

6066
// blobRequestError wraps an error with a status for metrics tracking.
@@ -75,8 +81,23 @@ func newBlobRequestError(err error, status string) error {
7581
return &blobRequestError{err: err, status: status}
7682
}
7783

78-
// BlobReactor handles P2P blob distribution for BeaconKit.
79-
// It implements the CometBFT Reactor interface.
84+
type peerRateLimiter struct {
85+
limiter *rate.Limiter
86+
lastSeen time.Time
87+
}
88+
89+
// BlobReactor manages p2p blob distribution across the network. When a node is
90+
// missing blobs for a slot, it sends requests to connected peers and waits for
91+
// responses containing the serialized blob sidecars.
92+
//
93+
// The reactor implements defense mechanisms including per-peer and global rate
94+
// limiting of incoming requests, and reputation scoring that temporarily bans
95+
// peers exhibiting bad behavior (invalid messages, unsolicited responses, etc).
96+
//
97+
// All request/response handling is concurrent with worker pool limits to prevent
98+
// resource exhaustion.
99+
//
100+
// Integrates with CometBFT's P2P layer via the Reactor interface.
80101
type BlobReactor struct {
81102
service.BaseService
82103
sw *p2p.Switch
@@ -105,6 +126,16 @@ type BlobReactor struct {
105126

106127
// Shutdown flag to prevent new workers during stop
107128
stopped atomic.Bool // set to true when OnStop begins
129+
130+
// We maintain per-peer rate limiters
131+
peerLimiters map[p2p.ID]*peerRateLimiter
132+
peerLimitersMu sync.RWMutex
133+
134+
// A global rate limiter for all incoming requests
135+
globalLimiter *rate.Limiter
136+
137+
// Reputation manager for tracking peer behavior
138+
reputationMgr *ReputationManager
108139
}
109140

110141
// NewBlobReactor creates a new blob reactor with storage backend
@@ -117,6 +148,12 @@ func NewBlobReactor(blobStore BlobStore, logger log.Logger, cfg Config, sink Tel
117148
metrics: newBlobReactorMetrics(sink),
118149
responseChans: make(map[uint64]chan *BlobResponse),
119150
requestWorkers: make(chan struct{}, defaultMaxRequestWorkers),
151+
peerLimiters: make(map[p2p.ID]*peerRateLimiter),
152+
globalLimiter: rate.NewLimiter(
153+
rateOrInf(cfg.MaxGlobalRequestsPerSecond),
154+
int(cfg.MaxGlobalRequestsPerSecond*BurstMultiplier),
155+
),
156+
reputationMgr: NewReputationManager(logger, cfg.Reputation.WithDefaults()),
120157
}
121158
br.BaseService = *service.NewBaseService(nil, ReactorName, br)
122159
return br
@@ -145,12 +182,16 @@ func (br *BlobReactor) GetChannels() []*p2p.ChannelDescriptor {
145182
// InitPeer is called by the switch before the peer is started. Use it to
146183
// initialize data for the peer (e.g. peer state).
147184
func (br *BlobReactor) InitPeer(peer p2p.Peer) p2p.Peer {
148-
br.AddPeer(peer)
149185
return peer
150186
}
151187

152188
// AddPeer is called by the switch after the peer is added and successfully started.
153189
func (br *BlobReactor) AddPeer(peer p2p.Peer) {
190+
if !br.reputationMgr.ShouldAcceptPeer(peer.ID()) {
191+
br.logger.Warn("Rejecting peer due to low reputation", "peer", peer.ID())
192+
return
193+
}
194+
154195
br.stateMu.Lock()
155196
br.peers[peer.ID()] = struct{}{}
156197
br.stateMu.Unlock()
@@ -167,6 +208,52 @@ func (br *BlobReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
167208
br.logger.Info("Removed peer", "peer", peer.ID(), "reason", reason)
168209
}
169210

211+
func (br *BlobReactor) checkPeerRateLimit(peerID p2p.ID) bool {
212+
br.peerLimitersMu.Lock()
213+
defer br.peerLimitersMu.Unlock()
214+
215+
limiter, exists := br.peerLimiters[peerID]
216+
if exists {
217+
limiter.lastSeen = time.Now()
218+
return limiter.limiter.Allow()
219+
}
220+
221+
limiter = &peerRateLimiter{
222+
limiter: rate.NewLimiter(
223+
rateOrInf(br.config.MaxMessagesPerPeerPerSecond),
224+
int(br.config.MaxMessagesPerPeerPerSecond*BurstMultiplier),
225+
),
226+
lastSeen: time.Now(),
227+
}
228+
br.peerLimiters[peerID] = limiter
229+
230+
return limiter.limiter.Allow()
231+
}
232+
233+
func (br *BlobReactor) cleanupStalePeerData() {
234+
ticker := time.NewTicker(rateLimiterCleanupInterval)
235+
defer ticker.Stop()
236+
237+
for {
238+
select {
239+
case <-ticker.C:
240+
// Cleanup stale rate limiters
241+
br.peerLimitersMu.Lock()
242+
for peerID, limiter := range br.peerLimiters {
243+
if time.Since(limiter.lastSeen) > staleLimiterTimeout {
244+
delete(br.peerLimiters, peerID)
245+
}
246+
}
247+
br.peerLimitersMu.Unlock()
248+
249+
// Cleanup stale reputations
250+
br.reputationMgr.CleanupStaleReputations()
251+
case <-br.Quit():
252+
return
253+
}
254+
}
255+
}
256+
170257
// spawnWorker attempts to spawn a worker goroutine to handle the given task.
171258
// Returns true if worker was spawned, false if pool is full or reactor is stopped.
172259
func (br *BlobReactor) spawnWorker(task func(), peerID p2p.ID, taskType string) {
@@ -200,6 +287,12 @@ func (br *BlobReactor) Receive(envelope p2p.Envelope) {
200287
return
201288
}
202289

290+
// Ignore messages from peers with low reputation
291+
if !br.reputationMgr.ShouldAcceptPeer(envelope.Src.ID()) {
292+
br.logger.Debug("Ignoring message from banned peer", "peer", envelope.Src.ID())
293+
return
294+
}
295+
203296
br.logger.Info("Received message on BlobChannel",
204297
"peer", envelope.Src.ID(),
205298
"channel", envelope.ChannelID,
@@ -225,9 +318,19 @@ func (br *BlobReactor) Receive(envelope p2p.Envelope) {
225318

226319
switch msgType {
227320
case MessageTypeRequest:
321+
if !br.checkPeerRateLimit(envelope.Src.ID()) {
322+
br.logger.Warn("Peer exceeded rate limit, dropping request", "peer", envelope.Src.ID())
323+
return
324+
}
325+
if !br.globalLimiter.Allow() {
326+
br.logger.Warn("Global rate limit exceeded, dropping request", "peer", envelope.Src.ID())
327+
return
328+
}
329+
228330
var req BlobRequest
229331
if err := req.UnmarshalSSZ(msgData); err != nil {
230332
br.logger.Error("Failed to unmarshal BlobRequest", "error", err, "peer", envelope.Src.ID())
333+
br.reputationMgr.RecordBadBehavior(envelope.Src.ID(), fmt.Errorf("invalid_ssz: %w", err))
231334
return
232335
}
233336
br.logger.Info("Received blob request", "slot", req.Slot.Unwrap(), "request_id", req.RequestID, "peer", envelope.Src.ID())
@@ -241,6 +344,7 @@ func (br *BlobReactor) Receive(envelope p2p.Envelope) {
241344
var resp BlobResponse
242345
if err := resp.UnmarshalSSZ(msgData); err != nil {
243346
br.logger.Error("Failed to unmarshal BlobResponse", "error", err, "peer", envelope.Src.ID(), "data_size", len(msgData))
347+
br.reputationMgr.RecordBadBehavior(envelope.Src.ID(), fmt.Errorf("invalid_ssz: %w", err))
244348
return
245349
}
246350
br.logger.Info("Received blob response",
@@ -256,6 +360,7 @@ func (br *BlobReactor) Receive(envelope p2p.Envelope) {
256360

257361
default:
258362
br.logger.Warn("Received unknown message type", "type", msgType, "peer", envelope.Src.ID())
363+
br.reputationMgr.RecordBadBehavior(envelope.Src.ID(), fmt.Errorf("unknown_message_type: %d", msgType))
259364
}
260365
}
261366

@@ -340,6 +445,7 @@ func (br *BlobReactor) handleBlobResponse(peer p2p.Peer, resp *BlobResponse) {
340445
br.logger.Info("No waiting channel for response (request may have timed out)",
341446
"request_id", resp.RequestID,
342447
"slot", resp.Slot.Unwrap())
448+
br.reputationMgr.RecordBadBehavior(peer.ID(), fmt.Errorf("unsolicited_response: request_id=%d", resp.RequestID))
343449
return
344450
}
345451

@@ -434,6 +540,10 @@ func (br *BlobReactor) RequestBlobs(
434540
br.metrics.recordPeerAttempt(statusSuccess)
435541
br.metrics.recordOverallRequestComplete(statusSuccess, start)
436542
br.logger.Info("Successfully retrieved and verified blobs", "slot", slot.Unwrap(), "peer", peerID, "count", len(sidecars))
543+
544+
// Reward peer for successful blob exchange
545+
br.reputationMgr.RecordGoodBehavior(peerID)
546+
437547
return sidecars, nil
438548
}
439549

@@ -524,34 +634,34 @@ func (br *BlobReactor) requestBlobsFromPeer(ctx context.Context, peerID p2p.ID,
524634

525635
if resp.Slot != slot {
526636
err = fmt.Errorf("peer %s returned wrong slot: expected %d, got %d", peerID, slot.Unwrap(), resp.Slot.Unwrap())
637+
br.reputationMgr.RecordBadBehavior(peer.ID(), err)
527638
return nil, newBlobRequestError(err, statusInvalidResponse)
528639
}
529640

530641
if resp.HeadSlot < resp.Slot {
531642
err = fmt.Errorf("peer %s head (%d) not at requested slot (%d)", peerID, resp.HeadSlot.Unwrap(), resp.Slot.Unwrap())
643+
br.reputationMgr.RecordBadBehavior(peer.ID(), err)
532644
return nil, newBlobRequestError(err, statusInvalidResponse)
533645
}
534646

535647
if len(resp.SidecarData) > defaultRecvMessageCapacity {
536-
err = fmt.Errorf(
537-
"peer %s sent oversized response: %d bytes (max %d)",
538-
peerID,
539-
len(resp.SidecarData),
540-
defaultRecvMessageCapacity,
541-
)
648+
err = fmt.Errorf("peer %s sent oversized response: %d > %d", peerID, len(resp.SidecarData), defaultRecvMessageCapacity)
649+
br.reputationMgr.RecordBadBehavior(peer.ID(), err)
542650
return nil, newBlobRequestError(err, statusInvalidResponse)
543651
}
544652

545653
var sidecars datypes.BlobSidecars
546654
if len(resp.SidecarData) > 0 {
547655
if err = ssz.Unmarshal(resp.SidecarData, &sidecars); err != nil {
548-
err = fmt.Errorf("failed to unmarshal sidecars from peer %s: %w", peerID, err)
656+
err = fmt.Errorf("peer %s failed to unmarshal sidecars: %w", peerID, err)
657+
br.reputationMgr.RecordBadBehavior(peer.ID(), err)
549658
return nil, newBlobRequestError(err, statusInvalidResponse)
550659
}
551660
}
552661

553662
if len(sidecars) > maxBlobsPerBlock {
554-
err = fmt.Errorf("peer %s sent too many blobs: %d (max %d)", peerID, len(sidecars), maxBlobsPerBlock)
663+
err = fmt.Errorf("peer %s sent too many blobs: %d > %d", peerID, len(sidecars), maxBlobsPerBlock)
664+
br.reputationMgr.RecordBadBehavior(peer.ID(), err)
555665
return nil, newBlobRequestError(err, statusInvalidResponse)
556666
}
557667

@@ -568,6 +678,7 @@ func (br *BlobReactor) requestBlobsFromPeer(ctx context.Context, peerID p2p.ID,
568678

569679
func (br *BlobReactor) OnStart() error {
570680
br.logger.Info("Starting BlobReactor", "node_key", br.nodeKey)
681+
go br.cleanupStalePeerData()
571682
return nil
572683
}
573684

@@ -584,6 +695,14 @@ func (br *BlobReactor) OnStop() {
584695
br.logger.Info("BlobReactor stopped, all workers completed")
585696
}
586697

698+
// rateOrInf returns rate.Inf if r <= 0, otherwise returns rate.Limit(r)
699+
func rateOrInf(r float64) rate.Limit {
700+
if r <= 0 {
701+
return rate.Inf
702+
}
703+
return rate.Limit(r)
704+
}
705+
587706
// encodeBlobSidecarsSSZ takes multiple SSZ-encoded BlobSidecar bytes and combines them
588707
// into a single SSZ-encoded BlobSidecars (slice) format.
589708
// The encoding is: 4-byte offset (always 4) + concatenated sidecars.

0 commit comments

Comments
 (0)