Skip to content

Commit c3c1e52

Browse files
iohk-bors[bot]karknumrBliss
authored
Merge #1900 #1907
1900: Label mux threads r=karknu a=karknu 1907: ImmutableDB: allow reading concurrently with appending r=mrBliss a=mrBliss Read the potentially stale value of the internal state provided by the `StrictMVar` instead of blocking on the `TMVar` in case an appender has locked it. Reads can safely happen concurrently with appends, so this will allow for some more concurrency. I don't expect this to make any impact for a "client" node that is syncing, as it will typically not read from its ImmutableDB, only append to it, which is necessarily sequential. However, a "server" node might have to read headers/blocks from its ImmutableDB to serve to other nodes. Serving multiple requests won't require taking + putting a `TMVar` anymore, it's just reading a `TVar` now. I expect to see the most improvement for a node serving others while it is also still syncing itself. Blocks being appended to the ImmutableDB will happen concurrently with all read requests. Co-authored-by: Karl Knutsson <[email protected]> Co-authored-by: Thomas Winant <[email protected]>
3 parents d0791b3 + 1ffd9f2 + 0a41099 commit c3c1e52

File tree

3 files changed

+17
-9
lines changed

3 files changed

+17
-9
lines changed

network-mux/src/Network/Mux.hs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,11 +177,11 @@ muxStart tracer (MuxApplication ptcls) bearer = do
177177

178178
muxerJob tq cnt =
179179
JobPool.Job (mux cnt MuxState { egressQueue = tq, Egress.bearer })
180-
MuxerException
180+
MuxerException "muxer"
181181

182182
demuxerJob tbl =
183183
JobPool.Job (demux DemuxState { dispatchTable = tbl, Ingress.bearer })
184-
DemuxerException
184+
DemuxerException "demuxer"
185185

186186
miniProtocolInitiatorJob = miniProtocolJob selectInitiator ModeInitiator
187187
miniProtocolResponderJob = miniProtocolJob selectResponder ModeResponder
@@ -216,6 +216,7 @@ muxStart tracer (MuxApplication ptcls) bearer = do
216216
where
217217
job run = JobPool.Job (jobAction run)
218218
(MiniProtocolException pnum pix pmode)
219+
((show pix) ++ "." ++ (show pmode))
219220

220221
jobAction run = do
221222
chan <- mkChannel

network-mux/src/Network/Mux/JobPool.hs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ data JobPool m a = JobPool {
2828
completionQueue :: !(TQueue m a)
2929
}
3030

31-
data Job m a = Job (m a) (SomeException -> a)
31+
data Job m a = Job (m a) (SomeException -> a) String
3232

3333
withJobPool :: forall m a b.
3434
(MonadAsync m, MonadThrow m)
@@ -52,12 +52,13 @@ forkJob :: forall m a.
5252
=> JobPool m a
5353
-> Job m a
5454
-> m ()
55-
forkJob JobPool{jobsVar, completionQueue} (Job action handler) =
55+
forkJob JobPool{jobsVar, completionQueue} (Job action handler label) =
5656
mask $ \restore -> do
5757
jobAsync <- async $ do
58+
tid <- myThreadId
59+
labelThread tid label
5860
res <- handleJust notAsyncExceptions (return . handler) $
5961
restore action
60-
tid <- myThreadId
6162
atomically $ do
6263
writeTQueue completionQueue res
6364
modifyTVar' jobsVar (Map.delete tid)

ouroboros-consensus/src/Ouroboros/Consensus/Storage/ImmutableDB/Impl/State.hs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,9 @@ getOpenState :: (HasCallStack, IOLike m)
155155
=> ImmutableDBEnv m hash
156156
-> m (SomePair (HasFS m) (OpenState m hash))
157157
getOpenState ImmutableDBEnv {..} = do
158-
internalState <- readMVar varInternalState
158+
-- We use 'readMVarSTM' to read a potentially stale internal state if
159+
-- somebody's appending to the ImmutableDB at the same time.
160+
internalState <- atomically $ readMVarSTM varInternalState
159161
case internalState of
160162
DbClosed -> throwUserError ClosedDBError
161163
DbOpen openState -> return (SomePair hasFS openState)
@@ -244,15 +246,19 @@ withOpenState ImmutableDBEnv { hasFS = hasFS :: HasFS m h, .. } action = do
244246
where
245247
HasFS{..} = hasFS
246248

249+
-- We use 'readMVarSTM' to read a potentially stale internal state if
250+
-- somebody's appending to the ImmutableDB at the same time. Reads can
251+
-- safely happen concurrently with appends, so this is fine and allows for
252+
-- some extra concurrency.
247253
open :: m (OpenState m hash h)
248-
open = readMVar varInternalState >>= \case
254+
open = atomically (readMVarSTM varInternalState) >>= \case
249255
DbOpen ost -> return ost
250256
DbClosed -> throwUserError ClosedDBError
251257

252258
-- close doesn't take the state that @open@ returned, because the state
253259
-- may have been updated by someone else since we got it (remember we're
254-
-- using 'readMVar' here, 'takeMVar'). So we need to get the most recent
255-
-- state anyway.
260+
-- using 'readMVarSTM' here, not 'takeMVar'). So we need to get the most
261+
-- recent state anyway.
256262
close :: ExitCase (Either ImmutableDBError r)
257263
-> m ()
258264
close ec = case ec of

0 commit comments

Comments
 (0)