diff --git a/cmd/main.go b/cmd/main.go index 9a53cb30..0bd8011d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -42,7 +42,7 @@ var version = "development" type P2PServer interface { Start() error - Shutdown() error + Shutdown() } // nolint: godot @@ -172,9 +172,7 @@ func main() { <-quit - if err := p2pServer.Shutdown(); err != nil { - log.Error().Msgf("failed to stop p2p server: %v", err) - } + p2pServer.Shutdown() if err := ws.Shutdown(); err != nil { log.Error().Msgf("failed to stop websocket server: %v", err) diff --git a/config/config.go b/config/config.go index d9ca0dac..62fa66e7 100644 --- a/config/config.go +++ b/config/config.go @@ -126,6 +126,10 @@ type P2PConfig struct { UserAgentName string `mapstructure:"user_agent_name" description:"The name that should be used during announcement of the client on the p2p network"` UserAgentVersion string `mapstructure:"user_agent_version" description:"By default will be equal to application version, but can be overridden for development purposes"` Experimental bool `mapstructure:"experimental" description:"Turns on a new (highly experimental) way of getting headers with the usage of /internal/transports/p2p instead of /transports/p2p"` + + MaxOutboundConnections uint `mapstructure:"max_outbound_connections" description:"Maximum active outbound connections"` + MaxInboundConnections uint `mapstructure:"max_inbound_connections" description:"Maximum active inbound connections"` + AcceptLocalPeers bool `mapstructure:"accept_local_peers" description:"Accept connection from local network"` } // LoggingConfig represents a logging config. diff --git a/config/defaults.go b/config/defaults.go index f3944a82..a5c03992 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -72,6 +72,9 @@ func getP2PDefaults() *P2PConfig { UserAgentName: ApplicationName, UserAgentVersion: Version(), Experimental: false, + MaxOutboundConnections: 8, + MaxInboundConnections: 8, + AcceptLocalPeers: false, } } diff --git a/internal/chaincfg/params.go b/internal/chaincfg/params.go index f7919c28..e19dbf32 100644 --- a/internal/chaincfg/params.go +++ b/internal/chaincfg/params.go @@ -108,7 +108,7 @@ type Params struct { Net wire.BitcoinNet // DefaultPort defines the default peer-to-peer port for the network. - DefaultPort string + DefaultPort uint16 // DNSSeeds defines a list of DNS seeds for the network that are used // as one method to discover peers. @@ -222,7 +222,7 @@ type Params struct { var MainNetParams = Params{ Name: "mainnet", Net: wire.MainNet, - DefaultPort: "8333", + DefaultPort: uint16(8333), DNSSeeds: []DNSSeed{ {"seed-nodes.bsvb.tech", true}, }, @@ -297,7 +297,7 @@ var MainNetParams = Params{ var RegressionNetParams = Params{ Name: "regtest", Net: wire.TestNet, - DefaultPort: "18444", + DefaultPort: uint16(18444), DNSSeeds: []DNSSeed{}, // Chain parameters @@ -365,7 +365,7 @@ var RegressionNetParams = Params{ var TestNet3Params = Params{ Name: "testnet3", Net: wire.TestNet3, - DefaultPort: "18333", + DefaultPort: uint16(18333), DNSSeeds: []DNSSeed{ {"testnet-seed.metasv.io", true}, {"testnet-btccash-seeder.bitcoinunlimited.info", true}, @@ -452,7 +452,7 @@ var TestNet3Params = Params{ var SimNetParams = Params{ Name: "simnet", Net: wire.SimNet, - DefaultPort: "18555", + DefaultPort: uint16(18555), DNSSeeds: []DNSSeed{}, // NOTE: There must NOT be any seeds. // Chain parameters diff --git a/internal/transports/p2p/addr.go b/internal/transports/p2p/addr.go deleted file mode 100644 index 1dcefa0d..00000000 --- a/internal/transports/p2p/addr.go +++ /dev/null @@ -1,27 +0,0 @@ -package p2pexp - -import ( - "errors" - "fmt" - "net" - "strconv" -) - -func parseAddress(addr, port string) (*net.TCPAddr, error) { - portInt, err := strconv.Atoi(port) - if err != nil { - return nil, fmt.Errorf("could not parse port: %w", err) - } - - ip := net.ParseIP(addr) - if ip == nil { - return nil, errors.New("could not parse peer IP") - } - - netAddr := &net.TCPAddr{ - IP: ip, - Port: portInt, - } - - return netAddr, nil -} diff --git a/internal/transports/p2p/addr_test.go b/internal/transports/p2p/addr_test.go deleted file mode 100644 index 53070edb..00000000 --- a/internal/transports/p2p/addr_test.go +++ /dev/null @@ -1,82 +0,0 @@ -package p2pexp - -import ( - "net" - "testing" - - "github.com/bitcoin-sv/block-headers-service/internal/tests/assert" -) - -func TestParseAddress(t *testing.T) { - t.Run("test success", func(t *testing.T) { - // given - cases := []struct { - addr string - port string - expected *net.TCPAddr - }{ - { - addr: "127.0.0.1", - port: "8333", - expected: &net.TCPAddr{ - IP: net.ParseIP("127.0.0.1"), - Port: 8333, - }, - }, - { - addr: "::ffff:192.0.2.1", - port: "8333", - expected: &net.TCPAddr{ - IP: net.ParseIP("::ffff:192.0.2.1"), - Port: 8333, - }, - }, - { - addr: "2001:db8::68", - port: "8333", - expected: &net.TCPAddr{ - IP: net.ParseIP("2001:db8::68"), - Port: 8333, - }, - }, - } - - for _, c := range cases { - // when - addr, err := parseAddress(c.addr, c.port) - - // then - assert.Equal(t, addr, c.expected) - assert.NoError(t, err) - } - }) - - t.Run("test errors", func(t *testing.T) { - // given - cases := []struct { - addr string - port string - expectedError string - }{ - { - addr: "127.0.0.1", - port: "wrong_port", - expectedError: "could not parse port: strconv.Atoi: parsing \"wrong_port\": invalid syntax", - }, - { - addr: "wrong_ip", - port: "8333", - expectedError: "could not parse peer IP", - }, - } - - for _, c := range cases { - // when - addr, err := parseAddress(c.addr, c.port) - - // then - assert.Equal(t, addr, nil) - assert.IsError(t, err, c.expectedError) - } - }) -} diff --git a/internal/transports/p2p/network/address_book.go b/internal/transports/p2p/network/address_book.go new file mode 100644 index 00000000..62a68fd2 --- /dev/null +++ b/internal/transports/p2p/network/address_book.go @@ -0,0 +1,165 @@ +package network + +import ( + "fmt" + "math/rand" + "sync" + "time" + + "github.com/bitcoin-sv/block-headers-service/internal/wire" +) + +type addressBucketType string + +const ( + freeBucket addressBucketType = "free" + usedBucket addressBucketType = "used" + bannedBucket addressBucketType = "banned" +) + +// AddressBook represents a collection of known network addresses. +type AddressBook struct { + banDuration time.Duration + addrs map[addressBucketType]*addrBucket + mu sync.Mutex + addrFitlerFn func(*wire.NetAddress) bool +} + +// NewAddressBook creates and initializes a new AddressBook instance. +func NewAddressBook(banDuration time.Duration, acceptLocalAddresses bool) *AddressBook { + // Set the address filter function based on whether local addresses are accepted + addrFilterFn := wire.IsRoutable + if acceptLocalAddresses { + addrFilterFn = wire.IsRoutableWithLocal + } + + const addressesInitCapacity = 500 + const usedAddressesInitCapacity = 8 + + knownAddress := make(map[addressBucketType]*addrBucket, 3) + knownAddress[freeBucket] = newAddrBucket(addressesInitCapacity) + knownAddress[bannedBucket] = newAddrBucket(addressesInitCapacity) + knownAddress[usedBucket] = newAddrBucket(usedAddressesInitCapacity) + + return &AddressBook{ + banDuration: banDuration, + addrFitlerFn: addrFilterFn, + addrs: knownAddress, + } +} + +// UpsertAddrs updates or adds multiple addresses. +func (a *AddressBook) UpsertAddrs(address []*wire.NetAddress) { + a.mu.Lock() + defer a.mu.Unlock() + + for _, addr := range address { + if !a.addrFitlerFn(addr) { + continue + } + + key, ka, _ := a.findAddr(addr) + // If the address is not found, add it to the AddressBook. + if ka == nil { + a.addrs[freeBucket].add(key, &knownAddress{addr: addr}) + } else if addr.Timestamp.After(ka.addr.Timestamp) { + // Otherwise, update the timestamp if the new one is newer. + ka.addr.Timestamp = addr.Timestamp + } + } +} + +// MarkUsedAddr updates or adds a peer's address. +func (a *AddressBook) MarkUsedAddr(pa *wire.NetAddress) { + a.mu.Lock() + defer a.mu.Unlock() + + key := addrKey(pa) + // remove from free if exists + a.addrs[freeBucket].rm(key) + // add to used + a.addrs[usedBucket].add(key, &knownAddress{addr: pa}) + +} + +// BanAddr bans a network address. Ignores address if doesn't exist in the AddressBook. +func (a *AddressBook) BanAddr(addr *wire.NetAddress) { + a.mu.Lock() + defer a.mu.Unlock() + + if key, ka, bucket := a.findAddr(addr); ka != nil { + switch bucket { + case freeBucket: + a.ban(bucket, key, ka) + case usedBucket: + a.ban(bucket, key, ka) + case bannedBucket: + default: + // Do nothing + } + } +} + +// GetRandFreeAddr returns a randomly chosen unused network address. +func (a *AddressBook) GetRandFreeAddr() *wire.NetAddress { + a.mu.Lock() + defer a.mu.Unlock() + + freeAddres := a.addrs[freeBucket].items + fLen := len(freeAddres) + if fLen == 0 { + return nil + } + + // #nosec G404 + randIndx := rand.Intn(fLen) + return freeAddres[randIndx].addr +} + +func (a *AddressBook) findAddr(addr *wire.NetAddress) (key string, ka *knownAddress, bucket addressBucketType) { + key = addrKey(addr) + + // search in free addresses + if ka = a.addrs[freeBucket].find(key); ka != nil { + bucket = freeBucket + return + } + + // search in used + if ka = a.addrs[usedBucket].find(key); ka != nil { + bucket = usedBucket + return + } + + // search in banned + if ka = a.addrs[bannedBucket].find(key); ka != nil { + bucket = bannedBucket + return + } + + return key, nil, "" +} + +func (a *AddressBook) ban(bucket addressBucketType, key string, ka *knownAddress) { + a.addrs[bucket].rm(key) + a.addrs[bannedBucket].add(key, ka) + go a.unban(key, ka) +} + +func (a *AddressBook) unban(key string, ka *knownAddress) { + time.Sleep(a.banDuration) + + a.mu.Lock() + defer a.mu.Unlock() + + a.addrs[bannedBucket].rm(key) + a.addrs[freeBucket].add(key, ka) +} + +func addrKey(addr *wire.NetAddress) string { + return fmt.Sprintf("%s:%d", addr.IP, addr.Port) +} + +type knownAddress struct { + addr *wire.NetAddress +} diff --git a/internal/transports/p2p/network/address_book_test.go b/internal/transports/p2p/network/address_book_test.go new file mode 100644 index 00000000..8bfe7234 --- /dev/null +++ b/internal/transports/p2p/network/address_book_test.go @@ -0,0 +1,132 @@ +package network + +import ( + "net" + "testing" + "time" + + "github.com/bitcoin-sv/block-headers-service/internal/wire" + "github.com/stretchr/testify/require" +) + +func TestAddressBook_UpsertAddrs(t *testing.T) { + t.Run("add new - accept local", func(t *testing.T) { + // given + sut := NewAddressBook(0, true) + local := &wire.NetAddress{ + IP: net.IPv4(127, 0, 0, 1), + Port: 8333, + Timestamp: time.Now(), + } + + external := &wire.NetAddress{ + IP: net.IPv4(18, 199, 12, 185), + Port: 8333, + Timestamp: time.Now(), + } + + // when + sut.UpsertAddrs([]*wire.NetAddress{local, external}) + + // then + require.Len(t, sut.addrs[freeBucket].items, 2) + }) + + t.Run("add new - do not accept local", func(t *testing.T) { + // given + sut := NewAddressBook(0, false) + local := &wire.NetAddress{ + IP: net.IPv4(127, 0, 0, 1), + Port: 8333, + Timestamp: time.Now(), + } + + external := &wire.NetAddress{ + IP: net.IPv4(18, 199, 12, 185), + Port: 8333, + Timestamp: time.Now(), + } + + // when + sut.UpsertAddrs([]*wire.NetAddress{local, external}) + + // then + require.Len(t, sut.addrs[freeBucket].items, 1) + }) + + t.Run("add existing", func(t *testing.T) { + // given + sut := NewAddressBook(0, true) + addr := &wire.NetAddress{ + IP: net.IPv4(18, 199, 12, 185), + Port: 8333, + Timestamp: time.Now().Add(-1 * time.Minute), + } + + sut.UpsertAddrs([]*wire.NetAddress{addr}) + + // when + updated := &wire.NetAddress{ + IP: addr.IP, + Port: addr.Port, + Timestamp: time.Now(), + } + sut.UpsertAddrs([]*wire.NetAddress{updated}) + + // then + freeItems := sut.addrs[freeBucket].items + require.Len(t, freeItems, 1) + require.Equal(t, updated.Timestamp, freeItems[0].addr.Timestamp) + }) +} + +func TestAddressBook_BanAddr(t *testing.T) { + t.Run("ban address", func(t *testing.T) { + // given + sut := NewAddressBook(1, false) + + addr := &wire.NetAddress{ + IP: net.IPv4(18, 199, 12, 185), + Port: 8333, + Timestamp: time.Now(), + } + + sut.UpsertAddrs([]*wire.NetAddress{addr}) + + // when + sut.BanAddr(addr) + + // then + require.Len(t, sut.addrs[bannedBucket].items, 1) + require.Empty(t, sut.addrs[freeBucket].items) + require.Empty(t, sut.addrs[usedBucket].items) + }) +} + +func TestAddressBook_GetRandUnusedAddr(t *testing.T) { + t.Run("get random address", func(t *testing.T) { + // given + sut := NewAddressBook(time.Hour, false) + + addr := &wire.NetAddress{ + IP: net.IPv4(18, 199, 12, 186), + Port: 8333, + Timestamp: time.Now(), + } + + addr2 := &wire.NetAddress{ + IP: net.IPv4(18, 199, 12, 185), + Port: 8333, + Timestamp: time.Now(), + } + + sut.UpsertAddrs([]*wire.NetAddress{addr, addr2}) + sut.BanAddr(addr2) + + // when + r := sut.GetRandFreeAddr() + + // then + require.Equal(t, addr, r) + }) +} diff --git a/internal/transports/p2p/network/addresses_bucket.go b/internal/transports/p2p/network/addresses_bucket.go new file mode 100644 index 00000000..0a7d82df --- /dev/null +++ b/internal/transports/p2p/network/addresses_bucket.go @@ -0,0 +1,42 @@ +package network + +type addrBucket struct { + items []*knownAddress + lookup map[string]int +} + +func newAddrBucket(initCapacity uint) *addrBucket { + return &addrBucket{ + items: make([]*knownAddress, 0, initCapacity), + lookup: make(map[string]int, initCapacity), + } +} + +func (a *addrBucket) find(key string) *knownAddress { + addrIndex, ok := a.lookup[key] + + if ok { + return a.items[addrIndex] + } + return nil +} + +func (a *addrBucket) add(key string, addr *knownAddress) { + newItemIndex := len(a.items) + + a.items = append(a.items, addr) + a.lookup[key] = newItemIndex +} + +func (a *addrBucket) rm(key string) { + addrIndex, ok := a.lookup[key] + + if ok { + // substitute with last element + a.items[addrIndex] = a.items[len(a.items)-1] + // remove last element + a.items = a.items[:len(a.items)-1] + + delete(a.lookup, key) + } +} diff --git a/internal/transports/p2p/peer/peer.go b/internal/transports/p2p/peer/peer.go index eb87c9f4..f21e086f 100644 --- a/internal/transports/p2p/peer/peer.go +++ b/internal/transports/p2p/peer/peer.go @@ -19,6 +19,13 @@ import ( "github.com/rs/zerolog" ) +// Manager is peer manager. +type Manager interface { + AddAddrs([]*wire.NetAddress) + SignalError(*Peer, error) + SignalSyncFinished() +} + type Peer struct { // conn is the current connection to peer conn net.Conn @@ -61,6 +68,8 @@ type Peer struct { // sendHeadersMode is specifying whether we already sent the sendheaders message // to the peer and we're expecting to get just headers and no inv msg sendHeadersMode bool + // lastSeen is timestamp of last pong message from peer + lastSeen int64 // wg is a WaitGroup used to properly handle peer disconnection wg sync.WaitGroup @@ -70,6 +79,11 @@ type Peer struct { quitting bool // quit is a channel used to properly handle peer disconnection quit chan struct{} + // quitMutex is used to prevents concurrent disconnections + quitMutex sync.Mutex + + // manager is a peer manager + manager Manager } func NewPeer( @@ -80,7 +94,8 @@ func NewPeer( headersService service.Headers, chainService service.Chains, log *zerolog.Logger, -) (*Peer, error) { + manager Manager, +) *Peer { peer := &Peer{ conn: conn, inbound: inbound, @@ -88,69 +103,100 @@ func NewPeer( chainParams: chainParams, headersService: headersService, chainService: chainService, - log: log, services: wire.SFspv, protocolVersion: initialProtocolVersion, wg: sync.WaitGroup{}, msgChan: make(chan wire.Message, writeMsgChannelBufferSize), quitting: false, quit: make(chan struct{}), + manager: manager, } - return peer, nil + logger := log.With().Str("peer", peer.String()).Bool("inbound", peer.inbound).Logger() + peer.log = &logger + + return peer } func (p *Peer) Connect() error { err := p.updatePeerAddr() if err != nil { - p.Disconnect() return err } - p.log.Info().Msgf("connected to peer: %s", p) + p.log.Info().Msg("connected to peer") err = p.negotiateProtocol() if err != nil { - p.log.Error().Msgf("error negotiating protocol with peer %s, reason: %v", p, err) - p.Disconnect() + p.log.Error().Msgf("error negotiating protocol with peer, reason: %v", err) return err } + p.lastSeen = time.Now().Unix() + go p.writeMsgHandler() + go p.readMsgHandler() go p.pingHandler() return nil } func (p *Peer) Disconnect() { - p.log.Info().Msgf("disconnecting peer: %s", p) + p.quitMutex.Lock() + defer p.quitMutex.Unlock() + + if p.quitting { + return + } p.quitting = true + p.log.Info().Msg("disconnecting peer") + close(p.quit) err := p.conn.Close() if err != nil { - p.log.Error().Msgf("error disconnecting peer %s, reason %v", p, err) + p.log.Error().Msgf("error disconnecting peer, reason %v", err) } p.wg.Wait() - p.log.Info().Msgf("successfully disconnected peer %s", p) + p.log.Info().Msg("successfully disconnected peer") } func (p *Peer) StartHeadersSync() error { - go p.writeMsgHandler() - go p.readMsgHandler() - currentTipHeight := p.headersService.GetTipHeight() p.checkpoint = newCheckpoint(p.chainParams.Checkpoints, currentTipHeight, p.log) p.sendHeadersMode = false err := p.requestHeaders() if err != nil { - // TODO: lower peer sync score - p.Disconnect() return err } return nil } +func (p *Peer) SendSendHeaders() { + p.sendHeadersMode = true + + if p.protocolVersion >= wire.SendHeadersVersion { + p.queueMessage(wire.NewMsgSendHeaders()) + } +} + +func (p *Peer) SendGetAddrInfo() { + p.queueMessage(wire.NewMsgGetAddr()) +} + +func (p *Peer) GetPeerAddr() *wire.NetAddress { + return &wire.NetAddress{ + Timestamp: time.Unix(p.lastSeen, 0), + Services: p.services, + IP: p.addr.IP, + Port: uint16(p.addr.Port), + } +} + +func (p *Peer) Inbound() bool { + return p.inbound +} + func (p *Peer) updatePeerAddr() error { remoteAddr, addrIsTcp := p.conn.RemoteAddr().(*net.TCPAddr) @@ -175,7 +221,7 @@ func (p *Peer) requestHeaders() error { } if err != nil { - p.log.Error().Msgf("error requesting headers from peer %s, reason: %v", p, err) + p.log.Error().Msgf("error requesting headers, reason: %v", err) return err } return nil @@ -186,20 +232,20 @@ func (p *Peer) negotiateProtocol() error { if err != nil { return err } - p.log.Info().Msgf("version sent to peer: %s", p) + p.log.Info().Msg("version sent to peer") err = p.readVerAndVerAck() if err != nil { return err } - p.log.Info().Msgf("received version and verack from peer: %s", p) + p.log.Info().Msg("received version and verack") err = p.writeMessage(wire.NewMsgVerAck()) if err != nil { return err } - p.log.Info().Msgf("protocol negotiated successfully with peer: %s", p) + p.log.Info().Msg("protocol negotiated successfully with peer") return nil } @@ -219,11 +265,11 @@ func (p *Peer) pingHandler() { p.log.Error().Msgf("error generating nonce for ping msg, reason: %v", err) continue } - p.log.Info().Msgf("sending ping to peer %s with nonce: %d", p, nonce) + p.log.Debug().Msgf("sending ping to peer with nonce: %d", nonce) p.queueMessage(wire.NewMsgPing(nonce)) case <-p.quit: - p.log.Info().Msgf("ping handler shutdown for peer %s", p) + p.log.Info().Msg("ping handler shutdown for peer") return } } @@ -238,14 +284,16 @@ func (p *Peer) readMsgHandler() { for { select { case <-p.quit: - p.log.Info().Msgf("read msg handler shutdown for peer %s", p) + p.log.Info().Msg("read msg handler shutdown") return default: remoteMsg, _, err := wire.ReadMessage(p.conn, p.protocolVersion, p.chainParams.Net) if err != nil { if !p.quitting { - p.log.Error().Msgf("cannot read message from peer %s, reason: %v", p, err) + err = fmt.Errorf("cannot read message, reason: %w", err) + p.log.Error().Msg(err.Error()) + p.manager.SignalError(p, err) } continue } @@ -254,13 +302,16 @@ func (p *Peer) readMsgHandler() { case *wire.MsgPing: p.handlePingMsg(msg) case *wire.MsgPong: - p.log.Info().Msgf("received pong from peer %s with nonce: %d", p, msg.Nonce) + p.log.Debug().Msgf("received pong with nonce: %d", msg.Nonce) + p.lastSeen = time.Now().Unix() case *wire.MsgHeaders: p.handleHeadersMsg(msg) case *wire.MsgInv: p.handleInvMsg(msg) + case *wire.MsgAddr: + p.handleAddrMsg(msg) default: - p.log.Info().Msgf("received msg of type: %T", msg) + p.log.Debug().Msgf("received msg of type: %T", msg) } } } @@ -293,7 +344,7 @@ func (p *Peer) writeMsgHandler() { case msg := <-p.msgChan: err := p.writeMessage(msg) if err != nil { - p.log.Error().Msgf("error writing msg %T to peer %s", msg, p) + p.log.Error().Msgf("error writing msg %T to peer", msg) } case <-p.quit: // draining the channels for cleanup @@ -301,7 +352,7 @@ func (p *Peer) writeMsgHandler() { select { case <-p.msgChan: default: - p.log.Info().Msgf("write msg handler shutdown for peer %s", p) + p.log.Info().Msg("write msg handler shutdown") return } } @@ -324,7 +375,7 @@ func (p *Peer) writeOurVersionMsg() error { } theirNA := wire.NewNetAddress(p.addr, 0) - lastBlock := p.headersService.GetTip().Height + lastBlock := p.headersService.GetTipHeight() msg := wire.NewMsgVersion(ourNA, theirNA, nonce, lastBlock) err = msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion, userAgentComments) @@ -356,6 +407,7 @@ func (p *Peer) writeGetHeadersMsg(stopHash *chainhash.Hash) error { } } + p.log.Debug().Msgf("queue GetHeaders msg: %v", *msg) p.queueMessage(msg) return nil } @@ -432,43 +484,43 @@ func (p *Peer) requireVerAckReceived(remoteMsg wire.Message) (*wire.MsgVerAck, e } func (p *Peer) handlePingMsg(msg *wire.MsgPing) { - p.log.Info().Msgf("received ping from peer %s with nonce: %d", p, msg.Nonce) + p.log.Debug().Msgf("received ping with nonce: %d", msg.Nonce) if p.protocolVersion > wire.BIP0031Version { - p.log.Info().Msgf("sending pong to peer %s with nonce: %d", p, msg.Nonce) + p.log.Debug().Msgf("sending pong to peer with nonce: %d", msg.Nonce) p.queueMessage(wire.NewMsgPong(msg.Nonce)) } } func (p *Peer) handleInvMsg(msg *wire.MsgInv) { - p.log.Info().Msgf("received inv msg from peer %s", p) + p.log.Info().Msg("received inv msg") if !p.syncedCheckpoints { - p.log.Info().Msgf("we are still syncing, ignoring inv msg from peer %s", p) + p.log.Info().Msg("we are still syncing, ignoring inv msg") return } lastBlock := searchForFinalBlockIndex(msg.InvList) if lastBlock == -1 { - p.log.Info().Msgf("no blocks in inv msg from peer %s", p) + p.log.Info().Msg("no blocks in inv msg from peer") return } lastBlockHash := &msg.InvList[lastBlock].Hash _, err := p.headersService.GetHeightByHash(lastBlockHash) if err == nil { - p.log.Info().Msgf("blocks from inv msg from peer %s already existsing in db", p) + p.log.Info().Msg("blocks from inv msg already existsing in db") return } p.updateLatestStats(0, lastBlockHash) - p.log.Info().Msgf("requesting new headers from peer %s", p) + p.log.Debug().Msg("requesting new headers") err = p.writeGetHeadersMsg(lastBlockHash) if err != nil { - p.log.Error().Msgf("error while requesting new headers from peer %s, reason: %v", p, err) + p.log.Error().Msgf("error while requesting new headers, reason: %v", err) } } func (p *Peer) handleHeadersMsg(msg *wire.MsgHeaders) { - p.log.Info().Msgf("received headers msg from peer %s", p) + p.log.Info().Msg("received headers msg") lastHeight := int32(0) headersReceived := 0 @@ -482,9 +534,8 @@ func (p *Peer) handleHeadersMsg(msg *wire.MsgHeaders) { } if service.BlockRejected.Is(err) { - // TODO: ban peer - p.log.Error().Msgf("received rejected header %v from peer %s", h, p) - p.Disconnect() + p.log.Error().Msgf("received rejected header %v", h) + p.manager.SignalError(p, err) return } @@ -506,18 +557,14 @@ func (p *Peer) handleHeadersMsg(msg *wire.MsgHeaders) { if !h.IsLongestChain() { // TODO: ban peer or lower sync score - p.log.Warn().Msgf( - "received header with hash: %s that's not a part of the longest chain, from peer %s", - h.Hash.String(), p, - ) + p.log.Warn().Msgf("received header with hash: %s that's not a part of the longest chain", h.Hash) continue } err = p.checkpoint.VerifyAndAdvance(h) if err != nil { - // TODO: ban peer or lower peer sync score p.log.Error().Msgf("error when checking checkpoint, reason: %v", err) - p.Disconnect() + p.manager.SignalError(p, err) return } @@ -527,15 +574,11 @@ func (p *Peer) handleHeadersMsg(msg *wire.MsgHeaders) { } if headersReceived == 0 { - p.log.Debug().Msgf("received only existing headers from peer: %s", p) + p.log.Debug().Msg("received only existing headers") return } - p.log.Info().Msgf( - "successfully received %d headers from peer %s, up to height %d", - headersReceived, p, lastHeight, - ) - + p.log.Info().Msgf("successfully received %d headers up to height %d", headersReceived, lastHeight) p.updateLatestStats(lastHeight, lastHash) if p.sendHeadersMode { @@ -543,18 +586,23 @@ func (p *Peer) handleHeadersMsg(msg *wire.MsgHeaders) { } if p.isSynced() { - p.log.Info().Msgf("synced with the tip of chain from peer %s", p) + p.log.Info().Msg("synced with the tip of chain from peer") p.switchToSendHeadersMode() + p.manager.SignalSyncFinished() return } err := p.requestHeaders() if err != nil { - // TODO: lower peer sync score - p.Disconnect() + p.manager.SignalError(p, err) } } +func (p *Peer) handleAddrMsg(msg *wire.MsgAddr) { + p.log.Info().Msgf("received addr msg with %d addresses", len(msg.AddrList)) + p.manager.AddAddrs(msg.AddrList) +} + func (p *Peer) switchToSendHeadersMode() { if !p.sendHeadersMode && p.protocolVersion >= wire.SendHeadersVersion { p.log.Info().Msgf("switching to send headers mode - requesting peer %s to send us headers directly instead of inv msg", p) diff --git a/internal/transports/p2p/peer/peers_collection.go b/internal/transports/p2p/peer/peers_collection.go new file mode 100644 index 00000000..f759f250 --- /dev/null +++ b/internal/transports/p2p/peer/peers_collection.go @@ -0,0 +1,74 @@ +package peer + +import ( + "errors" + "slices" + "sync" +) + +// PeersCollection represents a fixed size collection of peer objects with concurrency-safe operations. +type PeersCollection struct { + peers []*Peer + size uint + mu sync.Mutex +} + +// NewPeersCollection creates and initializes a new PeersCollection instance with the specified, fixed size. +func NewPeersCollection(size uint) *PeersCollection { + return &PeersCollection{ + size: size, + peers: make([]*Peer, 0, size), + } +} + +// AddPeer adds a new peer to the PeersCollection. +// Returns an error if there is no space available for the new peer. +func (col *PeersCollection) AddPeer(p *Peer) error { + col.mu.Lock() + defer col.mu.Unlock() + + if len(col.peers) == int(col.size) { + return errors.New("no space available for new peer") + } + + col.peers = append(col.peers, p) + return nil +} + +// RmPeer removes the specified peer from the PeersCollection. Ignores address if doesn't exist in the PeersCollection. +func (col *PeersCollection) RmPeer(p *Peer) { + col.mu.Lock() + defer col.mu.Unlock() + + // find index of peer + pIndex := slices.Index(col.peers, p) + if pIndex == -1 { + return + } + + // substitute with last element + col.peers[pIndex] = col.peers[len(col.peers)-1] + + // remove last element + col.peers = col.peers[:len(col.peers)-1] +} + +// Space returns the number of available slots for new peers in the PeersCollection. +func (col *PeersCollection) Space() uint { + col.mu.Lock() + defer col.mu.Unlock() + + return col.size - uint(len(col.peers)) +} + +// Enumerate returns a slice containing all non-nil peers in the PeersCollection. Order of peers in the returned slice is not guaranteed. +func (col *PeersCollection) Enumerate() []*Peer { + col.mu.Lock() + defer col.mu.Unlock() + + // copy slice + res := make([]*Peer, len(col.peers)) + copy(res, col.peers) + + return res +} diff --git a/internal/transports/p2p/peer/peers_collection_test.go b/internal/transports/p2p/peer/peers_collection_test.go new file mode 100644 index 00000000..01490279 --- /dev/null +++ b/internal/transports/p2p/peer/peers_collection_test.go @@ -0,0 +1,166 @@ +package peer + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPeersCollection_AddPeer(t *testing.T) { + t.Run("add peer", func(t *testing.T) { + // given + sut := NewPeersCollection(3) + + // when + err := sut.AddPeer(&Peer{}) + + // then + require.NoError(t, err) + }) + + t.Run("no space for new item", func(t *testing.T) { + // given + sut := NewPeersCollection(1) + sut.AddPeer(&Peer{}) + + // when + err := sut.AddPeer(&Peer{}) + + // then + require.Error(t, err) + }) +} + +func TestPeersCollection_RmPeer(t *testing.T) { + t.Run("peer doesn't exist in collection", func(t *testing.T) { + // given + sut := NewPeersCollection(3) + sut.AddPeer(&Peer{}) + + // when + sut.RmPeer(&Peer{}) + + // then + require.Len(t, sut.Enumerate(), 1) + }) + + t.Run("peer exists in collection", func(t *testing.T) { + // given + sut := NewPeersCollection(3) + p1 := &Peer{} + sut.AddPeer(p1) + + // when + sut.RmPeer(p1) + + // then + require.Empty(t, sut.Enumerate()) + }) +} + +func TestPeersCollection_Space(t *testing.T) { + t.Run("empty collection", func(t *testing.T) { + // given + const size = uint(3) + sut := NewPeersCollection(size) + + // when + space := sut.Space() + + // then + require.Equal(t, size, space) + }) + + t.Run("grow only", func(t *testing.T) { + // given + const size = uint(3) + sut := NewPeersCollection(size) + + // when + sut.AddPeer(&Peer{}) + space := sut.Space() + + // then + require.Equal(t, size-1, space) + }) + + t.Run("grow and shrink", func(t *testing.T) { + // given + const size = uint(3) + sut := NewPeersCollection(size) + p1 := &Peer{} + p2 := &Peer{} + p3 := &Peer{} + + // when + sut.AddPeer(p1) + sut.AddPeer(p2) + sut.AddPeer(p3) + + sut.RmPeer(p1) + sut.RmPeer(p2) + + space := sut.Space() + + // then + require.Equal(t, size-1, space) + + }) + +} + +func TestPeersCollection_Enumerate(t *testing.T) { + t.Run("empty collection", func(t *testing.T) { + // given + sut := NewPeersCollection(3) + + // when + enumerated := sut.Enumerate() + + // then + require.Empty(t, enumerated) + + }) + + t.Run("grow only", func(t *testing.T) { + // given + sut := NewPeersCollection(3) + + // when + sut.AddPeer(&Peer{}) + enumerated := sut.Enumerate() + + // then + require.Len(t, enumerated, 1) + + for _, p := range enumerated { + require.NotNil(t, p) + } + }) + + t.Run("grow and shrink", func(t *testing.T) { + // given + sut := NewPeersCollection(3) + p1 := &Peer{} + p2 := &Peer{} + p3 := &Peer{} + + // when + sut.AddPeer(p1) + sut.AddPeer(p2) + sut.AddPeer(p3) + + sut.RmPeer(p1) + sut.RmPeer(p2) + + enumerated := sut.Enumerate() + + // then + require.Len(t, enumerated, 1) + + for _, p := range enumerated { + require.NotNil(t, p) + } + + }) +} diff --git a/internal/transports/p2p/server.go b/internal/transports/p2p/server.go index 8bdef734..31d3b8bb 100644 --- a/internal/transports/p2p/server.go +++ b/internal/transports/p2p/server.go @@ -1,12 +1,18 @@ package p2pexp import ( + "context" "errors" + "fmt" "net" + "sync" + "time" "github.com/bitcoin-sv/block-headers-service/config" "github.com/bitcoin-sv/block-headers-service/internal/chaincfg" + "github.com/bitcoin-sv/block-headers-service/internal/transports/p2p/network" "github.com/bitcoin-sv/block-headers-service/internal/transports/p2p/peer" + "github.com/bitcoin-sv/block-headers-service/internal/wire" "github.com/bitcoin-sv/block-headers-service/service" "github.com/rs/zerolog" ) @@ -17,10 +23,21 @@ type server struct { headersService service.Headers chainService service.Chains log *zerolog.Logger + outboundPeers *peer.PeersCollection + inboundPeers *peer.PeersCollection + listener net.Listener + addresses *network.AddressBook - peers []*peer.Peer + // lifecycle properties + ctx context.Context + ctxCancel context.CancelFunc + ctxWg sync.WaitGroup + + chainSyncFinished bool + chainSyncMutex sync.Mutex } +// NewServer creates and initializes a new P2P server instance. func NewServer( config *config.P2PConfig, chainParams *chaincfg.Params, @@ -29,99 +46,257 @@ func NewServer( log *zerolog.Logger, ) *server { serverLogger := log.With().Str("service", "p2p-experimental").Logger() + ctx, ctxCancel := context.WithCancel(context.Background()) server := &server{ config: config, chainParams: chainParams, headersService: headersService, chainService: chainService, log: &serverLogger, - peers: make([]*peer.Peer, 0), + outboundPeers: peer.NewPeersCollection(config.MaxOutboundConnections), + inboundPeers: peer.NewPeersCollection(config.MaxInboundConnections), + addresses: network.NewAddressBook(config.BanDuration, config.AcceptLocalPeers), + ctx: ctx, + ctxCancel: ctxCancel, } + return server } +// Start starts the P2P server by connecting to outbound peers and listening for inbound connections. func (s *server) Start() error { - err := s.seedAndConnect() + err := s.connectOutboundPeerFromSeed() if err != nil { + s.log.Error().Msgf("error during server start.Reason: %v", err) return err } - return s.listenAndConnect() + err = s.listenInboundPeers() + if err != nil { + s.log.Error().Msgf("error during server start. Shutdown p2p server. Reason: %v", err) + s.Shutdown() + + return err + } + return nil } -func (s *server) Shutdown() error { - for _, p := range s.peers { +// Shutdown gracefully shuts down the P2P server by disconnecting all peers. +func (s *server) Shutdown() { + // Stop listening for incoming connections + if s.listener != nil { + _ = s.listener.Close() + } + + // Cancel all child goroutines + s.ctxCancel() + s.ctxWg.Wait() + + // Disconnect active peers + for _, p := range s.outboundPeers.Enumerate() { + p.Disconnect() + } + for _, p := range s.inboundPeers.Enumerate() { p.Disconnect() } - return nil } -func (s *server) seedAndConnect() error { +func (s *server) connectOutboundPeerFromSeed() error { seeds := seedFromDNS(s.chainParams.DNSSeeds, s.log) if len(seeds) == 0 { return errors.New("no seeds found") } + // add seed addreses to address book + addrs := make([]*wire.NetAddress, 0, len(seeds)) for _, seed := range seeds { - s.log.Info().Msgf("Got peer addr: %s", seed.String()) + s.log.Debug().Msgf("got peer addr: %s", seed.String()) + addrs = append(addrs, &wire.NetAddress{IP: seed, Port: s.chainParams.DefaultPort}) } + s.addresses.UpsertAddrs(addrs) - firstPeerSeed := seeds[0].String() - firstPeerAddr, err := parseAddress(firstPeerSeed, s.chainParams.DefaultPort) - if err != nil { - s.log.Error().Msgf("error parsing peer %s address, reason: %v", firstPeerAddr.String(), err) + // connect to random seed + if err := s.connectToRandomAddr(); err != nil { return err } - inbound := false - conn, err := net.Dial(firstPeerAddr.Network(), firstPeerAddr.String()) + return nil +} + +func (s *server) listenInboundPeers() error { + ourAddr := net.JoinHostPort("", fmt.Sprintf("%d", s.chainParams.DefaultPort)) + listener, err := net.Listen("tcp", ourAddr) if err != nil { - s.log.Error().Msgf("error connecting to peer %s, reason: %v", firstPeerAddr.String(), err) + s.log.Error().Msgf("error creating listener, reason: %v", err) return err } - return s.connectPeer(conn, inbound) + s.listener = listener + go s.observeInboundPeers() + return nil } -func (s *server) listenAndConnect() error { - s.log.Info().Msgf("listening for inbound connections on port %s", s.chainParams.DefaultPort) +func (s *server) connectToAddr(addr net.IP, port uint16) error { + netAddr := &net.TCPAddr{ + IP: addr, + Port: int(port), + } - ourAddr := net.JoinHostPort("", s.chainParams.DefaultPort) - listener, err := net.Listen("tcp", ourAddr) + conn, err := net.Dial(netAddr.Network(), netAddr.String()) if err != nil { - s.log.Error().Msgf("error creating listener, reason: %v", err) + s.log.Error().Str("peer", netAddr.String()). + Bool("inbound", false). + Msgf("error connecting with peer, reason: %v", err) + + s.log.Info().Str("peer", netAddr.String()). + Bool("inbound", false). + Msgf("peer banned, reason: %v", err) + + s.addresses.BanAddr(&wire.NetAddress{IP: netAddr.IP, Port: uint16(netAddr.Port)}) return err } - inbound := true - conn, err := listener.Accept() + peer, err := s.connectPeer(conn, false) if err != nil { - s.log.Error().Msgf("error accepting connection, reason: %v", err) + s.log.Info().Str("peer", netAddr.String()). + Bool("inbound", false). + Msgf("peer banned, reason: %v", err) + + s.addresses.BanAddr(&wire.NetAddress{IP: netAddr.IP, Port: uint16(netAddr.Port)}) return err } - return s.connectPeer(conn, inbound) + _ = s.outboundPeers.AddPeer(peer) // don't need to check error here + s.addresses.MarkUsedAddr(peer.GetPeerAddr()) + return nil } -func (s *server) connectPeer(conn net.Conn, inbound bool) error { - peer, err := peer.NewPeer(conn, inbound, s.config, s.chainParams, s.headersService, s.chainService, s.log) +func (s *server) connectPeer(conn net.Conn, inbound bool) (*peer.Peer, error) { + peer := peer.NewPeer(conn, inbound, s.config, s.chainParams, s.headersService, s.chainService, s.log, s) + + err := peer.Connect() if err != nil { - return err + peer.Disconnect() + s.log.Error().Str("peer", peer.String()). + Bool("inbound", inbound). + Msgf("error connecting with peer, reason: %v", err) + return nil, err } - s.peers = append(s.peers, peer) + peer.SendGetAddrInfo() - err = peer.Connect() - if err != nil { - s.log.Error().Msgf("error connecting with peer %s, reason: %v", peer, err) - return err + if !inbound { + if s.chainSyncFinished { + peer.SendSendHeaders() + } else { + err = peer.StartHeadersSync() + if err != nil { + peer.Disconnect() + return nil, err + } + } } - err = peer.StartHeadersSync() - if err != nil { - s.log.Error().Msgf("error starting sync with peer %s, reason: %v", peer, err) - return err + return peer, nil +} + +func (s *server) observeOutboundPeers() { + sleepDuration := 1 * time.Minute + + for { + select { + case <-s.ctx.Done(): // Exit if context was canceled + s.log.Info().Msg("[observeOutboundPeers] context canceled -> exit") + return + default: + s.ctxWg.Add(1) // Wait on shutdown + + freeSlots := s.outboundPeers.Space() + if freeSlots == 0 { + s.log.Debug().Msg("[observeOutboundPeers] nothing to do") + s.noWaitingSleep(sleepDuration) + continue + } + + s.log.Info().Msgf("try connect with a new peer. Free slots: %d", freeSlots) + _ = s.connectToRandomAddr() // Connect one-by-one to gracefully handle shutdown + + s.ctxWg.Done() + } + } +} + +func (s *server) connectToRandomAddr() error { + addr := s.addresses.GetRandFreeAddr() + if addr == nil { + s.log.Warn().Msgf("no free addresses to connect") + return errors.New("no free addresses to connect") } - return nil + return s.connectToAddr(addr.IP, addr.Port) +} + +func (s *server) observeInboundPeers() { + for { + conn, err := s.listener.Accept() + if err != nil { + s.log.Error().Msgf("error accepting connection, reason: %v", err) + return + } + + if s.inboundPeers.Space() == 0 { + continue + } + + peer, err := s.connectPeer(conn, true) + if err != nil { + continue + } + + _ = s.inboundPeers.AddPeer(peer) + s.addresses.MarkUsedAddr(peer.GetPeerAddr()) + } +} + +// usage MUST be preceded by `s.ctx.Wg.Add(1)`. +func (s *server) noWaitingSleep(duration time.Duration) { + s.ctxWg.Done() // We are sleeping -> no need to wait + time.Sleep(duration) + s.ctxWg.Add(1) // Wake up. Wait for us +} + +// AddAddrs adds addresses to the address book of the P2P server. It's peer.Manager functionality. +func (s *server) AddAddrs(address []*wire.NetAddress) { + s.addresses.UpsertAddrs(address) +} + +// SignalError signals an error with a peer and takes appropriate actions such as banning the peer and disconnecting it. It's peer.Manager functionality. +func (s *server) SignalError(p *peer.Peer, err error) { + // Handle error and decide what to do with the peer + + s.log.Info().Str("peer", p.String()). + Bool("inbound", p.Inbound()). + Msgf("peer banned, reason: %v", err) + + s.addresses.BanAddr(p.GetPeerAddr()) + + // Disconnection here must be non-blocking to prevent deadlock within the peer logic. + go func() { + p.Disconnect() + s.outboundPeers.RmPeer(p) + s.inboundPeers.RmPeer(p) + }() +} + +// SignalSyncFinished signals that the chain synchronization process has finished. It's peer.Manager functionality. +func (s *server) SignalSyncFinished() { + s.chainSyncMutex.Lock() + defer s.chainSyncMutex.Unlock() + + if !s.chainSyncFinished { + // add new peers to pool + go s.observeOutboundPeers() + } + + s.chainSyncFinished = true } diff --git a/internal/wire/network.go b/internal/wire/network.go new file mode 100644 index 00000000..8189cd1d --- /dev/null +++ b/internal/wire/network.go @@ -0,0 +1,234 @@ +// Copyright (c) 2013-2014 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package wire + +import ( + "net" +) + +var ( + // rfc1918Nets specifies the IPv4 private address blocks as defined by + // by RFC1918 (10.0.0.0/8, 172.16.0.0/12, and 192.168.0.0/16). + rfc1918Nets = []net.IPNet{ + ipNet("10.0.0.0", 8, 32), + ipNet("172.16.0.0", 12, 32), + ipNet("192.168.0.0", 16, 32), + } + + // rfc2544Net specifies the the IPv4 block as defined by RFC2544 + // (198.18.0.0/15). + rfc2544Net = ipNet("198.18.0.0", 15, 32) + + // rfc3849Net specifies the IPv6 documentation address block as defined + // by RFC3849 (2001:DB8::/32). + rfc3849Net = ipNet("2001:DB8::", 32, 128) + + // rfc3927Net specifies the IPv4 auto configuration address block as + // defined by RFC3927 (169.254.0.0/16). + rfc3927Net = ipNet("169.254.0.0", 16, 32) + + // rfc3964Net specifies the IPv6 to IPv4 encapsulation address block as + // defined by RFC3964 (2002::/16). + rfc3964Net = ipNet("2002::", 16, 128) + + // rfc4193Net specifies the IPv6 unique local address block as defined + // by RFC4193 (FC00::/7). + rfc4193Net = ipNet("FC00::", 7, 128) + + // rfc4380Net specifies the IPv6 teredo tunneling over UDP address block + // as defined by RFC4380 (2001::/32). + rfc4380Net = ipNet("2001::", 32, 128) + + // rfc4843Net specifies the IPv6 ORCHID address block as defined by + // RFC4843 (2001:10::/28). + rfc4843Net = ipNet("2001:10::", 28, 128) + + // rfc4862Net specifies the IPv6 stateless address autoconfiguration + // address block as defined by RFC4862 (FE80::/64). + rfc4862Net = ipNet("FE80::", 64, 128) + + // rfc5737Net specifies the IPv4 documentation address blocks as defined + // by RFC5737 (192.0.2.0/24, 198.51.100.0/24, 203.0.113.0/24). + rfc5737Net = []net.IPNet{ + ipNet("192.0.2.0", 24, 32), + ipNet("198.51.100.0", 24, 32), + ipNet("203.0.113.0", 24, 32), + } + + // rfc6052Net specifies the IPv6 well-known prefix address block as + // defined by RFC6052 (64:FF9B::/96). + rfc6052Net = ipNet("64:FF9B::", 96, 128) + + // rfc6145Net specifies the IPv6 to IPv4 translated address range as + // defined by RFC6145 (::FFFF:0:0:0/96). + rfc6145Net = ipNet("::FFFF:0:0:0", 96, 128) + + // rfc6598Net specifies the IPv4 block as defined by RFC6598 (100.64.0.0/10). + rfc6598Net = ipNet("100.64.0.0", 10, 32) + + // onionCatNet defines the IPv6 address block used to support Tor. + // bitcoind encodes a .onion address as a 16 byte number by decoding the + // address prior to the .onion (i.e. the key hash) base32 into a ten + // byte number. It then stores the first 6 bytes of the address as + // 0xfd, 0x87, 0xd8, 0x7e, 0xeb, 0x43. + // + // This is the same range used by OnionCat, which is part part of the + // RFC4193 unique local IPv6 range. + // + // In summary the format is: + // { magic 6 bytes, 10 bytes base32 decode of key hash }. + onionCatNet = ipNet("fd87:d87e:eb43::", 48, 128) + + // zero4Net defines the IPv4 address block for address staring with 0 + // (0.0.0.0/8). + zero4Net = ipNet("0.0.0.0", 8, 32) +) + +// ipNet returns a net.IPNet struct given the passed IP address string, number +// of one bits to include at the start of the mask, and the total number of bits +// for the mask. +func ipNet(ip string, ones, bits int) net.IPNet { + return net.IPNet{IP: net.ParseIP(ip), Mask: net.CIDRMask(ones, bits)} +} + +// IsIPv4 returns whether or not the given address is an IPv4 address. +func IsIPv4(na *NetAddress) bool { + return na.IP.To4() != nil +} + +// IsLocal returns whether or not the given address is a local address. +func IsLocal(na *NetAddress) bool { + return na.IP.IsLoopback() || zero4Net.Contains(na.IP) +} + +// IsOnionCatTor returns whether or not the passed address is in the IPv6 range +// used by bitcoin to support Tor (fd87:d87e:eb43::/48). Note that this range +// is the same range used by OnionCat, which is part of the RFC4193 unique local +// IPv6 range. +func IsOnionCatTor(na *NetAddress) bool { + return onionCatNet.Contains(na.IP) +} + +// IsRFC1918 returns whether or not the passed address is part of the IPv4 +// private network address space as defined by RFC1918 (10.0.0.0/8, +// 172.16.0.0/12, or 192.168.0.0/16). +func IsRFC1918(na *NetAddress) bool { + for _, rfc := range rfc1918Nets { + if rfc.Contains(na.IP) { + return true + } + } + return false +} + +// IsRFC2544 returns whether or not the passed address is part of the IPv4 +// address space as defined by RFC2544 (198.18.0.0/15). +func IsRFC2544(na *NetAddress) bool { + return rfc2544Net.Contains(na.IP) +} + +// IsRFC3849 returns whether or not the passed address is part of the IPv6 +// documentation range as defined by RFC3849 (2001:DB8::/32). +func IsRFC3849(na *NetAddress) bool { + return rfc3849Net.Contains(na.IP) +} + +// IsRFC3927 returns whether or not the passed address is part of the IPv4 +// autoconfiguration range as defined by RFC3927 (169.254.0.0/16). +func IsRFC3927(na *NetAddress) bool { + return rfc3927Net.Contains(na.IP) +} + +// IsRFC3964 returns whether or not the passed address is part of the IPv6 to +// IPv4 encapsulation range as defined by RFC3964 (2002::/16). +func IsRFC3964(na *NetAddress) bool { + return rfc3964Net.Contains(na.IP) +} + +// IsRFC4193 returns whether or not the passed address is part of the IPv6 +// unique local range as defined by RFC4193 (FC00::/7). +func IsRFC4193(na *NetAddress) bool { + return rfc4193Net.Contains(na.IP) +} + +// IsRFC4380 returns whether or not the passed address is part of the IPv6 +// teredo tunneling over UDP range as defined by RFC4380 (2001::/32). +func IsRFC4380(na *NetAddress) bool { + return rfc4380Net.Contains(na.IP) +} + +// IsRFC4843 returns whether or not the passed address is part of the IPv6 +// ORCHID range as defined by RFC4843 (2001:10::/28). +func IsRFC4843(na *NetAddress) bool { + return rfc4843Net.Contains(na.IP) +} + +// IsRFC4862 returns whether or not the passed address is part of the IPv6 +// stateless address autoconfiguration range as defined by RFC4862 (FE80::/64). +func IsRFC4862(na *NetAddress) bool { + return rfc4862Net.Contains(na.IP) +} + +// IsRFC5737 returns whether or not the passed address is part of the IPv4 +// documentation address space as defined by RFC5737 (192.0.2.0/24, +// 198.51.100.0/24, 203.0.113.0/24). +func IsRFC5737(na *NetAddress) bool { + for _, rfc := range rfc5737Net { + if rfc.Contains(na.IP) { + return true + } + } + + return false +} + +// IsRFC6052 returns whether or not the passed address is part of the IPv6 +// well-known prefix range as defined by RFC6052 (64:FF9B::/96). +func IsRFC6052(na *NetAddress) bool { + return rfc6052Net.Contains(na.IP) +} + +// IsRFC6145 returns whether or not the passed address is part of the IPv6 to +// IPv4 translated address range as defined by RFC6145 (::FFFF:0:0:0/96). +func IsRFC6145(na *NetAddress) bool { + return rfc6145Net.Contains(na.IP) +} + +// IsRFC6598 returns whether or not the passed address is part of the IPv4 +// shared address space specified by RFC6598 (100.64.0.0/10). +func IsRFC6598(na *NetAddress) bool { + return rfc6598Net.Contains(na.IP) +} + +// IsValid returns whether or not the passed address is valid. The address is +// considered invalid under the following circumstances: +// IPv4: It is either a zero or all bits set address. +// IPv6: It is either a zero or RFC3849 documentation address. +func IsValid(na *NetAddress) bool { + // IsUnspecified returns if address is 0, so only all bits set, and + // RFC3849 need to be explicitly checked. + return na.IP != nil && !(na.IP.IsUnspecified() || + na.IP.Equal(net.IPv4bcast)) +} + +// IsRoutable returns whether or not the passed address is routable over +// the public internet. This is true as long as the address is valid and is not +// in any reserved ranges. +func IsRoutable(na *NetAddress) bool { + return IsValid(na) && !(IsRFC1918(na) || IsRFC2544(na) || + IsRFC3927(na) || IsRFC4862(na) || IsRFC3849(na) || + IsRFC4843(na) || IsRFC5737(na) || IsRFC6598(na) || + IsLocal(na) || (IsRFC4193(na) && !IsOnionCatTor(na))) +} + +// IsRoutableWithLocal returns whether or not the passed address is routable over +// the public internet. This is true as long as the address is valid and is not +// in any reserved ranges. +func IsRoutableWithLocal(na *NetAddress) bool { + return IsValid(na) && !(IsRFC1918(na) || IsRFC2544(na) || + IsRFC3927(na) || IsRFC4862(na) || IsRFC3849(na) || + IsRFC4843(na) || IsRFC5737(na) || IsRFC6598(na) || + (IsRFC4193(na) && !IsOnionCatTor(na))) +} diff --git a/transports/p2p/connmgr/seed.go b/transports/p2p/connmgr/seed.go index 76d0d3bf..0cafedde 100644 --- a/transports/p2p/connmgr/seed.go +++ b/transports/p2p/connmgr/seed.go @@ -8,7 +8,7 @@ import ( "fmt" mrand "math/rand" "net" - "strconv" + "time" "github.com/rs/zerolog" @@ -60,7 +60,7 @@ func SeedFromDNS(chainParams *chaincfg.Params, reqServices wire.ServiceFlag, } addresses := make([]*wire.NetAddress, len(seedpeers)) // if this errors then we have *real* problems - intPort, _ := strconv.Atoi(chainParams.DefaultPort) + intPort := chainParams.DefaultPort for i, peer := range seedpeers { addresses[i] = wire.NewNetAddressTimestamp( // bitcoind seeds with addresses from @@ -68,7 +68,7 @@ func SeedFromDNS(chainParams *chaincfg.Params, reqServices wire.ServiceFlag, // and 7 days ago. time.Now().Add(-1*time.Second*time.Duration(secondsIn3Days+ randSource.Int31n(secondsIn4Days))), //nolint:gosec - 0, peer, uint16(intPort)) + 0, peer, intPort) } seedFn(addresses) diff --git a/transports/p2p/p2putil/addresses.go b/transports/p2p/p2putil/addresses.go index a137f7f9..7cc82e41 100644 --- a/transports/p2p/p2putil/addresses.go +++ b/transports/p2p/p2putil/addresses.go @@ -53,7 +53,7 @@ func NewAddressFunc(getAddressFn GetAddressFn, outboundGroupCount OutboundGroupC } // allow nondefault ports after 50 failed tries. - if tries < 50 && fmt.Sprintf("%d", addr.NetAddress().Port) != + if tries < 50 && addr.NetAddress().Port != config.ActiveNetParams.DefaultPort { continue } diff --git a/transports/p2p/p2putil/listeners.go b/transports/p2p/p2putil/listeners.go index acac36b7..5cadeddd 100644 --- a/transports/p2p/p2putil/listeners.go +++ b/transports/p2p/p2putil/listeners.go @@ -15,7 +15,7 @@ import ( // which is non-nil if UPnP is in use. func InitListeners(log *zerolog.Logger) ([]net.Listener, error) { listenAddrs := []string{ - net.JoinHostPort("", config.ActiveNetParams.DefaultPort), + net.JoinHostPort("", fmt.Sprintf("%d", config.ActiveNetParams.DefaultPort)), } // Listen for TCP connections at the configured addresses diff --git a/transports/p2p/server.go b/transports/p2p/server.go index db2e6587..612d33ef 100644 --- a/transports/p2p/server.go +++ b/transports/p2p/server.go @@ -9,7 +9,6 @@ import ( "errors" "fmt" "net" - "strconv" "sync" "sync/atomic" "time" @@ -732,12 +731,11 @@ func (s *server) Stop() { // Shutdown gracefully shuts down the server by stopping and disconnecting all // peers and the main listener and waits for server to stop. -func (s *server) Shutdown() error { +func (s *server) Shutdown() { s.log.Info().Msg("Gracefully shutting down the P2P server...") s.Stop() s.WaitForShutdown() s.log.Info().Msg("P2P Server shutdown complete") - return nil } // WaitForShutdown blocks until the main listener and peer handlers are stopped. @@ -749,7 +747,7 @@ func (s *server) upnpUpdateThread() { // Go off immediately to prevent code duplication, thereafter we renew // lease every 15 minutes. timer := time.NewTimer(0 * time.Second) - lport, _ := strconv.ParseInt(config.ActiveNetParams.DefaultPort, 10, 16) + lport := config.ActiveNetParams.DefaultPort first := true out: for {