Skip to content

Commit a3de87b

Browse files
committed
WIP: Working ranking of peers
DeltaQ metrics is only available for our warm and hot peers that also have us as hot. So a fraction of all downstream clients will have a metric. This change the ranking of peers to use simple scoring system. Deliver a new TX before in time before it gets into the block gives you one point. Delivering a TXs thats already in the mempool, is invalid, or fail because it was included in a recent blocks gives you a penalty.
1 parent a9cd34d commit a3de87b

File tree

9 files changed

+140
-30
lines changed

9 files changed

+140
-30
lines changed

ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Diffusion/Node/MiniProtocols.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,7 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
687687
them $ \api -> do
688688
let server = txSubmissionInboundV2
689689
txSubmissionInboundTracer
690+
(getMempoolReader mempool)
690691
(getMempoolWriter mempool)
691692
api
692693
labelThisThread "TxSubmissionServer"

ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission/AppV2.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ runTxSubmission tracer tracerTxLogic state txDecisionPolicy = do
215215
(getMempoolReader inboundMempool)
216216
addr $ \api -> do
217217
let server = txSubmissionInboundV2 verboseTracer
218+
(getMempoolReader inboundMempool)
218219
(getMempoolWriter inboundMempool)
219220
api
220221
runPipelinedPeerWithLimits

ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission/TxLogic.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,9 @@ mkArbPeerTxState mempoolHasTxFun txIdsInflight unacked txMaskMap =
299299
requestedTxIdsInflight,
300300
requestedTxsInflight,
301301
requestedTxsInflightSize,
302-
unknownTxs }
302+
unknownTxs,
303+
rejectedTxs = 0,
304+
fetchedTxs = Set.empty }
303305
(Set.fromList $ Map.elems inflightMap)
304306
bufferedMap
305307
where

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import Control.Exception (assert)
3636
import Control.Monad (unless)
3737
import Control.Monad.Class.MonadSTM
3838
import Control.Monad.Class.MonadThrow
39+
import Control.Monad.Class.MonadTime.SI
3940
import Control.Monad.Class.MonadTimer.SI
4041
import Control.Tracer (Tracer, traceWith)
4142

@@ -314,13 +315,18 @@ txSubmissionInbound tracer (NumTxIdsToAck maxUnacked) mpReader mpWriter _version
314315
traceWith tracer $
315316
TraceTxSubmissionCollected collected
316317

318+
!start <- getMonotonicTime
317319
txidsAccepted <- mempoolAddTxs txsReady
318-
320+
!end <- getMonotonicTime
321+
let duration = diffTime end start
322+
traceWith tracer $
323+
TraceTxInboundAddedToMempool txidsAccepted duration
319324
let !accepted = length txidsAccepted
320325

321326
traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
322327
ptxcAccepted = accepted
323328
, ptxcRejected = collected - accepted
329+
, ptxcScore = 0 -- This implementatin does not track score
324330
}
325331

326332
continueWithStateM (serverIdle n) st {

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/Decision.hs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,27 +59,40 @@ makeDecisions
5959
, Map peeraddr (TxDecision txid tx)
6060
)
6161
makeDecisions policy SharedDecisionContext {
62-
sdcPeerGSV = peerGSV,
62+
sdcPeerGSV = _peerGSV,
6363
sdcSharedTxState = st
6464
}
6565
= fn
6666
. pickTxsToDownload policy st
67-
. orderByDeltaQ peerGSV
67+
. orderByRejections
6868
where
6969
fn :: forall a.
7070
(a, [(peeraddr, TxDecision txid tx)])
7171
-> (a, Map peeraddr (TxDecision txid tx))
7272
fn (a, as) = (a, Map.fromList as)
7373

7474

75+
-- | Order peers by how useful the TXs they have provided are.
76+
--
77+
-- TXs delivered late will fail to apply because they where included in
78+
-- a recently adopted block. Peers can race against each other by setting
79+
-- `txInflightMultiplicity` to > 1.
80+
--
81+
-- TODO: Should not depend on plain `peeraddr` as a tie breaker.
82+
orderByRejections :: Map peeraddr (PeerTxState txid tx)
83+
-> [ (peeraddr, PeerTxState txid tx)]
84+
orderByRejections =
85+
sortOn (\(_peeraddr, ps) -> rejectedTxs ps)
86+
. Map.toList
87+
7588
-- | Order peers by `DeltaQ`.
7689
--
77-
orderByDeltaQ :: forall peeraddr txid tx.
90+
_orderByDeltaQ :: forall peeraddr txid tx.
7891
Ord peeraddr
7992
=> Map peeraddr PeerGSV
8093
-> Map peeraddr (PeerTxState txid tx)
8194
-> [(peeraddr, PeerTxState txid tx)]
82-
orderByDeltaQ dq =
95+
_orderByDeltaQ dq =
8396
sortOn (\(peeraddr, _) ->
8497
gsvRequestResponseDuration
8598
(Map.findWithDefault defaultGSV peeraddr dq)

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/Policy.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,5 @@ defaultTxDecisionPolicy =
5757
maxUnacknowledgedTxIds = 10, -- must be the same as txSubmissionMaxUnacked
5858
txsSizeInflightPerPeer = max_TX_SIZE * 6,
5959
maxTxsSizeInflight = max_TX_SIZE * 20,
60-
txInflightMultiplicity = 1
60+
txInflightMultiplicity = 2
6161
}

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/Registry.hs

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import Data.Foldable (traverse_
2727
, foldl'
2828
#endif
2929
)
30+
import Data.Functor (void)
3031
import Data.Map.Strict (Map)
3132
import Data.Map.Strict qualified as Map
3233
import Data.Maybe (fromMaybe)
@@ -75,8 +76,14 @@ data PeerTxAPI m txid tx = PeerTxAPI {
7576
-- ^ requested txids
7677
-> Map txid tx
7778
-- ^ received txs
78-
-> m ()
79+
-> m (),
7980
-- ^ handle received txs
81+
82+
countRejectedTxs :: Int
83+
-> m Int,
84+
85+
consumeFetchedTxs :: Set txid
86+
-> m (Set txid)
8087
}
8188

