Skip to content

Commit 61a1300

Browse files
committed
feat: drop old msgs to be relayed
1 parent 9fe2ec6 commit 61a1300

File tree

1 file changed

+15
-6
lines changed

1 file changed

+15
-6
lines changed

libp2p/protocols/pubsub/pubsubpeer.nim

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,16 @@ type
5353
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
5454
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}
5555

56+
Ttlmessage* = object
57+
msg*: seq[byte]
58+
ttl*: Moment
59+
5660
RpcMessageQueue* = ref object
5761
sendPriorityQueue: Deque[Future[void]]
58-
nonPriorityQueue: AsyncQueue[seq[byte]]
62+
nonPriorityQueue: AsyncQueue[Ttlmessage]
5963
sendNonPriorityTask: Future[void]
64+
# The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms.
65+
maxDurationInNonPriorityQueue: Duration
6066

6167
PubSubPeer* = ref object of RootObj
6268
getConn*: GetConn # callback to establish a new send connection
@@ -289,7 +295,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {
289295
when defined(libp2p_expensive_metrics):
290296
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
291297
else:
292-
await p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
298+
await p.rpcmessagequeue.nonPriorityQueue.addLast(Ttlmessage(msg: msg, ttl: Moment.now()))
293299
when defined(libp2p_expensive_metrics):
294300
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
295301
trace "message queued", p, msg = shortLog(msg)
@@ -373,10 +379,12 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
373379
while p.rpcmessagequeue.sendPriorityQueue.len > 0:
374380
await p.rpcmessagequeue.sendPriorityQueue[0]
375381
p.clearSendPriorityQueue()
376-
let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
382+
let ttlMsg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
377383
when defined(libp2p_expensive_metrics):
378384
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
379-
await p.sendMsg(msg)
385+
if Moment.now() - ttlMsg.ttl >= p.rpcmessagequeue.maxDurationInNonPriorityQueue:
386+
continue
387+
await p.sendMsg(ttlMsg.msg)
380388

381389
proc startSendNonPriorityTask(p: PubSubPeer) =
382390
debug "starting sendNonPriorityTask", p
@@ -394,10 +402,11 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
394402
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
395403
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
396404

397-
proc new(T: typedesc[RpcMessageQueue]): T =
405+
proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 500.milliseconds): T =
398406
return T(
399407
sendPriorityQueue: initDeque[Future[void]](),
400-
nonPriorityQueue: newAsyncQueue[seq[byte]](),
408+
nonPriorityQueue: newAsyncQueue[Ttlmessage](),
409+
maxDurationInNonPriorityQueue: maxDurationInNonPriorityQueue,
401410
)
402411

403412
proc new*(

0 commit comments

Comments
 (0)