Skip to content

Bolt12/coot/tx submission #4928

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import Quiet (Quiet (..))
newtype SizeInBytes = SizeInBytes { getSizeInBytes :: Word32 }
deriving (Eq, Ord)
deriving Show via Quiet SizeInBytes
deriving Bounded via Word32
deriving Enum via Word32
deriving Num via Word32
deriving Real via Word32
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ data ServerStIdle (n :: N) txid tx m a where
-- | Collect a pipelined result.
--
CollectPipelined
:: Maybe (ServerStIdle (S n) txid tx m a)
-> (Collect txid tx -> m (ServerStIdle n txid tx m a))
-> ServerStIdle (S n) txid tx m a
:: Maybe (m (ServerStIdle (S n) txid tx m a))
-> (Collect txid tx -> m ( ServerStIdle n txid tx m a))
-> ServerStIdle (S n) txid tx m a


-- | Transform a 'TxSubmissionServerPipelined' into a 'PeerPipelined'.
Expand Down Expand Up @@ -145,6 +145,5 @@ txSubmissionServerPeerPipelined (TxSubmissionServerPipelined server) =

go (CollectPipelined mNone collect) =
SenderCollect
(fmap go mNone)
(SenderEffect . fmap go . collect)

(SenderEffect . fmap go <$> mNone)
(SenderEffect . fmap go . collect)
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ directPipelined (TxSubmissionServerPipelined mserver)
SendMsgReplyTxs txs client' <- recvMsgRequestTxs txids
directSender (enqueue (CollectTxs txids txs) q) server' client'

