Skip to content

Split mux's Tracer type #5112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 32 commits into
base: mwojtowicz/inbound-governor-turbo
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
62e711a
Inbound governor information channel tracer
crocodile-dentist Apr 11, 2025
325792f
IG.with body changes
crocodile-dentist Mar 31, 2025
049f6f6
Information Channel changes
crocodile-dentist Apr 9, 2025
34f7830
Change IG information channel queue bound
crocodile-dentist Apr 9, 2025
2be776b
Rework IG loop
crocodile-dentist Apr 11, 2025
20464eb
IG loop mostly whitespace
crocodile-dentist Apr 11, 2025
fd644ce
unregister conn changes
crocodile-dentist Apr 13, 2025
6998c4b
Move the remnants of the deleted Event module
crocodile-dentist Mar 31, 2025
a628216
Refactor InboundGovernor interface
crocodile-dentist Mar 31, 2025
67ceee8
Refactor Server.with signature
crocodile-dentist Mar 31, 2025
5c0500a
Refactor ConnectionManager
crocodile-dentist Mar 31, 2025
b1bb232
Refactor Connection Handler
crocodile-dentist Mar 31, 2025
5305202
Mux changes
crocodile-dentist Mar 31, 2025
21b7187
Introduce MuxTracerBundle
crocodile-dentist Apr 9, 2025
3d0837f
Attach the IG tracer to Mux tracer in the connection handler
crocodile-dentist Mar 31, 2025
dbf565b
integrate mux changes into mux tests & demo
crocodile-dentist Apr 9, 2025
b33d052
bugfix prop_mux_starvation test
crocodile-dentist Apr 9, 2025
0e64a13
Refactor Server
crocodile-dentist Mar 31, 2025
94a591a
Refactor Diffusion
crocodile-dentist Mar 31, 2025
88eb3ba
Refactor tests/demos to comply with new interfaces
crocodile-dentist Mar 31, 2025
1cb5d24
multinodeExperiment minor improvements
crocodile-dentist Apr 2, 2025
646809f
Fix shrinker bug
crocodile-dentist Mar 31, 2025
27b25c4
Make testnet simulation failure traces more comprehensible
crocodile-dentist Mar 31, 2025
f913ff9
Improve Simulation testnet shrinker
crocodile-dentist Mar 31, 2025
52a07ca
Tracing UX improvement
crocodile-dentist Mar 31, 2025
504c5b9
Refactor Simulation module
crocodile-dentist Mar 31, 2025
37c3dd6
update spec
crocodile-dentist Mar 31, 2025
e58b21a
changelog update
crocodile-dentist Mar 31, 2025
b641feb
fix test failure
crocodile-dentist Apr 13, 2025
2a5ced0
maybe fix assertion failure due to races
crocodile-dentist Apr 17, 2025
e9a2fa6
network-mux: removed handshake traces
coot Apr 17, 2025
1cbdaa2
network-mux: split mux tracer into three independent parts
coot Apr 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions cardano-client/src/Cardano/Client/Subscription.hs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ data Decision =
-- ^ reconnect

