Draft
Changes from all commits
5 changes: 4 additions & 1 deletion libp2p/protocols/pubsub/gossipsub.nim
@@ -381,7 +381,7 @@ proc validateAndRelay(g: GossipSub,

# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
-  g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
+  g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false, validMsgId = msgId)
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
for topic in msg.topicIds:
if topic notin g.topics: continue
@@ -497,6 +497,9 @@ method rpcHandler*(g: GossipSub,

libp2p_gossipsub_duplicate.inc()

+  if msg.data.len > msgId.len * 10: #Dont relay to the peers from which we already received (We just do it for large messages)
Review comment (Collaborator): This comment is misleading; only the "We just do it for large messages" part applies to this line. I'd move the first part one line below.

Also, I'd add the same comment as above:

    # IDontWant is only worth it if the message is substantially
    # bigger than the messageId

peer.heDontWants[^1].incl(msgId)
Review comment (Collaborator): `heDontWants` could be renamed to `notDesiredMessageIDs` or something similar.


# onto the next message
continue
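
For reference, a sketch of how this hunk might read with both review suggestions applied (the names, the `* 10` heuristic, and the surrounding control flow are taken from the diff above; exact indentation is assumed):

    libp2p_gossipsub_duplicate.inc()

    # IDontWant is only worth it if the message is substantially
    # bigger than the messageId
    if msg.data.len > msgId.len * 10:
      # Don't relay to the peers from which we already received this message
      peer.heDontWants[^1].incl(msgId)

    # onto the next message
    continue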

11 changes: 6 additions & 5 deletions libp2p/protocols/pubsub/pubsub.nim
@@ -138,7 +138,7 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =

libp2p_pubsub_peers.set(p.peers.len.int64)

-proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.raises: [].} =
+proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool, id: MessageId = @[]) {.raises: [].} =
## This procedure attempts to send a `msg` (of type `RPCMsg`) to the specified remote peer in the PubSub network.
##
## Parameters:
@@ -150,13 +150,14 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.raises: [].} =
## priority messages have been sent.

trace "sending pubsub message to peer", peer, msg = shortLog(msg)
-  asyncSpawn peer.send(msg, p.anonymize, isHighPriority)
+  asyncSpawn peer.send(msg, p.anonymize, isHighPriority, id)

proc broadcast*(
p: PubSub,
sendPeers: auto, # Iterable[PubSubPeer]
msg: RPCMsg,
-  isHighPriority: bool) {.raises: [].} =
+  isHighPriority: bool,
+  validMsgId: MessageId = @[]) {.raises: [].} =
## This procedure attempts to send a `msg` (of type `RPCMsg`) to a specified group of peers in the PubSub network.
##
## Parameters:
@@ -211,12 +212,12 @@

if anyIt(sendPeers, it.hasObservers):
for peer in sendPeers:
-    p.send(peer, msg, isHighPriority)
+    p.send(peer, msg, isHighPriority, validMsgId)
else:
# Fast path that only encodes message once
let encoded = encodeRpcMsg(msg, p.anonymize)
for peer in sendPeers:
-    asyncSpawn peer.sendEncoded(encoded, isHighPriority)
+    asyncSpawn peer.sendEncoded(encoded, isHighPriority, validMsgId)

proc sendSubs*(p: PubSub,
peer: PubSubPeer,
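Because `validMsgId` (and `id` in `send`) defaults to `@[]`, existing call sites compile unchanged and skip the IDontWant check entirely; only the relay path opts in. A minimal sketch of the two call shapes, using the names from the gossipsub.nim hunk above:

    # existing callers: no id, so sendMsg performs no IDontWant check
    p.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)

    # relay path: pass the id so peers that announced IDontWant for it are skipped
    p.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false,
                validMsgId = msgId)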
36 changes: 26 additions & 10 deletions libp2p/protocols/pubsub/pubsubpeer.nim
@@ -21,6 +21,8 @@ import rpc/[messages, message, protobuf],
../../protobuf/minprotobuf,
../../utility

+#import gossipsub/libp2p_gossipsub_staggerDontWantSave

export peerid, connection, deques

logScope:
@@ -52,11 +54,15 @@
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}

+  MessageWithId = object
+    message: seq[byte]
+    msgId: MessageId

RpcMessageQueue* = ref object
# Tracks async tasks for sending high-priority peer-published messages.
sendPriorityQueue: Deque[Future[void]]
# Queue for lower-priority messages, like "IWANT" replies and relay messages.
-    nonPriorityQueue: AsyncQueue[seq[byte]]
+    nonPriorityQueue: AsyncQueue[MessageWithId]
# Task for processing non-priority message queue.
sendNonPriorityTask: Future[void]

@@ -95,6 +101,10 @@ when defined(libp2p_agents_metrics):
# so we have to read the parent's short agent
p.sendConn.getWrapped().shortAgent

+proc newMessageWithId(msg: seq[byte], id: MessageId): MessageWithId =
+  result.message = msg
+  result.msgId = id

proc getAgent*(peer: PubSubPeer): string =
return
when defined(libp2p_agents_metrics):
@@ -256,7 +266,7 @@ proc clearSendPriorityQueue(p: PubSubPeer) =
libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
discard p.rpcmessagequeue.sendPriorityQueue.popFirst()

-proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
+proc sendMsg(p: PubSubPeer, msg: seq[byte], msgId: MessageId) {.async.} =
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
# complete this even if the sendConn setup failed
@@ -269,6 +279,12 @@ proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =

trace "sending encoded msgs to peer", conn, encoded = shortLog(msg)

+  if msgId.len > 0:
+    for dontWants in p.heDontWants:
+      if msgId in dontWants:
+        #libp2p_gossipsub_staggerDontWantSave.inc()
+        trace "Skipped sending msg/dontwant received from peer", conn, encoded = shortLog(msg)
+        return
try:
await conn.writeLp(msg)
trace "sent pubsub message to remote", conn
@@ -281,7 +297,7 @@ proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =

await conn.close() # This will clean up the send connection

-proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool) {.async.} =
+proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool, validMsgId: MessageId = @[]) {.async.} =
## Asynchronously sends an encoded message to a specified `PubSubPeer`.
##
## Parameters:
@@ -302,13 +318,13 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool) {.async.} =

if isHighPriority:
p.clearSendPriorityQueue()
-    let f = p.sendMsg(msg)
+    let f = p.sendMsg(msg, validMsgId)
if not f.finished:
p.rpcmessagequeue.sendPriorityQueue.addLast(f)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
else:
-    await p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
+    await p.rpcmessagequeue.nonPriorityQueue.addLast(newMessageWithId(msg, validMsgId))
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
trace "message queued", p, msg = shortLog(msg)
@@ -348,7 +364,7 @@ iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize:
else:
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)

-proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.async.} =
+proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool, validMsgId: MessageId = @[]) {.async.} =
## Asynchronously sends an `RPCMsg` to a specified `PubSubPeer` with an option for anonymization.
##
## Parameters:
@@ -377,11 +393,11 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool) {.async.} =

if encoded.len > p.maxMessageSize and msg.messages.len > 1:
for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
-    await p.sendEncoded(encodedSplitMsg, isHighPriority)
+    await p.sendEncoded(encodedSplitMsg, isHighPriority, validMsgId)
else:
# If the message size is within limits, send it as is
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
-    await p.sendEncoded(encoded, isHighPriority)
+    await p.sendEncoded(encoded, isHighPriority, validMsgId)

proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
for sentIHave in p.sentIHaves.mitems():
@@ -403,7 +419,7 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
await p.rpcmessagequeue.sendPriorityQueue[^1]
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
-    await p.sendMsg(msg)
+    await p.sendMsg(msg.message, msg.msgId)

proc startSendNonPriorityTask(p: PubSubPeer) =
debug "starting sendNonPriorityTask", p
@@ -424,7 +440,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
proc new(T: typedesc[RpcMessageQueue]): T =
return T(
sendPriorityQueue: initDeque[Future[void]](),
-    nonPriorityQueue: newAsyncQueue[seq[byte]](),
+    nonPriorityQueue: newAsyncQueue[MessageWithId](),
)

proc new*(