sink(ticdc): waiting the asynchronous ddl to synchronize (#12123) #12466
base: release-8.5
Conversation
Signed-off-by: ti-chi-bot <[email protected]>
This cherry-pick PR is for a release branch and has not yet been approved by triage owners. To merge this cherry pick:
@wk989898 This PR has conflicts; I have put it on hold.
@ti-chi-bot: If you want to know how to resolve it, please read the guide in the TiDB Dev Guide.
[APPROVALNOTIFIER] This PR is NOT APPROVED. Needs approval from an approver in each of these files; the full list of commands accepted by this bot can be found here.
Summary of Changes (Gemini Code Assist): This pull request significantly enhances the robustness of DDL (Data Definition Language) replication in TiCDC. It addresses a critical issue where long-running DDL operations, particularly in TiDB environments, could lead to connection timeouts and perceived failures even when the DDL was successfully applied downstream. The changes introduce a proactive waiting mechanism that queries the downstream TiDB's DDL job status, ensuring that TiCDC accurately reflects the completion of DDLs and maintains data consistency, especially under network instability or during prolonged schema changes.
Code Review
This pull request refactors the DDL sink logic to handle long-running DDLs that might cause connection losses. Instead of failing, it now waits for the DDL to complete by polling its status from the downstream TiDB. This is a solid improvement for the sink's robustness. The overall implementation is good, but I've found a critical issue with merge conflict markers in a configuration file, a high-severity SQL injection vulnerability, and a few medium-severity issues related to error handling and code clarity that should be addressed.
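To make the waiting mechanism concrete, here is a minimal Go sketch of the poll-until-done approach the review describes. The function name waitDDLDone, the terminal-state strings, the one-second poll interval, and the imports (context, database/sql, fmt, time) are illustrative assumptions, not the PR's exact code; the PR's implementation lives in the DDL sink and uses getDDLStateFromTiDB.

// waitDDLDone polls the downstream TiDB until the DDL job matching the query
// text (created at or after createTime) reaches a terminal state.
func waitDDLDone(ctx context.Context, db *sql.DB, query, createTime string) error {
	for {
		var state string
		row := db.QueryRowContext(ctx,
			`SELECT STATE FROM information_schema.ddl_jobs
			 WHERE CREATE_TIME >= ? AND QUERY = ? LIMIT 1`, createTime, query)
		if err := row.Scan(&state); err != nil && err != sql.ErrNoRows {
			return err // surface the real error so the caller's retry logic can act on it
		}
		switch state {
		case "synced", "done":
			return nil // the DDL finished downstream
		case "cancelled", "rollback done":
			return fmt.Errorf("ddl failed downstream, state %q", state)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second): // poll interval, an assumption
		}
	}
}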
<<<<<<< HEAD
=======
["CDC:ErrDDLStateNotFound"]
error = '''
DDL state not found %s
'''

["CDC:ErrDDLUnsupportType"]
error = '''
unsupport ddl type %s, query %s
'''

>>>>>>> d9a7e9f313 (sink(ticdc): waiting the asynchronous ddl to synchronize (#12123))
There are merge conflict markers (<<<<<<< HEAD, =======, >>>>>>>) in this file. This will break the TOML parsing and cause the application to fail. Please remove them and keep the intended error definitions.
["CDC:ErrDDLStateNotFound"]
error = '''
DDL state not found %s
'''
["CDC:ErrDDLUnsupportType"]
error = '''
unsupport ddl type %s, query %s
'''
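For context, errors.toml is generated from Go error definitions; the two entries above would typically be backed by declarations along these lines in tiflow's pkg/errors, using the pingcap/errors package (a sketch of the usual pattern, not the PR's verbatim source):

// Normalized errors pair an RFC-style code with a message template whose
// %s arguments are filled in at the call site.
ErrDDLStateNotFound = errors.Normalize(
	"DDL state not found %s",
	errors.RFCCodeText("CDC:ErrDDLStateNotFound"),
)
ErrDDLUnsupportType = errors.Normalize(
	"unsupport ddl type %s, query %s",
	errors.RFCCodeText("CDC:ErrDDLUnsupportType"),
)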
showJobs := fmt.Sprintf(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY FROM information_schema.ddl_jobs
	WHERE CREATE_TIME >= "%s" AND QUERY = "%s";`, createTime, ddl)
//nolint:rowserrcheck
jobsRows, err := db.QueryContext(ctx, showJobs)
Constructing SQL queries using fmt.Sprintf with external data (like the DDL query string) is vulnerable to SQL injection. Even if the ddl string is expected to be safe, it's a security best practice to always use parameterized queries to prevent potential vulnerabilities.
Please modify this to use query parameters.
showJobs := `SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY FROM information_schema.ddl_jobs
WHERE CREATE_TIME >= ? AND QUERY = ?;`
//nolint:rowserrcheck
jobsRows, err := db.QueryContext(ctx, showJobs, createTime, ddl)
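As a quick illustration of the difference (hypothetical input, not from the PR): a DDL whose text itself contains a double quote would terminate the string literal that fmt.Sprintf splices together, yielding malformed or attacker-controllable SQL, while a bound ? parameter is sent as a plain value by the driver:

ddl := `CREATE TABLE t (c INT COMMENT "a ""quoted"" comment")` // text contains double quotes
// fmt.Sprintf(`... AND QUERY = "%s"`, ddl) would produce a malformed statement here;
// the placeholder form stays well-formed for any input:
jobsRows, err := db.QueryContext(ctx,
	`SELECT JOB_ID FROM information_schema.ddl_jobs WHERE QUERY = ?`, ddl)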
func (m *DDLSink) checkAsyncExecDDLDone(ctx context.Context, tables map[model.TableName]struct{}) bool {
-	ctx, cancel := context.WithTimeout(ctx, timeout)
+	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
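One detail worth noting about this pattern: context.WithTimeout returns a cancel function that must be called to release the timer. The surrounding body is not shown in this hunk, so the defer below is an assumption about the usual idiom rather than a quote from the PR:

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel() // releases the timer even if the checks return early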
log.Warn("Wait the asynchronous ddl to synchronize", zap.String("ddl", ddl.Query), zap.String("ddlCreateTime", ddlCreateTime),
	zap.String("readTimeout", m.cfg.ReadTimeout), zap.Error(err))
The log message "Wait the asynchronous ddl to synchronize" is grammatically awkward. Consider rephrasing it for clarity, for example: "Waiting for asynchronous DDL to be synchronized".
-	log.Warn("Wait the asynchronous ddl to synchronize", zap.String("ddl", ddl.Query), zap.String("ddlCreateTime", ddlCreateTime),
-		zap.String("readTimeout", m.cfg.ReadTimeout), zap.Error(err))
+	log.Warn("Waiting for asynchronous DDL to be synchronized", zap.String("ddl", ddl.Query), zap.String("ddlCreateTime", ddlCreateTime),
+		zap.String("readTimeout", m.cfg.ReadTimeout), zap.Error(err))
state, err := getDDLStateFromTiDB(ctx, m.db, ddl.Query, ddlCreateTime)
if err != nil {
	log.Error("Error when getting DDL state from TiDB", zap.Error(err))
}
The error returned from getDDLStateFromTiDB is logged but not handled. The function continues execution with a zero-value state, which leads to a generic ErrDDLStateNotFound being returned. This loses the original error context, making debugging more difficult. The original error should be returned to be handled by the retry mechanism.
state, err := getDDLStateFromTiDB(ctx, m.db, ddl.Query, ddlCreateTime)
if err != nil {
	log.Error("Error when getting DDL state from TiDB", zap.Error(err))
	return err
}

for row.Next() {
	err = row.Scan(&ddlCreateTime)
	if err != nil {
		log.Warn("getting ddlCreateTime failed", zap.Error(err))
	}
}
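A side note on this fragment, offered as a sketch rather than a quote from the PR: database/sql reports errors that abort iteration (such as a dropped connection) only through the rows' Err method after the loop, so an idiomatic version would also check it:

for row.Next() {
	if err := row.Scan(&ddlCreateTime); err != nil {
		log.Warn("getting ddlCreateTime failed", zap.Error(err))
	}
}
// Next returning false can mean either "no more rows" or "iteration failed";
// row.Err() distinguishes the two cases.
if err := row.Err(); err != nil {
	log.Warn("iterating ddl_jobs rows failed", zap.Error(err))
}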
Codecov Report: ✅ All modified and coverable lines are covered by tests.
@@ Coverage Diff @@
## release-8.5 #12466 +/- ##
================================================
Coverage ? 50.8469%
================================================
Files ? 484
Lines ? 69666
Branches ? 0
================================================
Hits ? 35423
Misses ? 31487
Partials ? 2756
@ti-chi-bot: The following tests failed. Full PR test history. Your PR dashboard.
This is an automated cherry-pick of #12123
What problem does this PR solve?
Issue Number: close #12114 close #12128
What is changed and how it works?
When a DDL operation takes a long time, the connection may be lost even though the DDL is actually executed downstream. Instead of reporting a failure, we can query the downstream and wait for the DDL to complete, as in the sketch below.
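Conceptually, the flow looks like the following. The names execDDL and isConnectionLost are illustrative assumptions, and waitDDLDone refers to the polling sketch earlier in this discussion; the PR's real entry points differ:

// execDDL runs a DDL downstream; if the connection drops while the statement
// is still executing, it waits for the DDL job to finish instead of failing.
func execDDL(ctx context.Context, db *sql.DB, ddl string) error {
	// Approximation: the PR reads the job's CREATE_TIME from the downstream
	// rather than trusting the local clock.
	createTime := time.Now().UTC().Format("2006-01-02 15:04:05")
	if _, err := db.ExecContext(ctx, ddl); err != nil {
		if !isConnectionLost(err) { // hypothetical classifier for timeout/EOF-style errors
			return err
		}
		// The DDL may still be running downstream; poll until it completes.
		return waitDDLDone(ctx, db, ddl, createTime)
	}
	return nil
}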
Notice:
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note