Skip to content

Commit 7f66982

Browse files
committed
tx-submission: registry
1 parent 819d532 commit 7f66982

File tree

3 files changed

+315
-0
lines changed

3 files changed

+315
-0
lines changed

ouroboros-network/ouroboros-network.cabal

+2
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ library
6767
Ouroboros.Network.TxSubmission.Inbound.Decision
6868
Ouroboros.Network.TxSubmission.Inbound.Policy
6969
Ouroboros.Network.TxSubmission.Inbound.State
70+
Ouroboros.Network.TxSubmission.Inbound.Registry
7071
Ouroboros.Network.TxSubmission.Mempool.Reader
7172
Ouroboros.Network.TxSubmission.Outbound
7273
other-modules: Ouroboros.Network.Diffusion.Common
@@ -137,6 +138,7 @@ library
137138
io-classes-mtl ^>=0.1,
138139
network-mux,
139140
si-timers,
141+
strict-mvar,
140142
ouroboros-network-api ^>=0.7.2,
141143
ouroboros-network-framework ^>=0.13.1,
142144
ouroboros-network-protocols ^>=0.9,

ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission.hs

+70
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,11 @@ tests = testGroup "Ouroboros.Network.TxSubmission"
130130
, testProperty "acknowledged" prop_makeDecisions_acknowledged
131131
, testProperty "exhaustive" prop_makeDecisions_exhaustive
132132
]
133+
, testGroup "Registry"
134+
[ testGroup "filterActivePeers"
135+
[ testProperty "not limiting decisions" prop_filterActivePeers_not_limitting_decisions
136+
]
137+
]
133138
]
134139

135140

@@ -1802,6 +1807,71 @@ prop_makeDecisions_exhaustive
18021807
. counterexample ("state'': " ++ show sharedTxState'')
18031808
$ null decisions''
18041809