8289

@@ -123,7 +130,9 @@ withPeer tracer
123130
( TxChannels { txChannelMap = txChannelMap' }
124131
, PeerTxAPI { readTxDecision = takeMVar chann',
125132
handleReceivedTxIds,
126-
handleReceivedTxs }
133+
handleReceivedTxs,
134+
countRejectedTxs,
135+
consumeFetchedTxs }
127136
)
128137

129138
atomically $ modifyTVar sharedStateVar registerPeer
@@ -151,7 +160,9 @@ withPeer tracer
151160
requestedTxsInflightSize = 0,
152161
requestedTxsInflight = Set.empty,
153162
unacknowledgedTxIds = StrictSeq.empty,
154-
unknownTxs = Set.empty }
163+
unknownTxs = Set.empty,
164+
rejectedTxs = 0,
165+
fetchedTxs = Set.empty }
155166
peerTxStates
156167
}
157168

@@ -210,8 +221,43 @@ withPeer tracer
210221
-> Map txid tx
211222
-- ^ received txs
212223
-> m ()
213-
handleReceivedTxs txids txs =
224+
handleReceivedTxs txids txs = do
225+
void $ atomically $ modifyTVar sharedStateVar addFethed
214226
collectTxs tracer sharedStateVar peeraddr txids txs
227+
where
228+
addFethed :: SharedTxState peeraddr txid tx
229+
-> SharedTxState peeraddr txid tx
230+
addFethed st@SharedTxState { peerTxStates } =
231+
let peerTxStates' = Map.update (\ps -> Just $! ps { fetchedTxs = Set.union (fetchedTxs ps) txids }) peeraddr peerTxStates in
232+
st {peerTxStates = peerTxStates' }
233+
234+
countRejectedTxs :: Int
235+
-> m Int
236+
countRejectedTxs n = atomically $ do
237+
modifyTVar sharedStateVar cntRejects
238+
st <- readTVar sharedStateVar
239+
case Map.lookup peeraddr (peerTxStates st) of
240+
Nothing -> error "missing peer updated"
241+
Just ps -> return $ rejectedTxs ps
242+
where
243+
cntRejects :: SharedTxState peeraddr txid tx
244+
-> SharedTxState peeraddr txid tx
245+
cntRejects st@SharedTxState { peerTxStates } =
246+
let peerTxStates' = Map.update (\ps -> Just $! ps { rejectedTxs = min 42 (max (-42) (rejectedTxs ps + n)) }) peeraddr peerTxStates in
247+
st {peerTxStates = peerTxStates'}
248+
249+
consumeFetchedTxs :: Set txid
250+
-> m (Set txid)
251+
consumeFetchedTxs otxids = atomically $ do
252+
st <- readTVar sharedStateVar
253+
case Map.lookup peeraddr (peerTxStates st) of
254+
Nothing -> error "missing peer in consumeFetchedTxs"
255+
Just ps -> do
256+
let o = Set.intersection (fetchedTxs ps) otxids
257+
r = Set.difference (fetchedTxs ps) otxids
258+
st' = st { peerTxStates = Map.update (\ps' -> Just $! ps' { fetchedTxs = r }) peeraddr (peerTxStates st) }
259+
writeTVar sharedStateVar st'
260+
return o
215261

216262

217263
decisionLogicThread

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/Server.hs

Lines changed: 54 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ import Control.Tracer (Tracer, traceWith)
2121

2222
import Network.TypedProtocol.Pipelined
2323

24-
import Control.Monad (unless)
24+
import Control.Monad (unless, when)
2525
import Ouroboros.Network.Protocol.TxSubmission2.Server
2626
import Ouroboros.Network.TxSubmission.Inbound.Registry (PeerTxAPI (..))
2727
import Ouroboros.Network.TxSubmission.Inbound.Types
28+
import Ouroboros.Network.TxSubmission.Mempool.Reader
2829

2930
-- | Flag to enable/disable the usage of the new tx submission protocol
3031
--
@@ -48,19 +49,25 @@ txSubmissionInboundV2
4849
, Ord txid
4950
)
5051
=> Tracer m (TraceTxSubmissionInbound txid tx)
52+
-> TxSubmissionMempoolReader txid tx idx m
5153
-> TxSubmissionMempoolWriter txid tx idx m
5254
-> PeerTxAPI m txid tx
5355
-> TxSubmissionServerPipelined txid tx m ()
5456
txSubmissionInboundV2
5557
tracer
58+
TxSubmissionMempoolReader{
59+
mempoolGetSnapshot
60+
}
5661
TxSubmissionMempoolWriter {
5762
txId,
5863
mempoolAddTxs
5964
}
6065
PeerTxAPI {
6166
readTxDecision,
6267
handleReceivedTxIds,
63-
handleReceivedTxs
68+
handleReceivedTxs,
69+
countRejectedTxs,
70+
consumeFetchedTxs
6471
}
6572
=
6673
TxSubmissionServerPipelined serverIdle
@@ -73,23 +80,52 @@ txSubmissionInboundV2
7380
<- readTxDecision
7481
traceWith tracer (TraceTxInboundDecision txd)
7582

76-
!start <- getMonotonicTime
77-
txidsAccepted <- mempoolAddTxs txs
78-
!end <- getMonotonicTime
79-
let duration = diffTime end start
80-
81-
traceWith tracer $
82-
TraceTxInboundAddedToMempool txidsAccepted duration
83-
8483
let !collected = length txs
85-
let !accepted = length txidsAccepted
86-
traceWith tracer $
87-
TraceTxSubmissionCollected collected
88-
89-
traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
90-
ptxcAccepted = accepted
91-
, ptxcRejected = collected - accepted
92-
}
84+
mpSnapshot <- atomically mempoolGetSnapshot
85+
let receivedL = [ (txId tx, tx) | tx <- txs ]
86+
fetchedSet <- consumeFetchedTxs (Set.fromList (map fst receivedL))
87+
88+
-- Only attempt to add TXs if we actually has fetched some.
89+
when (not $ Set.null fetchedSet) $ do
90+
let fetched = filter
91+
(\(txid, _) -> Set.member txid fetchedSet)
92+
receivedL
93+
fetchedS = Set.fromList $ map fst fetched
94+
95+
-- Note that checking if the mempool contains a TX before
96+
-- spending several ms attempting to add it to the pool has
97+
-- been judged immoral.
98+
let fresh = filter
99+
(\(txid, _) -> not $ mempoolHasTx mpSnapshot txid)
100+
receivedL
101+
102+
!start <- getMonotonicTime
103+
txidsAccepted <- mempoolAddTxs $ map snd fresh
104+
!end <- getMonotonicTime
105+
let duration = diffTime end start
106+
107+
let acceptedS = Set.fromList txidsAccepted
108+
acceptedFetched = Set.intersection fetchedS acceptedS
109+
!accepted = Set.size acceptedFetched
110+
!rejected = Set.size fetchedS - accepted
111+
112+
traceWith tracer $
113+
TraceTxInboundAddedToMempool txidsAccepted duration
114+
traceWith tracer $
115+
TraceTxSubmissionCollected collected
116+
117+
-- Accepted TXs are discounted from rejected.
118+
--
119+
-- The number of rejected TXs may be too high.
120+
-- The reason for that is that any peer which has downloaded a
121+
-- TX is permitted to add TXs for all TXids hit has offered.
122+
-- This is done to preserve TX ordering.
123+
!s <- countRejectedTxs (rejected - accepted) -- accepted TXs are discounted
124+
traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
125+
ptxcAccepted = accepted
126+
, ptxcRejected = rejected
127+
, ptxcScore = s
128+
}
93129

94130
-- TODO:
95131
-- We can update the state so that other `tx-submission` servers will

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/Types.hs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,11 @@ data PeerTxState txid tx = PeerTxState {
7474
-- since that could potentially lead to corrupting the node, not being
7575
-- able to download a `tx` which is needed & available from other nodes.
7676
--
77-
unknownTxs :: !(Set txid)
77+
unknownTxs :: !(Set txid),
78+
79+
rejectedTxs :: !Int,
80+
81+
fetchedTxs :: !(Set txid)
7882
}
7983
deriving (Eq, Show, Generic)
8084

@@ -259,6 +263,7 @@ data ProcessedTxCount = ProcessedTxCount {
259263
ptxcAccepted :: Int
260264
-- | Just rejected this many transactions.
261265
, ptxcRejected :: Int
266+
, ptxcScore :: Int
262267
}
263268
deriving (Eq, Show)
264269

0 commit comments

Comments
 (0)