Skip to content

feat(SPV-742): add peer manager #247

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 35 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
8c1d575
feat(SPV-742): improve peer logs
arkadiuszos4chain Apr 26, 2024
c222d04
feat(SPV-742): make server responsible for peers
arkadiuszos4chain Apr 26, 2024
d8f6b67
feat(SPV-742): make multiple outbound connection
arkadiuszos4chain Apr 26, 2024
3118c13
feat(SPV-742): add address book and store active peer addresses there
arkadiuszos4chain Apr 26, 2024
c2a5e82
feat(SPV-742): ask peers for known addresses on connection
arkadiuszos4chain Apr 26, 2024
6d495d1
feat(SPV-742): ban peer/addresse on error (naive 24h ban)
arkadiuszos4chain Apr 26, 2024
45919ea
feat(SPV-742): observe active outbound connections and keep them at d…
arkadiuszos4chain Apr 26, 2024
2fee765
feat(SPV-742): handle incoming connections
arkadiuszos4chain Apr 29, 2024
8f11329
feat(SPV-742): accept peers from local network
arkadiuszos4chain Apr 29, 2024
a2e3723
feat(SPV-742): fix address book
arkadiuszos4chain Apr 30, 2024
18c04d8
feat(SPV-742): add unit tests
arkadiuszos4chain Apr 30, 2024
adb4dec
feat(SPV-742): chnage DefaultPort type to uint16
arkadiuszos4chain Apr 30, 2024
9b214c4
feat(SPV-742): fix linter errors
arkadiuszos4chain Apr 30, 2024
5f5d06f
feat(SPV-742): fix linter errors
arkadiuszos4chain Apr 30, 2024
c2ec406
feat(SPV-742): fix linter errors
arkadiuszos4chain Apr 30, 2024
41c272a
feat(SPV-742): fix linter errors
arkadiuszos4chain May 1, 2024
7bab33f
feat(SPV-742): fix linter errors
arkadiuszos4chain May 1, 2024
4be1d5e
feat(SPV-742): adjust to review; add comments; minor server refactori…
arkadiuszos4chain May 2, 2024
d53cddc
feat(SPV-742): remove comments
arkadiuszos4chain May 2, 2024
a2b499c
feat(SPV-742): remove comments
arkadiuszos4chain May 2, 2024
66cc4e1
feat(SPV-742): remove comments
arkadiuszos4chain May 2, 2024
dc5d944
feat(SPV-742): fix error handling on server start
arkadiuszos4chain May 2, 2024
279a15c
feat(SPV-742): fix deadlock when disconnect peer
arkadiuszos4chain May 2, 2024
a560d91
feat(SPV-742): revert withCancelHandle() for clarity
arkadiuszos4chain May 2, 2024
0439f2f
feat(SPV-742): fix linter
arkadiuszos4chain May 2, 2024
be18961
feat(SPV-742): adjust to some comments
arkadiuszos4chain May 8, 2024
ff09468
feat(SPV-742): adjust to some comments
arkadiuszos4chain May 8, 2024
4a7dea9
feat(SPV-742): adjust comments- replace log helper funcitons with new…
arkadiuszos4chain May 8, 2024
558038c
feat(SPV-742): adjust comments- remove unnecessary peer address string
arkadiuszos4chain May 8, 2024
bb1004c
feat(SPV-742): adjust comments- improve peer collection implementation
arkadiuszos4chain May 10, 2024
25f2fbb
feat(SPV-742): adjust comments- move network to wire package; reimple…
arkadiuszos4chain May 10, 2024
c28cbec
feat(SPV-742): fix linter
arkadiuszos4chain May 10, 2024
8089afc
feat(SPV-742): adjust comments- filter seed address
arkadiuszos4chain May 10, 2024
0b0acf0
feat(SPV-742): adjust comments- use only ONE peer to sync with chain,…
arkadiuszos4chain May 10, 2024
d0f50bb
feat(SPV-742): fix linter
arkadiuszos4chain May 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var version = "development"

type P2PServer interface {
Start() error
Shutdown() error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it a good practice to stay with error return when shutting down? Probably we can't do anything with that, but at least we have a message and we can possibly kill it manually.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can see that neither implementation returns any error - so... why stay here with an error?

Shutdown()
}

