Skip to content

Commit 2f8a704

Browse files
authored
feat: Ledger publisher use async framework (#2756)
1 parent fcc5a54 commit 2f8a704

File tree

5 files changed

+63
-60
lines changed

5 files changed

+63
-60
lines changed

src/app/ClioApplication.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,7 @@ ClioApplication::run(bool const useNgWebServer)
146146
);
147147

148148
// ETL is responsible for writing and publishing to streams. In read-only mode, ETL only publishes
149-
// TODO: don't use ioc (Publisher uses it)
150-
auto etl = etl::ETLService::makeETLService(config_, ioc, ctx, backend, subscriptions, balancer, ledgers);
149+
auto etl = etl::ETLService::makeETLService(config_, ctx, backend, subscriptions, balancer, ledgers);
151150

152151
auto workQueue = rpc::WorkQueue::makeWorkQueue(config_);
153152
auto counters = rpc::Counters::makeCounters(workQueue);

src/etl/ETLService.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
#include "etl/CacheLoader.hpp"
2626
#include "etl/CacheLoaderInterface.hpp"
2727
#include "etl/CacheUpdaterInterface.hpp"
28+
#include "etl/CorruptionDetector.hpp"
29+
#include "etl/ETLServiceInterface.hpp"
2830
#include "etl/ETLState.hpp"
2931
#include "etl/ExtractorInterface.hpp"
3032
#include "etl/InitialLoadObserverInterface.hpp"
@@ -52,6 +54,7 @@
5254
#include "etl/impl/ext/MPT.hpp"
5355
#include "etl/impl/ext/NFT.hpp"
5456
#include "etl/impl/ext/Successor.hpp"
57+
#include "feed/SubscriptionManagerInterface.hpp"
5558
#include "util/Assert.hpp"
5659
#include "util/Profiler.hpp"
5760
#include "util/async/AnyExecutionContext.hpp"
@@ -75,7 +78,6 @@ namespace etl {
7578
std::shared_ptr<ETLServiceInterface>
7679
ETLService::makeETLService(
7780
util::config::ClioConfigDefinition const& config,
78-
boost::asio::io_context& ioc,
7981
util::async::AnyExecutionContext ctx,
8082
std::shared_ptr<BackendInterface> backend,
8183
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
@@ -90,7 +92,7 @@ ETLService::makeETLService(
9092

9193
auto fetcher = std::make_shared<impl::LedgerFetcher>(backend, balancer);
9294
auto extractor = std::make_shared<impl::Extractor>(fetcher);
93-
auto publisher = std::make_shared<impl::LedgerPublisher>(ioc, backend, subscriptions, *state);
95+
auto publisher = std::make_shared<impl::LedgerPublisher>(ctx, backend, subscriptions, *state);
9496
auto cacheLoader = std::make_shared<CacheLoader<>>(config, backend, backend->cache());
9597
auto cacheUpdater = std::make_shared<impl::CacheUpdater>(backend->cache());
9698
auto amendmentBlockHandler = std::make_shared<impl::AmendmentBlockHandler>(ctx, *state);

src/etl/ETLService.hpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ class ETLService : public ETLServiceInterface {
127127
* Creates and runs the ETL service.
128128
*
129129
* @param config The configuration to use
130-
* @param ioc io context to run on
131130
* @param ctx Execution context for asynchronous operations
132131
* @param backend BackendInterface implementation
133132
* @param subscriptions Subscription manager
@@ -138,7 +137,6 @@ class ETLService : public ETLServiceInterface {
138137
static std::shared_ptr<ETLServiceInterface>
139138
makeETLService(
140139
util::config::ClioConfigDefinition const& config,
141-
boost::asio::io_context& ioc, // TODO: remove (LedgerPublisher needs to be changed)
142140
util::async::AnyExecutionContext ctx,
143141
std::shared_ptr<BackendInterface> backend,
144142
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,

src/etl/impl/LedgerPublisher.hpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
#include "feed/SubscriptionManagerInterface.hpp"
2828
#include "util/Assert.hpp"
2929
#include "util/Mutex.hpp"
30+
#include "util/async/AnyExecutionContext.hpp"
31+
#include "util/async/AnyStrand.hpp"
3032
#include "util/log/Logger.hpp"
3133
#include "util/prometheus/Counter.hpp"
3234
#include "util/prometheus/Prometheus.hpp"
@@ -72,7 +74,7 @@ namespace etl::impl {
7274
class LedgerPublisher : public LedgerPublisherInterface {
7375
util::Logger log_{"ETL"};
7476

75-
boost::asio::strand<boost::asio::io_context::executor_type> publishStrand_;
77+
util::async::AnyStrand publishStrand_;
7678

7779
std::shared_ptr<BackendInterface> backend_;
7880
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
@@ -93,12 +95,12 @@ class LedgerPublisher : public LedgerPublisherInterface {
9395
* @brief Create an instance of the publisher
9496
*/
9597
LedgerPublisher(
96-
boost::asio::io_context& ioc, // TODO: replace with AsyncContext shared with ETLService
98+
util::async::AnyExecutionContext ctx,
9799
std::shared_ptr<BackendInterface> backend,
98100
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
99101
SystemState const& state
100102
)
101-
: publishStrand_{boost::asio::make_strand(ioc)}
103+
: publishStrand_{ctx.makeStrand()}
102104
, backend_{std::move(backend)}
103105
, subscriptions_{std::move(subscriptions)}
104106
, state_{std::cref(state)}
@@ -161,7 +163,7 @@ class LedgerPublisher : public LedgerPublisherInterface {
161163
void
162164
publish(ripple::LedgerHeader const& lgrInfo)
163165
{
164-
boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() {
166+
publishStrand_.submit([this, lgrInfo = lgrInfo] {
165167
LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq);
166168

167169
setLastClose(lgrInfo.closeTime);

tests/unit/etl/LedgerPublisherTests.cpp

Lines changed: 52 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
#include "data/Types.hpp"
2222
#include "etl/SystemState.hpp"
2323
#include "etl/impl/LedgerPublisher.hpp"
24-
#include "util/AsioContextTestFixture.hpp"
2524
#include "util/MockBackendTestFixture.hpp"
2625
#include "util/MockPrometheus.hpp"
2726
#include "util/MockSubscriptionManager.hpp"
2827
#include "util/TestObject.hpp"
28+
#include "util/async/context/BasicExecutionContext.hpp"
2929
#include "util/config/ConfigDefinition.hpp"
3030

3131
#include <fmt/format.h>
@@ -64,17 +64,18 @@ MATCHER_P(ledgerHeaderMatcher, expectedHeader, "Headers match")
6464

6565
} // namespace
6666

67-
struct ETLLedgerPublisherTest : util::prometheus::WithPrometheus, MockBackendTestStrict, SyncAsioContextTest {
67+
struct ETLLedgerPublisherTest : util::prometheus::WithPrometheus, MockBackendTestStrict {
6868
util::config::ClioConfigDefinition cfg{{}};
6969
StrictMockSubscriptionManagerSharedPtr mockSubscriptionManagerPtr;
70+
util::async::CoroExecutionContext ctx;
7071
};
7172

7273
TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderSkipDueToAge)
7374
{
7475
// Use kAGE (800) which is > MAX_LEDGER_AGE_SECONDS (600) to test skipping
7576
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE);
7677
auto dummyState = etl::SystemState{};
77-
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
78+
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
7879

7980
backend_->setRange(kSEQ - 1, kSEQ);
8081
publisher.publish(dummyLedgerHeader);
@@ -90,22 +91,15 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderSkipDueToAge)
9091
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges).Times(0);
9192
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction).Times(0);
9293

93-
ctx_.run();
94+
ctx.join();
9495
}
9596

9697
TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderWithinAgeLimit)
9798
{
9899
// Use age 0 which is < MAX_LEDGER_AGE_SECONDS to ensure publishing happens
99100
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0);
100101
auto dummyState = etl::SystemState{};
101-
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
102-
103-
backend_->setRange(kSEQ - 1, kSEQ);
104-
publisher.publish(dummyLedgerHeader);
105-
106-
// Verify last published sequence is set immediately
107-
EXPECT_TRUE(publisher.getLastPublishedSequence());
108-
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
102+
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
109103

110104
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
111105
.WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
@@ -115,7 +109,14 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderWithinAgeLimit)
115109
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 0));
116110
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges);
117111

118-
ctx_.run();
112+
backend_->setRange(kSEQ - 1, kSEQ);
113+
publisher.publish(dummyLedgerHeader);
114+
115+
// Verify last published sequence is set immediately
116+
EXPECT_TRUE(publisher.getLastPublishedSequence());
117+
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
118+
119+
ctx.join();
119120
EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
120121
}
121122

@@ -124,13 +125,14 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderIsWritingTrue)
124125
auto dummyState = etl::SystemState{};
125126
dummyState.isWriting = true;
126127
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE);
127-
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
128-
publisher.publish(dummyLedgerHeader);
128+
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
129129

