diff --git a/Builds/levelization/results/ordering.txt b/Builds/levelization/results/ordering.txt index b10a62584b..c7257478fa 100644 --- a/Builds/levelization/results/ordering.txt +++ b/Builds/levelization/results/ordering.txt @@ -203,5 +203,6 @@ xrpld.rpc > xrpl.protocol xrpld.rpc > xrpl.resource xrpld.rpc > xrpl.server xrpld.shamap > xrpl.basics +xrpld.shamap > xrpld.core xrpld.shamap > xrpld.nodestore xrpld.shamap > xrpl.protocol diff --git a/include/xrpl/basics/LocalValue.h b/include/xrpl/basics/LocalValue.h index 41556f34f2..dc4a70ef31 100644 --- a/include/xrpl/basics/LocalValue.h +++ b/include/xrpl/basics/LocalValue.h @@ -21,6 +21,7 @@ #define RIPPLE_BASICS_LOCALVALUE_H_INCLUDED #include +#include #include #include @@ -33,6 +34,16 @@ struct LocalValues explicit LocalValues() = default; bool onCoro = true; + void* coroPtr = nullptr; // Pointer to owning JobQueue::Coro (if any) + + // When true, SHAMap::finishFetch() will poll-wait for missing nodes + // instead of returning empty. Only set by partial sync code paths. + bool partialSyncWait = false; + + // Configurable timeout for SHAMap node fetching during partial sync. + // Zero means use the default (30s). RPC handlers can set this to + // customize poll-wait behavior. + std::chrono::milliseconds fetchTimeout{0}; struct BasicValue { @@ -127,6 +138,57 @@ LocalValue::operator*() .emplace(this, std::make_unique>(t_)) .first->second->get()); } + +// Returns pointer to current coroutine if running inside one, nullptr otherwise +inline void* +getCurrentCoroPtr() +{ + auto lvs = detail::getLocalValues().get(); + if (lvs && lvs->onCoro) + return lvs->coroPtr; + return nullptr; +} + +// Check if partial sync wait is enabled for the current coroutine context. +inline bool +isPartialSyncWaitEnabled() +{ + auto lvs = detail::getLocalValues().get(); + if (lvs && lvs->onCoro) + return lvs->partialSyncWait; + return false; +} + +// Enable/disable partial sync wait for the current coroutine context. +inline void +setPartialSyncWait(bool enabled) +{ + auto lvs = detail::getLocalValues().get(); + if (lvs && lvs->onCoro) + lvs->partialSyncWait = enabled; +} + +// Get the configured fetch timeout for current coroutine context. +// Returns 0ms if not in a coroutine or no custom timeout set. +inline std::chrono::milliseconds +getCoroFetchTimeout() +{ + auto lvs = detail::getLocalValues().get(); + if (lvs && lvs->onCoro) + return lvs->fetchTimeout; + return std::chrono::milliseconds{0}; +} + +// Set the fetch timeout for the current coroutine context. +// Only works if called from within a coroutine. +inline void +setCoroFetchTimeout(std::chrono::milliseconds timeout) +{ + auto lvs = detail::getLocalValues().get(); + if (lvs && lvs->onCoro) + lvs->fetchTimeout = timeout; +} + } // namespace ripple #endif diff --git a/include/xrpl/protocol/ErrorCodes.h b/include/xrpl/protocol/ErrorCodes.h index b93747008c..210b46b828 100644 --- a/include/xrpl/protocol/ErrorCodes.h +++ b/include/xrpl/protocol/ErrorCodes.h @@ -61,10 +61,12 @@ enum error_code_i { rpcAMENDMENT_BLOCKED = 14, // Networking + //@@start network-error-codes rpcNO_CLOSED = 15, rpcNO_CURRENT = 16, rpcNO_NETWORK = 17, rpcNOT_SYNCED = 18, + //@@end network-error-codes // Ledger state rpcACT_NOT_FOUND = 19, diff --git a/src/libxrpl/protocol/ErrorCodes.cpp b/src/libxrpl/protocol/ErrorCodes.cpp index 509711aaab..988a8b09ed 100644 --- a/src/libxrpl/protocol/ErrorCodes.cpp +++ b/src/libxrpl/protocol/ErrorCodes.cpp @@ -89,9 +89,11 @@ constexpr static ErrorInfo unorderedErrorInfos[]{ {rpcNOT_SUPPORTED, "notSupported", "Operation not supported.", 501}, {rpcNO_CLOSED, "noClosed", "Closed ledger is unavailable.", 503}, {rpcNO_CURRENT, "noCurrent", "Current ledger is unavailable.", 503}, + //@@start network-error-messages {rpcNOT_SYNCED, "notSynced", "Not synced to the network.", 503}, {rpcNO_EVENTS, "noEvents", "Current transport does not support events.", 405}, {rpcNO_NETWORK, "noNetwork", "Not synced to the network.", 503}, + //@@end network-error-messages {rpcWRONG_NETWORK, "wrongNetwork", "Wrong network.", 503}, {rpcNO_PERMISSION, "noPermission", "You don't have permission for this command.", 401}, {rpcNO_PF_REQUEST, "noPathRequest", "No pathfinding request in progress.", 404}, diff --git a/src/test/app/LedgerReplay_test.cpp b/src/test/app/LedgerReplay_test.cpp index 20b7f1d521..fcac76688c 100644 --- a/src/test/app/LedgerReplay_test.cpp +++ b/src/test/app/LedgerReplay_test.cpp @@ -126,6 +126,34 @@ class MagicInboundLedgers : public InboundLedgers return {}; } + virtual std::shared_ptr + getPartialLedger(uint256 const& hash) override + { + return {}; + } + + virtual std::optional + findTxLedger(uint256 const& txHash) override + { + return std::nullopt; + } + + virtual void + addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) override + { + } + + virtual void + prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) override + { + } + + virtual bool + isTxPrioritized(std::uint32_t seq) const override + { + return false; + } + virtual bool gotLedgerData( LedgerHash const& ledgerHash, diff --git a/src/test/shamap/common.h b/src/test/shamap/common.h index 86acc8e2b0..6b0667083f 100644 --- a/src/test/shamap/common.h +++ b/src/test/shamap/common.h @@ -100,8 +100,10 @@ class TestNodeFamily : public Family } void - missingNodeAcquireBySeq(std::uint32_t refNum, uint256 const& nodeHash) - override + missingNodeAcquireBySeq( + std::uint32_t refNum, + uint256 const& nodeHash, + bool prioritize = false) override { Throw("missing node"); } diff --git a/src/xrpld/app/consensus/RCLValidations.cpp b/src/xrpld/app/consensus/RCLValidations.cpp index 0a505ea84a..f5ff5889bc 100644 --- a/src/xrpld/app/consensus/RCLValidations.cpp +++ b/src/xrpld/app/consensus/RCLValidations.cpp @@ -192,8 +192,24 @@ handleNewValidation( auto const outcome = validations.add(calcNodeID(masterKey.value_or(signingKey)), val); + if (j.has_value()) + { + JLOG(j->warn()) << "handleNewValidation: seq=" << seq + << " hash=" << hash << " trusted=" << val->isTrusted() + << " outcome=" + << (outcome == ValStatus::current ? "current" + : outcome == ValStatus::stale ? "stale" + : outcome == ValStatus::badSeq ? "badSeq" + : "other"); + } + if (outcome == ValStatus::current) { + // For partial sync: track the network-observed ledger from ANY + // validation (not just trusted). This allows queries before + // trusted validators are fully configured. + app.getLedgerMaster().setNetworkObservedLedger(hash, seq); + if (val->isTrusted()) { // Was: app.getLedgerMaster().checkAccept(hash, seq); @@ -213,6 +229,23 @@ handleNewValidation( app.getLedgerMaster().checkAccept(hash, seq); } } + else + { + // Partial sync debug: only log untrusted validations during startup + // (before we have any validated ledger) + auto [lastHash, lastSeq] = + app.getLedgerMaster().getLastValidatedLedger(); + if (lastSeq == 0) + { + auto jPartialSync = app.journal("PartialSync"); + auto const quorum = app.validators().quorum(); + auto const unlSize = app.validators().count(); + JLOG(jPartialSync.debug()) + << "validation NOT trusted: seq=" << seq << " hash=" << hash + << " unlSize=" << unlSize << " quorum=" << quorum + << " (masterKey=" << (masterKey ? "found" : "none") << ")"; + } + } return; } diff --git a/src/xrpld/app/ledger/InboundLedger.h b/src/xrpld/app/ledger/InboundLedger.h index 13f603e79d..fc84738198 100644 --- a/src/xrpld/app/ledger/InboundLedger.h +++ b/src/xrpld/app/ledger/InboundLedger.h @@ -80,6 +80,13 @@ class InboundLedger final : public TimeoutCounter, return mLedger; } + /** Returns true if we have the ledger header (may still be incomplete). */ + bool + hasHeader() const + { + return mHaveHeader; + } + std::uint32_t getSeq() const { @@ -106,6 +113,26 @@ class InboundLedger final : public TimeoutCounter, void runData(); + /** Add a node hash to the priority queue for immediate fetching. + Used by partial sync mode to prioritize nodes needed by queries. + */ + void + addPriorityHash(uint256 const& hash); + + /** Check if a transaction hash has been seen in this ledger's txMap. + Used by submit_and_wait to find transactions in partial ledgers. + */ + bool + hasTx(uint256 const& txHash) const; + + /** Return the count of known transaction hashes (for debugging). */ + std::size_t + knownTxCount() const + { + ScopedLockType sl(mtx_); + return knownTxHashes_.size(); + } + void touch() { @@ -175,9 +202,11 @@ class InboundLedger final : public TimeoutCounter, clock_type::time_point mLastAction; std::shared_ptr mLedger; + //@@start state-tracking-members bool mHaveHeader; bool mHaveState; bool mHaveTransactions; + //@@end state-tracking-members bool mSignaled; bool mByHash; std::uint32_t mSeq; @@ -185,6 +214,13 @@ class InboundLedger final : public TimeoutCounter, std::set mRecentNodes; + // Priority nodes to fetch immediately (for partial sync queries) + std::set priorityHashes_; + + // Transaction hashes seen in incoming txMap leaf nodes (for + // submit_and_wait) + std::set knownTxHashes_; + SHAMapAddNode mStats; // Data we have received from peers diff --git a/src/xrpld/app/ledger/InboundLedgers.h b/src/xrpld/app/ledger/InboundLedgers.h index 6e6f893697..8650c42997 100644 --- a/src/xrpld/app/ledger/InboundLedgers.h +++ b/src/xrpld/app/ledger/InboundLedgers.h @@ -23,6 +23,7 @@ #include #include #include +#include namespace ripple { @@ -56,6 +57,45 @@ class InboundLedgers virtual std::shared_ptr find(LedgerHash const& hash) = 0; + /** Get a partial ledger (has header but may be incomplete). + Used for partial sync mode - allows RPC queries against + ledgers that are still being acquired. + @return The ledger if header exists and not failed, nullptr otherwise. + */ + virtual std::shared_ptr + getPartialLedger(uint256 const& hash) = 0; + + /** Find which partial ledger contains a transaction. + Used by submit_and_wait to locate transactions as they appear + in incoming ledgers' txMaps. + @param txHash The transaction hash to search for + @return The ledger hash if found, nullopt otherwise + */ + virtual std::optional + findTxLedger(uint256 const& txHash) = 0; + + /** Add a priority node hash for immediate fetching. + Used by partial sync mode to prioritize specific nodes + needed by queries. + @param ledgerSeq The ledger sequence being acquired + @param nodeHash The specific node hash to prioritize + */ + virtual void + addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) = 0; + + /** Add a ledger range where TX fetching should be prioritized. + Ledgers in this range will fetch TX nodes BEFORE state nodes. + Used by submit_and_wait to quickly detect transactions. + @param start First ledger sequence (inclusive) + @param end Last ledger sequence (inclusive) + */ + virtual void + prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) = 0; + + /** Check if TX fetching should be prioritized for a ledger sequence. */ + virtual bool + isTxPrioritized(std::uint32_t seq) const = 0; + // VFALCO TODO Remove the dependency on the Peer object. // virtual bool diff --git a/src/xrpld/app/ledger/LedgerMaster.h b/src/xrpld/app/ledger/LedgerMaster.h index e59b1b78fb..400ba74b4a 100644 --- a/src/xrpld/app/ledger/LedgerMaster.h +++ b/src/xrpld/app/ledger/LedgerMaster.h @@ -280,6 +280,38 @@ class LedgerMaster : public AbstractFetchPackContainer return !mValidLedger.empty(); } + //! Get the hash/seq of the last validated ledger (even if not resident). + std::pair + getLastValidatedLedger() + { + std::lock_guard lock(m_mutex); + return mLastValidLedger; + } + + //! For partial sync: set the network-observed ledger from any validation. + //! This allows queries before trusted validators are fully configured. + void + setNetworkObservedLedger(uint256 const& hash, LedgerIndex seq) + { + std::lock_guard lock(m_mutex); + if (seq > mNetworkObservedLedger.second) + { + JLOG(jPartialSync_.warn()) + << "network-observed ledger updated to seq=" << seq + << " hash=" << hash; + mNetworkObservedLedger = std::make_pair(hash, seq); + } + } + + //! Get the network-observed ledger (from any validations, not just + //! trusted). + std::pair + getNetworkObservedLedger() + { + std::lock_guard lock(m_mutex); + return mNetworkObservedLedger; + } + // Returns the minimum ledger sequence in SQL database, if any. std::optional minSqlSeq(); @@ -329,6 +361,7 @@ class LedgerMaster : public AbstractFetchPackContainer Application& app_; beast::Journal m_journal; + beast::Journal jPartialSync_; std::recursive_mutex mutable m_mutex; @@ -350,6 +383,9 @@ class LedgerMaster : public AbstractFetchPackContainer // Fully validated ledger, whether or not we have the ledger resident. std::pair mLastValidLedger{uint256(), 0}; + // Network-observed ledger from any validations (for partial sync). + std::pair mNetworkObservedLedger{uint256(), 0}; + LedgerHistory mLedgerHistory; CanonicalTXSet mHeldTransactions{uint256()}; diff --git a/src/xrpld/app/ledger/detail/InboundLedger.cpp b/src/xrpld/app/ledger/detail/InboundLedger.cpp index ba01dce17d..2826f606fe 100644 --- a/src/xrpld/app/ledger/detail/InboundLedger.cpp +++ b/src/xrpld/app/ledger/detail/InboundLedger.cpp @@ -41,6 +41,7 @@ namespace ripple { using namespace std::chrono_literals; +//@@start tx-fetch-constants enum { // Number of peers to start with peerCountStart = 5 @@ -69,6 +70,7 @@ enum { , reqNodes = 12 }; +//@@end tx-fetch-constants // millisecond for each ledger timeout auto constexpr ledgerAcquireTimeout = 3000ms; @@ -98,6 +100,8 @@ InboundLedger::InboundLedger( , mPeerSet(std::move(peerSet)) { JLOG(journal_.trace()) << "Acquiring ledger " << hash_; + JLOG(app_.journal("TxTrack").warn()) + << "NEW LEDGER seq=" << seq << " hash=" << hash; touch(); } @@ -157,6 +161,22 @@ InboundLedger::update(std::uint32_t seq) touch(); } +void +InboundLedger::addPriorityHash(uint256 const& hash) +{ + ScopedLockType sl(mtx_); + priorityHashes_.insert(hash); + JLOG(journal_.debug()) << "Added priority hash " << hash << " for ledger " + << hash_; +} + +bool +InboundLedger::hasTx(uint256 const& txHash) const +{ + ScopedLockType sl(mtx_); + return knownTxHashes_.count(txHash) > 0; +} + bool InboundLedger::checkLocal() { @@ -347,6 +367,7 @@ InboundLedger::tryDB(NodeStore::Database& srcDB) } } + //@@start completion-check if (mHaveTransactions && mHaveState) { JLOG(journal_.debug()) << "Had everything locally"; @@ -356,6 +377,7 @@ InboundLedger::tryDB(NodeStore::Database& srcDB) "ripple::InboundLedger::tryDB : valid ledger fees"); mLedger->setImmutable(); } + //@@end completion-check } /** Called with a lock by the PeerSet when the timer expires @@ -520,6 +542,43 @@ InboundLedger::trigger(std::shared_ptr const& peer, TriggerReason reason) } } + // Handle priority hashes immediately (for partial sync queries) + if (mHaveHeader && !priorityHashes_.empty()) + { + JLOG(journal_.warn()) << "PRIORITY: trigger() sending " + << priorityHashes_.size() << " priority requests"; + + protocol::TMGetObjectByHash tmBH; + tmBH.set_query(true); + tmBH.set_type(protocol::TMGetObjectByHash::otSTATE_NODE); + tmBH.set_ledgerhash(hash_.begin(), hash_.size()); + + for (auto const& h : priorityHashes_) + { + JLOG(journal_.warn()) << "PRIORITY: requesting node " << h; + protocol::TMIndexedObject* io = tmBH.add_objects(); + io->set_hash(h.begin(), h.size()); + if (mSeq != 0) + io->set_ledgerseq(mSeq); + } + + // Send to all peers in our peer set + auto packet = std::make_shared(tmBH, protocol::mtGET_OBJECTS); + auto const& peerIds = mPeerSet->getPeerIds(); + std::size_t sentCount = 0; + for (auto id : peerIds) + { + if (auto p = app_.overlay().findPeerByShortID(id)) + { + p->send(packet); + ++sentCount; + } + } + JLOG(journal_.warn()) << "PRIORITY: sent to " << sentCount << " peers"; + + priorityHashes_.clear(); + } + protocol::TMGetLedger tmGL; tmGL.set_ledgerhash(hash_.begin(), hash_.size()); @@ -613,7 +672,12 @@ InboundLedger::trigger(std::shared_ptr const& peer, TriggerReason reason) // Get the state data first because it's the most likely to be useful // if we wind up abandoning this fetch. - if (mHaveHeader && !mHaveState && !failed_) + // When TX is prioritized for this ledger range, skip state until TX + // complete. + bool const txPrioritized = + mSeq != 0 && app_.getInboundLedgers().isTxPrioritized(mSeq); + if (mHaveHeader && !mHaveState && !failed_ && + !(txPrioritized && !mHaveTransactions)) { XRPL_ASSERT( mLedger, @@ -837,6 +901,9 @@ InboundLedger::takeHeader(std::string const& data) mLedger->txMap().setLedgerSeq(mSeq); mHaveHeader = true; + JLOG(app_.journal("TxTrack").warn()) + << "GOT HEADER seq=" << mSeq << " txHash=" << mLedger->info().txHash; + Serializer s(data.size() + 4); s.add32(HashPrefix::ledgerMaster); s.addRaw(data.data(), data.size()); @@ -906,6 +973,33 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san) if (!nodeID) throw std::runtime_error("data does not properly deserialize"); + // For TX nodes, extract tx hash from leaf nodes for submit_and_wait + if (packet.type() == protocol::liTX_NODE) + { + auto const& data = node.nodedata(); + // Leaf nodes have wire type as last byte + // Format: [tx+meta data...][32-byte tx hash][1-byte type] + if (data.size() >= 33) + { + uint8_t wireType = + static_cast(data[data.size() - 1]); + // wireTypeTransactionWithMeta = 4 + if (wireType == 4) + { + uint256 txHash; + std::memcpy( + txHash.data(), data.data() + data.size() - 33, 32); + auto [it, inserted] = knownTxHashes_.insert(txHash); + if (inserted) + { + JLOG(app_.journal("TxTrack").warn()) + << "GOT TX ledger=" << mSeq << " tx=" << txHash + << " count=" << knownTxHashes_.size(); + } + } + } + } + if (nodeID->isRoot()) { san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f); diff --git a/src/xrpld/app/ledger/detail/InboundLedgers.cpp b/src/xrpld/app/ledger/detail/InboundLedgers.cpp index 99a26ce8f9..7528478047 100644 --- a/src/xrpld/app/ledger/detail/InboundLedgers.cpp +++ b/src/xrpld/app/ledger/detail/InboundLedgers.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -183,6 +184,89 @@ class InboundLedgersImp : public InboundLedgers return ret; } + std::shared_ptr + getPartialLedger(uint256 const& hash) override + { + auto inbound = find(hash); + if (inbound && inbound->hasHeader() && !inbound->isFailed()) + return inbound->getLedger(); + return nullptr; + } + + std::optional + findTxLedger(uint256 const& txHash) override + { + auto const swj = app_.journal("SubmitAndWait"); + ScopedLockType sl(mLock); + JLOG(swj.debug()) << "findTxLedger tx=" << txHash << " searching " + << mLedgers.size() << " inbound ledgers"; + for (auto const& [hash, inbound] : mLedgers) + { + bool hasHdr = inbound->hasHeader(); + bool failed = inbound->isFailed(); + bool hasTx = hasHdr && !failed && inbound->hasTx(txHash); + JLOG(swj.trace()) + << "findTxLedger checking ledger seq=" << inbound->getSeq() + << " hash=" << hash << " hasHeader=" << hasHdr + << " failed=" << failed << " hasTx=" << hasTx; + if (hasTx) + { + JLOG(swj.warn()) << "findTxLedger FOUND tx=" << txHash + << " in ledger seq=" << inbound->getSeq(); + return hash; + } + } + JLOG(swj.debug()) << "findTxLedger tx=" << txHash << " NOT FOUND"; + return std::nullopt; + } + + void + addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) override + { + std::shared_ptr inbound; + { + ScopedLockType sl(mLock); + // Find inbound ledger by sequence (need to iterate) + for (auto const& [hash, ledger] : mLedgers) + { + if (ledger->getSeq() == ledgerSeq && !ledger->isFailed() && + !ledger->isComplete()) + { + inbound = ledger; + break; + } + } + } + + if (inbound) + { + inbound->addPriorityHash(nodeHash); + JLOG(j_.warn()) << "PRIORITY: added node " << nodeHash + << " for ledger seq " << ledgerSeq; + } + else + { + JLOG(j_.warn()) << "PRIORITY: no inbound ledger for seq " + << ledgerSeq << " (node " << nodeHash << ")"; + } + } + + void + prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) override + { + std::lock_guard lock(txPriorityMutex_); + txPriorityRange_.insert(ClosedInterval(start, end)); + JLOG(j_.debug()) << "TX priority added for ledgers " << start << "-" + << end; + } + + bool + isTxPrioritized(std::uint32_t seq) const override + { + std::lock_guard lock(txPriorityMutex_); + return boost::icl::contains(txPriorityRange_, seq); + } + /* This gets called when "We got some data from an inbound ledger" @@ -406,6 +490,11 @@ class InboundLedgersImp : public InboundLedgers } else if ((la + std::chrono::minutes(1)) < start) { + JLOG(app_.journal("SubmitAndWait").debug()) + << "sweep removing ledger seq=" << it->second->getSeq() + << " complete=" << it->second->isComplete() + << " failed=" << it->second->isFailed() + << " knownTxCount=" << it->second->knownTxCount(); stuffToSweep.push_back(it->second); // shouldn't cause the actual final delete // since we are holding a reference in the vector. @@ -420,13 +509,22 @@ class InboundLedgersImp : public InboundLedgers beast::expire(mRecentFailures, kReacquireInterval); } - JLOG(j_.debug()) - << "Swept " << stuffToSweep.size() << " out of " << total + JLOG(app_.journal("SubmitAndWait").debug()) + << "sweep removed " << stuffToSweep.size() << " out of " << total << " inbound ledgers. Duration: " << std::chrono::duration_cast( m_clock.now() - start) .count() << "ms"; + + // Clear expired TX-priority ranges (anything at or below validated) + { + std::lock_guard lock(txPriorityMutex_); + auto const validSeq = app_.getLedgerMaster().getValidLedgerIndex(); + if (validSeq > 0 && !txPriorityRange_.empty()) + txPriorityRange_.erase( + ClosedInterval(0, validSeq)); + } } void @@ -463,6 +561,10 @@ class InboundLedgersImp : public InboundLedgers std::set pendingAcquires_; std::mutex acquiresMutex_; + + // Ledger ranges where TX fetching should be prioritized + mutable std::mutex txPriorityMutex_; + RangeSet txPriorityRange_; }; //------------------------------------------------------------------------------ diff --git a/src/xrpld/app/ledger/detail/LedgerMaster.cpp b/src/xrpld/app/ledger/detail/LedgerMaster.cpp index 2efc6dd9e7..662cec1292 100644 --- a/src/xrpld/app/ledger/detail/LedgerMaster.cpp +++ b/src/xrpld/app/ledger/detail/LedgerMaster.cpp @@ -107,6 +107,7 @@ LedgerMaster::LedgerMaster( beast::Journal journal) : app_(app) , m_journal(journal) + , jPartialSync_(app.journal("PartialSync")) , mLedgerHistory(collector, app) , standalone_(app_.config().standalone()) , fetch_depth_( @@ -916,11 +917,29 @@ LedgerMaster::checkAccept(uint256 const& hash, std::uint32_t seq) auto validations = app_.validators().negativeUNLFilter( app_.getValidations().getTrustedForLedger(hash, seq)); valCount = validations.size(); - if (valCount >= app_.validators().quorum()) + auto const quorum = app_.validators().quorum(); + + JLOG(jPartialSync_.warn()) + << "checkAccept: hash=" << hash << " seq=" << seq + << " valCount=" << valCount << " quorum=" << quorum + << " mLastValidLedger.seq=" << mLastValidLedger.second; + + if (valCount >= quorum) { std::lock_guard ml(m_mutex); if (seq > mLastValidLedger.second) + { + JLOG(jPartialSync_.warn()) + << "checkAccept: QUORUM REACHED - setting mLastValidLedger" + << " seq=" << seq << " hash=" << hash; mLastValidLedger = std::make_pair(hash, seq); + } + } + else + { + JLOG(jPartialSync_.debug()) + << "checkAccept: quorum not reached, need " << quorum + << " have " << valCount; } if (seq == mValidLedgerSeq) diff --git a/src/xrpld/app/main/Application.cpp b/src/xrpld/app/main/Application.cpp index bfbfe660d5..a0e253a4dc 100644 --- a/src/xrpld/app/main/Application.cpp +++ b/src/xrpld/app/main/Application.cpp @@ -299,6 +299,7 @@ class ApplicationImp : public Application, public BasicApp logs_->journal("Collector"))) , m_jobQueue(std::make_unique( + get_io_service(), [](std::unique_ptr const& config) { if (config->standalone() && !config->FORCE_MULTI_THREAD) return 1; diff --git a/src/xrpld/app/misc/NetworkOPs.cpp b/src/xrpld/app/misc/NetworkOPs.cpp index 017e800ecf..bb94add5ac 100644 --- a/src/xrpld/app/misc/NetworkOPs.cpp +++ b/src/xrpld/app/misc/NetworkOPs.cpp @@ -225,6 +225,9 @@ class NetworkOPsImp final : public NetworkOPs bool bLocal, FailHard failType) override; + std::optional + broadcastRawTransaction(Blob const& txBlob) override; + /** * For transactions submitted directly by a client, apply batch of * transactions and wait for this transaction to complete. @@ -823,11 +826,13 @@ NetworkOPsImp::isNeedNetworkLedger() return needNetworkLedger_; } +//@@start is-full-check inline bool NetworkOPsImp::isFull() { return !needNetworkLedger_ && (mMode == OperatingMode::FULL); } +//@@end is-full-check std::string NetworkOPsImp::getHostId(bool forAdmin) @@ -1224,6 +1229,43 @@ NetworkOPsImp::processTransaction( doTransactionAsync(transaction, bUnlimited, failType); } +std::optional +NetworkOPsImp::broadcastRawTransaction(Blob const& txBlob) +{ + // Parse the transaction blob to get the hash + std::shared_ptr stx; + try + { + SerialIter sit(makeSlice(txBlob)); + stx = std::make_shared(std::ref(sit)); + } + catch (std::exception const& e) + { + JLOG(m_journal.warn()) + << "broadcastRawTransaction: Failed to parse tx blob: " << e.what(); + return std::nullopt; + } + + uint256 txHash = stx->getTransactionID(); + + // Broadcast to all peers without local validation + protocol::TMTransaction msg; + Serializer s; + stx->add(s); + msg.set_rawtransaction(s.data(), s.size()); + msg.set_status(protocol::tsNEW); // tsNEW = origin node could not validate + msg.set_receivetimestamp( + app_.timeKeeper().now().time_since_epoch().count()); + + app_.overlay().foreach( + send_always(std::make_shared(msg, protocol::mtTRANSACTION))); + + JLOG(m_journal.info()) << "broadcastRawTransaction: Broadcast tx " + << txHash; + + return txHash; +} + void NetworkOPsImp::doTransactionAsync( std::shared_ptr transaction, @@ -1494,6 +1536,7 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) bool const isEmitted = hook::isEmittedTxn(*(e.transaction->getSTransaction())); + //@@start tx-relay if (toSkip && !isEmitted) { protocol::TMTransaction tx; @@ -1509,6 +1552,7 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) app_.overlay().relay(e.transaction->getID(), tx, *toSkip); e.transaction->setBroadcast(); } + //@@end tx-relay } if (validatedLedgerIndex) @@ -1741,6 +1785,14 @@ NetworkOPsImp::checkLastClosedLedger( if (!switchLedgers) return false; + // Safety check: can't acquire a ledger with an invalid hash + if (!closedLedger.isNonZero()) + { + JLOG(m_journal.warn()) + << "checkLastClosedLedger: closedLedger hash is zero, skipping"; + return false; + } + auto consensus = m_ledgerMaster.getLedgerByHash(closedLedger); if (!consensus) @@ -1963,6 +2015,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr const& clog) // timing to make sure there shouldn't be a newer LCL. We need this // information to do the next three tests. + //@@start mode-transitions if (((mMode == OperatingMode::CONNECTED) || (mMode == OperatingMode::SYNCING)) && !ledgerChange) @@ -1988,8 +2041,11 @@ NetworkOPsImp::endConsensus(std::unique_ptr const& clog) setMode(OperatingMode::FULL); } } + //@@end mode-transitions + //@@start consensus-gate beginConsensus(networkClosed, clog); + //@@end consensus-gate } void diff --git a/src/xrpld/app/misc/NetworkOPs.h b/src/xrpld/app/misc/NetworkOPs.h index aafef2585a..f3ed3fd7e8 100644 --- a/src/xrpld/app/misc/NetworkOPs.h +++ b/src/xrpld/app/misc/NetworkOPs.h @@ -32,6 +32,7 @@ #include #include #include +#include #include namespace ripple { @@ -112,6 +113,17 @@ class NetworkOPs : public InfoSub::Source bool bLocal, FailHard failType) = 0; + /** + * Broadcast a raw transaction to peers without local validation. + * Used by submit_and_wait during partial sync mode when local state + * is not available for validation. + * + * @param txBlob The raw serialized transaction blob + * @return The transaction hash, or nullopt if parsing failed + */ + virtual std::optional + broadcastRawTransaction(Blob const& txBlob) = 0; + //-------------------------------------------------------------------------- // // Owner functions diff --git a/src/xrpld/app/misc/StateAccounting.h b/src/xrpld/app/misc/StateAccounting.h index c120e2bd1c..69fe4f7eb8 100644 --- a/src/xrpld/app/misc/StateAccounting.h +++ b/src/xrpld/app/misc/StateAccounting.h @@ -31,6 +31,7 @@ namespace ripple { not change them without verifying each use and ensuring that it is not a breaking change. */ +//@@start operating-mode-enum enum class OperatingMode { DISCONNECTED = 0, //!< not ready to process requests CONNECTED = 1, //!< convinced we are talking to the network @@ -38,6 +39,7 @@ enum class OperatingMode { TRACKING = 3, //!< convinced we agree with the network FULL = 4 //!< we have the ledger and can even validate }; +//@@end operating-mode-enum class StateAccounting { diff --git a/src/xrpld/app/misc/detail/ValidatorList.cpp b/src/xrpld/app/misc/detail/ValidatorList.cpp index 3a81443db6..d3d96f15e8 100644 --- a/src/xrpld/app/misc/detail/ValidatorList.cpp +++ b/src/xrpld/app/misc/detail/ValidatorList.cpp @@ -966,6 +966,16 @@ ValidatorList::applyListsAndBroadcast( if (good) { networkOPs.clearUNLBlocked(); + // For partial sync: trigger early quorum calculation so + // validations can be trusted before consensus starts + JLOG(j_.warn()) << "All publisher lists available, triggering " + "early updateTrusted for partial sync"; + updateTrusted( + {}, // empty seenValidators - we just need quorum calculated + timeKeeper_.now(), + networkOPs, + overlay, + hashRouter); } } bool broadcast = disposition <= ListDisposition::known_sequence; diff --git a/src/xrpld/app/misc/detail/ValidatorSite.cpp b/src/xrpld/app/misc/detail/ValidatorSite.cpp index 1994f49861..e6c71c947f 100644 --- a/src/xrpld/app/misc/detail/ValidatorSite.cpp +++ b/src/xrpld/app/misc/detail/ValidatorSite.cpp @@ -166,6 +166,7 @@ ValidatorSite::load( void ValidatorSite::start() { + JLOG(j_.warn()) << "ValidatorSite::start() called"; std::lock_guard l0{sites_mutex_}; std::lock_guard l1{state_mutex_}; if (timer_.expires_at() == clock_type::time_point{}) @@ -218,6 +219,11 @@ ValidatorSite::setTimer( if (next != sites_.end()) { pending_ = next->nextRefresh <= clock_type::now(); + auto delay = std::chrono::duration_cast( + next->nextRefresh - clock_type::now()); + JLOG(j_.warn()) << "ValidatorSite::setTimer() pending=" << pending_ + << " delay=" << delay.count() << "ms" + << " uri=" << next->startingResource->uri; cv_.notify_all(); timer_.expires_at(next->nextRefresh); auto idx = std::distance(sites_.begin(), next); @@ -225,6 +231,10 @@ ValidatorSite::setTimer( this->onTimer(idx, ec); }); } + else + { + JLOG(j_.warn()) << "ValidatorSite::setTimer() no sites configured"; + } } void diff --git a/src/xrpld/core/Coro.ipp b/src/xrpld/core/Coro.ipp index 5901e07c68..3469cc7add 100644 --- a/src/xrpld/core/Coro.ipp +++ b/src/xrpld/core/Coro.ipp @@ -21,6 +21,8 @@ #define RIPPLE_CORE_COROINL_H_INCLUDED #include +#include +#include namespace ripple { @@ -48,6 +50,7 @@ JobQueue::Coro::Coro( }, boost::coroutines::attributes(megabytes(1))) { + lvs_.coroPtr = this; } inline JobQueue::Coro::~Coro() @@ -57,6 +60,7 @@ inline JobQueue::Coro::~Coro() #endif } +//@@start coro-yield inline void JobQueue::Coro::yield() const { @@ -66,6 +70,7 @@ JobQueue::Coro::yield() const } (*yield_)(); } +//@@end coro-yield inline bool JobQueue::Coro::post() @@ -89,6 +94,7 @@ JobQueue::Coro::post() return false; } +//@@start coro-resume inline void JobQueue::Coro::resume() { @@ -113,6 +119,7 @@ JobQueue::Coro::resume() running_ = false; cv_.notify_all(); } +//@@end coro-resume inline bool JobQueue::Coro::runnable() const @@ -148,6 +155,65 @@ JobQueue::Coro::join() cv_.wait(lk, [this]() { return running_ == false; }); } +inline bool +JobQueue::Coro::postAndYield() +{ + { + std::lock_guard lk(mutex_run_); + running_ = true; + } + + // Flag starts false - will be set true right before yield() + yielding_.store(false, std::memory_order_release); + + // Post a job that waits for yield to be ready, then resumes + if (!jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { + // Spin-wait until yield() is about to happen + // yielding_ is set true immediately before (*yield_)() is called + while (!yielding_.load(std::memory_order_acquire)) + std::this_thread::yield(); + resume(); + })) + { + std::lock_guard lk(mutex_run_); + running_ = false; + cv_.notify_all(); + return false; + } + + // Signal that we're about to yield, then yield + yielding_.store(true, std::memory_order_release); + yield(); + + // Clear flag after resuming + yielding_.store(false, std::memory_order_release); + return true; +} + +inline bool +JobQueue::Coro::sleepFor(std::chrono::milliseconds delay) +{ + { + std::lock_guard lk(mutex_run_); + running_ = true; + } + + // Use an asio timer on the existing io_service thread pool + // instead of spawning a detached thread per sleep call + auto timer = + std::make_shared(jq_.io_service_); + timer->expires_after(delay); + timer->async_wait( + [sp = shared_from_this(), timer]( + boost::system::error_code const& ec) { + if (ec != boost::asio::error::operation_aborted) + sp->post(); + }); + + yield(); + return true; +} + } // namespace ripple #endif diff --git a/src/xrpld/core/JobQueue.h b/src/xrpld/core/JobQueue.h index 06f90dd1c4..bde0452c94 100644 --- a/src/xrpld/core/JobQueue.h +++ b/src/xrpld/core/JobQueue.h @@ -26,9 +26,11 @@ #include #include #include +#include #include #include // workaround for boost 1.72 bug #include // workaround for boost 1.72 bug +#include namespace ripple { @@ -69,6 +71,7 @@ class JobQueue : private Workers::Callback std::condition_variable cv_; boost::coroutines::asymmetric_coroutine::pull_type coro_; boost::coroutines::asymmetric_coroutine::push_type* yield_; + std::atomic yielding_{false}; // For postAndYield synchronization #ifndef NDEBUG bool finished_ = false; #endif @@ -136,11 +139,28 @@ class JobQueue : private Workers::Callback /** Waits until coroutine returns from the user function. */ void join(); + + /** Combined post and yield for poll-wait patterns. + Safely schedules resume before yielding, avoiding race conditions. + @return true if successfully posted and yielded, false if job queue + stopping. + */ + bool + postAndYield(); + + /** Sleep for a duration without blocking the job queue thread. + Yields the coroutine and schedules resume after the delay. + @param delay The duration to sleep. + @return true if successfully slept, false if job queue stopping. + */ + bool + sleepFor(std::chrono::milliseconds delay); }; using JobFunction = std::function; JobQueue( + boost::asio::io_service& io_service, int threadCount, beast::insight::Collector::ptr const& collector, beast::Journal journal, @@ -242,6 +262,7 @@ class JobQueue : private Workers::Callback using JobDataMap = std::map; + boost::asio::io_service& io_service_; beast::Journal m_journal; mutable std::mutex m_mutex; std::uint64_t m_lastJob; diff --git a/src/xrpld/core/detail/JobQueue.cpp b/src/xrpld/core/detail/JobQueue.cpp index 5eb1f24d43..61967763ba 100644 --- a/src/xrpld/core/detail/JobQueue.cpp +++ b/src/xrpld/core/detail/JobQueue.cpp @@ -25,12 +25,14 @@ namespace ripple { JobQueue::JobQueue( + boost::asio::io_service& io_service, int threadCount, beast::insight::Collector::ptr const& collector, beast::Journal journal, Logs& logs, perf::PerfLog& perfLog) - : m_journal(journal) + : io_service_(io_service) + , m_journal(journal) , m_lastJob(0) , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs) , m_processCount(0) diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 700966b6d0..6e030eac63 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -2493,6 +2493,12 @@ PeerImp::onMessage(std::shared_ptr const& m) bool pLDo = true; bool progress = false; + // For state/transaction node requests, store directly to db + // (not fetch pack) so partial sync queries can find them immediately + bool const directStore = + packet.type() == protocol::TMGetObjectByHash::otSTATE_NODE || + packet.type() == protocol::TMGetObjectByHash::otTRANSACTION_NODE; + for (int i = 0; i < packet.objects_size(); ++i) { const protocol::TMIndexedObject& obj = packet.objects(i); @@ -2525,10 +2531,33 @@ PeerImp::onMessage(std::shared_ptr const& m) { uint256 const hash{obj.hash()}; - app_.getLedgerMaster().addFetchPack( - hash, - std::make_shared( - obj.data().begin(), obj.data().end())); + if (directStore) + { + // Store directly to node store for immediate + // availability + auto const hotType = + (packet.type() == + protocol::TMGetObjectByHash::otSTATE_NODE) + ? hotACCOUNT_NODE + : hotTRANSACTION_NODE; + + JLOG(p_journal_.warn()) + << "PRIORITY: received node " << hash << " for seq " + << pLSeq << " storing to db"; + + app_.getNodeStore().store( + hotType, + Blob(obj.data().begin(), obj.data().end()), + hash, + pLSeq); + } + else + { + app_.getLedgerMaster().addFetchPack( + hash, + std::make_shared( + obj.data().begin(), obj.data().end())); + } } } } diff --git a/src/xrpld/rpc/detail/Handler.cpp b/src/xrpld/rpc/detail/Handler.cpp index 2dee29c2a6..8a9e8b41fe 100644 --- a/src/xrpld/rpc/detail/Handler.cpp +++ b/src/xrpld/rpc/detail/Handler.cpp @@ -174,6 +174,7 @@ Handler const handlerArray[]{ byRef(&doSubmitMultiSigned), Role::USER, NEEDS_CURRENT_LEDGER}, + {"submit_and_wait", byRef(&doSubmitAndWait), Role::USER, NO_CONDITION}, {"transaction_entry", byRef(&doTransactionEntry), Role::USER, NO_CONDITION}, {"tx", byRef(&doTxJson), Role::USER, NEEDS_NETWORK_CONNECTION}, {"tx_history", byRef(&doTxHistory), Role::USER, NO_CONDITION, 1, 1}, diff --git a/src/xrpld/rpc/detail/Handler.h b/src/xrpld/rpc/detail/Handler.h index cb1a2579ec..e3c5fe81a3 100644 --- a/src/xrpld/rpc/detail/Handler.h +++ b/src/xrpld/rpc/detail/Handler.h @@ -93,6 +93,7 @@ conditionMet(Condition condition_required, T& context) return rpcEXPIRED_VALIDATOR_LIST; } + //@@start network-condition-check if ((condition_required != NO_CONDITION) && (context.netOps.getOperatingMode() < OperatingMode::SYNCING)) { @@ -103,6 +104,7 @@ conditionMet(Condition condition_required, T& context) return rpcNO_NETWORK; return rpcNOT_SYNCED; } + //@@end network-condition-check if (!context.app.config().standalone() && condition_required != NO_CONDITION) diff --git a/src/xrpld/rpc/detail/RPCHelpers.cpp b/src/xrpld/rpc/detail/RPCHelpers.cpp index e5dbb2ef43..2da768b84d 100644 --- a/src/xrpld/rpc/detail/RPCHelpers.cpp +++ b/src/xrpld/rpc/detail/RPCHelpers.cpp @@ -17,6 +17,7 @@ */ //============================================================================== +#include #include #include #include @@ -28,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -37,7 +39,6 @@ #include #include -#include #include namespace ripple { @@ -574,6 +575,11 @@ Status getLedger(T& ledger, uint256 const& ledgerHash, Context& context) { ledger = context.ledgerMaster.getLedgerByHash(ledgerHash); + if (ledger == nullptr) + { + // Partial sync fallback: try to get incomplete ledger being acquired + ledger = context.app.getInboundLedgers().getPartialLedger(ledgerHash); + } if (ledger == nullptr) return {rpcLGR_NOT_FOUND, "ledgerNotFound"}; return Status::OK; @@ -611,6 +617,14 @@ getLedger(T& ledger, uint32_t ledgerIndex, Context& context) } } + // Partial sync fallback: try to get incomplete ledger being acquired + if (ledger == nullptr) + { + auto hash = context.ledgerMaster.getHashBySeq(ledgerIndex); + if (hash.isNonZero()) + ledger = context.app.getInboundLedgers().getPartialLedger(hash); + } + if (ledger == nullptr) return {rpcLGR_NOT_FOUND, "ledgerNotFound"}; @@ -633,16 +647,87 @@ template Status getLedger(T& ledger, LedgerShortcut shortcut, Context& context) { - if (isValidatedOld(context.ledgerMaster, context.app.config().standalone())) - { - if (context.apiVersion == 1) - return {rpcNO_NETWORK, "InsufficientNetworkMode"}; - return {rpcNOT_SYNCED, "notSynced"}; - } + //@@start sync-validation + // TODO: Re-enable for production. Disabled for partial sync testing. + // if (isValidatedOld(context.ledgerMaster, + // context.app.config().standalone())) + // { + // if (context.apiVersion == 1) + // return {rpcNO_NETWORK, "InsufficientNetworkMode"}; + // return {rpcNOT_SYNCED, "notSynced"}; + // } + //@@end sync-validation if (shortcut == LedgerShortcut::VALIDATED) { ledger = context.ledgerMaster.getValidatedLedger(); + + // Partial sync fallback: try to get incomplete validated ledger + if (ledger == nullptr) + { + auto [hash, seq] = context.ledgerMaster.getLastValidatedLedger(); + JLOG(context.j.warn()) + << "Partial sync: getValidatedLedger null, trying trusted hash=" + << hash << " seq=" << seq; + + // If no trusted validations yet, try network-observed ledger + if (hash.isZero()) + { + std::tie(hash, seq) = + context.ledgerMaster.getNetworkObservedLedger(); + JLOG(context.j.warn()) + << "Partial sync: trying network-observed hash=" << hash + << " seq=" << seq; + + // Poll-wait for validations to arrive (up to ~10 seconds) + if (hash.isZero() && context.coro) + { + for (int i = 0; i < 100 && hash.isZero(); ++i) + { + context.coro->sleepFor(std::chrono::milliseconds(100)); + std::tie(hash, seq) = + context.ledgerMaster.getNetworkObservedLedger(); + } + if (hash.isNonZero()) + { + JLOG(context.j.warn()) + << "Partial sync: got network-observed hash=" + << hash << " seq=" << seq; + } + } + } + + if (hash.isNonZero()) + { + setPartialSyncWait(true); + ledger = context.app.getInboundLedgers().getPartialLedger(hash); + // If no InboundLedger exists yet, trigger acquisition and wait + if (!ledger) + { + JLOG(context.j.warn()) + << "Partial sync: acquiring ledger " << hash; + context.app.getInboundLedgers().acquire( + hash, seq, InboundLedger::Reason::CONSENSUS); + + // Poll-wait for the ledger header (up to ~10 seconds) + int i = 0; + for (; i < 100 && !ledger && context.coro; ++i) + { + context.coro->sleepFor(std::chrono::milliseconds(100)); + ledger = + context.app.getInboundLedgers().getPartialLedger( + hash); + } + JLOG(context.j.warn()) + << "Partial sync: poll-wait completed after " << i + << " iterations, ledger=" + << (ledger ? "found" : "null"); + } + } + JLOG(context.j.warn()) << "Partial sync: getPartialLedger returned " + << (ledger ? "ledger" : "null"); + } + if (ledger == nullptr) { if (context.apiVersion == 1) diff --git a/src/xrpld/rpc/detail/ServerHandler.cpp b/src/xrpld/rpc/detail/ServerHandler.cpp index fafc56e752..20aad04196 100644 --- a/src/xrpld/rpc/detail/ServerHandler.cpp +++ b/src/xrpld/rpc/detail/ServerHandler.cpp @@ -315,12 +315,14 @@ ServerHandler::onRequest(Session& session) } std::shared_ptr detachedSession = session.detach(); + //@@start rpc-coro-usage auto const postResult = m_jobQueue.postCoro( jtCLIENT_RPC, "RPC-Client", [this, detachedSession](std::shared_ptr coro) { processSession(detachedSession, coro); }); + //@@end rpc-coro-usage if (postResult == nullptr) { // The coroutine was rejected, probably because we're shutting down. diff --git a/src/xrpld/rpc/handlers/Handlers.h b/src/xrpld/rpc/handlers/Handlers.h index 549d576b58..76846294e1 100644 --- a/src/xrpld/rpc/handlers/Handlers.h +++ b/src/xrpld/rpc/handlers/Handlers.h @@ -147,6 +147,8 @@ doSubmit(RPC::JsonContext&); Json::Value doSubmitMultiSigned(RPC::JsonContext&); Json::Value +doSubmitAndWait(RPC::JsonContext&); +Json::Value doSubscribe(RPC::JsonContext&); Json::Value doTransactionEntry(RPC::JsonContext&); diff --git a/src/xrpld/rpc/handlers/SubmitAndWait.cpp b/src/xrpld/rpc/handlers/SubmitAndWait.cpp new file mode 100644 index 0000000000..4ee3b91ee1 --- /dev/null +++ b/src/xrpld/rpc/handlers/SubmitAndWait.cpp @@ -0,0 +1,335 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2024 XRPL Labs + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ripple { + +// Custom journal partition for submit_and_wait debugging +// Configure with [rpc_startup] { "command": "log_level", "partition": +// "SubmitAndWait", "severity": "debug" } +#define SWLOG(level) JLOG(context.app.journal("SubmitAndWait").level()) + +// { +// tx_blob: +// timeout: +// } +// +// Submit a transaction and wait for it to appear in a VALIDATED ledger. +// Designed for partial sync mode where the node may not have full state +// to validate locally - broadcasts raw transaction and monitors incoming +// ledgers for the result. +// +// The handler waits until: +// 1. Transaction is found in a ledger, AND +// 2. That ledger reaches validation quorum (enough trusted validators) +// +// Response: +// "validated": true - Transaction confirmed in validated ledger +// "error": "timeout" - Timeout waiting +// "error": "expired" - LastLedgerSequence exceeded +Json::Value +doSubmitAndWait(RPC::JsonContext& context) +{ + Json::Value jvResult; + + // Must have coroutine for polling + if (!context.coro) + { + return RPC::make_error( + rpcINTERNAL, "submit_and_wait requires coroutine context"); + } + + // Parse tx_blob + if (!context.params.isMember(jss::tx_blob)) + { + return rpcError(rpcINVALID_PARAMS); + } + + auto const txBlobHex = context.params[jss::tx_blob].asString(); + auto const txBlob = strUnHex(txBlobHex); + + if (!txBlob || txBlob->empty()) + { + return rpcError(rpcINVALID_PARAMS); + } + + // Parse the transaction to get hash and LastLedgerSequence + std::shared_ptr stx; + try + { + SerialIter sit(makeSlice(*txBlob)); + stx = std::make_shared(std::ref(sit)); + } + catch (std::exception& e) + { + jvResult[jss::error] = "invalidTransaction"; + jvResult[jss::error_exception] = e.what(); + return jvResult; + } + + uint256 const txHash = stx->getTransactionID(); + + // Extract LastLedgerSequence if present + std::optional lastLedgerSeq; + if (stx->isFieldPresent(sfLastLedgerSequence)) + { + lastLedgerSeq = stx->getFieldU32(sfLastLedgerSequence); + } + + // Parse timeout (default 60 seconds, max 120 seconds) + auto timeout = std::chrono::seconds(60); + if (context.params.isMember("timeout")) + { + auto const t = context.params["timeout"].asUInt(); + if (t > 120) + { + return RPC::make_error( + rpcINVALID_PARAMS, "timeout must be <= 120 seconds"); + } + timeout = std::chrono::seconds(t); + } + + // Enable partial sync wait for SHAMap operations + setPartialSyncWait(true); + setCoroFetchTimeout( + std::chrono::duration_cast(timeout / 2)); + + SWLOG(warn) << "starting for tx=" << txHash + << " lastLedgerSeq=" << (lastLedgerSeq ? *lastLedgerSeq : 0) + << " timeout=" << timeout.count() << "s"; + + // Poll for the transaction result + constexpr auto pollInterval = std::chrono::milliseconds(10); + auto const startTime = std::chrono::steady_clock::now(); + + // Broadcast IMMEDIATELY - don't wait for anything + SWLOG(warn) << "broadcasting tx=" << txHash; + auto broadcastResult = context.netOps.broadcastRawTransaction(*txBlob); + if (!broadcastResult) + { + SWLOG(warn) << "broadcast FAILED for tx=" << txHash; + jvResult[jss::error] = "broadcastFailed"; + jvResult[jss::error_exception] = + "Failed to parse/broadcast transaction"; + return jvResult; + } + SWLOG(warn) << "broadcast SUCCESS for tx=" << txHash; + + // Prioritize TX fetching for ledgers in our window + // This makes TX nodes fetch before state nodes for faster detection + auto const startSeq = context.ledgerMaster.getValidLedgerIndex(); + auto const endSeq = lastLedgerSeq.value_or(startSeq + 20); + context.app.getInboundLedgers().prioritizeTxForLedgers(startSeq, endSeq); + + jvResult[jss::tx_hash] = to_string(txHash); + jvResult[jss::broadcast] = true; + + // Track when we find the tx and in which ledger + std::optional foundLedgerHash; + std::optional foundLedgerSeq; + + // Track last checked seq to avoid rescanning old ledgers + auto lastCheckedSeq = startSeq; + + // Helper to check if a ledger is validated (has quorum) + auto isLedgerValidated = [&](uint256 const& ledgerHash) -> bool { + auto const quorum = context.app.validators().quorum(); + if (quorum == 0) + return false; // No validators configured + + auto const valCount = + context.app.getValidations().numTrustedForLedger(ledgerHash); + + return valCount >= quorum; + }; + + // Helper to read tx result from a ledger + auto readTxResult = [&](std::shared_ptr const& ledger, + std::string const& source) -> bool { + if (!ledger) + return false; + + auto [sttx, stobj] = ledger->txRead(txHash); + if (!sttx || !stobj) + return false; + + jvResult[jss::status] = "success"; + jvResult[jss::validated] = true; + jvResult["found_via"] = source; + jvResult[jss::tx_json] = sttx->getJson(JsonOptions::none); + jvResult[jss::metadata] = stobj->getJson(JsonOptions::none); + jvResult[jss::ledger_hash] = to_string(ledger->info().hash); + jvResult[jss::ledger_index] = ledger->info().seq; + + // Extract result code from metadata + if (stobj->isFieldPresent(sfTransactionResult)) + { + auto const result = + TER::fromInt(stobj->getFieldU8(sfTransactionResult)); + std::string token; + std::string human; + transResultInfo(result, token, human); + jvResult[jss::engine_result] = token; + jvResult[jss::engine_result_code] = TERtoInt(result); + jvResult[jss::engine_result_message] = human; + } + + return true; + }; + + while (true) + { + auto const elapsed = std::chrono::steady_clock::now() - startTime; + if (elapsed >= timeout) + { + jvResult[jss::error] = "transactionTimeout"; + jvResult[jss::error_message] = + "Transaction not validated within timeout period"; + if (foundLedgerSeq) + { + jvResult["found_in_ledger"] = *foundLedgerSeq; + auto const valCount = + context.app.getValidations().numTrustedForLedger( + *foundLedgerHash); + auto const quorum = context.app.validators().quorum(); + jvResult["validation_count"] = + static_cast(valCount); + jvResult["quorum"] = static_cast(quorum); + } + return jvResult; + } + + // If we already found the tx, check if its ledger is now validated + if (foundLedgerHash) + { + if (isLedgerValidated(*foundLedgerHash)) + { + // Ledger is validated! Try to read from InboundLedgers first + auto ledger = context.app.getInboundLedgers().getPartialLedger( + *foundLedgerHash); + if (ledger && readTxResult(ledger, "InboundLedgers")) + { + return jvResult; + } + // Try LedgerMaster (for when synced) + if (foundLedgerSeq) + { + ledger = + context.ledgerMaster.getLedgerBySeq(*foundLedgerSeq); + if (ledger && readTxResult(ledger, "LedgerMaster")) + { + return jvResult; + } + } + // Ledger validated but can't read yet - keep waiting + } + } + else + { + auto const currentValidatedSeq = + context.ledgerMaster.getValidLedgerIndex(); + + // Search InboundLedgers for the tx (partial sync mode) + auto const ledgerHash = + context.app.getInboundLedgers().findTxLedger(txHash); + + if (ledgerHash) + { + auto const ledger = + context.app.getInboundLedgers().getPartialLedger( + *ledgerHash); + + if (ledger) + { + foundLedgerHash = ledgerHash; + foundLedgerSeq = ledger->info().seq; + SWLOG(warn) << "FOUND tx in InboundLedgers seq=" + << ledger->info().seq; + + if (isLedgerValidated(*ledgerHash)) + { + if (readTxResult(ledger, "InboundLedgers")) + { + return jvResult; + } + } + } + } + + // Search LedgerMaster for the tx (synced mode via gossip) + // Only check new ledgers since last iteration + if (!foundLedgerHash) + { + for (auto seq = lastCheckedSeq; seq <= currentValidatedSeq; + ++seq) + { + auto ledger = context.ledgerMaster.getLedgerBySeq(seq); + if (ledger) + { + auto [sttx, stobj] = ledger->txRead(txHash); + if (sttx && stobj) + { + foundLedgerHash = ledger->info().hash; + foundLedgerSeq = seq; + SWLOG(warn) + << "FOUND tx in LedgerMaster seq=" << seq; + + // LedgerMaster ledgers are already validated + if (readTxResult(ledger, "LedgerMaster")) + { + return jvResult; + } + } + } + } + lastCheckedSeq = currentValidatedSeq + 1; + } + + // Check LastLedgerSequence expiry + if (lastLedgerSeq && currentValidatedSeq > *lastLedgerSeq) + { + jvResult[jss::error] = "transactionExpired"; + jvResult[jss::error_message] = + "LastLedgerSequence exceeded and transaction not found"; + jvResult["last_ledger_sequence"] = *lastLedgerSeq; + jvResult["validated_ledger"] = currentValidatedSeq; + return jvResult; + } + } + + // Sleep and continue polling + context.coro->sleepFor(pollInterval); + } +} + +} // namespace ripple diff --git a/src/xrpld/shamap/Family.h b/src/xrpld/shamap/Family.h index bbb22c273d..19492835d7 100644 --- a/src/xrpld/shamap/Family.h +++ b/src/xrpld/shamap/Family.h @@ -68,9 +68,14 @@ class Family * * @param refNum Sequence of ledger to acquire. * @param nodeHash Hash of missing node to report in throw. + * @param prioritize If true, prioritize fetching this specific node + * (used by partial sync mode for RPC queries). */ virtual void - missingNodeAcquireBySeq(std::uint32_t refNum, uint256 const& nodeHash) = 0; + missingNodeAcquireBySeq( + std::uint32_t refNum, + uint256 const& nodeHash, + bool prioritize = false) = 0; /** Acquire ledger that has a missing node by ledger hash * diff --git a/src/xrpld/shamap/NodeFamily.h b/src/xrpld/shamap/NodeFamily.h index 4062ea2389..6dc93f4ca1 100644 --- a/src/xrpld/shamap/NodeFamily.h +++ b/src/xrpld/shamap/NodeFamily.h @@ -79,7 +79,10 @@ class NodeFamily : public Family reset() override; void - missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& hash) override; + missingNodeAcquireBySeq( + std::uint32_t seq, + uint256 const& hash, + bool prioritize = false) override; void missingNodeAcquireByHash(uint256 const& hash, std::uint32_t seq) override diff --git a/src/xrpld/shamap/SHAMapInnerNode.h b/src/xrpld/shamap/SHAMapInnerNode.h index d2791915c3..f945c6464f 100644 --- a/src/xrpld/shamap/SHAMapInnerNode.h +++ b/src/xrpld/shamap/SHAMapInnerNode.h @@ -213,6 +213,7 @@ SHAMapInnerNode::getBranchCount() const return popcnt16(isBranch_); } +//@@start full-below-methods inline bool SHAMapInnerNode::isFullBelow(std::uint32_t generation) const { @@ -224,6 +225,7 @@ SHAMapInnerNode::setFullBelowGen(std::uint32_t gen) { fullBelowGen_ = gen; } +//@@end full-below-methods } // namespace ripple #endif diff --git a/src/xrpld/shamap/SHAMapMissingNode.h b/src/xrpld/shamap/SHAMapMissingNode.h index 50aa193b2b..149b7e5ce4 100644 --- a/src/xrpld/shamap/SHAMapMissingNode.h +++ b/src/xrpld/shamap/SHAMapMissingNode.h @@ -29,11 +29,13 @@ namespace ripple { +//@@start shamap-type-enum enum class SHAMapType { TRANSACTION = 1, // A tree of transactions STATE = 2, // A tree of state nodes FREE = 3, // A tree not part of a ledger }; +//@@end shamap-type-enum inline std::string to_string(SHAMapType t) @@ -52,6 +54,7 @@ to_string(SHAMapType t) } } +//@@start shamap-missing-node-class class SHAMapMissingNode : public std::runtime_error { public: @@ -67,6 +70,7 @@ class SHAMapMissingNode : public std::runtime_error { } }; +//@@end shamap-missing-node-class } // namespace ripple diff --git a/src/xrpld/shamap/detail/NodeFamily.cpp b/src/xrpld/shamap/detail/NodeFamily.cpp index bf95003aef..57c3b22160 100644 --- a/src/xrpld/shamap/detail/NodeFamily.cpp +++ b/src/xrpld/shamap/detail/NodeFamily.cpp @@ -21,7 +21,6 @@ #include #include #include -#include namespace ripple { @@ -66,9 +65,18 @@ NodeFamily::reset() } void -NodeFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash) +NodeFamily::missingNodeAcquireBySeq( + std::uint32_t seq, + uint256 const& nodeHash, + bool prioritize) { - JLOG(j_.error()) << "Missing node in " << seq; + JLOG(j_.error()) << "Missing node in " << seq << " hash=" << nodeHash + << (prioritize ? " [PRIORITY]" : ""); + + // Add priority for the specific node hash needed by the query + if (prioritize && nodeHash.isNonZero()) + app_.getInboundLedgers().addPriorityNode(seq, nodeHash); + std::unique_lock lock(maxSeqMutex_); if (maxSeq_ == 0) { diff --git a/src/xrpld/shamap/detail/SHAMap.cpp b/src/xrpld/shamap/detail/SHAMap.cpp index a6b7563ca9..9322ee2424 100644 --- a/src/xrpld/shamap/detail/SHAMap.cpp +++ b/src/xrpld/shamap/detail/SHAMap.cpp @@ -17,13 +17,16 @@ */ //============================================================================== +#include #include #include #include #include #include #include +#include #include +#include namespace ripple { @@ -154,6 +157,7 @@ SHAMap::walkTowardsKey(uint256 const& id, SharedPtrNodeStack* stack) const return static_cast(inNode.get()); } +//@@start find-key SHAMapLeafNode* SHAMap::findKey(uint256 const& id) const { @@ -162,6 +166,7 @@ SHAMap::findKey(uint256 const& id) const leaf = nullptr; return leaf; } +//@@end find-key std::shared_ptr SHAMap::fetchNodeFromDB(SHAMapHash const& hash) const @@ -187,6 +192,70 @@ SHAMap::finishFetch( full_ = false; f_.missingNodeAcquireBySeq(ledgerSeq_, hash.as_uint256()); } + + // If partial sync wait is enabled, poll-wait for the node + if (isPartialSyncWaitEnabled()) + if (auto* coro = + static_cast(getCurrentCoroPtr())) + { + using namespace std::chrono; + constexpr auto pollInterval = 50ms; + constexpr auto defaultTimeout = 30s; + // Use coroutine-local timeout if set, otherwise default + auto coroTimeout = getCoroFetchTimeout(); + auto timeout = + coroTimeout.count() > 0 ? coroTimeout : defaultTimeout; + auto const deadline = steady_clock::now() + timeout; + + // Linear backoff for re-requests: 50ms, 100ms, 150ms... up + // to 2s + auto nextRequestDelay = 50ms; + constexpr auto maxRequestDelay = 2000ms; + constexpr auto backoffStep = 50ms; + auto nextRequestTime = + steady_clock::now() + nextRequestDelay; + + JLOG(journal_.debug()) + << "finishFetch: waiting for node " << hash; + + while (steady_clock::now() < deadline) + { + // Sleep for the poll interval (yields coroutine, frees + // job thread) + coro->sleepFor(pollInterval); + + // Try to fetch from cache/db again + if (auto obj = f_.db().fetchNodeObject( + hash.as_uint256(), ledgerSeq_)) + { + JLOG(journal_.debug()) + << "finishFetch: got node " << hash; + auto node = SHAMapTreeNode::makeFromPrefix( + makeSlice(obj->getData()), hash); + if (node) + canonicalize(hash, node); + return node; + } + + // Re-request with priority using linear backoff + auto now = steady_clock::now(); + if (now >= nextRequestTime) + { + f_.missingNodeAcquireBySeq( + ledgerSeq_, + hash.as_uint256(), + true /*prioritize*/); + // Increase delay for next request (linear backoff) + if (nextRequestDelay < maxRequestDelay) + nextRequestDelay += backoffStep; + nextRequestTime = now + nextRequestDelay; + } + } + + JLOG(journal_.warn()) + << "finishFetch: timeout waiting for node " << hash; + } + return {}; } @@ -268,6 +337,7 @@ SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const } */ +//@@start fetch-with-timeout std::shared_ptr SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const { @@ -309,6 +379,7 @@ SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const return nullptr; } +//@@end fetch-with-timeout std::shared_ptr SHAMap::fetchNodeNT(SHAMapHash const& hash) const @@ -333,6 +404,7 @@ SHAMap::fetchNode(SHAMapHash const& hash) const return node; } +//@@start throw-on-missing SHAMapTreeNode* SHAMap::descendThrow(SHAMapInnerNode* parent, int branch) const { @@ -343,6 +415,7 @@ SHAMap::descendThrow(SHAMapInnerNode* parent, int branch) const return ret; } +//@@end throw-on-missing std::shared_ptr SHAMap::descendThrow(std::shared_ptr const& parent, int branch) @@ -435,6 +508,7 @@ SHAMap::descend( return std::make_pair(child, parentID.getChildNodeID(branch)); } +//@@start async-fetch SHAMapTreeNode* SHAMap::descendAsync( SHAMapInnerNode* parent, @@ -457,6 +531,7 @@ SHAMap::descendAsync( if (filter) ptr = checkFilter(hash, filter); + //@@start db-async-fetch if (!ptr && backed_) { f_.db().asyncFetch( @@ -470,6 +545,7 @@ SHAMap::descendAsync( pending = true; return nullptr; } + //@@end db-async-fetch } if (ptr) @@ -477,6 +553,7 @@ SHAMap::descendAsync( return ptr.get(); } +//@@end async-fetch template std::shared_ptr