1 change: 1 addition & 0 deletions libp2p/protocols/pubsub/gossipsub/behavior.nim
@@ -663,6 +663,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
g.pruned(peer, t)
g.mesh.removePeer(t, peer)
prunes &= peer
+ peer.clearNonPriorityQueue()
if prunes.len > 0:
let prune = RPCMsg(control: some(ControlMessage(
prune: @[ControlPrune(
40 changes: 18 additions & 22 deletions libp2p/protocols/pubsub/pubsubpeer.nim
@@ -35,8 +35,6 @@ when defined(pubsubpeer_queue_metrics):
declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])

- declareCounter(libp2p_pubsub_disconnects_over_non_priority_queue_limit, "number of peers disconnected due to over non-prio queue capacity")
-
const
DefaultMaxNumElementsInNonPriorityQueue* = 1024

@@ -90,7 +88,6 @@ type

rpcmessagequeue: RpcMessageQueue
maxNumElementsInNonPriorityQueue*: int # The max number of elements allowed in the non-priority queue.
- disconnected: bool

RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
{.gcsafe, raises: [].}
@@ -191,7 +188,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =

proc closeSendConn(p: PubSubPeer, event: PubSubPeerEventKind) {.async.} =
if p.sendConn != nil:
trace "Removing send connection", p, conn = p.sendConn
debug "Removing send connection", p, conn = p.sendConn
await p.sendConn.close()
p.sendConn = nil

@@ -219,7 +216,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
# remote peer - if we had multiple channels up and one goes down, all
# stop working so we make an effort to only keep a single channel alive

trace "Get new send connection", p, newConn
debug "Get new send connection", p, newConn

# Careful to race conditions here.
# Topic subscription relies on either connectedFut
@@ -235,19 +232,15 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
finally:
await p.closeSendConn(PubSubPeerEventKind.StreamClosed)

- proc connectImpl(p: PubSubPeer) {.async.} =
+ proc connectImpl(peer: PubSubPeer) {.async.} =
try:
# Keep trying to establish a connection while it's possible to do so - the
# send connection might get disconnected due to a timeout or an unrelated
# issue so we try to get a new on
while true:
- if p.disconnected:
-   if not p.connectedFut.finished:
-     p.connectedFut.complete()
-   return
- await connectOnce(p)
+ await connectOnce(peer)
except CatchableError as exc: # never cancelled
debug "Could not establish send connection", msg = exc.msg
debug "Could not establish send connection", peer, msg = exc.msg

proc connect*(p: PubSubPeer) =
if p.connected:
@@ -281,6 +274,13 @@ proc clearSendPriorityQueue(p: PubSubPeer) =
value = p.rpcmessagequeue.sendPriorityQueue.len.int64,
labelValues = [$p.peerId])

+ proc clearNonPriorityQueue*(p: PubSubPeer) =
+ if len(p.rpcmessagequeue.nonPriorityQueue) > 0:
+ p.rpcmessagequeue.nonPriorityQueue.clear()
+
+ when defined(pubsubpeer_queue_metrics):
+ libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
+
Comment on lines +282 to +288

Collaborator:
How asynchronous is this operation? That is: how feasible is it that, between the clear call and the metrics update, some value is added to the nonPriorityQueue? 🤔

Contributor Author:
This proc isn't async and won't be suspended.

Collaborator:
Oh, that's right. nvm then
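For context, a minimal sketch of the point being discussed, using stand-in names (nonPriorityQueue and queueGaugeValue are hypothetical placeholders, not the library's actual fields): chronos schedules tasks cooperatively on a single event loop, so a proc that is not async and contains no await runs to completion before any other task gets a chance to touch the queue.

import std/deques

# Stand-ins for the peer's non-priority queue and the prometheus gauge.
var nonPriorityQueue = initDeque[int]()
var queueGaugeValue = 0

proc clearAndResetGauge() =
  # No `await` in this proc: under chronos' cooperative single-threaded
  # scheduling, no other task can run (and thus enqueue a message)
  # between the clear and the gauge reset.
  nonPriorityQueue.clear()
  queueGaugeValue = 0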

proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async.} =
# Continuation for a pending `sendMsg` future from below
try:
@@ -337,7 +337,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
doAssert(not isNil(p), "pubsubpeer nil!")

if msg.len <= 0:
debug "empty message, skipping", p, msg = shortLog(msg)
debug "empty message, skipping", peer = p, msg = shortLog(msg)
Future[void].completed()
elif msg.len > p.maxMessageSize:
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
@@ -352,12 +352,10 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
f
else:
if len(p.rpcmessagequeue.nonPriorityQueue) >= p.maxNumElementsInNonPriorityQueue:
- if not p.disconnected:
-   p.disconnected = true
-   libp2p_pubsub_disconnects_over_non_priority_queue_limit.inc()
-   p.closeSendConn(PubSubPeerEventKind.DisconnectionRequested)
- else:
-   Future[void].completed()
+ p.behaviourPenalty += 0.0001
Collaborator:
I'd suggest moving the value used here to a constant.

Contributor Author:
done
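A minimal sketch of what the suggestion amounts to, with a hypothetical constant name (the name actually chosen in the follow-up commit is not shown in this diff):

const
  NonPriorityQueuePenalty* = 0.0001  # hypothetical name, for illustration

# At the call site in sendEncoded, the literal would then read:
#   p.behaviourPenalty += NonPriorityQueuePenalty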

trace "Peer has reached maxNumElementsInNonPriorityQueue. Discarding message and applying behaviour penalty.", peer = p, score = p.score,
behaviourPenalty = p.behaviourPenalty, agent = p.getAgent()
Future[void].completed()
Collaborator:
Given most return paths are Future[void].completed(), how about moving it outside the if statement and doing an explicit return on each arm for the cases that are not Future[void].completed()?

Contributor Author:
Could you please elaborate on what you believe is bad with the current approach?

Collaborator:
There's nothing wrong per se. What I was hinting at was, when you have code akin to this:

if <cond>:
  foo()
  return A

elif <cond>:
  bar()
  return A

elif <cond>:
  baz()
  return B

...

else:
  foobar()
  return A

Sometimes it's clearer to group/extract the returns:

if <cond>:
  foo()

elif <cond>:
  bar()

elif <cond>:
  baz()
  return B

...

else:
  foobar()

return A

But I just realised here it's almost 50/50, so not a lot of improvement can be made.
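For reference, the same pattern as a small compilable Nim sketch (dispatch and the foo/bar/baz/foobar branch bodies are hypothetical placeholders, not code from this PR): arms that share a result fall through to a single expression at the end of the proc, and only the arm with a different result returns early.

import chronos

proc foo() = discard
proc bar() = discard
proc baz() = discard
proc foobar() = discard

proc dispatch(kind: int): Future[void] =
  if kind == 0:
    foo()
  elif kind == 1:
    bar()
  elif kind == 2:
    baz()
    return newFuture[void]("dispatch")  # the one arm whose result differs
  else:
    foobar()
  # single shared result for every other arm
  Future[void].completed()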

else:
let f = p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
when defined(pubsubpeer_queue_metrics):
@@ -470,11 +468,9 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
p.rpcmessagequeue.sendNonPriorityTask.cancelSoon()
p.rpcmessagequeue.sendNonPriorityTask = nil
p.rpcmessagequeue.sendPriorityQueue.clear()
- p.rpcmessagequeue.nonPriorityQueue.clear()

when defined(pubsubpeer_queue_metrics):
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
- libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
+ p.clearNonPriorityQueue()

proc new(T: typedesc[RpcMessageQueue]): T =
return T(