Skip to content

Commit e212fd2

Browse files
author
Gerrit Code Review
committed
Merge "Merge branch 'couchbase/morpheus' into 'couchbase/master'"
2 parents 042798a + bd83d3a commit e212fd2

4 files changed

Lines changed: 90 additions & 90 deletions

File tree

engines/ep/src/ep_engine.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1805,7 +1805,8 @@ EventuallyPersistentEngine::EventuallyPersistentEngine(
18051805
startupTime(0),
18061806
taskable(this),
18071807
compressionMode(BucketCompressionMode::Off),
1808-
minCompressionRatio(default_min_compression_ratio) {
1808+
minCompressionRatio(default_min_compression_ratio),
1809+
unitTesting(getenv("MEMCACHED_UNIT_TESTS") != nullptr) {
18091810
// Note: The use of offsetof below is non-standard according to GCC 13.2
18101811
// because EventuallyPersistentEngine is not a standard layout type - and
18111812
// GCC warns about it:

engines/ep/src/ep_engine.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1266,6 +1266,11 @@ class EventuallyPersistentEngine : public EngineIface, public DcpIface {
12661266
*/
12671267
StatusAndVBPtr getValidVBucketFromString(std::string_view vbNum);
12681268

1269+
/// @return true if getenv("MEMCACHED_UNIT_TESTS") returned non-null when the engine was constructed
1270+
bool isUnitTesting() const {
1271+
return unitTesting;
1272+
}
1273+
12691274
protected:
12701275
friend class EpEngineValueChangeListener;
12711276

@@ -1908,6 +1913,9 @@ class EventuallyPersistentEngine : public EngineIface, public DcpIface {
19081913
// Chronicle auth token. Cached at bucket creation for passing down the
19091914
// information to KVStore instantiation.
19101915
folly::Synchronized<std::string> chronicleAuthToken;
1916+
1917+
// cached value of getenv("MEMCACHED_UNIT_TESTS") != nullptr
1918+
const bool unitTesting{false};
19111919
};
19121920

19131921
/**

engines/ep/src/item_pager.cc

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -387,14 +387,16 @@ void StrictQuotaItemPager::schedulePagingVisitors(std::size_t bytesToEvict) {
387387
// to visit is less than the desired concurrency.
388388
pagerSemaphore->release(numConcurrentPagers - partFilters.size());
389389
}
390+
391+
// disable visitor pausing in unit tests to make paging deterministic.
392+
const auto canPause = !engine->isUnitTesting();
390393
for (const auto& partFilter : partFilters) {
391-
auto pv = std::make_unique<ItemPagingVisitor>(
392-
*kvBucket,
393-
stats,
394-
makeEvictionStrategy(),
395-
pagerSemaphore,
396-
true, /* allow pausing between vbuckets */
397-
partFilter);
394+
auto pv = std::make_unique<ItemPagingVisitor>(*kvBucket,
395+
stats,
396+
makeEvictionStrategy(),
397+
pagerSemaphore,
398+
canPause,
399+
partFilter);
398400

399401
kvBucket->visitAsync(std::move(pv),
400402
"Item pager",

engines/ep/tests/module_tests/ephemeral_bucket_test.cc

Lines changed: 71 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1289,7 +1289,9 @@ class SingleThreadedEphemeralAutoDeleteThrottleTest
12891289
store->enableItemPager();
12901290

12911291
int numItems = 0;
1292-
std::string value = std::string(1024 * 20, 'v');
1292+
// Use a small valueSize so that deletes don't immediately reduce memory
1293+
// very much.
1294+
std::string value = std::string(256, 'v');
12931295
while (pageableMemCurrent < pageableMemHighWatermark) {
12941296
const std::string key("key" + std::to_string(numItems));
12951297
auto item = make_item(vbid, makeStoredDocKey(key), value);
@@ -1346,8 +1348,22 @@ class SingleThreadedEphemeralAutoDeleteThrottleTest
13461348
}
13471349
};
13481350

1351+
// Test that auto-delete throttles when the limit is reached and unthrottles
1352+
// once DCP progresses
13491353
TEST_F(SingleThreadedEphemeralAutoDeleteThrottleTest, AutoDeleteThrottling) {
13501354
populateTillHighWatermark();
1355+
// ensure StrictQuotaItemPager::shouldStopPaging returns false by
1356+
// artificially lowering the watermarks so paging will not bail early
1357+
// intermittently. Particularly ensuring the following is false:
1358+
// "|| stats.getEstimatedTotalMemoryUsed() <= stats.mem_low_wat".
1359+
// This also influences the "limit" that StrictQuotaItemPager::canPageItems
1360+
// generates. That is ok as we are testing that the pager throttles due
1361+
// to DCP cursors not advancing.
1362+
auto& stats = engine->getEpStats();
1363+
stats.setHighWaterMark(stats.mem_high_wat / 2);
1364+
stats.setLowWaterMark(stats.mem_low_wat / 2);
1365+
ASSERT_GT(stats.getPreciseTotalMemoryUsed(), stats.mem_high_wat);
1366+
13511367
auto vb = store->getVBucket(vbid);
13521368

13531369
auto numItemsBefore = vb->getNumItems();
@@ -1367,102 +1383,75 @@ TEST_F(SingleThreadedEphemeralAutoDeleteThrottleTest, AutoDeleteThrottling) {
13671383
// The item pager task is scheduled when the high watermark is reached
13681384
// in storeInner(), don't need to wake-it up again.
13691385
// Run the item pager task
1386+
const auto fqSize = lpNonioQ.getFutureQueueSize();
13701387
runNextTask(lpNonioQ, "Paging out items.");
1388+
// One new task in future-queue
1389+
ASSERT_GT(lpNonioQ.getFutureQueueSize(), fqSize);
1390+
1391+
// Run until no more visitors are scheduled.
1392+
while (lpNonioQ.getFutureQueueSize() != fqSize) {
1393+
runNextTask(lpNonioQ);
1394+
}
13711395

1372-
// The item pager task schedules a visitor task to visit the items in the
1373-
// vbucket and see which item are eligible for eviction. Run the visitor
1374-
// task until completion
1375-
runNextTask(lpNonioQ);
1376-
runNextTask(lpNonioQ);
1377-
ASSERT_EQ(0, lpNonioQ.getReadyQueueSize());
13781396
ASSERT_LT(vb->getNumItems(), numItemsBefore);
13791397

1380-
// trying running the item pager task again
1381-
store->attemptToFreeMemory();
1398+
// PagingVisitor calls checkAndMaybeFreeMemory which will schedule
1399+
// paging again, this one gets throttled because the deleted items still
1400+
// exist (memory reduction hasn't really happened yet).
1401+
ASSERT_EQ(0, ephBucket.getPagingThrottled());
13821402
runNextTask(lpNonioQ, "Paging out items.");
1383-
// We should have been throttled now & therefore the
1384-
// ready queue should be empty - i.e no visitor tasks are scheduled.
1385-
ASSERT_EQ(0, lpNonioQ.getReadyQueueSize());
1386-
ASSERT_LE(1, ephBucket.getPagingThrottled());
1387-
}
1388-
1389-
TEST_F(SingleThreadedEphemeralAutoDeleteThrottleTest,
1390-
AutoDeleteThrottledUntilDcpCursorAdvances) {
1391-
populateTillHighWatermark();
1392-
auto vb = store->getVBucket(vbid);
1393-
auto numItemsBefore = vb->getNumItems();
1394-
auto& lpNonioQ = *task_executor->getLpTaskQ(TaskType::NonIO);
1395-
1396-
// create a new producer and stream ...
1397-
auto producer = std::make_shared<MockDcpProducer>(
1398-
*engine, cookie, "test_producer", cb::mcbp::DcpOpenFlag::None);
1399-
1400-
createDcpStream(*producer);
1401-
1402-
ASSERT_EQ(1, vb->checkpointManager->getNumCursors());
1403-
MockDcpMessageProducers producers;
1404-
1405-
notifyAndStepToCheckpoint(*producer, producers);
1403+
EXPECT_EQ(fqSize, lpNonioQ.getFutureQueueSize())
1404+
<< "mem_used:" << stats.getPreciseTotalMemoryUsed()
1405+
<< " hwm:" << stats.mem_high_wat << " lwm:" << stats.mem_low_wat
1406+
<< " numItems:" << vb->getNumItems();
1407+
EXPECT_EQ(1, ephBucket.getPagingThrottled())
1408+
<< "paged:" << ephBucket.getPagedBytes() << " bytes paged, max "
1409+
<< ephBucket.getMaxPagingBytes();
1410+
if (fqSize != lpNonioQ.getFutureQueueSize()) {
1411+
lpNonioQ.forEachFutureTask([](const ExTask& task) {
1412+
std::cerr << "Future task: " << task->getDescription() << std::endl;
1413+
});
1414+
}
14061415

1407-
auto& ephBucket = static_cast<EphemeralBucket&>(*store);
1408-
auto throttledBefore = ephBucket.getPagingThrottled();
1416+
// try running the item pager task again
1417+
store->attemptToFreeMemory();
14091418

1410-
// Run the item pager task, scheduled when the high watermark was breached.
1411-
// The pager will delete items and set pagedSeqno. Since the DCP cursor
1412-
// hasn't consumed the deletions yet, canPagingResume() will return false
1413-
// and throttling will kick in during this run.
14141419
runNextTask(lpNonioQ, "Paging out items.");
1415-
// The pager schedules a visitor task
1416-
runNextTask(lpNonioQ, "Item pager no vbucket assigned");
1417-
// The visitor runs and may reschedule the pager.
1418-
// The ItemPagingVisitor uses a 25ms wall-clock budget
1419-
// (CappedDurationVBucketVisitor::maxChunkDuration). Under CPU load
1420-
// that budget can be exceeded mid hash-table-walk, in which case
1421-
// VBCBAdaptor::run() snoozes and returns true (rather than calling
1422-
// ItemPagingVisitor::complete()). The visitor task is left queued as
1423-
// "Item pager on vb:X" and the pager is not woken. Re-run any such
1424-
// paused visitor tasks until the pager becomes the next ready task.
1420+
// We should have been throttled now & therefore the
1421+
// future queue should return to the old size - i.e no visitor tasks are
1422+
// scheduled.
1423+
ASSERT_EQ(fqSize, lpNonioQ.getFutureQueueSize());
1424+
ASSERT_EQ(2, ephBucket.getPagingThrottled());
1425+
1426+
// Now if we drain DCP, we can test that the pager becomes unthrottled.
1427+
// Drain DCP, step through the marker and all items
1428+
auto& quickNonIO = *task_executor->getLpTaskQ(TaskType::QuickNonIO);
14251429
while (true) {
1426-
CheckedExecutor executor(task_executor, lpNonioQ);
1427-
if (executor.getTaskName() == "Paging out items.") {
1428-
executor.runCurrentTask("Paging out items.");
1429-
executor.completeCurrentTask();
1430+
auto status = producer->step(false, producers);
1431+
if (status == cb::engine_errc::would_block) {
1432+
// DCP rescheduled the ActiveStreamCheckpointProcessor
1433+
runNextTask(quickNonIO);
14301434
break;
14311435
}
1432-
ASSERT_TRUE(executor.getTaskName().starts_with("Item pager "))
1433-
<< "Unexpected task: " << executor.getTaskName();
1434-
executor.runCurrentTask();
1435-
executor.completeCurrentTask();
1436-
}
1437-
ASSERT_EQ(0, lpNonioQ.getReadyQueueSize());
1438-
ASSERT_LT(vb->getNumItems(), numItemsBefore);
14391436

1440-
// Throttling should have kicked in because the DCP cursor hasn't
1441-
// consumed the deletions (pagedSeqno) yet
1442-
ASSERT_GT(ephBucket.getPagingThrottled(), throttledBefore);
1443-
auto throttledAfterFirstRun = ephBucket.getPagingThrottled();
1437+
EXPECT_EQ(status, cb::engine_errc::success);
1438+
}
14441439

1445-
// Try to run the pager again - it should be throttled
1440+
// try running the item pager task again
14461441
store->attemptToFreeMemory();
1447-
runNextTask(lpNonioQ, "Paging out items.");
1448-
ASSERT_EQ(0, lpNonioQ.getReadyQueueSize());
1449-
1450-
// Throttling count should have increased
1451-
auto throttledAfterSecondRun = ephBucket.getPagingThrottled();
1452-
ASSERT_GT(throttledAfterSecondRun, throttledAfterFirstRun);
1453-
1454-
// Now consume items through DCP producer to advance the cursor
1455-
// past the pagedSeqno
1456-
auto stepCount = numItemsBefore;
1457-
while (stepCount > 0) {
1458-
EXPECT_EQ(cb::engine_errc::success, producer->step(false, producers));
1459-
stepCount--;
1460-
}
14611442

14621443
// The producer has consumed all the items, so the item pager should no
14631444
// longer be throttled: running it is expected to schedule a new visitor
14641445
// task without increasing the throttled count.
1465-
store->attemptToFreeMemory();
1466-
ASSERT_EQ(0, lpNonioQ.getReadyQueueSize());
1467-
ASSERT_EQ(ephBucket.getPagingThrottled(), throttledAfterSecondRun);
1468-
}
1446+
runNextTask(lpNonioQ, "Paging out items.");
1447+
// No more throttling and a new task is scheduled.
1448+
ASSERT_EQ(2, ephBucket.getPagingThrottled());
1449+
EXPECT_GT(lpNonioQ.getFutureQueueSize(), fqSize);
1450+
bool found = false;
1451+
lpNonioQ.forEachFutureTask([&found](const ExTask& task) {
1452+
if ("Item pager no vbucket assigned" == task->getDescription()) {
1453+
found = true;
1454+
}
1455+
});
1456+
EXPECT_TRUE(found) << "Expected to have found the item pager visitor task";
1457+
}

0 commit comments

Comments
 (0)