Skip to content

Commit 6bba9f0

Browse files
committed
Implement an optional read buffer for mux bearers
This implements an optional read buffer for the Socket bearer.
1 parent ab0710b commit 6bba9f0

File tree

25 files changed

+298
-178
lines changed

25 files changed

+298
-178
lines changed

cardano-ping/src/Cardano/Network/Ping.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -674,7 +674,7 @@ pingClient stdout stderr PingOpts{..} versions peer = bracket
674674
let peerStr' = TL.pack peerStr
675675
unless pingOptsQuiet $ TL.hPutStrLn IO.stdout $ peerStr' <> " " <> (showNetworkRtt $ toSample t0_e t0_s)
676676

677-
bearer <- getBearer makeSocketBearer sduTimeout nullTracer sd
677+
bearer <- getBearer makeSocketBearer sduTimeout nullTracer sd Nothing
678678

679679
!t1_s <- write bearer timeoutfn $ wrap handshakeNum InitiatorDir (handshakeReq versions pingOptsHandshakeQuery)
680680
(msg, !t1_e) <- nextMsg bearer timeoutfn handshakeNum

network-mux/bench/socket_read_write/Main.hs

Lines changed: 81 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,12 @@ readBenchmark sndSizeV sndSize addr = do
4848
(\sd -> do
4949
atomically $ putTMVar sndSizeV sndSize
5050
Socket.connect sd addr
51-
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd
51+
withReadBufferIO (\buffer -> do
52+
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd buffer
5253

53-
let chan = bearerAsChannel bearer (MiniProtocolNum 42) InitiatorDir
54-
doRead (totalPayloadLen sndSize) chan 0
54+
let chan = bearerAsChannel bearer (MiniProtocolNum 42) InitiatorDir
55+
doRead (totalPayloadLen sndSize) chan 0
56+
)
5557
)
5658
where
5759
doRead :: Int64 -> ByteChannel IO -> Int64 -> IO ()
@@ -72,15 +74,17 @@ readDemuxerBenchmark sndSizeV sndSize addr = do
7274
atomically $ putTMVar sndSizeV sndSize
7375

7476
Socket.connect sd addr
75-
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd
76-
ms42 <- mkMiniProtocolState 42
77-
ms41 <- mkMiniProtocolState 41
78-
withAsync (demuxer [ms41, ms42] bearer) $ \aid -> do
79-
withAsync (doRead 42 (totalPayloadLen sndSize) (miniProtocolIngressQueue ms42) 0) $ \aid42 -> do
80-
withAsync (doRead 41 (totalPayloadLen 10) (miniProtocolIngressQueue ms41) 0) $ \aid41 -> do
81-
_ <- waitBoth aid42 aid41
82-
cancel aid
83-
return ()
77+
withReadBufferIO (\buffer -> do
78+
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd buffer
79+
ms42 <- mkMiniProtocolState 42
80+
ms41 <- mkMiniProtocolState 41
81+
withAsync (demuxer [ms41, ms42] bearer) $ \aid -> do
82+
withAsync (doRead 42 (totalPayloadLen sndSize) (miniProtocolIngressQueue ms42) 0) $ \aid42 -> do
83+
withAsync (doRead 41 (totalPayloadLen 10) (miniProtocolIngressQueue ms41) 0) $ \aid41 -> do
84+
_ <- waitBoth aid42 aid41
85+
cancel aid
86+
return ()
87+
)
8488
)
8589
where
8690

