Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 98 additions & 106 deletions src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2913,19 +2913,11 @@ Result<TabletCDCCheckpointInfo> CDCServiceImpl::PopulateCDCSDKTabletCheckPointIn
const TabletId& input_tablet_id) {
TabletIdCDCCheckpointMap tablet_min_checkpoint_map;

Status iteration_status;
auto table_range = VERIFY_RESULT(cdc_state_table_->GetTableRange(
CDCStateTableEntrySelector().IncludeCheckpoint().IncludeLastReplicationTime().IncludeData(),
&iteration_status));

for (auto entry_result : table_range) {
if (!entry_result) {
LOG(WARNING) << "Populate tablet checkpoint failed for row. " << entry_result.status();
continue;
}

const auto& entry = *entry_result;
auto entries = VERIFY_RESULT(cdc_state_table_->FetchEntriesForTablet(
input_tablet_id,
CDCStateTableEntrySelector().IncludeCheckpoint().IncludeLastReplicationTime().IncludeData()));

for (const auto& entry : entries) {
// We ignore rows added for colocated tables.
if (!entry.key.colocated_table_id.empty()) {
continue;
Expand All @@ -2934,10 +2926,6 @@ Result<TabletCDCCheckpointInfo> CDCServiceImpl::PopulateCDCSDKTabletCheckPointIn
const auto& stream_id = entry.key.stream_id;
const auto& tablet_id = entry.key.tablet_id;

if (input_tablet_id != tablet_id) {
continue;
}

auto tablet_peer = context_->LookupTablet(tablet_id);
if (!tablet_peer) {
LOG(WARNING) << "Could not find tablet peer for tablet_id: " << tablet_id
Expand All @@ -2962,8 +2950,6 @@ Result<TabletCDCCheckpointInfo> CDCServiceImpl::PopulateCDCSDKTabletCheckPointIn
ProcessEntryForCdcsdk(entry, stream_metadata, tablet_peer, tablet_min_checkpoint_map);
}

RETURN_NOT_OK(iteration_status);

auto it = FindOrNull(tablet_min_checkpoint_map, input_tablet_id);
SCHECK(it != nullptr, NotFound,
Format("Failed to populate CDCSDK checkpoint info for tablet: $0", input_tablet_id));
Expand Down Expand Up @@ -3039,119 +3025,125 @@ Status CDCServiceImpl::PopulateTabletCheckPointInfo(
TabletIdStreamIdSet& tablet_stream_to_be_deleted, StreamIdSet& slot_entries_to_be_deleted,
TableIdToStreamIdMap& expired_tables_map) {
std::unordered_set<xrepl::StreamId> refreshed_metadata_set;

StreamIdSet streams_with_tablet_entries_to_be_deleted;
int count = 0;
Status iteration_status;
auto table_range = VERIFY_RESULT(cdc_state_table_->GetTableRange(
CDCStateTableEntrySelector().IncludeCheckpoint().IncludeLastReplicationTime().IncludeData(),
&iteration_status));

// Get the minimum record_id_commit_time for each namespace by looking at all the slot entries.
// Build the set of tablet_ids that this tserver is actively tracking for CDC.
std::unordered_set<TabletId> local_tablet_ids;
for (const auto& it : impl_->TabletCheckpointsCopy()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who populates TabletCheckpointsCopy?

I am not sure if this works with xCluster which expects the entry to exist before the GetChanges request comes in. cc @hulien22

local_tablet_ids.insert(it.producer_tablet_info.tablet_id);
}

// Optionally compute namespace min record_id_commit_time from slot entries only once.
Result<std::unordered_map<NamespaceId, uint64_t>> namespace_to_min_record_id_commit_time =
std::unordered_map<NamespaceId, uint64_t>();
StreamIdSet streams_with_tablet_entries_to_be_deleted;
if (FLAGS_ysql_yb_enable_replication_slot_consumption) {
namespace_to_min_record_id_commit_time = GetNamespaceMinRecordIdCommitTimeMap(
table_range, &iteration_status, &slot_entries_to_be_deleted);
LOG_IF(WARNING, !namespace_to_min_record_id_commit_time.ok())
<< "Failed to get namespace_to_min_record_id_commit_time: "
<< namespace_to_min_record_id_commit_time.status();
}

for (auto entry_result : table_range) {
if (!entry_result) {
LOG(WARNING) << "Populate tablet checkpoint failed for row. " << entry_result.status();
continue;
auto slot_entries = VERIFY_RESULT(cdc_state_table_->FetchEntriesForTablet(
kCDCSDKSlotEntryTabletId,
CDCStateTableEntrySelector().IncludeCheckpoint().IncludeData()));
std::unordered_map<NamespaceId, uint64_t> ns_min_map;
for (const auto& entry : slot_entries) {
const auto& stream_id = entry.key.stream_id;
if (entry.checkpoint && *entry.checkpoint == OpId::Max()) {
LOG(INFO) << "Stream : " << stream_id << " is being deleted";
slot_entries_to_be_deleted.insert(stream_id);
continue;
}
RSTATUS_DCHECK(
entry.record_id_commit_time.has_value(), InternalError,
Format(
"The slot entry for the stream $0 did not have a value for record_id_commit_time",
stream_id),
Comment on lines +3052 to +3056
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
RSTATUS_DCHECK(
entry.record_id_commit_time.has_value(), InternalError,
Format(
"The slot entry for the stream $0 did not have a value for record_id_commit_time",
stream_id),
if (!entry.record_id_commit_time.has_value()) {
LOG(DFATAL) << "The slot entry for the stream " << stream_id << " did not have a value for record_id_commit_time";
continue;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want one corrupted stream to impact the rest of the system.

stream_id);
auto stream_metadata = VERIFY_RESULT(GetStream(stream_id));
auto namespace_id = (*stream_metadata).GetNamespaceId();
auto record_id_commit_time = *entry.record_id_commit_time;
auto [it, inserted] = ns_min_map.try_emplace(namespace_id, record_id_commit_time);
if (!inserted) {
it->second = std::min(it->second, record_id_commit_time);
}
}
namespace_to_min_record_id_commit_time = std::move(ns_min_map);
}

const auto& entry = *entry_result;
// For each local tablet, fetch and process only its entries.
for (const auto& tablet_id : local_tablet_ids) {
auto entries = VERIFY_RESULT(cdc_state_table_->FetchEntriesForTablet(
tablet_id, CDCStateTableEntrySelector().IncludeCheckpoint().IncludeLastReplicationTime().IncludeData()));

// We ignore rows added for colocated tables.
if (!entry.key.colocated_table_id.empty()) {
continue;
}
for (const auto& entry : entries) {
// We ignore rows added for colocated tables.
if (!entry.key.colocated_table_id.empty()) {
continue;
}

const auto& stream_id = entry.key.stream_id;
const auto& tablet_id = entry.key.tablet_id;
count++;
const auto& stream_id = entry.key.stream_id;
count++;

// kCDCSDKSlotEntryTabletId represent cdc_state entry for a replication slot. The required
// information from slot entry has been extracted and stored into
// 'namespace_to_min_record_id_commit_time' map.
if (tablet_id == kCDCSDKSlotEntryTabletId) {
continue;
}
RefreshStreamMapOption refresh_option = RefreshStreamMapOption::kNone;
if (refreshed_metadata_set.find(stream_id) == refreshed_metadata_set.end()) {
refresh_option = RefreshStreamMapOption::kIfInitiatedState;
}
auto get_stream_metadata = GetStream(stream_id, refresh_option);
if (!get_stream_metadata.ok()) {
LOG(WARNING) << "Read invalid stream id: " << stream_id << " for tablet " << tablet_id
<< ": " << get_stream_metadata.status();
if (!cdcsdk_min_checkpoint_map.contains(tablet_id)) {
VLOG(2) << "We could not get the metadata for the stream: " << stream_id;
auto& tablet_info = cdcsdk_min_checkpoint_map[tablet_id];
tablet_info.cdc_op_id = OpId::Max();
tablet_info.cdc_sdk_op_id = OpId::Max();
tablet_info.cdc_sdk_safe_time = HybridTime::kInvalid;
}
if (get_stream_metadata.status().IsNotFound()) {
VLOG(2) << "We will remove the entry for the stream: " << stream_id
<< ", from cdc_state table.";
tablet_stream_to_be_deleted.insert({tablet_id, stream_id});
streams_with_tablet_entries_to_be_deleted.insert(stream_id);
RemoveStreamFromCache(stream_id);
}
continue;
}

RefreshStreamMapOption refresh_option = RefreshStreamMapOption::kNone;
if (refreshed_metadata_set.find(stream_id) == refreshed_metadata_set.end()) {
refresh_option = RefreshStreamMapOption::kIfInitiatedState;
}
auto get_stream_metadata = GetStream(stream_id, refresh_option);
if (!get_stream_metadata.ok()) {
LOG(WARNING) << "Read invalid stream id: " << stream_id << " for tablet " << tablet_id << ": "
<< get_stream_metadata.status();
// The stream_id present in the cdc_state table was not found in the master cache, it means
// that the stream is deleted. To update the corresponding tablet PEERs, give an entry in
// cdcsdk_min_checkpoint_map which will update cdc_sdk_min_checkpoint_op_id to
// OpId::Max()(i.e no need to retain the intents.). And also mark the row to be deleted.
if (!cdcsdk_min_checkpoint_map.contains(tablet_id)) {
VLOG(2) << "We could not get the metadata for the stream: " << stream_id;
auto& tablet_info = cdcsdk_min_checkpoint_map[tablet_id];
tablet_info.cdc_op_id = OpId::Max();
tablet_info.cdc_sdk_op_id = OpId::Max();
tablet_info.cdc_sdk_safe_time = HybridTime::kInvalid;
StreamMetadata& stream_metadata = **get_stream_metadata;
if (stream_metadata.GetState() == master::SysCDCStreamEntryPB_State_INITIATED) {
refreshed_metadata_set.insert(stream_id);
}
if (get_stream_metadata.status().IsNotFound()) {

if (entry.checkpoint && *entry.checkpoint == OpId::Max()) {
VLOG(2) << "We will remove the entry for the stream: " << stream_id
<< ", from cdc_state table.";
<< ", tablet_id: " << tablet_id
<< ", from cdc_state table since it has OpId::Max().";
tablet_stream_to_be_deleted.insert({tablet_id, stream_id});
streams_with_tablet_entries_to_be_deleted.insert(stream_id);
RemoveStreamFromCache(stream_id);
}
continue;
}
StreamMetadata& stream_metadata = **get_stream_metadata;
// Refresh metadata from master for a stream in INITIATED state only once per round
if (stream_metadata.GetState() == master::SysCDCStreamEntryPB_State_INITIATED) {
refreshed_metadata_set.insert(stream_id);
}

// Add the {tablet_id, stream_id} pair to the set if its checkpoint is OpId::Max().
if (*entry.checkpoint == OpId::Max()) {
VLOG(2) << "We will remove the entry for the stream: " << stream_id
<< ", tablet_id: " << tablet_id
<< ", from cdc_state table since it has OpId::Max().";
tablet_stream_to_be_deleted.insert({tablet_id, stream_id});
streams_with_tablet_entries_to_be_deleted.insert(stream_id);
}
switch (stream_metadata.GetSourceType()) {
case CDCRequestSource::CDCSDK: {
if (!namespace_to_min_record_id_commit_time.ok()) {
continue;
}
auto tablet_peer = context_->LookupTablet(tablet_id);
if (!tablet_peer) {
VLOG(2) << "Could not find tablet peer for tablet_id: " << tablet_id
<< ". Will not update its peers in this round";
continue;
}

switch (stream_metadata.GetSourceType()) {
case CDCRequestSource::CDCSDK: {
if (!namespace_to_min_record_id_commit_time.ok()) {
ProcessEntryForCdcsdk(
entry, stream_metadata, tablet_peer, cdcsdk_min_checkpoint_map,
&slot_entries_to_be_deleted, *namespace_to_min_record_id_commit_time,
&expired_tables_map);
continue;
}
auto tablet_peer = context_->LookupTablet(tablet_id);
if (!tablet_peer) {
VLOG(2) << "Could not find tablet peer for tablet_id: " << tablet_id
<< ". Will not update its peers in this round";
case CDCRequestSource::XCLUSTER:
ProcessEntryForXCluster(entry, xcluster_tablet_min_opid_map);
continue;
}

ProcessEntryForCdcsdk(
entry, stream_metadata, tablet_peer, cdcsdk_min_checkpoint_map,
&slot_entries_to_be_deleted, *namespace_to_min_record_id_commit_time,
&expired_tables_map);
continue;
}
case CDCRequestSource::XCLUSTER:
ProcessEntryForXCluster(entry, xcluster_tablet_min_opid_map);
continue;
FATAL_INVALID_ENUM_VALUE(CDCRequestSource, stream_metadata.GetSourceType());
}
FATAL_INVALID_ENUM_VALUE(CDCRequestSource, stream_metadata.GetSourceType());
}

RETURN_NOT_OK(iteration_status);

// Delete the slot entry in the state table in the next pass of Update Peers and Metrics, if
// entries with valid tablet_id are being deleted in this pass. This will ensure that the slot
// entry is the last entry to be deleted from the state table for a particular stream.
Expand Down
56 changes: 56 additions & 0 deletions src/yb/cdc/cdc_state_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,62 @@ Result<std::optional<CDCStateTableEntry>> CDCStateTable::TryFetchEntry(
return entry;
}

// Reads the cdc_state rows whose hash key is `tablet_id` and returns them as deserialized
// entries.
//
// The key columns (kCdcTabletId, kCdcStreamId) are always projected first; the columns carried by
// `field_filter` are handed to MoveCollection and appended after them.
//
// Returns a non-OK status if the table cannot be obtained, if the session flush fails, if the row
// block cannot be read, or if any row fails to deserialize. Rows collected before the failure are
// not returned in that case.
Result<std::vector<CDCStateTableEntry>> CDCStateTable::FetchEntriesForTablet(
const TabletId& tablet_id, CDCStateTableEntrySelector&& field_filter) {
DCHECK(!tablet_id.empty());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
DCHECK(!tablet_id.empty());
RSTATUS_DCHECK(!tablet_id.empty(), IllegalState, "Invalid tablet id provided");


std::vector<std::string> columns;
// Always project key columns first, followed by requested columns, to reuse DeserializeRow.
columns.emplace_back(kCdcTabletId);
columns.emplace_back(kCdcStreamId);
MoveCollection(&field_filter.columns_, &columns);

VLOG_WITH_FUNC(1) << tablet_id << ", Columns: " << yb::ToString(columns);

auto cdc_table = VERIFY_RESULT(GetTable());
auto session = MakeSession();

std::vector<CDCStateTableEntry> results;

const auto read_op = cdc_table->NewReadOp();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const auto read_op = cdc_table->NewReadOp();

auto* const req_read = read_op->mutable_request();
// Bind hash key to limit results to the specific tablet_id.
QLAddStringHashValue(req_read, tablet_id);
// Request a paging state so the loop below can tell whether more rows remain.
// NOTE(review): presumably the 1024 limit applies per request and the remaining rows arrive via
// paging - confirm it is not treated as a total row limit.
req_read->set_return_paging_state(true);
req_read->set_limit(1024);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1024 should be a gFlag so that we can configure it at runtime.

cdc_table->AddColumns(columns, req_read);

// Apply and page until done.
while (true) {
session->Apply(read_op);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
session->Apply(read_op);
auto read_op = cdc_table->NewReadOp();
session->Apply(read_op);

// TODO(async_flush): https://github.com/yugabyte/yugabyte-db/issues/12173
auto flush_status = session->TEST_FlushAndGetOpsErrors();
if (!flush_status.status.ok()) {
// Log every failed operation, then hand the flush status back to the caller.
for (const auto& error : flush_status.errors) {
LOG_WITH_FUNC(WARNING) << "Failed operation: " << error->failed_op().ToString()
<< ", status: " << error->status();
}
RETURN_NOT_OK(flush_status.status);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
RETURN_NOT_OK(flush_status.status);
return flush_status.status;

}

auto row_block = ql::RowsResult(read_op.get()).GetRowBlock();
RETURN_NOT_OK(row_block);
Comment on lines +784 to +785
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
auto row_block = ql::RowsResult(read_op.get()).GetRowBlock();
RETURN_NOT_OK(row_block);
auto row_block = VERIFY_RESULT(ql::RowsResult(read_op.get()).GetRowBlock());

// Append every row of this page to the accumulated results.
const auto row_count = (*row_block)->row_count();
for (int i = 0; i < row_count; ++i) {
const auto& row = (*row_block)->row(i);
auto entry = VERIFY_RESULT(DeserializeRow(row, columns));
results.push_back(std::move(entry));
}

// No paging state in the response ends the loop.
if (!read_op->response().has_paging_state()) {
break;
}
// Copy the returned paging state into the same request before the next loop iteration
// applies the op again.
*req_read->mutable_paging_state() = read_op->response().paging_state();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
*req_read->mutable_paging_state() = read_op->response().paging_state();
*req_read->mutable_paging_state() = std::move(read_op->response().paging_state());

}

return results;
}

void CDCStateTable::Shutdown() {
shutdown_.SetShuttingDown();
}
Expand Down
5 changes: 5 additions & 0 deletions src/yb/cdc/cdc_state_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ class CDCStateTable {
Result<std::optional<CDCStateTableEntry>> TryFetchEntry(
const CDCStateTableKey& key, CDCStateTableEntrySelector&& field_filter = {}) EXCLUDES(mutex_);

// Get all rows for a given hash key (tablet_id). This avoids scanning the entire table and
// returns all clustering rows (streams) for that tablet_id.
//
// The key columns (tablet id and stream id) are always read; field_filter selects which
// additional columns are projected into each returned entry. Results are read page by page and
// accumulated into a single vector.
//
// Returns a non-OK status instead of partial results if the read or the deserialization of any
// row fails. tablet_id must not be empty.
Result<std::vector<CDCStateTableEntry>> FetchEntriesForTablet(
const TabletId& tablet_id, CDCStateTableEntrySelector&& field_filter = {}) EXCLUDES(mutex_);

void Shutdown();

private:
Expand Down
Loading