Skip to content

Commit 4445f53

Browse files
committed
WIP: tx-submission: registry
1 parent 152fa3c commit 4445f53

File tree

3 files changed

+261
-0
lines changed

3 files changed

+261
-0
lines changed

ouroboros-network/ouroboros-network.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ library
6767
Ouroboros.Network.TxSubmission.Inbound.Decision
6868
Ouroboros.Network.TxSubmission.Inbound.Policy
6969
Ouroboros.Network.TxSubmission.Inbound.State
70+
Ouroboros.Network.TxSubmission.Inbound.Registry
7071
Ouroboros.Network.TxSubmission.Mempool.Reader
7172
Ouroboros.Network.TxSubmission.Outbound
7273
other-modules: Ouroboros.Network.Diffusion.Common
@@ -137,6 +138,7 @@ library
137138
io-classes-mtl ^>=0.1,
138139
network-mux,
139140
si-timers,
141+
strict-mvar,
140142
ouroboros-network-api ^>=0.7.2,
141143
ouroboros-network-framework ^>=0.13.1,
142144
ouroboros-network-protocols ^>=0.8,

ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission.hs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ tests = testGroup "Ouroboros.Network.TxSubmission"
128128
, testProperty "acknowledged" prop_makeDecisions_acknowledged
129129
, testProperty "exhaustive" prop_makeDecisions_exhaustive
130130
]
131+
, testGroup "Registry"
132+
[ testGroup "filterActivePeers"
133+
[ testProperty "not limiting decisions" prop_filterActivePeers_not_limitting_decisions
134+
]
135+
]
131136
]
132137

133138

@@ -1856,6 +1861,71 @@ prop_makeDecisions_exhaustive
18561861
. counterexample ("state'': " ++ show sharedTxState'')
18571862
$ null decisions''
18581863

