Skip to content

Commit 6a1bdd9

Browse files
committed
WIP: bind responder threads to the lower cores
Bind responder threads to the lower cores, reserving the higest two cores for other tasks (block adoption, draining mempool etc.). TODO: undefineds in Ouroboros.Network.Socket TODO: provide better mux API
1 parent 7a33b77 commit 6a1bdd9

File tree

12 files changed

+106
-49
lines changed

12 files changed

+106
-49
lines changed

network-mux/demo/mux-demo.hs

+2-2
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ serverWorker bearer = do
133133
putStrLn $ "Result: " ++ show result
134134
Mx.stop mux
135135

136-
Mx.run nullTracer mux bearer
136+
Mx.run nullTracer 1 mux bearer
137137
where
138138
ptcls :: [MiniProtocolInfo ResponderMode]
139139
ptcls = [ MiniProtocolInfo {
@@ -192,7 +192,7 @@ clientWorker bearer n msg = do
192192
putStrLn $ "Result: " ++ show result
193193
Mx.stop mux
194194

195-
Mx.run nullTracer mux bearer
195+
Mx.run nullTracer 0 mux bearer
196196
where
197197
ptcls :: [MiniProtocolInfo Mx.InitiatorMode]
198198
ptcls = [ MiniProtocolInfo {

network-mux/src/Control/Concurrent/JobPool.hs

+44-9
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ module Control.Concurrent.JobPool
1212
, Job (..)
1313
, withJobPool
1414
, forkJob
15+
, forkJobOn
1516
, readSize
1617
, readGroupSize
1718
, waitForJob
@@ -29,6 +30,9 @@ import Control.Monad.Class.MonadAsync
2930
import Control.Monad.Class.MonadFork (MonadThread (..))
3031
import Control.Monad.Class.MonadThrow
3132

33+
import Control.Concurrent (getNumCapabilities)
34+
import System.IO.Unsafe (unsafePerformIO)
35+
3236
-- | JobPool allows to submit asynchronous jobs, wait for their completion or
3337
-- cancel. Jobs are grouped, each group can be cancelled separately.
3438
--
@@ -69,16 +73,18 @@ withJobPool =
6973
jobs <- readTVarIO jobsVar
7074
mapM_ uninterruptibleCancel jobs
7175

72-
forkJob :: forall group m a.
73-
( MonadAsync m, MonadMask m
74-
, Ord group
75-
)
76-
=> JobPool group m a
77-
-> Job group m a
78-
-> m ()
79-
forkJob JobPool{jobsVar, completionQueue} (Job action handler group label) =
76+
77+
forkJob' :: forall group m a.
78+
( MonadAsync m, MonadMask m
79+
, Ord group
80+
)
81+
=> (m () -> m (Async m ()))
82+
-> JobPool group m a
83+
-> Job group m a
84+
-> m ()
85+
forkJob' doFork JobPool{jobsVar, completionQueue} (Job action handler group label) =
8086
mask $ \restore -> do
81-
jobAsync <- async $ do
87+
jobAsync <- doFork $ do
8288
tid <- myThreadId
8389
io tid restore
8490
`onException`
@@ -104,6 +110,35 @@ forkJob JobPool{jobsVar, completionQueue} (Job action handler group label) =
104110
restore action
105111
atomically $ writeTQueue completionQueue res
106112

113+
114+
115+
forkJob :: forall group m a.
116+
( MonadAsync m, MonadMask m
117+
, Ord group
118+
)
119+
=> JobPool group m a
120+
-> Job group m a
121+
-> m ()
122+
forkJob = forkJob' async
123+
124+
125+
forkJobOn :: forall group m a.
126+
( MonadAsync m, MonadMask m
127+
, Ord group
128+
)
129+
=> Int
130+
-> JobPool group m a
131+
-> Job group m a
132+
-> m ()
133+
forkJobOn c = forkJob' (asyncOn limitCapability)
134+
where
135+
limitCapability :: Int
136+
limitCapability =
137+
-- TODO: add `getNumCapabilities` to `MonadFork`
138+
let sysCap = unsafePerformIO getNumCapabilities in
139+
c `mod` (max 1 $ sysCap - 2)
140+
141+
107142
readSize :: MonadSTM m => JobPool group m a -> STM m Int
108143
readSize JobPool{jobsVar} = Map.size <$> readTVar jobsVar
109144

network-mux/src/Network/Mux.hs

+8-5
Original file line numberDiff line numberDiff line change
@@ -211,10 +211,11 @@ run :: forall m mode.
211211
, MonadMask m
212212
)
213213
=> Tracer m Trace
214+
-> Int
214215
-> Mux mode m
215216
-> Bearer m
216217
-> m ()
217-
run tracer Mux {muxMiniProtocols, muxControlCmdQueue, muxStatus} bearer@Bearer {name} = do
218+
run tracer peerHash Mux {muxMiniProtocols, muxControlCmdQueue, muxStatus} bearer@Bearer{name} = do
218219
egressQueue <- atomically $ newTBQueue 100
219220

220221
-- label shared variables
@@ -231,7 +232,8 @@ run tracer Mux {muxMiniProtocols, muxControlCmdQueue, muxStatus} bearer@Bearer {
231232
-- Wait for someone to shut us down by calling muxStop or an error.
232233
-- Outstanding jobs are shut down Upon completion of withJobPool.
233234
withTimeoutSerial $ \timeout ->
234-
monitor tracer
235+
monitor peerHash
236+
tracer
235237
timeout
236238
jobpool
237239
egressQueue
@@ -375,14 +377,15 @@ monitor :: forall mode m.
375377
, Alternative (STM m)
376378
, MonadThrow (STM m)
377379
)
378-
=> Tracer m Trace
380+
=> Int
381+
-> Tracer m Trace
379382
-> TimeoutFn m
380383
-> JobPool.JobPool Group m JobResult
381384
-> EgressQueue m
382385
-> StrictTQueue m (ControlCmd mode m)
383386
-> StrictTVar m Status
384387
-> m ()
385-
monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
388+
monitor peerHash tracer timeout jobpool egressQueue cmdQueue muxStatus =
386389
go (MonitorCtx Map.empty)
387390
where
388391
go :: MonitorCtx m mode -> m ()
@@ -451,7 +454,7 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
451454
ptclAction) -> do
452455
traceWith tracer (TraceStartEagerly miniProtocolNum
453456
(protocolDirEnum miniProtocolDir))
454-
JobPool.forkJob jobpool $
457+
JobPool.forkJobOn peerHash jobpool $
455458
miniProtocolJob
456459
tracer
457460
egressQueue

network-mux/test/Test/Mux.hs

+22-22
Original file line numberDiff line numberDiff line change
@@ -358,8 +358,8 @@ prop_mux_snd_recv (DummyRun messages) = ioProperty $ do
358358

359359
serverMux <- Mx.new [serverApp]
360360

361-
withAsync (Mx.run clientTracer clientMux clientBearer) $ \clientAsync ->
362-
withAsync (Mx.run serverTracer serverMux serverBearer) $ \serverAsync -> do
361+
withAsync (Mx.run clientTracer 0 clientMux clientBearer) $ \clientAsync ->
362+
withAsync (Mx.run serverTracer 0 serverMux serverBearer) $ \serverAsync -> do
363363

364364
r <- step clientMux clientApp serverMux serverApp messages
365365
Mx.stop serverMux
@@ -434,10 +434,10 @@ prop_mux_snd_recv_bi (DummyRun messages) = ioProperty $ do
434434

435435

436436
clientMux <- Mx.new clientApps
437-
clientAsync <- async $ Mx.run clientTracer clientMux clientBearer
437+
clientAsync <- async $ Mx.run clientTracer 0 clientMux clientBearer
438438

439439
serverMux <- Mx.new serverApps
440-
serverAsync <- async $ Mx.run serverTracer serverMux serverBearer
440+
serverAsync <- async $ Mx.run serverTracer 1 serverMux serverBearer
441441

442442
r <- step clientMux clientApps serverMux serverApps messages
443443
Mx.stop clientMux
@@ -541,7 +541,7 @@ prop_mux_snd_recv_compat messages = ioProperty $ do
541541
)
542542

543543
-- Wait for the first MuxApplication to finish, then stop the mux.
544-
withAsync (Mx.run clientTracer clientMux clientBearer) $ \aid -> do
544+
withAsync (Mx.run clientTracer 0 clientMux clientBearer) $ \aid -> do
545545
_ <- atomically res
546546
Mx.stop clientMux
547547
wait aid
@@ -559,7 +559,7 @@ prop_mux_snd_recv_compat messages = ioProperty $ do
559559
)
560560

