Skip to content

Commit 35adef5

Browse files
committed
Integrate new tx submission protocol
1 parent 144aae0 commit 35adef5

File tree

6 files changed

+113
-20
lines changed

6 files changed

+113
-20
lines changed

ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ library
9999
serialise ^>=0.2,
100100
si-timers ^>=1.5,
101101
strict-stm ^>=1.5,
102+
strict-mvar ^>=1.5,
102103
text,
103104
time,
104105
transformers,

ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs

Lines changed: 65 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,13 @@ import Ouroboros.Network.Protocol.TxSubmission2.Codec
118118
import Ouroboros.Network.Protocol.TxSubmission2.Server
119119
import Ouroboros.Network.Protocol.TxSubmission2.Type
120120
import Ouroboros.Network.TxSubmission.Inbound
121+
import Ouroboros.Network.TxSubmission.Inbound.Policy
122+
(TxDecisionPolicy (..))
123+
import Ouroboros.Network.TxSubmission.Inbound.Registry (PeerTxAPI,
124+
withPeer)
125+
import Ouroboros.Network.TxSubmission.Inbound.Server
126+
(EnableNewTxSubmissionProtocol (..), txSubmissionInboundV2)
127+
import Ouroboros.Network.TxSubmission.Inbound.Types (TraceTxLogic)
121128
import Ouroboros.Network.TxSubmission.Mempool.Reader
122129
(mapTxSubmissionMempoolReader)
123130
import Ouroboros.Network.TxSubmission.Outbound
@@ -168,7 +175,13 @@ data Handlers m addr blk = Handlers {
168175
, hTxSubmissionServer
169176
:: NodeToNodeVersion
170177
-> ConnectionId addr
171-
-> TxSubmissionServerPipelined (GenTxId blk) (GenTx blk) m ()
178+
-> Either
179+
(TxSubmissionServerPipelined (GenTxId blk) (GenTx blk) m ())
180+
(PeerTxAPI m (GenTxId blk) (GenTx blk)
181+
-> TxSubmissionServerPipelined (GenTxId blk) (GenTx blk) m ())
182+
-- ^ Either we use the legacy tx submission protocol or the newest one
183+
-- which require PeerTxAPI. This is decided by
184+
-- 'EnableNewTxSubmissionProtocol' flag.
172185

173186
, hKeepAliveClient
174187
:: NodeToNodeVersion
@@ -209,10 +222,12 @@ mkHandlers ::
209222
)
210223
=> NodeKernelArgs m addrNTN addrNTC blk
211224
-> NodeKernel m addrNTN addrNTC blk
225+
-> EnableNewTxSubmissionProtocol
212226
-> Handlers m addrNTN blk
213227
mkHandlers
214228
NodeKernelArgs {chainSyncFutureCheck, chainSyncHistoricityCheck, keepAliveRng, miniProtocolParameters}
215-
NodeKernel {getChainDB, getMempool, getTopLevelConfig, getTracers = tracers, getPeerSharingAPI, getGsmState} =
229+
NodeKernel {getChainDB, getMempool, getTopLevelConfig, getTracers = tracers, getPeerSharingAPI, getGsmState}
230+
enableNewTxSubmissionProtocol =
216231
Handlers {
217232
hChainSyncClient = \peer _isBigLedgerpeer dynEnv ->
218233
CsClient.chainSyncClient
@@ -243,17 +258,32 @@ mkHandlers
243258
, hTxSubmissionClient = \version controlMessageSTM peer ->
244259
txSubmissionOutbound
245260
(contramap (TraceLabelPeer peer) (Node.txOutboundTracer tracers))
246-
(NumTxIdsToAck $ txSubmissionMaxUnacked miniProtocolParameters)
261+
(NumTxIdsToAck $ getNumTxIdsToReq
262+
$ maxUnacknowledgedTxIds
263+
$ txDecisionPolicy
264+
$ miniProtocolParameters)
247265
(mapTxSubmissionMempoolReader txForgetValidated $ getMempoolReader getMempool)
248266
version
249267
controlMessageSTM
250268
, hTxSubmissionServer = \version peer ->
251-
txSubmissionInbound
252-
(contramap (TraceLabelPeer peer) (Node.txInboundTracer tracers))
253-
(NumTxIdsToAck $ txSubmissionMaxUnacked miniProtocolParameters)
254-
(mapTxSubmissionMempoolReader txForgetValidated $ getMempoolReader getMempool)
255-
(getMempoolWriter getMempool)
256-
version
269+
case enableNewTxSubmissionProtocol of
270+
EnableNewTxSubmissionProtocol ->
271+
Right $ \api ->
272+
txSubmissionInboundV2
273+
(contramap (TraceLabelPeer peer) (Node.txInboundTracer tracers))
274+
(getMempoolWriter getMempool)
275+
api
276+
DisableNewTxSubmissionProtocol ->
277+
Left
278+
$ txSubmissionInbound
279+
(contramap (TraceLabelPeer peer) (Node.txInboundTracer tracers))
280+
(NumTxIdsToAck $ getNumTxIdsToReq
281+
$ maxUnacknowledgedTxIds
282+
$ txDecisionPolicy
283+
$ miniProtocolParameters)
284+
(mapTxSubmissionMempoolReader txForgetValidated $ getMempoolReader getMempool)
285+
(getMempoolWriter getMempool)
286+
version
257287
, hKeepAliveClient = \_version -> keepAliveClient (Node.keepAliveClientTracer tracers) keepAliveRng
258288
, hKeepAliveServer = \_version _peer -> keepAliveServer
259289
, hPeerSharingClient = \_version controlMessageSTM _peer -> peerSharingClient controlMessageSTM
@@ -375,6 +405,7 @@ data Tracers' peer blk e f = Tracers {
375405
, tBlockFetchTracer :: f (TraceLabelPeer peer (TraceSendRecv (BlockFetch blk (Point blk))))
376406
, tBlockFetchSerialisedTracer :: f (TraceLabelPeer peer (TraceSendRecv (BlockFetch (Serialised blk) (Point blk))))
377407
, tTxSubmission2Tracer :: f (TraceLabelPeer peer (TraceSendRecv (TxSubmission2 (GenTxId blk) (GenTx blk))))
408+
, tTxLogicTracer :: f (TraceLabelPeer peer (TraceTxLogic peer (GenTxId blk) (GenTx blk)))
378409
}
379410

