Skip to content

Commit 39de3e6

Browse files
committed
Some cleanup around separate tx queue thread
1 parent 357bfee commit 39de3e6

File tree

7 files changed

+85
-123
lines changed

7 files changed

+85
-123
lines changed

src/herder/HerderImpl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,7 @@ Herder::recvTransaction(TransactionQueuesPtr txQueues,
600600
// would be whatever is handling ledger close. However, that will only
601601
// decrease the sourceAccountPending value, which means this erroneously
602602
// rejects (which is safe). I guess it's possible for a user-submitted
603-
// transaction to come in and conflict with the overlay thread, but that
603+
// transaction to come in and conflict with the tx queue thread, but that
604604
// would require them to be simultaneously running two clients and
605605
// submitting from both of them. Still, it might be safest to use some kind
606606
// of atomic function that handles both this check AND the add.

src/herder/HerderImpl.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,9 +250,6 @@ class HerderImpl : public Herder
250250
void purgeOldPersistedTxSets();
251251
void writeDebugTxSet(LedgerCloseData const& lcd);
252252

253-
// TODO: Need some way to get these queues
254-
// TODO: Maybe something else should create this and pass it in somehow,
255-
// either via Application or explicitly in the constructor for HerderImpl.
256253
TransactionQueuesPtr const mTransactionQueues =
257254
std::make_shared<TransactionQueues>();
258255

src/herder/TransactionQueue.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ class Application;
6767
// incoming SCP messages need to wait for tx queue additions to occur, which
6868
// is bad.
6969
// * My note: Try both approaches and benchmark
70+
// * Follow up note: The priority locking scheme also helps to address
71+
// this.
7072

7173
enum class TxQueueAddResultCode
7274
{
@@ -172,6 +174,9 @@ class TransactionQueue
172174
protected:
173175
// TODO: Docs?
174176
// TODO: Move?
177+
// TODO: It might be worth benchmarking this against the solution that does
178+
// not use priority locking (just uses std::mutex). The added complexity of
179+
// this may not be worth it.
175180
class TxQueueLock : NonMovableOrCopyable
176181
{
177182
public:

src/main/ApplicationImpl.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,10 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg)
199199
if (mConfig.BACKGROUND_TX_QUEUE)
200200
{
201201
// TODO: Keep priority unchanged as tx queue processes time-sensitive
202-
// tasks? Or should tx queue priority be downgraded?
202+
// tasks? Or should tx queue priority be downgraded? The priority
203+
// locking mechanism in TransactionQueue is designed to prevent tx queue
204+
// from starving other work, so it may be fine to keep priority
205+
// unchanged.
203206
mTxQueueThread = std::thread{[this]() { mTxQueueIOContext->run(); }};
204207
mThreadTypes[mTxQueueThread->get_id()] = ThreadType::TX_QUEUE;
205208
}

src/overlay/OverlayManagerImpl.cpp

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,17 +1231,6 @@ OverlayManagerImpl::recvTransaction(StellarMessage const& msg,
12311231

12321232
// add it to our current set
12331233
// and make sure it is valid
1234-
// TODO: I think I could pull this one call into Peer::recvTransaction
1235-
// and move basically all of the rest of this function to a new function
1236-
// called something like "recordTransactionStats" or something. Then,
1237-
// Peer:recvTransaction would invoke HerderImpl::recvTransaction in the
1238-
// background, and then pass the result to the new function on the main
1239-
// thread. That way I don't have to make OverlayManagerImpl and its
1240-
// dependencies (Floodgate, Peer, TxDemandsManager, maybe more), or much
1241-
// of Peer thread safe. Note that the recordTransactionStats function
1242-
// would probably need to take a shared ptr to the message so that the
1243-
// message doesn't get deleted before the function is called. The lambda
1244-
// capture will need to copy this pointer in.
12451234
auto addResult = mApp.getHerder().recvTransaction(transaction, false);
12461235
recordAddTransactionStats(addResult, transaction->getFullHash(), peer,
12471236
index);

src/overlay/Peer.cpp

Lines changed: 72 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -878,119 +878,94 @@ Peer::recvAuthenticatedMessage(AuthenticatedMessage&& msg)
878878
{
879879
ZoneScoped;
880880
releaseAssert(!threadIsMain() || !useBackgroundThread());
881+
RECURSIVE_LOCK_GUARD(mStateMutex, guard);
881882

882-
// TODO: Remove if I get rid of the special lock scoping vv
883-
std::shared_ptr<CapacityTrackedMessage> msgTracker = nullptr;
884-
885-
// TODO: Move back if I git rid of lock scoping vv
886-
Scheduler::ActionType type = Scheduler::ActionType::NORMAL_ACTION;
887-
std::string queueName;
883+
if (shouldAbort(guard))
888884
{
889-
RECURSIVE_LOCK_GUARD(mStateMutex, guard);
890-
891-
if (shouldAbort(guard))
892-
{
893-
return false;
894-
}
885+
return false;
886+
}
895887

896-
std::string errorMsg;
897-
if (getState(guard) >= GOT_HELLO &&
898-
msg.v0().message.type() != ERROR_MSG)
888+
std::string errorMsg;
889+
if (getState(guard) >= GOT_HELLO && msg.v0().message.type() != ERROR_MSG)
890+
{
891+
if (!mHmac.checkAuthenticatedMessage(msg, errorMsg))
899892
{
900-
if (!mHmac.checkAuthenticatedMessage(msg, errorMsg))
893+
if (!threadIsMain())
901894
{
902-
if (!threadIsMain())
903-
{
904-
mAppConnector.postOnMainThread(
905-
[self = shared_from_this(), errorMsg]() {
906-
self->sendErrorAndDrop(ERR_AUTH, errorMsg);
907-
},
908-
"Peer::sendErrorAndDrop");
909-
}
910-
else
911-
{
912-
sendErrorAndDrop(ERR_AUTH, errorMsg);
913-
}
914-
return false;
895+
mAppConnector.postOnMainThread(
896+
[self = shared_from_this(), errorMsg]() {
897+
self->sendErrorAndDrop(ERR_AUTH, errorMsg);
898+
},
899+
"Peer::sendErrorAndDrop");
900+
}
901+
else
902+
{
903+
sendErrorAndDrop(ERR_AUTH, errorMsg);
915904
}
905+
return false;
916906
}
907+
}
917908

918-
// NOTE: Additionally, we may use state snapshots to verify TRANSACTION
919-
// type messages in the background.
920-
921-
// Start tracking capacity here, so read throttling is applied
922-
// appropriately. Flow control might not be started at that time
923-
msgTracker = std::make_shared<CapacityTrackedMessage>(
924-
shared_from_this(), msg.v0().message);
925-
926-
std::string cat;
927-
928-
switch (msgTracker->getMessage().type())
929-
{
930-
case HELLO:
931-
case AUTH:
932-
cat = AUTH_ACTION_QUEUE;
933-
break;
934-
// control messages
935-
case PEERS:
936-
case ERROR_MSG:
937-
case SEND_MORE:
938-
case SEND_MORE_EXTENDED:
939-
cat = "CTRL";
940-
break;
941-
// high volume flooding
942-
case TRANSACTION:
943-
case FLOOD_ADVERT:
944-
case FLOOD_DEMAND:
945-
{
946-
cat = "TX";
947-
type = Scheduler::ActionType::DROPPABLE_ACTION;
948-
break;
949-
}
909+
// NOTE: Additionally, we may use state snapshots to verify TRANSACTION type
910+
// messages in the background.
950911

951-
// consensus, inbound
952-
case GET_TX_SET:
953-
case GET_SCP_QUORUMSET:
954-
case GET_SCP_STATE:
955-
cat = "SCPQ";
956-
type = Scheduler::ActionType::DROPPABLE_ACTION;
957-
break;
912+
// Start tracking capacity here, so read throttling is applied
913+
// appropriately. Flow control might not be started at that time
914+
auto msgTracker = std::make_shared<CapacityTrackedMessage>(
915+
shared_from_this(), msg.v0().message);
958916

959-
// consensus, self
960-
case DONT_HAVE:
961-
case TX_SET:
962-
case GENERALIZED_TX_SET:
963-
case SCP_QUORUMSET:
964-
case SCP_MESSAGE:
965-
cat = "SCP";
966-
break;
917+
std::string cat;
918+
Scheduler::ActionType type = Scheduler::ActionType::NORMAL_ACTION;
967919

968-
default:
969-
cat = "MISC";
970-
}
920+
switch (msgTracker->getMessage().type())
921+
{
922+
case HELLO:
923+
case AUTH:
924+
cat = AUTH_ACTION_QUEUE;
925+
break;
926+
// control messages
927+
case PEERS:
928+
case ERROR_MSG:
929+
case SEND_MORE:
930+
case SEND_MORE_EXTENDED:
931+
cat = "CTRL";
932+
break;
933+
// high volume flooding
934+
case TRANSACTION:
935+
case FLOOD_ADVERT:
936+
case FLOOD_DEMAND:
937+
{
938+
cat = "TX";
939+
type = Scheduler::ActionType::DROPPABLE_ACTION;
940+
break;
941+
}
971942

972-
// processing of incoming messages during authenticated must be
973-
// in-order, so while not authenticated, place all messages onto
974-
// AUTH_ACTION_QUEUE scheduler queue
975-
queueName = isAuthenticated(guard) ? cat : AUTH_ACTION_QUEUE;
976-
type = isAuthenticated(guard) ? type
977-
: Scheduler::ActionType::NORMAL_ACTION;
943+
// consensus, inbound
944+
case GET_TX_SET:
945+
case GET_SCP_QUORUMSET:
946+
case GET_SCP_STATE:
947+
cat = "SCPQ";
948+
type = Scheduler::ActionType::DROPPABLE_ACTION;
949+
break;
978950

979-
// TODO: This scope (ending here) exists to ensure this doesn't hold the
980-
// state lock upon entry to the transaction queue. This can cause
981-
// deadlocks! I think it's safe to release the lock here as there's no
982-
// longer any state querying. In practice though, if I end up posting
983-
// the tryAdd action onto some tx-queue specific thread, then I can
984-
// remove the scoping I added here and the lock will be released upon
985-
// return from this function (like it always has).
951+
// consensus, self
952+
case DONT_HAVE:
953+
case TX_SET:
954+
case GENERALIZED_TX_SET:
955+
case SCP_QUORUMSET:
956+
case SCP_MESSAGE:
957+
cat = "SCP";
958+
break;
986959

987-
// TODO: Really investigate whether this peer+transaction queue locking
988-
// each other issue can come up anywhere else.
960+
default:
961+
cat = "MISC";
989962
}
990963

991-
// TODO: vv Remove asserts if I get rid of the scoping above
992-
releaseAssert(msgTracker);
993-
releaseAssert(!queueName.empty());
964+
// processing of incoming messages during authenticated must be in-order, so
965+
// while not authenticated, place all messages onto AUTH_ACTION_QUEUE
966+
// scheduler queue
967+
auto queueName = isAuthenticated(guard) ? cat : AUTH_ACTION_QUEUE;
968+
type = isAuthenticated(guard) ? type : Scheduler::ActionType::NORMAL_ACTION;
994969

995970
// If a message is already scheduled, drop
996971
if (mAppConnector.checkScheduledAndCache(msgTracker))
@@ -1027,9 +1002,6 @@ Peer::recvAuthenticatedMessage(AuthenticatedMessage&& msg)
10271002
self->recvMessage(t);
10281003
},
10291004
"Peer::recvMessage"); // TODO: Change message to something better
1030-
// TODO: If I end up running this on a different thread then I need to
1031-
// be sure to std::move `msgTracker` into the lambda as-per the note
1032-
// below.
10331005
}
10341006
else
10351007
{

src/overlay/TCPPeer.cpp

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -528,14 +528,10 @@ TCPPeer::startRead()
528528
ZoneScoped;
529529
releaseAssert(!threadIsMain() || !useBackgroundThread());
530530
releaseAssert(canRead());
531+
RECURSIVE_LOCK_GUARD(mStateMutex, guard);
532+
if (shouldAbort(guard))
531533
{
532-
RECURSIVE_LOCK_GUARD(mStateMutex, guard);
533-
if (shouldAbort(guard))
534-
{
535-
return;
536-
}
537-
// TODO: Remove this outer scoping if I add separate thread for bg tx
538-
// queue
534+
return;
539535
}
540536

541537
mThreadVars.getIncomingHeader().clear();

0 commit comments

Comments
 (0)