diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index cb3a553e8a..79946d193f 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -242,7 +242,7 @@ func (r *Reactor) logErrAddrBook(err error) { // Receive implements Reactor by handling incoming PEX messages. func (r *Reactor) Receive(e p2p.Envelope) { - r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) + r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg_type", fmt.Sprintf("%T", e.Message)) switch msg := e.Message.(type) { case *tmp2p.PexRequest: @@ -254,7 +254,7 @@ func (r *Reactor) Receive(e p2p.Envelope) { // If we're a seed and this is an inbound peer, // respond once and disconnect. - if r.config.SeedMode && !e.Src.IsOutbound() { + if !e.Src.IsOutbound() { id := string(e.Src.ID()) v := r.lastReceivedRequests.Get(id) if v != nil { @@ -268,8 +268,10 @@ func (r *Reactor) Receive(e p2p.Envelope) { r.SendAddrs(e.Src, r.book.GetSelectionWithBias(biasToSelectNewPeers)) go func() { // In a go-routine so it doesn't block .Receive. - e.Src.FlushStop() - r.Switch.StopPeerGracefully(e.Src, r.String()) + time.Sleep(200 * time.Millisecond) + if r.Switch != nil && r.Switch.IsRunning() { + r.Switch.StopPeerGracefully(e.Src, r.String()) + } }() } else { diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 4312194d3b..18f24eaeb3 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -16,6 +16,7 @@ import ( "github.com/cometbft/cometbft/config" "github.com/cometbft/cometbft/libs/log" "github.com/cometbft/cometbft/p2p" + "github.com/cometbft/cometbft/p2p/conn" "github.com/cometbft/cometbft/p2p/mock" tmp2p "github.com/cometbft/cometbft/proto/tendermint/p2p" ) @@ -91,6 +92,11 @@ func TestPEXReactorRunning(t *testing.T) { r.SetLogger(logger.With("pex", i)) r.SetEnsurePeersPeriod(250 * time.Millisecond) sw.AddReactor("pex", r) + mr := mock.NewReactor() + mr.Channels = []*conn.ChannelDescriptor{ + {ID: 123, Priority: 123}, + } + sw.AddReactor("mock", mr) return sw }) @@ -111,7 +117,7 @@ func TestPEXReactorRunning(t *testing.T) { require.Nil(t, err) } - assertPeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second, N-1) + assertPeersWithTimeout(t, switches, 10*time.Millisecond, 60*time.Second, N-1) // stop them for _, s := range switches { @@ -919,3 +925,121 @@ func TestPEXReactorWhenAddressBookIsSmallerThanMaxDials(t *testing.T) { assert.Equal(t, 1, pexR.AttemptsToDial(peer)) } } + +// TestPexFromScratch simulates establishing a working P2P network from scratch with a single bootstrap node. +// +// To further test reactor-specific peer disconnections and bootstrap node accept capabilities, each switch is running +// not only PEX, but also some test reactors. +// It is asserted that nodes can connect to each other and the address books contain other peers. +func TestPexFromScratch(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + bootstrap int // n - number of bootstrap nodes + joining int // m - number of joining nodes + }{ + {"1 bootstrap, 3 joining", 1, 3}, + {"2 bootstrap, 5 joining", 2, 5}, + {"3 bootstrap, 20 joining", 3, 20}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // directory to store address books + dir, err := os.MkdirTemp("", "pex_from_scratch") + require.NoError(t, err) + defer os.RemoveAll(dir) + + totalNodes := tc.bootstrap + tc.joining + switches := make([]*p2p.Switch, totalNodes) + books := make([]AddrBook, totalNodes) + logger := log.TestingLogger() + + // Create all switches (bootstrap + joining nodes) + for i := 0; i < totalNodes; i++ { + switches[i] = p2p.MakeSwitch(cfg, i, func(i int, sw *p2p.Switch) *p2p.Switch { + books[i] = NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", i)), false) + books[i].SetLogger(logger.With("pex", i)) + sw.SetAddrBook(books[i]) + + sw.SetLogger(logger.With("pex", i)) + + r := NewReactor(books[i], &ReactorConfig{SeedMode: false}) + r.SetLogger(logger.With("pex", i)) + sw.AddReactor("pex", r) + + return sw + }) + } + + // Bootstrap nodes know about each other + for i := 0; i < tc.bootstrap; i++ { + for j := 0; j < tc.bootstrap; j++ { + if i != j { + addr := switches[j].NetAddress() + err := books[i].AddAddress(addr, addr) + require.NoError(t, err) + } + } + } + + // Joining nodes each know about a single bootstrap node + for i := tc.bootstrap; i < totalNodes; i++ { + // Each joining node connects to a bootstrap node at index (i-tc.bootstrap) % tc.bootstrap + bootstrapIndex := (i - tc.bootstrap) % tc.bootstrap + addr := switches[bootstrapIndex].NetAddress() + err := books[i].AddAddress(addr, addr) + require.NoError(t, err) + } + + // Start bootstrappers + for i := 0; i < tc.bootstrap; i++ { + err := switches[i].Start() + require.NoError(t, err) + } + + // Wait a bit and start joining nodes + time.Sleep(100 * time.Millisecond) + for i := tc.bootstrap; i < totalNodes; i++ { + err := switches[i].Start() + require.NoError(t, err) + } + + assertFullAddressBooks(t, totalNodes, books) + }) + } +} + +// assertFullAddressBooks checks if all address books have the expected number of peer entries within a timeout period. +func assertFullAddressBooks(t *testing.T, totalNodes int, books []AddrBook) { + var ( + ticker = time.NewTicker(1 * time.Second) + timeoutCh = time.After(90 * time.Second) + ) + defer ticker.Stop() + + allGood := false + expected := totalNodes - 1 + for !allGood { + select { + case <-timeoutCh: + t.Errorf("expected all nodes to connect to each other") + t.FailNow() + case <-ticker.C: + // PEX is responsible for address exchange, so we need to check address books, not the number of connections + // let's make a strong assumption - each node knows of all other nodes + cnt := 0 + for i := 0; i < totalNodes; i++ { + total := len(books[i].GetSelection()) + if total == expected { + cnt++ + } + } + t.Logf("%d nodes with full address book", cnt) + if cnt == totalNodes { + allGood = true + } + } + } + assert.True(t, allGood, "Not all nodes connected to each other") +} diff --git a/p2p/switch.go b/p2p/switch.go index 85520a2eb7..fe60a1a5c9 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -400,7 +400,7 @@ func (sw *Switch) getPeerAddress(peer Peer) (*NetAddress, error) { // StopPeerGracefully disconnects from a peer gracefully. // TODO: handle graceful disconnects. func (sw *Switch) StopPeerGracefully(peer Peer, reactorName string) { - sw.Logger.Info("Stopping peer gracefully") + sw.Logger.Info("Stopping peer gracefully", "peer", peer.ID()) sw.removePeerFromReactor(peer, reactorName) diff --git a/test/e2e/networks/ci.toml b/test/e2e/networks/ci.toml index c9f1a6fbbb..1d3edfb6a2 100644 --- a/test/e2e/networks/ci.toml +++ b/test/e2e/networks/ci.toml @@ -35,7 +35,7 @@ validator04 = 100 validator05 = 50 [node.seed01] -mode = "seed" +# mode = "seed" perturb = ["restart"] [node.validator01]