Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions p2p/pex/pex_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (r *Reactor) logErrAddrBook(err error) {

// Receive implements Reactor by handling incoming PEX messages.
func (r *Reactor) Receive(e p2p.Envelope) {
r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg_type", fmt.Sprintf("%T", e.Message))

switch msg := e.Message.(type) {
case *tmp2p.PexRequest:
Expand All @@ -254,7 +254,7 @@ func (r *Reactor) Receive(e p2p.Envelope) {

// If we're a seed and this is an inbound peer,
// respond once and disconnect.
if r.config.SeedMode && !e.Src.IsOutbound() {
if !e.Src.IsOutbound() {
id := string(e.Src.ID())
v := r.lastReceivedRequests.Get(id)
if v != nil {
Expand All @@ -268,8 +268,10 @@ func (r *Reactor) Receive(e p2p.Envelope) {
r.SendAddrs(e.Src, r.book.GetSelectionWithBias(biasToSelectNewPeers))
go func() {
// In a go-routine so it doesn't block .Receive.
e.Src.FlushStop()
r.Switch.StopPeerGracefully(e.Src, r.String())
time.Sleep(200 * time.Millisecond)
if r.Switch != nil && r.Switch.IsRunning() {
r.Switch.StopPeerGracefully(e.Src, r.String())
}
}()

} else {
Expand Down
126 changes: 125 additions & 1 deletion p2p/pex/pex_reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cometbft/cometbft/config"
"github.com/cometbft/cometbft/libs/log"
"github.com/cometbft/cometbft/p2p"
"github.com/cometbft/cometbft/p2p/conn"
"github.com/cometbft/cometbft/p2p/mock"
tmp2p "github.com/cometbft/cometbft/proto/tendermint/p2p"
)
Expand Down Expand Up @@ -91,6 +92,11 @@ func TestPEXReactorRunning(t *testing.T) {
r.SetLogger(logger.With("pex", i))
r.SetEnsurePeersPeriod(250 * time.Millisecond)
sw.AddReactor("pex", r)
mr := mock.NewReactor()
mr.Channels = []*conn.ChannelDescriptor{
{ID: 123, Priority: 123},
}
sw.AddReactor("mock", mr)

return sw
})
Expand All @@ -111,7 +117,7 @@ func TestPEXReactorRunning(t *testing.T) {
require.Nil(t, err)
}

assertPeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second, N-1)
assertPeersWithTimeout(t, switches, 10*time.Millisecond, 60*time.Second, N-1)

// stop them
for _, s := range switches {
Expand Down Expand Up @@ -919,3 +925,121 @@ func TestPEXReactorWhenAddressBookIsSmallerThanMaxDials(t *testing.T) {
assert.Equal(t, 1, pexR.AttemptsToDial(peer))
}
}

// TestPexFromScratch simulates establishing a working P2P network from scratch with a single bootstrap node.
//
// To further test reactor-specific peer disconnections and bootstrap node accept capabilities, each switch is running
// not only PEX, but also some test reactors.
// It is asserted that nodes can connect to each other and the address books contain other peers.
func TestPexFromScratch(t *testing.T) {
t.Parallel()
testCases := []struct {
name string
bootstrap int // n - number of bootstrap nodes
joining int // m - number of joining nodes
}{
{"1 bootstrap, 3 joining", 1, 3},
{"2 bootstrap, 5 joining", 2, 5},
{"3 bootstrap, 20 joining", 3, 20},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// directory to store address books
dir, err := os.MkdirTemp("", "pex_from_scratch")
require.NoError(t, err)
defer os.RemoveAll(dir)

totalNodes := tc.bootstrap + tc.joining
switches := make([]*p2p.Switch, totalNodes)
books := make([]AddrBook, totalNodes)
logger := log.TestingLogger()

// Create all switches (bootstrap + joining nodes)
for i := 0; i < totalNodes; i++ {
switches[i] = p2p.MakeSwitch(cfg, i, func(i int, sw *p2p.Switch) *p2p.Switch {
books[i] = NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", i)), false)
books[i].SetLogger(logger.With("pex", i))
sw.SetAddrBook(books[i])

sw.SetLogger(logger.With("pex", i))

r := NewReactor(books[i], &ReactorConfig{SeedMode: false})
r.SetLogger(logger.With("pex", i))
sw.AddReactor("pex", r)

return sw
})
}

// Bootstrap nodes know about each other
for i := 0; i < tc.bootstrap; i++ {
for j := 0; j < tc.bootstrap; j++ {
if i != j {
addr := switches[j].NetAddress()
err := books[i].AddAddress(addr, addr)
require.NoError(t, err)
}
}
}

// Joining nodes each know about a single bootstrap node
for i := tc.bootstrap; i < totalNodes; i++ {
// Each joining node connects to a bootstrap node at index (i-tc.bootstrap) % tc.bootstrap
bootstrapIndex := (i - tc.bootstrap) % tc.bootstrap
addr := switches[bootstrapIndex].NetAddress()
err := books[i].AddAddress(addr, addr)
require.NoError(t, err)
}

// Start bootstrappers
for i := 0; i < tc.bootstrap; i++ {
err := switches[i].Start()
require.NoError(t, err)
}

// Wait a bit and start joining nodes
time.Sleep(100 * time.Millisecond)
for i := tc.bootstrap; i < totalNodes; i++ {
err := switches[i].Start()
require.NoError(t, err)
}

assertFullAddressBooks(t, totalNodes, books)
})
}
}

// assertFullAddressBooks checks if all address books have the expected number of peer entries within a timeout period.
func assertFullAddressBooks(t *testing.T, totalNodes int, books []AddrBook) {
var (
ticker = time.NewTicker(1 * time.Second)
timeoutCh = time.After(90 * time.Second)
)
defer ticker.Stop()

allGood := false
expected := totalNodes - 1
for !allGood {
select {
case <-timeoutCh:
t.Errorf("expected all nodes to connect to each other")
t.FailNow()
case <-ticker.C:
// PEX is responsible for address exchange, so we need to check address books, not the number of connections
// let's make a strong assumption - each node knows of all other nodes
cnt := 0
for i := 0; i < totalNodes; i++ {
total := len(books[i].GetSelection())
if total == expected {
cnt++
}
}
t.Logf("%d nodes with full address book", cnt)
if cnt == totalNodes {
allGood = true
}
}
}
assert.True(t, allGood, "Not all nodes connected to each other")
}
2 changes: 1 addition & 1 deletion p2p/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (sw *Switch) getPeerAddress(peer Peer) (*NetAddress, error) {
// StopPeerGracefully disconnects from a peer gracefully.
// TODO: handle graceful disconnects.
func (sw *Switch) StopPeerGracefully(peer Peer, reactorName string) {
sw.Logger.Info("Stopping peer gracefully")
sw.Logger.Info("Stopping peer gracefully", "peer", peer.ID())

sw.removePeerFromReactor(peer, reactorName)

Expand Down
2 changes: 1 addition & 1 deletion test/e2e/networks/ci.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ validator04 = 100
validator05 = 50

[node.seed01]
mode = "seed"
# mode = "seed"
perturb = ["restart"]

[node.validator01]
Expand Down
Loading