Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 82 additions & 19 deletions be/src/service/service_be/lake_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,62 @@ void LakeServiceImpl::vacuum_full(::google::protobuf::RpcController* controller,
latch.wait();
}

// Check missing files, like segment, delete vector, pk index sst, cols file
static Status check_missing_files(const TabletMetadata& metadata, const lake::TabletManager* tablet_mgr,
::starrocks::TabletMetadataEntry* entry) {
std::unordered_set<std::string> missing_files;
std::shared_ptr<FileSystem> fs = nullptr;
auto check_file = [&](const std::string& path, const std::string& filename) -> Status {
if (fs == nullptr) {
ASSIGN_OR_RETURN(fs, FileSystem::CreateSharedFromString(path));
}
auto st = fs->path_exists(path);
if (st.is_not_found()) {
missing_files.emplace(filename);
} else if (!st.ok()) {
return st;
}
return Status::OK();
};

auto tablet_id = metadata.id();

// segment
for (const auto& rowset : metadata.rowsets()) {
for (const auto& seg_name : rowset.segments()) {
RETURN_IF_ERROR(check_file(tablet_mgr->segment_location(tablet_id, seg_name), seg_name));
}
}

// delete vector
if (metadata.has_delvec_meta()) {
for (const auto& [_, file_meta] : metadata.delvec_meta().version_to_file()) {
RETURN_IF_ERROR(check_file(tablet_mgr->delvec_location(tablet_id, file_meta.name()), file_meta.name()));
}
}

// pk index sst
if (metadata.has_sstable_meta()) {
for (const auto& sst : metadata.sstable_meta().sstables()) {
RETURN_IF_ERROR(check_file(tablet_mgr->sst_location(tablet_id, sst.filename()), sst.filename()));
}
}

// cols
if (metadata.has_dcg_meta()) {
for (const auto& [_, dcg_ver] : metadata.dcg_meta().dcgs()) {
for (const auto& filename : dcg_ver.column_files()) {
RETURN_IF_ERROR(check_file(tablet_mgr->segment_location(tablet_id, filename), filename));
}
}
}

for (const auto& filename : missing_files) {
entry->add_missing_files(filename);
}
return Status::OK();
}

