Skip to content

Commit

Permalink
Unify worker implementations
Browse files Browse the repository at this point in the history
Introduce an optional second frontend plugin option that selects the
implementation, either the default GHC main function or the local logic
with custom session management.
  • Loading branch information
tek committed Jan 22, 2025
1 parent 4201707 commit 62bdbb4
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 87 deletions.
2 changes: 1 addition & 1 deletion buck-multiplex-worker/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ processRequest pool buckArgs env@Env {args} = do
ghcPath <- note "no --ghc-path given" args.ghcPath
requestWorkerTargetId <- Just . TargetId <$> note "no --worker-target-id given" args.workerTargetId
liftIO do
(j, i, hset) <- assignLoop ghcPath (maybeToList buckArgs.pluginDb) pool requestWorkerTargetId
(j, i, hset) <- assignLoop buckArgs.multiplexerCustom ghcPath (maybeToList buckArgs.pluginDb) pool requestWorkerTargetId
let
req = Request {
requestWorkerTargetId,
Expand Down
35 changes: 30 additions & 5 deletions buck-worker/lib/BuckArgs.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module BuckArgs where
import Control.Applicative ((<|>))
import Data.Foldable (for_)
import Data.Int (Int32)
import Data.List (dropWhileEnd, stripPrefix)
import Data.List (dropWhileEnd)
import Data.Map (Map)
import Data.Map.Strict ((!?))
import Data.Maybe (fromMaybe)
Expand All @@ -22,6 +22,23 @@ data CompileResult =
}
deriving stock (Show)

-- | Operation requested from the worker, as decoded by 'parseMode'.
data Mode
  = -- | Spelled @compile@.
    ModeCompile
  | -- | Spelled @link@.
    ModeLink
  | -- | Spelled @metadata@.
    ModeMetadata
  | -- | Any other spelling, kept verbatim.
    ModeUnknown String
  deriving stock (Eq, Show)

-- | Decode a worker mode name into a 'Mode'.
--
-- The names @compile@, @link@ and @metadata@ map to their dedicated constructors; every other string is preserved
-- unchanged in 'ModeUnknown', so this function never fails.
parseMode :: String -> Mode
parseMode "compile" = ModeCompile
parseMode "link" = ModeLink
parseMode "metadata" = ModeMetadata
parseMode other = ModeUnknown other

data BuckArgs =
BuckArgs {
topdir :: Maybe String,
Expand All @@ -37,7 +54,9 @@ data BuckArgs =
ghcPath :: Maybe String,
ghcDirFile :: Maybe String,
ghcDbFile :: Maybe String,
ghcOptions :: [String]
ghcOptions :: [String],
multiplexerCustom :: Bool,
mode :: Maybe Mode
}
deriving stock (Eq, Show)

Expand All @@ -57,7 +76,9 @@ emptyBuckArgs env =
ghcPath = Nothing,
ghcDirFile = Nothing,
ghcDbFile = Nothing,
ghcOptions = []
ghcOptions = [],
multiplexerCustom = False,
mode = Nothing
}

options :: Map String ([String] -> BuckArgs -> Either String ([String], BuckArgs))
Expand All @@ -76,11 +97,16 @@ options =
withArg "--extra-pkg-db" \ z a -> z {ghcDbFile = Just a},
withArg "--bin-path" \ z a -> z {binPath = a : z.binPath},
withArg "--bin-exe" \ z a -> z {binPath = takeDirectory a : z.binPath},
skip "-c"
withArg "--worker-mode" \ z a -> z {mode = Just (parseMode a)},
flag "--worker-multiplexer-custom" \ z -> z {multiplexerCustom = True},
("-c", \ rest z -> Right (rest, z {mode = Just ModeCompile}))
-- skip "-c"
]
where
skip name = (name, \ rest z -> Right (rest, z))

flag name f = (name, \ rest z -> Right (rest, f z))

withArg name f = (name, \ argv z -> takeArg name argv (f z))