@@ -111,37 +115,39 @@ readDemuxerBenchmark sndSizeV sndSize addr = do
111115
startServer :: StrictTMVar IO Int64 -> Socket -> IO ()
112116
startServer sndSizeV ad = forever $ do
113117
(sd, _) <- Socket.accept ad
114-
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd
115-
sndSize <- atomically $ takeTMVar sndSizeV
116-
117-
let chan = bearerAsChannel bearer (MiniProtocolNum 42) ResponderDir
118-
payload = BL.replicate sndSize 0xa5
119-
maxData = totalPayloadLen sndSize
120-
numberOfSdus = fromIntegral $ maxData `div` sndSize
121-
replicateM_ numberOfSdus $ do
122-
send chan payload
123-
118+
withReadBufferIO (\buffer -> do
119+
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd buffer
120+
sndSize <- atomically $ takeTMVar sndSizeV
121+
122+
let chan = bearerAsChannel bearer (MiniProtocolNum 42) ResponderDir
123+
payload = BL.replicate sndSize 0xa5
124+
maxData = totalPayloadLen sndSize
125+
numberOfSdus = fromIntegral $ maxData `div` sndSize
126+
replicateM_ numberOfSdus $ do
127+
send chan payload
128+
)
124129
-- | Like startServer but it uses the `writeMany` function
125130
-- for vector IO.
126131
startServerMany :: StrictTMVar IO Int64 -> Socket -> IO ()
127132
startServerMany sndSizeV ad = forever $ do
128133
(sd, _) <- Socket.accept ad
129-
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd
130-
sndSize <- atomically $ takeTMVar sndSizeV
131-
132-
let maxData = totalPayloadLen sndSize
133-
numberOfSdus = fromIntegral $ maxData `div` sndSize
134-
numberOfCalls = numberOfSdus `div` 10
135-
runtSdus = numberOfSdus `mod` 10
136-
137-
withTimeoutSerial $ \timeoutFn -> do
138-
replicateM_ numberOfCalls $ do
139-
let sdus = replicate 10 $ wrap $ BL.replicate sndSize 0xa5
140-
void $ writeMany bearer timeoutFn sdus
141-
when (runtSdus > 0) $ do
142-
let sdus = replicate runtSdus $ wrap $ BL.replicate sndSize 0xa5
143-
void $ writeMany bearer timeoutFn sdus
144-
134+
withReadBufferIO (\buffer -> do
135+
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd buffer
136+
sndSize <- atomically $ takeTMVar sndSizeV
137+
138+
let maxData = totalPayloadLen sndSize
139+
numberOfSdus = fromIntegral $ maxData `div` sndSize
140+
numberOfCalls = numberOfSdus `div` 10
141+
runtSdus = numberOfSdus `mod` 10
142+
143+
withTimeoutSerial $ \timeoutFn -> do
144+
replicateM_ numberOfCalls $ do
145+
let sdus = replicate 10 $ wrap $ BL.replicate sndSize 0xa5
146+
void $ writeMany bearer timeoutFn sdus
147+
when (runtSdus > 0) $ do
148+
let sdus = replicate runtSdus $ wrap $ BL.replicate sndSize 0xa5
149+
void $ writeMany bearer timeoutFn sdus
150+
)
145151
where
146152
-- wrap a 'ByteString' as 'SDU'
147153
wrap :: BL.ByteString -> SDU
@@ -163,41 +169,43 @@ startServerMany sndSizeV ad = forever $ do
163169
startServerEgresss :: StrictTMVar IO Int64 -> Socket -> IO ()
164170
startServerEgresss sndSizeV ad = forever $ do
165171
(sd, _) <- Socket.accept ad
166-
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd
167-
sndSize <- atomically $ takeTMVar sndSizeV
168-
eq <- atomically $ newTBQueue 100
169-
w42 <- newTVarIO BL.empty
170-
w41 <- newTVarIO BL.empty
171-
172-
let maxData = totalPayloadLen sndSize
173-
numberOfSdus = fromIntegral $ maxData `div` sndSize
174-
numberOfCalls = numberOfSdus `div` 10 :: Int
175-
runtSdus = numberOfSdus `mod` 10 :: Int
176-
177-
withAsync (muxer eq bearer) $ \aid -> do
178-
179-
replicateM_ numberOfCalls $ do
180-
let payload42s = replicate 10 $ BL.replicate sndSize 42
181-
let payload41s = replicate 10 $ BL.replicate 10 41
182-
mapM_ (sendToMux w42 eq (MiniProtocolNum 42) ResponderDir) payload42s
183-
mapM_ (sendToMux w41 eq (MiniProtocolNum 41) ResponderDir) payload41s
184-
when (runtSdus > 0) $ do
185-
let payload42s = replicate runtSdus $ BL.replicate sndSize 42
186-
let payload41s = replicate runtSdus $ BL.replicate 10 41
187-
mapM_ (sendToMux w42 eq (MiniProtocolNum 42) ResponderDir) payload42s
188-
mapM_ (sendToMux w41 eq (MiniProtocolNum 41) ResponderDir) payload41s
189-
190-
-- Wait for the egress queue to empty
191-
atomically $ do
192-
r42 <- readTVar w42
193-
r41 <- readTVar w42
194-
unless (BL.null r42 || BL.null r41) retry
195-
196-
-- when the client is done they will close the socket
197-
-- and we will read zero bytes.
198-
_ <- Socket.recv sd 128
199-
200-
cancel aid
172+
withReadBufferIO (\buffer -> do
173+
bearer <-getBearer makeSocketBearer sduTimeout activeTracer sd buffer
174+
sndSize <- atomically $ takeTMVar sndSizeV
175+
eq <- atomically $ newTBQueue 100
176+
w42 <- newTVarIO BL.empty
177+
w41 <- newTVarIO BL.empty
178+
179+
let maxData = totalPayloadLen sndSize
180+
numberOfSdus = fromIntegral $ maxData `div` sndSize
181+
numberOfCalls = numberOfSdus `div` 10 :: Int
182+
runtSdus = numberOfSdus `mod` 10 :: Int
183+
184+
withAsync (muxer eq bearer) $ \aid -> do
185+
186+
replicateM_ numberOfCalls $ do
187+
let payload42s = replicate 10 $ BL.replicate sndSize 42
188+
let payload41s = replicate 10 $ BL.replicate 10 41
189+
mapM_ (sendToMux w42 eq (MiniProtocolNum 42) ResponderDir) payload42s
190+
mapM_ (sendToMux w41 eq (MiniProtocolNum 41) ResponderDir) payload41s
191+
when (runtSdus > 0) $ do
192+
let payload42s = replicate runtSdus $ BL.replicate sndSize 42
193+
let payload41s = replicate runtSdus $ BL.replicate 10 41
194+
mapM_ (sendToMux w42 eq (MiniProtocolNum 42) ResponderDir) payload42s
195+
mapM_ (sendToMux w41 eq (MiniProtocolNum 41) ResponderDir) payload41s
196+
197+
-- Wait for the egress queue to empty
198+
atomically $ do
199+
r42 <- readTVar w42
200+
r41 <- readTVar w42
201+
unless (BL.null r42 || BL.null r41) retry
202+
203+
-- when the client is done they will close the socket
204+
-- and we will read zero bytes.
205+
_ <- Socket.recv sd 128
206+
207+
cancel aid
208+
)
201209
where
202210
sendToMux :: StrictTVar IO BL.ByteString -> EgressQueue IO -> MiniProtocolNum -> MiniProtocolDir
203211
-> BL.ByteString -> IO ()

