Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
c20c948
feat: partial sync mode proof of concept
sublimator Nov 30, 2025
5d85d2d
feat: add priority node fetching and network-observed ledger tracking
sublimator Nov 30, 2025
dc5ec93
feat: trigger early quorum calculation when VL is fetched
sublimator Nov 30, 2025
8263f39
feat: add Coro::sleepFor() for non-blocking poll-waits
sublimator Nov 30, 2025
aeb2888
feat: add submit_and_wait RPC for partial sync mode
sublimator Dec 1, 2025
9ddf649
feat: add ledger range-based TX priority for faster tx detection
sublimator Dec 1, 2025
b3c4e56
feat: check LedgerMaster in submit_and_wait for synced mode
sublimator Dec 2, 2025
11cce2e
Merge remote-tracking branch 'origin/dev' into partial-sync-mode
sublimator Feb 20, 2026
ef01ac6
fix: remove leftover conflict marker text in RippledCore.cmake
sublimator Feb 20, 2026
c351266
style: fix clang-format for Coro.ipp
sublimator Feb 20, 2026
fe56844
chore: update levelization ordering
sublimator Feb 20, 2026
2ce3b3f
fix: make partial sync poll-wait opt-in via partialSyncWait flag
sublimator Feb 20, 2026
95bc06d
fix: remove debug log in onTimer() that causes use-after-free crash
sublimator Feb 20, 2026
44881c6
merge: sync with origin/dev (rippled 2.4.0 repo restructure)
sublimator Feb 24, 2026
1f0e1e0
Merge remote-tracking branch 'origin/dev' into partial-sync-mode
sublimator Feb 26, 2026
f629836
chore: format RCLValidations.cpp and SubmitAndWait.cpp
sublimator Feb 26, 2026
ab4fb65
chore: update levelization ordering
sublimator Feb 26, 2026
31044a8
fix: replace detached threads in sleepFor(), fix tx-priority leak and…
sublimator Feb 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Builds/levelization/results/ordering.txt
Original file line number Diff line number Diff line change
Expand Up @@ -203,5 +203,6 @@ xrpld.rpc > xrpl.protocol
xrpld.rpc > xrpl.resource
xrpld.rpc > xrpl.server
xrpld.shamap > xrpl.basics
xrpld.shamap > xrpld.core
xrpld.shamap > xrpld.nodestore
xrpld.shamap > xrpl.protocol
62 changes: 62 additions & 0 deletions include/xrpl/basics/LocalValue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#define RIPPLE_BASICS_LOCALVALUE_H_INCLUDED

#include <boost/thread/tss.hpp>
#include <chrono>
#include <memory>
#include <unordered_map>

Expand All @@ -33,6 +34,16 @@ struct LocalValues
explicit LocalValues() = default;

bool onCoro = true;
void* coroPtr = nullptr; // Pointer to owning JobQueue::Coro (if any)

// When true, SHAMap::finishFetch() will poll-wait for missing nodes
// instead of returning empty. Only set by partial sync code paths.
bool partialSyncWait = false;

// Configurable timeout for SHAMap node fetching during partial sync.
// Zero means use the default (30s). RPC handlers can set this to
// customize poll-wait behavior.
std::chrono::milliseconds fetchTimeout{0};

struct BasicValue
{
Expand Down Expand Up @@ -127,6 +138,57 @@ LocalValue<T>::operator*()
.emplace(this, std::make_unique<detail::LocalValues::Value<T>>(t_))
.first->second->get());
}

// Returns pointer to current coroutine if running inside one, nullptr otherwise
inline void*
getCurrentCoroPtr()
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
return lvs->coroPtr;
return nullptr;
}

// Check if partial sync wait is enabled for the current coroutine context.
inline bool
isPartialSyncWaitEnabled()
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
return lvs->partialSyncWait;
return false;
}

// Enable/disable partial sync wait for the current coroutine context.
inline void
setPartialSyncWait(bool enabled)
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
lvs->partialSyncWait = enabled;
}

// Get the configured fetch timeout for current coroutine context.
// Returns 0ms if not in a coroutine or no custom timeout set.
inline std::chrono::milliseconds
getCoroFetchTimeout()
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
return lvs->fetchTimeout;
return std::chrono::milliseconds{0};
}

// Set the fetch timeout for the current coroutine context.
// Only works if called from within a coroutine.
inline void
setCoroFetchTimeout(std::chrono::milliseconds timeout)
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
lvs->fetchTimeout = timeout;
}

} // namespace ripple

