Skip to content

Commit 4dc5ff0

Browse files
committed
Fix ETLService tests
1 parent d3fe795 commit 4dc5ff0

File tree

2 files changed

+87
-25
lines changed

2 files changed

+87
-25
lines changed

src/etl/ETLService.cpp

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -212,14 +212,8 @@ ETLService::run()
212212
return;
213213
}
214214

215-
auto nextSequence = rng->maxSequence + 1;
216-
if (backend_->cache().latestLedgerSequence() != 0) {
217-
nextSequence = backend_->cache().latestLedgerSequence();
218-
}
219-
215+
auto const nextSequence = syncCacheWithDb();
220216
LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence;
221-
nextSequence = syncCacheWithDb();
222-
223217

224218
startMonitor(nextSequence);
225219

@@ -358,16 +352,17 @@ uint32_t
358352
ETLService::syncCacheWithDb()
359353
{
360354
auto rng = backend_->hardFetchLedgerRangeNoThrow();
361-
while (rng->maxSequence > backend_->cache().latestLedgerSequence()) {
362-
LOG(log_.info()) << "Syncing cache with DB. DB latest seq: " << rng->maxSequence << ". Cache latest seq: "
363-
<< backend_->cache().latestLedgerSequence();
355+
356+
while (not backend_->cache().isDisabled() and rng->maxSequence > backend_->cache().latestLedgerSequence()) {
357+
LOG(log_.info()) << "Syncing cache with DB. DB latest seq: " << rng->maxSequence
358+
<< ". Cache latest seq: " << backend_->cache().latestLedgerSequence();
364359
for (auto seq = backend_->cache().latestLedgerSequence(); seq <= rng->maxSequence; ++seq) {
365360
LOG(log_.info()) << "ETLService (via syncCacheWithDb) got new seq from db: " << seq;
366361
updateCache(seq);
367362
}
368363
rng = backend_->hardFetchLedgerRangeNoThrow();
369364
}
370-
return rng->maxSequence;
365+
return rng->maxSequence + 1;
371366
}
372367

373368
void

tests/unit/etl/ETLServiceTests.cpp

Lines changed: 81 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ TEST_F(ETLServiceTests, RunWithEmptyDatabase)
304304
auto mockTaskManager = std::make_unique<testing::NiceMock<MockTaskManager>>();
305305
auto& mockTaskManagerRef = *mockTaskManager;
306306
auto ledgerData = createTestData(kSEQ);
307+
EXPECT_TRUE(systemState_->isLoadingCache);
307308

308309
testing::Sequence const s;
309310
EXPECT_CALL(*backend_, hardFetchLedgerRange).InSequence(s).WillOnce(testing::Return(std::nullopt));
@@ -312,25 +313,61 @@ TEST_F(ETLServiceTests, RunWithEmptyDatabase)
312313
EXPECT_CALL(*balancer_, loadInitialLedger(kSEQ, testing::_, testing::_))
313314
.WillOnce(testing::Return(std::vector<std::string>{}));
314315
EXPECT_CALL(*loader_, loadInitialLedger).WillOnce(testing::Return(ripple::LedgerHeader{}));
315-
EXPECT_CALL(*backend_, hardFetchLedgerRange)
316-
.InSequence(s)
317-
.WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
316+
// In syncCacheWithDb()
317+
EXPECT_CALL(*backend_, hardFetchLedgerRange).Times(2).InSequence(s).WillRepeatedly([this]() {
318+
backend_->cache().update({}, kSEQ, false);
319+
return data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ};
320+
});
318321
EXPECT_CALL(mockTaskManagerRef, run);
319-
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1, testing::_))
320-
.WillOnce(testing::Return(std::unique_ptr<etl::TaskManagerInterface>(mockTaskManager.release())));
321-
EXPECT_CALL(*monitorProvider_, make(testing::_, testing::_, testing::_, testing::_, testing::_))
322-
.WillOnce([](auto, auto, auto, auto, auto) { return std::make_unique<testing::NiceMock<MockMonitor>>(); });
322+
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1, testing::_)).WillOnce([&](auto&&...) {
323+
EXPECT_FALSE(systemState_->isLoadingCache);
324+
return std::unique_ptr<etl::TaskManagerInterface>(mockTaskManager.release());
325+
});
326+
EXPECT_CALL(*monitorProvider_, make(testing::_, testing::_, testing::_, kSEQ + 1, testing::_))
327+
.WillOnce([this](auto, auto, auto, auto, auto) {
328+
EXPECT_TRUE(systemState_->isLoadingCache);
329+
return std::make_unique<testing::NiceMock<MockMonitor>>();
330+
});
323331

324332
service_.run();
325333
}
326334

