diff --git a/node/node.go b/node/node.go index 0efa974f12..dafe4d7e6d 100644 --- a/node/node.go +++ b/node/node.go @@ -401,81 +401,106 @@ func NewNodeWithContext(ctx context.Context, return nil, err } - mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger, tracer) + var mempool mempl.Mempool + var mempoolReactor p2p.Reactor + var evidenceReactor *evidence.Reactor + var evidencePool *evidence.Pool - evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateStore, blockStore, logger) - if err != nil { - return nil, err + // Skip non-PEX reactors in seed mode + if !config.P2P.SeedMode { + mempool, mempoolReactor = createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger, tracer) + + evidenceReactor, evidencePool, err = createEvidenceReactor(config, dbProvider, stateStore, blockStore, logger) + if err != nil { + return nil, err + } } - // make block executor for consensus and blocksync reactors to execute blocks - blockExec := sm.NewBlockExecutor( - stateStore, - logger.With("module", "state"), - proxyApp.Consensus(), - mempool, - evidencePool, - blockStore, - sm.BlockExecutorWithMetrics(smMetrics), - sm.BlockExecutorWithRootDir(config.RootDir), - sm.BlockExecutorWithTracer(tracer), - ) + var blockExec *sm.BlockExecutor + var bcReactor p2p.Reactor + var propagationReactor *propagation.Reactor + var consensusReactor *cs.Reactor + var consensusState *cs.State + var stateSyncReactor *statesync.Reactor + + // Skip non-PEX reactors in seed mode + if !config.P2P.SeedMode { + // make block executor for consensus and blocksync reactors to execute blocks + blockExec = sm.NewBlockExecutor( + stateStore, + logger.With("module", "state"), + proxyApp.Consensus(), + mempool, + evidencePool, + blockStore, + sm.BlockExecutorWithMetrics(smMetrics), + sm.BlockExecutorWithRootDir(config.RootDir), + sm.BlockExecutorWithTracer(tracer), + ) - offlineStateSyncHeight := int64(0) - if blockStore.Height() == 0 { - offlineStateSyncHeight, err = blockExec.Store().GetOfflineStateSyncHeight() - if err != nil && err.Error() != "value empty" { - panic(fmt.Sprintf("failed to retrieve statesynced height from store %s; expected state store height to be %v", err, state.LastBlockHeight)) + offlineStateSyncHeight := int64(0) + if blockStore.Height() == 0 { + offlineStateSyncHeight, err = blockExec.Store().GetOfflineStateSyncHeight() + if err != nil && err.Error() != "value empty" { + panic(fmt.Sprintf("failed to retrieve statesynced height from store %s; expected state store height to be %v", err, state.LastBlockHeight)) + } + } + // Don't start block sync if we're doing a state sync first. + bcReactor, err = createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, localAddr, logger, bsMetrics, offlineStateSyncHeight) + if err != nil { + return nil, fmt.Errorf("could not create blocksync reactor: %w", err) } - } - // Don't start block sync if we're doing a state sync first. 
-	bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, localAddr, logger, bsMetrics, offlineStateSyncHeight)
-	if err != nil {
-		return nil, fmt.Errorf("could not create blocksync reactor: %w", err)
-	}
-	if state.TimeoutCommit > 0 {
-		// set the catchup retry time to match the block time
-		propagation.RetryTime = state.TimeoutCommit
-	}
-	propagationReactor := propagation.NewReactor(
-		nodeKey.ID(),
-		propagation.Config{
-			Store:         blockStore,
-			Mempool:       mempool,
-			Privval:       privValidator,
-			ChainID:       state.ChainID,
-			BlockMaxBytes: state.ConsensusParams.Block.MaxBytes,
-		},
-		propagation.WithTracer(tracer),
-	)
-	if !stateSync && !blockSync {
-		propagationReactor.StartProcessing()
-	}
+		if state.TimeoutCommit > 0 {
+			// set the catchup retry time to match the block time
+			propagation.RetryTime = state.TimeoutCommit
+		}
+		partsChan := make(chan types.PartInfo, 2500)
+		proposalChan := make(chan types.Proposal, 100)
+		propagationReactor = propagation.NewReactor(
+			nodeKey.ID(),
+			propagation.Config{
+				Store:         blockStore,
+				Mempool:       mempool,
+				Privval:       privValidator,
+				ChainID:       state.ChainID,
+				BlockMaxBytes: state.ConsensusParams.Block.MaxBytes,
+				PartChan:      partsChan,
+				ProposalChan:  proposalChan,
+			},
+			propagation.WithTracer(tracer),
+		)
+		if !stateSync && !blockSync {
+			propagationReactor.StartProcessing()
+		}

-	consensusReactor, consensusState := createConsensusReactor(
-		config, state, blockExec, blockStore, mempool, evidencePool,
-		privValidator, csMetrics, propagationReactor, stateSync || blockSync, eventBus, consensusLogger, offlineStateSyncHeight, tracer,
-	)
+		consensusReactor, consensusState = createConsensusReactor(
+			config, state, blockExec, blockStore, mempool, evidencePool,
+			privValidator, csMetrics, propagationReactor, stateSync || blockSync, eventBus, consensusLogger, offlineStateSyncHeight, tracer, partsChan, proposalChan,
+		)
+	}

-	err = stateStore.SetOfflineStateSyncHeight(0)
-	if err != nil {
-		panic(fmt.Sprintf("failed to reset the offline state sync height %s", err))
-	}
-	propagationReactor.SetLogger(logger.With("module", "propagation"))
-
-	logger.Info("Consensus reactor created", "timeout_propose", consensusState.GetState().TimeoutPropose, "timeout_commit", consensusState.GetState().TimeoutCommit)
-	// Set up state sync reactor, and schedule a sync if requested.
-	// FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
-	// we should clean this whole thing up. See:
-	// https://github.com/tendermint/tendermint/issues/4644
-	stateSyncReactor := statesync.NewReactor(
-		*config.StateSync,
-		proxyApp.Snapshot(),
-		proxyApp.Query(),
-		ssMetrics,
-	)
-	stateSyncReactor.SetLogger(logger.With("module", "statesync"))
+	// Skip non-PEX reactors in seed mode
+	if !config.P2P.SeedMode {
+		err = stateStore.SetOfflineStateSyncHeight(0)
+		if err != nil {
+			panic(fmt.Sprintf("failed to reset the offline state sync height %s", err))
+		}
+		propagationReactor.SetLogger(logger.With("module", "propagation"))
+
+		logger.Info("Consensus reactor created", "timeout_propose", consensusState.GetState().TimeoutPropose, "timeout_commit", consensusState.GetState().TimeoutCommit)
+		// Set up state sync reactor, and schedule a sync if requested.
+		// FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
+		// we should clean this whole thing up. See:
+		// https://github.com/tendermint/tendermint/issues/4644
+		stateSyncReactor = statesync.NewReactor(
+			*config.StateSync,
+			proxyApp.Snapshot(),
+			proxyApp.Query(),
+			ssMetrics,
+		)
+		stateSyncReactor.SetLogger(logger.With("module", "statesync"))
+	}

 	nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state, softwareVersion)
 	if err != nil {
 		return nil, err
@@ -586,6 +611,9 @@ func (n *Node) OnStart() error {
 	// Start the RPC server before the P2P server
 	// so we can eg. receive txs for the first block
 	if n.config.RPC.ListenAddress != "" {
+		if n.config.P2P.SeedMode {
+			return fmt.Errorf("seed nodes cannot have RPC enabled (rpc.laddr=%q) per ADR-052; set rpc.laddr=\"\" to disable RPC on seed nodes", n.config.RPC.ListenAddress)
+		}
 		listeners, err := n.startRPC()
 		if err != nil {
 			return err
diff --git a/node/setup.go b/node/setup.go
index 7c56fddbaf..e543aa43a6 100644
--- a/node/setup.go
+++ b/node/setup.go
@@ -493,14 +493,18 @@ func createSwitch(config *cfg.Config,
 		p2p.WithTracer(traceClient),
 	)
 	sw.SetLogger(p2pLogger)
-	if config.Mempool.Type != cfg.MempoolTypeNop {
-		sw.AddReactor("MEMPOOL", mempoolReactor)
-	}
-	sw.AddReactor("BLOCKSYNC", bcReactor)
-	sw.AddReactor("CONSENSUS", consensusReactor)
-	sw.AddReactor("EVIDENCE", evidenceReactor)
-	sw.AddReactor("STATESYNC", stateSyncReactor)
-	sw.AddReactor("RECOVERY", propagationReactor)
+
+	// Skip non-PEX reactors in seed mode
+	if !config.P2P.SeedMode {
+		if config.Mempool.Type != cfg.MempoolTypeNop {
+			sw.AddReactor("MEMPOOL", mempoolReactor)
+		}
+		sw.AddReactor("BLOCKSYNC", bcReactor)
+		sw.AddReactor("CONSENSUS", consensusReactor)
+		sw.AddReactor("EVIDENCE", evidenceReactor)
+		sw.AddReactor("STATESYNC", stateSyncReactor)
+		sw.AddReactor("RECOVERY", propagationReactor)
+	}

 	sw.SetNodeInfo(nodeInfo)
 	sw.SetNodeKey(nodeKey)
diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go
index 6b18ed940e..c38bed432c 100644
--- a/p2p/pex/pex_reactor.go
+++ b/p2p/pex/pex_reactor.go
@@ -76,7 +76,7 @@ func (e errTooEarlyToDial) Error() string {
 // ## Preventing abuse
 //
 // Only accept pexAddrsMsg from peers we sent a corresponding pexRequestMsg too.
-// Only accept one pexRequestMsg every ~defaultEnsurePeersPeriod.
+// All nodes disconnect after exchanging addresses to prevent persistent connections.

 type Reactor struct {
 	p2p.BaseReactor
@@ -250,35 +250,26 @@ func (r *Reactor) Receive(e p2p.Envelope) {
 		// 1) restrict how frequently peers can request
 		// 2) limit the output size

-		// If we're a seed and this is an inbound peer,
-		// respond once and disconnect.
-		if r.config.SeedMode && !e.Src.IsOutbound() {
-			id := string(e.Src.ID())
-			v := r.lastReceivedRequests.Get(id)
-			if v != nil {
-				// FlushStop/StopPeer are already
-				// running in a go-routine.
-				return
-			}
-			r.lastReceivedRequests.Set(id, time.Now())
-
-			// Send addrs and disconnect
-			r.SendAddrs(e.Src, r.book.GetSelectionWithBias(biasToSelectNewPeers))
-			go func() {
-				// In a go-routine so it doesn't block .Receive.
-				e.Src.FlushStop()
-				r.Switch.StopPeerGracefully(e.Src, r.String())
-			}()
-
-		} else {
-			// Check we're not receiving requests too frequently.
-			if err := r.receiveRequest(e.Src); err != nil {
-				r.Switch.StopPeerForError(e.Src, err, r.String())
-				r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime)
-				return
-			}
-			r.SendAddrs(e.Src, r.book.GetSelection())
+		// All nodes now disconnect after exchanging addresses
+		id := string(e.Src.ID())
+		v := r.lastReceivedRequests.Get(id)
+		if v != nil {
+			// FlushStop/StopPeer are already
+			// running in a go-routine.
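+			// A non-nil entry means we already answered this peer; ignore the duplicate request.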
+			return
 		}
+		r.lastReceivedRequests.Set(id, time.Now())
+
+		// Send addrs and disconnect
+		r.SendAddrs(e.Src, r.book.GetSelection())
+		go func(peer Peer) {
+			// In a go-routine so it doesn't block .Receive.
+			if peer != nil {
+				peer.FlushStop()
+				r.Switch.StopPeerGracefully(peer, r.String())
+			}
+		}(e.Src)

 	case *tmp2p.PexAddrs:
 		// If we asked for addresses, add them to the book
@@ -302,40 +292,6 @@ func (r *Reactor) Receive(e p2p.Envelope) {
 	}
 }

-// enforces a minimum amount of time between requests
-func (r *Reactor) receiveRequest(src Peer) error {
-	id := string(src.ID())
-	v := r.lastReceivedRequests.Get(id)
-	if v == nil {
-		// initialize with empty time
-		lastReceived := time.Time{}
-		r.lastReceivedRequests.Set(id, lastReceived)
-		return nil
-	}
-
-	lastReceived := v.(time.Time)
-	if lastReceived.Equal(time.Time{}) {
-		// first time gets a free pass. then we start tracking the time
-		lastReceived = time.Now()
-		r.lastReceivedRequests.Set(id, lastReceived)
-		return nil
-	}
-
-	now := time.Now()
-	minInterval := r.minReceiveRequestInterval()
-	if now.Sub(lastReceived) < minInterval {
-		return fmt.Errorf(
-			"peer (%v) sent next PEX request too soon. lastReceived: %v, now: %v, minInterval: %v. Disconnecting",
-			src.ID(),
-			lastReceived,
-			now,
-			minInterval,
-		)
-	}
-	r.lastReceivedRequests.Set(id, now)
-	return nil
-}
-
 // RequestAddrs asks peer for more addresses if we do not already have a
 // request out for this peer.
 func (r *Reactor) RequestAddrs(p Peer) {
diff --git a/test/e2e/runner/perturb.go b/test/e2e/runner/perturb.go
index db8de63de0..e3d04fdf2f 100644
--- a/test/e2e/runner/perturb.go
+++ b/test/e2e/runner/perturb.go
@@ -108,12 +108,24 @@ func PerturbNode(ctx context.Context, node *e2e.Node, perturbation e2e.Perturbat
 		return nil, fmt.Errorf("unexpected perturbation %q", perturbation)
 	}

-	status, err := waitForNode(ctx, node, 0, 20*time.Second)
-	if err != nil {
-		return nil, err
+	if node.Mode == e2e.ModeSeed {
+		// Seed nodes don't expose RPC, so wait for their P2P port instead.
+		if err := waitForSeedNode(ctx, node, 20*time.Second); err != nil {
+			return nil, err
+		}
+		logger.Info("perturb node",
+			"msg",
+			log.NewLazySprintf("Seed node %v recovered", node.Name))
+		return nil, nil // no status to return: seed nodes have no RPC
+	} else {
+		// Regular nodes report their status over RPC.
+		status, err := waitForNode(ctx, node, 0, 20*time.Second)
+		if err != nil {
+			return nil, err
+		}
+		logger.Info("perturb node",
+			"msg",
+			log.NewLazySprintf("Node %v recovered at height %v", node.Name, status.SyncInfo.LatestBlockHeight))
+		return status, nil
 	}
-	logger.Info("perturb node",
-		"msg",
-		log.NewLazySprintf("Node %v recovered at height %v", node.Name, status.SyncInfo.LatestBlockHeight))
-	return status, nil
 }
diff --git a/test/e2e/runner/rpc.go b/test/e2e/runner/rpc.go
index f6f1738423..3d6395f16b 100644
--- a/test/e2e/runner/rpc.go
+++ b/test/e2e/runner/rpc.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"net"
 	"time"

 	rpchttp "github.com/cometbft/cometbft/rpc/client/http"
@@ -132,3 +133,36 @@ func waitForAllNodes(ctx context.Context, testnet *e2e.Testnet, height int64, ti

 	return lastHeight, nil
 }
+
+// waitForSeedNode waits for a seed node's P2P port to become available.
+// Since seed nodes don't have RPC endpoints, we check P2P connectivity.
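+// It dials the node's P2P address every 500ms until a TCP connection
+// succeeds or the timeout elapses.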
+func waitForSeedNode(ctx context.Context, node *e2e.Node, timeout time.Duration) error {
+	logger.Info("Waiting for seed node P2P port", "node", node.Name)
+
+	deadline := time.Now().Add(timeout)
+	ticker := time.NewTicker(500 * time.Millisecond)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+			if time.Now().After(deadline) {
+				return fmt.Errorf("timed out waiting for seed node %v P2P port after %v", node.Name, timeout)
+			}
+
+			// Try to connect to the P2P port.
+			addr := node.AddressP2P(false)
+			conn, err := net.DialTimeout("tcp", addr, time.Second)
+			if err == nil {
+				conn.Close()
+				logger.Info("Seed node P2P port available", "node", node.Name, "addr", addr)
+				return nil
+			}
+			// Keep polling if the connection failed.
+		}
+	}
+}
diff --git a/test/e2e/runner/setup.go b/test/e2e/runner/setup.go
index 7735f6ec13..99f3d70d61 100644
--- a/test/e2e/runner/setup.go
+++ b/test/e2e/runner/setup.go
@@ -227,6 +227,8 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
 	case e2e.ModeSeed:
 		cfg.P2P.SeedMode = true
 		cfg.P2P.PexReactor = true
+		// Disable RPC for seed nodes per ADR-052.
+		cfg.RPC.ListenAddress = ""
 	case e2e.ModeFull, e2e.ModeLight:
 		// Don't need to do anything, since we're using a dummy privval key by default.
 	default:
diff --git a/test/e2e/runner/start.go b/test/e2e/runner/start.go
index 7a223aa33c..e8c59c83df 100644
--- a/test/e2e/runner/start.go
+++ b/test/e2e/runner/start.go
@@ -52,8 +52,16 @@ func Start(ctx context.Context, testnet *e2e.Testnet, p infra.Provider) error {
 		return err
 	}
 	for _, node := range nodesAtZero {
-		if _, err := waitForNode(ctx, node, 0, 15*time.Second); err != nil {
-			return err
+		if node.Mode == e2e.ModeSeed {
+			// Seed nodes don't expose RPC, so wait for their P2P port instead.
+			if err := waitForSeedNode(ctx, node, 15*time.Second); err != nil {
+				return err
+			}
+		} else {
+			// Regular nodes report their status over RPC.
+			if _, err := waitForNode(ctx, node, 0, 15*time.Second); err != nil {
+				return err
+			}
 		}
 		if node.PrometheusProxyPort > 0 {
 			logger.Info("start", "msg",
@@ -123,12 +131,23 @@ func Start(ctx context.Context, testnet *e2e.Testnet, p infra.Provider) error {
 		if err != nil {
 			return err
 		}
-		status, err := waitForNode(ctx, node, node.StartAt, 3*time.Minute)
-		if err != nil {
-			return err
+		if node.Mode == e2e.ModeSeed {
+			// Seed nodes don't expose RPC, so wait for their P2P port instead.
+			err = waitForSeedNode(ctx, node, 3*time.Minute)
+			if err != nil {
+				return err
+			}
+			logger.Info("start", "msg", log.NewLazySprintf("Seed node %v started",
+				node.Name))
+		} else {
+			// Regular nodes report their status over RPC.
+			status, err := waitForNode(ctx, node, node.StartAt, 3*time.Minute)
+			if err != nil {
+				return err
+			}
+			logger.Info("start", "msg", log.NewLazySprintf("Node %v up on http://%s:%v at height %v",
+				node.Name, node.ExternalIP, node.ProxyPort, status.SyncInfo.LatestBlockHeight))
 		}
-		logger.Info("start", "msg", log.NewLazySprintf("Node %v up on http://%s:%v at height %v",
-			node.Name, node.ExternalIP, node.ProxyPort, status.SyncInfo.LatestBlockHeight))
 	}
 	return nil
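
Reviewer note: the poll-until-dialable check added in rpc.go is easy to exercise outside the e2e harness. Below is a minimal standalone sketch of the same pattern; the address 127.0.0.1:26656 (CometBFT's default P2P port) and both timeouts are illustrative assumptions, not values taken from this patch.

	// seedprobe.go - standalone sketch of the P2P-port liveness check.
	package main

	import (
		"context"
		"fmt"
		"net"
		"time"
	)

	// waitForPort dials addr every 500ms until a TCP connection succeeds,
	// the context is cancelled, or the timeout elapses.
	func waitForPort(ctx context.Context, addr string, timeout time.Duration) error {
		deadline := time.Now().Add(timeout)
		ticker := time.NewTicker(500 * time.Millisecond)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-ticker.C:
				if time.Now().After(deadline) {
					return fmt.Errorf("timed out waiting for %s after %v", addr, timeout)
				}
				conn, err := net.DialTimeout("tcp", addr, time.Second)
				if err == nil {
					conn.Close()
					return nil
				}
				// Keep polling on failure.
			}
		}
	}

	func main() {
		if err := waitForPort(context.Background(), "127.0.0.1:26656", 20*time.Second); err != nil {
			fmt.Println("seed node not reachable:", err)
			return
		}
		fmt.Println("seed node P2P port is up")
	}

Unlike the runner's waitForSeedNode, this sketch takes a raw address rather than an *e2e.Node, so it can be pointed at any TCP listener.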