Skip to content

Commit 74a9d6d

Browse files
Migrate production entry points from Boost.Coroutine to C++20 coroutines
Replace all postCoro() call sites with postCoroTask() using C++20 coroutine lambdas. The key changes are: - Remove Context::coro field (shared_ptr<JobQueue::Coro>) from RPC::Context, eliminating it from all aggregate initializations - Replace RipplePathFind's yield/post/resume pattern with a local std::condition_variable that blocks until path-finding completes, avoiding colored-function infection across the RPC call chain - Switch ServerHandler entry points (onRequest, onWSMessage) from postCoro to postCoroTask with co_return lambdas - Switch GRPCServer::CallData::process() to use postCoroTask, rename private handler to processRequest() - Update Path_test and AMMTest to use postCoroTask (they set context.coro which no longer exists) The old postCoro() API remains available for Coroutine_test and JobQueue_test, which will be migrated in a subsequent commit. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 4613377 commit 74a9d6d

File tree

8 files changed

+52
-124
lines changed

8 files changed

+52
-124
lines changed

src/test/app/Path_test.cpp

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <xrpld/rpc/detail/Tuning.h>
99

1010
#include <xrpl/beast/unit_test.h>
11+
#include <xrpl/core/CoroTask.h>
1112
#include <xrpl/core/JobQueue.h>
1213
#include <xrpl/json/json_reader.h>
1314
#include <xrpl/protocol/ApiVersion.h>
@@ -131,7 +132,6 @@ class Path_test : public beast::unit_test::suite
131132
c,
132133
Role::USER,
133134
{},
134-
{},
135135
RPC::apiVersionIfUnspecified},
136136
{},
137137
{}};
@@ -155,11 +155,11 @@ class Path_test : public beast::unit_test::suite
155155

156156
Json::Value result;
157157
gate g;
158-
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
158+
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
159159
context.params = std::move(params);
160-
context.coro = coro;
161160
RPC::doCommand(context, result);
162161
g.signal();
162+
co_return;
163163
});
164164

165165
using namespace std::chrono_literals;
@@ -240,51 +240,50 @@ class Path_test : public beast::unit_test::suite
240240
c,
241241
Role::USER,
242242
{},
243-
{},
244243
RPC::apiVersionIfUnspecified},
245244
{},
246245
{}};
247246
Json::Value result;
248247
gate g;
249248
// Test RPC::Tuning::max_src_cur source currencies.
250-
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
249+
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
251250
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur);
252-
context.coro = coro;
253251
RPC::doCommand(context, result);
254252
g.signal();
253+
co_return;
255254
});
256255
BEAST_EXPECT(g.wait_for(5s));
257256
BEAST_EXPECT(!result.isMember(jss::error));
258257

259258
// Test more than RPC::Tuning::max_src_cur source currencies.
260-
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
259+
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
261260
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur + 1);
262-
context.coro = coro;
263261
RPC::doCommand(context, result);
264262
g.signal();
263+
co_return;
265264
});
266265
BEAST_EXPECT(g.wait_for(5s));
267266
BEAST_EXPECT(result.isMember(jss::error));
268267

269268
// Test RPC::Tuning::max_auto_src_cur source currencies.
270269
for (auto i = 0; i < (RPC::Tuning::max_auto_src_cur - 1); ++i)
271270
env.trust(Account("alice")[std::to_string(i + 100)](100), "bob");
272-
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
271+
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
273272
context.params = rpf(Account("alice"), Account("bob"), 0);
274-
context.coro = coro;
275273
RPC::doCommand(context, result);
276274
g.signal();
275+
co_return;
277276
});
278277
BEAST_EXPECT(g.wait_for(5s));
279278
BEAST_EXPECT(!result.isMember(jss::error));
280279

281280
// Test more than RPC::Tuning::max_auto_src_cur source currencies.
282281
env.trust(Account("alice")["AUD"](100), "bob");
283-
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
282+
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
284283
context.params = rpf(Account("alice"), Account("bob"), 0);
285-
context.coro = coro;
286284
RPC::doCommand(context, result);
287285
g.signal();
286+
co_return;
288287
});
289288
BEAST_EXPECT(g.wait_for(5s));
290289
BEAST_EXPECT(result.isMember(jss::error));