327335
TEST_F(ETLServiceTests, RunWithPopulatedDatabase)
328336
{
337+
EXPECT_TRUE(systemState_->isLoadingCache);
338+
backend_->cache().update({}, kSEQ, false);
329339
EXPECT_CALL(*backend_, hardFetchLedgerRange)
330340
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
331-
EXPECT_CALL(*monitorProvider_, make).WillOnce([](auto, auto, auto, auto, auto) {
332-
return std::make_unique<testing::NiceMock<MockMonitor>>();
333-
});
341+
EXPECT_CALL(*monitorProvider_, make(testing::_, testing::_, testing::_, kSEQ + 1, testing::_))
342+
.WillOnce([this](auto, auto, auto, auto, auto) {
343+
EXPECT_TRUE(systemState_->isLoadingCache);
344+
return std::make_unique<testing::NiceMock<MockMonitor>>();
345+
});
346+
EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ));
347+
EXPECT_CALL(*cacheLoader_, load(kSEQ));
348+
349+
service_.run();
350+
}
351+
352+
TEST_F(ETLServiceTests, SyncCacheWithDbBeforeStartingMonitor)
353+
{
354+
EXPECT_TRUE(systemState_->isLoadingCache);
355+
backend_->cache().update({}, kSEQ - 2, false);
356+
EXPECT_CALL(*backend_, hardFetchLedgerRange)
357+
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
358+
359+
EXPECT_CALL(*backend_, fetchLedgerDiff(kSEQ - 1, testing::_));
360+
EXPECT_CALL(*cacheUpdater_, update(kSEQ - 1, std::vector<data::LedgerObject>()))
361+
.WillOnce([this](auto const seq, auto&&...) { backend_->cache().update({}, seq, false); });
362+
EXPECT_CALL(*backend_, fetchLedgerDiff(kSEQ, testing::_));
363+
EXPECT_CALL(*cacheUpdater_, update(kSEQ, std::vector<data::LedgerObject>()))
364+
.WillOnce([this](auto const seq, auto&&...) { backend_->cache().update({}, seq, false); });
365+
366+
EXPECT_CALL(*monitorProvider_, make(testing::_, testing::_, testing::_, kSEQ + 1, testing::_))
367+
.WillOnce([this](auto, auto, auto, auto, auto) {
368+
EXPECT_TRUE(systemState_->isLoadingCache);
369+
return std::make_unique<testing::NiceMock<MockMonitor>>();
370+
});
334371
EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ));
335372
EXPECT_CALL(*cacheLoader_, load(kSEQ));
336373

@@ -368,8 +405,11 @@ TEST_F(ETLServiceTests, HandlesWriteConflictInMonitorSubscription)
368405
EXPECT_CALL(mockMonitorRef, subscribeToDbStalled);
369406
EXPECT_CALL(mockMonitorRef, run);
370407

408+
// Set cache to be in sync with DB to avoid syncCacheWithDb loop
409+
backend_->cache().update({}, kSEQ, false);
371410
EXPECT_CALL(*backend_, hardFetchLedgerRange)
372-
.WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
411+
.Times(2)
412+
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
373413
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
374414
EXPECT_CALL(*cacheLoader_, load(kSEQ));
375415

@@ -401,8 +441,11 @@ TEST_F(ETLServiceTests, NormalFlowInMonitorSubscription)
401441
EXPECT_CALL(mockMonitorRef, subscribeToDbStalled);
402442
EXPECT_CALL(mockMonitorRef, run);
403443

444+
// Set cache to be in sync with DB to avoid syncCacheWithDb loop
445+
backend_->cache().update({}, kSEQ, false);
404446
EXPECT_CALL(*backend_, hardFetchLedgerRange)
405-
.WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
447+
.Times(2)
448+
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
406449
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
407450
EXPECT_CALL(*cacheLoader_, load(kSEQ));
408451

@@ -439,6 +482,8 @@ TEST_F(ETLServiceTests, AttemptTakeoverWriter)
439482
});
440483
EXPECT_CALL(mockMonitorRef, run);
441484

485+
// Set cache to be in sync with DB to avoid syncCacheWithDb loop
486+
backend_->cache().update({}, kSEQ, false);
442487
EXPECT_CALL(*backend_, hardFetchLedgerRange)
443488
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
444489
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
@@ -482,8 +527,11 @@ TEST_F(ETLServiceTests, GiveUpWriterAfterWriteConflict)
482527
EXPECT_CALL(mockMonitorRef, subscribeToDbStalled);
483528
EXPECT_CALL(mockMonitorRef, run);
484529

530+
// Set cache to be in sync with DB to avoid syncCacheWithDb loop
531+
backend_->cache().update({}, kSEQ, false);
485532
EXPECT_CALL(*backend_, hardFetchLedgerRange)
486-
.WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
533+
.Times(2)
534+
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
487535
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
488536
EXPECT_CALL(*cacheLoader_, load(kSEQ));
489537

@@ -568,6 +616,8 @@ TEST_F(ETLServiceTests, DbStalledDoesNotTriggerSignalWhenStrictReadonly)
568616
});
569617
EXPECT_CALL(mockMonitorRef, run);
570618