#endif
2 changes: 2 additions & 0 deletions include/xrpl/protocol/ErrorCodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@ enum error_code_i {
rpcAMENDMENT_BLOCKED = 14,

// Networking
//@@start network-error-codes
rpcNO_CLOSED = 15,
rpcNO_CURRENT = 16,
rpcNO_NETWORK = 17,
rpcNOT_SYNCED = 18,
//@@end network-error-codes

// Ledger state
rpcACT_NOT_FOUND = 19,
Expand Down
2 changes: 2 additions & 0 deletions src/libxrpl/protocol/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ constexpr static ErrorInfo unorderedErrorInfos[]{
{rpcNOT_SUPPORTED, "notSupported", "Operation not supported.", 501},
{rpcNO_CLOSED, "noClosed", "Closed ledger is unavailable.", 503},
{rpcNO_CURRENT, "noCurrent", "Current ledger is unavailable.", 503},
//@@start network-error-messages
{rpcNOT_SYNCED, "notSynced", "Not synced to the network.", 503},
{rpcNO_EVENTS, "noEvents", "Current transport does not support events.", 405},
{rpcNO_NETWORK, "noNetwork", "Not synced to the network.", 503},
//@@end network-error-messages
{rpcWRONG_NETWORK, "wrongNetwork", "Wrong network.", 503},
{rpcNO_PERMISSION, "noPermission", "You don't have permission for this command.", 401},
{rpcNO_PF_REQUEST, "noPathRequest", "No pathfinding request in progress.", 404},
Expand Down
28 changes: 28 additions & 0 deletions src/test/app/LedgerReplay_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,34 @@ class MagicInboundLedgers : public InboundLedgers
return {};
}

virtual std::shared_ptr<Ledger const>
getPartialLedger(uint256 const& hash) override
{
return {};
}

virtual std::optional<uint256>
findTxLedger(uint256 const& txHash) override
{
return std::nullopt;
}

virtual void
addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) override
{
}

virtual void
prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) override
{
}

virtual bool
isTxPrioritized(std::uint32_t seq) const override
{
return false;
}

virtual bool
gotLedgerData(
LedgerHash const& ledgerHash,
Expand Down
6 changes: 4 additions & 2 deletions src/test/shamap/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,10 @@ class TestNodeFamily : public Family
}

void
missingNodeAcquireBySeq(std::uint32_t refNum, uint256 const& nodeHash)
override
missingNodeAcquireBySeq(
std::uint32_t refNum,
uint256 const& nodeHash,
bool prioritize = false) override
{
Throw<std::runtime_error>("missing node");
}
Expand Down
33 changes: 33 additions & 0 deletions src/xrpld/app/consensus/RCLValidations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,24 @@ handleNewValidation(
auto const outcome =
validations.add(calcNodeID(masterKey.value_or(signingKey)), val);

if (j.has_value())
{
JLOG(j->warn()) << "handleNewValidation: seq=" << seq
<< " hash=" << hash << " trusted=" << val->isTrusted()
<< " outcome="
<< (outcome == ValStatus::current ? "current"
: outcome == ValStatus::stale ? "stale"
: outcome == ValStatus::badSeq ? "badSeq"
: "other");
}

if (outcome == ValStatus::current)
{
// For partial sync: track the network-observed ledger from ANY
// validation (not just trusted). This allows queries before
// trusted validators are fully configured.
app.getLedgerMaster().setNetworkObservedLedger(hash, seq);

if (val->isTrusted())
{
// Was: app.getLedgerMaster().checkAccept(hash, seq);
Expand All @@ -213,6 +229,23 @@ handleNewValidation(
app.getLedgerMaster().checkAccept(hash, seq);
}
}
else
{
// Partial sync debug: only log untrusted validations during startup
// (before we have any validated ledger)
auto [lastHash, lastSeq] =
app.getLedgerMaster().getLastValidatedLedger();
if (lastSeq == 0)
{
auto jPartialSync = app.journal("PartialSync");
auto const quorum = app.validators().quorum();
auto const unlSize = app.validators().count();
JLOG(jPartialSync.debug())
<< "validation NOT trusted: seq=" << seq << " hash=" << hash
<< " unlSize=" << unlSize << " quorum=" << quorum
<< " (masterKey=" << (masterKey ? "found" : "none") << ")";
}
}
return;
}

Expand Down
36 changes: 36 additions & 0 deletions src/xrpld/app/ledger/InboundLedger.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ class InboundLedger final : public TimeoutCounter,
return mLedger;
}

/** Returns true if we have the ledger header (may still be incomplete). */
bool
hasHeader() const
{
return mHaveHeader;
}