130+
publisher.publish(dummyLedgerHeader);
130131
EXPECT_TRUE(publisher.getLastPublishedSequence());
131132
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
132133

133-
ctx_.run();
134+
ctx.join();
135+
134136
EXPECT_FALSE(backend_->fetchLedgerRange());
135137
}
136138

@@ -140,11 +142,9 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderInRange)
140142
dummyState.isWriting = true;
141143

142144
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); // age is 0
143-
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
145+
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
144146
backend_->setRange(kSEQ - 1, kSEQ);
145147

146-
publisher.publish(dummyLedgerHeader);
147-
148148
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
149149
.WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
150150

@@ -158,15 +158,17 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderInRange)
158158

159159
EXPECT_CALL(*backend_, fetchAllTransactionsInLedger).WillOnce(Return(std::vector<TransactionAndMetadata>{t1}));
160160

161-
EXPECT_TRUE(publisher.getLastPublishedSequence());
162-
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
163-
164161
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 1));
165162
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges);
166163
// mock 1 transaction
167164
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction);
168165

169-
ctx_.run();
166+
publisher.publish(dummyLedgerHeader);
167+
EXPECT_TRUE(publisher.getLastPublishedSequence());
168+
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
169+
170+
ctx.join();
171+
170172
EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
171173
}
172174