src/test/jtx/impl/AMMTest.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include <xrpld/rpc/RPCHandler.h>
88

9+
#include <xrpl/core/CoroTask.h>
910
#include <xrpl/protocol/ApiVersion.h>
1011
#include <xrpl/protocol/STParsedJSON.h>
1112
#include <xrpl/resource/Fees.h>
@@ -193,7 +194,6 @@ AMMTest::find_paths_request(
193194
c,
194195
Role::USER,
195196
{},
196-
{},
197197
RPC::apiVersionIfUnspecified},
198198
{},
199199
{}};
@@ -215,11 +215,11 @@ AMMTest::find_paths_request(
215215

216216
Json::Value result;
217217
gate g;
218-
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
218+
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
219219
context.params = std::move(params);
220-
context.coro = coro;
221220
RPC::doCommand(context, result);
222221
g.signal();
222+
co_return;
223223
});
224224

225225
using namespace std::chrono_literals;

src/xrpld/app/main/GRPCServer.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <xrpl/beast/core/CurrentThreadName.h>
55
#include <xrpl/beast/net/IPAddressConversion.h>
6+
#include <xrpl/core/CoroTask.h>
67
#include <xrpl/resource/Fees.h>
78

89
namespace xrpl {
@@ -99,13 +100,14 @@ GRPCServerImpl::CallData<Request, Response>::process()
99100
// ensures that finished is always true when this CallData object
100101
// is returned as a tag in handleRpcs(), after sending the response
101102
finished_ = true;
102-
auto coro = app_.getJobQueue().postCoro(
103-
JobType::jtRPC, "gRPC-Client", [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
104-
thisShared->process(coro);
103+
auto runner = app_.getJobQueue().postCoroTask(
104+
JobType::jtRPC, "gRPC-Client", [thisShared](auto) -> CoroTask<void> {
105+
thisShared->processRequest();
106+
co_return;
105107
});
106108

107-
// If coro is null, then the JobQueue has already been shutdown
108-
if (!coro)
109+
// If runner is null, then the JobQueue has already been shutdown
110+
if (!runner)
109111
{
110112
grpc::Status status{grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
111113
responder_.FinishWithError(status, this);
@@ -114,7 +116,7 @@ GRPCServerImpl::CallData<Request, Response>::process()
114116

115117
template <class Request, class Response>
116118
void
117-
GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::Coro> coro)
119+
GRPCServerImpl::CallData<Request, Response>::processRequest()
118120
{
119121
try
120122
{
@@ -156,7 +158,6 @@ GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::C
156158
app_.getLedgerMaster(),
157159
usage,
158160
role,
159-
coro,
160161
InfoSub::pointer(),
161162
apiVersion},
162163
request_};

src/xrpld/app/main/GRPCServer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ class GRPCServerImpl final
208208
private:
209209
// process the request. Called inside the coroutine passed to JobQueue
210210
void
211-
process(std::shared_ptr<JobQueue::Coro> coro);
211+
processRequest();
212212

213213
// return load type of this RPC
214214
Resource::Charge

src/xrpld/rpc/Context.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
#include <xrpld/rpc/Role.h>
44

55
#include <xrpl/beast/utility/Journal.h>
6-
#include <xrpl/core/JobQueue.h>
76
#include <xrpl/server/InfoSub.h>
87

98
namespace xrpl {
@@ -24,7 +23,6 @@ struct Context
2423
LedgerMaster& ledgerMaster;
2524
Resource::Consumer& consumer;
2625
Role role;
27-
std::shared_ptr<JobQueue::Coro> coro{};
2826
InfoSub::pointer infoSub{};
2927
unsigned int apiVersion;
3028
};

src/xrpld/rpc/ServerHandler.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,19 +171,17 @@ class ServerHandler
171171
Json::Value
172172
processSession(
173173
std::shared_ptr<WSSession> const& session,
174-
std::shared_ptr<JobQueue::Coro> const& coro,
175174
Json::Value const& jv);
176175

177176
void
178-
processSession(std::shared_ptr<Session> const&, std::shared_ptr<JobQueue::Coro> coro);
177+
processSession(std::shared_ptr<Session> const&);
179178

180179
void
181180
processRequest(
182181
Port const& port,
183182
std::string const& request,
184183
beast::IP::Endpoint const& remoteIPAddress,
185184
Output&&,
186-
std::shared_ptr<JobQueue::Coro> coro,
187185
std::string_view forwardedFor,
188186
std::string_view user);
189187

src/xrpld/rpc/detail/ServerHandler.cpp

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <xrpl/basics/base64.h>
1313
#include <xrpl/basics/contract.h>
1414
#include <xrpl/basics/make_SSLContext.h>
15+
#include <xrpl/core/CoroTask.h>
1516
#include <xrpl/beast/net/IPAddressConversion.h>
1617
#include <xrpl/beast/rfc2616.h>
1718
#include <xrpl/core/JobQueue.h>
@@ -284,9 +285,10 @@ ServerHandler::onRequest(Session& session)
284285
}
285286

286287
std::shared_ptr<Session> detachedSession = session.detach();
287-
auto const postResult = m_jobQueue.postCoro(
288-
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
289-
processSession(detachedSession, coro);
288+
auto const postResult = m_jobQueue.postCoroTask(
289+
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](auto) -> CoroTask<void> {
290+
processSession(detachedSession);
291+
co_return;
290292
});
291293
if (postResult == nullptr)
292294
{
@@ -322,17 +324,18 @@ ServerHandler::onWSMessage(
322324

323325
JLOG(m_journal.trace()) << "Websocket received '" << jv << "'";
324326

325-
auto const postResult = m_jobQueue.postCoro(
327+
auto const postResult = m_jobQueue.postCoroTask(
326328
jtCLIENT_WEBSOCKET,
327329
"WS-Client",
328-
[this, session, jv = std::move(jv)](std::shared_ptr<JobQueue::Coro> const& coro) {
329-
auto const jr = this->processSession(session, coro, jv);
330+
[this, session, jv = std::move(jv)](auto) -> CoroTask<void> {
331+
auto const jr = this->processSession(session, jv);
330332
auto const s = to_string(jr);
331333
auto const n = s.length();
332334
boost::beast::multi_buffer sb(n);
333335
sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
334336
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
335337
session->complete();
338+
co_return;
336339
});
337340
if (postResult == nullptr)
338341
{
@@ -375,7 +378,6 @@ logDuration(Json::Value const& request, T const& duration, beast::Journal& journ
375378
Json::Value
376379
ServerHandler::processSession(
377380
std::shared_ptr<WSSession> const& session,
378-
std::shared_ptr<JobQueue::Coro> const& coro,
379381
Json::Value const& jv)
380382
{
381383
auto is = std::static_pointer_cast<WSInfoSub>(session->appDefined);
@@ -443,7 +445,6 @@ ServerHandler::processSession(
443445
app_.getLedgerMaster(),
444446
is->getConsumer(),
445447
role,
446-
coro,
447448
is,
448449
apiVersion},
449450
jv,
@@ -514,18 +515,15 @@ ServerHandler::processSession(
514515
return jr;
515516
}
516517

517-
// Run as a coroutine.
518518
void
519519
ServerHandler::processSession(
520-
std::shared_ptr<Session> const& session,
521-
std::shared_ptr<JobQueue::Coro> coro)
520+
std::shared_ptr<Session> const& session)
522521
{
523522
processRequest(
524523
session->port(),
525524
buffers_to_string(session->request().body().data()),
526525
session->remoteAddress().at_port(0),
527526
makeOutput(*session),
528-
coro,
529527
forwardedFor(session->request()),
530528
[&] {
531529
auto const iter = session->request().find("X-User");
@@ -562,7 +560,6 @@ ServerHandler::processRequest(
562560
std::string const& request,
563561
beast::IP::Endpoint const& remoteIPAddress,
564562
Output&& output,
565-
std::shared_ptr<JobQueue::Coro> coro,
566563
std::string_view forwardedFor,
567564
std::string_view user)
568565
{
@@ -819,7 +816,6 @@ ServerHandler::processRequest(
819816
app_.getLedgerMaster(),
820817
usage,
821818
role,
822-
coro,
823819
InfoSub::pointer(),
824820
apiVersion},
825821
params,

0 commit comments

Comments
 (0)