380411
instance (forall a. Semigroup (f a)) => Semigroup (Tracers' peer blk e f) where
@@ -384,6 +415,7 @@ instance (forall a. Semigroup (f a)) => Semigroup (Tracers' peer blk e f) where
384415
, tBlockFetchTracer = f tBlockFetchTracer
385416
, tBlockFetchSerialisedTracer = f tBlockFetchSerialisedTracer
386417
, tTxSubmission2Tracer = f tTxSubmission2Tracer
418+
, tTxLogicTracer = f tTxLogicTracer
387419
}
388420
where
389421
f :: forall a. Semigroup a
@@ -399,6 +431,7 @@ nullTracers = Tracers {
399431
, tBlockFetchTracer = nullTracer
400432
, tBlockFetchSerialisedTracer = nullTracer
401433
, tTxSubmission2Tracer = nullTracer
434+
, tTxLogicTracer = nullTracer
402435
}
403436

404437
showTracers :: ( Show blk
@@ -416,6 +449,7 @@ showTracers tr = Tracers {
416449
, tBlockFetchTracer = showTracing tr
417450
, tBlockFetchSerialisedTracer = showTracing tr
418451
, tTxSubmission2Tracer = showTracing tr
452+
, tTxLogicTracer = showTracing tr
419453
}
420454

421455
{-------------------------------------------------------------------------------
@@ -533,7 +567,7 @@ mkApps ::
533567
, ShowProxy blk
534568
, ShowProxy (Header blk)
535569
, ShowProxy (TxId (GenTx blk))
536-
, ShowProxy (GenTx blk)
570+
, ShowProxy (GenTx blk), HasTxId (GenTx blk), LedgerSupportsMempool blk, Show addrNTN
537571
)
538572
=> NodeKernel m addrNTN addrNTC blk -- ^ Needed for bracketing only
539573
-> Tracers m (ConnectionId addrNTN) blk e
@@ -695,13 +729,27 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout lopBucke
695729
-> m ((), Maybe bTX)
696730
aTxSubmission2Server version ResponderContext { rcConnectionId = them } channel = do
697731
labelThisThread "TxSubmissionServer"
698-
runPipelinedPeerWithLimits
699-
(contramap (TraceLabelPeer them) tTxSubmission2Tracer)
700-
(cTxSubmission2Codec (mkCodecs version))
701-
blTxSubmission2
702-
timeLimitsTxSubmission2
703-
channel
704-
(txSubmissionServerPeerPipelined (hTxSubmissionServer version them))
732+
733+
let runServer serverApi =
734+
runPipelinedPeerWithLimits
735+
(contramap (TraceLabelPeer them) tTxSubmission2Tracer)
736+
(cTxSubmission2Codec (mkCodecs version))
737+
blTxSubmission2
738+
timeLimitsTxSubmission2
739+
channel
740+
(txSubmissionServerPeerPipelined serverApi)
741+
742+
case hTxSubmissionServer version them of
743+
Left legacyTxSubmissionServer ->
744+
runServer legacyTxSubmissionServer
745+
Right newTxSubmissionServer ->
746+
withPeer (contramap (TraceLabelPeer them) tTxLogicTracer)
747+
(getTxChannelsVar kernel)
748+
(getSharedTxStateVar kernel)
749+
(mapTxSubmissionMempoolReader txForgetValidated
750+
$ getMempoolReader (getMempool kernel))
751+
them $ \api ->
752+
runServer (newTxSubmissionServer api)
705753

706754
aKeepAliveClient
707755
:: NodeToNodeVersion

ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing)
136136
import Ouroboros.Network.PeerSelection.PeerSharing.Codec
137137
(decodeRemoteAddress, encodeRemoteAddress)
138138
import Ouroboros.Network.RethrowPolicy
139+
import Ouroboros.Network.TxSubmission.Inbound.Server
140+
(EnableNewTxSubmissionProtocol)
139141
import qualified SafeWildCards
140142
import System.Exit (ExitCode (..))
141143
import System.FilePath ((</>))
@@ -200,6 +202,9 @@ data RunNodeArgs m addrNTN addrNTC blk (p2p :: Diffusion.P2P) = RunNodeArgs {
200202
, rnGetUseBootstrapPeers :: STM m UseBootstrapPeers
201203

202204
, rnGenesisConfig :: GenesisConfig
205+
206+
-- | Enable or disable the new tx submission protocol
207+
, rnEnableNewTxSubmissionProtocol :: EnableNewTxSubmissionProtocol
203208
}
204209

205210

@@ -400,6 +405,7 @@ runWith :: forall m addrNTN addrNTC versionDataNTN versionDataNTC blk p2p.
400405
, Hashable addrNTN -- the constraint comes from `initNodeKernel`
401406
, NetworkIO m
402407
, NetworkAddr addrNTN
408+
, Show addrNTN
403409
)
404410
=> RunNodeArgs m addrNTN addrNTC blk p2p
405411
-> (NodeToNodeVersion -> addrNTN -> CBOR.Encoding)
@@ -567,7 +573,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} =
567573
(gcChainSyncLoPBucketConfig llrnGenesisConfig)
568574
(gcCSJConfig llrnGenesisConfig)
569575
(reportMetric Diffusion.peerMetricsConfiguration peerMetrics)
570-
(NTN.mkHandlers nodeKernelArgs nodeKernel)
576+
(NTN.mkHandlers nodeKernelArgs nodeKernel rnEnableNewTxSubmissionProtocol)
571577