@@ -182,8 +184,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderCloseTimeGreaterThanNow)
182184

183185
backend_->setRange(kSEQ - 1, kSEQ);
184186

185-
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
186-
publisher.publish(dummyLedgerHeader);
187+
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
187188

188189
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
189190
.WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
@@ -199,30 +200,32 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderCloseTimeGreaterThanNow)
199200
EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _))
200201
.WillOnce(Return(std::vector<TransactionAndMetadata>{t1}));
201202

202-
EXPECT_TRUE(publisher.getLastPublishedSequence());
203-
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
204-
205203
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 1));
206204
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges);
207205
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction);
208206

209-
ctx_.run();
207+
publisher.publish(dummyLedgerHeader);
208+
EXPECT_TRUE(publisher.getLastPublishedSequence());
209+
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
210+
211+
ctx.join();
212+
210213
EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
211214
}
212215

213216
TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsTrue)
214217
{
215218
auto dummyState = etl::SystemState{};
216219
dummyState.isStopping = true;
217-
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
220+
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
218221
EXPECT_FALSE(publisher.publish(kSEQ, {}));
219222
}
220223

221224
TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqMaxAttempt)
222225
{
223226
auto dummyState = etl::SystemState{};
224227
dummyState.isStopping = false;
225-
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
228+
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
226229

227230
static constexpr auto kMAX_ATTEMPT = 2;
228231

@@ -236,7 +239,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsFalse)
236239
{
237240
auto dummyState = etl::SystemState{};
238241
dummyState.isStopping = false;
239-
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
242+
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
240243

241244
LedgerRange const range{.minSequence = kSEQ, .maxSequence = kSEQ};
242245
EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(Return(range));
@@ -245,7 +248,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsFalse)
245248
EXPECT_CALL(*backend_, fetchLedgerBySequence(kSEQ, _)).WillOnce(Return(dummyLedgerHeader));
246249