directSender q (CollectPipelined (Just server') _) client =
directSender q (CollectPipelined (Just server) _) client = do
server' <- server
directSender q server' client

directSender (ConsQ c q) (CollectPipelined _ collect) client = do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ txSubmissionServer tracer txId maxUnacked maxTxIdsToRequest maxTxToRequest =
--
| canRequestMoreTxs st
= CollectPipelined
(Just (serverReqTxs accum (Succ n) st))
(Just (pure $ serverReqTxs accum (Succ n) st))
(handleReply accum n st)

-- In this case there is nothing else to do so we block until we
Expand Down
12 changes: 12 additions & 0 deletions ouroboros-network/ouroboros-network.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ library
Ouroboros.Network.PeerSelection.RootPeersDNS.PublicRootPeers
Ouroboros.Network.PeerSharing
Ouroboros.Network.TxSubmission.Inbound
Ouroboros.Network.TxSubmission.Inbound.Decision
Ouroboros.Network.TxSubmission.Inbound.Policy
Ouroboros.Network.TxSubmission.Inbound.Registry
Ouroboros.Network.TxSubmission.Inbound.Server
Ouroboros.Network.TxSubmission.Inbound.State
Ouroboros.Network.TxSubmission.Inbound.Types
Ouroboros.Network.TxSubmission.Mempool.Reader
Ouroboros.Network.TxSubmission.Outbound
other-modules: Ouroboros.Network.Diffusion.Common
Expand Down Expand Up @@ -139,6 +145,7 @@ library
io-classes-mtl ^>=0.1,
network-mux,
si-timers,
strict-mvar,
ouroboros-network-api ^>=0.9.0,
ouroboros-network-framework ^>=0.13.2.2,
ouroboros-network-protocols ^>=0.10,
Expand Down Expand Up @@ -198,6 +205,7 @@ library sim-tests-lib

cardano-prelude,
cardano-slotting,
cardano-strict-containers,
contra-tracer,
nothunks,

Expand All @@ -215,6 +223,7 @@ library sim-tests-lib
ouroboros-network-testing ^>= 0.7.0,
si-timers,
strict-stm,
strict-mvar,
typed-protocols,
typed-protocols-examples,
exposed-modules: Ouroboros.Network.BlockFetch.Examples
Expand Down Expand Up @@ -246,6 +255,9 @@ library sim-tests-lib
Test.Ouroboros.Network.Testnet
Test.Ouroboros.Network.Testnet.Simulation.Node
Test.Ouroboros.Network.TxSubmission
Test.Ouroboros.Network.TxSubmission.Common
Test.Ouroboros.Network.TxSubmission.TxSubmissionV1
Test.Ouroboros.Network.TxSubmission.TxSubmissionV2
Test.Ouroboros.Network.Version

ghc-options: -Wall
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ import Ouroboros.Network.Testing.Data.Script (Script (..), stepScriptSTM')

import Simulation.Network.Snocket (AddressType (..), FD)

import Ouroboros.Network.BlockFetch.ClientRegistry (readPeerGSVs)
import Ouroboros.Network.PeerSelection.Bootstrap (UseBootstrapPeers)
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
(LedgerPeersConsensusInterface,
Expand All @@ -105,6 +106,11 @@ import Ouroboros.Network.PeerSelection.RelayAccessPoint (DomainAccessPoint,
import Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions (DNSLookupType)
import Ouroboros.Network.PeerSelection.State.LocalRootPeers (HotValency,
WarmValency)
import Ouroboros.Network.TxSubmission.Inbound.Policy (TxDecisionPolicy)
import Ouroboros.Network.TxSubmission.Inbound.Registry (DebugTxLogic,
decisionLogicThread)
import Ouroboros.Network.TxSubmission.Inbound.State (DebugSharedTxState)
import Ouroboros.Network.TxSubmission.Inbound.Types (TraceTxSubmissionInbound)
import Test.Ouroboros.Network.Diffusion.Node.ChainDB (addBlock,
getBlockPointSet)
import Test.Ouroboros.Network.Diffusion.Node.MiniProtocols qualified as Node
Expand All @@ -114,6 +120,7 @@ import Test.Ouroboros.Network.Diffusion.Node.NodeKernel (NodeKernel (..),
import Test.Ouroboros.Network.Diffusion.Node.NodeKernel qualified as Node
import Test.Ouroboros.Network.PeerSelection.RootPeersDNS (DNSLookupDelay,
DNSTimeout, mockDNSActions)
import Test.Ouroboros.Network.TxSubmission.Common (Tx)


data Interfaces m = Interfaces
Expand Down Expand Up @@ -158,6 +165,8 @@ data Arguments m = Arguments
, aDNSTimeoutScript :: Script DNSTimeout
, aDNSLookupDelayScript :: Script DNSLookupDelay
, aDebugTracer :: Tracer m String
, aTxDecisionPolicy :: TxDecisionPolicy
, aTxs :: [Tx Int]
}

-- The 'mockDNSActions' is not using \/ specifying 'resolverException', thus we
Expand Down Expand Up @@ -195,9 +204,12 @@ run :: forall resolver m.
NtCAddr NtCVersion NtCVersionData
ResolverException m
-> Tracer m (TraceLabelPeer NtNAddr (TraceFetchClientState BlockHeader))
-> Tracer m (TraceTxSubmissionInbound Int (Tx Int))
-> Tracer m (DebugSharedTxState NtNAddr Int (Tx Int))
-> Tracer m (DebugTxLogic NtNAddr Int (Tx Int))
-> m Void
run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
Node.withNodeKernelThread blockGeneratorArgs
run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch tracerTxSubmissionInbound tracerTxSubmissionDebug tracerTxLogic =
Node.withNodeKernelThread blockGeneratorArgs (aTxs na)
$ \ nodeKernel nodeKernelThread -> do
dnsTimeoutScriptVar <- newTVarIO (aDNSTimeoutScript na)
dnsLookupDelayScriptVar <- newTVarIO (aDNSLookupDelayScript na)
Expand Down Expand Up @@ -270,7 +282,7 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
, Diff.P2P.daPeerSharingRegistry = nkPeerSharingRegistry nodeKernel
}

let apps = Node.applications (aDebugTracer na) nodeKernel Node.cborCodecs limits appArgs blockHeader
let apps = Node.applications (aDebugTracer na) tracerTxSubmissionInbound tracerTxSubmissionDebug nodeKernel Node.cborCodecs limits appArgs blockHeader

withAsync
(Diff.P2P.runM interfaces
Expand All @@ -280,11 +292,19 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
(mkArgsExtra useBootstrapPeersScriptVar) apps appsExtra)
$ \ diffusionThread ->
withAsync (blockFetch nodeKernel) $ \blockFetchLogicThread ->
wait diffusionThread
<> wait blockFetchLogicThread
<> wait nodeKernelThread

withAsync (decisionLogicThread
tracerTxLogic
(aTxDecisionPolicy na)
(readPeerGSVs (nkFetchClientRegistry nodeKernel))
(nkTxChannelsVar nodeKernel)
(nkSharedTxStateVar nodeKernel)) $ \decLogicThread ->
wait diffusionThread
<> wait blockFetchLogicThread
<> wait nodeKernelThread
<> wait decLogicThread
where
blockFetch :: NodeKernel BlockHeader Block s m
blockFetch :: NodeKernel BlockHeader Block s txid m
-> m Void
blockFetch nodeKernel = do
blockFetchLogic
Expand All @@ -300,7 +320,7 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
bfcSalt = 0
})

blockFetchPolicy :: NodeKernel BlockHeader Block s m
blockFetchPolicy :: NodeKernel BlockHeader Block s txid m
-> BlockFetchConsensusInterface NtNAddr BlockHeader Block m
blockFetchPolicy nodeKernel =
BlockFetchConsensusInterface {
Expand Down Expand Up @@ -422,6 +442,7 @@ run blockGeneratorArgs limits ni na tracersExtra tracerBlockFetch =
, Node.aaOwnPeerSharing = aOwnPeerSharing na
, Node.aaUpdateOutboundConnectionsState =
iUpdateOutboundConnectionsState ni
, Node.aaTxDecisionPolicy = aTxDecisionPolicy na
}

--- Utils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ import Pipes qualified

import Ouroboros.Network.NodeToNode (blockFetchMiniProtocolNum,
chainSyncMiniProtocolNum, keepAliveMiniProtocolNum,
peerSharingMiniProtocolNum)
peerSharingMiniProtocolNum, txSubmissionMiniProtocolNum)
import Ouroboros.Network.PeerSelection.LedgerPeers
import Ouroboros.Network.PeerSelection.LocalRootPeers (OutboundConnectionsState)
import Ouroboros.Network.PeerSelection.PeerSharing qualified as PSTypes
Expand All @@ -96,7 +96,21 @@ import Ouroboros.Network.Protocol.PeerSharing.Client (peerSharingClientPeer)
import Ouroboros.Network.Protocol.PeerSharing.Codec (codecPeerSharing)
import Ouroboros.Network.Protocol.PeerSharing.Server (peerSharingServerPeer)
import Ouroboros.Network.Protocol.PeerSharing.Type (PeerSharing)
import Ouroboros.Network.Protocol.TxSubmission2.Client (txSubmissionClientPeer)
import Ouroboros.Network.Protocol.TxSubmission2.Server
(txSubmissionServerPeerPipelined)
import Ouroboros.Network.Protocol.TxSubmission2.Type (NumTxIdsToAck (..),
NumTxIdsToReq (..), TxSubmission2)
import Ouroboros.Network.TxSubmission.Inbound.Policy (TxDecisionPolicy (..))
import Ouroboros.Network.TxSubmission.Inbound.Registry (SharedTxStateVar,
TxChannelsVar, withPeer)
import Ouroboros.Network.TxSubmission.Inbound.Server (txSubmissionInboundV2)
import Ouroboros.Network.TxSubmission.Inbound.State (DebugSharedTxState)
import Ouroboros.Network.TxSubmission.Inbound.Types (TraceTxSubmissionInbound)
import Ouroboros.Network.TxSubmission.Outbound (txSubmissionOutbound)
import Test.Ouroboros.Network.Diffusion.Node.NodeKernel
import Test.Ouroboros.Network.TxSubmission.Common (Mempool, Tx,
getMempoolReader, getMempoolWriter, txSubmissionCodec2)


-- | Protocol codecs.
Expand All @@ -112,6 +126,8 @@ data Codecs addr header block m = Codecs
CBOR.DeserialiseFailure m ByteString
, peerSharingCodec :: Codec (PeerSharing addr)
CBOR.DeserialiseFailure m ByteString
, txSubmissionCodec :: Codec (TxSubmission2 Int (Tx Int))
CBOR.DeserialiseFailure m ByteString
}

