Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion tests/wakunode_rest/test_rest_admin.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ suite "Waku v2 Rest API - Admin":
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)

let shard = RelayShard(clusterId: clusterId, shardId: 0)
let shard = RelayShard(clusterId: clusterId, shardId: 5)
node1.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
node2.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr:
Expand Down Expand Up @@ -212,6 +212,18 @@ suite "Waku v2 Rest API - Admin":
let conn2 = await node1.peerManager.connectPeer(peerInfo2)
let conn3 = await node1.peerManager.connectPeer(peerInfo3)

var count = 0
while count < 20:
## Wait ~1s at most for the peer store to update shard info
let getRes = await client.getPeers()
if getRes.data.allIt(it.shards == @[5.uint16]):
break

count.inc()
await sleepAsync(50.milliseconds)

assert count < 20, "Timeout waiting for shards to be updated in peer store"

# Check successful connections
check:
conn2 == true
Expand Down
5 changes: 5 additions & 0 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,11 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
$clusterId
break guardClauses

# Store the shard information from metadata in the peer store
if pm.switch.peerStore.peerExists(peerId):
let shards = metadata.shards.mapIt(it.uint16)
pm.switch.peerStore.setShardInfo(peerId, shards)

return

info "disconnecting from peer", peerId = peerId, reason = reason
Expand Down
8 changes: 8 additions & 0 deletions waku/node/peer_manager/waku_peer_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type
# Keeps track of the ENR (Ethereum Node Record) of a peer
ENRBook* = ref object of PeerBook[enr.Record]

# Keeps track of peer shards
ShardBook* = ref object of PeerBook[seq[uint16]]

proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo =
let addresses =
if peerStore[LastSeenBook][peerId].isSome():
Expand All @@ -55,6 +58,7 @@ proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo =
else:
none(enr.Record),
protocols: peerStore[ProtoBook][peerId],
shards: peerStore[ShardBook][peerId],
agent: peerStore[AgentBook][peerId],
protoVersion: peerStore[ProtoVersionBook][peerId],
publicKey: peerStore[KeyBook][peerId],
Expand All @@ -76,6 +80,7 @@ proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] =
toSeq(peerStore[AddressBook].book.keys()),
toSeq(peerStore[ProtoBook].book.keys()),
toSeq(peerStore[KeyBook].book.keys()),
toSeq(peerStore[ShardBook].book.keys()),
)
.toHashSet()

Expand Down Expand Up @@ -127,6 +132,9 @@ proc addPeer*(peerStore: PeerStore, peer: RemotePeerInfo, origin = UnknownOrigin
if peer.enr.isSome():
peerStore[ENRBook][peer.peerId] = peer.enr.get()

proc setShardInfo*(peerStore: PeerStore, peerId: PeerID, shards: seq[uint16]) =
peerStore[ShardBook][peerId] = shards

proc peers*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] =
peerStore.peers().filterIt(it.protocols.contains(proto))

Expand Down
12 changes: 11 additions & 1 deletion waku/waku_core/peers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type RemotePeerInfo* = ref object
addrs*: seq[MultiAddress]
enr*: Option[enr.Record]
protocols*: seq[string]
shards*: seq[uint16]

agent*: string
protoVersion*: string
Expand All @@ -73,6 +74,7 @@ proc init*(
addrs: seq[MultiAddress] = @[],
enr: Option[enr.Record] = none(enr.Record),
protocols: seq[string] = @[],
shards: seq[uint16] = @[],
publicKey: crypto.PublicKey = crypto.PublicKey(),
agent: string = "",
protoVersion: string = "",
Expand All @@ -88,6 +90,7 @@ proc init*(
addrs: addrs,
enr: enr,
protocols: protocols,
shards: shards,
publicKey: publicKey,
agent: agent,
protoVersion: protoVersion,
Expand All @@ -105,9 +108,12 @@ proc init*(
addrs: seq[MultiAddress] = @[],
enr: Option[enr.Record] = none(enr.Record),
protocols: seq[string] = @[],
shards: seq[uint16] = @[],
): T {.raises: [Defect, ResultError[cstring], LPError].} =
let peerId = PeerID.init(peerId).tryGet()
RemotePeerInfo(peerId: peerId, addrs: addrs, enr: enr, protocols: protocols)
RemotePeerInfo(
peerId: peerId, addrs: addrs, enr: enr, protocols: protocols, shards: shards
)

## Parse

Expand Down Expand Up @@ -326,6 +332,7 @@ converter toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo =
addrs: peerInfo.listenAddrs,
enr: none(enr.Record),
protocols: peerInfo.protocols,
shards: @[],
agent: peerInfo.agentVersion,
protoVersion: peerInfo.protoVersion,
publicKey: peerInfo.publicKey,
Expand Down Expand Up @@ -361,6 +368,9 @@ proc getAgent*(peer: RemotePeerInfo): string =
return peer.agent

proc getShards*(peer: RemotePeerInfo): seq[uint16] =
if peer.shards.len > 0:
return peer.shards

if peer.enr.isNone():
return @[]

Expand Down