network-mux/demo/mux-demo.hs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ server =
101101
associateWithIOManager ioManager (Left hpipe)
102102
Win32.Async.connectNamedPipe hpipe
103103
void $ forkIO $ do
104-
bearer <- getBearer Mx.makeNamedPipeBearer (-1) nullTracer hpipe
104+
bearer <- getBearer Mx.makeNamedPipeBearer (-1) nullTracer hpipe Nothing
105105
serverWorker bearer
106106
`finally` closeHandle hpipe
107107
#else
@@ -113,7 +113,7 @@ server = do
113113
forever $ do
114114
(sock', _addr) <- Socket.accept sock
115115
void $ forkIO $ do
116-
bearer <- getBearer Mx.makeSocketBearer 1.0 nullTracer sock'
116+
bearer <- getBearer Mx.makeSocketBearer 1.0 nullTracer sock' Nothing
117117
serverWorker bearer
118118
`finally` Socket.close sock'
119119
#endif
@@ -167,13 +167,13 @@ client n msg =
167167
fILE_FLAG_OVERLAPPED
168168
Nothing
169169
associateWithIOManager ioManager (Left hpipe)
170-
bearer <- getBearer Mx.makeNamedPipeBearer (-1) nullTracer hpipe
170+
bearer <- getBearer Mx.makeNamedPipeBearer (-1) nullTracer hpipe Nothing
171171
clientWorker bearer n msg
172172
#else
173173
client n msg = do
174174
sock <- Socket.socket AF_UNIX Socket.Stream Socket.defaultProtocol
175175
Socket.connect sock (SockAddrUnix pipeName)
176-
bearer <- getBearer Mx.makeSocketBearer 1.0 nullTracer sock
176+
bearer <- getBearer Mx.makeSocketBearer 1.0 nullTracer sock Nothing
177177
clientWorker bearer n msg
178178
#endif
179179

network-mux/src/Network/Mux/Bearer.hs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ module Network.Mux.Bearer
1515
#if defined(mingw32_HOST_OS)
1616
, makeNamedPipeBearer
1717
#endif
18+
, withReadBufferIO
1819
) where
1920

