feat: behaviour penalty when non-priority queue reaches maxNumElementsInNonPriorityQueue #1083
base: master
Changes from all commits: d9a60f3, a6237bd, 0c61f12, 917dbf8, 87110e5, 8014a84, 47990be, a16293b
```diff
@@ -35,10 +35,10 @@ when defined(pubsubpeer_queue_metrics):
   declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
   declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])

-declareCounter(libp2p_pubsub_disconnects_over_non_priority_queue_limit, "number of peers disconnected due to over non-prio queue capacity")
-
 const
   DefaultMaxNumElementsInNonPriorityQueue* = 1024
+  BehaviourPenaltyFoNonPriorityQueueOverLimit = 0.0001 # this value is quite arbitrary and was found empirically
+    # to result in a behaviourPenalty around [0.1, 0.2] when the score is updated.

 type
   PeerRateLimitError* = object of CatchableError
```
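For intuition on the new constant: at 0.0001 per discarded message, a peer has to overflow the non-priority queue on the order of 1,000–2,000 times between score updates to accumulate the behaviourPenalty of roughly 0.1–0.2 mentioned in the code comment. A minimal standalone sketch (the constant name is taken from the diff; the count of 1500 discarded messages is an assumed figure for illustration only):

```nim
# Sketch only: how the per-message penalty accumulates between score
# updates. 1500 is a hypothetical number of discarded messages, chosen
# to land inside the empirically reported [0.1, 0.2] range.
const BehaviourPenaltyFoNonPriorityQueueOverLimit = 0.0001

var behaviourPenalty = 0.0
for _ in 0 ..< 1500:
  behaviourPenalty += BehaviourPenaltyFoNonPriorityQueueOverLimit

echo behaviourPenalty # ~0.15
```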
```diff
@@ -93,7 +93,6 @@ type
     rpcmessagequeue: RpcMessageQueue
     maxNumElementsInNonPriorityQueue*: int # The max number of elements allowed in the non-priority queue.
-    disconnected: bool

   RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
     {.gcsafe, raises: [].}
```
```diff
@@ -194,7 +193,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =

 proc closeSendConn(p: PubSubPeer, event: PubSubPeerEventKind) {.async.} =
   if p.sendConn != nil:
-    trace "Removing send connection", p, conn = p.sendConn
+    debug "Removing send connection", p, conn = p.sendConn
     await p.sendConn.close()
     p.sendConn = nil
```
```diff
@@ -222,7 +221,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
   # remote peer - if we had multiple channels up and one goes down, all
   # stop working so we make an effort to only keep a single channel alive

-  trace "Get new send connection", p, newConn
+  debug "Get new send connection", p, newConn

   # Careful to race conditions here.
   # Topic subscription relies on either connectedFut
```
```diff
@@ -238,19 +237,15 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
   finally:
     await p.closeSendConn(PubSubPeerEventKind.StreamClosed)

-proc connectImpl(p: PubSubPeer) {.async.} =
+proc connectImpl(peer: PubSubPeer) {.async.} =
   try:
     # Keep trying to establish a connection while it's possible to do so - the
     # send connection might get disconnected due to a timeout or an unrelated
     # issue so we try to get a new on
     while true:
-      if p.disconnected:
-        if not p.connectedFut.finished:
-          p.connectedFut.complete()
-        return
-      await connectOnce(p)
+      await connectOnce(peer)
   except CatchableError as exc: # never cancelled
-    debug "Could not establish send connection", msg = exc.msg
+    debug "Could not establish send connection", peer, msg = exc.msg

 proc connect*(p: PubSubPeer) =
   if p.connected:
```
```diff
@@ -284,6 +279,13 @@ proc clearSendPriorityQueue(p: PubSubPeer) =
       value = p.rpcmessagequeue.sendPriorityQueue.len.int64,
       labelValues = [$p.peerId])

+proc clearNonPriorityQueue*(p: PubSubPeer) =
+  if len(p.rpcmessagequeue.nonPriorityQueue) > 0:
+    p.rpcmessagequeue.nonPriorityQueue.clear()
+
+  when defined(pubsubpeer_queue_metrics):
+    libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
+
 proc sendMsgContinue(conn: Connection, msgFut: Future[void]) {.async.} =
   # Continuation for a pending `sendMsg` future from below
   try:
```
```diff
@@ -348,7 +350,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
     p.rpcmessagequeue.nonPriorityQueue.len()) == 0

   if msg.len <= 0:
-    debug "empty message, skipping", p, msg = shortLog(msg)
+    debug "empty message, skipping", peer = p, msg = shortLog(msg)
     Future[void].completed()
   elif msg.len > p.maxMessageSize:
     info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
```
```diff
@@ -362,12 +364,10 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool): Future[v
     f
   else:
     if len(p.rpcmessagequeue.nonPriorityQueue) >= p.maxNumElementsInNonPriorityQueue:
-      if not p.disconnected:
-        p.disconnected = true
-        libp2p_pubsub_disconnects_over_non_priority_queue_limit.inc()
-        p.closeSendConn(PubSubPeerEventKind.DisconnectionRequested)
-      else:
-        Future[void].completed()
+      p.behaviourPenalty += BehaviourPenaltyFoNonPriorityQueueOverLimit
+      trace "Peer has reached maxNumElementsInNonPriorityQueue. Discarding message and applying behaviour penalty.", peer = p, score = p.score,
+        behaviourPenalty = p.behaviourPenalty, agent = p.getAgent()
+      Future[void].completed()
```
**Collaborator:** Given most return paths are

**Contributor (author):** Could you please elaborate on what you believe is bad with the current approach?
**Collaborator:** There's nothing wrong per se. What I was hinting at was: when you have code akin to this:

```nim
if <cond>:
  foo()
  return A
elif <cond>:
  bar()
  return A
elif <cond>:
  baz()
  return B
...
else:
  foobar()
  return A
```

sometimes it's clearer to group/extract the returns:

```nim
if <cond>:
  foo()
elif <cond>:
  bar()
elif <cond>:
  baz()
  return B
...
else:
  foobar()
return A
```

But I just realised here it's almost 50/50, so not a lot of improvement can be made.
```diff
     else:
       let f = p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
       when defined(pubsubpeer_queue_metrics):
```
```diff
@@ -480,11 +480,9 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
     p.rpcmessagequeue.sendNonPriorityTask.cancelSoon()
     p.rpcmessagequeue.sendNonPriorityTask = nil
     p.rpcmessagequeue.sendPriorityQueue.clear()
-    p.rpcmessagequeue.nonPriorityQueue.clear()

     when defined(pubsubpeer_queue_metrics):
       libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
-      libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
+    p.clearNonPriorityQueue()

 proc new(T: typedesc[RpcMessageQueue]): T =
   return T(
```
**Comment:** How asynchronous is this operation? That is: how feasible is it that, between the `clear` call and the metrics update, some value is added to the `nonPriorityQueue`? 🤔

**Reply:** This proc isn't async and won't be suspended.

**Reply:** Oh, that's right. nvm then
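To make the answer above concrete: under chronos' single-threaded event loop, suspension can only happen at an `await`, so a plain (non-async) proc always runs to completion before any other task is scheduled. A rough sketch assuming chronos; `queue`, `producer`, and `clearAndUpdateMetric` are hypothetical stand-ins, not code from this PR:

```nim
import chronos

var queue: seq[int] # stand-in for the non-priority queue

proc clearAndUpdateMetric() =
  # No `await` in this body, so it runs atomically with respect to other
  # tasks: nothing can be added to `queue` between the clear and the read.
  queue.setLen(0)
  echo "metric set to ", queue.len # always observes 0

proc producer() {.async.} =
  queue.add(1)
  await sleepAsync(1.milliseconds) # suspension point: other code may run here
  queue.add(2)

when isMainModule:
  asyncSpawn producer()  # runs until its first await, then suspends
  clearAndUpdateMetric() # executes in full before producer resumes
  waitFor sleepAsync(10.milliseconds)
  echo queue.len # 1: only the add after the suspension point remains
```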