Skip to content

Add createConnectedBufferedChannels with simple delay model #1141

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ propChainSyncDemoPipelinedMinBufferedIO
-> Property
propChainSyncDemoPipelinedMinBufferedIO cps (Positive n) (Positive m) =
ioProperty $ do
(clientChan, serverChan) <- createConnectedBufferedChannels (fromIntegral omin)
(clientChan, serverChan) <- createConnectedBoundedChannels (fromIntegral omin)
chainSyncDemoPipelined
clientChan serverChan
(ChainSyncExamples.chainSyncClientPipelinedMin omax)
Expand Down
98 changes: 91 additions & 7 deletions typed-protocols/src/Network/TypedProtocol/Channel.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BangPatterns #-}

module Network.TypedProtocol.Channel
( Channel (..)
Expand All @@ -10,21 +11,25 @@ module Network.TypedProtocol.Channel
, mvarsAsChannel
, handlesAsChannel
, createConnectedChannels
, createConnectedBufferedChannels
, createConnectedBoundedChannels
, createConnectedDelayChannels
, createPipelineTestChannels
, channelEffect
, delayChannel
, loggingChannel
) where

import Control.Monad ((>=>))
import Control.Monad ((>=>), when)
import Control.Monad.Class.MonadSay
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS
import Data.ByteString.Lazy.Internal (smallChunkSize)
import Data.Time.Clock (DiffTime)
import Data.Time.Clock (DiffTime,
diffTimeToPicoseconds, picosecondsToDiffTime)
import Numeric.Natural
import System.Random (RandomGen(..), Random(..))

import qualified System.IO as IO
( Handle, hFlush, hIsEOF )
Expand Down Expand Up @@ -132,17 +137,17 @@ createConnectedChannels = do
mvarsAsChannel bufferA bufferB)


-- | Create a pair of channels that are connected via N-place buffers.
-- | Create a pair of channels that are connected via a bounded queue.
--
-- This variant /blocks/ when 'send' would exceed the maximum buffer size.
-- Use this variant when you want the environment rather than the 'Peer' to
-- limit the pipelining.
--
-- This is primarily useful for testing protocols.
--
createConnectedBufferedChannels :: MonadSTM m
=> Natural -> m (Channel m a, Channel m a)
createConnectedBufferedChannels sz = do
createConnectedBoundedChannels :: MonadSTM m
=> Natural -> m (Channel m a, Channel m a)
createConnectedBoundedChannels sz = do
-- Create two TBQueues to act as the channel buffers (one for each
-- direction) and use them to make both ends of a bidirectional channel
bufferA <- atomically $ newTBQueue sz
Expand All @@ -158,6 +163,85 @@ createConnectedBufferedChannels sz = do
recv = atomically (Just <$> readTBQueue bufferRead)


-- | Create a pair of channels that are connected via buffers with delays.
--
-- This is a crude approximation of asynchronous network connections where
-- there is delay across the connection, and a limit on in-flight data.
--
-- The buffer size is bounded. It /blocks/ when 'send' would exceed the maximum
-- buffer size. The size per channel element is provided by a function, so this
-- can be size in bytes or a fixed size (e.g. 1 per element), so the max size
-- should be interpreted accordingly.
--
-- The delays are modeled in a simplistic "GSV" style:
--
-- * The G is the minimum latency for a 0 sized message.
-- * The S is the time per message size unit.
-- * The V is the variance in latency, which is assumed to be uniform in the
-- range @0..v@.
--
-- The sender is delayed by S. The receiver is delayed until the arrival time
-- which is G + S + V.
Comment on lines +180 to +184
Copy link
Contributor

@nfrisby nfrisby Oct 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not certain, but those three sentences together feel somewhat incorrect. Something like "the receiver is delayed until either G + S + V, or by something more than that because the channel had to preserve order instead of respecting that particular scheduled arrival time."

This is the same thing I mentioned in the test code.

