Skip to content
This repository was archived by the owner on Nov 24, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions src/Chainweb/Mempool/InMem.hs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ toMempoolBackend logger mempool = do
, mempoolLookup = lookupInMem tcfg lockMVar
, mempoolLookupEncoded = lookupEncodedInMem lockMVar
, mempoolInsert = insertInMem logger cfg lockMVar
, mempoolInsertEncoded = insertEncodedInMem logger tcfg cfg lockMVar
, mempoolInsertCheck = insertCheckInMem cfg lockMVar
, mempoolInsertCheckVerbose = insertCheckVerboseInMem cfg lockMVar
, mempoolMarkValidated = markValidatedInMem logger tcfg lockMVar
Expand Down Expand Up @@ -206,16 +207,15 @@ lookupInMem :: NFData t
-> IO (Vector (LookupResult t))
lookupInMem txcfg lock txs = do
q <- withMVarMasked lock (readIORef . _inmemPending)
v <- V.mapM (evaluate . force . fromJuste . lookupOne q) txs
return $! v
v <- V.mapM (evaluate . fromMaybe Missing . lookupQ q) txs
return v
where
lookupOne q txHash = lookupQ q txHash <|> pure Missing
codec = txCodec txcfg
fixup pe =
let bs = _inmemPeBytes pe
in either (const Missing) Pending
$! codecDecode codec
$! SB.fromShort bs
$ codecDecode codec
$ SB.fromShort bs
lookupQ q txHash = fixup <$!> HashMap.lookup txHash q

------------------------------------------------------------------------------
Expand Down Expand Up @@ -525,10 +525,11 @@ insertInMem
-> InMemConfig t -- ^ in-memory config
-> MVar (InMemoryMempoolData t) -- ^ in-memory state
-> InsertType
-> HashMap TransactionHash ByteString -- ^ byte-encodings of new transactions if possible
-> Vector t -- ^ new transactions
-> IO ()
insertInMem logger cfg lock runCheck txs0 = do
logFunctionText logger Debug $ "insertInMem: " <> sshow (runCheck, V.length txs0)
insertInMem logger cfg lock runCheck txsBytes txsDecoded = do
logFunctionText logger Debug $ "insertInMem: " <> sshow (runCheck, V.length txsDecoded)
txhashes <- insertCheck
withMVarMasked lock $ \mdata -> do
pending <- readIORef (_inmemPending mdata)
Expand All @@ -544,8 +545,8 @@ insertInMem logger cfg lock runCheck txs0 = do
where
insertCheck :: IO (Vector (T2 TransactionHash t))
insertCheck = case runCheck of
CheckedInsert -> insertCheckInMem' cfg lock txs0
UncheckedInsert -> return $! V.map (\tx -> T2 (hasher tx) tx) txs0
CheckedInsert -> insertCheckInMem' cfg lock txsDecoded
UncheckedInsert -> return $! V.map (\tx -> T2 (hasher tx) tx) txsDecoded

txcfg = _inmemTxCfg cfg
encodeTx = codecEncode (txCodec txcfg)
Expand All @@ -555,11 +556,28 @@ insertInMem logger cfg lock runCheck txs0 = do
insOne (T2 pending soFar) (T2 txhash tx) =
let !gp = txGasPrice txcfg tx
!gl = txGasLimit txcfg tx
!bytes = SB.toShort $! encodeTx tx
!bytes = SB.toShort $! fromMaybe (encodeTx tx) (HashMap.lookup txhash txsBytes)
!expTime = txMetaExpiryTime $ txMetadata txcfg tx
!x = PendingEntry gp gl bytes expTime
in T2 (HashMap.insert txhash x pending) (soFar . (txhash:))

insertEncodedInMem
:: forall t logger. (NFData t, Logger logger)
=> logger
-> TransactionConfig t
-> InMemConfig t -- ^ in-memory config
-> MVar (InMemoryMempoolData t) -- ^ in-memory state
-> InsertType
-> Vector ByteString -- ^ new transactions
-> IO ()
insertEncodedInMem logger tcfg cfg lock runCheck txsBytes = do
Right newTxsDecoded <- return $
traverse (\bytes -> (bytes,) <$> codecDecode (txCodec tcfg) bytes) txsBytes
let txBytesByHash = HashMap.fromList
[ (txHasher tcfg tx, bytes)
| (bytes, tx) <- V.toList newTxsDecoded
]
insertInMem logger cfg lock runCheck txBytesByHash (snd <$> newTxsDecoded)

