Skip to content

Commit 10673af

Browse files
committed
Add fixed relay/core shard subscriptions and fix test
1 parent 0eb6aa0 commit 10673af

File tree

2 files changed

+143
-60
lines changed

2 files changed

+143
-60
lines changed
Lines changed: 110 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,53 @@
11
{.used.}
22

3-
import std/strutils
3+
import std/[strutils, net, options]
44
import chronos, testutils/unittests, stew/byteutils
5-
import ../testlib/[common, testasync]
5+
import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto]
6+
import ../testlib/[common, wakucore, wakunode, testasync]
7+
68
import
79
waku, waku/[waku_node, waku_core, common/broker/broker_context, events/message_events]
810
import waku/api/api_conf, waku/factory/waku_conf
911

12+
const TestTimeout = chronos.seconds(10)
13+
const DefaultShard = PubsubTopic("/waku/2/rs/1/0")
14+
1015
type ReceiveEventListenerManager = ref object
1116
brokerCtx: BrokerContext
1217
receivedListener: MessageReceivedEventListener
13-
receivedFuture: Future[void]
18+
receivedEvent: AsyncEvent
1419
receivedMessages: seq[WakuMessage]
20+
targetCount: int
1521

1622
proc newReceiveEventListenerManager(
17-
brokerCtx: BrokerContext
23+
brokerCtx: BrokerContext, expectedCount: int = 1
1824
): ReceiveEventListenerManager =
19-
let manager = ReceiveEventListenerManager(brokerCtx: brokerCtx, receivedMessages: @[])
20-
manager.receivedFuture = newFuture[void]("receivedEvent")
25+
let manager = ReceiveEventListenerManager(
26+
brokerCtx: brokerCtx, receivedMessages: @[], targetCount: expectedCount
27+
)
28+
manager.receivedEvent = newAsyncEvent()
2129

22-
manager.receivedListener = MessageReceivedEvent.listen(
23-
brokerCtx,
24-
proc(event: MessageReceivedEvent) {.async: (raises: []).} =
25-
manager.receivedMessages.add(event.message)
26-
echo "RECEIVED EVENT TRIGGERED: contentTopic=", event.message.contentTopic
30+
manager.receivedListener = MessageReceivedEvent
31+
.listen(
32+
brokerCtx,
33+
proc(event: MessageReceivedEvent) {.async: (raises: []).} =
34+
manager.receivedMessages.add(event.message)
2735

28-
if not manager.receivedFuture.finished():
29-
manager.receivedFuture.complete()
30-
,
31-
).valueOr:
32-
raiseAssert error
36+
if manager.receivedMessages.len >= manager.targetCount:
37+
manager.receivedEvent.fire()
38+
,
39+
)
40+
.expect("Failed to listen to MessageReceivedEvent")
3341

3442
return manager
3543

3644
proc teardown(manager: ReceiveEventListenerManager) =
3745
MessageReceivedEvent.dropListener(manager.brokerCtx, manager.receivedListener)
3846

39-
proc waitForEvent(
47+
proc waitForEvents(
4048
manager: ReceiveEventListenerManager, timeout: Duration
4149
): Future[bool] {.async.} =
42-
return await manager.receivedFuture.withTimeout(timeout)
50+
return await manager.receivedEvent.wait().withTimeout(timeout)
4351

4452
proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
4553
let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0)
@@ -54,56 +62,99 @@ proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
5462
p2pReliability = true,
5563
)
5664

