Skip to content

Commit 764e98d

Browse files
karknucoot
authored andcommitted
tx-submission: working ranking of peers
DeltaQ metrics is only available for our warm and hot peers that also have us as hot. So a fraction of all downstream clients will have a metric. This change the ranking of peers to use simple scoring system. Deliver a new TX in time before it gets into the block gives you one point. Delivering a TX that's already in the mempool, is invalid, or fail because it was included in a recent blocks gives you a penalty.
1 parent e6943ba commit 764e98d

File tree

9 files changed

+149
-30
lines changed

9 files changed

+149
-30
lines changed

ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Diffusion/Node/MiniProtocols.hs

+1
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,7 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
713713
them $ \api -> do
714714
let server = txSubmissionInboundV2
715715
txSubmissionInboundTracer
716+
(getMempoolReader mempool)
716717
(getMempoolWriter mempool)
717718
api
718719
labelThisThread "TxSubmissionServer"

ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission/AppV2.hs

+1
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ runTxSubmission tracer tracerTxLogic state txDecisionPolicy = do
216216
getTxSize
217217
addr $ \api -> do
218218
let server = txSubmissionInboundV2 verboseTracer
219+
(getMempoolReader inboundMempool)
219220
(getMempoolWriter inboundMempool)
220221
api
221222
runPipelinedPeerWithLimits

ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission/TxLogic.hs

+3-1
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,9 @@ mkArbPeerTxState mempoolHasTxFun txIdsInflight unacked txMaskMap =
302302
requestedTxIdsInflight,
303303
requestedTxsInflight,
304304
requestedTxsInflightSize,
305-
unknownTxs }
305+
unknownTxs,
306+
rejectedTxs = 0,
307+
fetchedTxs = Set.empty }
306308
(Set.fromList $ Map.elems inflightMap)
307309
bufferedMap
308310
where

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs

+7-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import Control.Exception (assert)
3636
import Control.Monad (unless)
3737
import Control.Monad.Class.MonadSTM
3838
import Control.Monad.Class.MonadThrow
39+
import Control.Monad.Class.MonadTime.SI
3940
import Control.Monad.Class.MonadTimer.SI
4041
import Control.Tracer (Tracer, traceWith)
4142

@@ -314,13 +315,18 @@ txSubmissionInbound tracer (NumTxIdsToAck maxUnacked) mpReader mpWriter _version
314315
traceWith tracer $
315316
TraceTxSubmissionCollected collected
316317

318+
!start <- getMonotonicTime
317319
txidsAccepted <- mempoolAddTxs txsReady
318-
320+
!end <- getMonotonicTime
321+
let duration = diffTime end start
322+
traceWith tracer $
323+
TraceTxInboundAddedToMempool txidsAccepted duration
319324
let !accepted = length txidsAccepted
320325

321326
traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
322327
ptxcAccepted = accepted
323328
, ptxcRejected = collected - accepted
329+
, ptxcScore = 0 -- This implementatin does not track score
324330
}
325331

326332
continueWithStateM (serverIdle n) st {

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/Decision.hs

+17-4
Original file line numberDiff line numberDiff line change
@@ -59,27 +59,40 @@ makeDecisions
5959
, Map peeraddr (TxDecision txid tx)
6060
)
6161
makeDecisions policy SharedDecisionContext {
62-
sdcPeerGSV = peerGSV,
62+
sdcPeerGSV = _peerGSV,
6363
sdcSharedTxState = st
6464
}
6565
= fn
6666
. pickTxsToDownload policy st
67-
. orderByDeltaQ peerGSV
67+
. orderByRejections
6868
where
6969
fn :: forall a.
7070
(a, [(peeraddr, TxDecision txid tx)])
7171
-> (a, Map peeraddr (TxDecision txid tx))
7272
fn (a, as) = (a, Map.fromList as)
7373

7474

75+
-- | Order peers by how useful the TXs they have provided are.
76+
--
77+
-- TXs delivered late will fail to apply because they where included in
78+
-- a recently adopted block. Peers can race against each other by setting
79+
-- `txInflightMultiplicity` to > 1.
80+
--
81+
-- TODO: Should not depend on plain `peeraddr` as a tie breaker.
82+
orderByRejections :: Map peeraddr (PeerTxState txid tx)
83+
-> [ (peeraddr, PeerTxState txid tx)]
84+
orderByRejections =
85+
sortOn (\(_peeraddr, ps) -> rejectedTxs ps)
86+
. Map.toList
87+
7588
-- | Order peers by `DeltaQ`.
7689
--
77-
orderByDeltaQ :: forall peeraddr txid tx.
90+
_orderByDeltaQ :: forall peeraddr txid tx.
7891
Ord peeraddr
7992
=> Map peeraddr PeerGSV
8093
-> Map peeraddr (PeerTxState txid tx)
8194
-> [(peeraddr, PeerTxState txid tx)]
82-
orderByDeltaQ dq =
95+
_orderByDeltaQ dq =
8396
sortOn (\(peeraddr, _) ->
8497
gsvRequestResponseDuration
8598
(Map.findWithDefault defaultGSV peeraddr dq)

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/Policy.hs

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,5 @@ defaultTxDecisionPolicy =
5757
maxUnacknowledgedTxIds = 10, -- must be the same as txSubmissionMaxUnacked
5858
txsSizeInflightPerPeer = max_TX_SIZE * 6,
5959
maxTxsSizeInflight = max_TX_SIZE * 20,
60-
txInflightMultiplicity = 1
60+
txInflightMultiplicity = 2
6161
}

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/Registry.hs

