Skip to content

Commit 0eb6aa0

Browse files
committed
Implement stateful SubscriptionService for Core mode (WIP)
* SubscriptionService tracks shard and content topic interest * Emit MessageReceivedEvent on subscribed content topics * Add selectPeers() to PeerManager for future edge node sub impl * Add test_api_subscriptions.nim with a placeholder sub test
1 parent f208cb7 commit 0eb6aa0

File tree

5 files changed

+307
-67
lines changed

5 files changed

+307
-67
lines changed

tests/api/test_all.nim

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
11
{.used.}
22

3-
import ./test_entry_nodes, ./test_node_conf, ./test_api_send, ./test_api_health
3+
import
4+
./test_entry_nodes,
5+
./test_node_conf,
6+
./test_api_send,
7+
./test_api_subscription,
8+
./test_api_health
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
{.used.}
2+
3+
import std/strutils
4+
import chronos, testutils/unittests, stew/byteutils
5+
import ../testlib/[common, testasync]
6+
import
7+
waku, waku/[waku_node, waku_core, common/broker/broker_context, events/message_events]
8+
import waku/api/api_conf, waku/factory/waku_conf
9+
10+
type ReceiveEventListenerManager = ref object
11+
brokerCtx: BrokerContext
12+
receivedListener: MessageReceivedEventListener
13+
receivedFuture: Future[void]
14+
receivedMessages: seq[WakuMessage]
15+
16+
proc newReceiveEventListenerManager(
17+
brokerCtx: BrokerContext
18+
): ReceiveEventListenerManager =
19+
let manager = ReceiveEventListenerManager(brokerCtx: brokerCtx, receivedMessages: @[])
20+
manager.receivedFuture = newFuture[void]("receivedEvent")
21+
22+
manager.receivedListener = MessageReceivedEvent.listen(
23+
brokerCtx,
24+
proc(event: MessageReceivedEvent) {.async: (raises: []).} =
25+
manager.receivedMessages.add(event.message)
26+
echo "RECEIVED EVENT TRIGGERED: contentTopic=", event.message.contentTopic
27+
28+
if not manager.receivedFuture.finished():
29+
manager.receivedFuture.complete()
30+
,
31+
).valueOr:
32+
raiseAssert error
33+
34+
return manager
35+
36+
proc teardown(manager: ReceiveEventListenerManager) =
37+
MessageReceivedEvent.dropListener(manager.brokerCtx, manager.receivedListener)
38+
39+
proc waitForEvent(
40+
manager: ReceiveEventListenerManager, timeout: Duration
41+
): Future[bool] {.async.} =
42+
return await manager.receivedFuture.withTimeout(timeout)
43+
44+
proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
45+
let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0)
46+
result = NodeConfig.init(
47+
mode = mode,
48+
protocolsConfig = ProtocolsConfig.init(
49+
entryNodes = @[],
50+
clusterId = 1,
51+
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
52+
),
53+
networkingConfig = netConf,
54+
p2pReliability = true,
55+
)
56+
57+
suite "Waku API - Subscription Service":
58+
asyncTest "Subscription API, two relays with subscribe and receive message":
59+
var node1, node2: Waku
60+
61+
lockNewGlobalBrokerContext:
62+
node1 = (await createNode(createApiNodeConf())).valueOr:
63+
raiseAssert error
64+
(await startWaku(addr node1)).isOkOr:
65+
raiseAssert "Failed to start node1"
66+
67+
lockNewGlobalBrokerContext:
68+
node2 = (await createNode(createApiNodeConf())).valueOr:
69+
raiseAssert error
70+
(await startWaku(addr node2)).isOkOr:
71+
raiseAssert "Failed to start node2"
72+
73+
let node2PeerInfo = node2.node.peerInfo.toRemotePeerInfo()
74+
await node1.node.connectToNodes(@[node2PeerInfo])
75+
76+
await sleepAsync(2.seconds)
77+
78+
let testTopic = ContentTopic("/waku/2/test-content/proto")
79+
80+
(await node1.subscribe(testTopic)).isOkOr:
81+
raiseAssert "Node1 failed to subscribe: " & error
82+
83+
(await node2.subscribe(testTopic)).isOkOr:
84+
raiseAssert "Node2 failed to subscribe: " & error
85+
86+
await sleepAsync(2.seconds)
87+
88+
let eventManager = newReceiveEventListenerManager(node2.brokerCtx)
89+
defer:
90+
eventManager.teardown()
91+
92+
let envelope = MessageEnvelope.init(testTopic, "hello world payload")
93+
let sendResult = await node1.send(envelope)
94+
check sendResult.isOk()
95+
96+
const eventTimeout = 5.seconds
97+
let receivedInTime = await eventManager.waitForEvent(eventTimeout)
98+
99+
check receivedInTime == true
100+
check eventManager.receivedMessages.len == 1
101+
102+
let receivedMsg = eventManager.receivedMessages[0]
103+
check receivedMsg.contentTopic == testTopic
104+
check string.fromBytes(receivedMsg.payload) == "hello world payload"
105+
106+
(await node1.stop()).isOkOr:
107+
raiseAssert error
108+
(await node2.stop()).isOkOr:
109+
raiseAssert error