std::uint32_t
getSeq() const
{
Expand All @@ -106,6 +113,26 @@ class InboundLedger final : public TimeoutCounter,
void
runData();

/** Add a node hash to the priority queue for immediate fetching.
Used by partial sync mode to prioritize nodes needed by queries.
*/
void
addPriorityHash(uint256 const& hash);

/** Check if a transaction hash has been seen in this ledger's txMap.
Used by submit_and_wait to find transactions in partial ledgers.
*/
bool
hasTx(uint256 const& txHash) const;

/** Return the count of known transaction hashes (for debugging). */
std::size_t
knownTxCount() const
{
ScopedLockType sl(mtx_);
return knownTxHashes_.size();
}

void
touch()
{
Expand Down Expand Up @@ -175,16 +202,25 @@ class InboundLedger final : public TimeoutCounter,
clock_type::time_point mLastAction;

std::shared_ptr<Ledger> mLedger;
//@@start state-tracking-members
bool mHaveHeader;
bool mHaveState;
bool mHaveTransactions;
//@@end state-tracking-members
bool mSignaled;
bool mByHash;
std::uint32_t mSeq;
Reason const mReason;

std::set<uint256> mRecentNodes;

// Priority nodes to fetch immediately (for partial sync queries)
std::set<uint256> priorityHashes_;

// Transaction hashes seen in incoming txMap leaf nodes (for
// submit_and_wait)
std::set<uint256> knownTxHashes_;

SHAMapAddNode mStats;

// Data we have received from peers
Expand Down
40 changes: 40 additions & 0 deletions src/xrpld/app/ledger/InboundLedgers.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <xrpld/app/ledger/InboundLedger.h>
#include <xrpl/protocol/RippleLedgerHash.h>
#include <memory>
#include <optional>

namespace ripple {

Expand Down Expand Up @@ -56,6 +57,45 @@ class InboundLedgers
virtual std::shared_ptr<InboundLedger>
find(LedgerHash const& hash) = 0;

/** Get a partial ledger (has header but may be incomplete).
Used for partial sync mode - allows RPC queries against
ledgers that are still being acquired.
@return The ledger if header exists and not failed, nullptr otherwise.
*/
virtual std::shared_ptr<Ledger const>
getPartialLedger(uint256 const& hash) = 0;

/** Find which partial ledger contains a transaction.
Used by submit_and_wait to locate transactions as they appear
in incoming ledgers' txMaps.
@param txHash The transaction hash to search for
@return The ledger hash if found, nullopt otherwise
*/
virtual std::optional<uint256>
findTxLedger(uint256 const& txHash) = 0;

/** Add a priority node hash for immediate fetching.
Used by partial sync mode to prioritize specific nodes
needed by queries.
@param ledgerSeq The ledger sequence being acquired
@param nodeHash The specific node hash to prioritize
*/
virtual void
addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) = 0;

/** Add a ledger range where TX fetching should be prioritized.
Ledgers in this range will fetch TX nodes BEFORE state nodes.
Used by submit_and_wait to quickly detect transactions.
@param start First ledger sequence (inclusive)
@param end Last ledger sequence (inclusive)
*/
virtual void
prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) = 0;

/** Check if TX fetching should be prioritized for a ledger sequence. */
virtual bool
isTxPrioritized(std::uint32_t seq) const = 0;

// VFALCO TODO Remove the dependency on the Peer object.
//
virtual bool
Expand Down
36 changes: 36 additions & 0 deletions src/xrpld/app/ledger/LedgerMaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,38 @@ class LedgerMaster : public AbstractFetchPackContainer
return !mValidLedger.empty();
}

//! Get the hash/seq of the last validated ledger (even if not resident).
std::pair<uint256, LedgerIndex>
getLastValidatedLedger()
{
std::lock_guard lock(m_mutex);
return mLastValidLedger;
}

//! For partial sync: set the network-observed ledger from any validation.
//! This allows queries before trusted validators are fully configured.
void
setNetworkObservedLedger(uint256 const& hash, LedgerIndex seq)
{
std::lock_guard lock(m_mutex);
if (seq > mNetworkObservedLedger.second)
{
JLOG(jPartialSync_.warn())
<< "network-observed ledger updated to seq=" << seq
<< " hash=" << hash;
mNetworkObservedLedger = std::make_pair(hash, seq);
}
}

//! Get the network-observed ledger (from any validations, not just
//! trusted).
std::pair<uint256, LedgerIndex>
getNetworkObservedLedger()
{
std::lock_guard lock(m_mutex);
return mNetworkObservedLedger;
}

// Returns the minimum ledger sequence in SQL database, if any.
std::optional<LedgerIndex>
minSqlSeq();
Expand Down Expand Up @@ -329,6 +361,7 @@ class LedgerMaster : public AbstractFetchPackContainer

Application& app_;
beast::Journal m_journal;
beast::Journal jPartialSync_;

std::recursive_mutex mutable m_mutex;

Expand All @@ -350,6 +383,9 @@ class LedgerMaster : public AbstractFetchPackContainer
// Fully validated ledger, whether or not we have the ledger resident.
std::pair<uint256, LedgerIndex> mLastValidLedger{uint256(), 0};

// Network-observed ledger from any validations (for partial sync).
std::pair<uint256, LedgerIndex> mNetworkObservedLedger{uint256(), 0};

LedgerHistory mLedgerHistory;

CanonicalTXSet mHeldTransactions{uint256()};
Expand Down
Loading
Loading