57-
suite "Waku API - Subscription Service":
58-
asyncTest "Subscription API, two relays with subscribe and receive message":
59-
var node1, node2: Waku
60-
65+
proc setupSubscriberNode(conf: NodeConfig): Future[Waku] {.async.} =
66+
var node: Waku
67+
lockNewGlobalBrokerContext:
68+
node = (await createNode(conf)).expect("Failed to create subscriber node")
69+
(await startWaku(addr node)).expect("Failed to start subscriber node")
70+
return node
71+
72+
proc publishWhenMeshReady(
73+
publisher: WakuNode,
74+
pubsubTopic: PubsubTopic,
75+
contentTopic: ContentTopic,
76+
payload: seq[byte],
77+
maxRetries: int = 50,
78+
retryDelay: Duration = 200.milliseconds,
79+
): Future[Result[int, string]] {.async.} =
80+
for _ in 0 ..< maxRetries:
81+
let msg = WakuMessage(
82+
payload: payload, contentTopic: contentTopic, version: 0, timestamp: now()
83+
)
84+
85+
let publishRes = await publisher.publish(some(pubsubTopic), msg)
86+
if publishRes.isOk() and publishRes.value > 0:
87+
return publishRes
88+
89+
await sleepAsync(retryDelay)
90+
91+
return err("Timed out waiting for mesh")
92+
93+
suite "Messaging API, SubscriptionService":
94+
var
95+
publisherNode {.threadvar.}: WakuNode
96+
publisherPeerInfo {.threadvar.}: RemotePeerInfo
97+
publisherPeerId {.threadvar.}: PeerId
98+
99+
subscriberNode {.threadvar.}: Waku
100+
101+
asyncSetup:
61102
lockNewGlobalBrokerContext:
62-
node1 = (await createNode(createApiNodeConf())).valueOr:
63-
raiseAssert error
64-
(await startWaku(addr node1)).isOkOr:
65-
raiseAssert "Failed to start node1"
66-
67-
lockNewGlobalBrokerContext:
68-
node2 = (await createNode(createApiNodeConf())).valueOr:
69-
raiseAssert error
70-
(await startWaku(addr node2)).isOkOr:
71-
raiseAssert "Failed to start node2"
72-
73-
let node2PeerInfo = node2.node.peerInfo.toRemotePeerInfo()
74-
await node1.node.connectToNodes(@[node2PeerInfo])
75-
76-
await sleepAsync(2.seconds)
77-
103+
publisherNode =
104+
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
105+
106+
publisherNode.mountMetadata(1, @[0'u16]).expect("Failed to mount metadata")
107+
(await publisherNode.mountRelay()).expect("Failed to mount relay")
108+
await publisherNode.mountLibp2pPing()
109+
await publisherNode.start()
110+
111+
publisherPeerInfo = publisherNode.peerInfo.toRemotePeerInfo()
112+
publisherPeerId = publisherNode.peerInfo.peerId
113+
114+
proc dummyHandler(
115+
topic: PubsubTopic, msg: WakuMessage
116+
): Future[void] {.async, gcsafe.} =
117+
discard
118+
119+
publisherNode.subscribe((kind: PubsubSub, topic: DefaultShard), dummyHandler).expect(
120+
"Failed to subscribe publisherNode"
121+
)
122+
123+
asyncTeardown:
124+
if not subscriberNode.isNil():
125+
(await subscriberNode.stop()).expect("Failed to stop subscriber node")
126+
subscriberNode = nil
127+
128+
if not publisherNode.isNil():
129+
await publisherNode.stop()
130+
publisherNode = nil
131+
132+
asyncTest "Subscription API, relay node auto subscribe and receive message":
133+
subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core))
134+
await subscriberNode.node.connectToNodes(@[publisherPeerInfo])
78135
let testTopic = ContentTopic("/waku/2/test-content/proto")
79136

80-
(await node1.subscribe(testTopic)).isOkOr:
81-
raiseAssert "Node1 failed to subscribe: " & error
82-
83-
(await node2.subscribe(testTopic)).isOkOr:
84-
raiseAssert "Node2 failed to subscribe: " & error
137+
(await subscriberNode.subscribe(testTopic)).expect(
138+
"subscriberNode failed to subscribe"
139+
)
85140