--
-- Note that this implementation does not handle the delays correctly if there
-- are multiple writers or multiple readers.
--
-- This is primarily useful for testing protocols.
--
createConnectedDelayChannels
:: forall m prng a.
(MonadSTM m, MonadTime m, MonadTimer m, RandomGen prng)
=> (a -> Int) -- ^ Data size measure
-> Int -- ^ Max size of data in-flight
-> (DiffTime, DiffTime, DiffTime) -- ^ GSV
-> prng -- ^ PRNG for sampling V
-> m (Channel m a, Channel m a)
createConnectedDelayChannels size maxsize (g, s, v) prng0 = do
let (prngA, prngB) = split prng0
-- For each direction, create:
-- - a TQueue for the messages
-- - a TVar Int to track the size in-flight
-- - a TVar prng for the sampling from v
bufferA <- atomically $ (,,) <$> newTQueue <*> newTVar 0 <*> newTVar prngA
bufferB <- atomically $ (,,) <$> newTQueue <*> newTVar 0 <*> newTVar prngB

return (asChannel bufferB bufferA,
asChannel bufferA bufferB)
where
asChannel (rBuffer, rSize, _rPRNG)
(wBuffer, wSize, wPRNG) =
Channel{send, recv}
where
send x = do
atomically $ do
sz <- readTVar wSize
let !sz' = sz + size x
check (sz' <= maxsize)
writeTVar wSize sz'
threadDelay (s * fromIntegral (size x))
now <- getMonotonicTime
atomically $ do
prng <- readTVar wPRNG
let (vsample, prng') = randomR (0, diffTimeToPicoseconds v) prng
delay :: DiffTime
delay = g + picosecondsToDiffTime vsample
arrive :: Time m
arrive = delay `addTime` now
writeTVar wPRNG prng'
writeTQueue wBuffer (arrive, x)

recv = do
(arrive, x) <- atomically $ readTQueue rBuffer
now <- getMonotonicTime
let delay = arrive `diffTime` now
when (delay > 0) (threadDelay delay)
atomically $ do
sz <- readTVar rSize
let !sz' = sz - size x
writeTVar rSize sz'
return (Just x)


-- | Create a pair of channels that are connected via N-place buffers.
--
-- This variant /fails/ when 'send' would exceed the maximum buffer size.
Expand Down
135 changes: 135 additions & 0 deletions typed-protocols/test/ChannelTests.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
{-# LANGUAGE FlexibleContexts #-}

module ChannelTests (tests) where

import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadTime
import Control.Monad.IOSim

import Network.TypedProtocol.Channel

import Data.Time.Clock (diffTimeToPicoseconds, picosecondsToDiffTime)
import System.Random

import Test.QuickCheck
import Test.Tasty (TestTree, testGroup)
import Test.Tasty.QuickCheck (testProperty)


tests :: TestTree
tests =
testGroup "Channel"
[ testProperty "createConnectedDelayChannels" prop_createConnectedDelayChannels
]


prop_createConnectedDelayChannels
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worthwhile to also test the other half of the Channel?

:: Positive Int
-> (Positive Int, Positive Int, Positive Int)
-> Int
-> [Int] -> Property
prop_createConnectedDelayChannels
(Positive maxsize) (Positive g_ms, Positive s_ms, Positive v_ms)
seed xs =

expectedDelayChannelTimes maxsize (g, s, v) prng xs
=== actualDelayChannelTimes maxsize (g, s, v) prng xs
where
prng = mkStdGen seed

g = millisecondsToDiffTime g_ms
s = millisecondsToDiffTime s_ms
v = millisecondsToDiffTime v_ms

millisecondsToDiffTime :: Int -> DiffTime
millisecondsToDiffTime = (/1e3) . fromIntegral

actualDelayChannelTimes
:: RandomGen g
=> Int
-> (DiffTime, DiffTime, DiffTime)
-> g
-> [b]
-> [((VTime, b), (VTime, b))]
actualDelayChannelTimes maxsize gsv prng xs0 =
runSimOrThrow $ do
(chanA, chanB) <- createConnectedBufferedChannels
(const 1) maxsize gsv prng

sa <- async (sender chanA xs0)
ra <- async (receiver chanB [] (length xs0))
uncurry zip <$> waitBoth sa ra
where
sender chan xs =
sequence
[ do send chan x
now <- getMonotonicTime
return (now,x)
| x <- xs ]

receiver _ xs 0 = return (reverse xs)
receiver chan xs n = do
Just x <- recv chan
now <- getMonotonicTime
receiver chan ((now,x):xs) (n-1)


expectedDelayChannelTimes
:: RandomGen g
=> Int
-> (DiffTime, DiffTime, DiffTime)
-> g
-> [b]
-> [((VTime, b), (VTime, b))]
expectedDelayChannelTimes maxsize (g, s, v) prng0 xs0 =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually surprising to me that one can dequeue only when the channel is full to correctly model channels with delay. I think that's possible because the time (the now argument of go) is decoupled and moves on as we dequeue.

Maybe you have a simple model how this works, it would be good to put it in haddock.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My "ah ha" moment along these lines was realizing that the scheduled "arrival times" of the elements in the buffer might not be monotonic -- so they're actually the "earliest possible arrival time" per message, not necessarily the actual arrival time. Hence arrive = max maxarrive ((g + vsample) `addTime` depart).

That contention/overlap between the queue's semantics and the scheduled arrival times' values was a bit surprising. Would it be worthwhile to maintain the max while generating the arrival times? Or perhaps just a comment/renaming some variables would suffice. (Possibly include whichever in the implementation too, so that debugging the queue's contents might be less confusing?)

More directly to Marcin's comment: to is called once per element, as expected. The queue maintained here is not analogous to the original queue of elements. A renaming might help: perhaps now -> senderTime and maxarrive to receiverTime? Or senderClock and receiverClock, etc.

let (prngA, _prngB) = split prng0
vsamples :: [DiffTime]
vsamples = map picosecondsToDiffTime
(randomRs (0, diffTimeToPicoseconds v) prngA)
in go (VTime 0) (VTime 0) (queue []) vsamples xs0
where

go :: VTime -> VTime -> Q VTime -> [DiffTime] -> [b] -> [((VTime, b), (VTime, b))]
go _ _ _ _ [] = []

go now maxarrive arrivals vsamples (x:xs)
| queuelen arrivals == maxsize
, Just (arrival, arrivals') <- dequeue arrivals
= to (max now arrival) maxarrive arrivals' vsamples x xs

| otherwise
= to now maxarrive arrivals vsamples x xs

to now maxarrive arrivals (vsample:vsamples) x xs =
((depart, x), (arrive, x))
: go depart arrive (enqueue arrivals arrive) vsamples xs
where
depart = s `addTime` now
arrive = max maxarrive ((g + vsample) `addTime` depart)
-- cannot have the next message arrive before the last previous arrival

to _ _ _ [] _ _ = error "expectedDelayChannelTimes: randomRs is infinite"



----------------
-- Queue
--
Comment on lines +115 to +117
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation is simple and good, but would reusing Data.Sequence be even simpler (e.g. less maintenance burden)?


data Q a = Q [a] [a]

queue :: [a] -> Q a
queue xs = Q xs []

enqueue :: Q a -> a -> Q a
enqueue (Q front back) x = Q front (x : back)

dequeue :: Q a -> Maybe (a, Q a)
dequeue (Q (x:xs) back) = Just (x, Q xs back)
dequeue (Q [] back) = case reverse back of
x:xs -> Just (x, Q xs [])
[] -> Nothing

queuelen :: Q a -> Int
queuelen (Q front back) = length front + length back

2 changes: 2 additions & 0 deletions typed-protocols/test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import Test.Tasty

import qualified Network.TypedProtocol.PingPong.Tests as PingPong
import qualified Network.TypedProtocol.ReqResp.Tests as ReqResp
import qualified ChannelTests

main :: IO ()
main = defaultMain tests
Expand All @@ -13,5 +14,6 @@ tests =
testGroup "typed-protocols"
[ PingPong.tests
, ReqResp.tests
, ChannelTests.tests
]

5 changes: 4 additions & 1 deletion typed-protocols/typed-protocols.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ library
io-sim-classes,
bytestring,
contra-tracer,
time
time,
random
Comment on lines -53 to +54
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preserve alphabetic order?


hs-source-dirs: src
default-language: Haskell2010
Expand Down Expand Up @@ -80,6 +81,7 @@ test-suite test-protocols
, Network.TypedProtocol.ReqResp.Server
, Network.TypedProtocol.ReqResp.Tests
, Network.TypedProtocol.ReqResp.Type
, ChannelTests
build-depends: base
, bytestring
, contra-tracer
Expand All @@ -89,6 +91,7 @@ test-suite test-protocols
, tasty
, tasty-quickcheck
, time
, random
Comment on lines 93 to +94
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preserve alphabetic order?

default-language: Haskell2010
ghc-options: -rtsopts
-Wall
Expand Down