|
25 | 25 | #include "reflection/adl.h" |
26 | 26 | #include "storage/batch_cache.h" |
27 | 27 | #include "storage/disk_log_impl.h" |
| 28 | +#include "storage/log_housekeeping_meta.h" |
28 | 29 | #include "storage/log_manager.h" |
29 | 30 | #include "storage/log_reader.h" |
30 | 31 | #include "storage/ntp_config.h" |
|
40 | 41 | #include "test_utils/randoms.h" |
41 | 42 | #include "test_utils/tmp_dir.h" |
42 | 43 | #include "utils/directory_walker.h" |
| 44 | +#include "utils/tristate.h" |
43 | 45 |
|
44 | 46 | #include <seastar/core/io_priority_class.hh> |
45 | 47 | #include <seastar/core/loop.hh> |
@@ -5378,3 +5380,116 @@ FIXTURE_TEST(dirty_ratio, storage_test_fixture) { |
5378 | 5380 | BOOST_REQUIRE_EQUAL(disk_log->closed_segment_bytes(), 0); |
5379 | 5381 | BOOST_REQUIRE_CLOSE(disk_log->dirty_ratio(), 0.0, tolerance); |
5380 | 5382 | } |
| 5383 | + |
| 5384 | +FIXTURE_TEST(compaction_scheduling, storage_test_fixture) { |
| 5385 | + using log_manager_accessor = storage::testing_details::log_manager_accessor; |
| 5386 | + storage::log_manager mgr = make_log_manager(); |
| 5387 | + info("Configuration: {}", mgr.config()); |
| 5388 | + auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); }); |
| 5389 | + std::vector<ss::shared_ptr<storage::log>> logs; |
| 5390 | + |
| 5391 | + using overrides_t = storage::ntp_config::default_overrides; |
| 5392 | + overrides_t ov; |
| 5393 | + ov.cleanup_policy_bitflags = model::cleanup_policy_bitflags::compaction; |
| 5394 | + ov.min_cleanable_dirty_ratio = tristate<double>{0.2}; |
| 5395 | + |
| 5396 | + for (const auto& topic : {"tapioca", "cassava", "kudzu"}) { |
| 5397 | + auto ntp = model::ntp("kafka", topic, 0); |
| 5398 | + auto log |
| 5399 | + = mgr |
| 5400 | + .manage(storage::ntp_config( |
| 5401 | + ntp, mgr.config().base_dir, std::make_unique<overrides_t>(ov))) |
| 5402 | + .get(); |
| 5403 | + logs.push_back(log); |
| 5404 | + } |
| 5405 | + |
| 5406 | + auto& meta_list = log_manager_accessor::logs_list(mgr); |
| 5407 | + |
| 5408 | + using bflags = storage::log_housekeeping_meta::bitflags; |
| 5409 | + |
| 5410 | + static constexpr auto is_set = [](bflags var, auto flag) { |
| 5411 | + return (var & flag) == flag; |
| 5412 | + }; |
| 5413 | + |
| 5414 | + // Floating point comparison tolerance |
| 5415 | + static constexpr auto tol = 1.0e-6; |
| 5416 | + |
| 5417 | + auto append_and_force_roll = [this](auto& log, int num_batches = 10) { |
| 5418 | + auto headers = append_random_batches<linear_int_kv_batch_generator>( |
| 5419 | + log, num_batches); |
| 5420 | + log->force_roll(ss::default_priority_class()).get(); |
| 5421 | + }; |
| 5422 | + |
| 5423 | + // Attempt a housekeeping scan with no partitions to compact |
| 5424 | + log_manager_accessor::housekeeping_scan(mgr).get(); |
| 5425 | + |
| 5426 | + for (const auto& meta : meta_list) { |
| 5427 | + BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked)); |
| 5428 | + BOOST_REQUIRE(!is_set(meta.flags, bflags::compacted)); |
| 5429 | + } |
| 5430 | + |
| 5431 | + // Append batches and force roll with first log- it should be the only one |
| 5432 | + // compacted |
| 5433 | + append_and_force_roll(logs[0], 30); |
| 5434 | + BOOST_REQUIRE_CLOSE(logs[0]->dirty_ratio(), 1.0, tol); |
| 5435 | + |
| 5436 | + log_manager_accessor::housekeeping_scan(mgr).get(); |
| 5437 | + |
| 5438 | + for (const auto& meta : meta_list) { |
| 5439 | + bool expect_compacted = meta.handle->config().ntp() |
| 5440 | + == logs[0]->config().ntp(); |
| 5441 | + BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked)); |
| 5442 | + BOOST_REQUIRE(is_set(meta.flags, bflags::compaction_checked)); |
| 5443 | + BOOST_REQUIRE_EQUAL( |
| 5444 | + expect_compacted, is_set(meta.flags, bflags::compacted)); |
| 5445 | + auto batches = read_and_validate_all_batches(logs[0]); |
| 5446 | + linear_int_kv_batch_generator::validate_post_compaction( |
| 5447 | + std::move(batches)); |
| 5448 | + } |
| 5449 | + |
| 5450 | + // Append fewer batches and force roll with second log- it should be the |
| 5451 | + // only one compacted |
| 5452 | + append_and_force_roll(logs[1], 20); |
| 5453 | + BOOST_REQUIRE_CLOSE(logs[1]->dirty_ratio(), 1.0, tol); |
| 5454 | + |
| 5455 | + log_manager_accessor::housekeeping_scan(mgr).get(); |
| 5456 | + |
| 5457 | + for (const auto& meta : meta_list) { |
| 5458 | + bool expect_compacted = meta.handle->config().ntp() |
| 5459 | + == logs[1]->config().ntp(); |
| 5460 | + BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked)); |
| 5461 | + BOOST_REQUIRE(is_set(meta.flags, bflags::compaction_checked)); |
| 5462 | + BOOST_REQUIRE_EQUAL( |
| 5463 | + expect_compacted, is_set(meta.flags, bflags::compacted)); |
| 5464 | + auto batches = read_and_validate_all_batches(logs[1]); |
| 5465 | + linear_int_kv_batch_generator::validate_post_compaction( |
| 5466 | + std::move(batches)); |
| 5467 | + } |
| 5468 | + |
| 5469 | + // Append batches and force roll all logs- all of them will be compacted |
| 5470 | + for (auto& log : logs) { |
| 5471 | + append_and_force_roll(log, 10); |
| 5472 | + } |
| 5473 | + |
| 5474 | + BOOST_REQUIRE_GE(logs[0]->dirty_ratio(), 1.0 / 3.0); |
| 5475 | + BOOST_REQUIRE_GE(logs[1]->dirty_ratio(), 1.0 / 2.0); |
| 5476 | + BOOST_REQUIRE_CLOSE(logs[2]->dirty_ratio(), 1.0, tol); |
| 5477 | + |
| 5478 | + log_manager_accessor::housekeeping_scan(mgr).get(); |
| 5479 | + |
| 5480 | + // Logs in the meta list will be ordered w/r/t their dirty ratio |
| 5481 | + // (descending) post compaction |
| 5482 | + auto log_it = logs.rbegin(); |
| 5483 | + for (const auto& meta : meta_list) { |
| 5484 | + BOOST_REQUIRE(is_set(meta.flags, bflags::lifetime_checked)); |
| 5485 | + BOOST_REQUIRE(is_set(meta.flags, bflags::compaction_checked)); |
| 5486 | + BOOST_REQUIRE(is_set(meta.flags, bflags::compacted)); |
| 5487 | + BOOST_REQUIRE_EQUAL( |
| 5488 | + meta.handle->config().ntp(), (*log_it)->config().ntp()); |
| 5489 | + ++log_it; |
| 5490 | + } |
| 5491 | + |
| 5492 | + for (auto& log : logs) { |
| 5493 | + auto batches = read_and_validate_all_batches(log); |
| 5494 | + } |
| 5495 | +}; |
0 commit comments