Commit 1de789e

feat: re-work seed-mode, especially in PEX
1 parent 437050d

3 files changed: +124 −143 lines

node/node.go

Lines changed: 93 additions & 72 deletions
@@ -401,85 +401,106 @@ func NewNodeWithContext(ctx context.Context,
         return nil, err
     }
 
-    mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger, tracer)
+    var mempool mempl.Mempool
+    var mempoolReactor p2p.Reactor
+    var evidenceReactor *evidence.Reactor
+    var evidencePool *evidence.Pool
 
-    evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateStore, blockStore, logger)
-    if err != nil {
-        return nil, err
+    // Skip non-PEX reactors in seed mode
+    if !config.P2P.SeedMode {
+        mempool, mempoolReactor = createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger, tracer)
+
+        evidenceReactor, evidencePool, err = createEvidenceReactor(config, dbProvider, stateStore, blockStore, logger)
+        if err != nil {
+            return nil, err
+        }
     }
 
-    // make block executor for consensus and blocksync reactors to execute blocks
-    blockExec := sm.NewBlockExecutor(
-        stateStore,
-        logger.With("module", "state"),
-        proxyApp.Consensus(),
-        mempool,
-        evidencePool,
-        blockStore,
-        sm.BlockExecutorWithMetrics(smMetrics),
-        sm.BlockExecutorWithRootDir(config.RootDir),
-        sm.BlockExecutorWithTracer(tracer),
-    )
+    var blockExec *sm.BlockExecutor
+    var bcReactor p2p.Reactor
+    var propagationReactor *propagation.Reactor
+    var consensusReactor *cs.Reactor
+    var consensusState *cs.State
+    var stateSyncReactor *statesync.Reactor
+
+    // Skip non-PEX reactors in seed mode
+    if !config.P2P.SeedMode {
+        // make block executor for consensus and blocksync reactors to execute blocks
+        blockExec = sm.NewBlockExecutor(
+            stateStore,
+            logger.With("module", "state"),
+            proxyApp.Consensus(),
+            mempool,
+            evidencePool,
+            blockStore,
+            sm.BlockExecutorWithMetrics(smMetrics),
+            sm.BlockExecutorWithRootDir(config.RootDir),
+            sm.BlockExecutorWithTracer(tracer),
+        )
 
-    offlineStateSyncHeight := int64(0)
-    if blockStore.Height() == 0 {
-        offlineStateSyncHeight, err = blockExec.Store().GetOfflineStateSyncHeight()
-        if err != nil && err.Error() != "value empty" {
-            panic(fmt.Sprintf("failed to retrieve statesynced height from store %s; expected state store height to be %v", err, state.LastBlockHeight))
+        offlineStateSyncHeight := int64(0)
+        if blockStore.Height() == 0 {
+            offlineStateSyncHeight, err = blockExec.Store().GetOfflineStateSyncHeight()
+            if err != nil && err.Error() != "value empty" {
+                panic(fmt.Sprintf("failed to retrieve statesynced height from store %s; expected state store height to be %v", err, state.LastBlockHeight))
+            }
+        }
+        // Don't start block sync if we're doing a state sync first.
+        bcReactor, err = createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, localAddr, logger, bsMetrics, offlineStateSyncHeight)
+        if err != nil {
+            return nil, fmt.Errorf("could not create blocksync reactor: %w", err)
         }
-    }
-    // Don't start block sync if we're doing a state sync first.
-    bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, localAddr, logger, bsMetrics, offlineStateSyncHeight)
-    if err != nil {
-        return nil, fmt.Errorf("could not create blocksync reactor: %w", err)
-    }
-
-    if state.TimeoutCommit > 0 {
-        // set the catchup retry time to match the block time
-        propagation.RetryTime = state.TimeoutCommit
-    }
-    partsChan := make(chan types.PartInfo, 2500)
-    proposalChan := make(chan types.Proposal, 100)
-    propagationReactor := propagation.NewReactor(
-        nodeKey.ID(),
-        propagation.Config{
-            Store:         blockStore,
-            Mempool:       mempool,
-            Privval:       privValidator,
-            ChainID:       state.ChainID,
-            BlockMaxBytes: state.ConsensusParams.Block.MaxBytes,
-            PartChan:      partsChan,
-            ProposalChan:  proposalChan,
-        },
-        propagation.WithTracer(tracer),
-    )
-    if !stateSync && !blockSync {
-        propagationReactor.StartProcessing()
-    }
 
-    consensusReactor, consensusState := createConsensusReactor(
-        config, state, blockExec, blockStore, mempool, evidencePool,
-        privValidator, csMetrics, propagationReactor, stateSync || blockSync, eventBus, consensusLogger, offlineStateSyncHeight, tracer, partsChan, proposalChan,
-    )
+        if state.TimeoutCommit > 0 {
+            // set the catchup retry time to match the block time
+            propagation.RetryTime = state.TimeoutCommit
+        }
+        partsChan := make(chan types.PartInfo, 2500)
+        proposalChan := make(chan types.Proposal, 100)
+        propagationReactor = propagation.NewReactor(
+            nodeKey.ID(),
+            propagation.Config{
+                Store:         blockStore,
+                Mempool:       mempool,
+                Privval:       privValidator,
+                ChainID:       state.ChainID,
+                BlockMaxBytes: state.ConsensusParams.Block.MaxBytes,
+                PartChan:      partsChan,
+                ProposalChan:  proposalChan,
+            },
+            propagation.WithTracer(tracer),
+        )
+        if !stateSync && !blockSync {
+            propagationReactor.StartProcessing()
+        }
 
-    err = stateStore.SetOfflineStateSyncHeight(0)
-    if err != nil {
-        panic(fmt.Sprintf("failed to reset the offline state sync height %s", err))
-    }
-    propagationReactor.SetLogger(logger.With("module", "propagation"))
-
-    logger.Info("Consensus reactor created", "timeout_propose", consensusState.GetState().TimeoutPropose, "timeout_commit", consensusState.GetState().TimeoutCommit)
-    // Set up state sync reactor, and schedule a sync if requested.
-    // FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
-    // we should clean this whole thing up. See:
-    // https://github.com/tendermint/tendermint/issues/4644
-    stateSyncReactor := statesync.NewReactor(
-        *config.StateSync,
-        proxyApp.Snapshot(),
-        proxyApp.Query(),
-        ssMetrics,
-    )
-    stateSyncReactor.SetLogger(logger.With("module", "statesync"))
+        consensusReactor, consensusState = createConsensusReactor(
+            config, state, blockExec, blockStore, mempool, evidencePool,
+            privValidator, csMetrics, propagationReactor, stateSync || blockSync, eventBus, consensusLogger, offlineStateSyncHeight, tracer, partsChan, proposalChan,
+        )
+    }
+
+    // Skip non-PEX reactors in seed mode
+    if !config.P2P.SeedMode {
+        err = stateStore.SetOfflineStateSyncHeight(0)
+        if err != nil {
+            panic(fmt.Sprintf("failed to reset the offline state sync height %s", err))
+        }
+        propagationReactor.SetLogger(logger.With("module", "propagation"))
+
+        logger.Info("Consensus reactor created", "timeout_propose", consensusState.GetState().TimeoutPropose, "timeout_commit", consensusState.GetState().TimeoutCommit)
+        // Set up state sync reactor, and schedule a sync if requested.
+        // FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
+        // we should clean this whole thing up. See:
+        // https://github.com/tendermint/tendermint/issues/4644
+        stateSyncReactor = statesync.NewReactor(
+            *config.StateSync,
+            proxyApp.Snapshot(),
+            proxyApp.Query(),
+            ssMetrics,
+        )
+        stateSyncReactor.SetLogger(logger.With("module", "statesync"))
+    }
 
     nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state, softwareVersion)
     if err != nil {
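
Why this shape: rather than constructing every reactor unconditionally, the commit hoists the reactor variables into var declarations and fills them in only when the node is not a seed, so a seed node carries nil reactors and runs nothing but PEX. Below is a minimal, self-contained sketch of that declare-then-conditionally-construct pattern; the names (reactor, buildReactors, the toy pex and mempool types) are illustrative stand-ins, not the node's actual types.

package main

import "fmt"

// reactor is a stand-in for the repo's p2p.Reactor interface.
type reactor interface{ Name() string }

type pex struct{}

func (pex) Name() string { return "PEX" }

type mempool struct{}

func (mempool) Name() string { return "MEMPOOL" }

// buildReactors mirrors the commit's shape: declare every reactor up
// front, construct them only outside seed mode, and let nil mark the
// ones a seed node never runs.
func buildReactors(seedMode bool) []reactor {
    var mp reactor // stays nil in seed mode, like mempoolReactor above

    if !seedMode {
        mp = mempool{} // full nodes construct the real reactor here
    }

    out := []reactor{pex{}} // PEX is the one reactor every node keeps
    if mp != nil {
        out = append(out, mp)
    }
    return out
}

func main() {
    for _, r := range buildReactors(true) {
        fmt.Println("seed node runs:", r.Name()) // prints only PEX
    }
}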

node/setup.go

Lines changed: 12 additions & 8 deletions
@@ -497,14 +497,18 @@ func createSwitch(config *cfg.Config,
         p2p.WithTracer(traceClient),
     )
     sw.SetLogger(p2pLogger)
-    if config.Mempool.Type != cfg.MempoolTypeNop {
-        sw.AddReactor("MEMPOOL", mempoolReactor)
-    }
-    sw.AddReactor("BLOCKSYNC", bcReactor)
-    sw.AddReactor("CONSENSUS", consensusReactor)
-    sw.AddReactor("EVIDENCE", evidenceReactor)
-    sw.AddReactor("STATESYNC", stateSyncReactor)
-    sw.AddReactor("RECOVERY", propagationReactor)
+
+    // Skip non-PEX reactors in seed mode
+    if !config.P2P.SeedMode {
+        if config.Mempool.Type != cfg.MempoolTypeNop {
+            sw.AddReactor("MEMPOOL", mempoolReactor)
+        }
+        sw.AddReactor("BLOCKSYNC", bcReactor)
+        sw.AddReactor("CONSENSUS", consensusReactor)
+        sw.AddReactor("EVIDENCE", evidenceReactor)
+        sw.AddReactor("STATESYNC", stateSyncReactor)
+        sw.AddReactor("RECOVERY", propagationReactor)
+    }
 
     sw.SetNodeInfo(nodeInfo)
     sw.SetNodeKey(nodeKey)
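
The practical effect for operators: a node running with the p2p seed-mode option no longer registers any of the service reactors on its switch. A compact sketch of the gating in createSwitch follows; switchSketch stands in for the real p2p.Switch (whose AddReactor also claims the reactor's channel IDs), and it assumes PEX is added elsewhere in node setup, outside this diff, as before.

package main

import "fmt"

// switchSketch stands in for p2p.Switch; it only records which named
// reactors were registered.
type switchSketch struct{ reactors []string }

func (s *switchSketch) AddReactor(name string) { s.reactors = append(s.reactors, name) }

// wireSwitch mirrors createSwitch after this commit: in seed mode the
// whole block of service reactors is skipped, so the node serves PEX
// (registered later in setup, outside this diff) and nothing else.
func wireSwitch(seedMode, nopMempool bool) *switchSketch {
    sw := &switchSketch{}
    if !seedMode {
        if !nopMempool {
            sw.AddReactor("MEMPOOL")
        }
        sw.AddReactor("BLOCKSYNC")
        sw.AddReactor("CONSENSUS")
        sw.AddReactor("EVIDENCE")
        sw.AddReactor("STATESYNC")
        sw.AddReactor("RECOVERY")
    }
    return sw
}

func main() {
    fmt.Println("seed:", wireSwitch(true, false).reactors)  // seed: []
    fmt.Println("full:", wireSwitch(false, false).reactors) // all service reactors
}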

p2p/pex/pex_reactor.go

Lines changed: 19 additions & 63 deletions
@@ -76,7 +76,7 @@ func (e errTooEarlyToDial) Error() string {
 // ## Preventing abuse
 //
 // Only accept pexAddrsMsg from peers we sent a corresponding pexRequestMsg too.
-// Only accept one pexRequestMsg every ~defaultEnsurePeersPeriod.
+// All nodes disconnect after exchanging addresses to prevent persistent connections.
 type Reactor struct {
     p2p.BaseReactor
 
@@ -250,35 +250,25 @@ func (r *Reactor) Receive(e p2p.Envelope) {
         // 1) restrict how frequently peers can request
         // 2) limit the output size
 
-        // If we're a seed and this is an inbound peer,
-        // respond once and disconnect.
-        if r.config.SeedMode && !e.Src.IsOutbound() {
-            id := string(e.Src.ID())
-            v := r.lastReceivedRequests.Get(id)
-            if v != nil {
-                // FlushStop/StopPeer are already
-                // running in a go-routine.
-                return
-            }
-            r.lastReceivedRequests.Set(id, time.Now())
-
-            // Send addrs and disconnect
-            r.SendAddrs(e.Src, r.book.GetSelectionWithBias(biasToSelectNewPeers))
-            go func() {
-                // In a go-routine so it doesn't block .Receive.
-                e.Src.FlushStop()
-                r.Switch.StopPeerGracefully(e.Src, r.String())
-            }()
-
-        } else {
-            // Check we're not receiving requests too frequently.
-            if err := r.receiveRequest(e.Src); err != nil {
-                r.Switch.StopPeerForError(e.Src, err, r.String())
-                r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime)
-                return
-            }
-            r.SendAddrs(e.Src, r.book.GetSelection())
+        // All nodes now disconnect after exchanging addresses
+        id := string(e.Src.ID())
+        v := r.lastReceivedRequests.Get(id)
+        if v != nil {
+            // FlushStop/StopPeer are already
+            // running in a go-routine.
+            return
         }
+        r.lastReceivedRequests.Set(id, time.Now())
+
+        // Send addrs and disconnect
+        r.SendAddrs(e.Src, r.book.GetSelection())
+        go func(peer Peer) {
+            // In a go-routine so it doesn't block .Receive.
+            if peer != nil {
+                peer.FlushStop()
+                r.Switch.StopPeerGracefully(peer, r.String())
+            }
+        }(e.Src)
 
     case *tmp2p.PexAddrs:
         // If we asked for addresses, add them to the book
@@ -302,40 +292,6 @@ func (r *Reactor) Receive(e p2p.Envelope) {
     }
 }
 
-// enforces a minimum amount of time between requests
-func (r *Reactor) receiveRequest(src Peer) error {
-    id := string(src.ID())
-    v := r.lastReceivedRequests.Get(id)
-    if v == nil {
-        // initialize with empty time
-        lastReceived := time.Time{}
-        r.lastReceivedRequests.Set(id, lastReceived)
-        return nil
-    }
-
-    lastReceived := v.(time.Time)
-    if lastReceived.Equal(time.Time{}) {
-        // first time gets a free pass. then we start tracking the time
-        lastReceived = time.Now()
-        r.lastReceivedRequests.Set(id, lastReceived)
-        return nil
-    }
-
-    now := time.Now()
-    minInterval := r.minReceiveRequestInterval()
-    if now.Sub(lastReceived) < minInterval {
-        return fmt.Errorf(
-            "peer (%v) sent next PEX request too soon. lastReceived: %v, now: %v, minInterval: %v. Disconnecting",
-            src.ID(),
-            lastReceived,
-            now,
-            minInterval,
-        )
-    }
-    r.lastReceivedRequests.Set(id, now)
-    return nil
-}
-
 // RequestAddrs asks peer for more addresses if we do not already have a
 // request out for this peer.
 func (r *Reactor) RequestAddrs(p Peer) {
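
The net behavioral change: the respond-once-and-disconnect branch that used to apply only to seeds answering inbound peers becomes the path for every node, seeds now reply with an unbiased GetSelection instead of GetSelectionWithBias, and the per-peer rate limiter receiveRequest is deleted because a peer that has already been answered is simply ignored while its disconnect completes. A self-contained sketch of that answer-once-then-hang-up flow follows; peer, pexSketch, onPexRequest, and hangUp are illustrative stand-ins (the real code uses p2p.Peer, FlushStop, and StopPeerGracefully).

package main

import (
    "fmt"
    "sync"
)

// peer stands in for the p2p.Peer handed to Receive; FlushStop plus
// StopPeerGracefully are collapsed into a single hangUp method here.
type peer interface {
    ID() string
    hangUp()
}

// pexSketch models the new Receive path for pexRequestMsg: every node,
// seed or not, answers a peer's first request and then disconnects, so
// no inter-request rate limiting is needed anymore.
type pexSketch struct {
    mu       sync.Mutex
    answered map[string]bool
}

func (r *pexSketch) onPexRequest(src peer, sendAddrs func(peer)) {
    r.mu.Lock()
    already := r.answered[src.ID()]
    r.answered[src.ID()] = true
    r.mu.Unlock()

    if already {
        // the disconnect goroutine from the first request is in flight
        return
    }
    sendAddrs(src)
    // hang up off the Receive goroutine, as the diff does inside
    // go func(peer Peer) { ... }(e.Src)
    go src.hangUp()
}

type fakePeer struct{ id string }

func (p fakePeer) ID() string { return p.id }
func (p fakePeer) hangUp()    { fmt.Println("disconnecting", p.id) }

func main() {
    r := &pexSketch{answered: map[string]bool{}}
    send := func(p peer) { fmt.Println("sending addrs to", p.ID()) }
    r.onPexRequest(fakePeer{"abc"}, send) // answers, schedules disconnect
    r.onPexRequest(fakePeer{"abc"}, send) // ignored: already answered
}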
