Skip to content

Commit f08ab83

Browse files
authored
cmd/cdc/server: return code 0 on graceful shutdown (pingcap#4564)
close pingcap#4563
1 parent d2189d5 commit f08ab83

File tree

4 files changed

+86
-9
lines changed

4 files changed

+86
-9
lines changed

cmd/cdc/server/server.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,8 @@ func (o *options) run(cmd *cobra.Command) error {
147147
util.InitSignalHandling(shutdown, cancel)
148148

149149
err = svr.Run(ctx)
150-
if err != nil && !errors.Is(errors.Cause(err), context.Canceled) {
150+
isNormalExit := isNormalServerShutdown(err, ctx)
151+
if !isNormalExit {
151152
log.Error("cdc server exits with error", zap.Error(err))
152153
} else {
153154
log.Info("cdc server exits normally")
@@ -157,17 +158,33 @@ func (o *options) run(cmd *cobra.Command) error {
157158
ticker := time.NewTicker(server.GracefulShutdownTimeout)
158159
defer ticker.Stop()
159160
go func() {
160-
svr.Close(ctx)
161+
svr.Close()
161162
close(ch)
162163
}()
163164
select {
164165
case <-ch:
165166
case <-ticker.C:
166167
log.Warn("graceful shutdown timeout, exit server")
168+
if isNormalExit {
169+
return errors.New("graceful shutdown timeout")
170+
}
171+
}
172+
if isNormalExit {
173+
return nil
167174
}
168175
return err
169176
}
170177

178+
// isNormalServerShutdown reports whether err, as returned by the server's Run
// call, represents a graceful shutdown rather than a failure.
//
// A nil err is always a normal exit. A non-nil err counts as a normal exit
// only when it matches context.Canceled (checked with errors.Is, so wrapped
// errors qualify) and ctx itself reports context.Canceled.
//
// NOTE(review): Go convention places ctx first; the parameter order is kept
// as-is so existing callers continue to compile.
func isNormalServerShutdown(err error, ctx context.Context) bool {
	if err == nil {
		return true
	}
	// Treat cancellation as a normal exit only when the top-level context was
	// explicitly canceled by shutdown/signal. This avoids masking internal module
	// failures that also surface as context.Canceled via errgroup cancellation.
	return errors.Is(err, context.Canceled) && ctx.Err() == context.Canceled
}
187+
171188
// complete adapts from the command line args and config file to the data required.
172189
func (o *options) complete(command *cobra.Command) error {
173190
cfg := config.GetDefaultServerConfig()

cmd/cdc/server/server_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
package server
1515

1616
import (
17+
"context"
1718
"strings"
1819
"testing"
1920

21+
cerror "github.com/pingcap/ticdc/pkg/errors"
2022
"github.com/stretchr/testify/require"
2123
)
2224

@@ -118,3 +120,58 @@ func TestNewOptionsDefaultSecurity(t *testing.T) {
118120
require.Empty(t, o.serverConfig.Security.CertPath)
119121
require.Empty(t, o.serverConfig.Security.KeyPath)
120122
}
123+
124+
func TestIsNormalServerShutdown(t *testing.T) {
125+
testCases := []struct {
126+
name string
127+
err error
128+
cancelCtx bool
129+
expected bool
130+
}{
131+
{
132+
name: "nil error",
133+
err: nil,
134+
expected: true,
135+
},
136+
{
137+
name: "context canceled by shutdown",
138+
err: context.Canceled,
139+
cancelCtx: true,
140+
expected: true,
141+
},
142+
{
143+
name: "wrapped context canceled by shutdown",
144+
err: cerror.Trace(context.Canceled),
145+
cancelCtx: true,
146+
expected: true,
147+
},
148+
{
149+
name: "context canceled without shutdown",
150+
err: context.Canceled,
151+
expected: false,
152+
},
153+
{
154+
name: "wrapped context canceled without shutdown",
155+
err: cerror.Trace(context.Canceled),
156+
expected: false,
157+
},
158+
{
159+
name: "other error",
160+
err: cerror.New("boom"),
161+
expected: false,
162+
},
163+
}
164+
165+
for _, tc := range testCases {
166+
t.Run(tc.name, func(t *testing.T) {
167+
ctx, cancel := context.WithCancel(context.Background())
168+
if tc.cancelCtx {
169+
cancel()
170+
} else {
171+
defer cancel()
172+
}
173+
174+
require.Equal(t, tc.expected, isNormalServerShutdown(tc.err, ctx))
175+
})
176+
}
177+
}

pkg/server/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
// information in etcd and schedules Task on it.
2828
type Server interface {
2929
Run(ctx context.Context) error
30-
Close(ctx context.Context)
30+
Close()
3131

3232
SelfInfo() (*node.Info, error)
3333
Liveness() api.Liveness

server/server.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ func (c *server) GetCoordinator() (tiserver.Coordinator, error) {
415415
// Close closes the server by deregistering it from etcd,
416416
// it also closes the coordinator and processorManager
417417
// Note: this function should be reentrant
418-
func (c *server) Close(ctx context.Context) {
418+
func (c *server) Close() {
419419
if !c.closed.CompareAndSwap(false, true) {
420420
return
421421
}
@@ -435,11 +435,14 @@ func (c *server) Close(ctx context.Context) {
435435
c.closePreServices()
436436
}()
437437

438+
closeCtx, closeCancel := context.WithTimeout(context.Background(), GracefulShutdownTimeout)
439+
defer closeCancel()
440+
438441
// There are also some dependencies inside subModules,
439442
// so we close subModules in reverse order of their startup.
440443
for i := len(c.subModules) - 1; i >= 0; i-- {
441444
m := c.subModules[i]
442-
if err := m.Close(ctx); err != nil {
445+
if err := m.Close(closeCtx); err != nil {
443446
log.Warn("failed to close sub module",
444447
zap.String("module", m.Name()),
445448
zap.Error(err))
@@ -448,7 +451,7 @@ func (c *server) Close(ctx context.Context) {
448451
}
449452

450453
for _, m := range c.nodeModules {
451-
if err := m.Close(ctx); err != nil {
454+
if err := m.Close(closeCtx); err != nil {
452455
log.Warn("failed to close sub common module",
453456
zap.String("module", m.Name()),
454457
zap.Error(err))
@@ -457,7 +460,7 @@ func (c *server) Close(ctx context.Context) {
457460
}
458461

459462
for _, nm := range c.networkModules {
460-
if err := nm.Close(ctx); err != nil {
463+
if err := nm.Close(closeCtx); err != nil {
461464
log.Warn("failed to close sub base module",
462465
zap.String("module", nm.Name()),
463466
zap.Error(err))
@@ -466,8 +469,8 @@ func (c *server) Close(ctx context.Context) {
466469
}
467470

468471
// delete server info from etcd
469-
timeoutCtx, cancel := context.WithTimeout(context.Background(), cleanMetaDuration)
470-
defer cancel()
472+
timeoutCtx, timeoutCancel := context.WithTimeout(closeCtx, cleanMetaDuration)
473+
defer timeoutCancel()
471474
if err := c.EtcdClient.DeleteCaptureInfo(timeoutCtx, string(c.info.ID)); err != nil {
472475
log.Warn("failed to delete server info when server exited",
473476
zap.String("captureID", string(c.info.ID)),

0 commit comments

Comments
 (0)