Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/test/jtx/Env.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <xrpl/protocol/STTx.h>

#include <functional>
#include <future>
#include <source_location>
#include <string>
#include <tuple>
Expand Down Expand Up @@ -393,6 +394,44 @@ class Env
return close(std::chrono::seconds(5));
}

/** Close and advance the ledger, then synchronize with the server's
io_context to ensure all async operations initiated by the close have
been started.

This function performs the same ledger close as close(), but additionally
ensures that all tasks posted to the server's io_context (such as
WebSocket subscription message sends) have been initiated before returning.

What it guarantees:
- All async operations posted before syncClose() have been STARTED
- For WebSocket sends: async_write_some() has been called
- The actual I/O completion may still be pending (async)

What it does NOT guarantee:
- Async operations have COMPLETED
- WebSocket messages have been received by clients
- However, for localhost connections, the remaining latency is typically
microseconds, making tests reliable

Use this instead of close() when:
- Test code immediately checks for subscription messages
- Race conditions between test and worker threads must be avoided
- Deterministic test behavior is required

@param timeout Maximum time to wait for the barrier task to execute
@return true if close succeeded and barrier executed within timeout,
false otherwise
*/
[[nodiscard]] bool
syncClose(std::chrono::system_clock::duration timeout = std::chrono::seconds{1})
{
auto const result = close();
std::promise<void> server_barrier;
boost::asio::post(app().getIOContext(), [&]() { server_barrier.set_value(); });
auto const status = server_barrier.get_future().wait_for(timeout);
return result && status == std::future_status::ready;
}

/** Turn on JSON tracing.
With no arguments, trace all
*/
Expand Down
82 changes: 44 additions & 38 deletions src/test/rpc/Subscribe_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class Subscribe_test : public beast::unit_test::suite

{
// Accept a ledger
env.close();
BEAST_EXPECT(env.syncClose());

// Check stream update
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
Expand All @@ -125,7 +125,7 @@ class Subscribe_test : public beast::unit_test::suite

{
// Accept another ledger
env.close();
BEAST_EXPECT(env.syncClose());

// Check stream update
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
Expand Down Expand Up @@ -171,7 +171,7 @@ class Subscribe_test : public beast::unit_test::suite

{
env.fund(XRP(10000), "alice");
env.close();
BEAST_EXPECT(env.syncClose());

// Check stream update for payment transaction
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
Expand All @@ -195,7 +195,7 @@ class Subscribe_test : public beast::unit_test::suite
}));

env.fund(XRP(10000), "bob");
env.close();
BEAST_EXPECT(env.syncClose());