waku/api/api.nim

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,15 @@ proc send*(
4848
): Future[Result[RequestId, string]] {.async.} =
4949
?checkApiAvailability(w)
5050

51+
let isSubbed = w.deliveryService.subscriptionService
52+
.isSubscribed(envelope.contentTopic)
53+
.valueOr(false)
54+
if not isSubbed:
55+
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
56+
let subRes = w.deliveryService.subscriptionService.subscribe(envelope.contentTopic)
57+
if subRes.isErr():
58+
warn "Failed to auto-subscribe", error = subRes.error
59+
5160
let requestId = RequestId.new(w.rng)
5261

5362
let deliveryTask = DeliveryTask.new(requestId, envelope, w.brokerCtx).valueOr:
Lines changed: 134 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,167 @@
1-
import chronos, chronicles
1+
import std/[sets, tables, options, strutils], chronos, chronicles, results
22
import
33
waku/[
44
waku_core,
55
waku_core/topics,
6+
waku_core/topics/sharding,
67
events/message_events,
78
waku_node,
9+
waku_relay,
810
common/broker/broker_context,
911
]
1012

1113
type SubscriptionService* = ref object of RootObj
12-
brokerCtx: BrokerContext
1314
node: WakuNode
15+
shardSubs: HashSet[PubsubTopic]
16+
contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]]
17+
relayHandler: WakuRelayHandler
1418

1519
proc new*(T: typedesc[SubscriptionService], node: WakuNode): T =
16-
## The storeClient will help to acquire any possible missed messages
20+
let service = SubscriptionService(
21+
node: node,
22+
shardSubs: initHashSet[PubsubTopic](),
23+
contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]](),
24+
)
1725

18-
return SubscriptionService(brokerCtx: node.brokerCtx, node: node)
26+
service.relayHandler = proc(
27+
topic: PubsubTopic, msg: WakuMessage
28+
): Future[void] {.async, gcsafe.} =
29+
if not service.contentTopicSubs.hasKey(topic) or
30+
not service.contentTopicSubs[topic].contains(msg.contentTopic):
31+
return
32+
33+
let msgHash = computeMessageHash(topic, msg).to0xHex()
34+
info "MessageReceivedEvent",
35+
pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash
36+
37+
MessageReceivedEvent.emit(service.node.brokerCtx, msgHash, msg)
38+
39+
return service
40+
41+
proc getShardForContentTopic(
42+
self: SubscriptionService, topic: ContentTopic
43+
): Result[PubsubTopic, string] =
44+
if self.node.wakuAutoSharding.isSome():
45+
let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic)
46+
return ok($shardObj)
47+
48+
return
49+
err("Manual sharding is not supported in this API. Autosharding must be enabled.")
50+
51+
proc doSubscribe(self: SubscriptionService, shard: PubsubTopic): Result[void, string] =
52+
self.node.subscribe((kind: PubsubSub, topic: shard), self.relayHandler).isOkOr:
53+
error "Failed to subscribe to Relay shard", shard = shard, error = error
54+
return err("Failed to subscribe: " & error)
55+
return ok()
56+
57+
proc doUnsubscribe(
58+
self: SubscriptionService, shard: PubsubTopic
59+
): Result[void, string] =
60+
self.node.unsubscribe((kind: PubsubUnsub, topic: shard)).isOkOr:
61+
error "Failed to unsubscribe from Relay shard", shard = shard, error = error
62+
return err("Failed to unsubscribe: " & error)
63+
return ok()
1964

2065
proc isSubscribed*(
2166
self: SubscriptionService, topic: ContentTopic
2267
): Result[bool, string] =
23-
var isSubscribed = false
24-
if self.node.wakuRelay.isNil() == false:
25-
return self.node.isSubscribed((kind: ContentSub, topic: topic))
68+
if self.node.wakuRelay.isNil():
69+
return err("SubscriptionService currently only supports Relay (Core) mode.")
2670

27-
# TODO: Add support for edge mode with Filter subscription management
28-
return ok(isSubscribed)
71+
let shard = ?self.getShardForContentTopic(topic)
2972

