Skip to content

Commit 3aaa2b8

Browse files
committed
tx-submission: inbound peer using tx-submission decision logic
1 parent bc724f9 commit 3aaa2b8

File tree

3 files changed

+198
-1
lines changed

3 files changed

+198
-1
lines changed

ouroboros-network/ouroboros-network.cabal

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ library
6464
Ouroboros.Network.PeerSelection.RootPeersDNS.PublicRootPeers
6565
Ouroboros.Network.PeerSharing
6666
Ouroboros.Network.TxSubmission.Inbound
67+
Ouroboros.Network.TxSubmission.Inbound.Server
6768
Ouroboros.Network.TxSubmission.Inbound.Decision
6869
Ouroboros.Network.TxSubmission.Inbound.Policy
6970
Ouroboros.Network.TxSubmission.Inbound.State

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs

+12-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
{-# OPTIONS_GHC -Wno-partial-fields #-}
1010

11+
-- | Legacy `tx-submission` inbound peer.
12+
--
1113
module Ouroboros.Network.TxSubmission.Inbound
1214
( txSubmissionInbound
1315
, TxSubmissionMempoolWriter (..)
@@ -41,6 +43,7 @@ import Network.TypedProtocol.Pipelined (N, Nat (..), natToInt)
4143
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
4244
import Ouroboros.Network.Protocol.TxSubmission2.Server
4345
import Ouroboros.Network.Protocol.TxSubmission2.Type
46+
import Ouroboros.Network.TxSubmission.Inbound.Decision (TxDecision)
4447
import Ouroboros.Network.TxSubmission.Mempool.Reader (MempoolSnapshot (..),
4548
TxSubmissionMempoolReader (..))
4649

@@ -81,9 +84,17 @@ data TraceTxSubmissionInbound txid tx =
8184
-- | Just processed transaction pass/fail breakdown.
8285
| TraceTxSubmissionProcessed ProcessedTxCount
8386
-- | Server received 'MsgDone'
84-
| TraceTxInboundTerminated
8587
| TraceTxInboundCanRequestMoreTxs Int
8688
| TraceTxInboundCannotRequestMoreTxs Int
89+
90+
--
91+
-- messages emitted by the new implementation of the server in
92+
-- "Ouroboros.Network.TxSubmission.Inbound.Server"; some of them are also
93+
-- used in this module.
94+
--
95+
96+
| TraceTxInboundTerminated
97+
| TraceTxInboundDecision (TxDecision txid tx)
8798
deriving (Eq, Show)
8899

89100
data TxSubmissionProtocolError =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
{-# LANGUAGE BangPatterns #-}
2+
{-# LANGUAGE DataKinds #-}
3+
{-# LANGUAGE GADTs #-}
4+
{-# LANGUAGE KindSignatures #-}
5+
{-# LANGUAGE LambdaCase #-}
6+
{-# LANGUAGE NamedFieldPuns #-}
7+
{-# LANGUAGE ScopedTypeVariables #-}
8+
9+
module Ouroboros.Network.TxSubmission.Inbound.Server where
10+
11+
import Data.List.NonEmpty qualified as NonEmpty
12+
import Data.Map.Strict qualified as Map
13+
import Data.Sequence.Strict qualified as StrictSeq
14+
import Data.Set qualified as Set
15+
16+
import Control.Concurrent.Class.MonadSTM.Strict
17+
import Control.Exception (assert)
18+
import Control.Monad.Class.MonadThrow
19+
import Control.Tracer (Tracer, traceWith)
20+
21+
import Network.TypedProtocol.Pipelined
22+
23+
import Control.Monad (unless)
24+
import Ouroboros.Network.Protocol.TxSubmission2.Server
25+
import Ouroboros.Network.TxSubmission.Inbound (TraceTxSubmissionInbound (..),
26+
TxSubmissionMempoolWriter (..), TxSubmissionProtocolError (..))
27+
import Ouroboros.Network.TxSubmission.Inbound.Decision (TxDecision (..))
28+
import Ouroboros.Network.TxSubmission.Inbound.Registry (PeerTxAPI (..))
29+
30+
31+
-- | A tx-submission outbound side (server, sic!).
32+
--
33+
-- The server blocks on receiving `TxDecision` from the decision logic. If
34+
-- there are tx's to download it pipelines two requests: first for tx's second
35+
-- for txid's. If there are no tx's to download, it either sends a blocking or
36+
-- non-blocking request for txid's.
37+
--
38+
txSubmissionInboundV2
39+
:: forall txid tx idx m.
40+
( MonadSTM m
41+
, MonadThrow m
42+
, Ord txid
43+
)
44+
=> Tracer m (TraceTxSubmissionInbound txid tx)
45+
-> TxSubmissionMempoolWriter txid tx idx m
46+
-> PeerTxAPI m txid tx
47+
-> TxSubmissionServerPipelined txid tx m ()
48+
txSubmissionInboundV2
49+
tracer
50+
TxSubmissionMempoolWriter {
51+
txId,
52+
mempoolAddTxs
53+
}
54+
PeerTxAPI {
55+
readTxDecision,
56+
handleReceivedTxIds,
57+
handleReceivedTxs
58+
}
59+
=
60+
TxSubmissionServerPipelined serverIdle
61+
where
62+
serverIdle
63+
:: m (ServerStIdle Z txid tx m ())
64+
serverIdle = do
65+
-- Block on next decision.
66+
txd@TxDecision { txdTxsToRequest = txsToReq, txdTxsToMempool = txs }
67+
<- readTxDecision
68+
traceWith tracer (TraceTxInboundDecision txd)
69+
txidsAccepted <- mempoolAddTxs txs
70+
let !collected = length txidsAccepted
71+
traceWith tracer $
72+
TraceTxSubmissionCollected collected
73+
-- TODO:
74+
-- We can update the state so that other `tx-submission` servers will
75+
-- not try to add these txs to the mempool.
76+
if Set.null txsToReq
77+
then serverReqTxIds Zero txd
78+
else serverReqTxs txd
79+
80+
81+
-- Pipelined request of txs
82+
serverReqTxs :: TxDecision txid tx
83+
-> m (ServerStIdle Z txid tx m ())
84+
serverReqTxs txd@TxDecision { txdTxsToRequest = txsToReq } =
85+
pure $ SendMsgRequestTxsPipelined (Set.toList txsToReq)
86+
(serverReqTxIds (Succ Zero) txd)
87+
88+
89+
serverReqTxIds :: forall (n :: N).
90+
Nat n
91+
-> TxDecision txid tx
92+
-> m (ServerStIdle n txid tx m ())
93+
serverReqTxIds
94+
n TxDecision { txdTxIdsToAcknowledge = 0,
95+
txdTxIdsToRequest = 0 }
96+
=
97+
case n of
98+
Zero -> serverIdle
99+
Succ _ -> handleReplies n
100+
101+
serverReqTxIds
102+
-- if there are no unacknowledged txids, the protocol requires sending
103+
-- a blocking `MsgRequestTxIds` request. This is important, as otherwise
104+
-- the client side wouldn't have a chance to terminate the
105+
-- mini-protocol.
106+
Zero TxDecision { txdTxIdsToAcknowledge = txIdsToAck,
107+
txdPipelineTxIds = False,
108+
txdTxIdsToRequest = txIdsToReq
109+
}
110+
=
111+
pure $ SendMsgRequestTxIdsBlocking
112+
txIdsToAck txIdsToReq
113+
-- Our result if the client terminates the protocol
114+
(traceWith tracer TraceTxInboundTerminated)
115+
(\txids -> do
116+
let txids' = NonEmpty.toList txids
117+
txidsSeq = StrictSeq.fromList $ fst <$> txids'
118+
txidsMap = Map.fromList txids'
119+
unless (StrictSeq.length txidsSeq <= fromIntegral txIdsToReq) $
120+
throwIO ProtocolErrorTxIdsNotRequested
121+
handleReceivedTxIds txIdsToReq txidsSeq txidsMap
122+
serverIdle
123+
)
124+
125+
serverReqTxIds
126+
n@Zero TxDecision { txdTxIdsToAcknowledge = txIdsToAck,
127+
txdPipelineTxIds = True,
128+
txdTxIdsToRequest = txIdsToReq
129+
}
130+
=
131+
pure $ SendMsgRequestTxIdsPipelined
132+
txIdsToAck txIdsToReq
133+
(handleReplies (Succ n))
134+
135+
serverReqTxIds
136+
n@Succ{} TxDecision { txdTxIdsToAcknowledge = txIdsToAck,
137+
txdPipelineTxIds,
138+
txdTxIdsToRequest = txIdsToReq
139+
}
140+
=
141+
-- it is impossible that we have had `tx`'s to request (Succ{} - is an
142+
-- evidence for that), but no unacknowledged `txid`s.
143+
assert txdPipelineTxIds $
144+
pure $ SendMsgRequestTxIdsPipelined
145+
txIdsToAck txIdsToReq
146+
(handleReplies (Succ n))
147+
148+
149+
handleReplies :: forall (n :: N).
150+
Nat (S n)
151+
-> m (ServerStIdle (S n) txid tx m ())
152+
handleReplies (Succ n'@Succ{}) =
153+
pure $ CollectPipelined
154+
Nothing
155+
(handleReply (handleReplies n'))
156+
157+
handleReplies (Succ Zero) =
158+
pure $ CollectPipelined
159+
Nothing
160+
(handleReply serverIdle)
161+
162+
handleReply :: forall (n :: N).
163+
m (ServerStIdle n txid tx m ())
164+
-- continuation
165+
-> Collect txid tx
166+
-> m (ServerStIdle n txid tx m ())
167+
handleReply k = \case
168+
CollectTxIds txIdsToReq txids -> do
169+
let txidsSeq = StrictSeq.fromList $ fst <$> txids
170+
txidsMap = Map.fromList txids
171+
unless (StrictSeq.length txidsSeq <= fromIntegral txIdsToReq) $
172+
throwIO ProtocolErrorTxIdsNotRequested
173+
handleReceivedTxIds txIdsToReq txidsSeq txidsMap
174+
k
175+
CollectTxs txids txs -> do
176+
let requested = Set.fromList txids
177+
received = Map.fromList [ (txId tx, tx) | tx <- txs ]
178+
179+
unless (Map.keysSet received `Set.isSubsetOf` requested) $
180+
throwIO ProtocolErrorTxNotRequested
181+
-- TODO: all sizes of txs which were announced earlier with
182+
-- `MsgReplyTxIds` must be verified.
183+
184+
handleReceivedTxs requested received
185+
k

0 commit comments

Comments
 (0)