Skip to content

Commit 819d532

Browse files
committed
tx-submission: generalised CollectPipelined
Allow monadic action when trying to pipeline more messages, while collecting responses.
1 parent 086fcc4 commit 819d532

File tree

5 files changed

+16
-12
lines changed

5 files changed

+16
-12
lines changed

ouroboros-network-protocols/src/Ouroboros/Network/Protocol/TxSubmission2/Server.hs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,9 @@ data ServerStIdle (n :: N) txid tx m a where
8383
-- | Collect a pipelined result.
8484
--
8585
CollectPipelined
86-
:: Maybe (ServerStIdle (S n) txid tx m a)
87-
-> (Collect txid tx -> m (ServerStIdle n txid tx m a))
88-
-> ServerStIdle (S n) txid tx m a
86+
:: Maybe (m (ServerStIdle (S n) txid tx m a))
87+
-> (Collect txid tx -> m ( ServerStIdle n txid tx m a))
88+
-> ServerStIdle (S n) txid tx m a
8989

9090

9191
-- | Transform a 'TxSubmissionServerPipelined' into a 'PeerPipelined'.
@@ -145,6 +145,5 @@ txSubmissionServerPeerPipelined (TxSubmissionServerPipelined server) =
145145

146146
go (CollectPipelined mNone collect) =
147147
SenderCollect
148-
(fmap go mNone)
149-
(SenderEffect . fmap go . collect)
150-
148+
(SenderEffect . fmap go <$> mNone)
149+
(SenderEffect . fmap go . collect)

ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/TxSubmission2/Direct.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ directPipelined (TxSubmissionServerPipelined mserver)
5454
SendMsgReplyTxs txs client' <- recvMsgRequestTxs txids
5555
directSender (enqueue (CollectTxs txids txs) q) server' client'
5656

57-
directSender q (CollectPipelined (Just server') _) client =
57+
directSender q (CollectPipelined (Just server) _) client = do
58+
server' <- server
5859
directSender q server' client
5960

6061
directSender (ConsQ c q) (CollectPipelined _ collect) client = do

ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/TxSubmission2/Examples.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ txSubmissionServer tracer txId maxUnacked maxTxIdsToRequest maxTxToRequest =
272272
--
273273
| canRequestMoreTxs st
274274
= CollectPipelined
275-
(Just (serverReqTxs accum (Succ n) st))
275+
(Just (pure $ serverReqTxs accum (Succ n) st))
276276
(handleReply accum n st)
277277

278278
-- In this case there is nothing else to do so we block until we

ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/TxSubmission.hs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import Prelude hiding (seq)
1818

1919
import NoThunks.Class
2020

21+
import Control.Concurrent.Class.MonadMVar (MonadMVar)
2122
import Control.Concurrent.Class.MonadSTM
2223
import Control.Exception (SomeException (..), assert)
2324
import Control.Monad.Class.MonadAsync
@@ -265,6 +266,7 @@ txSubmissionSimulation
265266
, MonadDelay m
266267
, MonadFork m
267268
, MonadMask m
269+
, MonadMVar m
268270
, MonadSay m
269271
, MonadST m
270272
, MonadSTM m
@@ -279,13 +281,14 @@ txSubmissionSimulation
279281

280282
, txid ~ Int
281283
)
282-
=> NumTxIdsToAck
284+
=> Tracer m (String, TraceSendRecv (TxSubmission2 txid (Tx txid)))
285+
-> NumTxIdsToAck
283286
-> [Tx txid]
284287
-> ControlMessageSTM m
285288
-> Maybe DiffTime
286289
-> Maybe DiffTime
287290
-> m ([Tx txid], [Tx txid])
288-
txSubmissionSimulation maxUnacked outboundTxs
291+
txSubmissionSimulation tracer maxUnacked outboundTxs
289292
controlMessageSTM
290293
inboundDelay outboundDelay = do
291294

@@ -294,7 +297,7 @@ txSubmissionSimulation maxUnacked outboundTxs
294297
(outboundChannel, inboundChannel) <- createConnectedChannels
295298
outboundAsync <-
296299
async $ runPeerWithLimits
297-
(("OUTBOUND",) `contramap` verboseTracer)
300+
(("OUTBOUND",) `contramap` tracer)
298301
txSubmissionCodec2
299302
(byteLimitsTxSubmission2 (fromIntegral . BSL.length))
300303
timeLimitsTxSubmission2
@@ -361,6 +364,7 @@ prop_txSubmission (Positive maxUnacked) (NonEmpty outboundTxs) delay =
361364
* realToFrac (length outboundTxs `div` 4))
362365
atomically (writeTVar controlMessageVar Terminate)
363366
txSubmissionSimulation
367+
verboseTracer
364368
(NumTxIdsToAck maxUnacked) outboundTxs
365369
(readTVar controlMessageVar)
366370
mbDelayTime mbDelayTime

ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ txSubmissionInbound tracer (NumTxIdsToAck maxUnacked) mpReader mpWriter _version
252252
--
253253
traceWith tracer (TraceTxInboundCanRequestMoreTxs (natToInt n))
254254
pure $ CollectPipelined
255-
(Just (continueWithState (serverReqTxs (Succ n')) st))
255+
(Just (pure $ continueWithState (serverReqTxs (Succ n')) st))
256256
(collectAndContinueWithState (handleReply n') st)
257257

258258
else do

0 commit comments

Comments
 (0)