572578
mkNodeToClientApps
573579
:: NodeKernelArgs m addrNTN (ConnectionId addrNTC) blk

ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node/Tracers.hs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import Ouroboros.Network.BlockFetch (FetchDecision,
4242
import Ouroboros.Network.KeepAlive (TraceKeepAliveClient)
4343
import Ouroboros.Network.TxSubmission.Inbound
4444
(TraceTxSubmissionInbound)
45+
import Ouroboros.Network.TxSubmission.Inbound.Types (TraceTxLogic)
4546
import Ouroboros.Network.TxSubmission.Outbound
4647
(TraceTxSubmissionOutbound)
4748

@@ -59,6 +60,7 @@ data Tracers' remotePeer localPeer blk f = Tracers
5960
, txInboundTracer :: f (TraceLabelPeer remotePeer (TraceTxSubmissionInbound (GenTxId blk) (GenTx blk)))
6061
, txOutboundTracer :: f (TraceLabelPeer remotePeer (TraceTxSubmissionOutbound (GenTxId blk) (GenTx blk)))
6162
, localTxSubmissionServerTracer :: f (TraceLocalTxSubmissionServerEvent blk)
63+
, txLogicTracer :: f (TraceTxLogic remotePeer (GenTxId blk) (GenTx blk))
6264
, mempoolTracer :: f (TraceEventMempool blk)
6365
, forgeTracer :: f (TraceLabelCreds (TraceForgeEvent blk))
6466
, blockchainTimeTracer :: f (TraceBlockchainTimeEvent UTCTime)
@@ -82,6 +84,7 @@ instance (forall a. Semigroup (f a))
8284
, txInboundTracer = f txInboundTracer
8385
, txOutboundTracer = f txOutboundTracer
8486
, localTxSubmissionServerTracer = f localTxSubmissionServerTracer
87+
, txLogicTracer = f txLogicTracer
8588
, mempoolTracer = f mempoolTracer
8689
, forgeTracer = f forgeTracer
8790
, blockchainTimeTracer = f blockchainTimeTracer
@@ -113,6 +116,7 @@ nullTracers = Tracers
113116
, txInboundTracer = nullTracer
114117
, txOutboundTracer = nullTracer
115118
, localTxSubmissionServerTracer = nullTracer
119+
, txLogicTracer = nullTracer
116120
, mempoolTracer = nullTracer
117121
, forgeTracer = nullTracer
118122
, blockchainTimeTracer = nullTracer
@@ -147,6 +151,7 @@ showTracers tr = Tracers
147151
, txInboundTracer = showTracing tr
148152
, txOutboundTracer = showTracing tr
149153
, localTxSubmissionServerTracer = showTracing tr
154+
, txLogicTracer = showTracing tr
150155
, mempoolTracer = showTracing tr
151156
, forgeTracer = showTracing tr
152157
, blockchainTimeTracer = showTracing tr

ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ module Ouroboros.Consensus.NodeKernel (
2727
) where
2828

2929

30+
import qualified Control.Concurrent.Class.MonadMVar.Strict as StrictSTM
3031
import qualified Control.Concurrent.Class.MonadSTM as LazySTM
3132
import qualified Control.Concurrent.Class.MonadSTM.Strict as StrictSTM
3233
import Control.DeepSeq (force)
@@ -42,6 +43,7 @@ import Data.Functor ((<&>))
4243
import Data.Hashable (Hashable)
4344
import Data.List.NonEmpty (NonEmpty)
4445
import Data.Map.Strict (Map)
46+
import qualified Data.Map.Strict as Map
4547
import Data.Maybe (isJust, mapMaybe)
4648
import Data.Proxy
4749
import qualified Data.Text as Text
@@ -94,6 +96,7 @@ import Ouroboros.Network.AnchoredFragment (AnchoredFragment,
9496
import qualified Ouroboros.Network.AnchoredFragment as AF
9597
import Ouroboros.Network.Block (castTip, tipFromHeader)
9698
import Ouroboros.Network.BlockFetch
99+
import Ouroboros.Network.BlockFetch.ClientRegistry (readPeerGSVs)
97100
import Ouroboros.Network.Diffusion (PublicPeerSelectionState)
98101
import Ouroboros.Network.NodeToNode (ConnectionId,
99102
MiniProtocolParameters (..))
@@ -110,6 +113,9 @@ import Ouroboros.Network.SizeInBytes
110113
import Ouroboros.Network.TxSubmission.Inbound
111114
(TxSubmissionMempoolWriter)
112115
import qualified Ouroboros.Network.TxSubmission.Inbound as Inbound
116+
import Ouroboros.Network.TxSubmission.Inbound.Registry
117+
(SharedTxStateVar, TxChannels (..), TxChannelsVar,
118+
decisionLogicThread, newSharedTxStateVar)
113119
import Ouroboros.Network.TxSubmission.Mempool.Reader
114120
(TxSubmissionMempoolReader)
115121
import qualified Ouroboros.Network.TxSubmission.Mempool.Reader as MempoolReader
@@ -162,6 +168,14 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel {
162168

163169
, getOutboundConnectionsState
164170
:: StrictTVar m OutboundConnectionsState
171+
172+
-- | Communication channels between `TxSubmission` client mini-protocol and
173+
-- decision logic.
174+
, getTxChannelsVar :: TxChannelsVar m (ConnectionId addrNTN) (GenTxId blk) (GenTx blk)
175+
176+
-- | Shared state of all `TxSubmission` clients.
177+
--
178+
, getSharedTxStateVar :: SharedTxStateVar m (ConnectionId addrNTN) (GenTxId blk) (GenTx blk)
165179
}
166180

167181
-- | Arguments required when initializing a node
@@ -207,6 +221,7 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
207221
, peerSharingRng
208222
, publicPeerSelectionStateVar
209223
, genesisArgs
224+
, miniProtocolParameters
210225
} = do
211226
-- using a lazy 'TVar', 'BlockForging' does not have a 'NoThunks' instance.
212227
blockForgingVar :: LazySTM.TMVar m [BlockForging m blk] <- LazySTM.newTMVarIO []
@@ -279,6 +294,9 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
279294
ps_POLICY_PEER_SHARE_STICKY_TIME
280295
ps_POLICY_PEER_SHARE_MAX_PEERS
281296

297+
txChannelsVar <- StrictSTM.newMVar (TxChannels Map.empty)
298+
sharedTxStateVar <- newSharedTxStateVar
299+
282300
case gnkaGetLoEFragment genesisArgs of
283301
LoEAndGDDDisabled -> pure ()
284302
LoEAndGDDEnabled varGetLoEFragment -> do
@@ -311,6 +329,14 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
311329
fetchClientRegistry
312330
blockFetchConfiguration
313331

332+
void $ forkLinkedThread registry "NodeKernel.decisionLogicThread" $
333+
decisionLogicThread
334+
(txLogicTracer tracers)
335+
(txDecisionPolicy miniProtocolParameters)
336+
(readPeerGSVs fetchClientRegistry)
337+
txChannelsVar
338+
sharedTxStateVar
339+
314340
return NodeKernel
315341
{ getChainDB = chainDB
316342
, getMempool = mempool
@@ -325,6 +351,8 @@ initNodeKernel args@NodeKernelArgs { registry, cfg, tracers
325351
, getPeerSharingAPI = peerSharingAPI
326352
, getOutboundConnectionsState
327353
= varOutboundConnectionsState
354+
, getTxChannelsVar = txChannelsVar
355+
, getSharedTxStateVar = sharedTxStateVar
328356
}
329357
where
330358
blockForgingController :: InternalState m remotePeer localPeer blk

ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ import Ouroboros.Network.Protocol.KeepAlive.Type
122122
import Ouroboros.Network.Protocol.Limits (waitForever)
123123
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharing)
124124
import Ouroboros.Network.Protocol.TxSubmission2.Type
125+
import Ouroboros.Network.TxSubmission.Inbound.Policy
126+
(TxDecisionPolicy (..), defaultTxDecisionPolicy)
127+
import Ouroboros.Network.TxSubmission.Inbound.Server
128+
(EnableNewTxSubmissionProtocol (..))
125129
import qualified System.FS.Sim.MockFS as Mock
126130
import System.FS.Sim.MockFS (MockFS)
127131
import System.Random (mkStdGen, split)
@@ -1015,7 +1019,8 @@ runThreadNetwork systemTime ThreadNetworkArgs
10151019
chainSyncPipeliningHighMark = 4,
10161020
chainSyncPipeliningLowMark = 2,
10171021
blockFetchPipeliningMax = 10,
1018-
txSubmissionMaxUnacked = 1000 -- TODO ?
1022+
txDecisionPolicy =
1023+
defaultTxDecisionPolicy { maxUnacknowledgedTxIds = 1000 } -- TODO ?
10191024
}
10201025
, blockFetchConfiguration = BlockFetchConfiguration {
10211026
bfcMaxConcurrencyBulkSync = 1
@@ -1071,7 +1076,7 @@ runThreadNetwork systemTime ThreadNetworkArgs
10711076
-- The purpose of this test is not testing protocols, so
10721077
-- returning constant empty list is fine if we have thorough
10731078
-- tests about the peer sharing protocol itself.
1074-
(NTN.mkHandlers nodeKernelArgs nodeKernel)
1079+
(NTN.mkHandlers nodeKernelArgs nodeKernel DisableNewTxSubmissionProtocol)
10751080

10761081
-- In practice, a robust wallet/user can persistently add a transaction
10771082
-- until it appears on the chain. This thread adds robustness for the

0 commit comments

Comments
 (0)