+59-4
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,14 @@ data PeerTxAPI m txid tx = PeerTxAPI {
7676
-- ^ requested txids
7777
-> Map txid tx
7878
-- ^ received txs
79-
-> m (Maybe TxSubmissionProtocolError)
79+
-> m (Maybe TxSubmissionProtocolError),
8080
-- ^ handle received txs
81+
82+
countRejectedTxs :: Int
83+
-> m Int,
84+
85+
consumeFetchedTxs :: Set txid
86+
-> m (Set txid)
8187
}
8288

8389

@@ -128,7 +134,9 @@ withPeer tracer
128134
( TxChannels { txChannelMap = txChannelMap' }
129135
, PeerTxAPI { readTxDecision = takeMVar chann',
130136
handleReceivedTxIds,
131-
handleReceivedTxs }
137+
handleReceivedTxs,
138+
countRejectedTxs,
139+
consumeFetchedTxs }
132140
)
133141

134142
atomically $ modifyTVar sharedStateVar registerPeer
@@ -156,7 +164,9 @@ withPeer tracer
156164
requestedTxsInflightSize = 0,
157165
requestedTxsInflight = Set.empty,
158166
unacknowledgedTxIds = StrictSeq.empty,
159-
unknownTxs = Set.empty }
167+
unknownTxs = Set.empty,
168+
rejectedTxs = 0,
169+
fetchedTxs = Set.empty }
160170
peerTxStates
161171
}
162172

@@ -215,8 +225,53 @@ withPeer tracer
215225
-> Map txid tx
216226
-- ^ received txs
217227
-> m (Maybe TxSubmissionProtocolError)
218-
handleReceivedTxs txids txs =
228+
handleReceivedTxs txids txs = do
229+
atomically $ modifyTVar sharedStateVar addFetched
219230
collectTxs tracer txSize sharedStateVar peeraddr txids txs
231+
where
232+
addFetched :: SharedTxState peeraddr txid tx
233+
-> SharedTxState peeraddr txid tx
234+
addFetched st@SharedTxState { peerTxStates } =
235+
let peerTxStates' =
236+
Map.update
237+
(\ps -> Just $! ps { fetchedTxs = Set.union (fetchedTxs ps) txids })
238+
peeraddr peerTxStates
239+
in st {peerTxStates = peerTxStates' }
240+
241+
countRejectedTxs :: Int
242+
-> m Int
243+
countRejectedTxs n = atomically $ do
244+
modifyTVar sharedStateVar cntRejects
245+
st <- readTVar sharedStateVar
246+
case Map.lookup peeraddr (peerTxStates st) of
247+
Nothing -> error "missing peer updated"
248+
Just ps -> return $ rejectedTxs ps
249+
where
250+
cntRejects :: SharedTxState peeraddr txid tx
251+
-> SharedTxState peeraddr txid tx
252+
cntRejects st@SharedTxState { peerTxStates } =
253+
let peerTxStates' =
254+
Map.update
255+
(\ps -> Just $! ps { rejectedTxs = min 42 (max (-42) (rejectedTxs ps + n)) })
256+
peeraddr peerTxStates
257+
in st { peerTxStates = peerTxStates' }
258+
259+
consumeFetchedTxs :: Set txid
260+
-> m (Set txid)
261+
consumeFetchedTxs otxids = atomically $ do
262+
st <- readTVar sharedStateVar
263+
case Map.lookup peeraddr (peerTxStates st) of
264+
Nothing -> error "missing peer in consumeFetchedTxs"
265+
Just ps -> do
266+
let o = Set.intersection (fetchedTxs ps) otxids
267+
r = Set.difference (fetchedTxs ps) otxids
268+
st' = st { peerTxStates =
269+
Map.update
270+
(\ps' -> Just $! ps' { fetchedTxs = r })
271+
peeraddr (peerTxStates st)
272+
}
273+
writeTVar sharedStateVar st'
274+
return o
220275

221276

222277
decisionLogicThread

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/Server.hs

+54-18
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ import Control.Tracer (Tracer, traceWith)
2121

2222
import Network.TypedProtocol
2323

24-
import Control.Monad (unless)
24+
import Control.Monad (unless, when)
2525
import Ouroboros.Network.Protocol.TxSubmission2.Server
2626
import Ouroboros.Network.TxSubmission.Inbound.Registry (PeerTxAPI (..))
2727
import Ouroboros.Network.TxSubmission.Inbound.Types
28+
import Ouroboros.Network.TxSubmission.Mempool.Reader
2829

2930
-- | Flag to enable/disable the usage of the new tx submission protocol
3031
--
@@ -48,19 +49,25 @@ txSubmissionInboundV2
4849
, Ord txid
4950
)
5051
=> Tracer m (TraceTxSubmissionInbound txid tx)
52+
-> TxSubmissionMempoolReader txid tx idx m
5153
-> TxSubmissionMempoolWriter txid tx idx m
5254
-> PeerTxAPI m txid tx
5355
-> TxSubmissionServerPipelined txid tx m ()
5456
txSubmissionInboundV2
5557
tracer
58+
TxSubmissionMempoolReader{
59+
mempoolGetSnapshot
60+
}
5661
TxSubmissionMempoolWriter {
5762
txId,
5863
mempoolAddTxs
5964
}
6065
PeerTxAPI {
6166
readTxDecision,
6267
handleReceivedTxIds,
63-
handleReceivedTxs
68+
handleReceivedTxs,
69+
countRejectedTxs,
70+
consumeFetchedTxs
6471
}
6572
=
6673
TxSubmissionServerPipelined serverIdle
@@ -73,23 +80,52 @@ txSubmissionInboundV2
7380
<- readTxDecision
7481
traceWith tracer (TraceTxInboundDecision txd)
7582

