Skip to content

Commit 9615df4

Browse files
committed
Use STM for lockless pool
MVar-based pool seems to exhibit horrible lock contention in specific workloads (especially with a single stripe), let's try a lockless implementation. Additional notes: - Putting resources back doesn't block now so note "signal uninterruptible" no longer applies.
1 parent 3474c75 commit 9615df4

4 files changed

Lines changed: 158 additions & 156 deletions

File tree

resource-pool.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ library
3636
build-depends: base >= 4.11 && < 5
3737
, hashable >= 1.1.0.0
3838
, primitive >= 0.7
39+
, stm >= 2.5.3.0
3940
, time
4041

4142
ghc-options: -Wall -Wcompat
@@ -45,4 +46,5 @@ library
4546
default-extensions: DeriveGeneric
4647
, LambdaCase
4748
, RankNTypes
49+
, ScopedTypeVariables
4850
, TypeApplications

src/Data/Pool.hs

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ module Data.Pool
2424
, createPool
2525
) where
2626

27-
import Control.Concurrent
27+
import Control.Concurrent.STM
2828
import Control.Exception
29+
import Control.Monad
2930
import Data.Time (NominalDiffTime)
3031

3132
import Data.Pool.Internal
@@ -63,17 +64,19 @@ withResource pool act = mask $ \unmask -> do
6364
takeResource :: Pool a -> IO (a, LocalPool a)
6465
takeResource pool = mask_ $ do
6566
lp <- getLocalPool (localPools pool)
66-
stripe <- takeMVar (stripeVar lp)
67-
if available stripe == 0
68-
then do
69-
q <- newEmptyMVar
70-
putMVar (stripeVar lp) $! stripe {queueR = Queue q (queueR stripe)}
71-
waitForResource (stripeVar lp) q >>= \case
72-
Just a -> pure (a, lp)
73-
Nothing -> do
74-
a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp)
75-
pure (a, lp)
76-
else takeAvailableResource pool lp stripe
67+
join . atomically $ do
68+
stripe <- readTVar (stripeVar lp)
69+
if available stripe == 0
70+
then do
71+
q <- newEmptyTMVar
72+
writeTVar (stripeVar lp) $! stripe {queueR = Queue q (queueR stripe)}
73+
pure $
74+
waitForResource (stripeVar lp) q >>= \case
75+
Just a -> pure (a, lp)
76+
Nothing -> do
77+
a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp)
78+
pure (a, lp)
79+
else takeAvailableResource pool lp stripe
7780

7881
-- | A variant of 'withResource' that doesn't execute the action and returns
7982
-- 'Nothing' instead of blocking if the local pool is exhausted.
@@ -91,12 +94,13 @@ tryWithResource pool act = mask $ \unmask ->
9194
tryTakeResource :: Pool a -> IO (Maybe (a, LocalPool a))
9295
tryTakeResource pool = mask_ $ do
9396
lp <- getLocalPool (localPools pool)
94-
stripe <- takeMVar (stripeVar lp)
95-
if available stripe == 0
96-
then do
97-
putMVar (stripeVar lp) stripe
98-
pure Nothing
99-
else Just <$> takeAvailableResource pool lp stripe
97+
join . atomically $ do
98+
stripe <- readTVar (stripeVar lp)
99+
if available stripe == 0
100+
then do
101+
writeTVar (stripeVar lp) stripe
102+
pure $ pure Nothing
103+
else fmap Just <$> takeAvailableResource pool lp stripe
100104

101105
{-# DEPRECATED createPool "Use newPool instead" #-}
102106

@@ -121,16 +125,17 @@ takeAvailableResource
121125
:: Pool a
122126
-> LocalPool a
123127
-> Stripe a
124-
-> IO (a, LocalPool a)
128+
-> STM (IO (a, LocalPool a))
125129
takeAvailableResource pool lp stripe = case cache stripe of
126130
[] -> do
127-
putMVar (stripeVar lp) $! stripe {available = available stripe - 1}
128-
a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp)
129-
pure (a, lp)
131+
writeTVar (stripeVar lp) $! stripe {available = available stripe - 1}
132+
pure $ do
133+
a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp)
134+
pure (a, lp)
130135
Entry a _ : as -> do
131-
putMVar (stripeVar lp) $!
136+
writeTVar (stripeVar lp) $!
132137
stripe
133138
{ available = available stripe - 1
134139
, cache = as
135140
}
136-
pure (a, lp)
141+
pure $ pure (a, lp)

src/Data/Pool/Internal.hs

Lines changed: 53 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
module Data.Pool.Internal where
88

99
import Control.Concurrent
10+
import Control.Concurrent.STM
1011
import Control.Exception
1112
import Control.Monad
1213
import Data.Hashable (hash)
1314
import Data.IORef
1415
import qualified Data.List as L
1516
import Data.Primitive.SmallArray
16-
import GHC.Clock
17+
import GHC.Clock (getMonotonicTime)
18+
import GHC.Conc (unsafeIOToSTM)
1719

