3535import
3636 waku/ [
3737 waku_core,
38+ waku_core/ topics/ sharding,
3839 waku_lightpush/ common,
3940 waku_lightpush/ rpc,
4041 waku_enr,
4748 common/ utils/ nat,
4849 waku_store/ common,
4950 waku_filter_v2/ client,
51+ waku_filter_v2/ common as filter_common,
52+ waku_mix/ protocol,
5053 common/ logging,
5154 ],
5255 ./ config_chat2mix
@@ -57,6 +60,105 @@ import ../../waku/waku_rln_relay
5760logScope:
5861 topics = " chat2 mix"
5962
63+ # ########################
64+ # # Mix Spam Protection ##
65+ # ########################
66+
67+ # Forward declaration
68+ proc maintainSpamProtectionSubscription (
69+ node: WakuNode , contentTopics: seq [ContentTopic ]
70+ ) {.async .}
71+
72+ proc setupMixSpamProtectionViaFilter (node: WakuNode ) {.async .} =
73+ # # Setup filter-based spam protection coordination for mix protocol.
74+ # # Since chat2mix doesn't use relay, we subscribe via filter to receive
75+ # # spam protection coordination messages.
76+
77+ # Register message handler for spam protection coordination
78+ let spamTopics = node.wakuMix.getSpamProtectionContentTopics ()
79+
80+ proc handleSpamMessage (
81+ pubsubTopic: PubsubTopic , message: WakuMessage
82+ ): Future [void ] {.async , gcsafe .} =
83+ await node.wakuMix.handleMessage (pubsubTopic, message)
84+
85+ node.wakuFilterClient.registerPushHandler (handleSpamMessage)
86+
87+ # Wait for filter peer and maintain subscription
88+ asyncSpawn maintainSpamProtectionSubscription (node, spamTopics)
89+
90+ proc maintainSpamProtectionSubscription (
91+ node: WakuNode , contentTopics: seq [ContentTopic ]
92+ ) {.async .} =
93+ # # Maintain filter subscription for spam protection topics.
94+ # # Monitors subscription health with periodic pings and re-subscribes on failure.
95+ const RetryInterval = chronos.seconds (5 )
96+ const SubscriptionMaintenance = chronos.seconds (30 )
97+ const MaxFailedSubscribes = 3
98+ var currentFilterPeer: Option [RemotePeerInfo ] = none (RemotePeerInfo )
99+ var noFailedSubscribes = 0
100+
101+ while true :
102+ # Select or reuse filter peer
103+ if currentFilterPeer.isNone ():
104+ let filterPeerOpt = node.peerManager.selectPeer (WakuFilterSubscribeCodec )
105+ if filterPeerOpt.isNone ():
106+ debug " No filter peer available yet for spam protection, retrying..."
107+ await sleepAsync (RetryInterval )
108+ continue
109+ currentFilterPeer = some (filterPeerOpt.get ())
110+ info " Selected filter peer for spam protection" ,
111+ peer = currentFilterPeer.get ().peerId
112+
113+ # Check if subscription is still alive with ping
114+ let pingErr = (await node.wakuFilterClient.ping (currentFilterPeer.get ())).errorOr:
115+ # Subscription is alive, wait before next check
116+ await sleepAsync (SubscriptionMaintenance )
117+ if noFailedSubscribes > 0 :
118+ noFailedSubscribes = 0
119+ continue
120+
121+ # Subscription lost, need to re-subscribe
122+ warn " Spam protection filter subscription ping failed, re-subscribing" ,
123+ error = pingErr, peer = currentFilterPeer.get ().peerId
124+
125+ # Determine pubsub topic from content topics (using auto-sharding)
126+ if node.wakuAutoSharding.isNone ():
127+ error " Auto-sharding not configured, cannot determine pubsub topic for spam protection"
128+ await sleepAsync (RetryInterval )
129+ continue
130+
131+ let shardRes = node.wakuAutoSharding.get ().getShard (contentTopics[0 ])
132+ if shardRes.isErr ():
133+ error " Failed to determine shard for spam protection" , error = shardRes.error
134+ await sleepAsync (RetryInterval )
135+ continue
136+
137+ let shard = shardRes.get ()
138+ let pubsubTopic: PubsubTopic = shard # converter toPubsubTopic
139+
140+ # Subscribe to spam protection topics
141+ let res = await node.wakuFilterClient.subscribe (
142+ currentFilterPeer.get (), pubsubTopic, contentTopics
143+ )
144+ if res.isErr ():
145+ noFailedSubscribes += 1
146+ warn " Failed to subscribe to spam protection topics via filter" ,
147+ error = res.error, topics = contentTopics, failCount = noFailedSubscribes
148+
149+ if noFailedSubscribes >= MaxFailedSubscribes :
150+ # Try with a different peer
151+ warn " Max subscription failures reached, selecting new filter peer"
152+ currentFilterPeer = none (RemotePeerInfo )
153+ noFailedSubscribes = 0
154+
155+ await sleepAsync (RetryInterval )
156+ else :
157+ info " Successfully subscribed to spam protection topics via filter" ,
158+ topics = contentTopics, peer = currentFilterPeer.get ().peerId
159+ noFailedSubscribes = 0
160+ await sleepAsync (SubscriptionMaintenance )
161+
60162const Help =
61163 """
62164 Commands: /[?|help|connect|nick|exit]
@@ -210,20 +312,21 @@ proc publish(c: Chat, line: string) {.async.} =
210312 try :
211313 if not c.node.wakuLightpushClient.isNil ():
212314 # Attempt lightpush with mix
213-
214- (
215- waitFor c.node.lightpushPublish (
216- some (c.conf.getPubsubTopic (c.node, c.contentTopic)),
217- message,
218- none (RemotePeerInfo ),
219- true ,
220- )
221- ).isOkOr:
222- error " failed to publish lightpush message" , error = error
315+ let res = await c.node.lightpushPublish (
316+ some (c.conf.getPubsubTopic (c.node, c.contentTopic)),
317+ message,
318+ none (RemotePeerInfo ),
319+ true ,
320+ )
321+ if res.isErr ():
322+ error " failed to publish lightpush message" , error = res.error
323+ echo " Error: " & res.error.desc.get (" unknown error" )
223324 else :
224325 error " failed to publish message as lightpush client is not initialized"
326+ echo " Error: lightpush client is not initialized"
225327 except CatchableError :
226328 error " caught error publishing message: " , error = getCurrentExceptionMsg ()
329+ echo " Error: " & getCurrentExceptionMsg ()
227330
228331# TODO This should read or be subscribe handler subscribe
229332proc readAndPrint (c: Chat ) {.async .} =
@@ -452,7 +555,11 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
452555 error " failed to generate mix key pair" , error = error
453556 return
454557
455- (await node.mountMix (conf.clusterId, mixPrivKey, conf.mixnodes)).isOkOr:
558+ (
559+ await node.mountMix (
560+ conf.clusterId, mixPrivKey, conf.mixnodes, some (conf.rlnUserMessageLimit)
561+ )
562+ ).isOkOr:
456563 error " failed to mount waku mix protocol: " , error = $ error
457564 quit (QuitFailure )
458565
@@ -487,6 +594,10 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
487594
488595 # await node.mountRendezvousClient(conf.clusterId)
489596
597+ # Subscribe to spam protection coordination topics via filter since chat2mix doesn't use relay
598+ if not node.wakuFilterClient.isNil ():
599+ asyncSpawn setupMixSpamProtectionViaFilter (node)
600+
490601 await node.start ()
491602
492603 node.peerManager.start ()
0 commit comments