diff --git a/ouroboros-network/src/Ouroboros/Network/Protocol/ChainSync/Test.hs b/ouroboros-network/src/Ouroboros/Network/Protocol/ChainSync/Test.hs index 034d3829f6c..f9cfc44e10c 100644 --- a/ouroboros-network/src/Ouroboros/Network/Protocol/ChainSync/Test.hs +++ b/ouroboros-network/src/Ouroboros/Network/Protocol/ChainSync/Test.hs @@ -599,7 +599,7 @@ propChainSyncDemoPipelinedMinBufferedIO -> Property propChainSyncDemoPipelinedMinBufferedIO cps (Positive n) (Positive m) = ioProperty $ do - (clientChan, serverChan) <- createConnectedBufferedChannels (fromIntegral omin) + (clientChan, serverChan) <- createConnectedBoundedChannels (fromIntegral omin) chainSyncDemoPipelined clientChan serverChan (ChainSyncExamples.chainSyncClientPipelinedMin omax) diff --git a/typed-protocols/src/Network/TypedProtocol/Channel.hs b/typed-protocols/src/Network/TypedProtocol/Channel.hs index 36993161dc0..351b2408d84 100644 --- a/typed-protocols/src/Network/TypedProtocol/Channel.hs +++ b/typed-protocols/src/Network/TypedProtocol/Channel.hs @@ -1,6 +1,7 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE RankNTypes #-} +{-# LANGUAGE BangPatterns #-} module Network.TypedProtocol.Channel ( Channel (..) @@ -10,21 +11,25 @@ module Network.TypedProtocol.Channel , mvarsAsChannel , handlesAsChannel , createConnectedChannels - , createConnectedBufferedChannels + , createConnectedBoundedChannels + , createConnectedDelayChannels , createPipelineTestChannels , channelEffect , delayChannel , loggingChannel ) where -import Control.Monad ((>=>)) +import Control.Monad ((>=>), when) import Control.Monad.Class.MonadSay +import Control.Monad.Class.MonadTime import Control.Monad.Class.MonadTimer import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as LBS import Data.ByteString.Lazy.Internal (smallChunkSize) -import Data.Time.Clock (DiffTime) +import Data.Time.Clock (DiffTime, + diffTimeToPicoseconds, picosecondsToDiffTime) import Numeric.Natural +import System.Random (RandomGen(..), Random(..)) import qualified System.IO as IO ( Handle, hFlush, hIsEOF ) @@ -132,7 +137,7 @@ createConnectedChannels = do mvarsAsChannel bufferA bufferB) --- | Create a pair of channels that are connected via N-place buffers. +-- | Create a pair of channels that are connected via a bounded queue. -- -- This variant /blocks/ when 'send' would exceed the maximum buffer size. -- Use this variant when you want the environment rather than the 'Peer' to @@ -140,9 +145,9 @@ createConnectedChannels = do -- -- This is primarily useful for testing protocols. -- -createConnectedBufferedChannels :: MonadSTM m - => Natural -> m (Channel m a, Channel m a) -createConnectedBufferedChannels sz = do +createConnectedBoundedChannels :: MonadSTM m + => Natural -> m (Channel m a, Channel m a) +createConnectedBoundedChannels sz = do -- Create two TBQueues to act as the channel buffers (one for each -- direction) and use them to make both ends of a bidirectional channel bufferA <- atomically $ newTBQueue sz @@ -158,6 +163,85 @@ createConnectedBufferedChannels sz = do recv = atomically (Just <$> readTBQueue bufferRead) +-- | Create a pair of channels that are connected via buffers with delays. +-- +-- This is a crude approximation of asynchronous network connections where +-- there is delay across the connection, and a limit on in-flight data. +-- +-- The buffer size is bounded. It /blocks/ when 'send' would exceed the maximum +-- buffer size. The size per channel element is provided by a function, so this +-- can be size in bytes or a fixed size (e.g. 1 per element), so the max size +-- should be interpreted accordingly. +-- +-- The delays are modeled in a simplistic "GSV" style: +-- +-- * The G is the minimum latency for a 0 sized message. +-- * The S is the time per message size unit. +-- * The V is the variance in latency, which is assumed to be uniform in the +-- range @0..v@. +-- +-- The sender is delayed by S. The receiver is delayed until the arrival time +-- which is G + S + V. +-- +-- Note that this implementation does not handle the delays correctly if there +-- are multiple writers or multiple readers. +-- +-- This is primarily useful for testing protocols. +-- +createConnectedDelayChannels + :: forall m prng a. + (MonadSTM m, MonadTime m, MonadTimer m, RandomGen prng) + => (a -> Int) -- ^ Data size measure + -> Int -- ^ Max size of data in-flight + -> (DiffTime, DiffTime, DiffTime) -- ^ GSV + -> prng -- ^ PRNG for sampling V + -> m (Channel m a, Channel m a) +createConnectedDelayChannels size maxsize (g, s, v) prng0 = do + let (prngA, prngB) = split prng0 + -- For each direction, create: + -- - a TQueue for the messages + -- - a TVar Int to track the size in-flight + -- - a TVar prng for the sampling from v + bufferA <- atomically $ (,,) <$> newTQueue <*> newTVar 0 <*> newTVar prngA + bufferB <- atomically $ (,,) <$> newTQueue <*> newTVar 0 <*> newTVar prngB + + return (asChannel bufferB bufferA, + asChannel bufferA bufferB) + where + asChannel (rBuffer, rSize, _rPRNG) + (wBuffer, wSize, wPRNG) = + Channel{send, recv} + where + send x = do + atomically $ do + sz <- readTVar wSize + let !sz' = sz + size x + check (sz' <= maxsize) + writeTVar wSize sz' + threadDelay (s * fromIntegral (size x)) + now <- getMonotonicTime + atomically $ do + prng <- readTVar wPRNG + let (vsample, prng') = randomR (0, diffTimeToPicoseconds v) prng + delay :: DiffTime + delay = g + picosecondsToDiffTime vsample + arrive :: Time m + arrive = delay `addTime` now + writeTVar wPRNG prng' + writeTQueue wBuffer (arrive, x) + + recv = do + (arrive, x) <- atomically $ readTQueue rBuffer + now <- getMonotonicTime + let delay = arrive `diffTime` now + when (delay > 0) (threadDelay delay) + atomically $ do + sz <- readTVar rSize + let !sz' = sz - size x + writeTVar rSize sz' + return (Just x) + + -- | Create a pair of channels that are connected via N-place buffers. -- -- This variant /fails/ when 'send' would exceed the maximum buffer size. diff --git a/typed-protocols/test/ChannelTests.hs b/typed-protocols/test/ChannelTests.hs new file mode 100644 index 00000000000..40d53387666 --- /dev/null +++ b/typed-protocols/test/ChannelTests.hs @@ -0,0 +1,135 @@ +{-# LANGUAGE FlexibleContexts #-} + +module ChannelTests (tests) where + +import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadTime +import Control.Monad.IOSim + +import Network.TypedProtocol.Channel + +import Data.Time.Clock (diffTimeToPicoseconds, picosecondsToDiffTime) +import System.Random + +import Test.QuickCheck +import Test.Tasty (TestTree, testGroup) +import Test.Tasty.QuickCheck (testProperty) + + +tests :: TestTree +tests = + testGroup "Channel" + [ testProperty "createConnectedDelayChannels" prop_createConnectedDelayChannels + ] + + +prop_createConnectedDelayChannels + :: Positive Int + -> (Positive Int, Positive Int, Positive Int) + -> Int + -> [Int] -> Property +prop_createConnectedDelayChannels + (Positive maxsize) (Positive g_ms, Positive s_ms, Positive v_ms) + seed xs = + + expectedDelayChannelTimes maxsize (g, s, v) prng xs + === actualDelayChannelTimes maxsize (g, s, v) prng xs + where + prng = mkStdGen seed + + g = millisecondsToDiffTime g_ms + s = millisecondsToDiffTime s_ms + v = millisecondsToDiffTime v_ms + + millisecondsToDiffTime :: Int -> DiffTime + millisecondsToDiffTime = (/1e3) . fromIntegral + +actualDelayChannelTimes + :: RandomGen g + => Int + -> (DiffTime, DiffTime, DiffTime) + -> g + -> [b] + -> [((VTime, b), (VTime, b))] +actualDelayChannelTimes maxsize gsv prng xs0 = + runSimOrThrow $ do + (chanA, chanB) <- createConnectedBufferedChannels + (const 1) maxsize gsv prng + + sa <- async (sender chanA xs0) + ra <- async (receiver chanB [] (length xs0)) + uncurry zip <$> waitBoth sa ra + where + sender chan xs = + sequence + [ do send chan x + now <- getMonotonicTime + return (now,x) + | x <- xs ] + + receiver _ xs 0 = return (reverse xs) + receiver chan xs n = do + Just x <- recv chan + now <- getMonotonicTime + receiver chan ((now,x):xs) (n-1) + + +expectedDelayChannelTimes + :: RandomGen g + => Int + -> (DiffTime, DiffTime, DiffTime) + -> g + -> [b] + -> [((VTime, b), (VTime, b))] +expectedDelayChannelTimes maxsize (g, s, v) prng0 xs0 = + let (prngA, _prngB) = split prng0 + vsamples :: [DiffTime] + vsamples = map picosecondsToDiffTime + (randomRs (0, diffTimeToPicoseconds v) prngA) + in go (VTime 0) (VTime 0) (queue []) vsamples xs0 + where + + go :: VTime -> VTime -> Q VTime -> [DiffTime] -> [b] -> [((VTime, b), (VTime, b))] + go _ _ _ _ [] = [] + + go now maxarrive arrivals vsamples (x:xs) + | queuelen arrivals == maxsize + , Just (arrival, arrivals') <- dequeue arrivals + = to (max now arrival) maxarrive arrivals' vsamples x xs + + | otherwise + = to now maxarrive arrivals vsamples x xs + + to now maxarrive arrivals (vsample:vsamples) x xs = + ((depart, x), (arrive, x)) + : go depart arrive (enqueue arrivals arrive) vsamples xs + where + depart = s `addTime` now + arrive = max maxarrive ((g + vsample) `addTime` depart) + -- cannot have the next message arrive before the last previous arrival + + to _ _ _ [] _ _ = error "expectedDelayChannelTimes: randomRs is infinite" + + + +---------------- +-- Queue +-- + +data Q a = Q [a] [a] + +queue :: [a] -> Q a +queue xs = Q xs [] + +enqueue :: Q a -> a -> Q a +enqueue (Q front back) x = Q front (x : back) + +dequeue :: Q a -> Maybe (a, Q a) +dequeue (Q (x:xs) back) = Just (x, Q xs back) +dequeue (Q [] back) = case reverse back of + x:xs -> Just (x, Q xs []) + [] -> Nothing + +queuelen :: Q a -> Int +queuelen (Q front back) = length front + length back + diff --git a/typed-protocols/test/Main.hs b/typed-protocols/test/Main.hs index 950738d6904..5a2cddf5587 100644 --- a/typed-protocols/test/Main.hs +++ b/typed-protocols/test/Main.hs @@ -4,6 +4,7 @@ import Test.Tasty import qualified Network.TypedProtocol.PingPong.Tests as PingPong import qualified Network.TypedProtocol.ReqResp.Tests as ReqResp +import qualified ChannelTests main :: IO () main = defaultMain tests @@ -13,5 +14,6 @@ tests = testGroup "typed-protocols" [ PingPong.tests , ReqResp.tests + , ChannelTests.tests ] diff --git a/typed-protocols/typed-protocols.cabal b/typed-protocols/typed-protocols.cabal index c152542778c..7323169e056 100644 --- a/typed-protocols/typed-protocols.cabal +++ b/typed-protocols/typed-protocols.cabal @@ -50,7 +50,8 @@ library io-sim-classes, bytestring, contra-tracer, - time + time, + random hs-source-dirs: src default-language: Haskell2010 @@ -80,6 +81,7 @@ test-suite test-protocols , Network.TypedProtocol.ReqResp.Server , Network.TypedProtocol.ReqResp.Tests , Network.TypedProtocol.ReqResp.Type + , ChannelTests build-depends: base , bytestring , contra-tracer @@ -89,6 +91,7 @@ test-suite test-protocols , tasty , tasty-quickcheck , time + , random default-language: Haskell2010 ghc-options: -rtsopts -Wall