diff --git a/cmd/cdc/server/server.go b/cmd/cdc/server/server.go index 93dadf28a9..78f89a58a1 100644 --- a/cmd/cdc/server/server.go +++ b/cmd/cdc/server/server.go @@ -147,7 +147,8 @@ func (o *options) run(cmd *cobra.Command) error { util.InitSignalHandling(shutdown, cancel) err = svr.Run(ctx) - if err != nil && !errors.Is(errors.Cause(err), context.Canceled) { + isNormalExit := isNormalServerShutdown(err, ctx) + if !isNormalExit { log.Error("cdc server exits with error", zap.Error(err)) } else { log.Info("cdc server exits normally") @@ -157,17 +158,33 @@ func (o *options) run(cmd *cobra.Command) error { ticker := time.NewTicker(server.GracefulShutdownTimeout) defer ticker.Stop() go func() { - svr.Close(ctx) + svr.Close() close(ch) }() select { case <-ch: case <-ticker.C: log.Warn("graceful shutdown timeout, exit server") + if isNormalExit { + return errors.New("graceful shutdown timeout") + } + } + if isNormalExit { + return nil } return err } +func isNormalServerShutdown(err error, ctx context.Context) bool { + if err == nil { + return true + } + // Treat cancellation as a normal exit only when the top-level context was + // explicitly canceled by shutdown/signal. This avoids masking internal module + // failures that also surface as context.Canceled via errgroup cancellation. + return errors.Is(err, context.Canceled) && ctx.Err() == context.Canceled +} + // complete adapts from the command line args and config file to the data required. 
func (o *options) complete(command *cobra.Command) error { cfg := config.GetDefaultServerConfig() diff --git a/cmd/cdc/server/server_test.go b/cmd/cdc/server/server_test.go index 54a9b90c1c..f906e30f4a 100644 --- a/cmd/cdc/server/server_test.go +++ b/cmd/cdc/server/server_test.go @@ -14,9 +14,11 @@ package server import ( + "context" "strings" "testing" + cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/stretchr/testify/require" ) @@ -118,3 +120,58 @@ func TestNewOptionsDefaultSecurity(t *testing.T) { require.Empty(t, o.serverConfig.Security.CertPath) require.Empty(t, o.serverConfig.Security.KeyPath) } + +func TestIsNormalServerShutdown(t *testing.T) { + testCases := []struct { + name string + err error + cancelCtx bool + expected bool + }{ + { + name: "nil error", + err: nil, + expected: true, + }, + { + name: "context canceled by shutdown", + err: context.Canceled, + cancelCtx: true, + expected: true, + }, + { + name: "wrapped context canceled by shutdown", + err: cerror.Trace(context.Canceled), + cancelCtx: true, + expected: true, + }, + { + name: "context canceled without shutdown", + err: context.Canceled, + expected: false, + }, + { + name: "wrapped context canceled without shutdown", + err: cerror.Trace(context.Canceled), + expected: false, + }, + { + name: "other error", + err: cerror.New("boom"), + expected: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + if tc.cancelCtx { + cancel() + } else { + defer cancel() + } + + require.Equal(t, tc.expected, isNormalServerShutdown(tc.err, ctx)) + }) + } +} diff --git a/logservice/logpuller/region_req_cache.go b/logservice/logpuller/region_req_cache.go index 07dfd49d11..b4478cab07 100644 --- a/logservice/logpuller/region_req_cache.go +++ b/logservice/logpuller/region_req_cache.go @@ -191,6 +191,9 @@ func (c *requestCache) markStopped(subID SubscriptionID, regionID uint64) { } delete(regionReqs, regionID) + if 
len(regionReqs) == 0 { + delete(c.sentRequests.regionReqs, subID) + } c.markDone() } diff --git a/logservice/logpuller/region_req_cache_test.go b/logservice/logpuller/region_req_cache_test.go index e745f54e45..62706a8542 100644 --- a/logservice/logpuller/region_req_cache_test.go +++ b/logservice/logpuller/region_req_cache_test.go @@ -311,3 +311,26 @@ func TestRequestCacheMarkSent_DuplicateReleaseSlot(t *testing.T) { require.True(t, cache.resolve(region.subscribedSpan.subID, region.verID.GetID())) require.Equal(t, 0, cache.getPendingCount()) } + +func TestRequestCacheMarkStopped_ReleasesSlot(t *testing.T) { + cache := newRequestCache(10) + ctx := context.Background() + + region := createTestRegionInfo(1, 1) + + ok, err := cache.add(ctx, region, false) + require.True(t, ok) + require.NoError(t, err) + require.Equal(t, 1, cache.getPendingCount()) + + req, err := cache.pop(ctx) + require.NoError(t, err) + + cache.markSent(req) + require.Equal(t, 1, cache.getPendingCount()) + require.Contains(t, cache.sentRequests.regionReqs, req.regionInfo.subscribedSpan.subID) + + cache.markStopped(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID()) + require.Equal(t, 0, cache.getPendingCount()) + require.NotContains(t, cache.sentRequests.regionReqs, req.regionInfo.subscribedSpan.subID) +} diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index fafe7fcade..61ad45b312 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -406,11 +406,25 @@ func (s *regionRequestWorker) processRegionSendTask( state := newRegionFeedState(region, uint64(subID), s) state.start() s.addRegionState(subID, region.verID.GetID(), state) + // Mark the request as sent before sending it. + // Otherwise there is a race with the receiver goroutine: + // 1. addRegionState makes the region visible to error handling. + // 2. doSend sends the request. + // 3. 
the receiver goroutine may receive a region error immediately. + // 4. markStopped runs before markSent, so requestCache.markStopped cannot + // find the request in sentRequests. + // 5. the sender goroutine then calls markSent and leaves a stale sent + // request behind, even though the region has already been + // unlocked/rescheduled. + // + // Tracking the request before Send keeps requestedRegions and + // sentRequests visible in the same order and avoids leaving stale + // requests in cleanup. + s.requestCache.markSent(regionReq) if err := doSend(s.createRegionRequest(region)); err != nil { - s.requestCache.markDone() + state.markStopped(err) return err } - s.requestCache.markSent(regionReq) } regionReq, err = fetchMoreReq() if err != nil { diff --git a/logservice/logpuller/region_request_worker_test.go b/logservice/logpuller/region_request_worker_test.go index a855e45299..8ec790923e 100644 --- a/logservice/logpuller/region_request_worker_test.go +++ b/logservice/logpuller/region_request_worker_test.go @@ -17,9 +17,40 @@ import ( "context" "testing" + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/cdcpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/ticdc/logservice/logpuller/regionlock" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikv" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) +type mockEventFeedV2Client struct { + sendErr error +} + +func (m *mockEventFeedV2Client) Send(*cdcpb.ChangeDataRequest) error { return m.sendErr } +func (m *mockEventFeedV2Client) Recv() (*cdcpb.ChangeDataEvent, error) { return nil, nil } +func (m *mockEventFeedV2Client) Header() (metadata.MD, error) { return metadata.MD{}, nil } +func (m *mockEventFeedV2Client) Trailer() metadata.MD { return metadata.MD{} } +func (m *mockEventFeedV2Client) CloseSend() error { return nil } +func (m *mockEventFeedV2Client) Context() context.Context { return context.Background() } +func (m *mockEventFeedV2Client) 
SendMsg(any) error { return nil } +func (m *mockEventFeedV2Client) RecvMsg(any) error { return nil } + +func prepareRegionForSendTest(region regionInfo) regionInfo { + region.rpcCtx = &tikv.RPCContext{ + Meta: &metapb.Region{ + RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}, + }, + } + region.lockedRangeState = &regionlock.LockedRangeState{} + region.lockedRangeState.ResolvedTs.Store(100) + return region +} + func TestRegionStatesOperation(t *testing.T) { worker := &regionRequestWorker{} worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) @@ -64,3 +95,71 @@ func TestClearPendingRegionsReleaseSlotForPreFetchedRegion(t *testing.T) { require.Nil(t, worker.preFetchForConnecting) require.Equal(t, 0, worker.requestCache.getPendingCount()) } + +func TestClearPendingRegionsDoesNotReturnStoppedSentRegion(t *testing.T) { + worker := &regionRequestWorker{ + requestCache: newRequestCache(10), + } + worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) + + ctx := context.Background() + region := createTestRegionInfo(1, 1) + + ok, err := worker.requestCache.add(ctx, region, false) + require.NoError(t, err) + require.True(t, ok) + + req, err := worker.requestCache.pop(ctx) + require.NoError(t, err) + + state := newRegionFeedState(req.regionInfo, uint64(req.regionInfo.subscribedSpan.subID), worker) + state.start() + worker.addRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID(), state) + + // Simulate the race we are fixing in processRegionSendTask: + // once a request is visible in sentRequests, a fast region error may mark the + // region stopped before worker cleanup runs. In that case, markStopped should + // remove the sent request immediately, so clearPendingRegions must not return + // the stale region again during worker shutdown. 
+ worker.requestCache.markSent(req) + state.markStopped(errors.New("send request to store error")) + worker.takeRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID()) + + require.Equal(t, 0, worker.requestCache.getPendingCount()) + require.Empty(t, worker.clearPendingRegions()) +} + +func TestProcessRegionSendTaskSendFailureCleansSentRequest(t *testing.T) { + worker := &regionRequestWorker{ + requestCache: newRequestCache(10), + store: &requestedStore{storeAddr: "store-1"}, + client: &subscriptionClient{}, + } + worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates) + + ctx := context.Background() + region := prepareRegionForSendTest(createTestRegionInfo(1, 1)) + + ok, err := worker.requestCache.add(ctx, region, false) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, 1, worker.requestCache.getPendingCount()) + + req, err := worker.requestCache.pop(ctx) + require.NoError(t, err) + worker.preFetchForConnecting = new(regionInfo) + *worker.preFetchForConnecting = req.regionInfo + + sendErr := errors.New("send failed") + conn := &ConnAndClient{ + Client: &mockEventFeedV2Client{sendErr: sendErr}, + Conn: &grpc.ClientConn{}, + } + + err = worker.processRegionSendTask(ctx, conn) + require.ErrorIs(t, err, sendErr) + require.Equal(t, 0, worker.requestCache.getPendingCount()) + require.Empty(t, worker.requestCache.sentRequests.regionReqs) + state := worker.getRegionState(req.regionInfo.subscribedSpan.subID, req.regionInfo.verID.GetID()) + require.True(t, state == nil || state.isStale(), "region state should be removed or marked stale after send failure") +} diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 9aa58ee7b7..d7836f2d16 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -33,7 +33,7 @@ import ( const ( // minSyncPointInterval is the minimum of SyncPointInterval can be set. 
- minSyncPointInterval = time.Second * 30 + minSyncPointInterval = time.Second * 1 // minSyncPointRetention is the minimum of SyncPointRetention can be set. minSyncPointRetention = time.Hour * 1 minChangeFeedErrorStuckDuration = time.Minute * 30 diff --git a/pkg/server/server.go b/pkg/server/server.go index 85432523ce..7f8f27f29e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -27,7 +27,7 @@ import ( // information in etcd and schedules Task on it. type Server interface { Run(ctx context.Context) error - Close(ctx context.Context) + Close() SelfInfo() (*node.Info, error) Liveness() api.Liveness diff --git a/server/server.go b/server/server.go index 1573fc3eb0..0a3a0c0303 100644 --- a/server/server.go +++ b/server/server.go @@ -415,7 +415,7 @@ func (c *server) GetCoordinator() (tiserver.Coordinator, error) { // Close closes the server by deregister it from etcd, // it also closes the coordinator and processorManager // Note: this function should be reentrant -func (c *server) Close(ctx context.Context) { +func (c *server) Close() { if !c.closed.CompareAndSwap(false, true) { return } @@ -435,11 +435,14 @@ func (c *server) Close(ctx context.Context) { c.closePreServices() }() + closeCtx, closeCancel := context.WithTimeout(context.Background(), GracefulShutdownTimeout) + defer closeCancel() + // There are also some dependencies inside subModules, // so we close subModules in reverse order of their startup. 
for i := len(c.subModules) - 1; i >= 0; i-- { m := c.subModules[i] - if err := m.Close(ctx); err != nil { + if err := m.Close(closeCtx); err != nil { log.Warn("failed to close sub module", zap.String("module", m.Name()), zap.Error(err)) @@ -448,7 +451,7 @@ func (c *server) Close(ctx context.Context) { } for _, m := range c.nodeModules { - if err := m.Close(ctx); err != nil { + if err := m.Close(closeCtx); err != nil { log.Warn("failed to close sub common module", zap.String("module", m.Name()), zap.Error(err)) @@ -457,7 +460,7 @@ func (c *server) Close(ctx context.Context) { } for _, nm := range c.networkModules { - if err := nm.Close(ctx); err != nil { + if err := nm.Close(closeCtx); err != nil { log.Warn("failed to close sub base module", zap.String("module", nm.Name()), zap.Error(err)) @@ -466,8 +469,8 @@ func (c *server) Close(ctx context.Context) { } // delete server info from etcd - timeoutCtx, cancel := context.WithTimeout(context.Background(), cleanMetaDuration) - defer cancel() + timeoutCtx, timeoutCancel := context.WithTimeout(closeCtx, cleanMetaDuration) + defer timeoutCancel() if err := c.EtcdClient.DeleteCaptureInfo(timeoutCtx, string(c.info.ID)); err != nil { log.Warn("failed to delete server info when server exited", zap.String("captureID", string(c.info.ID)),