Skip to content

Commit aec6d8c

Browse files
committed
Make the parallel tx set builder flexibly choose the number of stages.
This just runs the same algorithm with different numbers of stages and picks the tx set with the minimum number of stages and roughly the maximum utilization. Also fixed the benchmark and added a small optimization that addresses a performance issue that was found.
1 parent db55f18 commit aec6d8c

File tree

4 files changed

+384
-164
lines changed

4 files changed

+384
-164
lines changed

src/herder/ParallelTxSetBuilder.cpp

Lines changed: 196 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@ namespace
1717
// Configuration for parallel partitioning of transactions.
1818
struct ParallelPartitionConfig
1919
{
20-
ParallelPartitionConfig(Config const& cfg,
20+
ParallelPartitionConfig(uint32_t stageCount,
2121
SorobanNetworkConfig const& sorobanCfg)
22-
: mStageCount(
23-
std::max(cfg.SOROBAN_PHASE_STAGE_COUNT, static_cast<uint32_t>(1)))
22+
: mStageCount(stageCount)
2423
, mClustersPerStage(sorobanCfg.ledgerMaxDependentTxClusters())
2524
, mInstructionsPerCluster(sorobanCfg.ledgerMaxInstructions() /
2625
mStageCount)
@@ -133,7 +132,42 @@ class Stage
133132
mInstructions += tx.mInstructions;
134133
return true;
135134
}
136-
// Otherwise, we need try to recompute the bin-packing from scratch.
135+
136+
// The following is a not particularly scientific, but a useful
137+
// optimization.
138+
// The logic is as follows: in-place bin-packing is an unordered
139+
// first-fit heuristic with 1.7 approximation factor. Full bin
140+
// packing is first-fit-decreasing heuristic with 11/9 approximation,
141+
// which is better, but also more expensive due to full rebuild.
142+
// The first time we can't fit a cluster that has no conflicts with
143+
// first-fit heuristic, it makes sense to try re-packing all the
144+
// clusters with a better algorithm (thus potentially 'compacting'
145+
// the bins). However, after that we can say that the packing is both
146+
// almost at capacity and is already as compact as it gets with our
147+
// heuristics, so it's unlikely that if a cluster doesn't fit with
148+
// in-place packing, it will fit with full packing.
149+
// This optimization provides tremendous savings for the case when we
150+
// have a lot of independent transactions (say, a full tx queue with
151+
// 2x more transactions than we can fit into transaction set), which
152+
// also happens to be the worst case performance-wise. Without it we
153+
// might end up rebuilding the bin-packing for every single transaction
154+
// even though the bin-packing is already at capacity.
155+
// We don't do a similar optimization for the cases when there are
156+
// conflicts for now, as it's much less likely that all the
157+
// transactions would cause the cluster merge and then fail to be
158+
// packed (you'd need very specific set of transactions for that to
159+
// occur). But we can consider doing the full packing just once or a
160+
// few times without any additional conditions if that's ever an issue.
161+
if (conflictingClusters.empty())
162+
{
163+
if (mTriedCompactingBinPacking)
164+
{
165+
return false;
166+
}
167+
mTriedCompactingBinPacking = true;
168+
}
169+
// Try to recompute the bin-packing from scratch with a more efficient
170+
// heuristic.
137171
std::vector<uint64_t> newBinInstructions;
138172
auto newPacking = binPacking(*newClusters, newBinInstructions);
139173
// Even if the new cluster is below the limit, it may invalidate the
@@ -339,8 +373,113 @@ class Stage
339373
std::vector<uint64_t> mBinInstructions;
340374
int64_t mInstructions = 0;
341375
ParallelPartitionConfig mConfig;
376+
bool mTriedCompactingBinPacking = false;
342377
};
343378

379+
struct ParallelPhaseBuildResult
380+
{
381+
TxStageFrameList mStages;
382+
std::vector<bool> mHadTxNotFittingLane;
383+
int64_t mTotalInclusionFee = 0;
384+
};
385+
386+
ParallelPhaseBuildResult
387+
buildSurgePricedParallelSorobanPhaseWithStageCount(
388+
SurgePricingPriorityQueue queue,
389+
std::unordered_map<TransactionFrameBaseConstPtr, BuilderTx const*> const&
390+
builderTxForTx,
391+
TxFrameList const& txFrames, uint32_t stageCount,
392+
SorobanNetworkConfig const& sorobanCfg,
393+
std::shared_ptr<SurgePricingLaneConfig> laneConfig)
394+
{
395+
ZoneScoped;
396+
ParallelPartitionConfig partitionCfg(stageCount, sorobanCfg);
397+
398+
std::vector<Stage> stages(partitionCfg.mStageCount, partitionCfg);
399+
400+
// Visit the transactions in the surge pricing queue and try to add them to
401+
// at least one of the stages.
402+
auto visitor = [&stages,
403+
&builderTxForTx](TransactionFrameBaseConstPtr const& tx) {
404+
bool added = false;
405+
auto builderTxIt = builderTxForTx.find(tx);
406+
releaseAssert(builderTxIt != builderTxForTx.end());
407+
for (auto& stage : stages)
408+
{
409+
if (stage.tryAdd(*builderTxIt->second))
410+
{
411+
added = true;
412+
break;
413+
}
414+
}
415+
if (added)
416+
{
417+
return SurgePricingPriorityQueue::VisitTxResult::PROCESSED;
418+
}
419+
// If a transaction didn't fit into any of the stages, we consider it
420+
// to have been excluded due to resource limits and thus notify the
421+
// surge pricing queue that surge pricing should be triggered (
422+
// REJECTED imitates the behavior for exceeding the resource limit
423+
// within the queue itself).
424+
return SurgePricingPriorityQueue::VisitTxResult::REJECTED;
425+
};
426+
427+
ParallelPhaseBuildResult result;
428+
std::vector<Resource> laneLeftUntilLimitUnused;
429+
queue.popTopTxs(/* allowGaps */ true, visitor, laneLeftUntilLimitUnused,
430+
result.mHadTxNotFittingLane);
431+
// There is only a single fee lane for Soroban, so there is only a single
432+
// flag that indicates whether there was a transaction that didn't fit into
433+
// lane (and thus all transactions are surge priced at once).
434+
releaseAssert(result.mHadTxNotFittingLane.size() == 1);
435+
436+
// At this point the stages have been filled with transactions and we just
437+
// need to place the full transactions into the respective stages/clusters.
438+
result.mStages.reserve(stages.size());
439+
int64_t& totalInclusionFee = result.mTotalInclusionFee;
440+
for (auto const& stage : stages)
441+
{
442+
auto& resStage = result.mStages.emplace_back();
443+
resStage.reserve(partitionCfg.mClustersPerStage);
444+
445+
std::unordered_map<size_t, size_t> clusterIdToStageCluster;
446+
447+
stage.visitAllTransactions(
448+
[&resStage, &txFrames, &clusterIdToStageCluster,
449+
&totalInclusionFee](size_t clusterId, size_t txId) {
450+
auto it = clusterIdToStageCluster.find(clusterId);
451+
if (it == clusterIdToStageCluster.end())
452+
{
453+
it = clusterIdToStageCluster
454+
.emplace(clusterId, resStage.size())
455+
.first;
456+
resStage.emplace_back();
457+
}
458+
totalInclusionFee += txFrames[txId]->getInclusionFee();
459+
resStage[it->second].push_back(txFrames[txId]);
460+
});
461+
// Algorithm ensures that clusters are populated from first to last and
462+
// no empty clusters are generated.
463+
for (auto const& cluster : resStage)
464+
{
465+
releaseAssert(!cluster.empty());
466+
}
467+
}
468+
// Ensure we don't return any empty stages, which is prohibited by the
469+
// protocol. The algorithm builds the stages such that the stages are
470+
// populated from first to last.
471+
while (!result.mStages.empty() && result.mStages.back().empty())
472+
{
473+
result.mStages.pop_back();
474+
}
475+
for (auto const& stage : result.mStages)
476+
{
477+
releaseAssert(!stage.empty());
478+
}
479+
480+
return result;
481+
}
482+
344483
} // namespace
345484

346485
TxStageFrameList
@@ -351,6 +490,13 @@ buildSurgePricedParallelSorobanPhase(
351490
std::vector<bool>& hadTxNotFittingLane)
352491
{
353492
ZoneScoped;
493+
// We prefer the transaction sets that are well utilized, but we also want
494+
// to lower the stage count when possible. Thus we will nominate a tx set
495+
// that has the lowest amount of stages while still being within
496+
// MAX_INCLUSION_FEE_TOLERANCE_FOR_STAGE_COUNT from the maximum total
497+
// inclusion fee (a proxy for the transaction set utilization).
498+
double const MAX_INCLUSION_FEE_TOLERANCE_FOR_STAGE_COUNT = 0.999;
499+
354500
// Simplify the transactions to the minimum necessary amount of data.
355501
std::unordered_map<TransactionFrameBaseConstPtr, BuilderTx const*>
356502
builderTxForTx;
@@ -434,87 +580,60 @@ buildSurgePricedParallelSorobanPhase(
434580
queue.add(tx);
435581
}
436582

437-
ParallelPartitionConfig partitionCfg(cfg, sorobanCfg);
438-
std::vector<Stage> stages(partitionCfg.mStageCount, partitionCfg);
439-
440-
// Visit the transactions in the surge pricing queue and try to add them to
441-
// at least one of the stages.
442-
auto visitor = [&stages,
443-
&builderTxForTx](TransactionFrameBaseConstPtr const& tx) {
444-
bool added = false;
445-
auto builderTxIt = builderTxForTx.find(tx);
446-
releaseAssert(builderTxIt != builderTxForTx.end());
447-
for (auto& stage : stages)
448-
{
449-
if (stage.tryAdd(*builderTxIt->second))
450-
{
451-
added = true;
452-
break;
453-
}
454-
}
455-
if (added)
456-
{
457-
return SurgePricingPriorityQueue::VisitTxResult::PROCESSED;
458-
}
459-
// If a transaction didn't fit into any of the stages, we consider it
460-
// to have been excluded due to resource limits and thus notify the
461-
// surge pricing queue that surge pricing should be triggered (
462-
// REJECTED imitates the behavior for exceeding the resource limit
463-
// within the queue itself).
464-
return SurgePricingPriorityQueue::VisitTxResult::REJECTED;
465-
};
583+
// Create a worker thread for each stage count.
584+
std::vector<std::thread> threads;
585+
uint32_t stageCountOptions = cfg.SOROBAN_PHASE_MAX_STAGE_COUNT -
586+
cfg.SOROBAN_PHASE_MIN_STAGE_COUNT + 1;
587+
std::vector<ParallelPhaseBuildResult> results(stageCountOptions);
466588

467-
std::vector<Resource> laneLeftUntilLimitUnused;
468-
queue.popTopTxs(/* allowGaps */ true, visitor, laneLeftUntilLimitUnused,
469-
hadTxNotFittingLane);
470-
// There is only a single fee lane for Soroban, so there is only a single
471-
// flag that indicates whether there was a transaction that didn't fit into
472-
// lane (and thus all transactions are surge priced at once).
473-
releaseAssert(hadTxNotFittingLane.size() == 1);
474-
475-
// At this point the stages have been filled with transactions and we just
476-
// need to place the full transactions into the respective stages/clusters.
477-
TxStageFrameList resStages;
478-
resStages.reserve(stages.size());
479-
for (auto const& stage : stages)
589+
for (uint32_t stageCount = cfg.SOROBAN_PHASE_MIN_STAGE_COUNT;
590+
stageCount <= cfg.SOROBAN_PHASE_MAX_STAGE_COUNT; ++stageCount)
480591
{
481-
auto& resStage = resStages.emplace_back();
482-
resStage.reserve(partitionCfg.mClustersPerStage);
483-
484-
std::unordered_map<size_t, size_t> clusterIdToStageCluster;
485-
486-
stage.visitAllTransactions([&resStage, &txFrames,
487-
&clusterIdToStageCluster](size_t clusterId,
488-
size_t txId) {
489-
auto it = clusterIdToStageCluster.find(clusterId);
490-
if (it == clusterIdToStageCluster.end())
491-
{
492-
it = clusterIdToStageCluster.emplace(clusterId, resStage.size())
493-
.first;
494-
resStage.emplace_back();
495-
}
496-
resStage[it->second].push_back(txFrames[txId]);
592+
size_t resultIndex = stageCount - cfg.SOROBAN_PHASE_MIN_STAGE_COUNT;
593+
threads.emplace_back([queue, &builderTxForTx, txFrames, stageCount,
594+
sorobanCfg, laneConfig, resultIndex, &results]() {
595+
results[resultIndex] =
596+
buildSurgePricedParallelSorobanPhaseWithStageCount(
597+
std::move(queue), builderTxForTx, txFrames, stageCount,
598+
sorobanCfg, laneConfig);
497599
});
498-
// Algorithm ensures that clusters are populated from first to last and
499-
// no empty clusters are generated.
500-
for (auto const& cluster : resStage)
501-
{
502-
releaseAssert(!cluster.empty());
503-
}
504600
}
505-
// Ensure we don't return any empty stages, which is prohibited by the
506-
// protocol. The algorithm builds the stages such that the stages are
507-
// populated from first to last.
508-
while (!resStages.empty() && resStages.back().empty())
601+
for (auto& thread : threads)
509602
{
510-
resStages.pop_back();
603+
thread.join();
511604
}
512-
for (auto const& stage : resStages)
605+
606+
releaseAssert(!results.empty());
607+
608+
int64_t maxTotalInclusionFee = 0;
609+
for (auto const& result : results)
513610
{
514-
releaseAssert(!stage.empty());
611+
maxTotalInclusionFee =
612+
std::max(maxTotalInclusionFee, result.mTotalInclusionFee);
515613
}
516-
517-
return resStages;
614+
maxTotalInclusionFee *= MAX_INCLUSION_FEE_TOLERANCE_FOR_STAGE_COUNT;
615+
std::optional<size_t> bestResultIndex = std::nullopt;
616+
for (size_t i = 0; i < results.size(); ++i)
617+
{
618+
CLOG_DEBUG(Herder,
619+
"Parallel Soroban tx set nomination: {} stages => {} total "
620+
"inclusion fee",
621+
results[i].mTotalInclusionFee, results[i].mStages.size());
622+
if (results[i].mTotalInclusionFee < maxTotalInclusionFee)
623+
{
624+
continue;
625+
}
626+
if (!bestResultIndex ||
627+
results[i].mStages.size() <
628+
results[bestResultIndex.value()].mStages.size())
629+
{
630+
bestResultIndex = std::make_optional(i);
631+
}
632+
}
633+
releaseAssert(bestResultIndex.has_value());
634+
auto& bestResult = results[bestResultIndex.value()];
635+
hadTxNotFittingLane = std::move(bestResult.mHadTxNotFittingLane);
636+
return std::move(bestResult.mStages);
518637
}
519638

520639
} // namespace stellar

0 commit comments

Comments
 (0)