2021
import Control.Monad.Class.MonadSTM
@@ -28,6 +29,7 @@ import Network.Socket (getSocketOption, SocketOption (..), Socket)
2829
#if defined(mingw32_HOST_OS)
2930
import System.Win32 (HANDLE)
3031
#endif
32+
import Foreign.Marshal.Alloc
3133

3234
import Network.Mux.Bearer.Pipe
3335
import Network.Mux.Bearer.Queues
@@ -47,26 +49,34 @@ newtype MakeBearer m fd = MakeBearer {
4749
-- tracer
4850
-> fd
4951
-- file descriptor
52+
-> Maybe (ReadBuffer m)
53+
-- Optional Readbuffer
5054
-> m (Bearer m)
5155
}
5256

53-
5457
pureBearer :: Applicative m
55-
=> (DiffTime -> Tracer m Trace -> fd -> Bearer m)
56-
-> DiffTime -> Tracer m Trace -> fd -> m (Bearer m)
57-
pureBearer f = \sduTimeout tr fd -> pure (f sduTimeout tr fd)
58+
=> (DiffTime -> Tracer m Trace -> fd -> Maybe (ReadBuffer m) -> Bearer m)
59+
-> DiffTime -> Tracer m Trace -> fd -> Maybe (ReadBuffer m) -> m (Bearer m)
60+
pureBearer f = \sduTimeout rb tr fd -> pure (f sduTimeout rb tr fd)
61+
5862

5963
makeSocketBearer :: MakeBearer IO Socket
60-
makeSocketBearer = MakeBearer $ (\sduTimeout tr fd -> do
61-
readBuffer <- newTVarIO BL.empty
64+
makeSocketBearer = MakeBearer $ (\sduTimeout tr fd rb -> do
6265
batch <- getSocketOption fd SendBuffer
63-
return $ socketAsBearer size batch readBuffer bufSize sduTimeout tr fd)
66+
return $ socketAsBearer size batch rb sduTimeout tr fd)
6467
where
6568
size = SDUSize 12_288
66-
bufSize = 16_384
69+
70+
withReadBufferIO :: (Maybe (ReadBuffer IO) -> IO b)
71+
-> IO b
72+
withReadBufferIO f = allocaBytesAligned size 8 $ \ptr -> do
73+
v <- atomically $ newTVar BL.empty
74+
f $ Just $ ReadBuffer v ptr size
75+
where
76+
size = 131_072
6777

6878
makePipeChannelBearer :: MakeBearer IO PipeChannel
69-
makePipeChannelBearer = MakeBearer $ pureBearer (\_ -> pipeAsBearer size)
79+
makePipeChannelBearer = MakeBearer $ pureBearer (\_ tr fd _ -> pipeAsBearer size tr fd)
7080
where
7181
size = SDUSize 32_768
7282

@@ -75,13 +85,13 @@ makeQueueChannelBearer :: ( MonadSTM m
7585
, MonadThrow m
7686
)
7787
=> MakeBearer m (QueueChannel m)
78-
makeQueueChannelBearer = MakeBearer $ pureBearer (\_ -> queueChannelAsBearer size)
88+
makeQueueChannelBearer = MakeBearer $ pureBearer (\_ tr q _-> queueChannelAsBearer size tr q)
7989
where
8090
size = SDUSize 1_280
8191

8292
#if defined(mingw32_HOST_OS)
8393
makeNamedPipeBearer :: MakeBearer IO HANDLE
84-
makeNamedPipeBearer = MakeBearer $ pureBearer (\_ -> namedPipeAsBearer size)
94+
MakeBearer $ pureBearer (\_ _ -> namedPipeAsBearer size)
8595
where
8696
size = SDUSize 24_576
8797
#endif

0 commit comments

Comments
 (0)