76-
!start <- getMonotonicTime
77-
txidsAccepted <- mempoolAddTxs txs
78-
!end <- getMonotonicTime
79-
let duration = diffTime end start
80-
81-
traceWith tracer $
82-
TraceTxInboundAddedToMempool txidsAccepted duration
83-
8483
let !collected = length txs
85-
let !accepted = length txidsAccepted
86-
traceWith tracer $
87-
TraceTxSubmissionCollected collected
88-
89-
traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
90-
ptxcAccepted = accepted
91-
, ptxcRejected = collected - accepted
92-
}
84+
mpSnapshot <- atomically mempoolGetSnapshot
85+
let receivedL = [ (txId tx, tx) | tx <- txs ]
86+
fetchedSet <- consumeFetchedTxs (Set.fromList (map fst receivedL))
87+
88+
-- Only attempt to add TXs if we actually has fetched some.
89+
when (not $ Set.null fetchedSet) $ do
90+
let fetched = filter
91+
(\(txid, _) -> Set.member txid fetchedSet)
92+
receivedL
93+
fetchedS = Set.fromList $ map fst fetched
94+
95+
-- Note that checking if the mempool contains a TX before
96+
-- spending several ms attempting to add it to the pool has
97+
-- been judged immoral.
98+
let fresh = filter
99+
(\(txid, _) -> not $ mempoolHasTx mpSnapshot txid)
100+
receivedL
101+
102+
!start <- getMonotonicTime
103+
txidsAccepted <- mempoolAddTxs $ map snd fresh
104+
!end <- getMonotonicTime
105+
let duration = diffTime end start
106+
107+
let acceptedS = Set.fromList txidsAccepted
108+
acceptedFetched = Set.intersection fetchedS acceptedS
109+
!accepted = Set.size acceptedFetched
110+
!rejected = Set.size fetchedS - accepted
111+
112+
traceWith tracer $
113+
TraceTxInboundAddedToMempool txidsAccepted duration
114+
traceWith tracer $
115+
TraceTxSubmissionCollected collected
116+
117+
-- Accepted TXs are discounted from rejected.
118+
--
119+
-- The number of rejected TXs may be too high.
120+
-- The reason for that is that any peer which has downloaded a
121+
-- TX is permitted to add TXs for all TXids hit has offered.
122+
-- This is done to preserve TX ordering.
123+
!s <- countRejectedTxs (rejected - accepted) -- accepted TXs are discounted
124+
traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
125+
ptxcAccepted = accepted
126+
, ptxcRejected = rejected
127+
, ptxcScore = s
128+
}
93129

94130
-- TODO:
95131
-- We can update the state so that other `tx-submission` servers will

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound/Types.hs

+6-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@ data PeerTxState txid tx = PeerTxState {
7777
-- since that could potentially lead to corrupting the node, not being
7878
-- able to download a `tx` which is needed & available from other nodes.
7979
--
80-
unknownTxs :: !(Set txid)
80+
unknownTxs :: !(Set txid),
81+
82+
rejectedTxs :: !Int,
83+
84+
fetchedTxs :: !(Set txid)
8185
}
8286
deriving (Eq, Show, Generic)
8387

@@ -262,6 +266,7 @@ data ProcessedTxCount = ProcessedTxCount {
262266
ptxcAccepted :: Int
263267
-- | Just rejected this many transactions.
264268
, ptxcRejected :: Int
269+
, ptxcScore :: Int
265270
}
266271
deriving (Eq, Show)
267272

0 commit comments

Comments
 (0)