86-
await sleepAsync(2.seconds)
87-
88-
let eventManager = newReceiveEventListenerManager(node2.brokerCtx)
141+
let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1)
89142
defer:
90143
eventManager.teardown()
91144

92-
let envelope = MessageEnvelope.init(testTopic, "hello world payload")
93-
let sendResult = await node1.send(envelope)
94-
check sendResult.isOk()
145+
const testMessageStr = "Hello, world!"
146+
let msgPayload = testMessageStr.toBytes()
147+
148+
discard (
149+
await publishWhenMeshReady(publisherNode, DefaultShard, testTopic, msgPayload)
150+
).expect("Timed out waiting for mesh to stabilize")
95151

96-
const eventTimeout = 5.seconds
97-
let receivedInTime = await eventManager.waitForEvent(eventTimeout)
152+
let receivedInTime = await eventManager.waitForEvents(TestTimeout)
98153

99-
check receivedInTime == true
100-
check eventManager.receivedMessages.len == 1
154+
# Hard abort if these conditions aren't met to prevent an IndexDefect below
155+
require receivedInTime
156+
require eventManager.receivedMessages.len == 1
101157

102158
let receivedMsg = eventManager.receivedMessages[0]
103159
check receivedMsg.contentTopic == testTopic
104-
check string.fromBytes(receivedMsg.payload) == "hello world payload"
105-
106-
(await node1.stop()).isOkOr:
107-
raiseAssert error
108-
(await node2.stop()).isOkOr:
109-
raiseAssert error
160+
check string.fromBytes(receivedMsg.payload) == testMessageStr

waku/factory/waku.nim

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import
3535
node/health_monitor,
3636
node/waku_metrics,
3737
node/delivery_service/delivery_service,
38+
node/delivery_service/subscription_service,
3839
rest_api/message_cache,
3940
rest_api/endpoint/server,
4041
rest_api/endpoint/builder as rest_server_builder,
@@ -416,6 +417,37 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
416417
if not waku[].deliveryService.isNil():
417418
waku[].deliveryService.startDeliveryService()
418419

420+
## Subscription Service
421+
if not waku.node.wakuRelay.isNil() and not waku.deliveryService.isNil():
422+
let subService = waku.deliveryService.subscriptionService
423+
424+
if waku.node.wakuAutoSharding.isSome():
425+
# Subscribe relay to all shards in autosharding.
426+
let autoSharding = waku.node.wakuAutoSharding.get()
427+
let clusterId = autoSharding.clusterId
428+
let numShards = autoSharding.shardCountGenZero
429+
430+
if numShards > 0:
431+
var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards)
432+
433+
for i in 0 ..< numShards:
434+
let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
435+
clusterPubsubTopics.add(PubsubTopic($shardObj))
436+
437+
subService.subscribeShard(clusterPubsubTopics).isOkOr:
438+
return err("Failed to auto-subscribe Relay to cluster shards: " & error)
439+
else:
440+
# Fallback to configured shards when no autosharding.
441+
if waku.conf.subscribeShards.len > 0:
442+
let manualShards = waku.conf.subscribeShards.mapIt(
443+
PubsubTopic(
444+
$(RelayShard(clusterId: waku.conf.clusterId, shardId: uint16(it)))
445+
)
446+
)
447+
448+
subService.subscribeShard(manualShards).isOkOr:
449+
return err("Failed to subscribe Relay to manual shards: " & error)
450+
419451
## Health Monitor
420452
waku[].healthMonitor.startHealthMonitor().isOkOr:
421453
return err("failed to start health monitor: " & $error)
@@ -450,7 +482,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
450482
).isOkOr:
451483
error "Failed to set RequestProtocolHealth provider", error = error
452484

453-
## Setup RequestHealthReport provider (The lost child)
485+
## Setup RequestHealthReport provider
454486

455487
RequestHealthReport.setProvider(
456488
globalBrokerContext(),

0 commit comments

Comments
 (0)