cborCodecs :: MonadST m => Codecs NtNAddr BlockHeader Block m
Expand All @@ -125,6 +141,7 @@ cborCodecs = Codecs
, keepAliveCodec = codecKeepAlive_v2
, pingPongCodec = codecPingPong
, peerSharingCodec = codecPeerSharing encodeNtNAddr decodeNtNAddr
, txSubmissionCodec = txSubmissionCodec2
}


Expand Down Expand Up @@ -178,6 +195,14 @@ data LimitsAndTimeouts header block = LimitsAndTimeouts
:: ProtocolTimeLimits (PeerSharing NtNAddr)
, peerSharingSizeLimits
:: ProtocolSizeLimits (PeerSharing NtNAddr) ByteString

-- tx submission
, txSubmissionLimits
:: MiniProtocolLimits
, txSubmissionTimeLimits
:: ProtocolTimeLimits (TxSubmission2 Int (Tx Int))
, txSubmissionSizeLimits
:: ProtocolSizeLimits (TxSubmission2 Int (Tx Int)) ByteString
}


Expand Down Expand Up @@ -208,6 +233,8 @@ data AppArgs header block m = AppArgs
:: PSTypes.PeerSharing
, aaUpdateOutboundConnectionsState
:: OutboundConnectionsState -> STM m ()