// Get metadatas for a list of tablets within a specified version range.
// This function supports concurrent processing of tablet metadata fetch tasks.
void LakeServiceImpl::get_tablet_metadatas(::google::protobuf::RpcController* controller,
Expand Down Expand Up @@ -1426,58 +1482,65 @@ void LakeServiceImpl::get_tablet_metadatas(::google::protobuf::RpcController* co
auto latch = BThreadCountDownLatch(request->tablet_ids_size());
int64_t max_version = request->max_version();
int64_t min_version = request->min_version();
bool enable_check_missing_files = request->has_check_missing_files() && request->check_missing_files();

response->mutable_tablet_metadatas()->Reserve(request->tablet_ids_size());
response->mutable_tablet_results()->Reserve(request->tablet_ids_size());
for (int i = 0; i < request->tablet_ids_size(); ++i) {
response->add_tablet_metadatas();
response->add_tablet_results();
}

// traverse each tablet_id and submit get tablet metadatas task
for (int i = 0; i < request->tablet_ids_size(); ++i) {
auto tablet_id = request->tablet_ids(i);
auto* tablet_metadatas = response->mutable_tablet_metadatas(i);
tablet_metadatas->set_tablet_id(tablet_id);
auto* tablet_result = response->mutable_tablet_results(i);
tablet_result->set_tablet_id(tablet_id);

auto task = std::make_shared<CancellableRunnable>(
[&, tablet_id, max_version, min_version, tablet_metadatas] {
[&, tablet_id, max_version, min_version, tablet_result] {
DeferOp defer([&] { latch.count_down(); });

// get tablet metadatas within the specified version range
std::vector<TabletMetadataPtr> metadatas;
for (int64_t version = max_version; version >= min_version; --version) {
// don't fill meta cache to avoid polluting the cache
lake::CacheOptions cache_opts{.fill_meta_cache = false, .fill_data_cache = true};
auto tablet_metadata_or = _tablet_mgr->get_tablet_metadata(tablet_id, version, cache_opts);
const auto& st = tablet_metadata_or.status();
if (st.ok()) {
metadatas.emplace_back(std::move(tablet_metadata_or).value());
const auto& tablet_metadata = tablet_metadata_or.value();
auto* entry = tablet_result->add_metadata_entries();
entry->mutable_metadata()->CopyFrom(*tablet_metadata);

if (enable_check_missing_files) {
auto check_st = check_missing_files(*tablet_metadata, _tablet_mgr, entry);
if (!check_st.ok()) {
check_st.to_protobuf(tablet_result->mutable_status());
return;
}
}
} else if (!st.is_not_found()) {
st.to_protobuf(tablet_metadatas->mutable_status());
st.to_protobuf(tablet_result->mutable_status());
return;
}
}

if (metadatas.empty()) {
if (tablet_result->metadata_entries_size() > 0) {
Status::OK().to_protobuf(tablet_result->mutable_status());
} else {
auto st = Status::NotFound(fmt::format("tablet {} metadata not found in version range [{}, {}]",
tablet_id, min_version, max_version));
st.to_protobuf(tablet_metadatas->mutable_status());
} else {
Status::OK().to_protobuf(tablet_metadatas->mutable_status());
for (const auto& metadata : metadatas) {
(*tablet_metadatas->mutable_version_metadatas())[metadata->version()].CopyFrom(*metadata);
}
st.to_protobuf(tablet_result->mutable_status());
}
},
[&, tablet_id, tablet_metadatas] {
[&, tablet_id, tablet_result] {
auto st = Status::Cancelled(
fmt::format("get tablet metadatas task has been cancelled. tablet: {}", tablet_id));
st.to_protobuf(tablet_metadatas->mutable_status());
st.to_protobuf(tablet_result->mutable_status());
latch.count_down();
});

auto st = thread_pool->submit(std::move(task));
if (!st.ok()) {
st.to_protobuf(tablet_metadatas->mutable_status());
st.to_protobuf(tablet_result->mutable_status());
latch.count_down();
}
}
Expand All @@ -1487,7 +1550,7 @@ void LakeServiceImpl::get_tablet_metadatas(::google::protobuf::RpcController* co
// add a warning log if any tablets fail, show the first 10 failed tablets
std::vector<std::string> messages;
size_t failed_count = 0;
for (const auto& tm : response->tablet_metadatas()) {
for (const auto& tm : response->tablet_results()) {
if (tm.status().status_code() != 0) {
++failed_count;
if (messages.size() < 10) {
Expand Down
139 changes: 104 additions & 35 deletions be/test/service/lake_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2787,6 +2787,27 @@ TEST_F(LakeServiceTest, test_task_cleared_in_thread_pool_queue) {
}

TEST_F(LakeServiceTest, test_get_tablet_metadatas) {
// Helper to check if a specific version exists in metadata_entries
auto has_version = [](const ::starrocks::TabletResult& tablet_result, int64_t version_to_find) {
for (const auto& entry : tablet_result.metadata_entries()) {
if (entry.metadata().version() == version_to_find) {
return true;
}
}
return false;
};

// Helper to get metadata entry for a specific version
auto get_metadata_entry = [](const ::starrocks::TabletResult& tablet_result,
int64_t version_to_find) -> const TabletMetadataEntry* {
for (const auto& entry : tablet_result.metadata_entries()) {
if (entry.metadata().version() == version_to_find) {
return &entry;
}
}
return nullptr;
};

// 0. setup: create tablet with version 1, 2, 3, 4
auto publish_version = [&](int64_t base_version, int64_t new_version) {
auto txn_log = generate_write_txn_log(1, 10 * new_version, 100 * new_version);
Expand Down Expand Up @@ -2893,14 +2914,18 @@ TEST_F(LakeServiceTest, test_get_tablet_metadatas) {

ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(0, response.status().status_code());
ASSERT_EQ(response.tablet_metadatas_size(), 1);
const auto& tablet_metadatas = response.tablet_metadatas(0);
ASSERT_EQ(tablet_metadatas.tablet_id(), _tablet_id);
ASSERT_EQ(tablet_metadatas.version_metadatas_size(), 2);
ASSERT_TRUE(tablet_metadatas.version_metadatas().contains(2));
ASSERT_TRUE(tablet_metadatas.version_metadatas().contains(3));
ASSERT_EQ(tablet_metadatas.version_metadatas().at(2).version(), 2);
ASSERT_EQ(tablet_metadatas.version_metadatas().at(3).version(), 3);
ASSERT_EQ(response.tablet_results_size(), 1);
const auto& tablet_result = response.tablet_results(0);
ASSERT_EQ(tablet_result.tablet_id(), _tablet_id);
ASSERT_EQ(tablet_result.metadata_entries_size(), 2);
ASSERT_TRUE(has_version(tablet_result, 2));
ASSERT_TRUE(has_version(tablet_result, 3));
auto* entry2 = get_metadata_entry(tablet_result, 2);
ASSERT_TRUE(entry2 != nullptr);
ASSERT_EQ(entry2->metadata().version(), 2);
auto* entry3 = get_metadata_entry(tablet_result, 3);
ASSERT_TRUE(entry3 != nullptr);
ASSERT_EQ(entry3->metadata().version(), 3);
}

// 3.2 tablet not found
Expand All @@ -2919,11 +2944,11 @@ TEST_F(LakeServiceTest, test_get_tablet_metadatas) {
ASSERT_FALSE(cntl.Failed());
// rpc-level status should be OK
ASSERT_EQ(0, response.status().status_code());
ASSERT_EQ(response.tablet_metadatas_size(), 1);
const auto& tablet_metadatas = response.tablet_metadatas(0);
ASSERT_EQ(tablet_metadatas.tablet_id(), non_existent_tablet_id);
ASSERT_EQ(TStatusCode::NOT_FOUND, tablet_metadatas.status().status_code());
ASSERT_TRUE(MatchPattern(tablet_metadatas.status().error_msgs(0),
ASSERT_EQ(response.tablet_results_size(), 1);
const auto& tablet_result = response.tablet_results(0);
ASSERT_EQ(tablet_result.tablet_id(), non_existent_tablet_id);
ASSERT_EQ(TStatusCode::NOT_FOUND, tablet_result.status().status_code());
ASSERT_TRUE(MatchPattern(tablet_result.status().error_msgs(0),
"tablet -1 metadata not found in version range [1, 2]"));
}

Expand All @@ -2943,15 +2968,15 @@ TEST_F(LakeServiceTest, test_get_tablet_metadatas) {
ASSERT_FALSE(cntl.Failed());
// rpc-level status should be OK
ASSERT_EQ(0, response.status().status_code());
ASSERT_EQ(response.tablet_metadatas_size(), 1);
const auto& tablet_metadatas = response.tablet_metadatas(0);
ASSERT_EQ(tablet_metadatas.tablet_id(), _tablet_id);
ASSERT_EQ(TStatusCode::OK, tablet_metadatas.status().status_code());
ASSERT_EQ(response.tablet_results_size(), 1);
const auto& tablet_result = response.tablet_results(0);
ASSERT_EQ(tablet_result.tablet_id(), _tablet_id);
ASSERT_EQ(TStatusCode::OK, tablet_result.status().status_code());

// should find version 3 and 4
ASSERT_EQ(tablet_metadatas.version_metadatas_size(), 2);
ASSERT_TRUE(tablet_metadatas.version_metadatas().contains(3));
ASSERT_TRUE(tablet_metadatas.version_metadatas().contains(4));
ASSERT_EQ(tablet_result.metadata_entries_size(), 2);
ASSERT_TRUE(has_version(tablet_result, 3));
ASSERT_TRUE(has_version(tablet_result, 4));
}

// 4. failed case
Expand All @@ -2977,11 +3002,11 @@ TEST_F(LakeServiceTest, test_get_tablet_metadatas) {
ASSERT_FALSE(cntl.Failed());
// rpc-level status should be OK even if one tablet fails
ASSERT_EQ(0, response.status().status_code());
ASSERT_EQ(response.tablet_metadatas_size(), 1);
const auto& tablet_metadatas = response.tablet_metadatas(0);
ASSERT_EQ(tablet_metadatas.tablet_id(), _tablet_id);
ASSERT_EQ(TStatusCode::IO_ERROR, tablet_metadatas.status().status_code());
ASSERT_TRUE(MatchPattern(tablet_metadatas.status().error_msgs(0), "injected get tablet metadata error"));
ASSERT_EQ(response.tablet_results_size(), 1);
const auto& tablet_result = response.tablet_results(0);
ASSERT_EQ(tablet_result.tablet_id(), _tablet_id);
ASSERT_EQ(TStatusCode::IO_ERROR, tablet_result.status().status_code());
ASSERT_TRUE(MatchPattern(tablet_result.status().error_msgs(0), "injected get tablet metadata error"));
}

// 4.2 thread pool is full
Expand All @@ -3003,10 +3028,10 @@ TEST_F(LakeServiceTest, test_get_tablet_metadatas) {
ASSERT_FALSE(cntl.Failed());
// rpc-level status should be OK even if one tablet fails
ASSERT_EQ(0, response.status().status_code());
ASSERT_EQ(response.tablet_metadatas_size(), 1);
const auto& tablet_metadatas = response.tablet_metadatas(0);
ASSERT_EQ(tablet_metadatas.tablet_id(), _tablet_id);
ASSERT_EQ(TStatusCode::SERVICE_UNAVAILABLE, tablet_metadatas.status().status_code());
ASSERT_EQ(response.tablet_results_size(), 1);
const auto& tablet_result = response.tablet_results(0);
ASSERT_EQ(tablet_result.tablet_id(), _tablet_id);
ASSERT_EQ(TStatusCode::SERVICE_UNAVAILABLE, tablet_result.status().status_code());
}

// 4.3 task cancelled
Expand Down Expand Up @@ -3040,12 +3065,56 @@ TEST_F(LakeServiceTest, test_get_tablet_metadatas) {
ASSERT_FALSE(cntl.Failed());
// rpc-level status should be OK even if one tablet fails
ASSERT_EQ(0, response.status().status_code());
ASSERT_EQ(response.tablet_metadatas_size(), 1);
const auto& tablet_metadatas = response.tablet_metadatas(0);
ASSERT_EQ(tablet_metadatas.tablet_id(), _tablet_id);
ASSERT_EQ(TStatusCode::CANCELLED, tablet_metadatas.status().status_code());
ASSERT_TRUE(MatchPattern(tablet_metadatas.status().error_msgs(0),
"*get tablet metadatas task has been cancelled*"));
ASSERT_EQ(response.tablet_results_size(), 1);
const auto& tablet_result = response.tablet_results(0);
ASSERT_EQ(tablet_result.tablet_id(), _tablet_id);
ASSERT_EQ(TStatusCode::CANCELLED, tablet_result.status().status_code());
ASSERT_TRUE(
MatchPattern(tablet_result.status().error_msgs(0), "*get tablet metadatas task has been cancelled*"));
}

// 5. check missing files
{
// 5.1 no missing files for version 4
brpc::Controller cntl;
GetTabletMetadatasRequest request;
GetTabletMetadatasResponse response;

request.add_tablet_ids(_tablet_id);
request.set_min_version(4);
request.set_max_version(4);
request.set_check_missing_files(true);
_lake_service.get_tablet_metadatas(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(TStatusCode::OK, response.status().status_code());
ASSERT_EQ(1, response.tablet_results_size());
ASSERT_EQ(_tablet_id, response.tablet_results(0).tablet_id());
ASSERT_EQ(1, response.tablet_results(0).metadata_entries_size());
const auto& entry = response.tablet_results(0).metadata_entries(0);
ASSERT_EQ(0, entry.missing_files_size());
const auto& metadata = entry.metadata();
ASSERT_EQ(3, metadata.rowsets_size());
ASSERT_EQ(1, metadata.rowsets(0).segments_size());
std::string seg_name = metadata.rowsets(0).segments(0);

response.Clear();
request.Clear();

// 5.2 missing segment files for version 4
// delete one segment file from version 4
ASSERT_OK(fs::remove(_tablet_mgr->segment_location(_tablet_id, seg_name)));
request.add_tablet_ids(_tablet_id);
request.set_min_version(4);
request.set_max_version(4);
request.set_check_missing_files(true);
_lake_service.get_tablet_metadatas(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(TStatusCode::OK, response.status().status_code());
ASSERT_EQ(1, response.tablet_results_size());
ASSERT_EQ(_tablet_id, response.tablet_results(0).tablet_id());
ASSERT_EQ(1, response.tablet_results(0).metadata_entries_size());
ASSERT_EQ(1, response.tablet_results(0).metadata_entries(0).missing_files_size());
ASSERT_EQ(seg_name, response.tablet_results(0).metadata_entries(0).missing_files(0));
}
}

Expand Down
Loading
Loading