Skip to content

Commit 1cbdaa2

Browse files
committed
network-mux: split mux tracer into three independent parts
The `BearerTracer` has an extra event: `EmitDeltaQ`. The tracers are now conveniently passed to `Mx.run` function from where they are available to a channel & a bearer.
1 parent e9a2fa6 commit 1cbdaa2

File tree

41 files changed

+674
-549
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+674
-549
lines changed

cardano-client/src/Cardano/Client/Subscription.hs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ data Decision =
7171
-- ^ reconnect
7272

7373
data SubscriptionTracers a = SubscriptionTracers {
74-
stMuxTracer :: Tracer IO (Mx.WithBearer (ConnectionId LocalAddress) MuxTrace),
74+
stMuxTracer :: Tracer IO (Mx.WithBearer (ConnectionId LocalAddress) Mx.Trace),
75+
stMuxChannelTracer :: Tracer IO (Mx.WithBearer (ConnectionId LocalAddress) Mx.ChannelTrace),
76+
stMuxBearerTracer :: Tracer IO (Mx.WithBearer (ConnectionId LocalAddress) Mx.BearerTrace),
7577
-- ^ low level mux-network tracer, which logs mux sdu (send and received)
7678
-- and other low level multiplexing events.
7779
stHandshakeTracer :: Tracer IO (Mx.WithBearer (ConnectionId LocalAddress)
@@ -110,6 +112,8 @@ subscribe
110112
subscribe snocket networkMagic supportedVersions
111113
SubscriptionTracers {
112114
stMuxTracer = muxTracer,
115+
stMuxChannelTracer = muxChannelTracer,
116+
stMuxBearerTracer = muxBearerTracer,
113117
stHandshakeTracer = handshakeTracer,
114118
stSubscriptionTracer = tracer
115119
}
@@ -124,7 +128,12 @@ subscribe snocket networkMagic supportedVersions
124128
NtC.connectTo
125129
snocket
126130
NetworkConnectTracers {
127-
nctMuxTracer = muxTracer,
131+
nctMuxTracers =
132+
Mx.Tracers {
133+
Mx.tracer = muxTracer,
134+
Mx.channelTracer = muxChannelTracer,
135+
Mx.bearerTracer = muxBearerTracer
136+
},
128137
nctHandshakeTracer = handshakeTracer
129138
}
130139
(versionedProtocols networkMagic supportedVersions protocols)

cardano-ping/src/Cardano/Network/Ping.hs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -681,9 +681,9 @@ pingClient stdout stderr PingOpts{..} versions peer = bracket
681681
let peerStr' = TL.pack peerStr
682682
unless pingOptsQuiet $ TL.hPutStrLn IO.stdout $ peerStr' <> " " <> (showNetworkRtt $ toSample t0_e t0_s)
683683

684-
bearer <- getBearer makeSocketBearer sduTimeout nullTracer sd Nothing
684+
bearer <- getBearer makeSocketBearer sduTimeout sd Nothing
685685

686-
!t1_s <- write bearer timeoutfn $ wrap handshakeNum InitiatorDir (handshakeReq versions pingOptsHandshakeQuery)
686+
!t1_s <- write bearer nullTracer timeoutfn $ wrap handshakeNum InitiatorDir (handshakeReq versions pingOptsHandshakeQuery)
687687
(msg, !t1_e) <- nextMsg bearer timeoutfn handshakeNum
688688
unless pingOptsQuiet $ TL.hPutStrLn IO.stdout $ peerStr' <> " " <> (showHandshakeRtt $ diffTime t1_e t1_s)
689689

@@ -713,7 +713,7 @@ pingClient stdout stderr PingOpts{..} versions peer = bracket
713713
then getTip bearer timeoutfn peerStr
714714
else keepAlive bearer timeoutfn peerStr version (tdigest []) 0
715715
-- send terminating message
716-
_ <- write bearer timeoutfn $ wrap keepaliveNum InitiatorDir (keepAliveDone version)
716+
_ <- write bearer nullTracer timeoutfn $ wrap keepaliveNum InitiatorDir (keepAliveDone version)
717717
return ()
718718
-- protocol idle timeout
719719
MT.threadDelay idleTimeout
@@ -771,7 +771,7 @@ pingClient stdout stderr PingOpts{..} versions peer = bracket
771771

772772
nextMsg :: Mx.Bearer IO -> TimeoutFn IO -> MiniProtocolNum -> IO (LBS.ByteString, Time)
773773
nextMsg bearer timeoutfn ptclNum = do
774-
(sdu, t_e) <- Network.Mux.Types.read bearer timeoutfn
774+
(sdu, t_e) <- Network.Mux.Types.read bearer nullTracer timeoutfn
775775
if Mx.mhNum (Mx.msHeader sdu) == ptclNum
776776
then return (Mx.msBlob sdu, t_e)
777777
else nextMsg bearer timeoutfn ptclNum
@@ -786,7 +786,7 @@ pingClient stdout stderr PingOpts{..} versions peer = bracket
786786
keepAlive _ _ _ _ _ cookie | cookie == pingOptsCount = return ()
787787
keepAlive bearer timeoutfn peerStr version td !cookie = do
788788
let cookie16 = fromIntegral cookie
789-
!t_s <- write bearer timeoutfn $ wrap keepaliveNum InitiatorDir (keepAliveReq version cookie16)
789+
!t_s <- write bearer nullTracer timeoutfn $ wrap keepaliveNum InitiatorDir (keepAliveReq version cookie16)
790790
(!msg, !t_e) <- nextMsg bearer timeoutfn keepaliveNum
791791
let rtt = toSample t_e t_s
792792
td' = insert rtt td
@@ -810,7 +810,7 @@ pingClient stdout stderr PingOpts{..} versions peer = bracket
810810
-> String
811811
-> IO ()
812812
getTip bearer timeoutfn peerStr = do
813-
!t_s <- write bearer timeoutfn $ wrap chainSyncNum InitiatorDir chainSyncFindIntersect
813+
!t_s <- write bearer nullTracer timeoutfn $ wrap chainSyncNum InitiatorDir chainSyncFindIntersect
814814
(!msg, !t_e) <- nextMsg bearer timeoutfn chainSyncNum
815815
case CBOR.deserialiseFromBytes chainSyncIntersectNotFoundDec msg of
816816
Left err -> throwIO (PingClientFindIntersectDeserialiseFailure err peerStr)

network-mux/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
* run, miniProtocolJob, monitor now accept MuxTracerBundle record
1010
instead of `Tracer m Trace` type.
1111
* Removed handshake trace events from mux trace.
12+
* `Trace` was split into three traces: `Trace` of mux events, `ChannelTrace`
13+
& `BearerTrace`. As a result `run` & `Bearer` API were modified.
1214

1315
### Non-breaking changes
1416
* Define msHeaderLength instead of using '8'

network-mux/bench/socket_read_write/Main.hs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ readBenchmark sndSizeV sndSize addr = do
5252
atomically $ putTMVar sndSizeV sndSize
5353
Socket.connect sd addr
5454
withReadBufferIO (\buffer -> do
55-
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd buffer
55+
bearer <- getBearer makeSocketBearer sduTimeout sd buffer
5656

57-
let chan = bearerAsChannel bearer (MiniProtocolNum 42) InitiatorDir
57+
let chan = bearerAsChannel activeTracer bearer (MiniProtocolNum 42) InitiatorDir
5858
doRead chan 0
5959
)
6060
)
@@ -79,9 +79,9 @@ readDemuxerQueueBenchmark sndSizeV sndSize addr = do
7979

8080
Socket.connect sd addr
8181
withReadBufferIO (\buffer -> do
82-
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd buffer
82+
bearer <- getBearer makeSocketBearer sduTimeout sd buffer
8383
ms42 <- mkMiniProtocolState 42
84-
withAsync (demuxer [ms42] bearer) $ \aid -> do
84+
withAsync (demuxer [ms42] activeTracer bearer) $ \aid -> do
8585
doRead 0xa5 (totalPayloadLen sndSize) (miniProtocolIngressQueue ms42)
8686
cancel aid
8787
)
@@ -111,10 +111,10 @@ readDemuxerBenchmark sndSizeV sndSize addr = do
111111

112112
Socket.connect sd addr
113113
withReadBufferIO (\buffer -> do
114-
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd buffer
114+
bearer <- getBearer makeSocketBearer sduTimeout sd buffer
115115
ms42 <- mkMiniProtocolState 42
116116
ms41 <- mkMiniProtocolState 41
117-
withAsync (demuxer [ms41, ms42] bearer) $ \aid -> do
117+
withAsync (demuxer [ms41, ms42] activeTracer bearer) $ \aid -> do
118118
withAsync (doRead 42 (totalPayloadLen sndSize) (miniProtocolIngressQueue ms42) 0) $ \aid42 -> do
119119
withAsync (doRead 41 (totalPayloadLen 10) (miniProtocolIngressQueue ms41) 0) $ \aid41 -> do
120120
_ <- waitBoth aid42 aid41
@@ -151,10 +151,10 @@ startServer :: StrictTMVar IO Int64 -> Socket -> IO ()
151151
startServer sndSizeV ad = forever $ do
152152
(sd, _) <- Socket.accept ad
153153
withReadBufferIO (\buffer -> do
154-
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd buffer
154+
bearer <- getBearer makeSocketBearer sduTimeout sd buffer
155155
sndSize <- atomically $ takeTMVar sndSizeV
156156

157-
let chan = bearerAsChannel bearer (MiniProtocolNum 42) ResponderDir
157+
let chan = bearerAsChannel activeTracer bearer (MiniProtocolNum 42) ResponderDir
158158
payload = BL.replicate sndSize 0xa5
159159
maxData = totalPayloadLen sndSize
160160
numberOfSdus = fromIntegral $ maxData `div` sndSize
@@ -167,7 +167,7 @@ startServerMany :: StrictTMVar IO Int64 -> Socket -> IO ()
167167
startServerMany sndSizeV ad = forever $ do
168168
(sd, _) <- Socket.accept ad
169169
withReadBufferIO (\buffer -> do
170-
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd buffer
170+
bearer <- getBearer makeSocketBearer sduTimeout sd buffer
171171
sndSize <- atomically $ takeTMVar sndSizeV
172172

173173
let maxData = totalPayloadLen sndSize
@@ -178,10 +178,10 @@ startServerMany sndSizeV ad = forever $ do
178178
withTimeoutSerial $ \timeoutFn -> do
179179
replicateM_ numberOfCalls $ do
180180
let sdus = replicate 10 $ wrap $ BL.replicate sndSize 0xa5
181-
void $ writeMany bearer timeoutFn sdus
181+
void $ writeMany bearer activeTracer timeoutFn sdus
182182
when (runtSdus > 0) $ do
183183
let sdus = replicate runtSdus $ wrap $ BL.replicate sndSize 0xa5
184-
void $ writeMany bearer timeoutFn sdus
184+
void $ writeMany bearer activeTracer timeoutFn sdus
185185
)
186186
where
187187
-- wrap a 'ByteString' as 'SDU'
@@ -205,7 +205,7 @@ startServerEgresss :: StrictTMVar IO Int64 -> Socket -> IO ()
205205
startServerEgresss sndSizeV ad = forever $ do
206206
(sd, _) <- Socket.accept ad
207207
withReadBufferIO (\buffer -> do
208-
bearer <-getBearer makeSocketBearer sduTimeout activeTracer sd buffer
208+
bearer <-getBearer makeSocketBearer sduTimeout sd buffer
209209
sndSize <- atomically $ takeTMVar sndSizeV
210210
eq <- atomically $ newTBQueue 100
211211
w42 <- newTVarIO BL.empty
@@ -216,7 +216,7 @@ startServerEgresss sndSizeV ad = forever $ do
216216
numberOfCalls = numberOfSdus `div` 10 :: Int
217217
runtSdus = numberOfSdus `mod` 10 :: Int
218218

219-
withAsync (muxer eq bearer) $ \aid -> do
219+
withAsync (muxer eq activeTracer bearer) $ \aid -> do
220220

221221
replicateM_ numberOfCalls $ do
222222
let payload42s = replicate 10 $ BL.replicate sndSize 42

network-mux/demo/mux-demo.hs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import Control.Concurrent (forkIO)
1717
import Control.Concurrent.STM (atomically)
1818
import Control.Exception (finally)
1919
import Control.Monad
20-
import Control.Tracer (Tracer (..), nullTracer, showTracing)
20+
import Control.Tracer (Tracer (..), showTracing)
2121

2222
import System.Environment qualified as SysEnv
2323
import System.Exit
@@ -68,8 +68,6 @@ putStrLn_ = BSC.putStrLn . BSC.pack
6868
debugTracer :: Show a => Tracer IO a
6969
debugTracer = showTracing (Tracer putStrLn_)
7070

71-
nullMuxTracerBundle :: (Applicative m) => MuxTracerBundle m
72-
nullMuxTracerBundle = MuxTracerBundle nullTracer nullTracer
7371
--
7472
-- Protocols
7573
--
@@ -103,7 +101,7 @@ server =
103101
associateWithIOManager ioManager (Left hpipe)
104102
Win32.Async.connectNamedPipe hpipe
105103
void $ forkIO $ do
106-
bearer <- getBearer Mx.makeNamedPipeBearer (-1) nullTracer hpipe Nothing
104+
bearer <- getBearer Mx.makeNamedPipeBearer (-1) hpipe Nothing
107105
serverWorker bearer
108106
`finally` closeHandle hpipe
109107
#else
@@ -115,7 +113,7 @@ server = do
115113
forever $ do
116114
(sock', _addr) <- Socket.accept sock
117115
void $ forkIO $ do
118-
bearer <- getBearer Mx.makeSocketBearer 1.0 nullTracer sock' Nothing
116+
bearer <- getBearer Mx.makeSocketBearer 1.0 sock' Nothing
119117
serverWorker bearer
120118
`finally` Socket.close sock'
121119
#endif
@@ -135,7 +133,7 @@ serverWorker bearer = do
135133
putStrLn $ "Result: " ++ show result
136134
Mx.stop mux
137135

138-
Mx.run nullMuxTracerBundle mux bearer
136+
Mx.run Mx.nullTracers mux bearer
139137
where
140138
ptcls :: [MiniProtocolInfo ResponderMode]
141139
ptcls = [ MiniProtocolInfo {
@@ -170,13 +168,13 @@ client n msg =
170168
fILE_FLAG_OVERLAPPED
171169
Nothing
172170
associateWithIOManager ioManager (Left hpipe)
173-
bearer <- getBearer Mx.makeNamedPipeBearer (-1) nullTracer hpipe Nothing
171+
bearer <- getBearer Mx.makeNamedPipeBearer (-1) hpipe Nothing
174172
clientWorker bearer n msg
175173
#else
176174
client n msg = do
177175
sock <- Socket.socket AF_UNIX Socket.Stream Socket.defaultProtocol
178176
Socket.connect sock (SockAddrUnix pipeName)
179-
bearer <- getBearer Mx.makeSocketBearer 1.0 nullTracer sock Nothing
177+
bearer <- getBearer Mx.makeSocketBearer 1.0 sock Nothing
180178
clientWorker bearer n msg
181179
#endif
182180

@@ -195,7 +193,7 @@ clientWorker bearer n msg = do
195193
putStrLn $ "Result: " ++ show result
196194
Mx.stop mux
197195

198-
Mx.run nullMuxTracerBundle mux bearer
196+
Mx.run Mx.nullTracers mux bearer
199197
where
200198
ptcls :: [MiniProtocolInfo Mx.InitiatorMode]
201199
ptcls = [ MiniProtocolInfo {

0 commit comments

Comments
 (0)