Skip to content

Commit 4a80a01

Browse files
committed
cdc: mask sink uri secrets in errors
1 parent 073cdff commit 4a80a01

17 files changed

Lines changed: 140 additions & 30 deletions

File tree

cdc/api/v1/validator.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,10 @@ func verifyCreateChangefeedConfig(
112112
// verify replicaConfig
113113
sinkURIParsed, err := url.Parse(changefeedConfig.SinkURI)
114114
if err != nil {
115-
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
115+
return nil, cerror.WrapError(
116+
cerror.ErrSinkURIInvalid,
117+
util.MaskSensitiveDataInURLError(err),
118+
util.MaskSensitiveDataInURIForError(changefeedConfig.SinkURI))
116119
}
117120
err = replicaConfig.ValidateAndAdjust(sinkURIParsed)
118121
if err != nil {

cdc/api/v2/api_helpers.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,10 @@ func (h APIV2HelpersImpl) verifyCreateChangefeedConfig(
213213
// verify replicaConfig
214214
sinkURIParsed, err := url.Parse(cfg.SinkURI)
215215
if err != nil {
216-
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
216+
return nil, cerror.WrapError(
217+
cerror.ErrSinkURIInvalid,
218+
util.MaskSensitiveDataInURLError(err),
219+
util.MaskSensitiveDataInURIForError(cfg.SinkURI))
217220
}
218221
err = replicaCfg.ValidateAndAdjust(sinkURIParsed)
219222
if err != nil {
@@ -343,6 +346,10 @@ func (h APIV2HelpersImpl) verifyUpdateChangefeedConfig(
343346
}
344347
sinkURIParsed, err := url.Parse(sinkURI)
345348
if err != nil {
349+
err = cerror.WrapError(
350+
cerror.ErrSinkURIInvalid,
351+
util.MaskSensitiveDataInURLError(err),
352+
util.MaskSensitiveDataInURIForError(sinkURI))
346353
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
347354
}
348355
err = newInfo.Config.ValidateAndAdjust(sinkURIParsed)

cdc/api/v2/changefeed.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,10 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) {
367367

368368
uri, err := url.Parse(cfg.SinkURI)
369369
if err != nil {
370-
_ = c.Error(err)
370+
_ = c.Error(cerror.WrapError(
371+
cerror.ErrSinkURIInvalid,
372+
util.MaskSensitiveDataInURLError(err),
373+
util.MaskSensitiveDataInURIForError(cfg.SinkURI)))
371374
return
372375
}
373376
scheme := uri.Scheme

cdc/model/changefeed.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ func (info *ChangeFeedInfo) RmUnusedFields() {
348348
log.Warn(
349349
"failed to parse the sink uri",
350350
zap.Error(err),
351-
zap.Any("sinkUri", info.SinkURI),
351+
zap.Any("sinkURI", util.MaskSensitiveDataInURIForError(info.SinkURI)),
352352
)
353353
return
354354
}
@@ -488,7 +488,7 @@ func (info *ChangeFeedInfo) fixState() {
488488
func (info *ChangeFeedInfo) fixMySQLSinkProtocol() {
489489
uri, err := url.Parse(info.SinkURI)
490490
if err != nil {
491-
log.Warn("parse sink URI failed", zap.Error(err))
491+
log.Warn("parse sink URI failed", zap.Error(util.MaskSensitiveDataInURLError(err)))
492492
// SAFETY: It is safe to ignore this unresolvable sink URI here,
493493
// as it is almost impossible for this to happen.
494494
// If we ignore it when fixing it after it happens,
@@ -518,7 +518,7 @@ func (info *ChangeFeedInfo) fixMySQLSinkProtocol() {
518518
func (info *ChangeFeedInfo) fixMQSinkProtocol() {
519519
uri, err := url.Parse(info.SinkURI)
520520
if err != nil {
521-
log.Warn("parse sink URI failed", zap.Error(err))
521+
log.Warn("parse sink URI failed", zap.Error(util.MaskSensitiveDataInURLError(err)))
522522
return
523523
}
524524

cdc/processor/processor.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,10 @@ func (p *processor) tick(ctx context.Context) (error, error) {
591591
func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
592592
sinkURI, err := url.Parse(sinkURIStr)
593593
if err != nil {
594-
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
594+
return false, cerror.WrapError(
595+
cerror.ErrSinkURIInvalid,
596+
util.MaskSensitiveDataInURLError(err),
597+
util.MaskSensitiveDataInURIForError(sinkURIStr))
595598
}
596599
scheme := sink.GetScheme(sinkURI)
597600
return sink.IsMySQLCompatibleScheme(scheme), nil
@@ -609,7 +612,10 @@ func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
609612
func getPullerSplitUpdateMode(sinkURIStr string, config *config.ReplicaConfig) (sourcemanager.PullerSplitUpdateMode, error) {
610613
sinkURI, err := url.Parse(sinkURIStr)
611614
if err != nil {
612-
return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
615+
return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(
616+
cerror.ErrSinkURIInvalid,
617+
util.MaskSensitiveDataInURLError(err),
618+
util.MaskSensitiveDataInURIForError(sinkURIStr))
613619
}
614620
scheme := sink.GetScheme(sinkURI)
615621
if !sink.IsMySQLCompatibleScheme(scheme) {

cdc/sink/ddlsink/factory/factory.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ func New(
4343
) (ddlsink.Sink, error) {
4444
sinkURI, err := url.Parse(sinkURIStr)
4545
if err != nil {
46-
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
46+
return nil, cerror.WrapError(
47+
cerror.ErrSinkURIInvalid,
48+
util.MaskSensitiveDataInURLError(err),
49+
util.MaskSensitiveDataInURIForError(sinkURIStr))
4750
}
4851
scheme := sink.GetScheme(sinkURI)
4952
switch scheme {

cdc/sink/dmlsink/factory/factory.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,10 @@ func New(
7575
) (*SinkFactory, error) {
7676
sinkURI, err := url.Parse(sinkURIStr)
7777
if err != nil {
78-
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
78+
return nil, cerror.WrapError(
79+
cerror.ErrSinkURIInvalid,
80+
util.MaskSensitiveDataInURLError(err),
81+
util.MaskSensitiveDataInURIForError(sinkURIStr))
7982
}
8083

8184
s := &SinkFactory{}

cdc/sink/validator/validator.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func checkSyncPointSchemeCompatibility(
7474
return cerror.ErrSinkURIInvalid.
7575
GenWithStack(
7676
"sink uri scheme is not supported with syncpoint enabled"+
77-
"sink uri: %s", uri,
77+
"sink uri: %s", util.MaskSensitiveDataInURIForError(uri.String()),
7878
)
7979
}
8080
return nil
@@ -90,7 +90,10 @@ func preCheckSinkURI(sinkURIStr string) (*url.URL, error) {
9090

9191
sinkURI, err := url.Parse(sinkURIStr)
9292
if err != nil {
93-
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
93+
return nil, cerror.WrapError(
94+
cerror.ErrSinkURIInvalid,
95+
util.MaskSensitiveDataInURLError(err),
96+
util.MaskSensitiveDataInURIForError(sinkURIStr))
9497
}
9598

9699
// Check if we use the correct IPv6 address format.

cdc/sink/validator/validator_test.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ func TestPreCheckSinkURI(t *testing.T) {
2727
t.Parallel()
2828

2929
tests := []struct {
30-
name string
31-
uri string
32-
err string
30+
name string
31+
uri string
32+
err string
33+
notContains string
3334
}{
3435
{
3536
name: "valid domain MySQL URI",
@@ -76,6 +77,12 @@ func TestPreCheckSinkURI(t *testing.T) {
7677
uri: "kafka://3333:10:9:101::204:9092/topic1",
7778
err: "sink uri host is not valid IPv6 address",
7879
},
80+
{
81+
name: "invalid escaped URI masks password",
82+
uri: "mysql://root:verysecure@127.0.0.1/%zz",
83+
err: `parse "<invalid uri>"`,
84+
notContains: "verysecure",
85+
},
7986
}
8087

8188
for _, tt := range tests {
@@ -85,6 +92,9 @@ func TestPreCheckSinkURI(t *testing.T) {
8592
_, err := preCheckSinkURI(test.uri)
8693
if test.err != "" {
8794
require.Contains(t, err.Error(), test.err)
95+
if test.notContains != "" {
96+
require.NotContains(t, err.Error(), test.notContains)
97+
}
8898
} else {
8999
require.NoError(t, err)
90100
}
@@ -126,4 +136,12 @@ func TestValidateSink(t *testing.T) {
126136
t, err.Error(),
127137
"sink uri scheme is not supported with syncpoint enabled",
128138
)
139+
require.NotContains(t, err.Error(), "verysecure")
140+
require.NotContains(t, err.Error(), "sasl-password=verysecure")
141+
142+
sinkURI = "kafka://127.0.0.1:9092/topic?sasl-password=verysecure"
143+
err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil)
144+
require.NotNil(t, err)
145+
require.Contains(t, err.Error(), "sasl-password=xxxxx")
146+
require.NotContains(t, err.Error(), "verysecure")
129147
}

cdc/syncpointstore/syncpoint_store.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/pingcap/tiflow/cdc/model"
2222
"github.com/pingcap/tiflow/pkg/config"
2323
cerror "github.com/pingcap/tiflow/pkg/errors"
24+
"github.com/pingcap/tiflow/pkg/util"
2425
)
2526

2627
// SyncPointStore is an abstraction for anything that a changefeed may emit into.
@@ -45,7 +46,10 @@ func NewSyncPointStore(
4546
// parse sinkURI as a URI
4647
sinkURI, err := url.Parse(sinkURIStr)
4748
if err != nil {
48-
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
49+
return nil, cerror.WrapError(
50+
cerror.ErrSinkURIInvalid,
51+
util.MaskSensitiveDataInURLError(err),
52+
util.MaskSensitiveDataInURIForError(sinkURIStr))
4953
}
5054
switch strings.ToLower(sinkURI.Scheme) {
5155
case "mysql", "tidb", "mysql+ssl", "tidb+ssl":

0 commit comments

Comments
 (0)