Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 17 additions & 162 deletions waku/v2/node/localnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,173 +6,23 @@ import (
"net"
"strconv"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/event"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
"go.uber.org/zap"

ndoeutils "github.com/waku-org/go-waku/waku/v2/node/utils"
"github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags wenr.WakuEnrBitfield, advertiseAddr []ma.Multiaddr, shouldAutoUpdate bool) error {
var options []wenr.ENROption
options = append(options, wenr.WithUDPPort(udpPort))
options = append(options, wenr.WithWakuBitfield(wakuFlags))

// Reset ENR fields
wenr.DeleteField(localnode, wenr.MultiaddrENRField)
wenr.DeleteField(localnode, enr.TCP(0).ENRKey())
wenr.DeleteField(localnode, enr.IPv4{}.ENRKey())
wenr.DeleteField(localnode, enr.IPv6{}.ENRKey())

if advertiseAddr != nil {
// An advertised address disables libp2p address updates
// and discv5 predictions
ipAddr, err := selectMostExternalAddress(advertiseAddr)
if err != nil {
return err
}

options = append(options, wenr.WithIP(ipAddr))
} else if !shouldAutoUpdate {
// We received a libp2p address update. Autoupdate is disabled
// Using a static ip will disable endpoint prediction.
options = append(options, wenr.WithIP(ipAddr))
} else {
if ipAddr.Port != 0 {
// We received a libp2p address update, but we should still
// allow discv5 to update the enr record. We set the localnode
// keys manually. It's possible that the ENR record might get
// updated automatically
ip4 := ipAddr.IP.To4()
ip6 := ipAddr.IP.To16()
if ip4 != nil && !ip4.IsUnspecified() {
localnode.SetFallbackIP(ip4)
localnode.Set(enr.IPv4(ip4))
localnode.Set(enr.TCP(uint16(ipAddr.Port)))
} else {
localnode.Delete(enr.IPv4{})
localnode.Delete(enr.TCP(0))
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
}

if ip4 == nil && ip6 != nil && !ip6.IsUnspecified() {
localnode.Set(enr.IPv6(ip6))
localnode.Set(enr.TCP6(ipAddr.Port))
} else {
localnode.Delete(enr.IPv6{})
localnode.Delete(enr.TCP6(0))
}
}
}

// Writing the IP + Port has priority over writting the multiaddress which might fail or not
// depending on the enr having space
options = append(options, wenr.WithMultiaddress(multiaddrs...))

return wenr.Update(w.log, localnode, options...)
}

func isPrivate(addr *net.TCPAddr) bool {
return addr.IP.IsPrivate()
}

func isExternal(addr *net.TCPAddr) bool {
return !isPrivate(addr) && !addr.IP.IsLoopback() && !addr.IP.IsUnspecified()
}

func isLoopback(addr *net.TCPAddr) bool {
return addr.IP.IsLoopback()
}

func filterIP(ss []*net.TCPAddr, fn func(*net.TCPAddr) bool) (ret []*net.TCPAddr) {
for _, s := range ss {
if fn(s) {
ret = append(ret, s)
}
}
return
}

func extractIPAddressForENR(addr ma.Multiaddr) (*net.TCPAddr, error) {
// It's a p2p-circuit address. We shouldnt use these
// for building the ENR record default keys
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
if err == nil {
return nil, errors.New("can't use IP address from a p2p-circuit address")
}

// ws and wss addresses are handled by the multiaddr key
// they shouldnt be used for building the ENR record default keys
_, err = addr.ValueForProtocol(ma.P_WS)
if err == nil {
return nil, errors.New("can't use IP address from a ws address")
}
_, err = addr.ValueForProtocol(ma.P_WSS)
if err == nil {
return nil, errors.New("can't use IP address from a wss address")
}

var ipStr string
dns4, err := addr.ValueForProtocol(ma.P_DNS4)
if err != nil {
ipStr, err = addr.ValueForProtocol(ma.P_IP4)
if err != nil {
return nil, err
}
} else {
netIP, err := net.ResolveIPAddr("ip4", dns4)
if err != nil {
return nil, err
}
ipStr = netIP.String()
}

portStr, err := addr.ValueForProtocol(ma.P_TCP)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, err
}
return &net.TCPAddr{
IP: net.ParseIP(ipStr),
Port: port,
}, nil
}