1820
-- | Striped resource pool based on "Control.Concurrent.QSem".
1921
data Pool a = Pool
@@ -25,12 +27,13 @@ data Pool a = Pool
2527
-- | A single, local pool.
2628
data LocalPool a = LocalPool
2729
{ stripeId :: !Int
28-
, stripeVar :: !(MVar (Stripe a))
30+
, stripeVar :: !(TVar (Stripe a))
2931
, cleanerRef :: !(IORef ())
3032
}
3133

3234
-- | Stripe of a resource pool. If @available@ is 0, the list of threads waiting
33-
-- for a resource (each with an associated 'MVar') is @queue ++ reverse queueR@.
35+
-- for a resource (each with an associated 'TMVar') is @queue ++ reverse queueR@
36+
-- to ensure fairness.
3437
data Stripe a = Stripe
3538
{ available :: !Int
3639
, cache :: ![Entry a]
@@ -44,10 +47,10 @@ data Entry a = Entry
4447
, lastUsed :: !Double
4548
}
4649

47-
-- | A queue of MVarS corresponding to threads waiting for resources.
50+
-- | A queue of TMVarS corresponding to threads waiting for resources.
4851
--
4952
-- Basically a monomorphic list to save two pointer indirections.
50-
data Queue a = Queue !(MVar (Maybe a)) (Queue a) | Empty
53+
data Queue a = Queue !(TMVar (Maybe a)) (Queue a) | Empty
5154

