Merged — changes from 3 commits
21 changes: 14 additions & 7 deletions maintainer/maintainer_controller_bootstrap.go
@@ -88,7 +88,10 @@ func (c *Controller) FinishBootstrap(
zap.Int("nodeCount", len(allNodesResp)))

// Step 1: Determine start timestamp and update DDL dispatcher
startTs, redoStartTs := c.determineStartTs(allNodesResp)
startTs, redoStartTs, err := c.determineStartTs(allNodesResp)
if err != nil {
return nil, errors.Trace(err)
}
Comment on lines +91 to +96 (critical):

Replacing log.Panic with an error return is a critical improvement for the robustness of the system. Panicking can lead to unexpected service interruptions, whereas returning an error allows for graceful handling and recovery. The error message also provides clear context about the issue.

Comment on lines +91 to +96 (Contributor):

⚠️ Potential issue | 🔴 Critical

Remove panic on bootstrap start-ts resolution failure

Line 93 still panics, so the maintainer can crash in the exact failure path this PR is trying to harden. Return the error instead of panicking, and propagate it without re-wrapping at Line 95.

✅ Suggested fix
 	startTs, redoStartTs, err := c.determineStartTs(allNodesResp)
 	if err != nil {
-		log.Panic("cant not found the startTs from the bootstrap response",
-			zap.String("changefeed", c.changefeedID.Name()))
-		return nil, errors.Trace(err)
+		log.Error("cannot determine start ts from bootstrap response",
+			zap.Stringer("changefeed", c.changefeedID),
+			zap.Error(err))
+		return nil, err
 	}
As per coding guidelines "When an error comes from a third-party or library call in Go, wrap it immediately with `errors.Trace(err)` or `errors.WrapError(...)` to attach a stack trace; upstream callers should propagate wrapped errors without wrapping again".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/maintainer_controller_bootstrap.go` around lines 91-96, the current bootstrap code calls log.Panic when c.determineStartTs(allNodesResp) fails and then returns errors.Trace(err). Remove the panic: log the failure with log.Error (including zap.String("changefeed", c.changefeedID.Name())) and return the original err value, not re-wrapped, so the caller can propagate the already-wrapped error from determineStartTs. Update the failing block around c.determineStartTs to perform a non-panicking log and return nil, err.


// Step 2: Load tables from schema store
tables, err := c.loadTables(startTs)
@@ -144,7 +147,7 @@ func (c *Controller) FinishBootstrap(
}, nil
}

func (c *Controller) determineStartTs(allNodesResp map[node.ID]*heartbeatpb.MaintainerBootstrapResponse) (uint64, uint64) {
func (c *Controller) determineStartTs(allNodesResp map[node.ID]*heartbeatpb.MaintainerBootstrapResponse) (uint64, uint64, error) {
Comment thread — wk989898 marked this conversation as resolved.
var (
startTs uint64
redoStartTs uint64
@@ -170,14 +173,18 @@ func (c *Controller) determineStartTs(allNodesResp map[node.ID]*heartbeatpb.Main
}
}
if startTs == 0 {
log.Panic("cant not found the startTs from the bootstrap response",
zap.String("changefeed", c.changefeedID.Name()))
return 0, 0, errors.WrapError(
errors.ErrChangefeedInitTableTriggerDispatcherFailed,
errors.New("all bootstrap responses reported empty checkpointTs"),
)
Comment on lines 177 to +181 (critical):

This change correctly replaces a log.Panic with a structured error return. This is crucial for system stability, as it allows the application to handle the absence of startTs gracefully rather than crashing. The use of errors.WrapError provides a clear and traceable error message.

}
if c.enableRedo && redoStartTs == 0 {
log.Panic("cant not found the redoStartTs from the bootstrap response",
zap.String("changefeed", c.changefeedID.Name()))
return 0, 0, errors.WrapError(
errors.ErrChangefeedInitTableTriggerDispatcherFailed,
errors.New("all bootstrap responses reported empty redoCheckpointTs"),
)
Comment thread — wk989898 marked this conversation as resolved.
}
return startTs, redoStartTs
return startTs, redoStartTs, nil
Comment thread — wk989898 marked this conversation as resolved.
}

func (c *Controller) buildWorkingTaskMap(
33 changes: 33 additions & 0 deletions maintainer/maintainer_controller_test.go
@@ -29,6 +29,7 @@ import (
appcontext "github.com/pingcap/ticdc/pkg/common/context"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/eventservice"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/pdutil"
@@ -1434,6 +1435,38 @@ func TestFinishBootstrap(t *testing.T) {
require.Nil(t, postBootstrapRequest)
}

func TestFinishBootstrapReturnsErrorWhenCheckpointMissing(t *testing.T) {
testutil.SetUpTestServices()
nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"}

tableTriggerEventDispatcherID := common.NewDispatcherID()
cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID,
common.DDLSpanSchemaID,
common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{
ID: tableTriggerEventDispatcherID.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Working,
CheckpointTs: 1,
}, "node1", false)
refresher := replica.NewRegionCountRefresher(cfID, time.Minute)
controller := NewController(cfID, 1, &mockThreadPool{},
config.GetDefaultReplicaConfig(), ddlSpan, nil, 1000, 0, refresher, common.DefaultKeyspace, false)

postBootstrapRequest, err := controller.FinishBootstrap(map[node.ID]*heartbeatpb.MaintainerBootstrapResponse{
"node1": {
ChangefeedID: cfID.ToPB(),
},
}, false)
require.Nil(t, postBootstrapRequest)
require.Error(t, err)
code, ok := cerrors.RFCCode(err)
require.True(t, ok)
require.Equal(t, cerrors.ErrChangefeedInitTableTriggerDispatcherFailed.RFCCode(), code)
require.Contains(t, err.Error(), "all bootstrap responses reported empty checkpointTs")
require.False(t, controller.bootstrapped)
Comment on lines +1438 to +1467 (medium):

This new test case TestFinishBootstrapReturnsErrorWhenCheckpointMissing is well-designed. It specifically targets the scenario where checkpointTs is missing, ensuring that the new error handling in determineStartTs functions as expected. Verifying the error type, code, and message is crucial for robust error management.

}

// TestFinishBootstrapSkipsStaleCreateOperatorForDroppedTable covers stale bootstrap Create requests
// for dropped tables across add/move/split operator types. Each subtest boots from an empty schema
// snapshot and verifies bootstrap skips the stale create phase instead of recreating ghost tasks or
79 changes: 79 additions & 0 deletions tests/integration_tests/bootstrap_retry_after_error/run.sh
@@ -0,0 +1,79 @@
#!/bin/bash

set -eu

# This integration test covers a bootstrap retry sequence after the first bootstrap
# round has already failed and consumed the cached bootstrap responses.
#
# Steps:
# 1. Start a single TiCDC node.
# 2. Enable a one-shot failpoint so the first maintainer bootstrap fails while
# loading tables from schema store.
# 3. Create a blackhole changefeed and wait until the bootstrap failure is logged.
# 4. Start a second TiCDC node to trigger node change / bootstrap retry.
# 5. Verify the retry reaches the "empty checkpointTs" error path instead of panicking.

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
MAX_RETRIES=20

PD_ADDR="http://${UP_PD_HOST_1}:${UP_PD_PORT_1}"
CHANGEFEED_ID="bootstrap-retry-after-error"
FAILPOINT_NAME="github.com/pingcap/ticdc/logservice/schemastore/getAllPhysicalTablesGCFastFail"

function check_changefeed_failed() {
local pd_addr=$1
local changefeed_id=$2
info=$(cdc_cli_changefeed query --pd=$pd_addr -c "$changefeed_id" -s | grep -v "Command to ticdc")
state=$(echo "$info" | jq -r '.state')
if [[ "$state" != "failed" ]]; then
echo "changefeed state $state does not equal to failed"
exit 1
fi
}

function check_cdc_logs_not_contains() {
local work_dir=$1
local pattern=$2
if grep -Eqs "$pattern" "$work_dir"/cdc*.log; then
echo "TEST FAILED: CDC logs contain unexpected pattern '$pattern'"
grep -Ens "$pattern" "$work_dir"/cdc*.log || true
exit 1
fi
}
Comment thread (outdated) — coderabbitai[bot] marked this conversation as resolved.

export -f check_changefeed_failed
export -f check_cdc_logs_not_contains

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "0" --addr "127.0.0.1:8300" --pd "$PD_ADDR"
enable_failpoint --addr "127.0.0.1:8300" --name "$FAILPOINT_NAME" --expr "1*return(true)"

cdc_cli_changefeed create --pd="$PD_ADDR" --sink-uri="blackhole://" -c "$CHANGEFEED_ID"

ensure $MAX_RETRIES "check_logs_contains $WORK_DIR 'ErrSnapshotLostByGC' '0'"

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8301" --pd "$PD_ADDR"

ensure $MAX_RETRIES "check_logs_contains $WORK_DIR 'maintainer node changed' '0'"
ensure $MAX_RETRIES "check_logs_contains $WORK_DIR 'all bootstrap responses reported empty checkpointTs' '0'"
ensure $MAX_RETRIES "get_cdc_pid 127.0.0.1 8300 >/dev/null"
ensure $MAX_RETRIES "get_cdc_pid 127.0.0.1 8301 >/dev/null"
ensure $MAX_RETRIES "check_changefeed_failed $PD_ADDR $CHANGEFEED_ID"

check_cdc_logs_not_contains "$WORK_DIR" "cant not found the startTs from the bootstrap response|\\[PANIC\\]"

cleanup_process $CDC_BINARY
stop_tidb_cluster
}

trap 'stop_test $WORK_DIR' EXIT
run
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
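The script leans on an `ensure` helper that re-runs a check until it passes. A sketch of that retry loop, under the assumption that the helper takes a maximum attempt count followed by the command to run (the real helper's interface may differ, e.g. it may eval a single string argument):

```shell
#!/bin/bash
# retry: run a command until it succeeds, up to a maximum number of attempts,
# sleeping briefly between attempts. Sketch of an ensure-style helper.
retry() {
	local max=$1
	shift
	local i
	for ((i = 1; i <= max; i++)); do
		if "$@"; then
			return 0
		fi
		sleep 1
	done
	echo "retry: '$*' still failing after $max attempts" >&2
	return 1
}

retry 3 true && echo "check passed"
```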
2 changes: 1 addition & 1 deletion tests/integration_tests/run_light_it_in_ci.sh
@@ -63,7 +63,7 @@ mysql_groups=(
# G14
'batch_add_table batch_update_to_no_batch fail_over_ddl_O update_changefeed_check_config pause_changefeed_with_long_time_ddl'
# G15
'split_region changefeed_resume_with_checkpoint_ts autorandom gc_safepoint foreign_key_check old_arch_compatibility'
'split_region changefeed_resume_with_checkpoint_ts autorandom gc_safepoint foreign_key_check old_arch_compatibility bootstrap_retry_after_error'
)

# Resource allocation for kafka light integration tests in CI pipelines: