Skip to content

perf: using asynchronous worker to validate BLS signatures in quorum commitments #6692

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/llmq/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <chain.h>
#include <chainparams.h>
#include <checkqueue.h>
#include <consensus/params.h>
#include <consensus/validation.h>
#include <deploymentstatus.h>
Expand Down Expand Up @@ -52,8 +53,18 @@ CQuorumBlockProcessor::CQuorumBlockProcessor(CChainState& chainstate, CDetermini
m_qsnapman(qsnapman)
{
utils::InitQuorumsCache(mapHasMinedCommitmentCache);

int qc_threads = gArgs.GetIntArg("-par", DEFAULT_SCRIPTCHECK_THREADS);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably want a separate arg for this

if (qc_threads <= 0) {
// -par=0 means autodetect (number of cores - 1 validator threads)
// -par=-n means "leave n cores free" (number of cores - n - 1 validator threads)
qc_threads += GetNumCores();
}
m_bls_queue.StartWorkerThreads(qc_threads);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original script check logic has some additional limits

    // Subtract 1 because the main thread counts towards the par threads
    script_threads = std::max(script_threads - 1, 0);

    // Number of script-checking threads <= MAX_SCRIPTCHECK_THREADS
    script_threads = std::min(script_threads, MAX_SCRIPTCHECK_THREADS);

Shouldn't we have smth like that here too?

}

CQuorumBlockProcessor::~CQuorumBlockProcessor() { m_bls_queue.StopWorkerThreads(); }

MessageProcessingResult CQuorumBlockProcessor::ProcessMessage(const CNode& peer, std::string_view msg_type,
CDataStream& vRecv)
{
Expand Down Expand Up @@ -194,8 +205,21 @@ bool CQuorumBlockProcessor::ProcessBlock(const CBlock& block, gsl::not_null<cons
}
}

if (fBLSChecks) {
CCheckQueueControl<utils::BlsCheck> queue_control(&m_bls_queue);
for (const auto& [_, qc] : qcs) {
if (qc.IsNull()) continue;
const auto* pQuorumBaseBlockIndex = m_chainstate.m_blockman.LookupBlockIndex(qc.quorumHash);
qc.VerifySignatureAsync(m_dmnman, m_qsnapman, pQuorumBaseBlockIndex, &queue_control);
}

if (!queue_control.Wait()) {
// at least one check failed
return false;
}
}
for (const auto& [_, qc] : qcs) {
if (!ProcessCommitment(pindex->nHeight, blockHash, qc, state, fJustCheck, fBLSChecks)) {
if (!ProcessCommitment(pindex->nHeight, blockHash, qc, state, fJustCheck, false)) {
LogPrintf("[ProcessBlock] failed h[%d] llmqType[%d] version[%d] quorumIndex[%d] quorumHash[%s]\n", pindex->nHeight, ToUnderlying(qc.llmqType), qc.nVersion, qc.quorumIndex, qc.quorumHash.ToString());
return false;
}
Expand Down
8 changes: 8 additions & 0 deletions src/llmq/blockprocessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

#include <unordered_lru_cache.h>

#include <bls/bls.h>
#include <checkqueue.h>
#include <llmq/params.h>
#include <llmq/utils.h>
#include <protocol.h>
#include <saltedhasher.h>
#include <sync.h>
Expand All @@ -19,6 +22,7 @@
class BlockValidationState;
class CBlock;
class CBlockIndex;
class CBLSSignature;
class CChain;
class CChainState;
class CDataStream;
Expand All @@ -43,6 +47,8 @@ class CQuorumBlockProcessor
CEvoDB& m_evoDb;
CQuorumSnapshotManager& m_qsnapman;

CCheckQueue<utils::BlsCheck> m_bls_queue{32};

mutable Mutex minableCommitmentsCs;
std::map<std::pair<Consensus::LLMQType, uint256>, uint256> minableCommitmentsByQuorum GUARDED_BY(minableCommitmentsCs);
std::map<uint256, CFinalCommitment> minableCommitments GUARDED_BY(minableCommitmentsCs);
Expand All @@ -53,6 +59,8 @@ class CQuorumBlockProcessor
explicit CQuorumBlockProcessor(CChainState& chainstate, CDeterministicMNManager& dmnman, CEvoDB& evoDb,
CQuorumSnapshotManager& qsnapman);

~CQuorumBlockProcessor();

MessageProcessingResult ProcessMessage(const CNode& peer, std::string_view msg_type, CDataStream& vRecv);

bool ProcessBlock(const CBlock& block, gsl::not_null<const CBlockIndex*> pindex, BlockValidationState& state, bool fJustCheck, bool fBLSChecks) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
Expand Down
103 changes: 70 additions & 33 deletions src/llmq/commitment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
#include <evo/specialtx.h>

#include <chainparams.h>
#include <checkqueue.h>
#include <consensus/validation.h>
#include <deploymentstatus.h>
#include <llmq/options.h>
#include <llmq/utils.h>
#include <logging.h>
#include <validation.h>
#include <util/underlying.h>
#include <validation.h>

namespace llmq
{
Expand All @@ -27,6 +28,73 @@ CFinalCommitment::CFinalCommitment(const Consensus::LLMQParams& params, const ui
{
}

bool CFinalCommitment::VerifySignatureAsync(CDeterministicMNManager& dmnman, CQuorumSnapshotManager& qsnapman,
gsl::not_null<const CBlockIndex*> pQuorumBaseBlockIndex,
CCheckQueueControl<utils::BlsCheck>* queue_control) const
{
auto members = utils::GetAllQuorumMembers(llmqType, dmnman, qsnapman, pQuorumBaseBlockIndex);
const auto& llmq_params_opt = Params().GetLLMQ(llmqType);
if (!llmq_params_opt.has_value()) {
LogPrint(BCLog::LLMQ, "CFinalCommitment -- q[%s] invalid llmqType=%d\n", quorumHash.ToString(),
ToUnderlying(llmqType));
return false;
}
const auto& llmq_params = llmq_params_opt.value();

uint256 commitmentHash = BuildCommitmentHash(llmq_params.type, quorumHash, validMembers, quorumPublicKey,
quorumVvecHash);
if (LogAcceptDebug(BCLog::LLMQ)) {
std::stringstream ss3;
for (const auto& mn : members) {
ss3 << mn->proTxHash.ToString().substr(0, 4) << " | ";
}
LogPrint(BCLog::LLMQ, "CFinalCommitment::%s members[%s] quorumPublicKey[%s] commitmentHash[%s]\n", __func__,
ss3.str(), quorumPublicKey.ToString(), commitmentHash.ToString());
}
if (llmq_params.size == 1) {
LogPrintf("pubkey operator: %s\n", members[0]->pdmnState->pubKeyOperator.Get().ToString());
if (!membersSig.VerifyInsecure(members[0]->pdmnState->pubKeyOperator.Get(), commitmentHash)) {
LogPrint(BCLog::LLMQ, "CFinalCommitment -- q[%s] invalid member signature\n", quorumHash.ToString());
return false;
}
} else {
std::vector<CBLSPublicKey> memberPubKeys;
for (const auto i : irange::range(members.size())) {
if (!signers[i]) {
continue;
}
memberPubKeys.emplace_back(members[i]->pdmnState->pubKeyOperator.Get());
}
std::string members_id_string{
strprintf("CFinalCommitment -- q[%s] invalid aggregated members signature", quorumHash.ToString())};
if (queue_control) {
std::vector<utils::BlsCheck> vChecks;
vChecks.emplace_back(membersSig, memberPubKeys, commitmentHash, members_id_string);
queue_control->Add(vChecks);
} else {
if (!membersSig.VerifySecureAggregated(memberPubKeys, commitmentHash)) {
LogPrint(BCLog::LLMQ, "%s\n", members_id_string);
return false;
}
}
}
std::string qsig_id_string{strprintf("CFinalCommitment -- q[%s] invalid quorum signature", quorumHash.ToString())};
if (queue_control) {
std::vector<utils::BlsCheck> vChecks;
std::vector<CBLSPublicKey> public_keys;
public_keys.push_back(quorumPublicKey);
vChecks.emplace_back(quorumSig, public_keys, commitmentHash, qsig_id_string);
queue_control->Add(vChecks);
} else {
if (!quorumSig.VerifyInsecure(quorumPublicKey, commitmentHash)) {
LogPrint(BCLog::LLMQ, "%s\n", qsig_id_string);
return false;
}
}
return true;
}


bool CFinalCommitment::Verify(CDeterministicMNManager& dmnman, CQuorumSnapshotManager& qsnapman,
gsl::not_null<const CBlockIndex*> pQuorumBaseBlockIndex, bool checkSigs) const
{
Expand Down Expand Up @@ -106,38 +174,7 @@ bool CFinalCommitment::Verify(CDeterministicMNManager& dmnman, CQuorumSnapshotMa

// sigs are only checked when the block is processed
if (checkSigs) {
uint256 commitmentHash = BuildCommitmentHash(llmq_params.type, quorumHash, validMembers, quorumPublicKey, quorumVvecHash);
if (LogAcceptDebug(BCLog::LLMQ)) {
std::stringstream ss3;
for (const auto &mn: members) {
ss3 << mn->proTxHash.ToString().substr(0, 4) << " | ";
}
LogPrint(BCLog::LLMQ, "CFinalCommitment::%s members[%s] quorumPublicKey[%s] commitmentHash[%s]\n",
__func__, ss3.str(), quorumPublicKey.ToString(), commitmentHash.ToString());
}
if (llmq_params.size == 1) {
LogPrintf("pubkey operator: %s\n", members[0]->pdmnState->pubKeyOperator.Get().ToString());
if (!membersSig.VerifyInsecure(members[0]->pdmnState->pubKeyOperator.Get(), commitmentHash)) {
LogPrint(BCLog::LLMQ, "CFinalCommitment -- q[%s] invalid member signature\n", quorumHash.ToString());
return false;
}
} else {
std::vector<CBLSPublicKey> memberPubKeys;
for (const auto i : irange::range(members.size())) {
if (!signers[i]) {
continue;
}
memberPubKeys.emplace_back(members[i]->pdmnState->pubKeyOperator.Get());
}

if (!membersSig.VerifySecureAggregated(memberPubKeys, commitmentHash)) {
LogPrint(BCLog::LLMQ, "CFinalCommitment -- q[%s] invalid aggregated members signature\n",
quorumHash.ToString());
return false;
}
}
if (!quorumSig.VerifyInsecure(quorumPublicKey, commitmentHash)) {
LogPrint(BCLog::LLMQ, "CFinalCommitment -- q[%s] invalid quorum signature\n", quorumHash.ToString());
if (!VerifySignatureAsync(dmnman, qsnapman, pQuorumBaseBlockIndex, nullptr)) {
return false;
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/llmq/commitment.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ class CBlockIndex;
class CDeterministicMNManager;
class ChainstateManager;
class TxValidationState;
template <typename T>
class CCheckQueueControl;

namespace llmq
{
class CQuorumSnapshotManager;

namespace utils {
struct BlsCheck;
} // namespace utils
// This message is an aggregation of all received premature commitments and only valid if
// enough (>=threshold) premature commitments were aggregated
// This is mined on-chain as part of TRANSACTION_QUORUM_COMMITMENT
Expand Down Expand Up @@ -66,6 +71,9 @@ class CFinalCommitment
return int(std::count(validMembers.begin(), validMembers.end(), true));
}

bool VerifySignatureAsync(CDeterministicMNManager& dmnman, CQuorumSnapshotManager& qsnapman,
gsl::not_null<const CBlockIndex*> pQuorumBaseBlockIndex,
CCheckQueueControl<utils::BlsCheck>* queue_control) const;
bool Verify(CDeterministicMNManager& dmnman, CQuorumSnapshotManager& qsnapman,
gsl::not_null<const CBlockIndex*> pQuorumBaseBlockIndex, bool checkSigs) const;
bool VerifyNull() const;
Expand Down
19 changes: 19 additions & 0 deletions src/llmq/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,25 @@ void AddQuorumProbeConnections(const Consensus::LLMQParams& llmqParams, CConnman
}
}

bool BlsCheck::operator()()
{
if (m_pubkeys.size() > 1) {
if (!m_sig.VerifySecureAggregated(m_pubkeys, m_msg_hash)) {
LogPrint(BCLog::LLMQ, "%s\n", m_id_string);
return false;
}
} else if (m_pubkeys.size() == 1) {
if (!m_sig.VerifyInsecure(m_pubkeys.back(), m_msg_hash)) {
LogPrint(BCLog::LLMQ, "%s\n", m_id_string);
return false;
}
} else {
// It is supposed to be at least one public key!
return false;
}
return true;
}

template <typename CacheType>
void InitQuorumsCache(CacheType& cache, bool limit_by_connections)
{
Expand Down
29 changes: 29 additions & 0 deletions src/llmq/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#ifndef BITCOIN_LLMQ_UTILS_H
#define BITCOIN_LLMQ_UTILS_H

#include <bls/bls.h>
#include <gsl/pointers.h>
#include <llmq/params.h>
#include <saltedhasher.h>
Expand Down Expand Up @@ -56,8 +57,36 @@ void AddQuorumProbeConnections(const Consensus::LLMQParams& llmqParams, CConnman
const CSporkManager& sporkman, const CDeterministicMNList& tip_mn_list,
gsl::not_null<const CBlockIndex*> pQuorumBaseBlockIndex, const uint256& myProTxHash);

struct BlsCheck {
CBLSSignature m_sig;
std::vector<CBLSPublicKey> m_pubkeys;
uint256 m_msg_hash;
std::string m_id_string;

BlsCheck() = default;

BlsCheck(CBLSSignature sig, std::vector<CBLSPublicKey> pubkeys, uint256 msg_hash, std::string id_string) :
m_sig(sig),
m_pubkeys(pubkeys),
m_msg_hash(msg_hash),
m_id_string(id_string)
{
}

void swap(BlsCheck& obj)
{
std::swap(m_sig, obj.m_sig);
std::swap(m_pubkeys, obj.m_pubkeys);
std::swap(m_msg_hash, obj.m_msg_hash);
std::swap(m_id_string, obj.m_id_string);
}

bool operator()();
};

template <typename CacheType>
void InitQuorumsCache(CacheType& cache, bool limit_by_connections = true);

} // namespace utils
} // namespace llmq

Expand Down
Loading