, aaTxDecisionPolicy :: TxDecisionPolicy
}


Expand All @@ -233,18 +260,21 @@ applications :: forall block header s m.
, RandomGen s
)
=> Tracer m String
-> NodeKernel header block s m
-> Tracer m (TraceTxSubmissionInbound Int (Tx Int))
-> Tracer m (DebugSharedTxState NtNAddr Int (Tx Int))
-> NodeKernel header block s Int m
-> Codecs NtNAddr header block m
-> LimitsAndTimeouts header block
-> AppArgs header block m
-> (block -> header)
-> Diff.Applications NtNAddr NtNVersion NtNVersionData
NtCAddr NtCVersion NtCVersionData
m ()
applications debugTracer nodeKernel
applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug nodeKernel
Codecs { chainSyncCodec, blockFetchCodec
, keepAliveCodec, pingPongCodec
, peerSharingCodec
, txSubmissionCodec
}
limits
AppArgs
Expand All @@ -257,6 +287,7 @@ applications debugTracer nodeKernel
, aaChainSyncEarlyExit
, aaOwnPeerSharing
, aaUpdateOutboundConnectionsState
, aaTxDecisionPolicy
}
toHeader =
Diff.Applications
Expand Down Expand Up @@ -316,6 +347,17 @@ applications debugTracer nodeKernel
blockFetchInitiator
blockFetchResponder
}

, MiniProtocol {
miniProtocolNum = txSubmissionMiniProtocolNum,
miniProtocolLimits = txSubmissionLimits limits,
miniProtocolRun =
InitiatorAndResponderProtocol
(txSubmissionInitiator aaTxDecisionPolicy (nkMempool nodeKernel))
(txSubmissionResponder (nkMempool nodeKernel)
(nkTxChannelsVar nodeKernel)
(nkSharedTxStateVar nodeKernel))
}
]
, withWarm = WithWarm
[ MiniProtocol
Expand Down Expand Up @@ -600,6 +642,61 @@ applications debugTracer nodeKernel
$ peerSharingServerPeer
$ peerSharingServer psAPI

txSubmissionInitiator
:: TxDecisionPolicy
-> Mempool m Int
-> MiniProtocolCb (ExpandedInitiatorContext NtNAddr m) ByteString m ()
txSubmissionInitiator txDecisionPolicy mempool =
MiniProtocolCb $
\ ExpandedInitiatorContext {
eicConnectionId = connId,
eicControlMessage = controlMessageSTM
}
channel
-> do
let client = txSubmissionOutbound
((show . (connId,)) `contramap` debugTracer)
(NumTxIdsToAck $ getNumTxIdsToReq
$ maxUnacknowledgedTxIds
$ txDecisionPolicy)
(getMempoolReader mempool)
maxBound
controlMessageSTM
labelThisThread "TxSubmissionClient"
runPeerWithLimits
((show . (connId,)) `contramap` debugTracer)
txSubmissionCodec
(txSubmissionSizeLimits limits)
(txSubmissionTimeLimits limits)
channel
(txSubmissionClientPeer client)

txSubmissionResponder
:: Mempool m Int
-> TxChannelsVar m NtNAddr Int (Tx Int)
-> SharedTxStateVar m NtNAddr Int (Tx Int)
-> MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
txSubmissionResponder mempool txChannelsVar sharedTxStateVar =
MiniProtocolCb $
\ ResponderContext { rcConnectionId = connId@ConnectionId { remoteAddress = them }} channel
-> do
withPeer txSubmissionInboundDebug
txChannelsVar
sharedTxStateVar
(getMempoolReader mempool)
them $ \api -> do
let server = txSubmissionInboundV2
txSubmissionInboundTracer
(getMempoolWriter mempool)
api
labelThisThread "TxSubmissionServer"
runPipelinedPeerWithLimits
((show . (connId,)) `contramap` debugTracer)
txSubmissionCodec
(txSubmissionSizeLimits limits)
(txSubmissionTimeLimits limits)
channel
(txSubmissionServerPeerPipelined server)

--
-- Orphaned Instances
Expand Down
Loading
Loading