619+
// Set cache to be in sync with DB to avoid syncCacheWithDb loop
620+
backend_->cache().update({}, kSEQ, false);
571621
EXPECT_CALL(*backend_, hardFetchLedgerRange)
572622
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
573623
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
@@ -602,6 +652,8 @@ TEST_F(ETLServiceTests, DbStalledDoesNotTriggerSignalWhenAlreadyWriting)
602652
});
603653
EXPECT_CALL(mockMonitorRef, run);
604654

655+
// Set cache to be in sync with DB to avoid syncCacheWithDb loop
656+
backend_->cache().update({}, kSEQ, false);
605657
EXPECT_CALL(*backend_, hardFetchLedgerRange)
606658
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
607659
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
@@ -636,6 +688,8 @@ TEST_F(ETLServiceTests, CacheUpdatesDependOnActualCacheState_WriterMode)
636688
EXPECT_CALL(mockMonitorRef, subscribeToDbStalled);
637689
EXPECT_CALL(mockMonitorRef, run);
638690

691+
// Set cache to be in sync with DB initially to avoid syncCacheWithDb loop
692+
backend_->cache().update({}, kSEQ, false);
639693
EXPECT_CALL(*backend_, hardFetchLedgerRange)
640694
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
641695
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
@@ -666,6 +720,8 @@ TEST_F(ETLServiceTests, OnlyCacheUpdatesWhenBackendIsCurrent)
666720
auto mockMonitor = std::make_unique<testing::NiceMock<MockMonitor>>();
667721
auto& mockMonitorRef = *mockMonitor;
668722
std::function<void(uint32_t)> capturedCallback;
723+
// Set cache to be in sync with DB initially to avoid syncCacheWithDb loop
724+
backend_->cache().update({}, kSEQ, false);
669725

670726
EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) {
671727
return std::move(mockMonitor);
@@ -679,6 +735,7 @@ TEST_F(ETLServiceTests, OnlyCacheUpdatesWhenBackendIsCurrent)
679735

680736
// Set backend range to be at kSEQ + 1 (already current)
681737
EXPECT_CALL(*backend_, hardFetchLedgerRange)
738+
.WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}))
682739
.WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}))
683740
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ + 1}));
684741
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
@@ -706,6 +763,8 @@ TEST_F(ETLServiceTests, NoUpdatesWhenBothCacheAndBackendAreCurrent)
706763
auto mockMonitor = std::make_unique<testing::NiceMock<MockMonitor>>();
707764
auto& mockMonitorRef = *mockMonitor;
708765
std::function<void(uint32_t)> capturedCallback;
766+
// Set cache to be in sync with DB initially to avoid syncCacheWithDb loop
767+
backend_->cache().update({}, kSEQ, false);
709768

710769
EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) {
711770
return std::move(mockMonitor);
@@ -719,6 +778,7 @@ TEST_F(ETLServiceTests, NoUpdatesWhenBothCacheAndBackendAreCurrent)
719778

720779
// Set backend range to be at kSEQ + 1 (already current)
721780
EXPECT_CALL(*backend_, hardFetchLedgerRange)
781+
.WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}))
722782
.WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}))
723783
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ + 1}));
724784
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
@@ -743,6 +803,8 @@ TEST_F(ETLServiceTests, NoUpdatesWhenBothCacheAndBackendAreCurrent)
743803
TEST_F(ETLServiceTests, StopWaitsForWriteCommandHandlersToComplete)
744804
{
745805
auto mockMonitor = std::make_unique<testing::NiceMock<MockMonitor>>();
806+
// Set cache to be in sync with DB to avoid syncCacheWithDb loop
807+
backend_->cache().update({}, kSEQ, false);
746808

747809
EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) {
748810
return std::move(mockMonitor);
@@ -769,7 +831,8 @@ TEST_F(ETLServiceTests, StopWaitsForWriteCommandHandlersToComplete)
769831
// Stop should wait for the handler to complete and disconnect the subscription
770832
service_.stop();
771833

772-
// The test will hang on stop() or in service_ destructor if there is a problem.
834+
// Verify stop() returned, meaning all handlers completed
835+
SUCCEED();
773836
}
774837

775838
TEST_F(ETLServiceTests, WriteConflictIsHandledImmediately_NotDelayed)
@@ -791,6 +854,8 @@ TEST_F(ETLServiceTests, WriteConflictIsHandledImmediately_NotDelayed)
791854
EXPECT_CALL(mockMonitorRef, subscribeToDbStalled);
792855
EXPECT_CALL(mockMonitorRef, run);
793856

857+
// Set cache to be in sync with DB to avoid syncCacheWithDb loop
858+
backend_->cache().update({}, kSEQ, false);
794859
EXPECT_CALL(*backend_, hardFetchLedgerRange)
795860
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
796861
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
@@ -816,6 +881,8 @@ TEST_F(ETLServiceTests, WriteCommandsAreSerializedOnStrand)
816881
return std::move(mockMonitor);
817882
});
818883

884+
// Set cache to be in sync with DB to avoid syncCacheWithDb loop
885+
backend_->cache().update({}, kSEQ, false);
819886
EXPECT_CALL(*backend_, hardFetchLedgerRange)
820887
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
821888
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));

0 commit comments

Comments
 (0)