From 7d15cf1a7d8dbbe27f5547ef6a8ab85415fbc3a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Mon, 28 Jul 2025 12:06:11 +0200 Subject: [PATCH 1/9] test: add integration test for PEX network bootstrapping with dynamic node connections --- p2p/pex/pex_reactor_test.go | 118 ++++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 4312194d3b..42c69b7fcb 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -919,3 +919,121 @@ func TestPEXReactorWhenAddressBookIsSmallerThanMaxDials(t *testing.T) { assert.Equal(t, 1, pexR.AttemptsToDial(peer)) } } + +// TestPexFromScratch simulates establishing a working P2P network from scratch with a single bootstrap node. +// +// To further test reactor-specific peer disconnections and bootstrap node accept capabilities, each switch is running +// not only PEX, but also some test reactors. +// It is asserted that nodes can connect to each other and the address books contain other peers. +func TestPexFromScratch(t *testing.T) { + testCases := []struct { + name string + bootstrap int // n - number of bootstrap nodes + joining int // m - number of joining nodes + }{ + {"1 bootstrap, 3 joining", 1, 3}, + {"2 bootstrap, 5 joining", 2, 5}, + {"3 bootstrap, 20 joining", 3, 20}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // directory to store address books + dir, err := os.MkdirTemp("", "pex_from_scratch") + require.NoError(t, err) + defer os.RemoveAll(dir) + + totalNodes := tc.bootstrap + tc.joining + switches := make([]*p2p.Switch, totalNodes) + books := make([]AddrBook, totalNodes) + logger := log.TestingLogger() + + // Create all switches (bootstrap + joining nodes) + for i := 0; i < totalNodes; i++ { + switches[i] = p2p.MakeSwitch(cfg, i, func(i int, sw *p2p.Switch) *p2p.Switch { + books[i] = NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", i)), false) + books[i].SetLogger(logger.With("pex", i)) + sw.SetAddrBook(books[i]) + + sw.SetLogger(logger.With("pex", i)) + + r := NewReactor(books[i], &ReactorConfig{SeedMode: true}) + r.SetLogger(logger.With("pex", i)) + r.SetEnsurePeersPeriod(250 * time.Millisecond) + sw.AddReactor("pex", r) + + return sw + }) + } + + // Bootstrap nodes know about each other + for i := 0; i < tc.bootstrap; i++ { + for j := 0; j < tc.bootstrap; j++ { + if i != j { + addr := switches[j].NetAddress() + err := books[i].AddAddress(addr, addr) + require.NoError(t, err) + } + } + } + + // Joining nodes each know about a single bootstrap node + for i := tc.bootstrap; i < totalNodes; i++ { + // Each joining node connects to a bootstrap node at index (i-tc.bootstrap) % tc.bootstrap + bootstrapIndex := (i - tc.bootstrap) % tc.bootstrap + addr := switches[bootstrapIndex].NetAddress() + err := books[i].AddAddress(addr, addr) + require.NoError(t, err) + } + + // Start bootstrappers + for i := 0; i < tc.bootstrap; i++ { + err := switches[i].Start() + require.NoError(t, err) + } + + // Wait a bit and start joining nodes + time.Sleep(100 * time.Millisecond) + for i := tc.bootstrap; i < totalNodes; i++ { + err := switches[i].Start() + require.NoError(t, err) + } + + assertFullAddressBooks(t, totalNodes, books) + }) + } +} + +// assertFullAddressBooks checks if all address books have the expected number of peer entries within a timeout period. +func assertFullAddressBooks(t *testing.T, totalNodes int, books []AddrBook) { + var ( + ticker = time.NewTicker(50 * time.Millisecond) + timeoutCh = time.After(5 * time.Second) + ) + defer ticker.Stop() + + allGood := false + expected := totalNodes - 1 + for !allGood { + select { + case <-ticker.C: + // PEX is responsible for address exchange, so we need to check address books, not the number of connections + // let's make a strong assumption - each node knows of all other nodes + cnt := 0 + for i := 0; i < totalNodes; i++ { + total := len(books[i].GetSelection()) + if total == expected { + cnt++ + } + } + t.Logf("%d nodes with full address book", cnt) + if cnt == totalNodes { + allGood = true + } + case <-timeoutCh: + t.Errorf("expected all nodes to connect to each other") + t.Fail() + } + } + assert.True(t, allGood, "Not all nodes connected to each other") +} From 2b458274e13c8c21b49cf1d1ba6fba200e9c6f6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Sat, 2 Aug 2025 22:17:46 +0200 Subject: [PATCH 2/9] feat: make all nodes disconnect just after PEX exchange Adjust tests to show that PEX is working correctly. --- p2p/pex/pex_reactor.go | 2 +- p2p/pex/pex_reactor_test.go | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index cb3a553e8a..a406970795 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -254,7 +254,7 @@ func (r *Reactor) Receive(e p2p.Envelope) { // If we're a seed and this is an inbound peer, // respond once and disconnect. - if r.config.SeedMode && !e.Src.IsOutbound() { + if /* r.config.SeedMode && */ !e.Src.IsOutbound() { id := string(e.Src.ID()) v := r.lastReceivedRequests.Get(id) if v != nil { diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 42c69b7fcb..007b2d8ff3 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -926,6 +926,7 @@ func TestPEXReactorWhenAddressBookIsSmallerThanMaxDials(t *testing.T) { // not only PEX, but also some test reactors. // It is asserted that nodes can connect to each other and the address books contain other peers. func TestPexFromScratch(t *testing.T) { + t.Parallel() testCases := []struct { name string bootstrap int // n - number of bootstrap nodes @@ -957,9 +958,8 @@ func TestPexFromScratch(t *testing.T) { sw.SetLogger(logger.With("pex", i)) - r := NewReactor(books[i], &ReactorConfig{SeedMode: true}) + r := NewReactor(books[i], &ReactorConfig{SeedMode: false}) r.SetLogger(logger.With("pex", i)) - r.SetEnsurePeersPeriod(250 * time.Millisecond) sw.AddReactor("pex", r) return sw @@ -1007,8 +1007,8 @@ func TestPexFromScratch(t *testing.T) { // assertFullAddressBooks checks if all address books have the expected number of peer entries within a timeout period. func assertFullAddressBooks(t *testing.T, totalNodes int, books []AddrBook) { var ( - ticker = time.NewTicker(50 * time.Millisecond) - timeoutCh = time.After(5 * time.Second) + ticker = time.NewTicker(1 * time.Second) + timeoutCh = time.After(90 * time.Second) ) defer ticker.Stop() @@ -1016,6 +1016,9 @@ func assertFullAddressBooks(t *testing.T, totalNodes int, books []AddrBook) { expected := totalNodes - 1 for !allGood { select { + case <-timeoutCh: + t.Errorf("expected all nodes to connect to each other") + t.FailNow() case <-ticker.C: // PEX is responsible for address exchange, so we need to check address books, not the number of connections // let's make a strong assumption - each node knows of all other nodes @@ -1030,9 +1033,6 @@ func assertFullAddressBooks(t *testing.T, totalNodes int, books []AddrBook) { if cnt == totalNodes { allGood = true } - case <-timeoutCh: - t.Errorf("expected all nodes to connect to each other") - t.Fail() } } assert.True(t, allGood, "Not all nodes connected to each other") From 20e9db36e9feca8bd05f3fbc47f2f418ab2edf63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Sat, 2 Aug 2025 22:24:00 +0200 Subject: [PATCH 3/9] test(e2e): run all nodes in identical way Even the seed01 is normal node, not seed node. --- test/e2e/networks/ci.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/networks/ci.toml b/test/e2e/networks/ci.toml index c9f1a6fbbb..1d3edfb6a2 100644 --- a/test/e2e/networks/ci.toml +++ b/test/e2e/networks/ci.toml @@ -35,7 +35,7 @@ validator04 = 100 validator05 = 50 [node.seed01] -mode = "seed" +# mode = "seed" perturb = ["restart"] [node.validator01] From ced8dee6de3941f385c308363d23224d906d06fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Sat, 2 Aug 2025 22:46:17 +0200 Subject: [PATCH 4/9] fix: remove FlushStop so reactor-specific-peers can be used --- p2p/pex/pex_reactor.go | 1 - 1 file changed, 1 deletion(-) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index a406970795..628ae73b83 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -268,7 +268,6 @@ func (r *Reactor) Receive(e p2p.Envelope) { r.SendAddrs(e.Src, r.book.GetSelectionWithBias(biasToSelectNewPeers)) go func() { // In a go-routine so it doesn't block .Receive. - e.Src.FlushStop() r.Switch.StopPeerGracefully(e.Src, r.String()) }() From d4dd7d113587ae49d8318d659ca2ad7342d2fd47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Sat, 2 Aug 2025 22:47:14 +0200 Subject: [PATCH 5/9] chore: cleanup --- p2p/pex/pex_reactor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 628ae73b83..912e9387b0 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -254,7 +254,7 @@ func (r *Reactor) Receive(e p2p.Envelope) { // If we're a seed and this is an inbound peer, // respond once and disconnect. - if /* r.config.SeedMode && */ !e.Src.IsOutbound() { + if !e.Src.IsOutbound() { id := string(e.Src.ID()) v := r.lastReceivedRequests.Get(id) if v != nil { From 0e8f37f722d47bfbb76dae4359f3ab9599e55a0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Sat, 2 Aug 2025 23:46:14 +0200 Subject: [PATCH 6/9] test: update old unit test Added mock reactor with single channel, to use reactor-specific-peers and maintain connection. Increased timeout, to make test more stable - usually it finishes within 260ms, but sometimes it needs to wait and request for peers again, after 30s. --- p2p/pex/pex_reactor_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 007b2d8ff3..18f24eaeb3 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -16,6 +16,7 @@ import ( "github.com/cometbft/cometbft/config" "github.com/cometbft/cometbft/libs/log" "github.com/cometbft/cometbft/p2p" + "github.com/cometbft/cometbft/p2p/conn" "github.com/cometbft/cometbft/p2p/mock" tmp2p "github.com/cometbft/cometbft/proto/tendermint/p2p" ) @@ -91,6 +92,11 @@ func TestPEXReactorRunning(t *testing.T) { r.SetLogger(logger.With("pex", i)) r.SetEnsurePeersPeriod(250 * time.Millisecond) sw.AddReactor("pex", r) + mr := mock.NewReactor() + mr.Channels = []*conn.ChannelDescriptor{ + {ID: 123, Priority: 123}, + } + sw.AddReactor("mock", mr) return sw }) @@ -111,7 +117,7 @@ func TestPEXReactorRunning(t *testing.T) { require.Nil(t, err) } - assertPeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second, N-1) + assertPeersWithTimeout(t, switches, 10*time.Millisecond, 60*time.Second, N-1) // stop them for _, s := range switches { From 9c911ecb7e96be365aac51719e904d2169d72da7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Sun, 3 Aug 2025 00:08:13 +0200 Subject: [PATCH 7/9] fix: more defensive approach to stopping peer in PEX This helps with some flaky tests, and probably can eliminate some edge cases in real world. --- p2p/pex/pex_reactor.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 912e9387b0..e913810332 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -268,7 +268,9 @@ func (r *Reactor) Receive(e p2p.Envelope) { r.SendAddrs(e.Src, r.book.GetSelectionWithBias(biasToSelectNewPeers)) go func() { // In a go-routine so it doesn't block .Receive. - r.Switch.StopPeerGracefully(e.Src, r.String()) + if r.Switch != nil && r.Switch.IsRunning() { + r.Switch.StopPeerGracefully(e.Src, r.String()) + } }() } else { From b3d9974cbfc5c89055c93a6c87ab109dd3b7f31e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Mon, 18 Aug 2025 14:43:47 +0200 Subject: [PATCH 8/9] chore: improve logging in PEX & Switch --- p2p/pex/pex_reactor.go | 2 +- p2p/switch.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index e913810332..cb238b4773 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -242,7 +242,7 @@ func (r *Reactor) logErrAddrBook(err error) { // Receive implements Reactor by handling incoming PEX messages. func (r *Reactor) Receive(e p2p.Envelope) { - r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) + r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg_type", fmt.Sprintf("%T", e.Message)) switch msg := e.Message.(type) { case *tmp2p.PexRequest: diff --git a/p2p/switch.go b/p2p/switch.go index 85520a2eb7..fe60a1a5c9 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -400,7 +400,7 @@ func (sw *Switch) getPeerAddress(peer Peer) (*NetAddress, error) { // StopPeerGracefully disconnects from a peer gracefully. // TODO: handle graceful disconnects. func (sw *Switch) StopPeerGracefully(peer Peer, reactorName string) { - sw.Logger.Info("Stopping peer gracefully") + sw.Logger.Info("Stopping peer gracefully", "peer", peer.ID()) sw.removePeerFromReactor(peer, reactorName) From 7b0f131a796ad713d9f501c0fa9366e08febb5cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Mon, 18 Aug 2025 14:45:15 +0200 Subject: [PATCH 9/9] fix: fix the PEX bootstrap tests by adding a sleep This is a temporary solution, just to show that peer is stopped before the messages are delivered - this change is caused by more indirect message processing introduced with parallel message processing. --- p2p/pex/pex_reactor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index cb238b4773..79946d193f 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -268,6 +268,7 @@ func (r *Reactor) Receive(e p2p.Envelope) { r.SendAddrs(e.Src, r.book.GetSelectionWithBias(biasToSelectNewPeers)) go func() { // In a go-routine so it doesn't block .Receive. + time.Sleep(200 * time.Millisecond) if r.Switch != nil && r.Switch.IsRunning() { r.Switch.StopPeerGracefully(e.Src, r.String()) }