takeArg name argv store = case argv of
Expand All @@ -93,7 +119,6 @@ parseBuckArgs env =
where
spin z = \case
('-' : 'B' : path) : rest -> spin z {topdir = Just path} rest
arg : rest | Just exe <- stripPrefix "--bin-exe=" arg -> spin z {binPath = takeDirectory exe : z.binPath} rest
arg : args -> do
(rest, new) <- fromMaybe (equalsArg arg) (options !? arg) args z
spin new rest
Expand Down
218 changes: 158 additions & 60 deletions plugin/src/GHCPersistentWorkerPlugin.hs
Original file line number Diff line number Diff line change
@@ -1,46 +1,34 @@
{-# LANGUAGE NumericUnderscores, CPP #-}
{-# OPTIONS_GHC -Wno-incomplete-uni-patterns #-}
module GHCPersistentWorkerPlugin (frontendPlugin) where

import Internal.Log (logToState, logFlushBytes)
import Internal.Error (handleExceptions)
import Internal.Log (newLog)
import Control.Concurrent (forkOS)
import Control.Concurrent (MVar, forkOS)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forever, replicateM_)
import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString as B
import Data.Foldable (for_)
import Data.List (intercalate)
import Data.List (intercalate, uncons)
import Data.Maybe (fromMaybe)
import Data.Time.Clock (getCurrentTime)
import qualified GHC
import GHC.Driver.Env (HscEnv (hsc_NC, hsc_interp, hsc_unit_env))
import GHC.Driver.Monad (Ghc, withSession, modifySession, reflectGhc, reifyGhc)
import GHC.Driver.Monad (Ghc, modifySession, reflectGhc, reifyGhc, withSession)
import GHC.Driver.Plugins (FrontendPlugin (..), defaultFrontendPlugin)
import qualified GHC.Linker.Loader as Loader
import GHC.Main
( PreStartupMode (..),
main',
parseModeFlags,
showSupportedExtensions,
showVersion,
showOptions,
)
import GHC.Main (PreStartupMode (..), main', parseModeFlags, showOptions, showSupportedExtensions, showVersion)
import GHC.Plugins (GhcException (UsageError), throwGhcException)
import GHC.Runtime.Interpreter.Types (Interp (..), InterpInstance (..))
import GHC.Settings.Config (cProjectVersion)
import Internal.Args (Args (..), emptyArgs)
import Internal.Cache (emptyCache)
import Internal.Compile (compile)
import Internal.Error (handleExceptions)
import Internal.Log (Log, logFlushBytes, logToState, newLog)
import Internal.Session (Env (..), withGhc)
import System.Directory (setCurrentDirectory)
import System.Environment (setEnv)
import System.IO
( BufferMode (..),
Handle,
hFlush,
hGetLine,
hPutStrLn,
hSetBuffering,
stdin,
stderr,
stdout,
)
import System.IO (BufferMode (..), Handle, hFlush, hGetLine, hPutStrLn, hSetBuffering, stderr, stdin, stdout)
import Text.Read (readMaybe)

#if __GLASGOW_HASKELL__ >= 911
import Control.Concurrent.MVar (newMVar)
Expand All @@ -63,7 +51,7 @@ recvRequestFromServer :: Handle -> Int -> IO (Int, [(String, String)], [String])
recvRequestFromServer hin wid = do
s <- hGetLine hin
let (env, args0) :: ([(String, String)], [String]) = read s
let jobid_str : args = args0
let (jobid_str, args) = fromMaybe (error "empty args in request") (uncons args0)
jobid :: Int
jobid = read jobid_str
logMessage (prompt wid ++ " job id = " ++ show jobid)
Expand Down Expand Up @@ -108,64 +96,174 @@ bannerJobEnd wid = do
hPutStrLn stderr (show time)
replicateM_ 5 (hPutStrLn stderr "|||||||||||||||||||||||||||||||||")

workerMain :: [String] -> Ghc ()
workerMain flags = do
liftIO $ do
hSetBuffering stdout LineBuffering
hSetBuffering stderr LineBuffering
-- | Abstraction of the two worker variants, purely for convenience of sharing the loop logic.
--
-- The resource type @a@ does not appear in the type of 'WorkerImpl' itself, so the acquisition action and the job
-- action of one implementation have to agree on it, while code consuming a 'WorkerImpl' can only hand the acquired
-- value on to the job action.
data WorkerImpl where
  WorkerImpl ::
    -- | Initial resource acquisition, called once and passed to each compile job.
    Ghc a ->
    -- | Use the resource to execute a job specified by the argument list.
    -- The 'MVar' carries the job's 'Log'; the resulting 'Int' is what gets reported back as the job's result.
    (a -> MVar Log -> [String] -> IO Int) ->
    -- | Update the state after a job has concluded.
    Ghc () ->
    WorkerImpl

let wid :: Int = read (flags !! 0)
(hin, hout) = (stdin, stdout)
liftIO $ logMessage (prompt wid ++ " Started")
GHC.initGhcMonad Nothing
-- explicitly initialize loader.
loader <- liftIO Loader.uninitializedLoader
-- | The conventional implementation, unchanged by the refactoring, hopefully.
--
-- A session with an explicitly created internal interpreter is set up once and shared by all jobs.
-- Every job runs 'compileMain' inside that session; after each job the session is re-initialized while the
-- interpreter and the name cache of the previous session are carried over.
workerImplDefault :: WorkerImpl
workerImplDefault =
  WorkerImpl acquire use finalizeJob
  where
    -- Initialize the session, install the interpreter and hand the session out as the shared resource.
    acquire = do
      GHC.initGhcMonad Nothing
      -- explicitly initialize loader.
      loader <- liftIO Loader.uninitializedLoader
#if __GLASGOW_HASKELL__ >= 911
      lookup_cache <- liftIO $ newMVar emptyUFM
      let interp = Interp InternalInterp loader lookup_cache
#else
      let interp = Interp InternalInterp loader
#endif
      modifySession $ \env -> env {hsc_interp = Just interp}
      reifyGhc pure

    -- Run a single job in the shared session with log output redirected into @logVar@.
    -- A job that runs to completion yields 0.
    -- NOTE(review): 1 is presumably the result 'handleExceptions' substitutes when the job throws; confirm
    -- against "Internal.Error".
    use session logVar args =
      flip reflectGhc session $ do
        GHC.pushLogHookM (const (logToState logVar))
        handleExceptions logVar 1 (0 <$ compileMain args)

    -- Reset the session but keep the interpreter and the name cache of the previous one.
    -- The unit env is read but deliberately not restored (see the commented-out field below).
    finalizeJob = do
      (minterp, _unit_env, nc) <-
        withSession $ \env ->
          pure (hsc_interp env, hsc_unit_env env, hsc_NC env)
      GHC.initGhcMonad Nothing
      modifySession $ \env ->
        env
          { hsc_interp = minterp,
            -- hsc_unit_env = unit_env,
            hsc_NC = nc
          }

-- | Worker implementation backed by the custom session management and compilation pipeline of the other worker
-- variant.
-- Unlike the default implementation, all state handling lives in the @Internal@ modules, so nothing needs to be
-- done after a job.
--
-- Job results: 0 when 'compile' produces a value, 1 when it produces 'Nothing', and 2 when 'withGhc' itself
-- yields 'Nothing'.
--
-- From the Buck Haskell rules, this can be selected by adding @--worker-multiplexer-custom@ to the command args when
-- using the worker.
--
-- From the Buck CLI, this can be selected with @--config ghc-worker.multiplexer_custom=true@, or setting
-- @multiplexer_custom = true@ in a config file under @[ghc-worker]@.
workerImplCustom :: WorkerImpl
workerImplCustom =
  WorkerImpl acquire use finalizeJob
  where
    -- The shared resource is a fresh cache, created once.
    acquire = liftIO (emptyCache True)

    use cache logVar args =
      fromMaybe 2 <$> withGhc jobEnv runJob
      where
        jobEnv = Env {log = logVar, cache, args = (emptyArgs mempty) {ghcOptions = sanitize args}}

        runJob target = do
          outcome <- compile target
          case outcome of
            Nothing -> pure (Just 1)
            Just _ -> pure (Just 0)

    finalizeJob = pure ()

    -- Drop a leading @-c@ from the job's argument list; any other list is passed through untouched.
    sanitize ("-c" : rest) = rest
    sanitize other = other

-- | Main loop abstracting over the session/pipeline implementation.
--
-- Repeatedly run the compilation pipeline for each job received from the worker server through the handle @hin@,
-- sending the result back through @hout@.
--
-- The implementation's resource is acquired once, before the first job.
-- Each job is then processed on its own thread created with 'forkOS', while the loop blocks on an 'MVar' until
-- that thread signals completion; only then is the implementation's post-job action run in the 'Ghc' monad.
-- Jobs are therefore handled strictly one at a time, and the loop never returns.
--
-- This is simply a crude refactoring of the two separate original implementations.
workerLoop ::
  WorkerImpl ->
  -- | Message input handle.
  Handle ->
  -- | Message output handle.
  Handle ->
  -- | Worker ID sent back to the server to match results to clients.
  Int ->
  Ghc ()
workerLoop (WorkerImpl acquire use finalizeJob) hin hout wid = do
  resource <- acquire
  forever $ do
    lock <- liftIO newEmptyMVar
    _ <- liftIO $ forkOS $ do
      (jobid, env, args) <- recvRequestFromServer hin wid
      setEnvForJob env
      -- Acknowledge the job before starting to work on it.
      sendJobIdToServer hout jobid
      logVar <- newLog False
      --
      bannerJobStart wid
      result <- use resource logVar args
      bannerJobEnd wid
      --
      -- Everything logged during the job is shipped together with the result.
      output <- logFlushBytes logVar
      sendResultToServer hout (show result) output
      putMVar lock ()
    --
    -- Wait for the job thread to finish before touching the session state.
    () <- liftIO $ takeMVar lock
    finalizeJob
    liftIO $ putMVar lock ()

-- | Which compilation pipeline the worker should use, as selected by the optional second plugin flag.
data ImplConf = WorkerDefault | WorkerCustom

-- | Parse flags passed to the plugin on invocation:
--
-- - The first argument is a mandatory integer.
--   It denotes the worker ID, corresponding to a Buck target or GHC unit, used to map compilation jobs to clients.
--
-- - The second argument is an optional string that selects the implementation of the compilation pipeline invocation,
--   which can be 'default' to just call GHC's main function or 'custom' to use the local experimental variant.
--   When it is absent, 'WorkerDefault' is used.
--
-- Returns 'Left' with a description of the problem when no flags are given, when more than two are given, when the
-- worker ID is not an integer, or when the implementation name is not recognized.
parseFlags :: [String] -> Either String (Int, ImplConf)
parseFlags flags =
  case flags of
    [] -> Left "Need at least one argument specifying the integer worker ID"
    [widSpec] -> do
      wid <- parseId widSpec
      Right (wid, WorkerDefault)
    [widSpec, implSpec] -> do
      wid <- parseId widSpec
      impl <- parseImpl implSpec
      Right (wid, impl)
    _ -> Left ("Expecting at most two arguments, got: " ++ unwords flags)
  where
    parseId :: String -> Either String Int
    parseId spec =
      case readMaybe spec of
        Just wid -> Right wid
        -- Name the offending value instead of failing with an empty message.
        Nothing -> Left ("Worker ID must be an integer, got: " ++ show spec)

    parseImpl :: String -> Either String ImplConf
    parseImpl "custom" = Right WorkerCustom
    parseImpl "default" = Right WorkerDefault
    parseImpl wrong = Left ("Invalid argument for worker impl, should be 'custom' or 'default': " ++ wrong)

-- | Entry point of the worker: interpret the plugin @flags@ (passed from @Server.initWorker@ via @-ffrontend-opt@
-- when a new worker is started) and run the compilation server loop on stdin/stdout with the implementation they
-- select.
--
-- Both stdout and stderr are switched to line buffering first.
-- Flags rejected by 'parseFlags' are reported by throwing a 'UsageError'.
workerMain :: [String] -> Ghc ()
workerMain flags = do
  liftIO (mapM_ (`hSetBuffering` LineBuffering) [stdout, stderr])
  either usageError start (parseFlags flags)
  where
    usageError message =
      throwGhcException (UsageError ("Invalid arguments for worker via '-ffrontend-opt': " ++ message))

    start (wid, impl) = do
      liftIO (logMessage (prompt wid ++ " Started"))
      workerLoop (selectImpl impl) stdin stdout wid

    selectImpl WorkerCustom = workerImplCustom
    selectImpl WorkerDefault = workerImplDefault

compileMain :: [String] -> Ghc ()
compileMain args = do
let argv2 = map (GHC.mkGeneralLocated "on the commandline") args
(mode, units, argv3, flagWarnings) <- liftIO $ parseModeFlags argv2
(impl, units, argv3, flagWarnings) <- liftIO $ parseModeFlags argv2

dflags0 <- GHC.getSessionDynFlags
case mode of
case impl of
Left ShowSupportedExtensions ->
liftIO $ showSupportedExtensions Nothing
Left ShowVersion ->
Expand All @@ -174,6 +272,6 @@ compileMain args = do
liftIO $ putStrLn cProjectVersion
Left (ShowOptions isInteractive) ->
liftIO $ showOptions isInteractive
Right (Right postLoadMode) ->
main' postLoadMode units dflags0 argv3 flagWarnings
Right (Right postLoadImpl) ->
main' postLoadImpl units dflags0 argv3 flagWarnings
_ -> pure ()
Loading

0 comments on commit 62bdbb4

Please sign in to comment.