21 changes: 19 additions & 2 deletions cmd/cdc/server/server.go
@@ -147,7 +147,8 @@ func (o *options) run(cmd *cobra.Command) error {
util.InitSignalHandling(shutdown, cancel)

err = svr.Run(ctx)
- if err != nil && !errors.Is(errors.Cause(err), context.Canceled) {
+ isNormalExit := isNormalServerShutdown(err, ctx)
+ if !isNormalExit {
log.Error("cdc server exits with error", zap.Error(err))
} else {
log.Info("cdc server exits normally")
@@ -157,17 +158,33 @@ func (o *options) run(cmd *cobra.Command) error {
ticker := time.NewTicker(server.GracefulShutdownTimeout)
defer ticker.Stop()
go func() {
- svr.Close(ctx)
+ svr.Close()
close(ch)
}()
select {
case <-ch:
case <-ticker.C:
log.Warn("graceful shutdown timeout, exit server")
if isNormalExit {
return errors.New("graceful shutdown timeout")
}
}
if isNormalExit {
return nil
}
return err
}

func isNormalServerShutdown(err error, ctx context.Context) bool {
if err == nil {
return true
}
// Treat cancellation as a normal exit only when the top-level context was
// explicitly canceled by shutdown/signal. This avoids masking internal module
// failures that also surface as context.Canceled via errgroup cancellation.
return errors.Is(err, context.Canceled) && ctx.Err() == context.Canceled
}

// complete adapts from the command line args and config file to the data required.
func (o *options) complete(command *cobra.Command) error {
cfg := config.GetDefaultServerConfig()
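A note on the new `isNormalServerShutdown` helper: a bare `errors.Is(err, context.Canceled)` check is ambiguous, because an errgroup child failure cancels the group's derived context, and sibling goroutines then return `context.Canceled` even though no shutdown was requested. A minimal standalone sketch of that ambiguity (illustrative only, not TiCDC code):

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// run simulates a server whose internal module fails: the errgroup cancels
// its derived context, and the error that escapes is context.Canceled.
func run(ctx context.Context) error {
	g, gctx := errgroup.WithContext(ctx)
	g.Go(func() error { return errors.New("internal module failure") })
	g.Go(func() error {
		<-gctx.Done()     // canceled because the sibling failed
		return gctx.Err() // context.Canceled, unrelated to any shutdown
	})
	_ = g.Wait()
	return gctx.Err() // suppose this is the error the caller observes
}

func main() {
	ctx := context.Background() // top-level context; no shutdown signal fired
	err := run(ctx)
	fmt.Println(errors.Is(err, context.Canceled)) // true: looks like a shutdown
	fmt.Println(ctx.Err() == context.Canceled)    // false: it was not one
}
```

Checking `ctx.Err() == context.Canceled` on the top-level context, as the helper does, is what separates a signal-driven shutdown from an internal failure.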
57 changes: 57 additions & 0 deletions cmd/cdc/server/server_test.go
@@ -14,9 +14,11 @@
package server

import (
"context"
"strings"
"testing"

cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/stretchr/testify/require"
)

@@ -118,3 +120,58 @@ func TestNewOptionsDefaultSecurity(t *testing.T) {
require.Empty(t, o.serverConfig.Security.CertPath)
require.Empty(t, o.serverConfig.Security.KeyPath)
}

func TestIsNormalServerShutdown(t *testing.T) {
testCases := []struct {
name string
err error
cancelCtx bool
expected bool
}{
{
name: "nil error",
err: nil,
expected: true,
},
{
name: "context canceled by shutdown",
err: context.Canceled,
cancelCtx: true,
expected: true,
},
{
name: "wrapped context canceled by shutdown",
err: cerror.Trace(context.Canceled),
cancelCtx: true,
expected: true,
},
{
name: "context canceled without shutdown",
err: context.Canceled,
expected: false,
},
{
name: "wrapped context canceled without shutdown",
err: cerror.Trace(context.Canceled),
expected: false,
},
{
name: "other error",
err: cerror.New("boom"),
expected: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
if tc.cancelCtx {
cancel()
} else {
defer cancel()
}

require.Equal(t, tc.expected, isNormalServerShutdown(tc.err, ctx))
})
}
}
3 changes: 3 additions & 0 deletions logservice/logpuller/region_req_cache.go
@@ -191,6 +191,9 @@ func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) {
}

delete(regionReqs, regionID)
if len(regionReqs) == 0 {
delete(c.sentRequests.regionReqs, subID)
}
c.markDone()
}

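A note on the `markStopped` fix above: deleting the inner per-region map once it is empty keeps `sentRequests.regionReqs` from accumulating stale, empty subscription entries, which is exactly what the new test asserts with `require.NotContains`. A generic sketch of the nested-map cleanup idiom, using placeholder types rather than the TiCDC ones:

```go
type subID uint64
type regionID uint64

// sentSet tracks in-flight regions per subscription,
// mirroring the shape of sentRequests.regionReqs.
type sentSet struct {
	regionReqs map[subID]map[regionID]struct{}
}

func (s *sentSet) remove(sub subID, region regionID) {
	inner, ok := s.regionReqs[sub]
	if !ok {
		return
	}
	delete(inner, region)
	if len(inner) == 0 {
		// Drop the now-empty inner map so the outer map does not
		// keep an entry for a subscription with nothing in flight.
		delete(s.regionReqs, sub)
	}
}
```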
23 changes: 23 additions & 0 deletions logservice/logpuller/region_req_cache_test.go
@@ -311,3 +311,26 @@ func TestRequestCacheMarkSent_DuplicateReleaseSlot(t *testing.T) {
require.True(t, cache.resolve(region.subscribedSpan.subID, region.verID.GetID()))
require.Equal(t, 0, cache.getPendingCount())
}

func TestRequestCacheMarkStopped_ReleasesSlot(t *testing.T) {
cache := newRequestCache(10)
ctx := context.Background()

region := createTestRegionInfo(1, 1)

ok, err := cache.add(ctx, region, false)
require.True(t, ok)
require.NoError(t, err)
require.Equal(t, 1, cache.getPendingCount())

req, err := cache.pop(ctx)
require.NoError(t, err)

cache.markSent(req)
require.Equal(t, 1, cache.getPendingCount())
require.Contains(t, cache.sentRequests.regionReqs, req.regionInfo.subscribedSpan.subID)

cache.markStopped(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID())
require.Equal(t, 0, cache.getPendingCount())
require.NotContains(t, cache.sentRequests.regionReqs, req.regionInfo.subscribedSpan.subID)
}
18 changes: 16 additions & 2 deletions logservice/logpuller/region_request_worker.go
@@ -406,11 +406,25 @@ func (s *regionRequestWorker) processRegionSendTask(
state := newRegionFeedState(region, uint64(subID), s)
state.start()
s.addRegionState(subID, region.verID.GetID(), state)
// Mark the request as sent before sending it.
// Otherwise there is a race with the receiver goroutine:
// 1. addRegionState makes the region visible to error handling.
// 2. doSend sends the request.
// 3. the receiver goroutine may receive a region error immediately.
// 4. markStopped runs before markSent, so requestCache.markStopped cannot
// find the request in sentRequests.
// 5. the sender goroutine then calls markSent and leaves a stale sent
// request behind, even though the region has already been
// unlocked/rescheduled.
//
// Tracking the request before Send keeps requestedRegions and
// sentRequests visible in the same order and avoids leaving stale
// requests in cleanup.
s.requestCache.markSent(regionReq)
if err := doSend(s.createRegionRequest(region)); err != nil {
s.requestCache.markDone()
state.markStopped(err)
return err
}
- s.requestCache.markSent(regionReq)
}
regionReq, err = fetchMoreReq()
if err != nil {
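To make the ordering argument in the comment concrete, here is a schematic, deterministic reproduction of the race (illustrative only, not TiCDC code): with `markSent` placed after the send, the receiver's `markStopped` can run in between, find nothing to delete, and the sender then records an entry that nobody will ever clean up.

```go
package main

import (
	"fmt"
	"sync"
)

type tracker struct {
	mu   sync.Mutex
	sent map[uint64]bool
}

func (t *tracker) markSent(id uint64)    { t.mu.Lock(); t.sent[id] = true; t.mu.Unlock() }
func (t *tracker) markStopped(id uint64) { t.mu.Lock(); delete(t.sent, id); t.mu.Unlock() }

func main() {
	t := &tracker{sent: map[uint64]bool{}}
	stopped := make(chan struct{})

	// Receiver goroutine: a region error arrives right after the send.
	go func() {
		t.markStopped(1) // with the old order, this can run before markSent
		close(stopped)
	}()

	// Sender with the old order: send the request (elided), then mark it sent.
	<-stopped     // force the racy interleaving deterministically
	t.markSent(1) // stale entry: the region was already stopped

	fmt.Println(t.sent) // map[1:true] — leaked; marking sent before the send avoids this
}
```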
99 changes: 99 additions & 0 deletions logservice/logpuller/region_request_worker_test.go
@@ -17,9 +17,40 @@ import (
"context"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/ticdc/logservice/logpuller/regionlock"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type mockEventFeedV2Client struct {
sendErr error
}

func (m *mockEventFeedV2Client) Send(*cdcpb.ChangeDataRequest) error { return m.sendErr }
func (m *mockEventFeedV2Client) Recv() (*cdcpb.ChangeDataEvent, error) { return nil, nil }
func (m *mockEventFeedV2Client) Header() (metadata.MD, error) { return metadata.MD{}, nil }
func (m *mockEventFeedV2Client) Trailer() metadata.MD { return metadata.MD{} }
func (m *mockEventFeedV2Client) CloseSend() error { return nil }
func (m *mockEventFeedV2Client) Context() context.Context { return context.Background() }
func (m *mockEventFeedV2Client) SendMsg(any) error { return nil }
func (m *mockEventFeedV2Client) RecvMsg(any) error { return nil }

func prepareRegionForSendTest(region regionInfo) regionInfo {
region.rpcCtx = &tikv.RPCContext{
Meta: &metapb.Region{
RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1},
},
}
region.lockedRangeState = &regionlock.LockedRangeState{}
region.lockedRangeState.ResolvedTs.Store(100)
return region
}

func TestRegionStatesOperation(t *testing.T) {
worker := &regionRequestWorker{}
worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates)
@@ -64,3 +95,71 @@ func TestClearPendingRegionsReleaseSlotForPreFetchedRegion(t *testing.T) {
require.Nil(t, worker.preFetchForConnecting)
require.Equal(t, 0, worker.requestCache.getPendingCount())
}

func TestClearPendingRegionsDoesNotReturnStoppedSentRegion(t *testing.T) {
worker := &regionRequestWorker{
requestCache: newRequestCache(10),
}
worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates)

ctx := context.Background()
region := createTestRegionInfo(1, 1)

ok, err := worker.requestCache.add(ctx, region, false)
require.NoError(t, err)
require.True(t, ok)

req, err := worker.requestCache.pop(ctx)
require.NoError(t, err)

state := newRegionFeedState(req.regionInfo, uint64(req.regionInfo.subscribedSpan.subID), worker)
state.start()
worker.addRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID(), state)

// Simulate the race we are fixing in processRegionSendTask:
// once a request is visible in sentRequests, a fast region error may mark the
// region stopped before worker cleanup runs. In that case, markStopped should
// remove the sent request immediately, so clearPendingRegions must not return
// the stale region again during worker shutdown.
worker.requestCache.markSent(req)
state.markStopped(errors.New("send request to store error"))
worker.takeRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID())

require.Equal(t, 0, worker.requestCache.getPendingCount())
require.Empty(t, worker.clearPendingRegions())
}

func TestProcessRegionSendTaskSendFailureCleansSentRequest(t *testing.T) {
worker := &regionRequestWorker{
requestCache: newRequestCache(10),
store: &requestedStore{storeAddr: "store-1"},
client: &subscriptionClient{},
}
worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates)

ctx := context.Background()
region := prepareRegionForSendTest(createTestRegionInfo(1, 1))

ok, err := worker.requestCache.add(ctx, region, false)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, 1, worker.requestCache.getPendingCount())

req, err := worker.requestCache.pop(ctx)
require.NoError(t, err)
worker.preFetchForConnecting = new(regionInfo)
*worker.preFetchForConnecting = req.regionInfo

sendErr := errors.New("send failed")
conn := &ConnAndClient{
Client: &mockEventFeedV2Client{sendErr: sendErr},
Conn: &grpc.ClientConn{},
}

err = worker.processRegionSendTask(ctx, conn)
require.ErrorIs(t, err, sendErr)
require.Equal(t, 0, worker.requestCache.getPendingCount())
require.Empty(t, worker.requestCache.sentRequests.regionReqs)
state := worker.getRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID())
require.True(t, state == nil || state.isStale(), "region state should be removed or marked stale after send failure")
}
2 changes: 1 addition & 1 deletion pkg/config/replica_config.go
@@ -33,7 +33,7 @@ import (

const (
// minSyncPointInterval is the minimum of SyncPointInterval can be set.
- minSyncPointInterval = time.Second * 30
+ minSyncPointInterval = time.Second * 1
Review comment (severity: high):

Lowering minSyncPointInterval to 1 second introduces a risk of significant performance degradation. Sync points are blocking events, and a high frequency of these events can severely reduce changefeed throughput and increase load on the downstream database. A more conservative minimum, such as 10 seconds, would provide more flexibility than the previous 30-second limit while mitigating the performance risks of a very low interval.

Suggested change:
- minSyncPointInterval = time.Second * 1
+ minSyncPointInterval = time.Second * 10

// minSyncPointRetention is the minimum of SyncPointRetention can be set.
minSyncPointRetention = time.Hour * 1
minChangeFeedErrorStuckDuration = time.Minute * 30
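For context on where this constant takes effect: `minSyncPointInterval` is the floor enforced when a changefeed's sync-point settings are validated. A hedged sketch of such a guard — the function name and error text here are illustrative assumptions, not the exact TiCDC validation code:

```go
// validateSyncPointInterval is an illustrative stand-in for the real
// replica-config validation path (names assumed, not TiCDC's).
func validateSyncPointInterval(interval time.Duration) error {
	if interval < minSyncPointInterval {
		return fmt.Errorf("sync-point-interval %s is below the minimum allowed %s",
			interval, minSyncPointInterval)
	}
	return nil
}
```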
2 changes: 1 addition & 1 deletion pkg/server/server.go
@@ -27,7 +27,7 @@ import (
// information in etcd and schedules Task on it.
type Server interface {
Run(ctx context.Context) error
- Close(ctx context.Context)
+ Close()

SelfInfo() (*node.Info, error)
Liveness() api.Liveness
15 changes: 9 additions & 6 deletions server/server.go
@@ -415,7 +415,7 @@ func (c *server) GetCoordinator() (tiserver.Coordinator, error) {
// Close closes the server by deregister it from etcd,
// it also closes the coordinator and processorManager
// Note: this function should be reentrant
- func (c *server) Close(ctx context.Context) {
+ func (c *server) Close() {
if !c.closed.CompareAndSwap(false, true) {
return
}
@@ -435,11 +435,14 @@
c.closePreServices()
}()

closeCtx, closeCancel := context.WithTimeout(context.Background(), GracefulShutdownTimeout)
defer closeCancel()

// There are also some dependencies inside subModules,
// so we close subModules in reverse order of their startup.
for i := len(c.subModules) - 1; i >= 0; i-- {
m := c.subModules[i]
- if err := m.Close(ctx); err != nil {
+ if err := m.Close(closeCtx); err != nil {
log.Warn("failed to close sub module",
zap.String("module", m.Name()),
zap.Error(err))
@@ -448,7 +451,7 @@
}

for _, m := range c.nodeModules {
- if err := m.Close(ctx); err != nil {
+ if err := m.Close(closeCtx); err != nil {
log.Warn("failed to close sub common module",
zap.String("module", m.Name()),
zap.Error(err))
@@ -457,7 +460,7 @@
}

for _, nm := range c.networkModules {
- if err := nm.Close(ctx); err != nil {
+ if err := nm.Close(closeCtx); err != nil {
log.Warn("failed to close sub base module",
zap.String("module", nm.Name()),
zap.Error(err))
@@ -466,8 +469,8 @@
}

// delete server info from etcd
- timeoutCtx, cancel := context.WithTimeout(context.Background(), cleanMetaDuration)
- defer cancel()
+ timeoutCtx, timeoutCancel := context.WithTimeout(closeCtx, cleanMetaDuration)
+ defer timeoutCancel()
if err := c.EtcdClient.DeleteCaptureInfo(timeoutCtx, string(c.info.ID)); err != nil {
log.Warn("failed to delete server info when server exited",
zap.String("captureID", string(c.info.ID)),
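A note on why `closeCtx` is derived from `context.Background()` rather than from a caller-supplied context: by the time `Close()` runs, the run context has typically already been canceled, and a context derived from it would be dead on arrival, turning every module `Close` into an immediate failure. A minimal illustration (not TiCDC code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	runCtx, cancel := context.WithCancel(context.Background())
	cancel() // shutdown has already happened by the time Close is called

	dead, cancelDead := context.WithTimeout(runCtx, 5*time.Second)
	defer cancelDead()
	fmt.Println(dead.Err()) // context.Canceled: modules would see a dead context

	fresh, cancelFresh := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancelFresh()
	fmt.Println(fresh.Err()) // <nil>: modules keep the full graceful-shutdown budget
}
```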