33
44module Echidna.Campaign where
55
6- import Control.Concurrent ( writeChan )
6+ import Control.Concurrent
77import Control.DeepSeq (force )
88import Control.Monad (replicateM , when , void , forM_ )
99import Control.Monad.Catch (MonadThrow (.. ))
@@ -22,6 +22,7 @@ import Data.Maybe (isJust, mapMaybe, fromMaybe)
2222import Data.Set (Set )
2323import Data.Set qualified as Set
2424import Data.Text (Text )
25+ import Data.Time (LocalTime )
2526import System.Random (mkStdGen )
2627
2728import EVM (cheatCode )
@@ -67,7 +68,7 @@ replayCorpus
6768replayCorpus vm txSeqs =
6869 forM_ (zip [1 .. ] txSeqs) $ \ (i, txSeq) -> do
6970 _ <- callseq vm txSeq
70- pushEvent (TxSequenceReplayed i (length txSeqs))
71+ pushWorkerEvent (TxSequenceReplayed i (length txSeqs))
7172
7273-- | Run a fuzzing campaign given an initial universe state, some tests, and an
7374-- optional dictionary to generate calls with. Return the 'Campaign' state once
@@ -206,7 +207,11 @@ callseq vm txSeq = do
206207
207208 cov <- liftIO . readIORef =<< asks (. coverageRef)
208209 points <- liftIO $ scoveragePoints cov
209- pushEvent (NewCoverage points (length cov) newSize)
210+ pushWorkerEvent NewCoverage { points
211+ , numCodehashes = length cov
212+ , corpusSize = newSize
213+ , transactions = fst <$> results
214+ }
210215
211216 modify' $ \ workerState ->
212217
@@ -368,10 +373,10 @@ updateTest vmForShrink (vm, xs) test = do
368373 test' = updateOpenTest test xs (testValue, vm', results)
369374 case test'. state of
370375 Large _ -> do
371- pushEvent (TestFalsified test')
376+ pushWorkerEvent (TestFalsified test')
372377 pure (Just test')
373378 _ | test'. value > test. value -> do
374- pushEvent (TestOptimized test')
379+ pushWorkerEvent (TestOptimized test')
375380 pure (Just test')
376381 _ -> pure Nothing
377382 Large _ ->
@@ -381,12 +386,46 @@ updateTest vmForShrink (vm, xs) test = do
381386 shrinkTest vmForShrink test
382387 _ -> pure Nothing
383388
384- pushEvent
389+ pushWorkerEvent
385390 :: (MonadReader Env m , MonadState WorkerState m , MonadIO m )
386- => CampaignEvent
391+ => WorkerEvent
387392 -> m ()
388- pushEvent event = do
393+ pushWorkerEvent event = do
389394 workerId <- gets (. workerId)
395+ env <- ask
396+ liftIO $ pushCampaignEvent env (WorkerEvent workerId event)
397+
398+ pushCampaignEvent :: Env -> CampaignEvent -> IO ()
399+ pushCampaignEvent env event = do
390400 time <- liftIO getTimestamp
391- chan <- asks (. eventQueue)
392- liftIO $ writeChan chan (workerId, time, event)
401+ writeChan env. eventQueue (time, event)
402+
403+ -- | Listener reads events and runs the given 'handler' function. It exits after
404+ -- receiving all 'WorkerStopped' events and sets the returned 'MVar' so the
405+ -- parent thread can safely block on listener until all events are processed.
406+ --
407+ -- NOTE: because the 'Failure' event does not come from a specific fuzzing worker
408+ -- it is possible that a listener won't process it if emitted after all workers
409+ -- are stopped. This is quite unlikely and non-critical but should be addressed
410+ -- in the long term.
411+ spawnListener
412+ :: (MonadReader Env m , MonadIO m )
413+ => ((LocalTime , CampaignEvent ) -> IO () )
414+ -- ^ a function that handles the events
415+ -> m (MVar () )
416+ spawnListener handler = do
417+ cfg <- asks (. cfg)
418+ let nworkers = fromMaybe 1 cfg. campaignConf. workers
419+ eventQueue <- asks (. eventQueue)
420+ chan <- liftIO $ dupChan eventQueue
421+ stopVar <- liftIO newEmptyMVar
422+ liftIO $ void $ forkFinally (loop chan nworkers) (const $ putMVar stopVar () )
423+ pure stopVar
424+ where
425+ loop chan ! workersAlive =
426+ when (workersAlive > 0 ) $ do
427+ event <- readChan chan
428+ handler event
429+ case event of
430+ (_, WorkerEvent _ (WorkerStopped _)) -> loop chan (workersAlive - 1 )
431+ _ -> loop chan workersAlive
0 commit comments