Skip to content

Commit 9fbde6e

Browse files
authored
kafka(ticdc): close sarama clients on init failures (#12573)
close #12572
1 parent ba2be49 commit 9fbde6e

File tree

6 files changed

+321
-19
lines changed

6 files changed

+321
-19
lines changed

pkg/sink/kafka/admin.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -176,11 +176,21 @@ func (a *saramaAdminClient) CreateTopic(
176176
}
177177

178178
func (a *saramaAdminClient) Close() {
179-
if err := a.admin.Close(); err != nil {
180-
log.Warn("close admin client meet error",
181-
zap.String("namespace", a.changefeed.Namespace),
182-
zap.String("changefeed", a.changefeed.ID),
183-
zap.Error(err))
179+
if a.admin != nil {
180+
if err := a.admin.Close(); err != nil {
181+
log.Warn("close admin client meet error",
182+
zap.String("namespace", a.changefeed.Namespace),
183+
zap.String("changefeed", a.changefeed.ID),
184+
zap.Error(err))
185+
}
186+
}
187+
if a.client != nil {
188+
if err := a.client.Close(); err != nil && err != sarama.ErrClosedClient {
189+
log.Warn("close admin client connection meet error",
190+
zap.String("namespace", a.changefeed.Namespace),
191+
zap.String("changefeed", a.changefeed.ID),
192+
zap.Error(err))
193+
}
184194
}
185195
}
186196

pkg/sink/kafka/admin_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright 2026 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package kafka
15+
16+
import (
17+
"testing"
18+
19+
"github.com/IBM/sarama"
20+
"github.com/pingcap/tiflow/cdc/model"
21+
"github.com/stretchr/testify/require"
22+
)
23+
24+
type testSyncProducer struct {
25+
sarama.SyncProducer
26+
closeCalls int
27+
closeErr error
28+
callOrder *[]string
29+
callLabel string
30+
}
31+
32+
func (p *testSyncProducer) Close() error {
33+
p.closeCalls++
34+
if p.callOrder != nil {
35+
*p.callOrder = append(*p.callOrder, p.callLabel)
36+
}
37+
return p.closeErr
38+
}
39+
40+
// TestSaramaAdminClientCloseClosesAdminThenClient covers the normal admin close
41+
// path and verifies the wrapper releases both the admin handle and the owned
42+
// client in a deterministic order.
43+
func TestSaramaAdminClientCloseClosesAdminThenClient(t *testing.T) {
44+
callOrder := make([]string, 0, 2)
45+
client := &testSaramaClient{callOrder: &callOrder, callLabel: "client"}
46+
admin := &testSaramaClusterAdmin{callOrder: &callOrder, callLabel: "admin"}
47+
48+
adminClient := &saramaAdminClient{
49+
changefeed: model.DefaultChangeFeedID("admin-close-test"),
50+
client: client,
51+
admin: admin,
52+
}
53+
54+
adminClient.Close()
55+
require.Equal(t, 1, admin.closeCalls)
56+
require.Equal(t, 1, client.closeCalls)
57+
require.Equal(t, []string{"admin", "client"}, callOrder)
58+
}
59+
60+
// TestSaramaAdminClientCloseStillClosesClientWhenAdminCloseFails covers the
61+
// error path where admin.Close reports an error but the wrapper must still close
62+
// the owned sarama client.
63+
func TestSaramaAdminClientCloseStillClosesClientWhenAdminCloseFails(t *testing.T) {
64+
client := &testSaramaClient{}
65+
admin := &testSaramaClusterAdmin{closeErr: sarama.ErrOutOfBrokers}
66+
67+
adminClient := &saramaAdminClient{
68+
changefeed: model.DefaultChangeFeedID("admin-close-error-test"),
69+
client: client,
70+
admin: admin,
71+
}
72+
73+
adminClient.Close()
74+
require.Equal(t, 1, admin.closeCalls)
75+
require.Equal(t, 1, client.closeCalls)
76+
require.True(t, client.closed)
77+
}
78+
79+
// TestSaramaSyncProducerCloseClosesProducerAndClient covers the normal cleanup
80+
// path for sync producers and verifies the wrapper closes the producer before
81+
// releasing the owned sarama client.
82+
func TestSaramaSyncProducerCloseClosesProducerAndClient(t *testing.T) {
83+
callOrder := make([]string, 0, 2)
84+
client := &testSaramaClient{callOrder: &callOrder, callLabel: "client"}
85+
producer := &testSyncProducer{callOrder: &callOrder, callLabel: "producer"}
86+
87+
syncProducer := &saramaSyncProducer{
88+
id: model.DefaultChangeFeedID("sync-close-test"),
89+
client: client,
90+
producer: producer,
91+
}
92+
93+
syncProducer.Close()
94+
require.Equal(t, 1, producer.closeCalls)
95+
require.Equal(t, 1, client.closeCalls)
96+
require.Equal(t, []string{"producer", "client"}, callOrder)
97+
}
98+
99+
// TestSaramaSyncProducerCloseStillClosesClientWhenProducerCloseFails covers the
100+
// partial-close path and verifies the wrapper still releases the owned client
101+
// even if producer.Close returns an error.
102+
func TestSaramaSyncProducerCloseStillClosesClientWhenProducerCloseFails(t *testing.T) {
103+
client := &testSaramaClient{}
104+
producer := &testSyncProducer{closeErr: sarama.ErrOutOfBrokers}
105+
106+
syncProducer := &saramaSyncProducer{
107+
id: model.DefaultChangeFeedID("sync-close-error-test"),
108+
client: client,
109+
producer: producer,
110+
}
111+
112+
syncProducer.Close()
113+
require.Equal(t, 1, producer.closeCalls)
114+
require.Equal(t, 1, client.closeCalls)
115+
require.True(t, client.closed)
116+
}

pkg/sink/kafka/cluster_admin_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,6 @@ type ClusterAdminClient interface {
5555
// HeartbeatBroker sends a heartbeat to all brokers to keep the kafka connection alive.
5656
HeartbeatBrokers()
5757

58-
// Close shuts down the admin client.
58+
// Close shuts down the admin client and releases any owned underlying client connections.
5959
Close()
6060
}

pkg/sink/kafka/factory.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,19 +60,18 @@ type SyncProducer interface {
6060
// HeartbeatBrokers sends heartbeat to all brokers to keep the connection alive.
6161
HeartbeatBrokers()
6262

63-
// Close shuts down the producer; you must call this function before a producer
64-
// object passes out of scope, as it may otherwise leak memory.
65-
// You must call this before calling Close on the underlying client.
63+
// Close shuts down the producer and releases the client owned by this wrapper.
64+
// You must call this function before the producer passes out of scope, as it
65+
// may otherwise leak memory.
6666
Close()
6767
}
6868

6969
// AsyncProducer is the kafka async producer
7070
type AsyncProducer interface {
71-
// Close shuts down the producer and waits for any buffered messages to be
72-
// flushed. You must call this function before a producer object passes out of
71+
// Close shuts down the producer and releases the client owned by this
72+
// wrapper. You must call this function before the producer passes out of
7373
// scope, as it may otherwise leak memory. You must call this before process
74-
// shutting down, or you may lose messages. You must call this before calling
75-
// Close on the underlying client.
74+
// shutting down, or you may lose messages.
7675
Close()
7776

7877
// AsyncSend is the input channel for the user to write messages to that they
@@ -148,6 +147,21 @@ func (p *saramaSyncProducer) Close() {
148147
zap.String("changefeed", p.id.ID),
149148
zap.Duration("duration", time.Since(start)))
150149
}
150+
151+
start = time.Now()
152+
err = p.client.Close()
153+
if err != nil && err != sarama.ErrClosedClient {
154+
log.Error("Close Kafka DDL producer client with error",
155+
zap.String("namespace", p.id.Namespace),
156+
zap.String("changefeed", p.id.ID),
157+
zap.Duration("duration", time.Since(start)),
158+
zap.Error(err))
159+
} else {
160+
log.Info("Kafka DDL producer client closed",
161+
zap.String("namespace", p.id.Namespace),
162+
zap.String("changefeed", p.id.ID),
163+
zap.Duration("duration", time.Since(start)))
164+
}
151165
}
152166

153167
type saramaAsyncProducer struct {

pkg/sink/kafka/sarama_factory.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,15 @@ import (
2626
"go.uber.org/zap"
2727
)
2828

29+
var (
30+
// These constructor seams let unit tests inject partial-init failures without
31+
// spinning up a real Kafka cluster.
32+
newSaramaClientImpl = sarama.NewClient
33+
newSaramaClusterAdminFromClientImpl = sarama.NewClusterAdminFromClient
34+
newSaramaSyncProducerFromClientImpl = sarama.NewSyncProducerFromClient
35+
newSaramaAsyncProducerFromClientImpl = sarama.NewAsyncProducerFromClient
36+
)
37+
2938
type saramaFactory struct {
3039
changefeedID model.ChangeFeedID
3140
option *Options
@@ -45,6 +54,14 @@ func NewSaramaFactory(
4554
}, nil
4655
}
4756

57+
func closeSaramaClientOnFailure(changefeedID model.ChangeFeedID, client sarama.Client, reason string) {
58+
if closeErr := client.Close(); closeErr != nil && closeErr != sarama.ErrClosedClient {
59+
log.Warn(reason,
60+
zap.Stringer("changefeedID", changefeedID),
61+
zap.Error(closeErr))
62+
}
63+
}
64+
4865
func (f *saramaFactory) AdminClient(ctx context.Context) (ClusterAdminClient, error) {
4966
start := time.Now()
5067
config, err := NewSaramaConfig(ctx, f.option)
@@ -57,7 +74,7 @@ func (f *saramaFactory) AdminClient(ctx context.Context) (ClusterAdminClient, er
5774
}
5875

5976
start = time.Now()
60-
client, err := sarama.NewClient(f.option.BrokerEndpoints, config)
77+
client, err := newSaramaClientImpl(f.option.BrokerEndpoints, config)
6178
duration = time.Since(start).Seconds()
6279
if duration > 2 {
6380
log.Warn("new sarama client cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID))
@@ -67,12 +84,13 @@ func (f *saramaFactory) AdminClient(ctx context.Context) (ClusterAdminClient, er
6784
}
6885

6986
start = time.Now()
70-
admin, err := sarama.NewClusterAdminFromClient(client)
87+
admin, err := newSaramaClusterAdminFromClientImpl(client)
7188
duration = time.Since(start).Seconds()
7289
if duration > 2 {
7390
log.Warn("new sarama cluster admin cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID))
7491
}
7592
if err != nil {
93+
closeSaramaClientOnFailure(f.changefeedID, client, "close sarama client after admin init failed")
7694
return nil, errors.Trace(err)
7795
}
7896

@@ -92,12 +110,13 @@ func (f *saramaFactory) SyncProducer(ctx context.Context) (SyncProducer, error)
92110
}
93111
config.MetricRegistry = f.registry
94112

95-
client, err := sarama.NewClient(f.option.BrokerEndpoints, config)
113+
client, err := newSaramaClientImpl(f.option.BrokerEndpoints, config)
96114
if err != nil {
97115
return nil, errors.Trace(err)
98116
}
99-
p, err := sarama.NewSyncProducerFromClient(client)
117+
p, err := newSaramaSyncProducerFromClientImpl(client)
100118
if err != nil {
119+
closeSaramaClientOnFailure(f.changefeedID, client, "close sarama client after sync producer init failed")
101120
return nil, errors.Trace(err)
102121
}
103122

@@ -122,12 +141,13 @@ func (f *saramaFactory) AsyncProducer(
122141
}
123142
config.MetricRegistry = f.registry
124143

125-
client, err := sarama.NewClient(f.option.BrokerEndpoints, config)
144+
client, err := newSaramaClientImpl(f.option.BrokerEndpoints, config)
126145
if err != nil {
127146
return nil, errors.Trace(err)
128147
}
129-
p, err := sarama.NewAsyncProducerFromClient(client)
148+
p, err := newSaramaAsyncProducerFromClientImpl(client)
130149
if err != nil {
150+
closeSaramaClientOnFailure(f.changefeedID, client, "close sarama client after async producer init failed")
131151
return nil, errors.Trace(err)
132152
}
133153
return &saramaAsyncProducer{

0 commit comments

Comments
 (0)