@@ -270,7 +270,7 @@ isDuplicateTx(TransactionFrameBasePtr oldTx, TransactionFrameBasePtr newTx)
270
270
bool
271
271
TransactionQueue::sourceAccountPending (AccountID const & accountID) const
272
272
{
273
- std::lock_guard<std::recursive_mutex > guard (mTxQueueMutex );
273
+ std::lock_guard<std::mutex > guard (mTxQueueMutex );
274
274
return mAccountStates .find (accountID) != mAccountStates .end ();
275
275
}
276
276
@@ -334,7 +334,7 @@ TransactionQueue::canAdd(
334
334
std::vector<std::pair<TransactionFrameBasePtr, bool >>& txsToEvict)
335
335
{
336
336
ZoneScoped;
337
- if (isBanned (tx->getFullHash ()))
337
+ if (isBannedInternal (tx->getFullHash ()))
338
338
{
339
339
return AddResult (
340
340
TransactionQueue::AddResultCode::ADD_STATUS_TRY_AGAIN_LATER);
@@ -436,7 +436,7 @@ TransactionQueue::canAdd(
436
436
mTxQueueLimiter .canAddTx (tx, currentTx, txsToEvict, ledgerVersion);
437
437
if (!canAddRes.first )
438
438
{
439
- ban ({tx});
439
+ banInternal ({tx});
440
440
if (canAddRes.second != 0 )
441
441
{
442
442
AddResult result (TransactionQueue::AddResultCode::ADD_STATUS_ERROR,
@@ -454,10 +454,6 @@ TransactionQueue::canAdd(
454
454
// This is done so minSeqLedgerGap is validated against the next
455
455
// ledgerSeq, which is what will be used at apply time
456
456
++ls.getLedgerHeader ().currentToModify ().ledgerSeq ;
457
- // TODO: ^^ I think this is the right thing to do. Was previously the
458
- // commented out line below.
459
- // ls.getLedgerHeader().currentToModify().ledgerSeq =
460
- // mApp.getLedgerManager().getLastClosedLedgerNum() + 1;
461
457
}
462
458
463
459
auto txResult =
@@ -645,7 +641,7 @@ TransactionQueue::AddResult
645
641
TransactionQueue::tryAdd (TransactionFrameBasePtr tx, bool submittedFromSelf)
646
642
{
647
643
ZoneScoped;
648
- std::lock_guard<std::recursive_mutex > guard (mTxQueueMutex );
644
+ std::lock_guard<std::mutex > guard (mTxQueueMutex );
649
645
650
646
auto c1 =
651
647
tx->getEnvelope ().type () == ENVELOPE_TYPE_TX_FEE_BUMP &&
@@ -701,8 +697,9 @@ TransactionQueue::tryAdd(TransactionFrameBasePtr tx, bool submittedFromSelf)
701
697
// make space so that we can add this transaction
702
698
// this will succeed as `canAdd` ensures that this is the case
703
699
mTxQueueLimiter .evictTransactions (
704
- txsToEvict, *tx,
705
- [&](TransactionFrameBasePtr const & txToEvict) { ban ({txToEvict}); });
700
+ txsToEvict, *tx, [&](TransactionFrameBasePtr const & txToEvict) {
701
+ banInternal ({txToEvict});
702
+ });
706
703
mTxQueueLimiter .addTransaction (tx);
707
704
mKnownTxHashes [tx->getFullHash ()] = tx;
708
705
@@ -806,7 +803,14 @@ void
806
803
TransactionQueue::ban (Transactions const & banTxs)
807
804
{
808
805
ZoneScoped;
809
- std::lock_guard<std::recursive_mutex> guard (mTxQueueMutex );
806
+ std::lock_guard<std::mutex> guard (mTxQueueMutex );
807
+ banInternal (banTxs);
808
+ }
809
+
810
+ void
811
+ TransactionQueue::banInternal (Transactions const & banTxs)
812
+ {
813
+ ZoneScoped;
810
814
auto & bannedFront = mBannedTransactions .front ();
811
815
812
816
// Group the transactions by source account and ban all the transactions
@@ -852,7 +856,7 @@ TransactionQueue::AccountState
852
856
TransactionQueue::getAccountTransactionQueueInfo (
853
857
AccountID const & accountID) const
854
858
{
855
- std::lock_guard<std::recursive_mutex > guard (mTxQueueMutex );
859
+ std::lock_guard<std::mutex > guard (mTxQueueMutex );
856
860
auto i = mAccountStates .find (accountID);
857
861
if (i == std::end (mAccountStates ))
858
862
{
@@ -864,7 +868,7 @@ TransactionQueue::getAccountTransactionQueueInfo(
864
868
size_t
865
869
TransactionQueue::countBanned (int index) const
866
870
{
867
- std::lock_guard<std::recursive_mutex > guard (mTxQueueMutex );
871
+ std::lock_guard<std::mutex > guard (mTxQueueMutex );
868
872
return mBannedTransactions [index ].size ();
869
873
}
870
874
#endif
@@ -939,7 +943,13 @@ TransactionQueue::shift()
939
943
bool
940
944
TransactionQueue::isBanned (Hash const & hash) const
941
945
{
942
- std::lock_guard<std::recursive_mutex> guard (mTxQueueMutex );
946
+ std::lock_guard<std::mutex> guard (mTxQueueMutex );
947
+ return isBannedInternal (hash);
948
+ }
949
+
950
+ bool
951
+ TransactionQueue::isBannedInternal (Hash const & hash) const
952
+ {
943
953
return std::any_of (
944
954
std::begin (mBannedTransactions ), std::end (mBannedTransactions ),
945
955
[&](UnorderedSet<Hash> const & transactions) {
@@ -951,7 +961,14 @@ TxFrameList
951
961
TransactionQueue::getTransactions (LedgerHeader const & lcl) const
952
962
{
953
963
ZoneScoped;
954
- std::lock_guard<std::recursive_mutex> guard (mTxQueueMutex );
964
+ std::lock_guard<std::mutex> guard (mTxQueueMutex );
965
+ return getTransactionsInternal (lcl);
966
+ }
967
+
968
+ TxFrameList
969
+ TransactionQueue::getTransactionsInternal (LedgerHeader const & lcl) const
970
+ {
971
+ ZoneScoped;
955
972
TxFrameList txs;
956
973
957
974
uint32_t const nextLedgerSeq = lcl.ledgerSeq + 1 ;
@@ -972,7 +989,7 @@ TransactionFrameBaseConstPtr
972
989
TransactionQueue::getTx (Hash const & hash) const
973
990
{
974
991
ZoneScoped;
975
- std::lock_guard<std::recursive_mutex > guard (mTxQueueMutex );
992
+ std::lock_guard<std::mutex > guard (mTxQueueMutex );
976
993
auto it = mKnownTxHashes .find (hash);
977
994
if (it != mKnownTxHashes .end ())
978
995
{
@@ -1184,6 +1201,8 @@ SorobanTransactionQueue::broadcastSome()
1184
1201
size_t
1185
1202
SorobanTransactionQueue::getMaxQueueSizeOps () const
1186
1203
{
1204
+ ZoneScoped;
1205
+ std::lock_guard<std::mutex> guard (mTxQueueMutex );
1187
1206
if (protocolVersionStartsFrom (
1188
1207
mBucketSnapshot ->getLedgerHeader ().ledgerVersion ,
1189
1208
SOROBAN_PROTOCOL_VERSION))
@@ -1264,7 +1283,7 @@ ClassicTransactionQueue::broadcastSome()
1264
1283
std::make_shared<DexLimitingLaneConfig>(opsToFlood, dexOpsToFlood),
1265
1284
mBroadcastSeed );
1266
1285
queue.visitTopTxs (txsToBroadcast, visitor, mBroadcastOpCarryover );
1267
- ban (banningTxs);
1286
+ banInternal (banningTxs);
1268
1287
// carry over remainder, up to MAX_OPS_PER_TX ops
1269
1288
// reason is that if we add 1 next round, we can flood a "worst case fee
1270
1289
// bump" tx
@@ -1277,15 +1296,12 @@ ClassicTransactionQueue::broadcastSome()
1277
1296
}
1278
1297
1279
1298
void
1280
- TransactionQueue::broadcast (bool fromCallback)
1299
+ TransactionQueue::broadcast (bool fromCallback,
1300
+ std::lock_guard<std::mutex> const & guard)
1281
1301
{
1282
1302
// Must be called from the main thread due to the use of `mBroadcastTimer`
1283
1303
releaseAssert (threadIsMain ());
1284
1304
1285
- // NOTE: Although this is not a public function, it can be called from
1286
- // `mBroadcastTimer` and so it needs to be synchronized.
1287
- std::lock_guard<std::recursive_mutex> guard (mTxQueueMutex );
1288
-
1289
1305
if (mShutdown || (!fromCallback && mWaiting ))
1290
1306
{
1291
1307
return ;
@@ -1317,7 +1333,14 @@ TransactionQueue::broadcast(bool fromCallback)
1317
1333
}
1318
1334
1319
1335
void
1320
- TransactionQueue::rebroadcast ()
1336
+ TransactionQueue::broadcast (bool fromCallback)
1337
+ {
1338
+ std::lock_guard<std::mutex> guard (mTxQueueMutex );
1339
+ broadcast (fromCallback, guard);
1340
+ }
1341
+
1342
+ void
1343
+ TransactionQueue::rebroadcast (std::lock_guard<std::mutex> const & guard)
1321
1344
{
1322
1345
// For `broadcast` call
1323
1346
releaseAssert (threadIsMain ());
@@ -1331,14 +1354,14 @@ TransactionQueue::rebroadcast()
1331
1354
as.mTransaction ->mBroadcasted = false ;
1332
1355
}
1333
1356
}
1334
- broadcast (false );
1357
+ broadcast (false , guard );
1335
1358
}
1336
1359
1337
1360
void
1338
1361
TransactionQueue::shutdown ()
1339
1362
{
1340
1363
releaseAssert (threadIsMain ());
1341
- std::lock_guard<std::recursive_mutex > guard (mTxQueueMutex );
1364
+ std::lock_guard<std::mutex > guard (mTxQueueMutex );
1342
1365
mShutdown = true ;
1343
1366
mBroadcastTimer .cancel ();
1344
1367
}
@@ -1351,7 +1374,7 @@ TransactionQueue::update(
1351
1374
{
1352
1375
ZoneScoped;
1353
1376
releaseAssert (threadIsMain ());
1354
- std::lock_guard<std::recursive_mutex > guard (mTxQueueMutex );
1377
+ std::lock_guard<std::mutex > guard (mTxQueueMutex );
1355
1378
1356
1379
mValidationSnapshot =
1357
1380
std::make_shared<ImmutableValidationSnapshot>(mAppConn );
@@ -1361,11 +1384,11 @@ TransactionQueue::update(
1361
1384
removeApplied (applied);
1362
1385
shift ();
1363
1386
1364
- auto txs = getTransactions (lcl);
1387
+ auto txs = getTransactionsInternal (lcl);
1365
1388
auto invalidTxs = filterInvalidTxs (txs);
1366
- ban (invalidTxs);
1389
+ banInternal (invalidTxs);
1367
1390
1368
- rebroadcast ();
1391
+ rebroadcast (guard );
1369
1392
}
1370
1393
1371
1394
static bool
@@ -1409,14 +1432,14 @@ TransactionQueue::isFiltered(TransactionFrameBasePtr tx) const
1409
1432
size_t
1410
1433
TransactionQueue::getQueueSizeOps () const
1411
1434
{
1412
- std::lock_guard<std::recursive_mutex > guard (mTxQueueMutex );
1435
+ std::lock_guard<std::mutex > guard (mTxQueueMutex );
1413
1436
return mTxQueueLimiter .size ();
1414
1437
}
1415
1438
1416
1439
std::optional<int64_t >
1417
1440
TransactionQueue::getInQueueSeqNum (AccountID const & account) const
1418
1441
{
1419
- std::lock_guard<std::recursive_mutex > guard (mTxQueueMutex );
1442
+ std::lock_guard<std::mutex > guard (mTxQueueMutex );
1420
1443
auto stateIter = mAccountStates .find (account);
1421
1444
if (stateIter == mAccountStates .end ())
1422
1445
{
@@ -1433,7 +1456,8 @@ TransactionQueue::getInQueueSeqNum(AccountID const& account) const
1433
1456
size_t
1434
1457
ClassicTransactionQueue::getMaxQueueSizeOps () const
1435
1458
{
1436
- std::lock_guard<std::recursive_mutex> guard (mTxQueueMutex );
1459
+ ZoneScoped;
1460
+ std::lock_guard<std::mutex> guard (mTxQueueMutex );
1437
1461
auto res = mTxQueueLimiter .maxScaledLedgerResources (false );
1438
1462
releaseAssert (res.size () == NUM_CLASSIC_TX_RESOURCES);
1439
1463
return res.getVal (Resource::Type::OPERATIONS);
0 commit comments