30-
#TODO: later PR may consider to refactor or place this function elsewhere
31-
# The only important part is that it emits MessageReceivedEvent
32-
proc getReceiveHandler(self: SubscriptionService): WakuRelayHandler =
33-
return proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
34-
let msgHash = computeMessageHash(topic, msg).to0xHex()
35-
info "API received message",
36-
pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash
73+
if self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(
74+
topic
75+
):
76+
return ok(true)
3777

38-
MessageReceivedEvent.emit(self.brokerCtx, msgHash, msg)
78+
return ok(false)
3979

4080
proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] =
41-
let isSubscribed = self.isSubscribed(topic).valueOr:
42-
error "Failed to check subscription status: ", error = error
43-
return err("Failed to check subscription status: " & error)
81+
if self.node.wakuRelay.isNil():
82+
return err("SubscriptionService currently only supports Relay (Core) mode.")
4483

45-
if isSubscribed == false:
46-
if self.node.wakuRelay.isNil() == false:
47-
self.node.subscribe((kind: ContentSub, topic: topic), self.getReceiveHandler()).isOkOr:
48-
error "Failed to subscribe: ", error = error
49-
return err("Failed to subscribe: " & error)
84+
let shard = ?self.getShardForContentTopic(topic)
5085

51-
# TODO: Add support for edge mode with Filter subscription management
86+
let needShardSub =
87+
not self.shardSubs.contains(shard) and not self.contentTopicSubs.hasKey(shard)
88+
89+
if needShardSub:
90+
?self.doSubscribe(shard)
91+
92+
self.contentTopicSubs.mgetOrPut(shard, initHashSet[ContentTopic]()).incl(topic)
5293

5394
return ok()
5495

5596
proc unsubscribe*(
5697
self: SubscriptionService, topic: ContentTopic
5798
): Result[void, string] =
58-
if self.node.wakuRelay.isNil() == false:
59-
self.node.unsubscribe((kind: ContentSub, topic: topic)).isOkOr:
60-
error "Failed to unsubscribe: ", error = error
61-
return err("Failed to unsubscribe: " & error)
99+
if self.node.wakuRelay.isNil():
100+
return err("SubscriptionService currently only supports Relay (Core) mode.")
101+
102+
let shard = ?self.getShardForContentTopic(topic)
103+
104+
if self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(
105+
topic
106+
):
107+
let isLastTopic = self.contentTopicSubs[shard].len == 1
108+
let needShardUnsub = isLastTopic and not self.shardSubs.contains(shard)
109+
110+
if needShardUnsub:
111+
?self.doUnsubscribe(shard)
112+
113+
self.contentTopicSubs[shard].excl(topic)
114+
if self.contentTopicSubs[shard].len == 0:
115+
self.contentTopicSubs.del(shard)
116+
117+
return ok()
118+
119+
proc subscribeShard*(
120+
self: SubscriptionService, shards: seq[PubsubTopic]
121+
): Result[void, string] =
122+
if self.node.wakuRelay.isNil():
123+
return err("SubscriptionService currently only supports Relay (Core) mode.")
124+
125+
var errors: seq[string] = @[]
126+
127+
for shard in shards:
128+
if not self.shardSubs.contains(shard):
129+
let needShardSub = not self.contentTopicSubs.hasKey(shard)
130+
131+
if needShardSub:
132+
let res = self.doSubscribe(shard)
133+
if res.isErr():
134+
errors.add("Shard " & shard & " failed: " & res.error)
135+
continue
136+
137+
self.shardSubs.incl(shard)
138+
139+
if errors.len > 0:
140+
return err("Batch subscribe had errors: " & errors.join("; "))
141+
142+
return ok()
143+
144+
proc unsubscribeShard*(
145+
self: SubscriptionService, shards: seq[PubsubTopic]
146+
): Result[void, string] =
147+
if self.node.wakuRelay.isNil():
148+
return err("SubscriptionService currently only supports Relay (Core) mode.")
149+
150+
var errors: seq[string] = @[]
151+
152+
for shard in shards:
153+
if self.shardSubs.contains(shard):
154+
let needShardUnsub = not self.contentTopicSubs.hasKey(shard)
155+
156+
if needShardUnsub:
157+
let res = self.doUnsubscribe(shard)
158+
if res.isErr():
159+
errors.add("Shard " & shard & " failed: " & res.error)
160+
continue
161+
162+
self.shardSubs.excl(shard)
163+
164+
if errors.len > 0:
165+
return err("Batch unsubscribe had errors: " & errors.join("; "))
62166

63-
# TODO: Add support for edge mode with Filter subscription management
64167
return ok()

0 commit comments

Comments
 (0)