------------------------------------------------------------------------------
getBlockInMem
Expand Down
20 changes: 15 additions & 5 deletions src/Chainweb/Mempool/Mempool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ import qualified Chainweb.Time as Time
import qualified Chainweb.Pact4.Transaction as Pact4
import Chainweb.Utils
import Chainweb.Utils.Serialization
import Data.HashMap.Strict (HashMap)
import Data.LogMessage (LogFunctionText)
import qualified Pact.Types.Command as Pact4
import qualified Pact.Core.Command.Types as Pact5
Expand Down Expand Up @@ -292,9 +293,16 @@ data MempoolBackend t = MempoolBackend {

-- | Insert the given transactions into the mempool.
, mempoolInsert :: InsertType -- run pre-gossip check? Ignored at remote pools.
-> HashMap TransactionHash ByteString -- if trustworthy, the encoded bytes of each tx
-> Vector t
-> IO ()

-- | Insert the given transactions into the mempool.
, mempoolInsertEncoded
:: InsertType -- run pre-gossip check? Ignored at remote pools.
-> Vector ByteString
-> IO ()

-- | Perform the pre-insert check for the given transactions. Short-circuits
-- on the first Transaction that fails.
, mempoolInsertCheck :: Vector t -> IO (Either (T2 TransactionHash InsertError) ())
Expand Down Expand Up @@ -345,6 +353,7 @@ noopMempool = do
, mempoolLookup = noopLookup
, mempoolLookupEncoded = noopLookupEncoded
, mempoolInsert = noopInsert
, mempoolInsertEncoded = noopInsertEncoded
, mempoolInsertCheck = noopInsertCheck
, mempoolInsertCheckVerbose = noopInsertCheckVerbose
, mempoolMarkValidated = noopMV
Expand All @@ -367,7 +376,8 @@ noopMempool = do
noopMember v = return $ V.replicate (V.length v) False
noopLookup v = return $ V.replicate (V.length v) Missing
noopLookupEncoded v = return $ V.replicate (V.length v) Missing
noopInsert = const $ const $ return ()
noopInsert _ _ _ = return ()
noopInsertEncoded = const $ const $ return ()
noopInsertCheck _ = fail "unsupported"
noopInsertCheckVerbose _ = fail "unsupported"
noopMV = const $ return ()
Expand Down Expand Up @@ -493,9 +503,9 @@ syncMempools' log0 us localMempool remoteMempool = sync
isPending _ = False

fetchMissing chunk = do
res <- mempoolLookup remoteMempool chunk
res <- mempoolLookupEncoded remoteMempool chunk
let !newTxs = V.map fromPending $ V.filter isPending res
mempoolInsert localMempool CheckedInsert newTxs
mempoolInsertEncoded localMempool CheckedInsert newTxs

deb :: Text -> IO ()
deb = log0 Debug
Expand Down Expand Up @@ -566,8 +576,8 @@ syncMempools' log0 us localMempool remoteMempool = sync
-- Send a chunk of tranactions to the remote pool.
--
sendChunk chunk = do
v <- (V.map fromPending . V.filter isPending) <$> mempoolLookup localMempool chunk
unless (V.null v) $ mempoolInsert remoteMempool CheckedInsert v
v <- (V.map fromPending . V.filter isPending) <$> mempoolLookupEncoded localMempool chunk
unless (V.null v) $ mempoolInsertEncoded remoteMempool CheckedInsert v


syncMempools
Expand Down
30 changes: 28 additions & 2 deletions src/Chainweb/Mempool/RestAPI/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module Chainweb.Mempool.RestAPI.Client
, getPendingClient
, memberClient
, lookupClient
, lookupEncodedClient
, toMempool
) where