5255
-- | Configuration of a 'Pool'.
5356
data PoolConfig a = PoolConfig
@@ -129,7 +132,7 @@ newPool pc = do
129132
pools <- fmap (smallArrayFromListN numStripes) . forM [1 .. numStripes] $ \n -> do
130133
ref <- newIORef ()
131134
stripe <-
132-
newMVar
135+
newTVarIO
133136
Stripe
134137
{ available = poolMaxResources pc `quotCeil` numStripes
135138
, cache = []
@@ -175,21 +178,18 @@ newPool pc = do
175178
-- Note that this will ignore any exceptions in the destroy function.
176179
destroyResource :: Pool a -> LocalPool a -> a -> IO ()
177180
destroyResource pool lp a = do
178-
uninterruptibleMask_ $ do
179-
-- Note [signal uninterruptible]
180-
stripe <- takeMVar (stripeVar lp)
181+
atomically $ do
182+
stripe <- readTVar (stripeVar lp)
181183
newStripe <- signal stripe Nothing
182-
putMVar (stripeVar lp) newStripe
183-
void . try @SomeException $ freeResource (poolConfig pool) a
184+
writeTVar (stripeVar lp) $! newStripe
185+
void . try @SomeException $ freeResource (poolConfig pool) a
184186

185187
-- | Return a resource to the given 'LocalPool'.
186188
putResource :: LocalPool a -> a -> IO ()
187-
putResource lp a = do
188-
uninterruptibleMask_ $ do
189-
-- Note [signal uninterruptible]
190-
stripe <- takeMVar (stripeVar lp)
191-
newStripe <- signal stripe (Just a)
192-
putMVar (stripeVar lp) newStripe
189+
putResource lp a = atomically $ do
190+
stripe <- readTVar (stripeVar lp)
191+
newStripe <- signal stripe (Just a)
192+
writeTVar (stripeVar lp) $! newStripe
193193

194194
-- | Destroy all resources in all stripes in the pool.
195195
--
@@ -235,89 +235,78 @@ getLocalPool pools = do
235235
where
236236
stripes = sizeofSmallArray pools
237237

238-
-- | Wait for the resource to be put into a given 'MVar'.
239-
waitForResource :: MVar (Stripe a) -> MVar (Maybe a) -> IO (Maybe a)
240-
waitForResource mstripe q = takeMVar q `onException` cleanup
238+
-- | Wait for the resource to be put into a given 'TMVar'.
239+
waitForResource :: TVar (Stripe a) -> TMVar (Maybe a) -> IO (Maybe a)
240+
waitForResource mstripe q = atomically (takeTMVar q) `onException` cleanup
241241
where
242-
cleanup = uninterruptibleMask_ $ do
243-
-- Note [signal uninterruptible]
244-
stripe <- takeMVar mstripe
242+
cleanup = atomically $ do
243+
stripe <- readTVar mstripe
245244
newStripe <-
246-
tryTakeMVar q >>= \case
245+
tryTakeTMVar q >>= \case
247246
Just ma -> do
248247
-- Between entering the exception handler and taking ownership of
249248
-- the stripe we got the resource we wanted. We don't need it
250249
-- anymore though, so pass it to someone else.
251250
signal stripe ma
252251
Nothing -> do
253-
-- If we're still waiting, fill up the MVar with an undefined value
254-
-- so that 'signal' can discard our MVar from the queue.
255-
putMVar q $ error "unreachable"
252+
-- If we're still waiting, fill up the TMVar with an undefined value
253+
-- so that 'signal' can discard our TMVar from the queue.
254+
writeTMVar q $ error "unreachable"
256255
pure stripe
257-
putMVar mstripe newStripe
256+
writeTVar mstripe $! newStripe
258257

259258
-- | If an exception is received while a resource is being created, restore the
260259
-- original size of the stripe.
261-
restoreSize :: MVar (Stripe a) -> IO ()
262-
restoreSize mstripe = uninterruptibleMask_ $ do
263-
-- 'uninterruptibleMask_' is used since 'takeMVar' might block.
264-
stripe <- takeMVar mstripe
265-
putMVar mstripe $! stripe {available = available stripe + 1}
260+
restoreSize :: TVar (Stripe a) -> IO ()
261+
restoreSize mstripe = atomically $ do
262+
modifyTVar' mstripe $ \stripe -> stripe {available = available stripe + 1}
266263

267264
-- | Free resource entries in the stripes that fulfil a given condition.
268265
cleanStripe
269266
:: (Entry a -> Bool)
270267
-> (a -> IO ())
271-
-> MVar (Stripe a)
268+
-> TVar (Stripe a)
272269
-> IO ()
273-
cleanStripe isStale free mstripe = mask $ \unmask -> do
270+
cleanStripe isStale free mstripe = mask_ $ do
274271
-- Asynchronous exceptions need to be masked here to prevent leaking of
275272
-- 'stale' resources before they're freed.
276-
stale <- modifyMVar mstripe $ \stripe -> unmask $ do
273+
stale <- atomically $ do
274+
stripe <- readTVar mstripe
277275
let (stale, fresh) = L.partition isStale (cache stripe)
278-
-- There's no need to update 'available' here because it only tracks
279-
-- the number of resources taken from the pool.
280-
newStripe = stripe {cache = fresh}
281-
newStripe `seq` pure (newStripe, map entry stale)
276+
-- There's no need to update 'available' here because it only tracks
277+
-- the number of resources taken from the pool.
278+
writeTVar mstripe $! stripe {cache = fresh}
279+
pure $ map entry stale
282280
-- We need to ignore exceptions in the 'free' function, otherwise if an
283281
-- exception is thrown half-way, we leak the rest of the resources. Also,
284282
-- asynchronous exceptions need to be hard masked here since freeing a
285283
-- resource might in theory block.
286284
uninterruptibleMask_ . forM_ stale $ try @SomeException . free
287285

288-
-- Note [signal uninterruptible]
289-
--
290-
-- If we have
291-
--
292-
-- bracket takeResource putResource (...)
293-
--
294-
-- and an exception arrives at the putResource, then we must not lose the
295-
-- resource. The putResource is masked by bracket, but taking the MVar might
296-
-- block, and so it would be interruptible. Hence we need an uninterruptible
297-
-- variant of mask here.
298-
signal :: Stripe a -> Maybe a -> IO (Stripe a)
286+
signal :: forall a. Stripe a -> Maybe a -> STM (Stripe a)
299287
signal stripe ma =
300288
if available stripe == 0
301289
then loop (queue stripe) (queueR stripe)
302290
else do
303291
newCache <- case ma of
304292
Just a -> do
305-
now <- getMonotonicTime
293+
now <- unsafeIOToSTM getMonotonicTime
306294
pure $ Entry a now : cache stripe
307295
Nothing -> pure $ cache stripe
308-
pure $!
296+
pure
309297
stripe
310298
{ available = available stripe + 1
311299
, cache = newCache
312300
}
313301
where
302+
loop :: Queue a -> Queue a -> STM (Stripe a)
314303
loop Empty Empty = do
315304
newCache <- case ma of
316305
Just a -> do
317-
now <- getMonotonicTime
306+
now <- unsafeIOToSTM getMonotonicTime
318307
pure [Entry a now]
319308
Nothing -> pure []
320-
pure $!
309+
pure
321310
Stripe
322311
{ available = 1
323312
, cache = newCache
@@ -326,22 +315,22 @@ signal stripe ma =
326315
}
327316
loop Empty qR = loop (reverseQueue qR) Empty
328317
loop (Queue q qs) qR =
329-
tryPutMVar q ma >>= \case
318+
tryPutTMVar q ma >>= \case
330319
-- This fails when 'waitForResource' went into the exception handler and
331-
-- filled the MVar (with an undefined value) itself. In such case we
320+
-- filled the TMVar (with an undefined value) itself. In such case we
332321
-- simply ignore it.
333322
False -> loop qs qR
334323
True ->
335-
pure $!
324+
pure
336325
stripe
337326
{ available = 0
338327
, queue = qs
339328
, queueR = qR
340329
}
341330

342-
reverseQueue :: Queue a -> Queue a
343-
reverseQueue = go Empty
344-
where
345-
go acc = \case
346-
Empty -> acc
347-
Queue x xs -> go (Queue x acc) xs
331+
reverseQueue :: Queue a -> Queue a
332+
reverseQueue = go Empty
333+
where
334+
go acc = \case
335+
Empty -> acc
336+
Queue x xs -> go (Queue x acc) xs

0 commit comments

Comments
 (0)