data SubscriptionTracers a = SubscriptionTracers {
stMuxTracer :: Tracer IO (Mx.WithBearer (ConnectionId LocalAddress) MuxTrace),
stMuxTracer :: Tracer IO (Mx.WithBearer (ConnectionId LocalAddress) Mx.Trace),
stMuxChannelTracer :: Tracer IO (Mx.WithBearer (ConnectionId LocalAddress) Mx.ChannelTrace),
stMuxBearerTracer :: Tracer IO (Mx.WithBearer (ConnectionId LocalAddress) Mx.BearerTrace),
-- ^ low level mux-network tracer, which logs mux sdu (send and received)
-- and other low level multiplexing events.
stHandshakeTracer :: Tracer IO (Mx.WithBearer (ConnectionId LocalAddress)
Expand Down Expand Up @@ -110,6 +112,8 @@ subscribe
subscribe snocket networkMagic supportedVersions
SubscriptionTracers {
stMuxTracer = muxTracer,
stMuxChannelTracer = muxChannelTracer,
stMuxBearerTracer = muxBearerTracer,
stHandshakeTracer = handshakeTracer,
stSubscriptionTracer = tracer
}
Expand All @@ -124,7 +128,12 @@ subscribe snocket networkMagic supportedVersions
NtC.connectTo
snocket
NetworkConnectTracers {
nctMuxTracer = muxTracer,
nctMuxTracers =
Mx.Tracers {
Mx.tracer = muxTracer,
Mx.channelTracer = muxChannelTracer,
Mx.bearerTracer = muxBearerTracer
},
nctHandshakeTracer = handshakeTracer
}
(versionedProtocols networkMagic supportedVersions protocols)
Expand Down
12 changes: 6 additions & 6 deletions cardano-ping/src/Cardano/Network/Ping.hs
Original file line number Diff line number Diff line change
Expand Up @@ -681,9 +681,9 @@ pingClient stdout stderr PingOpts{..} versions peer = bracket
let peerStr' = TL.pack peerStr
unless pingOptsQuiet $ TL.hPutStrLn IO.stdout $ peerStr' <> " " <> (showNetworkRtt $ toSample t0_e t0_s)

bearer <- getBearer makeSocketBearer sduTimeout nullTracer sd Nothing
bearer <- getBearer makeSocketBearer sduTimeout sd Nothing

!t1_s <- write bearer timeoutfn $ wrap handshakeNum InitiatorDir (handshakeReq versions pingOptsHandshakeQuery)
!t1_s <- write bearer nullTracer timeoutfn $ wrap handshakeNum InitiatorDir (handshakeReq versions pingOptsHandshakeQuery)
(msg, !t1_e) <- nextMsg bearer timeoutfn handshakeNum
unless pingOptsQuiet $ TL.hPutStrLn IO.stdout $ peerStr' <> " " <> (showHandshakeRtt $ diffTime t1_e t1_s)

Expand Down Expand Up @@ -713,7 +713,7 @@ pingClient stdout stderr PingOpts{..} versions peer = bracket
then getTip bearer timeoutfn peerStr
else keepAlive bearer timeoutfn peerStr version (tdigest []) 0
-- send terminating message
_ <- write bearer timeoutfn $ wrap keepaliveNum InitiatorDir (keepAliveDone version)
_ <- write bearer nullTracer timeoutfn $ wrap keepaliveNum InitiatorDir (keepAliveDone version)
return ()
-- protocol idle timeout
MT.threadDelay idleTimeout
Expand Down Expand Up @@ -771,7 +771,7 @@ pingClient stdout stderr PingOpts{..} versions peer = bracket

nextMsg :: Mx.Bearer IO -> TimeoutFn IO -> MiniProtocolNum -> IO (LBS.ByteString, Time)
nextMsg bearer timeoutfn ptclNum = do
(sdu, t_e) <- Network.Mux.Types.read bearer timeoutfn
(sdu, t_e) <- Network.Mux.Types.read bearer nullTracer timeoutfn
if Mx.mhNum (Mx.msHeader sdu) == ptclNum
then return (Mx.msBlob sdu, t_e)
else nextMsg bearer timeoutfn ptclNum
Expand All @@ -786,7 +786,7 @@ pingClient stdout stderr PingOpts{..} versions peer = bracket
keepAlive _ _ _ _ _ cookie | cookie == pingOptsCount = return ()
keepAlive bearer timeoutfn peerStr version td !cookie = do
let cookie16 = fromIntegral cookie
!t_s <- write bearer timeoutfn $ wrap keepaliveNum InitiatorDir (keepAliveReq version cookie16)
!t_s <- write bearer nullTracer timeoutfn $ wrap keepaliveNum InitiatorDir (keepAliveReq version cookie16)
(!msg, !t_e) <- nextMsg bearer timeoutfn keepaliveNum
let rtt = toSample t_e t_s
td' = insert rtt td
Expand All @@ -810,7 +810,7 @@ pingClient stdout stderr PingOpts{..} versions peer = bracket
-> String
-> IO ()
getTip bearer timeoutfn peerStr = do
!t_s <- write bearer timeoutfn $ wrap chainSyncNum InitiatorDir chainSyncFindIntersect
!t_s <- write bearer nullTracer timeoutfn $ wrap chainSyncNum InitiatorDir chainSyncFindIntersect
(!msg, !t_e) <- nextMsg bearer timeoutfn chainSyncNum
case CBOR.deserialiseFromBytes chainSyncIntersectNotFoundDec msg of
Left err -> throwIO (PingClientFindIntersectDeserialiseFailure err peerStr)
Expand Down
24 changes: 13 additions & 11 deletions docs/network-spec/connection-manager.tex
Original file line number Diff line number Diff line change
Expand Up @@ -2100,15 +2100,18 @@ \subsubsection{\RemoteIdle}
connection is used (\warm{} or \hot{}) or not (\cold{}) by the outbound side.

\subsubsection{\RemoteWarm}
A connection enters \RemoteWarm{} state once any of the mini-protocols starts
to operate. Once all hot mini-protocols start, the state will transition to
\RemoteHot{}. Note that this is slightly different than the notion of a \warm{}
peer, for which all \established{} and \warm{} mini-protocols are active, but
\hot{} ones are idle.
A connection dwells in \RemoteWarm{} if there are strictly only any warm or established
responder protocols running. Note also that an established protocol is one that may run
in both hot and warm states, but cannot be the only type running to maintain hot state
once all proper hot protocols have terminated. In other words, the connection must be
demoted in that case.

\subsubsection{\RemoteHot}
A connection enters \RemoteHot{} transition once all hot protocols started, if
any of them terminates the connection will be put in \RemoteWarm{}.
A connection enters \RemoteHot{} state once any hot responder protocol has started.
In particular, if a hot responder is the first to start, the state cycles through \RemoteWarm{}
first. Once all hot responders terminate, the connection will be put in \RemoteWarm{} regardless
of whether there are any warm or established responders left. In the latter case, if there aren't any
other protocols running, the connection will then follow up with further demotion to \RemoteIdle{}.

\subsection{Transitions}

Expand Down Expand Up @@ -2166,11 +2169,10 @@ \subsubsection{\MuxTerminated}
termination of the connection, as it can detect this by itself.

\subsubsection{\PromotedToHotRemote}
The inbound governor detects when all \hot{} mini-protocols started. In such
The inbound governor detects when any \hot{} mini-protocols have started. In such
case a \RemoteWarm{} connection is put in \RemoteHot{} state.

\subsubsection{\DemotedToWarmRemote}
Dually to \PromotedToHotRemote{} state transition, as soon as any of the \hot{}
mini-protocols terminates, the connection will transition to \RemoteWarm{}
Dually to \PromotedToHotRemote{} state transition, as soon as all of the \hot{}
mini-protocols terminate, the connection will transition to \RemoteWarm{}
state.

5 changes: 5 additions & 0 deletions network-mux/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
* Bearer writeMany function for vector IO
* An optional read buffer for Bearer
* Polling of the egress queue
* run, miniProtocolJob, monitor now accept MuxTracerBundle record
instead of `Tracer m Trace` type.
* Removed handshake trace events from mux trace.
* `Trace` was split into three traces: `Trace` of mux events, `ChannelTrace`
& `BearerTrace`. As a result `run` & `Bearer` API were modified.

### Non-breaking changes
* Define msHeaderLength instead of using '8'
Expand Down
26 changes: 13 additions & 13 deletions network-mux/bench/socket_read_write/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ readBenchmark sndSizeV sndSize addr = do
atomically $ putTMVar sndSizeV sndSize
Socket.connect sd addr
withReadBufferIO (\buffer -> do
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd buffer
bearer <- getBearer makeSocketBearer sduTimeout sd buffer

let chan = bearerAsChannel bearer (MiniProtocolNum 42) InitiatorDir
let chan = bearerAsChannel activeTracer bearer (MiniProtocolNum 42) InitiatorDir
doRead chan 0
)
)
Expand All @@ -79,9 +79,9 @@ readDemuxerQueueBenchmark sndSizeV sndSize addr = do

Socket.connect sd addr
withReadBufferIO (\buffer -> do
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd buffer
bearer <- getBearer makeSocketBearer sduTimeout sd buffer
ms42 <- mkMiniProtocolState 42
withAsync (demuxer [ms42] bearer) $ \aid -> do
withAsync (demuxer [ms42] activeTracer bearer) $ \aid -> do
doRead 0xa5 (totalPayloadLen sndSize) (miniProtocolIngressQueue ms42)
cancel aid
)
Expand Down Expand Up @@ -111,10 +111,10 @@ readDemuxerBenchmark sndSizeV sndSize addr = do

Socket.connect sd addr
withReadBufferIO (\buffer -> do
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd buffer
bearer <- getBearer makeSocketBearer sduTimeout sd buffer
ms42 <- mkMiniProtocolState 42
ms41 <- mkMiniProtocolState 41
withAsync (demuxer [ms41, ms42] bearer) $ \aid -> do
withAsync (demuxer [ms41, ms42] activeTracer bearer) $ \aid -> do
withAsync (doRead 42 (totalPayloadLen sndSize) (miniProtocolIngressQueue ms42) 0) $ \aid42 -> do
withAsync (doRead 41 (totalPayloadLen 10) (miniProtocolIngressQueue ms41) 0) $ \aid41 -> do
_ <- waitBoth aid42 aid41
Expand Down Expand Up @@ -151,10 +151,10 @@ startServer :: StrictTMVar IO Int64 -> Socket -> IO ()
startServer sndSizeV ad = forever $ do
(sd, _) <- Socket.accept ad
withReadBufferIO (\buffer -> do
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd buffer
bearer <- getBearer makeSocketBearer sduTimeout sd buffer
sndSize <- atomically $ takeTMVar sndSizeV

let chan = bearerAsChannel bearer (MiniProtocolNum 42) ResponderDir
let chan = bearerAsChannel activeTracer bearer (MiniProtocolNum 42) ResponderDir
payload = BL.replicate sndSize 0xa5
maxData = totalPayloadLen sndSize
numberOfSdus = fromIntegral $ maxData `div` sndSize
Expand All @@ -167,7 +167,7 @@ startServerMany :: StrictTMVar IO Int64 -> Socket -> IO ()
startServerMany sndSizeV ad = forever $ do
(sd, _) <- Socket.accept ad
withReadBufferIO (\buffer -> do
bearer <- getBearer makeSocketBearer sduTimeout activeTracer sd buffer
bearer <- getBearer makeSocketBearer sduTimeout sd buffer
sndSize <- atomically $ takeTMVar sndSizeV

let maxData = totalPayloadLen sndSize
Expand All @@ -178,10 +178,10 @@ startServerMany sndSizeV ad = forever $ do
withTimeoutSerial $ \timeoutFn -> do
replicateM_ numberOfCalls $ do
let sdus = replicate 10 $ wrap $ BL.replicate sndSize 0xa5
void $ writeMany bearer timeoutFn sdus
void $ writeMany bearer activeTracer timeoutFn sdus
when (runtSdus > 0) $ do
let sdus = replicate runtSdus $ wrap $ BL.replicate sndSize 0xa5
void $ writeMany bearer timeoutFn sdus
void $ writeMany bearer activeTracer timeoutFn sdus
)
where
-- wrap a 'ByteString' as 'SDU'
Expand All @@ -205,7 +205,7 @@ startServerEgresss :: StrictTMVar IO Int64 -> Socket -> IO ()
startServerEgresss sndSizeV ad = forever $ do
(sd, _) <- Socket.accept ad
withReadBufferIO (\buffer -> do
bearer <-getBearer makeSocketBearer sduTimeout activeTracer sd buffer
bearer <-getBearer makeSocketBearer sduTimeout sd buffer
sndSize <- atomically $ takeTMVar sndSizeV
eq <- atomically $ newTBQueue 100
w42 <- newTVarIO BL.empty
Expand All @@ -216,7 +216,7 @@ startServerEgresss sndSizeV ad = forever $ do
numberOfCalls = numberOfSdus `div` 10 :: Int
runtSdus = numberOfSdus `mod` 10 :: Int

withAsync (muxer eq bearer) $ \aid -> do
withAsync (muxer eq activeTracer bearer) $ \aid -> do

replicateM_ numberOfCalls $ do
let payload42s = replicate 10 $ BL.replicate sndSize 42
Expand Down
14 changes: 7 additions & 7 deletions network-mux/demo/mux-demo.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Exception (finally)
import Control.Monad
import Control.Tracer (Tracer (..), nullTracer, showTracing)
import Control.Tracer (Tracer (..), showTracing)

import System.Environment qualified as SysEnv
import System.Exit
Expand Down Expand Up @@ -101,7 +101,7 @@ server =
associateWithIOManager ioManager (Left hpipe)
Win32.Async.connectNamedPipe hpipe
void $ forkIO $ do
bearer <- getBearer Mx.makeNamedPipeBearer (-1) nullTracer hpipe Nothing
bearer <- getBearer Mx.makeNamedPipeBearer (-1) hpipe Nothing
serverWorker bearer
`finally` closeHandle hpipe
#else
Expand All @@ -113,7 +113,7 @@ server = do
forever $ do
(sock', _addr) <- Socket.accept sock
void $ forkIO $ do
bearer <- getBearer Mx.makeSocketBearer 1.0 nullTracer sock' Nothing
bearer <- getBearer Mx.makeSocketBearer 1.0 sock' Nothing
serverWorker bearer
`finally` Socket.close sock'
#endif
Expand All @@ -133,7 +133,7 @@ serverWorker bearer = do
putStrLn $ "Result: " ++ show result
Mx.stop mux

Mx.run nullTracer mux bearer
Mx.run Mx.nullTracers mux bearer
where
ptcls :: [MiniProtocolInfo ResponderMode]
ptcls = [ MiniProtocolInfo {
Expand Down Expand Up @@ -168,13 +168,13 @@ client n msg =
fILE_FLAG_OVERLAPPED
Nothing
associateWithIOManager ioManager (Left hpipe)
bearer <- getBearer Mx.makeNamedPipeBearer (-1) nullTracer hpipe Nothing
bearer <- getBearer Mx.makeNamedPipeBearer (-1) hpipe Nothing
clientWorker bearer n msg
#else
client n msg = do
sock <- Socket.socket AF_UNIX Socket.Stream Socket.defaultProtocol
Socket.connect sock (SockAddrUnix pipeName)
bearer <- getBearer Mx.makeSocketBearer 1.0 nullTracer sock Nothing
bearer <- getBearer Mx.makeSocketBearer 1.0 sock Nothing
clientWorker bearer n msg
#endif

Expand All @@ -193,7 +193,7 @@ clientWorker bearer n msg = do
putStrLn $ "Result: " ++ show result
Mx.stop mux

Mx.run nullTracer mux bearer
Mx.run Mx.nullTracers mux bearer
where
ptcls :: [MiniProtocolInfo Mx.InitiatorMode]
ptcls = [ MiniProtocolInfo {
Expand Down
Loading
Loading