Expand All @@ -25,6 +26,7 @@ import Control.Exception
import Control.Monad
import Control.Monad.Catch
import Control.Monad.Identity
import Data.ByteString (ByteString)
import Data.Proxy
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
Expand Down Expand Up @@ -56,8 +58,9 @@ toMempool version chain txcfg env =
{ mempoolTxConfig = txcfg
, mempoolMember = member
, mempoolLookup = lookup
, mempoolLookupEncoded = const unsupported
, mempoolLookupEncoded = lookupEncoded
, mempoolInsert = insert
, mempoolInsertEncoded = insertEncoded
, mempoolInsertCheck = const unsupported
, mempoolInsertCheckVerbose = const unsupported
, mempoolMarkValidated = const unsupported
Expand All @@ -73,7 +76,9 @@ toMempool version chain txcfg env =

member v = V.fromList <$> go (memberClient version chain (V.toList v))
lookup v = V.fromList <$> go (lookupClient txcfg version chain (V.toList v))
insert _ v = void $ go (insertClient txcfg version chain (V.toList v))
lookupEncoded v = V.fromList <$> go (lookupEncodedClient version chain (V.toList v))
insert _ _ v = void $ go (insertClient txcfg version chain (V.toList v))
insertEncoded _ v = void $ go (insertEncodedClient version chain (V.toList v))

getPending hw cb = do
runClientM (getPendingClient version chain hw) env >>= \case
Expand Down Expand Up @@ -105,6 +110,16 @@ insertClient txcfg v c k0 = runIdentity $ do
SomeChainIdT (_ :: Proxy c) <- return $ someChainIdVal c
return $ insertClient_ @v @c k

insertEncodedClient
:: ChainwebVersion
-> ChainId
-> [ByteString]
-> ClientM NoContent
insertEncodedClient v c k0 = runIdentity $ do
let k = map T.decodeUtf8 k0
SomeChainwebVersionT (_ :: Proxy v) <- return $ someChainwebVersionVal v
SomeChainIdT (_ :: Proxy c) <- return $ someChainIdVal c
return $ insertClient_ @v @c k

------------------------------------------------------------------------------
memberClient_
Expand Down Expand Up @@ -151,6 +166,17 @@ lookupClient txcfg v c txs = do

decode = codecDecode (txCodec txcfg) . T.encodeUtf8

lookupEncodedClient
:: ChainwebVersion
-> ChainId
-> [TransactionHash]
-> ClientM [LookupResult ByteString]
lookupEncodedClient v c txs = do
SomeChainwebVersionT (_ :: Proxy v) <- return $ someChainwebVersionVal v
SomeChainIdT (_ :: Proxy c) <- return $ someChainIdVal c
cs <- lookupClient_ @v @c txs
return $ fmap (fmap T.encodeUtf8) cs

------------------------------------------------------------------------------
getPendingClient_
:: forall (v :: ChainwebVersionT) (c :: ChainIdT)
Expand Down
17 changes: 2 additions & 15 deletions src/Chainweb/Mempool/RestAPI/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ module Chainweb.Mempool.RestAPI.Server

------------------------------------------------------------------------------
import Control.DeepSeq (NFData)
import Control.Monad.Catch hiding (Handler)
import Control.Monad.IO.Class
import qualified Data.DList as D
import Data.IORef
Expand All @@ -39,21 +38,9 @@ insertHandler
-> Handler NoContent
insertHandler mempool txsT = handleErrs (NoContent <$ begin)
where
txcfg = mempoolTxConfig mempool

decode :: T.Text -> Either String t
decode = codecDecode (txCodec txcfg) . T.encodeUtf8

go :: T.Text -> Handler t
go h = case decode h of
Left e -> throwM . DecodeException $ T.pack e
Right t -> return t

begin :: Handler ()
begin = do
txs <- mapM go txsT
let txV = V.fromList txs
liftIO $ mempoolInsert mempool CheckedInsert txV
begin = liftIO $
mempoolInsertEncoded mempool CheckedInsert (V.fromList $ fmap T.encodeUtf8 txsT)


memberHandler :: Show t => MempoolBackend t -> [TransactionHash] -> Handler [Bool]
Expand Down
2 changes: 1 addition & 1 deletion src/Chainweb/Pact/RestAPI/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ sendHandler logger mempool (Pact4.SubmitBatch cmds) = Handler $ do
let cmdsWithParsedPayloadsV = V.fromList $ NEL.toList cmdsWithParsedPayloads
-- If any of the txs in the batch fail validation, we reject them all.
liftIO (mempoolInsertCheckVerbose mempool cmdsWithParsedPayloadsV) >>= checkResult
liftIO (mempoolInsert mempool UncheckedInsert cmdsWithParsedPayloadsV)
liftIO (mempoolInsert mempool UncheckedInsert HM.empty cmdsWithParsedPayloadsV)
return $! Pact4.RequestKeys $ NEL.map Pact4.cmdToRequestKey cmdsWithParsedPayloads
Left err -> failWith $ "reading JSON for transaction failed: " <> T.pack err
where
Expand Down
2 changes: 1 addition & 1 deletion src/Chainweb/Pact/Service/PactInProcApi.hs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ pactProcessFork mpc theLogger bHeader = do
"pactMemPoolAccess - " <> sshow (length reintroTxs) <> " transactions to reintroduce"
-- No need to run pre-insert check here -- we know these are ok, and
-- calling the pre-check would block here (it calls back into pact service)
mempoolInsert (mpcMempool mpc) UncheckedInsert reintroTxs
mempoolInsert (mpcMempool mpc) UncheckedInsert mempty reintroTxs
mempoolMarkValidated (mpcMempool mpc) validatedTxs

where
Expand Down
2 changes: 1 addition & 1 deletion test/lib/Chainweb/Test/Pact5/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ mempoolInsertPact5 mp insertType txs = do
case codecDecode Pact4.rawCommandCodec (codecEncode Pact5.payloadCodec tx) of
Left err -> error err
Right a -> a
mempoolInsert mp insertType $ Vector.fromList unparsedTxs
mempoolInsert mp insertType mempty $ Vector.fromList unparsedTxs

-- | Looks up transactions in the mempool. Returns a set which indicates pending membership of the mempool.
mempoolLookupPact5 :: MempoolBackend Pact4.UnparsedTransaction -> Vector Pact5.Hash -> IO (HashSet Pact5.Hash)
Expand Down
14 changes: 7 additions & 7 deletions test/unit/Chainweb/Test/Mempool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ propOverlarge (txs, overlarge0) _ mempool = runExceptT $ do
where
txcfg = mempoolTxConfig mempool
hash = txHasher txcfg
insert v = mempoolInsert mempool CheckedInsert $ V.fromList v
insert v = mempoolInsert mempool CheckedInsert mempty $ V.fromList v
lookup = mempoolLookup mempool . V.fromList . map hash
overlarge = setOverlarge overlarge0
setOverlarge = map (\x -> x { mockGasLimit = mockBlockGasLimit + 100 })
Expand Down Expand Up @@ -218,7 +218,7 @@ propBadlistPreblock (txs, badTxs) _ mempool = runExceptT $ do

txcfg = mempoolTxConfig mempool
hash = txHasher txcfg
insert v = mempoolInsert mempool CheckedInsert $ V.fromList v
insert v = mempoolInsert mempool CheckedInsert mempty $ V.fromList v
lookup = mempoolLookup mempool . V.fromList . map hash

propAddToBadList
Expand All @@ -243,7 +243,7 @@ propAddToBadList tx _ mempool = runExceptT $ do
where
txcfg = mempoolTxConfig mempool
hash = txHasher txcfg
insert v = mempoolInsert mempool CheckedInsert $ V.fromList v
insert v = mempoolInsert mempool CheckedInsert mempty $ V.fromList v
lookup = mempoolLookup mempool . V.fromList . map hash
getBlock = liftIO
$ V.toList <$> mempoolGetBlock mempool mockBlockFill noopMempoolPreBlockCheck 1 nullBlockHash
Expand All @@ -267,7 +267,7 @@ propPreInsert (txs, badTxs) gossipMV mempool =
liftIO (lookup badTxs) >>= V.mapM_ lookupIsMissing
txcfg = mempoolTxConfig mempool
hash = txHasher txcfg
insert v = mempoolInsert mempool CheckedInsert $ V.fromList v
insert v = mempoolInsert mempool CheckedInsert mempty $ V.fromList v
lookup = mempoolLookup mempool . V.fromList . map hash

checkOne :: MockTx -> Either InsertError MockTx
Expand Down Expand Up @@ -300,7 +300,7 @@ propTrivial txs _ mempool = runExceptT $ do
in V.and ffs
txcfg = mempoolTxConfig mempool
hash = txHasher txcfg
insert v = mempoolInsert mempool CheckedInsert $ V.fromList v
insert v = mempoolInsert mempool CheckedInsert mempty $ V.fromList v
lookup = mempoolLookup mempool . V.fromList . map hash

getBlock = mempoolGetBlock mempool mockBlockFill noopMempoolPreBlockCheck 0 nullBlockHash
Expand Down Expand Up @@ -332,7 +332,7 @@ propGetPending txs0 _ mempool = runExceptT $ do
onFees x = (Down (mockGasPrice x), mockGasLimit x, mockNonce x)
hash = txHasher $ mempoolTxConfig mempool
getPending = mempoolGetPendingTransactions mempool
insert v = mempoolInsert mempool CheckedInsert $ V.fromList v
insert v = mempoolInsert mempool CheckedInsert mempty $ V.fromList v

propHighWater
:: ([MockTx], [MockTx])
Expand Down Expand Up @@ -364,7 +364,7 @@ propHighWater (txs0, txs1) _ mempool = runExceptT $ do
txdata = sort $ map hash txs1
hash = txHasher $ mempoolTxConfig mempool
getPending = mempoolGetPendingTransactions mempool
insert txs = mempoolInsert mempool CheckedInsert $ V.fromList txs
insert txs = mempoolInsert mempool CheckedInsert mempty $ V.fromList txs


uniq :: Eq a => [a] -> [a]
Expand Down
12 changes: 6 additions & 6 deletions test/unit/Chainweb/Test/Mempool/Sync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ propSync
-> IO (Either String ())
propSync (txs, missing, later) _ localMempool' = do
remoteMempool <- startInMemoryMempoolTest testInMemCfg
mempoolInsert localMempool' CheckedInsert txsV
mempoolInsert remoteMempool CheckedInsert txsV
mempoolInsert remoteMempool CheckedInsert missingV
mempoolInsert localMempool' CheckedInsert mempty txsV
mempoolInsert remoteMempool CheckedInsert mempty txsV
mempoolInsert remoteMempool CheckedInsert mempty missingV

doneVar <- newEmptyMVar
syncFinished <- newEmptyMVar
Expand All @@ -101,7 +101,7 @@ propSync (txs, missing, later) _ localMempool' = do
-- We should now be subscribed and waiting for V.length laterV
-- more transactions before getting killed. Transactions
-- inserted into remote should get synced to us.
mempoolInsert remoteMempool CheckedInsert laterV
mempoolInsert remoteMempool CheckedInsert mempty laterV

-- wait until time bomb 2 goes off
takeMVar doneVar
Expand Down Expand Up @@ -129,10 +129,10 @@ propSync (txs, missing, later) _ localMempool' = do
timebomb :: Int -> IO a -> MempoolBackend t -> IO (MempoolBackend t)
timebomb k act mp = do
ref <- newIORef k
return $! mp { mempoolInsert = ins ref }
return $! mp { mempoolInsertEncoded = ins ref }
where
ins ref t v = do
mempoolInsert mp t v
mempoolInsertEncoded mp t v
c <- atomicModifyIORef' ref (\x -> let !x' = x - V.length v
in (x', x'))
when (c == 0) $ void act -- so that the bomb only triggers once
2 changes: 1 addition & 1 deletion test/unit/Chainweb/Test/Pact5/PactServiceTest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ testNewBlockExcludesInvalid baseRdb = runResourceT $ do
return $ finalizeBlock bip

_ <- advanceAllChains fixture $ onChain chain0 $ \ph pactQueue mempool -> do
mempoolInsert mempool UncheckedInsert $ Vector.fromList [badParse, badSigs]
mempoolInsert mempool UncheckedInsert mempty $ Vector.fromList [badParse, badSigs]
mempoolInsertPact5 mempool UncheckedInsert [badChain, badUnique, badFuture, badPast, badTxHash]
bip <- throwIfNotPact5 =<< throwIfNoHistory =<< newBlock noMiner NewBlockFill (ParentHeader ph) pactQueue
let expectedTxs = []
Expand Down
Loading