Skip to content

Commit 80c0b44

Browse files
committed
feat: single point localnode parametrization
1 parent 288f5c6 commit 80c0b44

File tree

5 files changed

+249
-218
lines changed

5 files changed

+249
-218
lines changed

waku/v2/node/localnode.go

Lines changed: 17 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -6,173 +6,23 @@ import (
66
"net"
77
"strconv"
88

9-
"github.com/ethereum/go-ethereum/p2p/enode"
10-
"github.com/ethereum/go-ethereum/p2p/enr"
119
"github.com/libp2p/go-libp2p/core/event"
1210
"github.com/multiformats/go-multiaddr"
1311
ma "github.com/multiformats/go-multiaddr"
1412
madns "github.com/multiformats/go-multiaddr-dns"
13+
"go.uber.org/zap"
14+
15+
ndoeutils "github.com/waku-org/go-waku/waku/v2/node/utils"
1516
"github.com/waku-org/go-waku/waku/v2/protocol"
16-
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
17+
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
1718
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
1819
"github.com/waku-org/go-waku/waku/v2/utils"
19-
"go.uber.org/zap"
2020
)
2121

22-
func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags wenr.WakuEnrBitfield, advertiseAddr []ma.Multiaddr, shouldAutoUpdate bool) error {
23-
var options []wenr.ENROption
24-
options = append(options, wenr.WithUDPPort(udpPort))
25-
options = append(options, wenr.WithWakuBitfield(wakuFlags))
26-
27-
// Reset ENR fields
28-
wenr.DeleteField(localnode, wenr.MultiaddrENRField)
29-
wenr.DeleteField(localnode, enr.TCP(0).ENRKey())
30-
wenr.DeleteField(localnode, enr.IPv4{}.ENRKey())
31-
wenr.DeleteField(localnode, enr.IPv6{}.ENRKey())
32-
33-
if advertiseAddr != nil {
34-
// An advertised address disables libp2p address updates
35-
// and discv5 predictions
36-
ipAddr, err := selectMostExternalAddress(advertiseAddr)
37-
if err != nil {
38-
return err
39-
}
40-
41-
options = append(options, wenr.WithIP(ipAddr))
42-
} else if !shouldAutoUpdate {
43-
// We received a libp2p address update. Autoupdate is disabled
44-
// Using a static ip will disable endpoint prediction.
45-
options = append(options, wenr.WithIP(ipAddr))
46-
} else {
47-
if ipAddr.Port != 0 {
48-
// We received a libp2p address update, but we should still
49-
// allow discv5 to update the enr record. We set the localnode
50-
// keys manually. It's possible that the ENR record might get
51-
// updated automatically
52-
ip4 := ipAddr.IP.To4()
53-
ip6 := ipAddr.IP.To16()
54-
if ip4 != nil && !ip4.IsUnspecified() {
55-
localnode.SetFallbackIP(ip4)
56-
localnode.Set(enr.IPv4(ip4))
57-
localnode.Set(enr.TCP(uint16(ipAddr.Port)))
58-
} else {
59-
localnode.Delete(enr.IPv4{})
60-
localnode.Delete(enr.TCP(0))
61-
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
62-
}
63-
64-
if ip4 == nil && ip6 != nil && !ip6.IsUnspecified() {
65-
localnode.Set(enr.IPv6(ip6))
66-
localnode.Set(enr.TCP6(ipAddr.Port))
67-
} else {
68-
localnode.Delete(enr.IPv6{})
69-
localnode.Delete(enr.TCP6(0))
70-
}
71-
}
72-
}
73-
74-
// Writing the IP + Port has priority over writting the multiaddress which might fail or not
75-
// depending on the enr having space
76-
options = append(options, wenr.WithMultiaddress(multiaddrs...))
77-
78-
return wenr.Update(w.log, localnode, options...)
79-
}
80-
81-
func isPrivate(addr *net.TCPAddr) bool {
82-
return addr.IP.IsPrivate()
83-
}
84-
85-
func isExternal(addr *net.TCPAddr) bool {
86-
return !isPrivate(addr) && !addr.IP.IsLoopback() && !addr.IP.IsUnspecified()
87-
}
88-
89-
func isLoopback(addr *net.TCPAddr) bool {
90-
return addr.IP.IsLoopback()
91-
}
92-
93-
func filterIP(ss []*net.TCPAddr, fn func(*net.TCPAddr) bool) (ret []*net.TCPAddr) {
94-
for _, s := range ss {
95-
if fn(s) {
96-
ret = append(ret, s)
97-
}
98-
}
99-
return
100-
}
101-
102-
func extractIPAddressForENR(addr ma.Multiaddr) (*net.TCPAddr, error) {
103-
// It's a p2p-circuit address. We shouldnt use these
104-
// for building the ENR record default keys
105-
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
106-
if err == nil {
107-
return nil, errors.New("can't use IP address from a p2p-circuit address")
108-
}
109-
110-
// ws and wss addresses are handled by the multiaddr key
111-
// they shouldnt be used for building the ENR record default keys
112-
_, err = addr.ValueForProtocol(ma.P_WS)
113-
if err == nil {
114-
return nil, errors.New("can't use IP address from a ws address")
115-
}
116-
_, err = addr.ValueForProtocol(ma.P_WSS)
117-
if err == nil {
118-
return nil, errors.New("can't use IP address from a wss address")
119-
}
120-
121-
var ipStr string
122-
dns4, err := addr.ValueForProtocol(ma.P_DNS4)
123-
if err != nil {
124-
ipStr, err = addr.ValueForProtocol(ma.P_IP4)
125-
if err != nil {
126-
return nil, err
127-
}
128-
} else {
129-
netIP, err := net.ResolveIPAddr("ip4", dns4)
130-
if err != nil {
131-
return nil, err
132-
}
133-
ipStr = netIP.String()
134-
}
135-
136-
portStr, err := addr.ValueForProtocol(ma.P_TCP)
137-
if err != nil {
138-
return nil, err
139-
}
140-
port, err := strconv.Atoi(portStr)
141-
if err != nil {
142-
return nil, err
143-
}
144-
return &net.TCPAddr{
145-
IP: net.ParseIP(ipStr),
146-
Port: port,
147-
}, nil
148-
}
149-
150-
func selectMostExternalAddress(addresses []ma.Multiaddr) (*net.TCPAddr, error) {
151-
var ipAddrs []*net.TCPAddr
152-
for _, addr := range addresses {
153-
ipAddr, err := extractIPAddressForENR(addr)
154-
if err != nil {
155-
continue
156-
}
157-
ipAddrs = append(ipAddrs, ipAddr)
158-
}
159-
160-
externalIPs := filterIP(ipAddrs, isExternal)
161-
if len(externalIPs) > 0 {
162-
return externalIPs[0], nil
163-
}
164-
165-
privateIPs := filterIP(ipAddrs, isPrivate)
166-
if len(privateIPs) > 0 {
167-
return privateIPs[0], nil
168-
}
169-
170-
loopback := filterIP(ipAddrs, isLoopback)
171-
if len(loopback) > 0 {
172-
return loopback[0], nil
173-
}
174-
175-
return nil, errors.New("could not obtain ip address")
22+
func (w *WakuNode) updateLocalNode() error {
23+
w.localNodeMutex.Lock()
24+
defer w.localNodeMutex.Unlock()
25+
return enr.UpdateLocalNode(w.log, w.localNode, &w.localNodeParams)
17626
}
17727

17828
func decapsulateP2P(addr ma.Multiaddr) (ma.Multiaddr, error) {
@@ -282,7 +132,7 @@ func filter0Port(addresses []ma.Multiaddr) ([]ma.Multiaddr, error) {
282132
}
283133

284134
func (w *WakuNode) getENRAddresses(ctx context.Context, addrs []ma.Multiaddr) (extAddr *net.TCPAddr, multiaddr []ma.Multiaddr, err error) {
285-
extAddr, err = selectMostExternalAddress(addrs)
135+
extAddr, err = ndoeutils.SelectMostExternalAddress(addrs)
286136
if err != nil {
287137
return nil, nil, err
288138
}
@@ -320,7 +170,10 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
320170
return err
321171
}
322172

323-
err = w.updateLocalNode(w.localNode, multiaddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddrs, w.opts.discV5autoUpdate)
173+
w.localNodeParams.Multiaddrs = multiaddresses
174+
w.localNodeParams.IPAddr = ipAddr
175+
176+
err = w.updateLocalNode()
324177
if err != nil {
325178
w.log.Error("updating localnode ENR record", zap.Error(err))
326179
return err
@@ -340,7 +193,8 @@ func (w *WakuNode) setupENR(ctx context.Context, addrs []ma.Multiaddr) error {
340193
}
341194

342195
func (w *WakuNode) SetRelayShards(rs protocol.RelayShards) error {
343-
err := wenr.Update(w.log, w.localNode, wenr.WithWakuRelaySharding(rs))
196+
w.localNodeParams.RelayShards = rs
197+
err := w.updateLocalNode()
344198
if err != nil {
345199
return err
346200
}
@@ -396,7 +250,8 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error {
396250
w.log.Warn("A mix of named and static shards found. ENR shard will contain only the following shards", zap.Any("shards", rs[0]))
397251
}
398252

399-
err = wenr.Update(w.log, w.localNode, wenr.WithWakuRelaySharding(rs[0]))
253+
w.localNodeParams.RelayShards = rs[0]
254+
err = w.updateLocalNode()
400255
if err != nil {
401256
w.log.Warn("could not set ENR shard info", zap.Error(err))
402257
continue

waku/v2/node/utils/utils.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package utils
2+
3+
import (
4+
"errors"
5+
"net"
6+
"strconv"
7+
8+
"github.com/multiformats/go-multiaddr"
9+
)
10+
11+
func ExtractIPAddressForENR(addr multiaddr.Multiaddr) (*net.TCPAddr, error) {
12+
// It's a p2p-circuit address. We shouldnt use these
13+
// for building the ENR record default keys
14+
_, err := addr.ValueForProtocol(multiaddr.P_CIRCUIT)
15+
if err == nil {
16+
return nil, errors.New("can't use IP address from a p2p-circuit address")
17+
}
18+
19+
// ws and wss addresses are handled by the multiaddr key
20+
// they shouldnt be used for building the ENR record default keys
21+
_, err = addr.ValueForProtocol(multiaddr.P_WS)
22+
if err == nil {
23+
return nil, errors.New("can't use IP address from a ws address")
24+
}
25+
_, err = addr.ValueForProtocol(multiaddr.P_WSS)
26+
if err == nil {
27+
return nil, errors.New("can't use IP address from a wss address")
28+
}
29+
30+
var ipStr string
31+
dns4, err := addr.ValueForProtocol(multiaddr.P_DNS4)
32+
if err != nil {
33+
ipStr, err = addr.ValueForProtocol(multiaddr.P_IP4)
34+
if err != nil {
35+
return nil, err
36+
}
37+
} else {
38+
netIP, err := net.ResolveIPAddr("ip4", dns4)
39+
if err != nil {
40+
return nil, err
41+
}
42+
ipStr = netIP.String()
43+
}
44+
45+
portStr, err := addr.ValueForProtocol(multiaddr.P_TCP)
46+
if err != nil {
47+
return nil, err
48+
}
49+
port, err := strconv.Atoi(portStr)
50+
if err != nil {
51+
return nil, err
52+
}
53+
return &net.TCPAddr{
54+
IP: net.ParseIP(ipStr),
55+
Port: port,
56+
}, nil
57+
}
58+
59+
func SelectMostExternalAddress(addresses []multiaddr.Multiaddr) (*net.TCPAddr, error) {
60+
var ipAddrs []*net.TCPAddr
61+
for _, addr := range addresses {
62+
ipAddr, err := ExtractIPAddressForENR(addr)
63+
if err != nil {
64+
continue
65+
}
66+
ipAddrs = append(ipAddrs, ipAddr)
67+
}
68+
69+
externalIPs := filterIP(ipAddrs, isExternal)
70+
if len(externalIPs) > 0 {
71+
return externalIPs[0], nil
72+
}
73+
74+
privateIPs := filterIP(ipAddrs, isPrivate)
75+
if len(privateIPs) > 0 {
76+
return privateIPs[0], nil
77+
}
78+
79+
loopback := filterIP(ipAddrs, isLoopback)
80+
if len(loopback) > 0 {
81+
return loopback[0], nil
82+
}
83+
84+
return nil, errors.New("could not obtain ip address")
85+
}
86+
87+
func isPrivate(addr *net.TCPAddr) bool {
88+
return addr.IP.IsPrivate()
89+
}
90+
91+
func isExternal(addr *net.TCPAddr) bool {
92+
return !isPrivate(addr) && !addr.IP.IsLoopback() && !addr.IP.IsUnspecified()
93+
}
94+
95+
func isLoopback(addr *net.TCPAddr) bool {
96+
return addr.IP.IsLoopback()
97+
}
98+
99+
func filterIP(ss []*net.TCPAddr, fn func(*net.TCPAddr) bool) (ret []*net.TCPAddr) {
100+
for _, s := range ss {
101+
if fn(s) {
102+
ret = append(ret, s)
103+
}
104+
}
105+
return
106+
}

waku/v2/node/wakunode2.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,20 @@ type WakuNode struct {
106106
store *store.WakuStore
107107
rlnRelay RLNRelay
108108

109-
wakuFlag enr.WakuEnrBitfield
110109
circuitRelayNodes chan peer.AddrInfo
111110

112111
localNode *enode.LocalNode
113112

113+
// localNodeParams are ENR parameters that will be applied to the localnode.
114+
localNodeParams enr.LocalNodeParams
115+
116+
// LocalNode.Set is a lazy operation that only stores the entry, but does not sign the record.
117+
// But the size of the record is only checked during `sign`, and if it's >300 bytes, it will panic.
118+
// In WithMultiaddress we attempt to write as much addresses as possible, relying on existing local node entries.
119+
// On the other hand, enr.WithWakuRelaySharding is called in a goroutine, so ther is a race condition.
120+
// To make it work properly, we should make sure that entries are not added during WithMultiaddress run.
121+
localNodeMutex *sync.Mutex
122+
114123
bcaster relay.Broadcaster
115124

116125
connectionNotif ConnectionNotifier
@@ -193,11 +202,18 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
193202
w.opts = params
194203
w.log = params.logger.Named("node2")
195204
w.wg = &sync.WaitGroup{}
196-
w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay)
197205
w.circuitRelayNodes = make(chan peer.AddrInfo)
198206
w.metrics = newMetrics(params.prometheusReg)
199207
w.metrics.RecordVersion(Version, GitCommit)
200208

209+
w.localNodeMutex = &sync.Mutex{}
210+
w.localNodeParams = enr.LocalNodeParams{
211+
WakuFlags: enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay),
212+
UDPPort: w.opts.udpPort,
213+
AdvertiseAddr: w.opts.advertiseAddrs,
214+
ShouldAutoUpdate: w.opts.discV5autoUpdate,
215+
}
216+
201217
// Setup peerstore wrapper
202218
if params.peerstore != nil {
203219
w.peerstore = wps.NewWakuPeerstore(params.peerstore)

0 commit comments

Comments
 (0)