diff --git a/src/Chainweb/Mempool/InMem.hs b/src/Chainweb/Mempool/InMem.hs index 2c86640c5b..65dc645a22 100644 --- a/src/Chainweb/Mempool/InMem.hs +++ b/src/Chainweb/Mempool/InMem.hs @@ -127,6 +127,7 @@ toMempoolBackend logger mempool = do , mempoolLookup = lookupInMem tcfg lockMVar , mempoolLookupEncoded = lookupEncodedInMem lockMVar , mempoolInsert = insertInMem logger cfg lockMVar + , mempoolInsertEncoded = insertEncodedInMem logger tcfg cfg lockMVar , mempoolInsertCheck = insertCheckInMem cfg lockMVar , mempoolInsertCheckVerbose = insertCheckVerboseInMem cfg lockMVar , mempoolMarkValidated = markValidatedInMem logger tcfg lockMVar @@ -206,16 +207,15 @@ lookupInMem :: NFData t -> IO (Vector (LookupResult t)) lookupInMem txcfg lock txs = do q <- withMVarMasked lock (readIORef . _inmemPending) - v <- V.mapM (evaluate . force . fromJuste . lookupOne q) txs - return $! v + v <- V.mapM (evaluate . fromMaybe Missing . lookupQ q) txs + return v where - lookupOne q txHash = lookupQ q txHash <|> pure Missing codec = txCodec txcfg fixup pe = let bs = _inmemPeBytes pe in either (const Missing) Pending - $! codecDecode codec - $! SB.fromShort bs + $ codecDecode codec + $ SB.fromShort bs lookupQ q txHash = fixup <$!> HashMap.lookup txHash q ------------------------------------------------------------------------------ @@ -525,10 +525,11 @@ insertInMem -> InMemConfig t -- ^ in-memory config -> MVar (InMemoryMempoolData t) -- ^ in-memory state -> InsertType + -> HashMap TransactionHash ByteString -- ^ byte-encodings of new transactions if possible -> Vector t -- ^ new transactions -> IO () -insertInMem logger cfg lock runCheck txs0 = do - logFunctionText logger Debug $ "insertInMem: " <> sshow (runCheck, V.length txs0) +insertInMem logger cfg lock runCheck txsBytes txsDecoded = do + logFunctionText logger Debug $ "insertInMem: " <> sshow (runCheck, V.length txsDecoded) txhashes <- insertCheck withMVarMasked lock $ \mdata -> do pending <- readIORef (_inmemPending mdata) @@ -544,8 +545,8 @@ insertInMem logger cfg lock runCheck txs0 = do where insertCheck :: IO (Vector (T2 TransactionHash t)) insertCheck = case runCheck of - CheckedInsert -> insertCheckInMem' cfg lock txs0 - UncheckedInsert -> return $! V.map (\tx -> T2 (hasher tx) tx) txs0 + CheckedInsert -> insertCheckInMem' cfg lock txsDecoded + UncheckedInsert -> return $! V.map (\tx -> T2 (hasher tx) tx) txsDecoded txcfg = _inmemTxCfg cfg encodeTx = codecEncode (txCodec txcfg) @@ -555,11 +556,28 @@ insertInMem logger cfg lock runCheck txs0 = do insOne (T2 pending soFar) (T2 txhash tx) = let !gp = txGasPrice txcfg tx !gl = txGasLimit txcfg tx - !bytes = SB.toShort $! encodeTx tx + !bytes = SB.toShort $! fromMaybe (encodeTx tx) (HashMap.lookup txhash txsBytes) !expTime = txMetaExpiryTime $ txMetadata txcfg tx !x = PendingEntry gp gl bytes expTime in T2 (HashMap.insert txhash x pending) (soFar . (txhash:)) +insertEncodedInMem + :: forall t logger. (NFData t, Logger logger) + => logger + -> TransactionConfig t + -> InMemConfig t -- ^ in-memory config + -> MVar (InMemoryMempoolData t) -- ^ in-memory state + -> InsertType + -> Vector ByteString -- ^ new transactions + -> IO () +insertEncodedInMem logger tcfg cfg lock runCheck txsBytes = do + Right newTxsDecoded <- return $ + traverse (\bytes -> (bytes,) <$> codecDecode (txCodec tcfg) bytes) txsBytes + let txBytesByHash = HashMap.fromList + [ (txHasher tcfg tx, bytes) + | (bytes, tx) <- V.toList newTxsDecoded + ] + insertInMem logger cfg lock runCheck txBytesByHash (snd <$> newTxsDecoded) ------------------------------------------------------------------------------ getBlockInMem diff --git a/src/Chainweb/Mempool/Mempool.hs b/src/Chainweb/Mempool/Mempool.hs index 7ea32837eb..88e92e0868 100644 --- a/src/Chainweb/Mempool/Mempool.hs +++ b/src/Chainweb/Mempool/Mempool.hs @@ -143,6 +143,7 @@ import qualified Chainweb.Time as Time import qualified Chainweb.Pact4.Transaction as Pact4 import Chainweb.Utils import Chainweb.Utils.Serialization +import Data.HashMap.Strict (HashMap) import Data.LogMessage (LogFunctionText) import qualified Pact.Types.Command as Pact4 import qualified Pact.Core.Command.Types as Pact5 @@ -292,9 +293,16 @@ data MempoolBackend t = MempoolBackend { -- | Insert the given transactions into the mempool. , mempoolInsert :: InsertType -- run pre-gossip check? Ignored at remote pools. + -> HashMap TransactionHash ByteString -- if trustworthy, the encoded bytes of each tx -> Vector t -> IO () + -- | Insert the given transactions into the mempool. + , mempoolInsertEncoded + :: InsertType -- run pre-gossip check? Ignored at remote pools. + -> Vector ByteString + -> IO () + -- | Perform the pre-insert check for the given transactions. Short-circuits -- on the first Transaction that fails. , mempoolInsertCheck :: Vector t -> IO (Either (T2 TransactionHash InsertError) ()) @@ -345,6 +353,7 @@ noopMempool = do , mempoolLookup = noopLookup , mempoolLookupEncoded = noopLookupEncoded , mempoolInsert = noopInsert + , mempoolInsertEncoded = noopInsertEncoded , mempoolInsertCheck = noopInsertCheck , mempoolInsertCheckVerbose = noopInsertCheckVerbose , mempoolMarkValidated = noopMV @@ -367,7 +376,8 @@ noopMempool = do noopMember v = return $ V.replicate (V.length v) False noopLookup v = return $ V.replicate (V.length v) Missing noopLookupEncoded v = return $ V.replicate (V.length v) Missing - noopInsert = const $ const $ return () + noopInsert _ _ _ = return () + noopInsertEncoded = const $ const $ return () noopInsertCheck _ = fail "unsupported" noopInsertCheckVerbose _ = fail "unsupported" noopMV = const $ return () @@ -493,9 +503,9 @@ syncMempools' log0 us localMempool remoteMempool = sync isPending _ = False fetchMissing chunk = do - res <- mempoolLookup remoteMempool chunk + res <- mempoolLookupEncoded remoteMempool chunk let !newTxs = V.map fromPending $ V.filter isPending res - mempoolInsert localMempool CheckedInsert newTxs + mempoolInsertEncoded localMempool CheckedInsert newTxs deb :: Text -> IO () deb = log0 Debug @@ -566,8 +576,8 @@ syncMempools' log0 us localMempool remoteMempool = sync -- Send a chunk of tranactions to the remote pool. -- sendChunk chunk = do - v <- (V.map fromPending . V.filter isPending) <$> mempoolLookup localMempool chunk - unless (V.null v) $ mempoolInsert remoteMempool CheckedInsert v + v <- (V.map fromPending . V.filter isPending) <$> mempoolLookupEncoded localMempool chunk + unless (V.null v) $ mempoolInsertEncoded remoteMempool CheckedInsert v syncMempools diff --git a/src/Chainweb/Mempool/RestAPI/Client.hs b/src/Chainweb/Mempool/RestAPI/Client.hs index 0d9db652d6..efc988f52a 100644 --- a/src/Chainweb/Mempool/RestAPI/Client.hs +++ b/src/Chainweb/Mempool/RestAPI/Client.hs @@ -15,6 +15,7 @@ module Chainweb.Mempool.RestAPI.Client , getPendingClient , memberClient , lookupClient + , lookupEncodedClient , toMempool ) where @@ -25,6 +26,7 @@ import Control.Exception import Control.Monad import Control.Monad.Catch import Control.Monad.Identity +import Data.ByteString (ByteString) import Data.Proxy import qualified Data.Text as T import qualified Data.Text.Encoding as T @@ -56,8 +58,9 @@ toMempool version chain txcfg env = { mempoolTxConfig = txcfg , mempoolMember = member , mempoolLookup = lookup - , mempoolLookupEncoded = const unsupported + , mempoolLookupEncoded = lookupEncoded , mempoolInsert = insert + , mempoolInsertEncoded = insertEncoded , mempoolInsertCheck = const unsupported , mempoolInsertCheckVerbose = const unsupported , mempoolMarkValidated = const unsupported @@ -73,7 +76,9 @@ toMempool version chain txcfg env = member v = V.fromList <$> go (memberClient version chain (V.toList v)) lookup v = V.fromList <$> go (lookupClient txcfg version chain (V.toList v)) - insert _ v = void $ go (insertClient txcfg version chain (V.toList v)) + lookupEncoded v = V.fromList <$> go (lookupEncodedClient version chain (V.toList v)) + insert _ _ v = void $ go (insertClient txcfg version chain (V.toList v)) + insertEncoded _ v = void $ go (insertEncodedClient version chain (V.toList v)) getPending hw cb = do runClientM (getPendingClient version chain hw) env >>= \case @@ -105,6 +110,16 @@ insertClient txcfg v c k0 = runIdentity $ do SomeChainIdT (_ :: Proxy c) <- return $ someChainIdVal c return $ insertClient_ @v @c k +insertEncodedClient + :: ChainwebVersion + -> ChainId + -> [ByteString] + -> ClientM NoContent +insertEncodedClient v c k0 = runIdentity $ do + let k = map T.decodeUtf8 k0 + SomeChainwebVersionT (_ :: Proxy v) <- return $ someChainwebVersionVal v + SomeChainIdT (_ :: Proxy c) <- return $ someChainIdVal c + return $ insertClient_ @v @c k ------------------------------------------------------------------------------ memberClient_ @@ -151,6 +166,17 @@ lookupClient txcfg v c txs = do decode = codecDecode (txCodec txcfg) . T.encodeUtf8 +lookupEncodedClient + :: ChainwebVersion + -> ChainId + -> [TransactionHash] + -> ClientM [LookupResult ByteString] +lookupEncodedClient v c txs = do + SomeChainwebVersionT (_ :: Proxy v) <- return $ someChainwebVersionVal v + SomeChainIdT (_ :: Proxy c) <- return $ someChainIdVal c + cs <- lookupClient_ @v @c txs + return $ fmap (fmap T.encodeUtf8) cs + ------------------------------------------------------------------------------ getPendingClient_ :: forall (v :: ChainwebVersionT) (c :: ChainIdT) diff --git a/src/Chainweb/Mempool/RestAPI/Server.hs b/src/Chainweb/Mempool/RestAPI/Server.hs index c4a950cfc6..95b23e467e 100644 --- a/src/Chainweb/Mempool/RestAPI/Server.hs +++ b/src/Chainweb/Mempool/RestAPI/Server.hs @@ -13,7 +13,6 @@ module Chainweb.Mempool.RestAPI.Server ------------------------------------------------------------------------------ import Control.DeepSeq (NFData) -import Control.Monad.Catch hiding (Handler) import Control.Monad.IO.Class import qualified Data.DList as D import Data.IORef @@ -39,21 +38,9 @@ insertHandler -> Handler NoContent insertHandler mempool txsT = handleErrs (NoContent <$ begin) where - txcfg = mempoolTxConfig mempool - - decode :: T.Text -> Either String t - decode = codecDecode (txCodec txcfg) . T.encodeUtf8 - - go :: T.Text -> Handler t - go h = case decode h of - Left e -> throwM . DecodeException $ T.pack e - Right t -> return t - begin :: Handler () - begin = do - txs <- mapM go txsT - let txV = V.fromList txs - liftIO $ mempoolInsert mempool CheckedInsert txV + begin = liftIO $ + mempoolInsertEncoded mempool CheckedInsert (V.fromList $ fmap T.encodeUtf8 txsT) memberHandler :: Show t => MempoolBackend t -> [TransactionHash] -> Handler [Bool] diff --git a/src/Chainweb/Pact/RestAPI/Server.hs b/src/Chainweb/Pact/RestAPI/Server.hs index c4672db1be..428e88bca2 100644 --- a/src/Chainweb/Pact/RestAPI/Server.hs +++ b/src/Chainweb/Pact/RestAPI/Server.hs @@ -268,7 +268,7 @@ sendHandler logger mempool (Pact4.SubmitBatch cmds) = Handler $ do let cmdsWithParsedPayloadsV = V.fromList $ NEL.toList cmdsWithParsedPayloads -- If any of the txs in the batch fail validation, we reject them all. liftIO (mempoolInsertCheckVerbose mempool cmdsWithParsedPayloadsV) >>= checkResult - liftIO (mempoolInsert mempool UncheckedInsert cmdsWithParsedPayloadsV) + liftIO (mempoolInsert mempool UncheckedInsert HM.empty cmdsWithParsedPayloadsV) return $! Pact4.RequestKeys $ NEL.map Pact4.cmdToRequestKey cmdsWithParsedPayloads Left err -> failWith $ "reading JSON for transaction failed: " <> T.pack err where diff --git a/src/Chainweb/Pact/Service/PactInProcApi.hs b/src/Chainweb/Pact/Service/PactInProcApi.hs index 490b3e5fa9..2661ac604d 100644 --- a/src/Chainweb/Pact/Service/PactInProcApi.hs +++ b/src/Chainweb/Pact/Service/PactInProcApi.hs @@ -159,7 +159,7 @@ pactProcessFork mpc theLogger bHeader = do "pactMemPoolAccess - " <> sshow (length reintroTxs) <> " transactions to reintroduce" -- No need to run pre-insert check here -- we know these are ok, and -- calling the pre-check would block here (it calls back into pact service) - mempoolInsert (mpcMempool mpc) UncheckedInsert reintroTxs + mempoolInsert (mpcMempool mpc) UncheckedInsert mempty reintroTxs mempoolMarkValidated (mpcMempool mpc) validatedTxs where diff --git a/test/lib/Chainweb/Test/Pact5/Utils.hs b/test/lib/Chainweb/Test/Pact5/Utils.hs index a807fffc4a..e3e9fea2fa 100644 --- a/test/lib/Chainweb/Test/Pact5/Utils.hs +++ b/test/lib/Chainweb/Test/Pact5/Utils.hs @@ -209,7 +209,7 @@ mempoolInsertPact5 mp insertType txs = do case codecDecode Pact4.rawCommandCodec (codecEncode Pact5.payloadCodec tx) of Left err -> error err Right a -> a - mempoolInsert mp insertType $ Vector.fromList unparsedTxs + mempoolInsert mp insertType mempty $ Vector.fromList unparsedTxs -- | Looks up transactions in the mempool. Returns a set which indicates pending membership of the mempool. mempoolLookupPact5 :: MempoolBackend Pact4.UnparsedTransaction -> Vector Pact5.Hash -> IO (HashSet Pact5.Hash) diff --git a/test/unit/Chainweb/Test/Mempool.hs b/test/unit/Chainweb/Test/Mempool.hs index bc2b06929d..1d792208d5 100644 --- a/test/unit/Chainweb/Test/Mempool.hs +++ b/test/unit/Chainweb/Test/Mempool.hs @@ -184,7 +184,7 @@ propOverlarge (txs, overlarge0) _ mempool = runExceptT $ do where txcfg = mempoolTxConfig mempool hash = txHasher txcfg - insert v = mempoolInsert mempool CheckedInsert $ V.fromList v + insert v = mempoolInsert mempool CheckedInsert mempty $ V.fromList v lookup = mempoolLookup mempool . V.fromList . map hash overlarge = setOverlarge overlarge0 setOverlarge = map (\x -> x { mockGasLimit = mockBlockGasLimit + 100 }) @@ -218,7 +218,7 @@ propBadlistPreblock (txs, badTxs) _ mempool = runExceptT $ do txcfg = mempoolTxConfig mempool hash = txHasher txcfg - insert v = mempoolInsert mempool CheckedInsert $ V.fromList v + insert v = mempoolInsert mempool CheckedInsert mempty $ V.fromList v lookup = mempoolLookup mempool . V.fromList . map hash propAddToBadList @@ -243,7 +243,7 @@ propAddToBadList tx _ mempool = runExceptT $ do where txcfg = mempoolTxConfig mempool hash = txHasher txcfg - insert v = mempoolInsert mempool CheckedInsert $ V.fromList v + insert v = mempoolInsert mempool CheckedInsert mempty $ V.fromList v lookup = mempoolLookup mempool . V.fromList . map hash getBlock = liftIO $ V.toList <$> mempoolGetBlock mempool mockBlockFill noopMempoolPreBlockCheck 1 nullBlockHash @@ -267,7 +267,7 @@ propPreInsert (txs, badTxs) gossipMV mempool = liftIO (lookup badTxs) >>= V.mapM_ lookupIsMissing txcfg = mempoolTxConfig mempool hash = txHasher txcfg - insert v = mempoolInsert mempool CheckedInsert $ V.fromList v + insert v = mempoolInsert mempool CheckedInsert mempty $ V.fromList v lookup = mempoolLookup mempool . V.fromList . map hash checkOne :: MockTx -> Either InsertError MockTx @@ -300,7 +300,7 @@ propTrivial txs _ mempool = runExceptT $ do in V.and ffs txcfg = mempoolTxConfig mempool hash = txHasher txcfg - insert v = mempoolInsert mempool CheckedInsert $ V.fromList v + insert v = mempoolInsert mempool CheckedInsert mempty $ V.fromList v lookup = mempoolLookup mempool . V.fromList . map hash getBlock = mempoolGetBlock mempool mockBlockFill noopMempoolPreBlockCheck 0 nullBlockHash @@ -332,7 +332,7 @@ propGetPending txs0 _ mempool = runExceptT $ do onFees x = (Down (mockGasPrice x), mockGasLimit x, mockNonce x) hash = txHasher $ mempoolTxConfig mempool getPending = mempoolGetPendingTransactions mempool - insert v = mempoolInsert mempool CheckedInsert $ V.fromList v + insert v = mempoolInsert mempool CheckedInsert mempty $ V.fromList v propHighWater :: ([MockTx], [MockTx]) @@ -364,7 +364,7 @@ propHighWater (txs0, txs1) _ mempool = runExceptT $ do txdata = sort $ map hash txs1 hash = txHasher $ mempoolTxConfig mempool getPending = mempoolGetPendingTransactions mempool - insert txs = mempoolInsert mempool CheckedInsert $ V.fromList txs + insert txs = mempoolInsert mempool CheckedInsert mempty $ V.fromList txs uniq :: Eq a => [a] -> [a] diff --git a/test/unit/Chainweb/Test/Mempool/Sync.hs b/test/unit/Chainweb/Test/Mempool/Sync.hs index e6ccc27761..e618f210e3 100644 --- a/test/unit/Chainweb/Test/Mempool/Sync.hs +++ b/test/unit/Chainweb/Test/Mempool/Sync.hs @@ -74,9 +74,9 @@ propSync -> IO (Either String ()) propSync (txs, missing, later) _ localMempool' = do remoteMempool <- startInMemoryMempoolTest testInMemCfg - mempoolInsert localMempool' CheckedInsert txsV - mempoolInsert remoteMempool CheckedInsert txsV - mempoolInsert remoteMempool CheckedInsert missingV + mempoolInsert localMempool' CheckedInsert mempty txsV + mempoolInsert remoteMempool CheckedInsert mempty txsV + mempoolInsert remoteMempool CheckedInsert mempty missingV doneVar <- newEmptyMVar syncFinished <- newEmptyMVar @@ -101,7 +101,7 @@ propSync (txs, missing, later) _ localMempool' = do -- We should now be subscribed and waiting for V.length laterV -- more transactions before getting killed. Transactions -- inserted into remote should get synced to us. - mempoolInsert remoteMempool CheckedInsert laterV + mempoolInsert remoteMempool CheckedInsert mempty laterV -- wait until time bomb 2 goes off takeMVar doneVar @@ -129,10 +129,10 @@ propSync (txs, missing, later) _ localMempool' = do timebomb :: Int -> IO a -> MempoolBackend t -> IO (MempoolBackend t) timebomb k act mp = do ref <- newIORef k - return $! mp { mempoolInsert = ins ref } + return $! mp { mempoolInsertEncoded = ins ref } where ins ref t v = do - mempoolInsert mp t v + mempoolInsertEncoded mp t v c <- atomicModifyIORef' ref (\x -> let !x' = x - V.length v in (x', x')) when (c == 0) $ void act -- so that the bomb only triggers once diff --git a/test/unit/Chainweb/Test/Pact5/PactServiceTest.hs b/test/unit/Chainweb/Test/Pact5/PactServiceTest.hs index e303724ad9..276884e2e8 100644 --- a/test/unit/Chainweb/Test/Pact5/PactServiceTest.hs +++ b/test/unit/Chainweb/Test/Pact5/PactServiceTest.hs @@ -327,7 +327,7 @@ testNewBlockExcludesInvalid baseRdb = runResourceT $ do return $ finalizeBlock bip _ <- advanceAllChains fixture $ onChain chain0 $ \ph pactQueue mempool -> do - mempoolInsert mempool UncheckedInsert $ Vector.fromList [badParse, badSigs] + mempoolInsert mempool UncheckedInsert mempty $ Vector.fromList [badParse, badSigs] mempoolInsertPact5 mempool UncheckedInsert [badChain, badUnique, badFuture, badPast, badTxHash] bip <- throwIfNotPact5 =<< throwIfNoHistory =<< newBlock noMiner NewBlockFill (ParentHeader ph) pactQueue let expectedTxs = []