// Check stream update for payment transaction
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
Expand Down Expand Up @@ -249,12 +249,12 @@ class Subscribe_test : public beast::unit_test::suite
{
// Transaction that does not affect stream
env.fund(XRP(10000), "carol");
env.close();
BEAST_EXPECT(env.syncClose());
BEAST_EXPECT(!wsc->getMsg(10ms));

// Transactions concerning alice
env.trust(Account("bob")["USD"](100), "alice");
env.close();
BEAST_EXPECT(env.syncClose());

// Check stream updates
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
Expand Down Expand Up @@ -310,7 +310,7 @@ class Subscribe_test : public beast::unit_test::suite

{
env.fund(XRP(10000), "alice");
env.close();
BEAST_EXPECT(env.syncClose());

// Check stream update for payment transaction
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
Expand Down Expand Up @@ -483,7 +483,7 @@ class Subscribe_test : public beast::unit_test::suite
// at least one flag ledger.
while (env.closed()->header().seq < 300)
{
env.close();
BEAST_EXPECT(env.syncClose());
using namespace std::chrono_literals;
BEAST_EXPECT(wsc->findMsg(5s, validValidationFields));
}
Expand Down Expand Up @@ -834,12 +834,13 @@ class Subscribe_test : public beast::unit_test::suite
* send payments between the two accounts a and b,
* and close ledgersToClose ledgers
*/
auto sendPayments = [](Env& env,
Account const& a,
Account const& b,
int newTxns,
std::uint32_t ledgersToClose,
int numXRP = 10) {
auto sendPayments = [this](
Env& env,
Account const& a,
Account const& b,
int newTxns,
std::uint32_t ledgersToClose,
int numXRP = 10) {
env.memoize(a);
env.memoize(b);
for (int i = 0; i < newTxns; ++i)
Expand All @@ -852,7 +853,7 @@ class Subscribe_test : public beast::unit_test::suite
jtx::sig(jtx::autofill));
}
for (int i = 0; i < ledgersToClose; ++i)
env.close();
BEAST_EXPECT(env.syncClose());
return newTxns;
};

Expand Down Expand Up @@ -998,6 +999,7 @@ class Subscribe_test : public beast::unit_test::suite
if (!BEAST_EXPECT(goodSubRPC(jv)))
return;
IdxHashVec genesisFullHistoryVec;
BEAST_EXPECT(env.syncClose());
if (!BEAST_EXPECT(!getTxHash(*wscTxHistory, genesisFullHistoryVec, 1).first))
return;

Expand All @@ -1016,6 +1018,7 @@ class Subscribe_test : public beast::unit_test::suite
if (!BEAST_EXPECT(goodSubRPC(jv)))
return;
IdxHashVec bobFullHistoryVec;
BEAST_EXPECT(env.syncClose());
r = getTxHash(*wscTxHistory, bobFullHistoryVec, 1);
if (!BEAST_EXPECT(r.first && r.second))
return;
Expand Down Expand Up @@ -1050,6 +1053,7 @@ class Subscribe_test : public beast::unit_test::suite
"rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh";
jv = wscTxHistory->invoke("subscribe", request);
genesisFullHistoryVec.clear();
BEAST_EXPECT(env.syncClose());
BEAST_EXPECT(getTxHash(*wscTxHistory, genesisFullHistoryVec, 31).second);
jv = wscTxHistory->invoke("unsubscribe", request);

Expand All @@ -1068,7 +1072,7 @@ class Subscribe_test : public beast::unit_test::suite

std::array<Account, 2> accounts = {alice, bob};
env.fund(XRP(222222), accounts);
env.close();
BEAST_EXPECT(env.syncClose());

// subscribe account
Json::Value stream = Json::objectValue;
Expand Down Expand Up @@ -1137,12 +1141,12 @@ class Subscribe_test : public beast::unit_test::suite
std::array<Account, 2> accounts = {alice, carol};
env.fund(XRP(333333), accounts);
env.trust(USD_a(20000), carol);
env.close();
BEAST_EXPECT(env.syncClose());

auto mixedPayments = [&]() -> int {
sendPayments(env, alice, carol, 1, 0);
env(pay(alice, carol, USD_a(100)));
env.close();
BEAST_EXPECT(env.syncClose());
return 2;
};

Expand All @@ -1152,6 +1156,7 @@ class Subscribe_test : public beast::unit_test::suite
request[jss::account_history_tx_stream][jss::account] = carol.human();
auto ws = makeWSClient(env.app().config());
auto jv = ws->invoke("subscribe", request);
BEAST_EXPECT(env.syncClose());
{
// take out existing txns from the stream
IdxHashVec tempVec;
Expand All @@ -1172,7 +1177,7 @@ class Subscribe_test : public beast::unit_test::suite
Env env(*this);
std::array<Account, 2> accounts = {alice, carol};
env.fund(XRP(444444), accounts);
env.close();
BEAST_EXPECT(env.syncClose());

// many payments, and close lots of ledgers
auto oneRound = [&](int numPayments) {
Expand All @@ -1185,6 +1190,7 @@ class Subscribe_test : public beast::unit_test::suite
request[jss::account_history_tx_stream][jss::account] = carol.human();
auto wscLong = makeWSClient(env.app().config());
auto jv = wscLong->invoke("subscribe", request);
BEAST_EXPECT(env.syncClose());
{
// take out existing txns from the stream
IdxHashVec tempVec;
Expand Down Expand Up @@ -1241,10 +1247,10 @@ class Subscribe_test : public beast::unit_test::suite
if (!BEAST_EXPECT(jv[jss::status] == "success"))
return;
env(offer(alice, XRP(10), USD(10)), domain(domainID), txflags(tfHybrid));
env.close();
BEAST_EXPECT(env.syncClose());

env(pay(bob, carol, USD(5)), path(~USD), sendmax(XRP(5)), domain(domainID));
env.close();
BEAST_EXPECT(env.syncClose());

BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
if (jv[jss::changes].size() != 1)
Expand Down Expand Up @@ -1286,7 +1292,7 @@ class Subscribe_test : public beast::unit_test::suite

Env env{*this, features};
env.fund(XRP(10000), alice, bob, broker);
env.close();
BEAST_EXPECT(env.syncClose());

auto wsc = test::makeWSClient(env.app().config());
Json::Value stream;
Expand Down Expand Up @@ -1350,45 +1356,45 @@ class Subscribe_test : public beast::unit_test::suite
// Verify the NFTokenIDs are correct in the NFTokenMint tx meta
uint256 const nftId1{token::getNextID(env, alice, 0u, tfTransferable)};
env(token::mint(alice, 0u), txflags(tfTransferable));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenID(nftId1);

uint256 const nftId2{token::getNextID(env, alice, 0u, tfTransferable)};
env(token::mint(alice, 0u), txflags(tfTransferable));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenID(nftId2);

// Alice creates one sell offer for each NFT
// Verify the offer indexes are correct in the NFTokenCreateOffer tx
// meta
uint256 const aliceOfferIndex1 = keylet::nftoffer(alice, env.seq(alice)).key;
env(token::createOffer(alice, nftId1, drops(1)), txflags(tfSellNFToken));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(aliceOfferIndex1);

uint256 const aliceOfferIndex2 = keylet::nftoffer(alice, env.seq(alice)).key;
env(token::createOffer(alice, nftId2, drops(1)), txflags(tfSellNFToken));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(aliceOfferIndex2);

// Alice cancels two offers she created
// Verify the NFTokenIDs are correct in the NFTokenCancelOffer tx
// meta
env(token::cancelOffer(alice, {aliceOfferIndex1, aliceOfferIndex2}));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenIDsInCancelOffer({nftId1, nftId2});

// Bobs creates a buy offer for nftId1
// Verify the offer id is correct in the NFTokenCreateOffer tx meta
auto const bobBuyOfferIndex = keylet::nftoffer(bob, env.seq(bob)).key;
env(token::createOffer(bob, nftId1, drops(1)), token::owner(alice));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(bobBuyOfferIndex);

// Alice accepts bob's buy offer
// Verify the NFTokenID is correct in the NFTokenAcceptOffer tx meta
env(token::acceptBuyOffer(alice, bobBuyOfferIndex));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenID(nftId1);
}

Expand All @@ -1397,26 +1403,26 @@ class Subscribe_test : public beast::unit_test::suite
// Alice mints a NFT
uint256 const nftId{token::getNextID(env, alice, 0u, tfTransferable)};
env(token::mint(alice, 0u), txflags(tfTransferable));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenID(nftId);

// Alice creates sell offer and set broker as destination
uint256 const offerAliceToBroker = keylet::nftoffer(alice, env.seq(alice)).key;
env(token::createOffer(alice, nftId, drops(1)),
token::destination(broker),
txflags(tfSellNFToken));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(offerAliceToBroker);

// Bob creates buy offer
uint256 const offerBobToBroker = keylet::nftoffer(bob, env.seq(bob)).key;
env(token::createOffer(bob, nftId, drops(1)), token::owner(alice));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(offerBobToBroker);

// Check NFTokenID meta for NFTokenAcceptOffer in brokered mode
env(token::brokerOffers(broker, offerBobToBroker, offerAliceToBroker));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenID(nftId);
}

Expand All @@ -1426,32 +1432,32 @@ class Subscribe_test : public beast::unit_test::suite
// Alice mints a NFT
uint256 const nftId{token::getNextID(env, alice, 0u, tfTransferable)};
env(token::mint(alice, 0u), txflags(tfTransferable));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenID(nftId);

// Alice creates 2 sell offers for the same NFT
uint256 const aliceOfferIndex1 = keylet::nftoffer(alice, env.seq(alice)).key;
env(token::createOffer(alice, nftId, drops(1)), txflags(tfSellNFToken));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(aliceOfferIndex1);

uint256 const aliceOfferIndex2 = keylet::nftoffer(alice, env.seq(alice)).key;
env(token::createOffer(alice, nftId, drops(1)), txflags(tfSellNFToken));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(aliceOfferIndex2);

// Make sure the metadata only has 1 nft id, since both offers are
// for the same nft
env(token::cancelOffer(alice, {aliceOfferIndex1, aliceOfferIndex2}));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenIDsInCancelOffer({nftId});
}

if (features[featureNFTokenMintOffer])
{
uint256 const aliceMintWithOfferIndex1 = keylet::nftoffer(alice, env.seq(alice)).key;
env(token::mint(alice), token::amount(XRP(0)));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(aliceMintWithOfferIndex1);
}
}
Expand Down
Loading