Optimize cdc_state checkpoint updates #29268
base: master
…pulating cdc tablet checkpoints
Thank you for submitting the diff.
| Result<std::vector<CDCStateTableEntry>> CDCStateTable::FetchEntriesForTablet( | ||
| const TabletId& tablet_id, CDCStateTableEntrySelector&& field_filter) { | ||
| DCHECK(!tablet_id.empty()); |
| DCHECK(!tablet_id.empty()); | |
| RSTATUS_DCHECK(!tablet_id.empty(), IllegalState, "Invalid tablet id provided"); |
| LOG_WITH_FUNC(WARNING) << "Failed operation: " << error->failed_op().ToString() | ||
| << ", status: " << error->status(); | ||
| } | ||
| RETURN_NOT_OK(flush_status.status); |
| RETURN_NOT_OK(flush_status.status); | |
| return flush_status.status; |
| auto row_block = ql::RowsResult(read_op.get()).GetRowBlock(); | ||
| RETURN_NOT_OK(row_block); |
| auto row_block = ql::RowsResult(read_op.get()).GetRowBlock(); | |
| RETURN_NOT_OK(row_block); | |
| auto row_block = VERIFY_RESULT(ql::RowsResult(read_op.get()).GetRowBlock()); |
| // Bind hash key to limit results to the specific tablet_id. | ||
| QLAddStringHashValue(req_read, tablet_id); | ||
| req_read->set_return_paging_state(true); | ||
| req_read->set_limit(1024); |
1024 should be a gFlag so that we can configure it at runtime.
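A minimal sketch of the idea behind this comment, using a `std::atomic` as a stand-in for a runtime-settable flag. The flag name `FLAGS_cdc_state_fetch_page_size` and the fallback behavior are hypothetical and only for illustration; the project's own flag-definition macros are not used here.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical stand-in for a runtime flag. A real change would define the
// flag with the project's flag macros instead of a bare atomic.
std::atomic<int32_t> FLAGS_cdc_state_fetch_page_size{1024};

constexpr int32_t kDefaultPageSize = 1024;

// Read the flag each time a request is built so that a runtime change takes
// effect on the next read. Non-positive values fall back to the default.
int32_t CurrentPageSize() {
  const int32_t value = FLAGS_cdc_state_fetch_page_size.load(std::memory_order_relaxed);
  return value > 0 ? value : kDefaultPageSize;
}
```

The read request would then pass `CurrentPageSize()` to `set_limit` instead of the literal.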
| if (!read_op->response().has_paging_state()) { | ||
| break; | ||
| } | ||
| *req_read->mutable_paging_state() = read_op->response().paging_state(); |
| *req_read->mutable_paging_state() = read_op->response().paging_state(); | |
| *req_read->mutable_paging_state() = std::move(read_op->response().paging_state()); |
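The apply-and-page pattern in this hunk can be sketched in isolation. This is a simplified model, not the client API: `Page`, `ReadPage`, and `FetchAll` are hypothetical names, and the paging state is modeled as a plain row offset.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for one page of a read response.
struct Page {
  std::vector<std::string> rows;
  std::optional<size_t> paging_state;  // Offset of the next row; empty on the last page.
};

// Hypothetical stand-in for a server that returns at most `limit` rows per request.
Page ReadPage(const std::vector<std::string>& table, size_t offset, size_t limit) {
  assert(limit > 0);
  Page page;
  const size_t end = std::min(table.size(), offset + limit);
  for (size_t i = offset; i < end; ++i) {
    page.rows.push_back(table[i]);
  }
  if (end < table.size()) {
    page.paging_state = end;  // More rows remain, so hand back a continuation point.
  }
  return page;
}

// Issue requests until a response arrives without a paging state.
std::vector<std::string> FetchAll(
    const std::vector<std::string>& table, size_t limit, size_t* requests = nullptr) {
  std::vector<std::string> results;
  size_t offset = 0;
  size_t count = 0;
  while (true) {
    const Page page = ReadPage(table, offset, limit);
    ++count;
    results.insert(results.end(), page.rows.begin(), page.rows.end());
    if (!page.paging_state.has_value()) {
      break;
    }
    offset = *page.paging_state;  // Carry the paging state into the next request.
  }
  if (requests != nullptr) {
    *requests = count;
  }
  return results;
}
```

With five rows and a limit of two, this loop issues three requests and returns all five rows in order.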
| std::vector<CDCStateTableEntry> results; | ||
| const auto read_op = cdc_table->NewReadOp(); |
| const auto read_op = cdc_table->NewReadOp(); |
| // Apply and page until done. | ||
| while (true) { | ||
| session->Apply(read_op); |
| session->Apply(read_op); | |
| auto read_op = cdc_table->NewReadOp(); | |
| session->Apply(read_op); |
| RSTATUS_DCHECK( | ||
| entry.record_id_commit_time.has_value(), InternalError, | ||
| Format( | ||
| "The slot entry for the stream $0 did not have a value for record_id_commit_time", | ||
| stream_id), |
| RSTATUS_DCHECK( | |
| entry.record_id_commit_time.has_value(), InternalError, | |
| Format( | |
| "The slot entry for the stream $0 did not have a value for record_id_commit_time", | |
| stream_id), | |
| if (!entry.record_id_commit_time.has_value()) { | |
| LOG(DFATAL) << "The slot entry for the stream " << stream_id << " did not have a value for record_id_commit_time"; | |
| continue; | |
| } |
We don't want one corrupted stream to impact the rest of the system.
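A rough sketch of the skip-and-continue approach suggested here, combined with the per-namespace minimum that the surrounding code computes. `SlotEntry` is a simplified, hypothetical struct (in particular, the `namespace_id` field is an assumption), and `std::cerr` stands in for the `LOG(DFATAL)` call.

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical, simplified view of a slot entry.
struct SlotEntry {
  std::string namespace_id;  // Assumed field, for illustration only.
  std::string stream_id;
  std::optional<uint64_t> record_id_commit_time;
};

// Compute the minimum record_id_commit_time per namespace. An entry without
// a value is logged and skipped, so it cannot fail the whole pass.
std::unordered_map<std::string, uint64_t> MinCommitTimePerNamespace(
    const std::vector<SlotEntry>& entries) {
  std::unordered_map<std::string, uint64_t> result;
  for (const auto& entry : entries) {
    if (!entry.record_id_commit_time.has_value()) {
      std::cerr << "The slot entry for the stream " << entry.stream_id
                << " did not have a value for record_id_commit_time\n";
      continue;
    }
    const uint64_t commit_time = *entry.record_id_commit_time;
    auto [it, inserted] = result.try_emplace(entry.namespace_id, commit_time);
    if (!inserted) {
      it->second = std::min(it->second, commit_time);
    }
  }
  return result;
}
```

A namespace whose only slot entries are missing the value gets no result at all in this sketch; whether that is the right outcome for the real service is a separate decision.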
| // Get the minimum record_id_commit_time for each namespace by looking at all the slot entries. | ||
| // Build the set of tablet_ids that this tserver is actively tracking for CDC. | ||
| std::unordered_set<TabletId> local_tablet_ids; | ||
| for (const auto& it : impl_->TabletCheckpointsCopy()) { |
Who populates TabletCheckpointsCopy?
I am not sure if this works with xCluster which expects the entry to exist before the GetChanges request comes in. cc @hulien22
Right now, the CDC service scans the entire cdc_state table on each tserver when updating checkpoint info. This causes a very large number of RocksDB ops when the cluster has many tservers.

To address this, these changes:
- Add CDCStateTable::FetchEntriesForTablet(tablet_id, selector) to do hash-keyed reads (binds tablet_id) with paging.
- Update CDCServiceImpl::PopulateTabletCheckPointInfo to:
  - Use FetchEntriesForTablet instead of a table-wide scan.
  - Call FetchEntriesForTablet(kCDCSDKSlotEntryTabletId, ...) and compute the per-namespace min record_id_commit_time.
- Update CDCServiceImpl::PopulateCDCSDKTabletCheckPointInfo (single-tablet path) to use FetchEntriesForTablet(input_tablet_id, ...) instead of scanning all rows.
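To illustrate why keyed reads help, here is a small model that counts rows touched by a table-wide scan versus per-tablet lookups restricted to the tablets a server tracks. `StateStore`, `Entry`, and `FetchLocal` are hypothetical names for this sketch and do not reflect the real table layout or client API.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

// Hypothetical, simplified state row.
struct Entry {
  std::string tablet_id;
  std::string stream_id;
};

// Hypothetical in-memory store with rows grouped by their hash key (tablet_id).
class StateStore {
 public:
  void Add(Entry entry) {
    auto& rows = by_tablet_[entry.tablet_id];
    rows.push_back(std::move(entry));
  }

  // Table-wide scan: every row is read regardless of who needs it.
  std::vector<Entry> ScanAll(size_t* rows_read) const {
    assert(rows_read != nullptr);
    std::vector<Entry> out;
    for (const auto& [tablet_id, rows] : by_tablet_) {
      out.insert(out.end(), rows.begin(), rows.end());
      *rows_read += rows.size();
    }
    return out;
  }

  // Keyed read: only the rows for one tablet_id are read.
  std::vector<Entry> FetchForTablet(const std::string& tablet_id, size_t* rows_read) const {
    assert(rows_read != nullptr);
    const auto it = by_tablet_.find(tablet_id);
    if (it == by_tablet_.end()) {
      return {};
    }
    *rows_read += it->second.size();
    return it->second;
  }

 private:
  std::unordered_map<std::string, std::vector<Entry>> by_tablet_;
};

// Fetch entries only for the tablets this server is tracking.
std::vector<Entry> FetchLocal(
    const StateStore& store, const std::unordered_set<std::string>& local_tablet_ids,
    size_t* rows_read) {
  std::vector<Entry> out;
  for (const auto& tablet_id : local_tablet_ids) {
    std::vector<Entry> rows = store.FetchForTablet(tablet_id, rows_read);
    out.insert(out.end(), rows.begin(), rows.end());
  }
  return out;
}
```

In this model the scan cost grows with the total number of rows, while the keyed path grows only with the rows belonging to the local tablets.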