561561
-- Wait for the first MuxApplication to finish, then stop the mux.
562-
withAsync (Mx.run serverTracer serverMux serverBearer) $ \aid -> do
562+
withAsync (Mx.run serverTracer 1 serverMux serverBearer) $ \aid -> do
563563
_ <- atomically res
564564
Mx.stop serverMux
565565
wait aid
@@ -719,7 +719,7 @@ runMuxApplication initApps initBearer respApps respBearer = do
719719
respMux <- Mx.new $ map (\(pn,_) ->
720720
MiniProtocolInfo (Mx.MiniProtocolNum pn) Mx.ResponderDirectionOnly defaultMiniProtocolLimits)
721721
respApps'
722-
respAsync <- async $ Mx.run serverTracer respMux respBearer
722+
respAsync <- async $ Mx.run serverTracer 1 respMux respBearer
723723
getRespRes <- sequence [ Mx.runMiniProtocol
724724
respMux
725725
(Mx.MiniProtocolNum pn)
@@ -732,7 +732,7 @@ runMuxApplication initApps initBearer respApps respBearer = do
732732
initMux <- Mx.new $ map (\(pn,_) ->
733733
MiniProtocolInfo (Mx.MiniProtocolNum pn) Mx.InitiatorDirectionOnly defaultMiniProtocolLimits)
734734
initApps'
735-
initAsync <- async $ Mx.run clientTracer initMux initBearer
735+
initAsync <- async $ Mx.run clientTracer 0 initMux initBearer
736736
getInitRes <- sequence [ Mx.runMiniProtocol
737737
initMux
738738
(Mx.MiniProtocolNum pn)
@@ -952,17 +952,17 @@ prop_mux_starvation (Uneven response0 response1) =
952952
}
953953

954954
serverMux <- Mx.new [serverApp2, serverApp3]
955-
serverMux_aid <- async $ Mx.run serverTracer serverMux serverBearer
955+
serverMux_aid <- async $ Mx.run serverTracer 0 serverMux serverBearer
956956
serverRes2 <- Mx.runMiniProtocol serverMux (miniProtocolNum serverApp2) (miniProtocolDir serverApp2)
957957
Mx.StartOnDemand server_short
958958
serverRes3 <- Mx.runMiniProtocol serverMux (miniProtocolNum serverApp3) (miniProtocolDir serverApp3)
959959
Mx.StartOnDemand server_long
960960

961961
clientMux <- Mx.new [clientApp2, clientApp3]
962-
clientMux_aid <- async $ Mx.run (clientTracer <> headerTracer) clientMux clientBearer
963-
clientRes2 <- Mx.runMiniProtocol clientMux (Mx.miniProtocolNum clientApp2) (Mx.miniProtocolDir clientApp2)
962+
clientMux_aid <- async $ Mx.run (clientTracer <> headerTracer) 1 clientMux clientBearer
963+
clientRes2 <- Mx.runMiniProtocol clientMux (miniProtocolNum clientApp2) (miniProtocolDir clientApp2)
964964
Mx.StartEagerly client_short
965-
clientRes3 <- Mx.runMiniProtocol clientMux (Mx.miniProtocolNum clientApp3) (Mx.miniProtocolDir clientApp3)
965+
clientRes3 <- Mx.runMiniProtocol clientMux (miniProtocolNum clientApp3) (miniProtocolDir clientApp3)
966966
Mx.StartEagerly client_long
967967

968968

@@ -1157,7 +1157,7 @@ prop_demux_sdu a = do
11571157
serverRes <- Mx.runMiniProtocol serverMux (Mx.miniProtocolNum serverApp) (Mx.miniProtocolDir serverApp)
11581158
Mx.StartEagerly server_mp
11591159

1160-
said <- async $ Mx.run serverTracer serverMux serverBearer
1160+
said <- async $ Mx.run serverTracer 1 serverMux serverBearer
11611161
return (server_r, said, serverRes, serverMux)
11621162

11631163
-- Server that expects to receive a specific ByteString.
@@ -1432,7 +1432,7 @@ prop_mux_restart_m (DummyRestartingInitiatorApps apps) = do
14321432
let minis = map (appToInfo Mx.InitiatorDirectionOnly . fst) apps
14331433

14341434
mux <- Mx.new minis
1435-
mux_aid <- async $ Mx.run nullTracer mux bearer
1435+
mux_aid <- async $ Mx.run nullTracer 0 mux bearer
14361436
getRes <- sequence [ Mx.runMiniProtocol
14371437
mux
14381438
(daNum $ fst app)
@@ -1479,7 +1479,7 @@ prop_mux_restart_m (DummyRestartingResponderApps rapps) = do
14791479
minis = map (appToInfo Mx.ResponderDirectionOnly) apps
14801480

14811481
mux <- Mx.new minis
1482-
mux_aid <- async $ Mx.run nullTracer mux bearer
1482+
mux_aid <- async $ Mx.run nullTracer 1 mux bearer
14831483
getRes <- sequence [ Mx.runMiniProtocol
14841484
mux
14851485
(daNum $ fst app)
@@ -1528,7 +1528,7 @@ prop_mux_restart_m (DummyRestartingInitiatorResponderApps rapps) = do
15281528
respMinis = map (appToInfo Mx.ResponderDirection) apps
15291529

15301530
mux <- Mx.new $ initMinis ++ respMinis
1531-
mux_aid <- async $ Mx.run nullTracer mux bearer
1531+
mux_aid <- async $ Mx.run nullTracer 1 mux bearer
15321532
getInitRes <- sequence [ Mx.runMiniProtocol
15331533
mux
15341534
(daNum $ fst app)
@@ -1603,7 +1603,7 @@ prop_mux_start_m bearer _ checkRes (DummyInitiatorApps apps) runTime = do
16031603
minRunTime = minimum $ runTime : (map daRunTime $ filter (\app -> daAction app == DummyAppFail) apps)
16041604

16051605
mux <- Mx.new minis
1606-
mux_aid <- async $ Mx.run nullTracer mux bearer
1606+
mux_aid <- async $ Mx.run nullTracer 0 mux bearer
16071607
killer <- async $ (threadDelay runTime) >> Mx.stop mux
16081608
getRes <- sequence [ Mx.runMiniProtocol
16091609
mux
@@ -1624,7 +1624,7 @@ prop_mux_start_m bearer trigger checkRes (DummyResponderApps apps) runTime = do
16241624
minRunTime = minimum $ runTime : (map (\a -> daRunTime a + daStartAfter a) $ filter (\app -> daAction app == DummyAppFail) apps)
16251625

16261626
mux <- Mx.new minis
1627-
mux_aid <- async $ Mx.run verboseTracer mux bearer
1627+
mux_aid <- async $ Mx.run verboseTracer 0 mux bearer
16281628
getRes <- sequence [ Mx.runMiniProtocol
16291629
mux
16301630
(daNum app)
@@ -1650,7 +1650,7 @@ prop_mux_start_m bearer _trigger _checkRes (DummyResponderAppsKillMux apps) runT
16501650
let minis = map (appToInfo Mx.ResponderDirectionOnly) apps
16511651

16521652
mux <- Mx.new minis
1653-
mux_aid <- async $ Mx.run verboseTracer mux bearer
1653+
mux_aid <- async $ Mx.run verboseTracer 1 mux bearer
16541654
getRes <- sequence [ Mx.runMiniProtocol
16551655
mux
16561656
(daNum app)
@@ -1673,7 +1673,7 @@ prop_mux_start_m bearer trigger checkRes (DummyInitiatorResponderApps apps) runT
16731673
minRunTime = minimum $ runTime : (map (\a -> daRunTime a) $ filter (\app -> daAction app == DummyAppFail) apps)
16741674

16751675
mux <- Mx.new $ initMinis ++ respMinis
1676-
mux_aid <- async $ Mx.run verboseTracer mux bearer
1676+
mux_aid <- async $ Mx.run verboseTracer 0 mux bearer
16771677
getInitRes <- sequence [ Mx.runMiniProtocol
16781678
mux
16791679
(daNum app)
@@ -1835,7 +1835,7 @@ close_experiment
18351835
])
18361836
Mx.stop $ \mux ->
18371837
withNetworkCtx clientCtx $ \clientBearer ->
1838-
withAsync (Mx.run ((Client,) `contramap` muxTracer) mux clientBearer) $ \_muxAsync ->
1838+
withAsync (Mx.run ((Client,) `contramap` muxTracer) 0 mux clientBearer) $ \_muxAsync ->
18391839
Mx.runMiniProtocol
18401840
mux miniProtocolNum
18411841
Mx.InitiatorDirectionOnly Mx.StartEagerly
@@ -1853,7 +1853,7 @@ close_experiment
18531853
])
18541854
Mx.stop $ \mux ->
18551855
withNetworkCtx serverCtx $ \serverBearer ->
1856-
withAsync (Mx.run ((Server,) `contramap` muxTracer) mux serverBearer) $ \_muxAsync -> do
1856+
withAsync (Mx.run ((Server,) `contramap` muxTracer) 0 mux serverBearer) $ \_muxAsync -> do
18571857
Mx.runMiniProtocol
18581858
mux miniProtocolNum
18591859
Mx.ResponderDirectionOnly Mx.StartOnDemand

ouroboros-network-framework/demo/connection-manager.hs

+8-2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import Control.Tracer (Tracer (..), contramap, nullTracer, traceWith)
3737
import Data.ByteString.Lazy (ByteString)
3838
import Data.Either (partitionEithers)
3939
import Data.Functor (($>))
40+
import Data.Hashable (Hashable)
4041
import Data.List.NonEmpty (NonEmpty (..))
4142
import Data.Typeable (Typeable)
4243

@@ -77,6 +78,7 @@ import Ouroboros.Network.Server.RateLimiting (AcceptedConnectionsLimit (..))
7778
import Ouroboros.Network.Server2 qualified as Server
7879
import Ouroboros.Network.Snocket (Snocket, socketSnocket)
7980
import Ouroboros.Network.Snocket qualified as Snocket
81+
import Ouroboros.Network.Socket () -- Hashable SockAddr
8082
import Ouroboros.Network.Util.ShowProxy
8183

8284

@@ -174,7 +176,10 @@ withBidirectionalConnectionManager
174176
:: forall peerAddr socket m a.
175177
( ConnectionManagerMonad m
176178

177-
, Ord peerAddr, Show peerAddr, Typeable peerAddr
179+
, Hashable peerAddr
180+
, Ord peerAddr
181+
, Show peerAddr
182+
, Typeable peerAddr
178183

179184
-- debugging
180185
, MonadFix m
@@ -441,7 +446,8 @@ runInitiatorProtocols
441446
--
442447
bidirectionalExperiment
443448
:: forall peerAddr socket.
444-
( Ord peerAddr
449+
( Hashable peerAddr
450+
, Ord peerAddr
445451
, Show peerAddr
446452
, Typeable peerAddr
447453
, Eq peerAddr

ouroboros-network-framework/ouroboros-network-framework.cabal

+2
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ library testlib
140140
cborg,
141141
containers,
142142
contra-tracer,
143+
hashable,
143144
io-classes,
144145
io-sim,
145146
network-mux,
@@ -331,6 +332,7 @@ executable demo-connection-manager
331332
base >=4.14 && <4.21,
332333
bytestring,
333334
contra-tracer,
335+
hashable,
334336
io-classes,
335337
network,
336338
network-mux,

0 commit comments

Comments
 (0)