247250
EXPECT_TRUE(publisher.publish(kSEQ, {}));
248-
ctx_.run();
251+
ctx.join();
249252
}
250253

251254
TEST_F(ETLLedgerPublisherTest, PublishMultipleTxInOrder)
@@ -254,11 +257,9 @@ TEST_F(ETLLedgerPublisherTest, PublishMultipleTxInOrder)
254257
dummyState.isWriting = true;
255258

256259
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); // age is 0
257-
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
260+
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
258261
backend_->setRange(kSEQ - 1, kSEQ);
259262

260-
publisher.publish(dummyLedgerHeader);
261-
262263
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
263264
.WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
264265

@@ -283,17 +284,19 @@ TEST_F(ETLLedgerPublisherTest, PublishMultipleTxInOrder)
283284
EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _))
284285
.WillOnce(Return(std::vector<TransactionAndMetadata>{t1, t2}));
285286

286-
EXPECT_TRUE(publisher.getLastPublishedSequence());
287-
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
288-
289287
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 2));
290288
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges);
291289

292290
Sequence const s;
293291
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(t2, _)).InSequence(s);
294292
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(t1, _)).InSequence(s);
295293

296-
ctx_.run();
294+
publisher.publish(dummyLedgerHeader);
295+
EXPECT_TRUE(publisher.getLastPublishedSequence());
296+
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
297+
298+
ctx.join();
299+
297300
EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
298301
}
299302

@@ -304,19 +307,18 @@ TEST_F(ETLLedgerPublisherTest, PublishVeryOldLedgerShouldSkip)
304307

305308
// Create a ledger header with age (800) greater than MAX_LEDGER_AGE_SECONDS (600)
306309
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 800);
307-
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
310+
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
308311
backend_->setRange(kSEQ - 1, kSEQ);
309312

310-
publisher.publish(dummyLedgerHeader);
311-
312313
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger).Times(0);
313314
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges).Times(0);
314315
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction).Times(0);
315316

317+
publisher.publish(dummyLedgerHeader);
316318
EXPECT_TRUE(publisher.getLastPublishedSequence());
317319
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
318320

319-
ctx_.run();
321+
ctx.join();
320322
}
321323

322324
TEST_F(ETLLedgerPublisherTest, PublishMultipleLedgersInQuickSuccession)
@@ -326,13 +328,9 @@ TEST_F(ETLLedgerPublisherTest, PublishMultipleLedgersInQuickSuccession)
326328

327329
auto const dummyLedgerHeader1 = createLedgerHeader(kLEDGER_HASH, kSEQ, 0);
328330
auto const dummyLedgerHeader2 = createLedgerHeader(kLEDGER_HASH, kSEQ + 1, 0);
329-
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
331+
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
330332
backend_->setRange(kSEQ - 1, kSEQ + 1);
331333

332-
// Publish two ledgers in quick succession
333-
publisher.publish(dummyLedgerHeader1);
334-
publisher.publish(dummyLedgerHeader2);
335-
336334
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
337335
.WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
338336
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ + 1, _))
@@ -349,8 +347,12 @@ TEST_F(ETLLedgerPublisherTest, PublishMultipleLedgersInQuickSuccession)
349347
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(ledgerHeaderMatcher(dummyLedgerHeader2), _, _, _)).InSequence(s);
350348
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges(ledgerHeaderMatcher(dummyLedgerHeader2), _)).InSequence(s);
351349

350+
// Publish two ledgers in quick succession
351+
publisher.publish(dummyLedgerHeader1);
352+
publisher.publish(dummyLedgerHeader2);
353+
352354
EXPECT_TRUE(publisher.getLastPublishedSequence());
353355
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ + 1);
354356

355-
ctx_.run();
357+
ctx.join();
356358
}

0 commit comments

Comments
 (0)