@@ -35,6 +35,9 @@ type WakuRendezVous* = ref object
3535 registrationInterval: timer.Duration
3636 periodicRegistrationFut: Future [void ]
3737
38+ requestInterval: timer.Duration
39+ periodicRequestFut: Future [void ]
40+
3841proc batchAdvertise * (
3942 self: WakuRendezVous ,
4043 namespace: string ,
@@ -143,10 +146,11 @@ proc advertiseAll(
143146 let futs = collect (newSeq):
144147 for shardId in shards:
145148 # Get a random RDV peer for that shard
146- let rpi = self.peerManager.selectPeer (
147- RendezVousCodec ,
148- some (toPubsubTopic (RelayShard (clusterId: self.clusterId, shardId: shardId))),
149- ).valueOr:
149+
150+ let pubsub =
151+ toPubsubTopic (RelayShard (clusterId: self.clusterId, shardId: shardId))
152+
153+ let rpi = self.peerManager.selectPeer (RendezVousCodec , some (pubsub)).valueOr:
150154 continue
151155
152156 let namespace = computeNamespace (self.clusterId, shardId)
@@ -214,7 +218,7 @@ proc initialRequestAll*(
214218 rendezvousPeerFoundTotal.inc ()
215219 self.peerManager.addPeer (record)
216220
217- debug " waku rendezvous initial requests finished"
221+ debug " waku rendezvous initial request finished"
218222
219223 return ok ()
220224
@@ -237,6 +241,22 @@ proc periodicRegistration(self: WakuRendezVous) {.async.} =
237241 # Back to normal interval if no errors
238242 self.registrationInterval = DefaultRegistrationInterval
239243
244+ proc periodicRequests (self: WakuRendezVous ) {.async .} =
245+ debug " waku rendezvous periodic requests started" , interval = self.requestInterval
246+
247+ # infinite loop
248+ while true :
249+ (await self.initialRequestAll ()).isOkOr:
250+ error " waku rendezvous requests failed" , error = error
251+
252+ await sleepAsync (self.requestInterval)
253+
254+ # Exponential backoff
255+ self.requestInterval += self.requestInterval
256+
257+ if self.requestInterval >= 1 .days:
258+ break
259+
240260proc new * (
241261 T: type WakuRendezVous ,
242262 switch: Switch ,
@@ -266,6 +286,7 @@ proc new*(
266286 wrv.getShards = getShards
267287 wrv.getCapabilities = getCapabilities
268288 wrv.registrationInterval = DefaultRegistrationInterval
289+ wrv.requestInterval = DefaultRequestsInterval
269290
270291 debug " waku rendezvous initialized" ,
271292 clusterId = clusterId, shards = getShards (), capabilities = getCapabilities ()
@@ -276,10 +297,15 @@ proc start*(self: WakuRendezVous) {.async: (raises: []).} =
276297 # start registering forever
277298 self.periodicRegistrationFut = self.periodicRegistration ()
278299
300+ self.periodicRequestFut = self.periodicRequests ()
301+
279302 debug " waku rendezvous discovery started"
280303
281304proc stopWait * (self: WakuRendezVous ) {.async : (raises: []).} =
282305 if not self.periodicRegistrationFut.isNil ():
283306 await self.periodicRegistrationFut.cancelAndWait ()
284307
308+ if not self.periodicRequestFut.isNil ():
309+ await self.periodicRequestFut.cancelAndWait ()
310+
285311 debug " waku rendezvous discovery stopped"
0 commit comments