1810+
1811+
-- | `filterActivePeers` should not change decisions made by `makeDecisions`
1812+
--
1813+
--
1814+
-- This test checks the following properties:
1815+
--
1816+
-- In what follows, the set of active peers is defined as the keys of the map
1817+
-- returned by `filterActivePeers`.
1818+
--
1819+
-- 1. The set of active peers is a superset of peers for which a decision was
1820+
-- made;
1821+
-- 2. The set of active peer which can acknowledge txids is a subset of peers
1822+
-- for which a decision was made;
1823+
-- 3. Decisions made from the results of `filterActivePeers` is the same as from
1824+
-- the original set.
1825+
--
1826+
-- Ad 2. a stronger property is not possible. There can be a peer for which
1827+
-- a decision was not taken but which is an active peer.
1828+
--
1829+
prop_filterActivePeers_not_limitting_decisions
1830+
:: ArbDecisionContexts TxId
1831+
-> Property
1832+
prop_filterActivePeers_not_limitting_decisions
1833+
ArbDecisionContexts {
1834+
arbDecisionPolicy = policy,
1835+
arbSharedContext =
1836+
sharedCtx@SharedDecisionContext { sdcSharedTxState = st }
1837+
}
1838+
=
1839+
counterexample (unlines
1840+
["decisions: " ++ show decisions
1841+
," " ++ show decisionPeers
1842+
,"active decisions: " ++ show decisionsOfActivePeers
1843+
," " ++ show activePeers]) $
1844+
1845+
counterexample ("found non-active peers for which decision can be made: "
1846+
++ show (decisionPeers Set.\\ activePeers)
1847+
)
1848+
(decisionPeers `Set.isSubsetOf` activePeers)
1849+
.&&.
1850+
counterexample ("found an active peer which can acknowledge txids "
1851+
++ "for which decision was not made: "
1852+
++ show (activePeersAck Set.\\ decisionPeers))
1853+
(activePeersAck `Set.isSubsetOf` decisionPeers)
1854+
.&&.
1855+
counterexample "decisions from active peers are not equal to decisions from all peers"
1856+
(decisions === decisionsOfActivePeers)
1857+
where
1858+
activePeersMap = TXS.filterActivePeers policy st
1859+
activePeers = Map.keysSet activePeersMap
1860+
-- peers which are active & can acknowledge txids
1861+
activePeersAck = activePeers
1862+
`Set.intersection`
1863+
Map.keysSet (Map.filter (TXS.hasTxIdsToAcknowledge st) (peerTxStates st))
1864+
(_, decisionsOfActivePeers)
1865+
= TXS.makeDecisions policy sharedCtx activePeersMap
1866+
1867+
(_, decisions) = TXS.makeDecisions policy sharedCtx (peerTxStates st)
1868+
decisionPeers = Map.keysSet decisions
1869+
1870+
1871+
-- TODO: makeDecisions property: all peers which have txid's to ack are
1872+
-- included, this would catch the other bug, and it's important for the system
1873+
-- to run well.
1874+
18051875
--
18061876
-- Auxiliary functions
18071877
--
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
{-# LANGUAGE BangPatterns #-}
2+
{-# LANGUAGE BlockArguments #-}
3+
{-# LANGUAGE LambdaCase #-}
4+
{-# LANGUAGE NamedFieldPuns #-}
5+
{-# LANGUAGE ScopedTypeVariables #-}
6+
7+
module Ouroboros.Network.TxSubmission.Inbound.Registry
8+
( SharedTxStateVar
9+
, newSharedTxStateVar
10+
, PeerTxAPI (..)
11+
, decisionLogicThread
12+
, withPeer
13+
) where
14+
15+
import Control.Concurrent.Class.MonadMVar.Strict
16+
import Control.Concurrent.Class.MonadSTM.Strict
17+
import Control.Monad.Class.MonadThrow
18+
import Control.Monad.Class.MonadTimer.SI
19+
20+
import Data.Foldable (foldl', traverse_)
21+
import Data.Map.Strict (Map)
22+
import Data.Map.Strict qualified as Map
23+
import Data.Maybe (fromMaybe)
24+
import Data.Sequence.Strict (StrictSeq)
25+
import Data.Sequence.Strict qualified as StrictSeq
26+
import Data.Set (Set)
27+
import Data.Set qualified as Set
28+
import Data.Void (Void)
29+
30+
import Ouroboros.Network.DeltaQ (PeerGSV (..))
31+
import Ouroboros.Network.Protocol.TxSubmission2.Type
32+
import Ouroboros.Network.TxSubmission.Inbound.Decision
33+
import Ouroboros.Network.TxSubmission.Inbound.Policy
34+
import Ouroboros.Network.TxSubmission.Inbound.State
35+
import Ouroboros.Network.TxSubmission.Mempool.Reader
36+
37+
-- | Communication channels between `TxSubmission` client mini-protocol and
38+
-- decision logic.
39+
--
40+
newtype TxChannels m peeraddr txid tx = TxChannels {
41+
txChannelMap :: Map peeraddr (StrictMVar m (TxDecision txid tx))
42+
}
43+
44+
type TxChannelsVar m peeraddr txid tx = StrictMVar m (TxChannels m peeraddr txid tx)
45+
46+
-- | API to access `PeerTxState` inside `PeerTxStateVar`.
47+
--
48+
data PeerTxAPI m txid tx = PeerTxAPI {
49+
readTxDecision :: m (TxDecision txid tx),
50+
-- ^ a blocking action which reads `TxDecision`
51+
52+
handleReceivedTxIds :: NumTxIdsToReq
53+
-> StrictSeq txid
54+
-- ^ received txids
55+
-> Map txid SizeInBytes
56+
-- ^ received sizes of advertised tx's
57+
-> m (),
58+
-- ^ handle received txids
59+
60+
handleReceivedTxs :: Set txid
61+
-- ^ requested txids
62+
-> Map txid tx
63+
-- ^ received txs
64+
-> m ()
65+
-- ^ handle received txs
66+
}
67+
68+
69+
-- | A bracket function which registers / de-registers a new peer in
70+
-- `SharedTxStateVar` and `PeerTxStateVar`s, which exposes `PeerTxStateAPI`.
71+
-- `PeerTxStateAPI` is only safe inside the `withPeer` scope.
72+
--
73+
withPeer
74+
:: forall peeraddr txid tx idx m a.
75+
( MonadMask m
76+
, MonadMVar m
77+
, MonadSTM m
78+
, Ord txid
79+
, Ord peeraddr
80+
, Show peeraddr
81+
)
82+
=> TxChannelsVar m peeraddr txid tx
83+
-> SharedTxStateVar m peeraddr txid tx
84+
-> TxSubmissionMempoolReader txid tx idx m
85+
-> peeraddr
86+
-- ^ new peer
87+
-> (PeerTxAPI m txid tx -> m a)
88+
-- ^ callback which gives access to `PeerTxStateAPI`
89+
-> m a
90+
withPeer channelsVar
91+
sharedStateVar
92+
TxSubmissionMempoolReader { mempoolGetSnapshot }
93+
peeraddr io =
94+
bracket
95+
(do -- create a communication channel
96+
!peerTxAPI <-
97+
modifyMVar channelsVar
98+
\ TxChannels { txChannelMap } -> do
99+
chann <- newEmptyMVar
100+
let (chann', txChannelMap') =
101+
Map.alterF (\mbChann ->
102+
let !chann'' = fromMaybe chann mbChann
103+
in (chann'', Just chann''))
104+
peeraddr
105+
txChannelMap
106+
return
107+
( TxChannels { txChannelMap = txChannelMap' }
108+
, PeerTxAPI { readTxDecision = takeMVar chann',
109+
handleReceivedTxIds,
110+
handleReceivedTxs }
111+
)
112+
113+
atomically $ modifyTVar sharedStateVar registerPeer
114+
return peerTxAPI
115+
)
116+
-- the handler is a short blocking operation, thus we need to use
117+
-- `uninterruptibleMask_`
118+
(\_ -> uninterruptibleMask_ do
119+
atomically $ modifyTVar sharedStateVar unregisterPeer
120+
modifyMVar_ channelsVar
121+
\ TxChannels { txChannelMap } ->
122+
return TxChannels { txChannelMap = Map.delete peeraddr txChannelMap }
123+
)
124+
io
125+
where
126+
registerPeer :: SharedTxState peeraddr txid tx
127+
-> SharedTxState peeraddr txid tx
128+
registerPeer st@SharedTxState { peerTxStates } =
129+
st { peerTxStates =
130+
Map.insert
131+
peeraddr
132+
PeerTxState {
133+
availableTxIds = Map.empty,
134+
requestedTxIdsInflight = 0,
135+
requestedTxsInflightSize = 0,
136+
requestedTxsInflight = Set.empty,
137+
unacknowledgedTxIds = StrictSeq.empty,
138+
unknownTxs = Set.empty }
139+
peerTxStates
140+
}
141+
142+
-- TODO: this function needs to be tested!
143+
unregisterPeer :: SharedTxState peeraddr txid tx
144+
-> SharedTxState peeraddr txid tx
145+
unregisterPeer st@SharedTxState { peerTxStates,
146+
bufferedTxs,
147+
referenceCounts } =
148+
st { peerTxStates = peerTxStates',
149+
bufferedTxs = bufferedTxs',
150+
referenceCounts = referenceCounts' }
151+
where
152+
(PeerTxState { unacknowledgedTxIds }, peerTxStates') =
153+
Map.alterF
154+
(\case
155+
Nothing -> error ("TxSubmission.withPeer: invariant violation for peer " ++ show peeraddr)
156+
Just a -> (a, Nothing))
157+
peeraddr
158+
peerTxStates
159+
160+
referenceCounts' =
161+
foldl' (flip $ Map.update
162+
\cnt -> if cnt > 1
163+
then Just $! pred cnt
164+
else Nothing)
165+
referenceCounts
166+
unacknowledgedTxIds
167+
168+
liveSet = Map.keysSet referenceCounts'
169+
170+
bufferedTxs' = bufferedTxs
171+
`Map.restrictKeys`
172+
liveSet
173+
174+
--
175+
-- PeerTxAPI
176+
--
177+
178+
handleReceivedTxIds :: NumTxIdsToReq
179+
-> StrictSeq txid
180+
-> Map txid SizeInBytes
181+
-> m ()
182+
handleReceivedTxIds numTxIdsToReq txidsSeq txidsMap = do
183+
-- TODO: hide this inside `receivedTxIds` so it's run in the same STM
184+
-- transaction.
185+
mempoolSnapshot <- atomically mempoolGetSnapshot
186+
receivedTxIds sharedStateVar
187+
mempoolSnapshot
188+
peeraddr
189+
numTxIdsToReq
190+
txidsSeq
191+
txidsMap
192+
193+
194+
handleReceivedTxs :: Set txid
195+
-- ^ requested txids
196+
-> Map txid tx
197+
-- ^ received txs
198+
-> m ()
199+
handleReceivedTxs txids txs =
200+
collectTxs sharedStateVar peeraddr txids txs
201+
202+
203+
decisionLogicThread
204+
:: forall m peeraddr txid tx.
205+
( MonadDelay m
206+
, MonadMVar m
207+
, MonadSTM m
208+
, Ord peeraddr
209+
, Ord txid
210+
)
211+
=> TxDecisionPolicy
212+
-> StrictTVar m (Map peeraddr PeerGSV)
213+
-> TxChannelsVar m peeraddr txid tx
214+
-> SharedTxStateVar m peeraddr txid tx
215+
-> m Void
216+
decisionLogicThread policy gsvVar txChannelsVar sharedStateVar = go
217+
where
218+
go :: m Void
219+
go = do
220+
-- We rate limit the decision making process, it could overwhelm the CPU
221+
-- if there are too many inbound connections.
222+
threadDelay 0.005 -- 5ms
223+
224+
decisions <- atomically do
225+
sharedCtx <-
226+
SharedDecisionContext
227+
<$> readTVar gsvVar
228+
<*> readTVar sharedStateVar
229+
let activePeers = filterActivePeers policy (sdcSharedTxState sharedCtx)
230+
231+
-- block until at least one peer is active
232+
check (not (Map.null activePeers))
233+
234+
let (sharedState, decisions) = makeDecisions policy sharedCtx activePeers
235+
writeTVar sharedStateVar sharedState
236+
return decisions
237+
TxChannels { txChannelMap } <- readMVar txChannelsVar
238+
traverse_
239+
(\(mvar, d) -> modifyMVar_ mvar (\d' -> pure (d' <> d)))
240+
(Map.intersectionWith (,)
241+
txChannelMap
242+
decisions)
243+
go

0 commit comments

Comments
 (0)