// nolint: godot
Expand Down Expand Up @@ -172,9 +172,7 @@ func main() {

<-quit

if err := p2pServer.Shutdown(); err != nil {
log.Error().Msgf("failed to stop p2p server: %v", err)
}
p2pServer.Shutdown()

if err := ws.Shutdown(); err != nil {
log.Error().Msgf("failed to stop websocket server: %v", err)
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ type P2PConfig struct {
UserAgentName string `mapstructure:"user_agent_name" description:"The name that should be used during announcement of the client on the p2p network"`
UserAgentVersion string `mapstructure:"user_agent_version" description:"By default will be equal to application version, but can be overridden for development purposes"`
Experimental bool `mapstructure:"experimental" description:"Turns on a new (highly experimental) way of getting headers with the usage of /internal/transports/p2p instead of /transports/p2p"`

MaxOutboundConnections uint `mapstructure:"max_outbound_connections" description:"Maximum active outbound connections"`
MaxInboundConnections uint `mapstructure:"max_inbound_connections" description:"Maximum active inbound connections"`
AcceptLocalPeers bool `mapstructure:"accept_local_peers" description:"Accept connection from local network"`
}

// LoggingConfig represents a logging config.
Expand Down
3 changes: 3 additions & 0 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ func getP2PDefaults() *P2PConfig {
UserAgentName: ApplicationName,
UserAgentVersion: Version(),
Experimental: false,
MaxOutboundConnections: 8,
MaxInboundConnections: 8,
AcceptLocalPeers: false,
}
}

Expand Down
10 changes: 5 additions & 5 deletions internal/chaincfg/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type Params struct {
Net wire.BitcoinNet

// DefaultPort defines the default peer-to-peer port for the network.
DefaultPort string
DefaultPort uint16

// DNSSeeds defines a list of DNS seeds for the network that are used
// as one method to discover peers.
Expand Down Expand Up @@ -222,7 +222,7 @@ type Params struct {
var MainNetParams = Params{
Name: "mainnet",
Net: wire.MainNet,
DefaultPort: "8333",
DefaultPort: uint16(8333),
DNSSeeds: []DNSSeed{
{"seed-nodes.bsvb.tech", true},
},
Expand Down Expand Up @@ -297,7 +297,7 @@ var MainNetParams = Params{
var RegressionNetParams = Params{
Name: "regtest",
Net: wire.TestNet,
DefaultPort: "18444",
DefaultPort: uint16(18444),
DNSSeeds: []DNSSeed{},

// Chain parameters
Expand Down Expand Up @@ -365,7 +365,7 @@ var RegressionNetParams = Params{
var TestNet3Params = Params{
Name: "testnet3",
Net: wire.TestNet3,
DefaultPort: "18333",
DefaultPort: uint16(18333),
DNSSeeds: []DNSSeed{
{"testnet-seed.metasv.io", true},
{"testnet-btccash-seeder.bitcoinunlimited.info", true},
Expand Down Expand Up @@ -452,7 +452,7 @@ var TestNet3Params = Params{
var SimNetParams = Params{
Name: "simnet",
Net: wire.SimNet,
DefaultPort: "18555",
DefaultPort: uint16(18555),
DNSSeeds: []DNSSeed{}, // NOTE: There must NOT be any seeds.

// Chain parameters
Expand Down
27 changes: 0 additions & 27 deletions internal/transports/p2p/addr.go

This file was deleted.

82 changes: 0 additions & 82 deletions internal/transports/p2p/addr_test.go

This file was deleted.

165 changes: 165 additions & 0 deletions internal/transports/p2p/network/address_book.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package network

import (
"fmt"
"math/rand"
"sync"
"time"

"github.com/bitcoin-sv/block-headers-service/internal/wire"
)

type addressBucketType string

const (
freeBucket addressBucketType = "free"
usedBucket addressBucketType = "used"
bannedBucket addressBucketType = "banned"
)

// AddressBook represents a collection of known network addresses.
type AddressBook struct {
banDuration time.Duration
addrs map[addressBucketType]*addrBucket
mu sync.Mutex
addrFitlerFn func(*wire.NetAddress) bool
}

// NewAddressBook creates and initializes a new AddressBook instance.
func NewAddressBook(banDuration time.Duration, acceptLocalAddresses bool) *AddressBook {
// Set the address filter function based on whether local addresses are accepted
addrFilterFn := wire.IsRoutable
if acceptLocalAddresses {
addrFilterFn = wire.IsRoutableWithLocal
}

const addressesInitCapacity = 500
const usedAddressesInitCapacity = 8

knownAddress := make(map[addressBucketType]*addrBucket, 3)
knownAddress[freeBucket] = newAddrBucket(addressesInitCapacity)
knownAddress[bannedBucket] = newAddrBucket(addressesInitCapacity)
knownAddress[usedBucket] = newAddrBucket(usedAddressesInitCapacity)

return &AddressBook{
banDuration: banDuration,
addrFitlerFn: addrFilterFn,
addrs: knownAddress,
}
}

// UpsertAddrs updates or adds multiple addresses.
func (a *AddressBook) UpsertAddrs(address []*wire.NetAddress) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (a *AddressBook) UpsertAddrs(address []*wire.NetAddress) {
func (a *AddressBook) AddAll(address []*wire.NetAddress) {

a.mu.Lock()
defer a.mu.Unlock()

for _, addr := range address {
if !a.addrFitlerFn(addr) {
continue
}

key, ka, _ := a.findAddr(addr)
// If the address is not found, add it to the AddressBook.
if ka == nil {
a.addrs[freeBucket].add(key, &knownAddress{addr: addr})
} else if addr.Timestamp.After(ka.addr.Timestamp) {
// Otherwise, update the timestamp if the new one is newer.
ka.addr.Timestamp = addr.Timestamp
}
}
}

// MarkUsedAddr updates or adds a peer's address.
func (a *AddressBook) MarkUsedAddr(pa *wire.NetAddress) {
a.mu.Lock()
defer a.mu.Unlock()

key := addrKey(pa)
// remove from free if exists
a.addrs[freeBucket].rm(key)
// add to used
a.addrs[usedBucket].add(key, &knownAddress{addr: pa})

}

// BanAddr bans a network address. Ignores address if doesn't exist in the AddressBook.
func (a *AddressBook) BanAddr(addr *wire.NetAddress) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (a *AddressBook) BanAddr(addr *wire.NetAddress) {
func (a *AddressBook) Ban(addr *wire.NetAddress) {

a.mu.Lock()
defer a.mu.Unlock()

if key, ka, bucket := a.findAddr(addr); ka != nil {
switch bucket {
case freeBucket:
a.ban(bucket, key, ka)
case usedBucket:
a.ban(bucket, key, ka)
case bannedBucket:
default:
// Do nothing
}
}
}

// GetRandFreeAddr returns a randomly chosen unused network address.
func (a *AddressBook) GetRandFreeAddr() *wire.NetAddress {
a.mu.Lock()
defer a.mu.Unlock()

freeAddres := a.addrs[freeBucket].items
fLen := len(freeAddres)
if fLen == 0 {
return nil
}

// #nosec G404
randIndx := rand.Intn(fLen)
return freeAddres[randIndx].addr
}

func (a *AddressBook) findAddr(addr *wire.NetAddress) (key string, ka *knownAddress, bucket addressBucketType) {
key = addrKey(addr)

// search in free addresses
if ka = a.addrs[freeBucket].find(key); ka != nil {
bucket = freeBucket
return
}

// search in used
if ka = a.addrs[usedBucket].find(key); ka != nil {
bucket = usedBucket
return
}

// search in banned
if ka = a.addrs[bannedBucket].find(key); ka != nil {
bucket = bannedBucket
return
}

return key, nil, ""
}

func (a *AddressBook) ban(bucket addressBucketType, key string, ka *knownAddress) {
a.addrs[bucket].rm(key)
a.addrs[bannedBucket].add(key, ka)
go a.unban(key, ka)
}

func (a *AddressBook) unban(key string, ka *knownAddress) {
time.Sleep(a.banDuration)

a.mu.Lock()
defer a.mu.Unlock()

a.addrs[bannedBucket].rm(key)
a.addrs[freeBucket].add(key, ka)
}

func addrKey(addr *wire.NetAddress) string {
return fmt.Sprintf("%s:%d", addr.IP, addr.Port)
}

type knownAddress struct {
addr *wire.NetAddress
}
Loading
Loading