Skip to content

Commit 8f0f5dd

Browse files
committed
fix: rejecting excess relay connections (#3065)
1 parent bad30bf commit 8f0f5dd

File tree

1 file changed

+50
-57
lines changed

1 file changed

+50
-57
lines changed

waku/node/peer_manager/peer_manager.nim

Lines changed: 50 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,24 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
399399
asyncSpawn(pm.switch.disconnect(peerId))
400400
pm.peerStore.delete(peerId)
401401

402+
proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
403+
## Returns the peerIds of physical connections (in and out)
404+
## containing at least one stream with the given protocol.
405+
406+
var inPeers: seq[PeerId]
407+
var outPeers: seq[PeerId]
408+
409+
for peerId, muxers in pm.switch.connManager.getConnections():
410+
for peerConn in muxers:
411+
let streams = peerConn.getStreams()
412+
if streams.anyIt(it.protocol == protocol):
413+
if peerConn.connection.transportDir == Direction.In:
414+
inPeers.add(peerId)
415+
elif peerConn.connection.transportDir == Direction.Out:
416+
outPeers.add(peerId)
417+
418+
return (inPeers, outPeers)
419+
402420
# called when a peer i) first connects to us ii) disconnects all connections from us
403421
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
404422
if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined:
@@ -412,6 +430,17 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
412430
direction = if event.initiator: Outbound else: Inbound
413431
connectedness = Connected
414432

433+
## Check max allowed in-relay peers
434+
let inRelayPeers = pm.connectedPeers(WakuRelayCodec)[0]
435+
if inRelayPeers.len > pm.inRelayPeersTarget and
436+
pm.peerStore.hasPeer(peerId, WakuRelayCodec):
437+
debug "disconnecting relay peer because reached max num in-relay peers",
438+
peerId = peerId,
439+
inRelayPeers = inRelayPeers.len,
440+
inRelayPeersTarget = pm.inRelayPeersTarget
441+
await pm.switch.disconnect(peerId)
442+
443+
## Apply max ip colocation limit
415444
if (let ip = pm.getPeerIp(peerId); ip.isSome()):
416445
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)
417446

@@ -494,7 +523,7 @@ proc new*(
494523
error "Max backoff time can't be over 1 week", maxBackoff = backoff
495524
raise newException(Defect, "Max backoff time can't be over 1 week")
496525

497-
let outRelayPeersTarget = max(maxRelayPeersValue div 3, 10)
526+
let outRelayPeersTarget = maxRelayPeersValue div 3
498527

499528
let pm = PeerManager(
500529
switch: switch,
@@ -560,46 +589,6 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str
560589

561590
pm.addPeer(remotePeerInfo)
562591

563-
proc reconnectPeers*(
564-
pm: PeerManager, proto: string, backoff: chronos.Duration = chronos.seconds(0)
565-
) {.async.} =
566-
## Reconnect to peers registered for this protocol. This will update connectedness.
567-
## Especially useful to resume connections from persistent storage after a restart.
568-
569-
trace "Reconnecting peers", proto = proto
570-
571-
# Proto is not persisted, we need to iterate over all peers.
572-
for peerInfo in pm.peerStore.peers(protocolMatcher(proto)):
573-
# Check that the peer can be connected
574-
if peerInfo.connectedness == CannotConnect:
575-
error "Not reconnecting to unreachable or non-existing peer",
576-
peerId = peerInfo.peerId
577-
continue
578-
579-
# Respect optional backoff period where applicable.
580-
let
581-
# TODO: Add method to peerStore (eg isBackoffExpired())
582-
disconnectTime = Moment.init(peerInfo.disconnectTime, Second) # Convert
583-
currentTime = Moment.init(getTime().toUnix, Second)
584-
# Current time comparable to persisted value
585-
backoffTime = disconnectTime + backoff - currentTime
586-
# Consider time elapsed since last disconnect
587-
588-
trace "Respecting backoff",
589-
backoff = backoff,
590-
disconnectTime = disconnectTime,
591-
currentTime = currentTime,
592-
backoffTime = backoffTime
593-
594-
# TODO: This blocks the whole function. Try to connect to another peer in the meantime.
595-
if backoffTime > ZeroDuration:
596-
trace "Backing off before reconnect...",
597-
peerId = peerInfo.peerId, backoffTime = backoffTime
598-
# We disconnected recently and still need to wait for a backoff period before connecting
599-
await sleepAsync(backoffTime)
600-
601-
discard await pm.connectRelay(peerInfo)
602-
603592
####################
604593
# Dialer interface #
605594
####################
@@ -685,23 +674,29 @@ proc connectToNodes*(
685674
# later.
686675
await sleepAsync(chronos.seconds(5))
687676

688-
proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
689-
## Returns the peerIds of physical connections (in and out)
690-
## containing at least one stream with the given protocol.
677+
proc reconnectPeers*(
678+
pm: PeerManager, proto: string, backoffTime: chronos.Duration = chronos.seconds(0)
679+
) {.async.} =
680+
## Reconnect to peers registered for this protocol. This will update connectedness.
681+
## Especially useful to resume connections from persistent storage after a restart.
691682

692-
var inPeers: seq[PeerId]
693-
var outPeers: seq[PeerId]
683+
debug "Reconnecting peers", proto = proto
694684

695-
for peerId, muxers in pm.switch.connManager.getConnections():
696-
for peerConn in muxers:
697-
let streams = peerConn.getStreams()
698-
if streams.anyIt(it.protocol == protocol):
699-
if peerConn.connection.transportDir == Direction.In:
700-
inPeers.add(peerId)
701-
elif peerConn.connection.transportDir == Direction.Out:
702-
outPeers.add(peerId)
685+
# Proto is not persisted, we need to iterate over all peers.
686+
for peerInfo in pm.peerStore.peers(protocolMatcher(proto)):
687+
# Check that the peer can be connected
688+
if peerInfo.connectedness == CannotConnect:
689+
error "Not reconnecting to unreachable or non-existing peer",
690+
peerId = peerInfo.peerId
691+
continue
703692

704-
return (inPeers, outPeers)
693+
if backoffTime > ZeroDuration:
694+
debug "Backing off before reconnect",
695+
peerId = peerInfo.peerId, backoffTime = backoffTime
696+
# We disconnected recently and still need to wait for a backoff period before connecting
697+
await sleepAsync(backoffTime)
698+
699+
await pm.connectToNodes(@[peerInfo])
705700

706701
proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
707702
var
@@ -730,9 +725,7 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
730725

731726
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
732727
var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
733-
let maxConnections = pm.switch.connManager.inSema.size
734728
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
735-
let inPeersTarget = maxConnections - pm.outRelayPeersTarget
736729

737730
if inRelayPeers.len > pm.inRelayPeersTarget:
738731
await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)

0 commit comments

Comments
 (0)