func selectMostExternalAddress(addresses []ma.Multiaddr) (*net.TCPAddr, error) {
var ipAddrs []*net.TCPAddr
for _, addr := range addresses {
ipAddr, err := extractIPAddressForENR(addr)
if err != nil {
continue
}
ipAddrs = append(ipAddrs, ipAddr)
}

externalIPs := filterIP(ipAddrs, isExternal)
if len(externalIPs) > 0 {
return externalIPs[0], nil
}

privateIPs := filterIP(ipAddrs, isPrivate)
if len(privateIPs) > 0 {
return privateIPs[0], nil
}

loopback := filterIP(ipAddrs, isLoopback)
if len(loopback) > 0 {
return loopback[0], nil
}

return nil, errors.New("could not obtain ip address")
func (w *WakuNode) updateLocalNode() error {
w.localNodeMutex.Lock()
defer w.localNodeMutex.Unlock()
return enr.UpdateLocalNode(w.log, w.localNode, &w.localNodeParams)
}

func decapsulateP2P(addr ma.Multiaddr) (ma.Multiaddr, error) {
Expand Down Expand Up @@ -282,7 +132,7 @@ func filter0Port(addresses []ma.Multiaddr) ([]ma.Multiaddr, error) {
}

func (w *WakuNode) getENRAddresses(ctx context.Context, addrs []ma.Multiaddr) (extAddr *net.TCPAddr, multiaddr []ma.Multiaddr, err error) {
extAddr, err = selectMostExternalAddress(addrs)
extAddr, err = ndoeutils.SelectMostExternalAddress(addrs)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -320,7 +170,10 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
return err
}

err = w.updateLocalNode(w.localNode, multiaddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddrs, w.opts.discV5autoUpdate)
w.localNodeParams.Multiaddrs = multiaddresses
w.localNodeParams.IPAddr = ipAddr

err = w.updateLocalNode()
if err != nil {
w.log.Error("updating localnode ENR record", zap.Error(err))
return err
Expand All @@ -340,7 +193,8 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
}

func (w *WakuNode) SetRelayShards(rs protocol.RelayShards) error {
err := wenr.Update(w.log, w.localNode, wenr.WithWakuRelaySharding(rs))
w.localNodeParams.RelayShards = rs
err := w.updateLocalNode()
if err != nil {
return err
}
Expand Down Expand Up @@ -396,7 +250,8 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error {
w.log.Warn("A mix of named and static shards found. ENR shard will contain only the following shards", zap.Any("shards", rs[0]))
}

err = wenr.Update(w.log, w.localNode, wenr.WithWakuRelaySharding(rs[0]))
w.localNodeParams.RelayShards = rs[0]
err = w.updateLocalNode()
if err != nil {
w.log.Warn("could not set ENR shard info", zap.Error(err))
continue
Expand Down
106 changes: 106 additions & 0 deletions waku/v2/node/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package utils

import (
"errors"
"net"
"strconv"

"github.com/multiformats/go-multiaddr"
)

func ExtractIPAddressForENR(addr multiaddr.Multiaddr) (*net.TCPAddr, error) {
// It's a p2p-circuit address. We shouldnt use these
// for building the ENR record default keys
_, err := addr.ValueForProtocol(multiaddr.P_CIRCUIT)
if err == nil {
return nil, errors.New("can't use IP address from a p2p-circuit address")
}

// ws and wss addresses are handled by the multiaddr key
// they shouldnt be used for building the ENR record default keys
_, err = addr.ValueForProtocol(multiaddr.P_WS)
if err == nil {
return nil, errors.New("can't use IP address from a ws address")
}
_, err = addr.ValueForProtocol(multiaddr.P_WSS)
if err == nil {
return nil, errors.New("can't use IP address from a wss address")
}

var ipStr string
dns4, err := addr.ValueForProtocol(multiaddr.P_DNS4)
if err != nil {
ipStr, err = addr.ValueForProtocol(multiaddr.P_IP4)
if err != nil {
return nil, err
}
} else {
netIP, err := net.ResolveIPAddr("ip4", dns4)
if err != nil {
return nil, err
}
ipStr = netIP.String()
}

portStr, err := addr.ValueForProtocol(multiaddr.P_TCP)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, err
}
return &net.TCPAddr{
IP: net.ParseIP(ipStr),
Port: port,
}, nil
}

func SelectMostExternalAddress(addresses []multiaddr.Multiaddr) (*net.TCPAddr, error) {
var ipAddrs []*net.TCPAddr
for _, addr := range addresses {
ipAddr, err := ExtractIPAddressForENR(addr)
if err != nil {
continue
}
ipAddrs = append(ipAddrs, ipAddr)
}

externalIPs := filterIP(ipAddrs, isExternal)
if len(externalIPs) > 0 {
return externalIPs[0], nil
}

privateIPs := filterIP(ipAddrs, isPrivate)
if len(privateIPs) > 0 {
return privateIPs[0], nil
}

loopback := filterIP(ipAddrs, isLoopback)
if len(loopback) > 0 {
return loopback[0], nil
}

return nil, errors.New("could not obtain ip address")
}

func isPrivate(addr *net.TCPAddr) bool {
return addr.IP.IsPrivate()
}

func isExternal(addr *net.TCPAddr) bool {
return !isPrivate(addr) && !addr.IP.IsLoopback() && !addr.IP.IsUnspecified()
}

func isLoopback(addr *net.TCPAddr) bool {
return addr.IP.IsLoopback()
}

func filterIP(ss []*net.TCPAddr, fn func(*net.TCPAddr) bool) (ret []*net.TCPAddr) {
for _, s := range ss {
if fn(s) {
ret = append(ret, s)
}
}
return
}
20 changes: 18 additions & 2 deletions waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,20 @@ type WakuNode struct {
store *store.WakuStore
rlnRelay RLNRelay

wakuFlag enr.WakuEnrBitfield
circuitRelayNodes chan peer.AddrInfo

localNode *enode.LocalNode

// localNodeParams are ENR parameters that will be applied to the localnode.
localNodeParams enr.LocalNodeParams

// LocalNode.Set is a lazy operation that only stores the entry, but does not sign the record.
// But the size of the record is only checked during `sign`, and if it's >300 bytes, it will panic.
// In WithMultiaddress we attempt to write as much addresses as possible, relying on existing local node entries.
// On the other hand, enr.WithWakuRelaySharding is called in a goroutine, so ther is a race condition.
// To make it work properly, we should make sure that entries are not added during WithMultiaddress run.
localNodeMutex *sync.Mutex

bcaster relay.Broadcaster

connectionNotif ConnectionNotifier
Expand Down Expand Up @@ -193,11 +202,18 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.opts = params
w.log = params.logger.Named("node2")
w.wg = &sync.WaitGroup{}
w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay)
w.circuitRelayNodes = make(chan peer.AddrInfo)
w.metrics = newMetrics(params.prometheusReg)
w.metrics.RecordVersion(Version, GitCommit)

w.localNodeMutex = &sync.Mutex{}
w.localNodeParams = enr.LocalNodeParams{
WakuFlags: enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay),
UDPPort: w.opts.udpPort,
AdvertiseAddr: w.opts.advertiseAddrs,
ShouldAutoUpdate: w.opts.discV5autoUpdate,
}

// Setup peerstore wrapper
if params.peerstore != nil {
w.peerstore = wps.NewWakuPeerstore(params.peerstore)
Expand Down
Loading
Loading