-
Notifications
You must be signed in to change notification settings - Fork 120
Expand file tree
/
Copy pathParallel.hs
More file actions
115 lines (104 loc) · 3.93 KB
/
Parallel.hs
File metadata and controls
115 lines (104 loc) · 3.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
-- | A helper module which takes care of parallelism
module Test.Tasty.Parallel (ActionStatus(..), Action(..), runInParallel) where
import Control.Monad
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Foreign.StablePtr
-- | What to do about an t'Action'?
data ActionStatus
= ActionReady
-- ^ the action is ready to be executed
| ActionSkip
-- ^ the action should be skipped
| ActionWait
-- ^ not sure what to do yet; wait
deriving Eq
data Action = Action
{ actionStatus :: STM ActionStatus
, actionRun :: IO ()
, actionSkip :: STM ()
}
-- | Take a list of actions and execute them in parallel, no more than @n@
-- at the same time.
--
-- The action itself is asynchronous, ie. it returns immediately and does
-- the work in new threads. It returns an action which aborts tests and
-- cleans up.
runInParallel
:: Int -- ^ maximum number of parallel threads
-> [Action] -- ^ list of actions to execute.
-- The first action in the pair tells if the second action is ready to run.
-> IO (IO ())
-- This implementation tries its best to ensure that exceptions are
-- properly propagated to the caller and threads are not left running.
--
-- Note that exceptions inside tests are already caught by the test
-- actions themselves. Any exceptions that reach this function or its
-- threads are by definition unexpected.
runInParallel nthreads actions = do
-- When linked with threaded RTS, ensure we have enough Capabilities
-- so that all Haskell worker threads can truly run in parallel.
when rtsSupportsBoundThreads $ do
ncap <- getNumCapabilities
when (ncap < nthreads) $ setNumCapabilities nthreads
callingThread <- myThreadId
-- Don't let the main thread be garbage-collected
-- Otherwise we may get a "thread blocked indefinitely in an STM
-- transaction" exception when a child thread is blocked and GC'd.
-- (See e.g. https://github.com/UnkindPartition/tasty/issues/15)
-- FIXME is this still needed?
_ <- newStablePtr callingThread
actionsVar <- atomically $ newTMVar actions
pids <- replicateM nthreads (async $ work actionsVar)
return $ do
-- Tell worker threads there is no more work after their current task.
-- 'cancel' below by itself is not sufficient because if an exception
-- is thrown in the middle of a test, the worker thread simply marks
-- the test as failed and moves on to their next task. We also need to
-- make it clear that there are no further tasks.
_ <- atomically $ swapTMVar actionsVar []
-- Cancel all the current tasks, waiting for workers to clean up.
-- The waiting part is important (see #249), that's why we use cancel
-- instead of killThread.
mapM_ cancel pids
work :: TMVar [Action] -> IO ()
work actionsVar = go
where
go = do
join . atomically $ do
mb_ready <- findBool =<< takeTMVar actionsVar
case mb_ready of
Nothing -> do
-- Nothing left to do. Put back the TMVar so that other threads
-- do not block on an empty TMVar (see #249) and return.
putTMVar actionsVar []
return $ return ()
Just (this, rest) -> do
putTMVar actionsVar rest
return $ actionRun this >> go
-- | Find a ready-to-run item. Filter out the items that will never be
-- ready to run.
--
-- Return the ready item and the remaining ones.
--
-- This action may block if no items are ready to run just yet.
--
-- Return 'Nothing' if there are no runnable items left.
findBool :: [Action] -> STM (Maybe (Action, [Action]))
findBool = go []
where
go [] [] =
-- nothing to do
return Nothing
go _ [] =
-- nothing ready yet
retry
go past (this : rest) = do
status <- actionStatus this
case status of
ActionReady -> return $ Just (this, reverse past ++ rest)
ActionWait -> go (this : past) rest
ActionSkip -> do
actionSkip this
go past rest