1864+
1865+
-- | `filterActivePeers` should not change decisions made by `makeDecisions`
1866+
--
1867+
--
1868+
-- This test checks the following properties:
1869+
--
1870+
-- In what follows, the set of active peers is defined as the keys of the map
1871+
-- returned by `filterActivePeers`.
1872+
--
1873+
-- 1. The set of active peers is a superset of peers for which a decision was
1874+
-- made;
1875+
-- 2. The set of active peer which can acknowledge txids is a subset of peers
1876+
-- for which a decision was made;
1877+
-- 3. Decisions made from the results of `filterActivePeers` is the same as from
1878+
-- the original set.
1879+
--
1880+
-- Ad 2. a stronger property is not possible. There can be a peer for which
1881+
-- a decision was not taken but which is an active peer.
1882+
--
1883+
prop_filterActivePeers_not_limitting_decisions
1884+
:: ArbDecisionContexts TxId
1885+
-> Property
1886+
prop_filterActivePeers_not_limitting_decisions
1887+
ArbDecisionContexts {
1888+
arbDecisionPolicy = policy,
1889+
arbSharedContext =
1890+
sharedCtx@SharedDecisionContext { sdcSharedTxState = st }
1891+
}
1892+
=
1893+
counterexample (unlines
1894+
["decisions: " ++ show decisions
1895+
," " ++ show decisionPeers
1896+
,"active decisions: " ++ show decisionsOfActivePeers
1897+
," " ++ show activePeers]) $
1898+
1899+
counterexample ("found non-active peers for which decision can be made: "
1900+
++ show (decisionPeers Set.\\ activePeers)
1901+
)
1902+
(decisionPeers `Set.isSubsetOf` activePeers)
1903+
.&&.
1904+
counterexample ("found an active peer which can acknowledge txids "
1905+
++ "for which decision was not made: "
1906+
++ show (activePeersAck Set.\\ decisionPeers))
1907+
(activePeersAck `Set.isSubsetOf` decisionPeers)
1908+
.&&.
1909+
counterexample "decisions from active peers are not equal to decisions from all peers"
1910+
(decisions === decisionsOfActivePeers)
1911+
where
1912+
activePeersMap = TXS.filterActivePeers policy st
1913+
activePeers = Map.keysSet activePeersMap
1914+
-- peers which are active & can acknowledge txids
1915+
activePeersAck = activePeers
1916+
`Set.intersection`
1917+
Map.keysSet (Map.filter (TXS.hasTxIdsToAcknowledge st) (peerTxStates st))
1918+
decisionsOfActivePeers
1919+
= snd $ TXS.makeDecisions policy sharedCtx activePeersMap
1920+
1921+
decisions = snd $ TXS.makeDecisions policy sharedCtx (peerTxStates st)
1922+
decisionPeers = Map.keysSet decisions
1923+
1924+
1925+
-- TODO: makeDecisions property: all peers which have txid's to ack are
1926+
-- included, this would catch the other bug, and it's important for the system
1927+
-- to run well.
1928+
18591929
--
18601930
-- Auxiliary functions
18611931
--
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
{-# LANGUAGE BlockArguments #-}
2+
{-# LANGUAGE BangPatterns #-}
3+
{-# LANGUAGE LambdaCase #-}
4+
{-# LANGUAGE NamedFieldPuns #-}
5+
{-# LANGUAGE ScopedTypeVariables #-}
6+
7+
module Ouroboros.Network.TxSubmission.Inbound.Registry
8+
( SharedTxStateVar
9+
, newSharedTxStateVar
10+
, decisionLogicThread
11+
, withPeer
12+
) where
13+
14+
import Control.Concurrent.Class.MonadMVar.Strict
15+
import Control.Concurrent.Class.MonadSTM.Strict
16+
import Control.Monad.Class.MonadThrow
17+
import Control.Monad.Class.MonadTimer.SI
18+
19+
import Data.Foldable (foldl', traverse_)
20+
import Data.Map.Strict (Map)
21+
import Data.Map.Strict qualified as Map
22+
import Data.Maybe (fromMaybe)
23+
import Data.Set qualified as Set
24+
import Data.Sequence.Strict qualified as StrictSeq
25+
import Data.Void (Void)
26+
27+
import Ouroboros.Network.DeltaQ (PeerGSV (..))
28+
import Ouroboros.Network.TxSubmission.Inbound.Decision
29+
import Ouroboros.Network.TxSubmission.Inbound.Policy
30+
import Ouroboros.Network.TxSubmission.Inbound.State
31+
32+
-- | Communication channels between `TxSubmission` client mini-protocol and
33+
-- decision logic.
34+
--
35+
newtype TxChannels m peeraddr txid = TxChannels {
36+
txChannelMap :: Map peeraddr (StrictMVar m (TxDecision txid))
37+
}
38+
39+
type TxChannelsVar m peeraddr txid = StrictMVar m (TxChannels m peeraddr txid)
40+
41+
-- | API to access `PeerTxState` inside `PeerTxStateVar`.
42+
--
43+
newtype PeerTxAPI m txid = PeerTxAPI {
44+
-- A blocking action
45+
readTxDecision :: m (TxDecision txid)
46+
}
47+
48+
49+
-- | A bracket function which registers / de-registers a new peer in
50+
-- `SharedTxStateVar` and `PeerTxStateVar`s, which exposes `PeerTxStateAPI`.
51+
-- `PeerTxStateAPI` is only safe inside the `withPeer` scope.
52+
--
53+
withPeer
54+
:: forall peeraddr txid tx m a.
55+
( MonadMask m
56+
, MonadMVar m
57+
, MonadSTM m
58+
, Ord txid
59+
, Ord peeraddr
60+
, Show peeraddr
61+
)
62+
=> TxChannelsVar m peeraddr txid
63+
-> SharedTxStateVar m peeraddr txid tx
64+
-> peeraddr
65+
-- ^ new peer
66+
-> (PeerTxAPI m txid -> m a)
67+
-- ^ callback which gives access to `PeerTxStateAPI`
68+
-> m a
69+
withPeer channelsVar sharedStateVar peeraddr io =
70+
bracket
71+
(do -- create a communication channel
72+
!peerTxAPI <-
73+
modifyMVar channelsVar
74+
\ TxChannels { txChannelMap } -> do
75+
chann <- newEmptyMVar
76+
let (chann', txChannelMap') =
77+
Map.alterF (\mbChann ->
78+
let !chann'' = fromMaybe chann mbChann
79+
in (chann'', Just chann''))
80+
peeraddr
81+
txChannelMap
82+
return
83+
( TxChannels { txChannelMap = txChannelMap' }
84+
, PeerTxAPI { readTxDecision = takeMVar chann' }
85+
)
86+
87+
atomically $ modifyTVar sharedStateVar registerPeer
88+
return peerTxAPI
89+
)
90+
-- the handler is a short blocking operation, thus we need to use
91+
-- `uninterruptibleMask_`
92+
(\_ -> uninterruptibleMask_ do
93+
atomically $ modifyTVar sharedStateVar unregisterPeer
94+
modifyMVar_ channelsVar
95+
\ TxChannels { txChannelMap } ->
96+
return TxChannels { txChannelMap = Map.delete peeraddr txChannelMap }
97+
)
98+
io
99+
where
100+
registerPeer :: SharedTxState peeraddr txid tx
101+
-> SharedTxState peeraddr txid tx
102+
registerPeer st@SharedTxState { peerTxStates } =
103+
st { peerTxStates =
104+
Map.insert
105+
peeraddr
106+
PeerTxState {
107+
availableTxIds = Map.empty,
108+
requestedTxIdsInflight = 0,
109+
requestedTxsInflightSize = 0,
110+
requestedTxsInflight = Set.empty,
111+
unacknowledgedTxIds = StrictSeq.empty,
112+
unknownTxs = Set.empty }
113+
peerTxStates
114+
}
115+
116+
-- TODO: this function needs to be tested!
117+
unregisterPeer :: SharedTxState peeraddr txid tx
118+
-> SharedTxState peeraddr txid tx
119+
unregisterPeer st@SharedTxState { peerTxStates,
120+
bufferedTxs,
121+
referenceCounts } =
122+
st { peerTxStates = peerTxStates',
123+
bufferedTxs = bufferedTxs',
124+
referenceCounts = referenceCounts' }
125+
where
126+
(PeerTxState { unacknowledgedTxIds }, peerTxStates') =
127+
Map.alterF
128+
(\case
129+
Nothing -> error ("TxSubmission.withPeer: invariant violation for peer " ++ show peeraddr)
130+
Just a -> (a, Nothing))
131+
peeraddr
132+
peerTxStates
133+
134+
referenceCounts' =
135+
foldl' (flip $ Map.update
136+
\cnt -> if cnt > 1
137+
then Just $! pred cnt
138+
else Nothing)
139+
referenceCounts
140+
unacknowledgedTxIds
141+
142+
liveSet = Map.keysSet referenceCounts'
143+
144+
bufferedTxs' = bufferedTxs
145+
`Map.restrictKeys`
146+
liveSet
147+
148+
149+
decisionLogicThread
150+
:: forall m peeraddr txid tx.
151+
( MonadDelay m
152+
, MonadMVar m
153+
, MonadSTM m
154+
, Ord peeraddr
155+
, Ord txid
156+
)
157+
=> TxDecisionPolicy
158+
-> StrictTVar m (Map peeraddr PeerGSV)
159+
-> TxChannelsVar m peeraddr txid
160+
-> SharedTxStateVar m peeraddr txid tx
161+
-> m Void
162+
decisionLogicThread policy gsvVar txChannelsVar sharedStateVar = go
163+
where
164+
go :: m Void
165+
go = do
166+
-- We rate limit the decision making process, it could overwhelm the CPU
167+
-- if there are too many inbound connections.
168+
threadDelay 0.005 -- 5ms
169+
170+
decisions <- atomically do
171+
sharedCtx <-
172+
SharedDecisionContext
173+
<$> readTVar gsvVar
174+
<*> readTVar sharedStateVar
175+
let activePeers = filterActivePeers policy (sdcSharedTxState sharedCtx)
176+
177+
-- block until at least one peer is active
178+
check (not (Map.null activePeers))
179+
180+
let (sharedState, decisions) = makeDecisions policy sharedCtx activePeers
181+
writeTVar sharedStateVar sharedState
182+
return decisions
183+
TxChannels { txChannelMap } <- readMVar txChannelsVar
184+
traverse_
185+
(\(mvar, d) -> modifyMVar_ mvar (\d' -> return (d' <> d)))
186+
(Map.intersectionWith (,)
187+
txChannelMap
188+
decisions)
189+
go

0 commit comments

Comments
 (0)