Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ func (c *Controller) handleBootstrapResponses(ctx context.Context, responses map
}
}
c.finishBootstrap(ctx, runningCfs)
c.bootstrapper.ClearBootstrapResponses()
}

// handleMaintainerStatus handle the status report from the maintainers
Expand Down
1 change: 1 addition & 0 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ func (c *coordinator) handleStateChange(
c.controller.updateChangefeedEpoch(ctx, event.changefeedID)
c.controller.moveChangefeedToSchedulingQueue(event.changefeedID, false, false)
case config.StateFailed, config.StateFinished:
failpoint.Inject("BlockBeforeStopChangefeed", func() {})
c.controller.operatorController.StopChangefeed(ctx, event.changefeedID, false)
default:
}
Expand Down
1 change: 1 addition & 0 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,7 @@ func (m *Maintainer) onBootstrapResponses(responses map[node.ID]*heartbeatpb.Mai
m.handleError(err)
return
}
m.bootstrapper.ClearBootstrapResponses()

if postBootstrapRequest == nil {
return
Expand Down
23 changes: 16 additions & 7 deletions maintainer/maintainer_controller_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ func (c *Controller) FinishBootstrap(
zap.Int("nodeCount", len(allNodesResp)))

// Step 1: Determine start timestamp and update DDL dispatcher
startTs, redoStartTs := c.determineStartTs(allNodesResp)
startTs, redoStartTs, err := c.determineStartTs(allNodesResp)
if err != nil {
log.Error("can not determine the startTs from the bootstrap response",
zap.String("changefeed", c.changefeedID.Name()), zap.Error(err))
return nil, errors.Trace(err)
}
Comment on lines +91 to +96
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Replacing log.Panic with an error return is a critical improvement for the robustness of the system. Panicking can lead to unexpected service interruptions, whereas returning an error allows for graceful handling and recovery. The error message also provides clear context about the issue.

Comment on lines +91 to +96
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Remove panic on bootstrap start-ts resolution failure

Line 93 still panics, so the maintainer can crash in the exact failure path this PR is trying to harden. Return the error instead of panicking, and propagate it without re-wrapping at Line 95.

✅ Suggested fix
 	startTs, redoStartTs, err := c.determineStartTs(allNodesResp)
 	if err != nil {
-		log.Panic("cant not found the startTs from the bootstrap response",
-			zap.String("changefeed", c.changefeedID.Name()))
-		return nil, errors.Trace(err)
+		log.Error("cannot determine start ts from bootstrap response",
+			zap.Stringer("changefeed", c.changefeedID),
+			zap.Error(err))
+		return nil, err
 	}
As per coding guidelines "When an error comes from a third-party or library call in Go, wrap it immediately with `errors.Trace(err)` or `errors.WrapError(...)` to attach a stack trace; upstream callers should propagate wrapped errors without wrapping again".
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
startTs, redoStartTs, err := c.determineStartTs(allNodesResp)
if err != nil {
log.Panic("cant not found the startTs from the bootstrap response",
zap.String("changefeed", c.changefeedID.Name()))
return nil, errors.Trace(err)
}
startTs, redoStartTs, err := c.determineStartTs(allNodesResp)
if err != nil {
log.Error("cannot determine start ts from bootstrap response",
zap.Stringer("changefeed", c.changefeedID),
zap.Error(err))
return nil, err
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/maintainer_controller_bootstrap.go` around lines 91 - 96, The
current bootstrap code calls log.Panic when c.determineStartTs(allNodesResp)
fails and then returns errors.Trace(err); remove the panic and instead log the
failure with log.Error (including zap.String("changefeed",
c.changefeedID.Name())) and return the original err value (not re-wrapped) so
the caller can propagate the already-wrapped error from determineStartTs; update
the failing block around c.determineStartTs to perform a non-panicking log and
return nil, err.


// Step 2: Load tables from schema store
tables, err := c.loadTables(startTs)
Expand Down Expand Up @@ -144,7 +149,7 @@ func (c *Controller) FinishBootstrap(
}, nil
}

func (c *Controller) determineStartTs(allNodesResp map[node.ID]*heartbeatpb.MaintainerBootstrapResponse) (uint64, uint64) {
func (c *Controller) determineStartTs(allNodesResp map[node.ID]*heartbeatpb.MaintainerBootstrapResponse) (uint64, uint64, error) {
Comment thread
wk989898 marked this conversation as resolved.
var (
startTs uint64
redoStartTs uint64
Expand All @@ -170,14 +175,18 @@ func (c *Controller) determineStartTs(allNodesResp map[node.ID]*heartbeatpb.Main
}
}
if startTs == 0 {
log.Panic("cant not found the startTs from the bootstrap response",
zap.String("changefeed", c.changefeedID.Name()))
return 0, 0, errors.WrapError(
errors.ErrChangefeedInitTableTriggerDispatcherFailed,
errors.New("all bootstrap responses reported empty checkpointTs"),
)
Comment on lines 177 to +181
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This change correctly replaces a log.Panic with a structured error return. This is crucial for system stability, as it allows the application to handle the absence of startTs gracefully rather than crashing. The use of errors.WrapError provides a clear and traceable error message.

}
if c.enableRedo && redoStartTs == 0 {
log.Panic("cant not found the redoStartTs from the bootstrap response",
zap.String("changefeed", c.changefeedID.Name()))
return 0, 0, errors.WrapError(
errors.ErrChangefeedInitTableTriggerDispatcherFailed,
errors.New("all bootstrap responses reported empty redoCheckpointTs"),
)
Comment thread
wk989898 marked this conversation as resolved.
}
return startTs, redoStartTs
return startTs, redoStartTs, nil
Comment thread
wk989898 marked this conversation as resolved.
}

func (c *Controller) buildWorkingTaskMap(
Expand Down
33 changes: 33 additions & 0 deletions maintainer/maintainer_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
appcontext "github.com/pingcap/ticdc/pkg/common/context"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/eventservice"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/pdutil"
Expand Down Expand Up @@ -1434,6 +1435,38 @@
require.Nil(t, postBootstrapRequest)
}

func TestFinishBootstrapReturnsErrorWhenCheckpointMissing(t *testing.T) {
testutil.SetUpTestServices()

Check failure on line 1439 in maintainer/maintainer_controller_test.go

View workflow job for this annotation

GitHub Actions / Next Gen Unit Tests

not enough arguments in call to testutil.SetUpTestServices
nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"}

tableTriggerEventDispatcherID := common.NewDispatcherID()
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID,
common.DDLSpanSchemaID,
common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{
ID: tableTriggerEventDispatcherID.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Working,
CheckpointTs: 1,
}, "node1", false)
refresher := replica.NewRegionCountRefresher(cfID, time.Minute)
controller := NewController(cfID, 1, &mockThreadPool{},
config.GetDefaultReplicaConfig(), ddlSpan, nil, 1000, 0, refresher, common.DefaultKeyspace, false)

postBootstrapRequest, err := controller.FinishBootstrap(map[node.ID]*heartbeatpb.MaintainerBootstrapResponse{
"node1": {
ChangefeedID: cfID.ToPB(),
},
}, false)
require.Nil(t, postBootstrapRequest)
require.Error(t, err)
code, ok := cerrors.RFCCode(err)
require.True(t, ok)
require.Equal(t, cerrors.ErrChangefeedInitTableTriggerDispatcherFailed.RFCCode(), code)
require.Contains(t, err.Error(), "all bootstrap responses reported empty checkpointTs")
require.False(t, controller.bootstrapped)
Comment on lines +1438 to +1467
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This new test case TestFinishBootstrapReturnsErrorWhenCheckpointMissing is well-designed. It specifically targets the scenario where checkpointTs is missing, ensuring that the new error handling in determineStartTs functions as expected. Verifying the error type, code, and message is crucial for robust error management.

}

// TestFinishBootstrapSkipsStaleCreateOperatorForDroppedTable covers stale bootstrap Create requests
// for dropped tables across add/move/split operator types. Each subtest boots from an empty schema
// snapshot and verifies bootstrap skips the stale create phase instead of recreating ghost tasks or
Expand Down
20 changes: 16 additions & 4 deletions pkg/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,11 @@ func (b *Bootstrapper[T]) AllNodesReady() bool {
return b.allNodesReady
}

// collectBootstrapResponses return all cached bootstrapped responses after make sure all nodes responses received.
// Returns:
// - newly added nodes responses if all they are initialized, and clear all cached responses.
// - else nil
// collectBootstrapResponses returns all cached bootstrapped responses after
// making sure every tracked node has reported once.
//
// The responses are kept until the caller explicitly acknowledges that the
// higher level bootstrap phase finished successfully via ClearBootstrapResponses.
//
// Note: this method must be called after lock.
func (b *Bootstrapper[T]) collectBootstrapResponses() map[node.ID]*T {
Expand Down Expand Up @@ -211,4 +212,15 @@ func (b *Bootstrapper[T]) collectBootstrapResponses() map[node.ID]*T {
return responses
}

// ClearBootstrapResponses discards every cached bootstrap response. Callers
// invoke it once their own bootstrap phase has completed successfully, so the
// responses are no longer needed for a retry.
func (b *Bootstrapper[T]) ClearBootstrapResponses() {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	for _, nodeStatus := range b.nodes {
		nodeStatus.ClearResponse()
	}
}

type NewBootstrapRequestFn func(id node.ID, addr string) *messaging.TargetMessage
60 changes: 60 additions & 0 deletions pkg/bootstrap/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func TestHandleNewNodes(t *testing.T) {
require.Len(t, responses, 2)
require.Equal(t, 1, len(responses[node1.ID].Spans))
require.Equal(t, 2, len(responses[node2.ID].Spans))
b.ClearBootstrapResponses()

// add one new node
node3 := node.NewInfo("", "")
Expand All @@ -105,6 +106,7 @@ func TestHandleNewNodes(t *testing.T) {
require.True(t, b.AllNodesReady())
require.Len(t, responses, 1)
require.Equal(t, 3, len(responses[node3.ID].Spans))
b.ClearBootstrapResponses()

// remove a node
delete(nodes, node1.ID)
Expand Down Expand Up @@ -136,6 +138,64 @@ func TestHandleNewNodes(t *testing.T) {
require.Equal(t, 1, len(responses[node1.ID].Spans))
}

// TestBootstrapperRetainsResponsesUntilCleared verifies that the bootstrapper
// keeps cached bootstrap responses across rounds until the caller explicitly
// calls ClearBootstrapResponses: after a new node joins, the next completed
// round still returns the earlier nodes' cached responses.
func TestBootstrapperRetainsResponsesUntilCleared(t *testing.T) {
	b := NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse]("test", func(id node.ID, addr string) *messaging.TargetMessage {
		return &messaging.TargetMessage{}
	})

	// Track two nodes; no responses have arrived yet.
	nodes := make(map[node.ID]*node.Info)
	node1 := node.NewInfo("", "")
	node2 := node.NewInfo("", "")
	nodes[node1.ID] = node1
	nodes[node2.ID] = node2

	_, _, _, responses := b.HandleNodesChange(nodes)
	require.Nil(t, responses)

	// First response alone is not enough: collection stays nil until every
	// tracked node has reported.
	changefeedIDPB := common.NewChangefeedID4Test("ns", "cf").ToPB()
	responses = b.HandleBootstrapResponse(
		node1.ID,
		&heartbeatpb.MaintainerBootstrapResponse{
			ChangefeedID: changefeedIDPB,
			CheckpointTs: 10,
			Spans:        []*heartbeatpb.BootstrapTableSpan{{}},
		},
	)
	require.Nil(t, responses)

	// Second response completes the round; both cached responses are returned.
	responses = b.HandleBootstrapResponse(
		node2.ID,
		&heartbeatpb.MaintainerBootstrapResponse{
			ChangefeedID: changefeedIDPB,
			CheckpointTs: 20,
			Spans:        []*heartbeatpb.BootstrapTableSpan{{}, {}},
		},
	)
	require.Len(t, responses, 2)
	require.Equal(t, uint64(10), responses[node1.ID].CheckpointTs)
	require.Equal(t, uint64(20), responses[node2.ID].CheckpointTs)

	// Simulate the higher level bootstrap failing after the first round.
	// A third node joins without the earlier responses being cleared.
	node3 := node.NewInfo("", "")
	nodes[node3.ID] = node3
	_, _, _, responses = b.HandleNodesChange(nodes)
	require.Nil(t, responses)

	// Once node3 reports, the round completes again and the responses from
	// node1/node2 are still present (retained from the failed round).
	responses = b.HandleBootstrapResponse(
		node3.ID,
		&heartbeatpb.MaintainerBootstrapResponse{
			ChangefeedID: changefeedIDPB,
			Spans:        []*heartbeatpb.BootstrapTableSpan{{}, {}, {}},
		},
	)
	require.Len(t, responses, 3)
	require.Equal(t, uint64(10), responses[node1.ID].CheckpointTs)
	require.Equal(t, uint64(20), responses[node2.ID].CheckpointTs)
	require.Equal(t, uint64(0), responses[node3.ID].CheckpointTs)

	// Explicit acknowledgement: drop the cached responses.
	b.ClearBootstrapResponses()
}

func TestResendBootstrapMessage(t *testing.T) {
b := NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse]("test", func(id node.ID, addr string) *messaging.TargetMessage {
return &messaging.TargetMessage{
Expand Down
6 changes: 4 additions & 2 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ func (t *Status[T]) Initialized() bool {
}

func (t *Status[T]) GetResponse() *T {
response := t.response
return t.response
}

func (t *Status[T]) ClearResponse() {
t.response = nil
return response
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/bootstrap_retry_after_error/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["test.?*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
111 changes: 111 additions & 0 deletions tests/integration_tests/bootstrap_retry_after_error/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#!/bin/bash

set -eu

# This integration test covers bootstrap retry handling after an initial
# bootstrap failure.
#
# Steps:
# 1. Start one TiCDC node with a schema store failpoint that keeps bootstrap
# failing with ErrSnapshotLostByGC on the maintainer node.
# 2. Create a mysql sink changefeed and wait until bootstrap fails with
# ErrSnapshotLostByGC.
# 3. Start a second TiCDC node immediately after the first bootstrap error so
# the failed maintainer still observes node scheduling and processes another
# bootstrap response.
# 4. Verify logs contain the retry path:
# maintainer node changed -> bootstrap response -> handle bootstrap response.
# 5. Verify both TiCDC servers keep running and the changefeed remains failed
# with ErrSnapshotLostByGC.

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
REPO_ROOT=$(cd "$CUR/../../.." && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1
MAX_RETRIES=20
# Coordinator failpoint that pauses right before StopChangefeed so the failed
# changefeed is not torn down while we observe the bootstrap retry.
FAILPOINT_BLOCK_BEFORE_STOP_CHANGEFEED="github.com/pingcap/ticdc/coordinator/BlockBeforeStopChangefeed"

PD_ADDR="http://${UP_PD_HOST_1}:${UP_PD_PORT_1}"
SINK_URI="mysql://normal:123456@127.0.0.1:3306/"

# Scan every cdc log under $1 for the ordered retry sequence:
# "maintainer node changed" -> "maintainer received bootstrap response"
# -> "handle bootstrap response". Returns 0 if any log contains the sequence.
function check_node_change_triggers_bootstrap() {
	local work_dir=$1
	local file

	for file in "$work_dir"/cdc*.log; do
		if [ ! -f "$file" ]; then
			continue
		fi
		# awk state machine: each "node changed" line restarts the sequence;
		# success only if all three markers appear in order afterwards.
		if awk '
			/maintainer node changed/ {
				nodeChanged = 1
				gotResp = 0
				handled = 0
			}
			nodeChanged && /maintainer received bootstrap response/ {
				gotResp = 1
			}
			nodeChanged && gotResp && /handle bootstrap response/ {
				handled = 1
				exit 0
			}
			END {
				exit handled ? 0 : 1
			}
		' "$file"; then
			return 0
		fi
	done

	return 1
}

# Exported so `ensure` can invoke it via a subshell.
export -f check_node_change_triggers_bootstrap

function run() {
	# This scenario only exercises the mysql sink.
	if [ "$SINK_TYPE" != "mysql" ]; then
		return
	fi

	rm -rf $WORK_DIR && mkdir -p $WORK_DIR

	start_tidb_cluster --workdir $WORK_DIR

	# Node 0 starts with the schema-store failpoint active, so its first
	# bootstrap round fails with ErrSnapshotLostByGC.
	export GO_FAILPOINTS='github.com/pingcap/ticdc/logservice/schemastore/getAllPhysicalTablesGCFastFail=return(true)'
	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0"
	# wait 20 seconds to let the server start and hit the failpoint
	sleep 20
	enable_failpoint --addr "127.0.0.1:8300" --name "$FAILPOINT_BLOCK_BEFORE_STOP_CHANGEFEED" --expr "pause"

	run_sql "CREATE DATABASE bootstrap_retry_after_error;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	run_sql "CREATE TABLE bootstrap_retry_after_error.t1(id INT PRIMARY KEY, val INT);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	run_sql "CREATE DATABASE bootstrap_retry_after_error;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
	run_sql "CREATE TABLE bootstrap_retry_after_error.t1(id INT PRIMARY KEY, val INT);" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

	cdc_cli_changefeed create --sink-uri="$SINK_URI" -c "test"

	ensure $MAX_RETRIES "grep -Eq 'ErrSnapshotLostByGC' $WORK_DIR/cdc*.log"

	# Start the second node without the schema-store failpoint. The retry still
	# fails because the maintainer is running on the first node, which keeps the
	# bootstrap error active while we verify node scheduling triggers another
	# bootstrap round.
	export GO_FAILPOINTS=''

	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8301" --pd "$PD_ADDR"

	ensure $MAX_RETRIES "check_node_change_triggers_bootstrap $WORK_DIR"
	# Unpause the coordinator and confirm neither server crashed.
	disable_failpoint --addr "127.0.0.1:8300" --name "$FAILPOINT_BLOCK_BEFORE_STOP_CHANGEFEED"
	ensure $MAX_RETRIES "get_cdc_pid 127.0.0.1 8300 >/dev/null"
	ensure $MAX_RETRIES "get_cdc_pid 127.0.0.1 8301 >/dev/null"
	# sleep for a while to let the logs flush
	sleep 10
	ensure $MAX_RETRIES "check_changefeed_state $PD_ADDR test failed ErrSnapshotLostByGC ''"
}

trap 'stop_test $WORK_DIR' EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
2 changes: 1 addition & 1 deletion tests/integration_tests/run_light_it_in_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ mysql_groups=(
# G14
'batch_add_table batch_update_to_no_batch fail_over_ddl_O update_changefeed_check_config pause_changefeed_with_long_time_ddl'
# G15
'split_region changefeed_resume_with_checkpoint_ts autorandom gc_safepoint foreign_key_check old_arch_compatibility'
'split_region changefeed_resume_with_checkpoint_ts autorandom gc_safepoint foreign_key_check old_arch_compatibility bootstrap_retry_after_error'
)

# Resource allocation for kafka light integration tests in CI pipelines:
Expand Down
Loading