Skip to content

Commit c19738c

Browse files
committed
Add external handler
1 parent c857cb0 commit c19738c

File tree

2 files changed

+26
-10
lines changed

2 files changed

+26
-10
lines changed

src/nim_chat_poc.nim

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
import
2+
chronicles,
23
chronos
34

45
import
56
waku_client
67

8+
9+
proc handleMessages(pubsubTopic: string, message: seq[byte]): Future[
10+
void] {.gcsafe, raises: [Defect].} =
11+
info "ClientRecv", pubTopic = pubsubTopic, msg = message
12+
713
proc main(): Future[void] {.async.} =
814
echo "Starting POC"
915
let cfg = DefaultConfig()
10-
let client = initWakuClient(cfg)
16+
let client = initWakuClient(cfg, @[PayloadHandler(handleMessages)])
1117
await client.start()
1218
echo "End of POC"
1319

src/waku_client.nim

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ const
2727
FilterContentTopic = ContentTopic("/chatsdk/test/proto")
2828

2929

30+
type PayloadHandler* = proc(pubsubTopic: string, message: seq[byte]): Future[void] {.
31+
gcsafe, raises: [Defect]
32+
.}
33+
3034
type WakuConfig* = object
3135
port*: uint16
3236
clusterId*: uint16
@@ -37,6 +41,7 @@ type
3741
WakuClient* = ref object
3842
cfg: WakuConfig
3943
node*: WakuNode
44+
handlers: seq[PayloadHandler]
4045

4146

4247
proc DefaultConfig*(): WakuConfig =
@@ -102,8 +107,8 @@ proc buildWakuNode(cfg: WakuConfig): WakuNode =
102107

103108
result = node
104109

105-
proc messagePushHandler(
106-
pubsubTopic: PubsubTopic, message: WakuMessage
110+
proc messageHandler(client: WakuClient, pubsubTopic: PubsubTopic,
111+
message: WakuMessage
107112
) {.async, gcsafe.} =
108113
let payloadStr = string.fromBytes(message.payload)
109114
notice "message received",
@@ -113,6 +118,10 @@ proc messagePushHandler(
113118
timestamp = message.timestamp
114119

115120

121+
for handler in client.handlers:
122+
discard handler(pubsubTopic, message.payload)
123+
124+
116125
proc taskKeepAlive(client: WakuClient) {.async.} =
117126
let peer = parsePeerInfo(StaticPeer).get()
118127
while true:
@@ -154,19 +163,20 @@ proc start*(client: WakuClient) {.async.} =
154163
quit(1)
155164

156165
client.node.peerManager.start()
157-
client.node.wakuFilterClient.registerPushHandler(messagePushHandler)
158-
159-
let topic = PubsubTopic(client.cfg.pubsubTopic)
160166

161167
let subscription: SubscriptionEvent = (kind: PubsubSub, topic:
162168
client.cfg.pubsubTopic)
163169

164-
let res = subscribe(client.node, subscription, messagePushHandler)
170+
let msg_handler = proc(pubsubTopic: PubsubTopic,
171+
message: WakuMessage) {.async, gcsafe.} = discard client.messageHandler(
172+
pubsubTopic, message)
173+
174+
let res = subscribe(client.node, subscription, msg_handler)
165175
if res.isErr:
166176
error "Subscribe failed", err = res.error
167177

168178
await allFutures(taskKeepAlive(client), taskPublishDemo(client))
169179

170-
171-
proc initWakuClient*(cfg: WakuConfig): WakuClient =
172-
result = WakuClient(cfg: cfg, node: buildWakuNode(cfg))
180+
proc initWakuClient*(cfg: WakuConfig, handlers: seq[
181+
PayloadHandler]): WakuClient =
182+
result = WakuClient(cfg: cfg, node: buildWakuNode(cfg), handlers: handlers)

0 commit comments

Comments
 (0)