Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cdc/api/v1/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ func verifyCreateChangefeedConfig(
// verify replicaConfig
sinkURIParsed, err := url.Parse(changefeedConfig.SinkURI)
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
return nil, cerror.WrapError(
cerror.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(changefeedConfig.SinkURI))
}
err = replicaConfig.ValidateAndAdjust(sinkURIParsed)
if err != nil {
Expand Down
9 changes: 8 additions & 1 deletion cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ func (h APIV2HelpersImpl) verifyCreateChangefeedConfig(
// verify replicaConfig
sinkURIParsed, err := url.Parse(cfg.SinkURI)
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
return nil, cerror.WrapError(
cerror.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(cfg.SinkURI))
}
err = replicaCfg.ValidateAndAdjust(sinkURIParsed)
if err != nil {
Expand Down Expand Up @@ -343,6 +346,10 @@ func (h APIV2HelpersImpl) verifyUpdateChangefeedConfig(
}
sinkURIParsed, err := url.Parse(sinkURI)
if err != nil {
err = cerror.WrapError(
cerror.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(sinkURI))
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
err = newInfo.Config.ValidateAndAdjust(sinkURIParsed)
Expand Down
5 changes: 4 additions & 1 deletion cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,10 @@ func (h *OpenAPIV2) verifyTable(c *gin.Context) {

uri, err := url.Parse(cfg.SinkURI)
if err != nil {
_ = c.Error(err)
_ = c.Error(cerror.WrapError(
cerror.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(cfg.SinkURI)))
return
}
scheme := uri.Scheme
Expand Down
8 changes: 4 additions & 4 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,8 @@ func (info *ChangeFeedInfo) RmUnusedFields() {
if err != nil {
log.Warn(
"failed to parse the sink uri",
zap.Error(err),
zap.Any("sinkUri", info.SinkURI),
zap.Error(util.MaskSensitiveDataInURLError(err)),
zap.Any("sinkURI", util.MaskSensitiveDataInURIForError(info.SinkURI)),
Comment thread
asddongmen marked this conversation as resolved.
)
return
}
Expand Down Expand Up @@ -488,7 +488,7 @@ func (info *ChangeFeedInfo) fixState() {
func (info *ChangeFeedInfo) fixMySQLSinkProtocol() {
uri, err := url.Parse(info.SinkURI)
if err != nil {
log.Warn("parse sink URI failed", zap.Error(err))
log.Warn("parse sink URI failed", zap.Error(util.MaskSensitiveDataInURLError(err)))
// SAFETY: It is safe to ignore this unresolvable sink URI here,
// as it is almost impossible for this to happen.
// If we ignore it when fixing it after it happens,
Expand Down Expand Up @@ -518,7 +518,7 @@ func (info *ChangeFeedInfo) fixMySQLSinkProtocol() {
func (info *ChangeFeedInfo) fixMQSinkProtocol() {
uri, err := url.Parse(info.SinkURI)
if err != nil {
log.Warn("parse sink URI failed", zap.Error(err))
log.Warn("parse sink URI failed", zap.Error(util.MaskSensitiveDataInURLError(err)))
return
}

Expand Down
10 changes: 8 additions & 2 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,10 @@ func (p *processor) tick(ctx context.Context) (error, error) {
func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
return false, cerror.WrapError(
cerror.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(sinkURIStr))
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
Expand All @@ -609,7 +612,10 @@ func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
func getPullerSplitUpdateMode(sinkURIStr string, config *config.ReplicaConfig) (sourcemanager.PullerSplitUpdateMode, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(
cerror.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(sinkURIStr))
}
scheme := sink.GetScheme(sinkURI)
if !sink.IsMySQLCompatibleScheme(scheme) {
Expand Down
5 changes: 4 additions & 1 deletion cdc/sink/ddlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ func New(
) (ddlsink.Sink, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
return nil, cerror.WrapError(
cerror.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(sinkURIStr))
}
scheme := sink.GetScheme(sinkURI)
switch scheme {
Expand Down
5 changes: 4 additions & 1 deletion cdc/sink/dmlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ func New(
) (*SinkFactory, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
return nil, cerror.WrapError(
cerror.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(sinkURIStr))
}

s := &SinkFactory{}
Expand Down
7 changes: 5 additions & 2 deletions cdc/sink/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func checkSyncPointSchemeCompatibility(
return cerror.ErrSinkURIInvalid.
GenWithStack(
"sink uri scheme is not supported with syncpoint enabled"+
"sink uri: %s", uri,
" sink uri: %s", util.MaskSensitiveDataInURIForError(uri.String()),
)
}
return nil
Expand All @@ -90,7 +90,10 @@ func preCheckSinkURI(sinkURIStr string) (*url.URL, error) {

sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
return nil, cerror.WrapError(
cerror.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(sinkURIStr))
}

// Check if we use the correct IPv6 address format.
Expand Down
24 changes: 21 additions & 3 deletions cdc/sink/validator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ func TestPreCheckSinkURI(t *testing.T) {
t.Parallel()

tests := []struct {
name string
uri string
err string
name string
uri string
err string
notContains string
}{
{
name: "valid domain MySQL URI",
Expand Down Expand Up @@ -76,6 +77,12 @@ func TestPreCheckSinkURI(t *testing.T) {
uri: "kafka://3333:10:9:101::204:9092/topic1",
err: "sink uri host is not valid IPv6 address",
},
{
name: "invalid escaped URI masks password",
uri: "mysql://root:[email protected]/%zz",
err: `parse "<invalid uri>"`,
notContains: "verysecure",
},
}

for _, tt := range tests {
Expand All @@ -85,6 +92,9 @@ func TestPreCheckSinkURI(t *testing.T) {
_, err := preCheckSinkURI(test.uri)
if test.err != "" {
require.Contains(t, err.Error(), test.err)
if test.notContains != "" {
require.NotContains(t, err.Error(), test.notContains)
}
} else {
require.NoError(t, err)
}
Expand Down Expand Up @@ -126,4 +136,12 @@ func TestValidateSink(t *testing.T) {
t, err.Error(),
"sink uri scheme is not supported with syncpoint enabled",
)
require.NotContains(t, err.Error(), "verysecure")
require.NotContains(t, err.Error(), "sasl-password=verysecure")

sinkURI = "kafka://127.0.0.1:9092/topic?sasl-password=verysecure"
err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil)
require.NotNil(t, err)
require.Contains(t, err.Error(), "sasl-password=xxxxx")
require.NotContains(t, err.Error(), "verysecure")
}
6 changes: 5 additions & 1 deletion cdc/syncpointstore/syncpoint_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
)

// SyncPointStore is an abstraction for anything that a changefeed may emit into.
Expand All @@ -45,7 +46,10 @@ func NewSyncPointStore(
// parse sinkURI as a URI
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
return nil, cerror.WrapError(
cerror.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(sinkURIStr))
}
switch strings.ToLower(sinkURI.Scheme) {
case "mysql", "tidb", "mysql+ssl", "tidb+ssl":
Expand Down
9 changes: 7 additions & 2 deletions pkg/check/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/util"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)
Expand All @@ -48,7 +49,8 @@ func UpstreamDownstreamNotSame(ctx context.Context,
zap.Uint64("upID", upID), zap.Uint64("downID", downID), zap.Bool("isTiDB", isTiDB))
if err != nil {
log.Error("failed to get cluster ID from sink URI",
zap.String("downSinkURI", downSinkURI), zap.Error(err))
zap.String("downSinkURI", util.MaskSensitiveDataInURIForError(downSinkURI)),
zap.Error(err))
return false, cerror.Trace(err)
}

Expand All @@ -70,7 +72,10 @@ func getClusterIDBySinkURI(
// Create a MySQL connection by using the sink URI.
url, err := url.Parse(sinkURI)
if err != nil {
return 0, true, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
return 0, true, cerror.WrapError(
cerror.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(sinkURI))
}
if !sink.IsMySQLCompatibleScheme(sink.GetScheme(url)) {
return 0, false, nil
Expand Down
12 changes: 8 additions & 4 deletions pkg/check/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -58,16 +57,18 @@ func TestGetClusterIDBySinkURI(t *testing.T) {
wantClusterID uint64
wantIsTiDB bool
wantErr error
wantNotInErr string
}{
{
name: "non mysql scheme",
sinkURI: "kafka://127.0.0.1:9092/topic",
wantIsTiDB: false,
},
{
name: "invalid uri",
sinkURI: ":invalid:",
wantErr: cerror.ErrSinkURIInvalid.Wrap(errors.New("parse \":invalid:\": missing protocol scheme")),
name: "invalid uri",
sinkURI: "mysql://user:[email protected]/%zz",
wantErr: errors.New(`parse "<invalid uri>": invalid URL escape "%zz"`),
wantNotInErr: "verysecure",
},
{
name: "connect error",
Expand Down Expand Up @@ -136,6 +137,9 @@ func TestGetClusterIDBySinkURI(t *testing.T) {
if tc.wantErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), tc.wantErr.Error())
if tc.wantNotInErr != "" {
require.NotContains(t, err.Error(), tc.wantNotInErr)
}
} else {
require.NoError(t, err)
require.Equal(t, tc.wantClusterID, clusterID)
Expand Down
5 changes: 4 additions & 1 deletion pkg/cmd/redo/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ func (o *applyRedoOptions) complete(cmd *cobra.Command) error {
// parse sinkURI as a URI
sinkURI, err := url.Parse(o.sinkURI)
if err != nil {
return cerror.WrapError(cerror.ErrSinkURIInvalid, err)
return cerror.WrapError(
cerror.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(o.sinkURI))
}
rawQuery := sinkURI.Query()
// set safe-mode to true if not set
Expand Down
5 changes: 4 additions & 1 deletion pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,10 @@ func (s *SinkConfig) CheckCompatibilityWithSinkURI(
) error {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return cerror.WrapError(cerror.ErrSinkURIInvalid, err)
return cerror.WrapError(
cerror.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(sinkURIStr))
}

cfgParamsChanged := s.Protocol != oldSinkConfig.Protocol ||
Expand Down
6 changes: 5 additions & 1 deletion pkg/sink/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/util"
)

// Observer defines an interface of downstream performance observer.
Expand Down Expand Up @@ -67,7 +68,10 @@ func NewObserver(

sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
return nil, cerror.WrapError(
cerror.ErrSinkURIInvalid,
util.MaskSensitiveDataInURLError(err),
util.MaskSensitiveDataInURIForError(sinkURIStr))
}

scheme := strings.ToLower(sinkURI.Scheme)
Expand Down
30 changes: 25 additions & 5 deletions pkg/util/uri.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ import (
"net"
"net/url"
"strings"

"github.com/pingcap/log"
"go.uber.org/zap"
)

// IsValidIPv6AddressFormatInURI reports whether hostPort is a valid IPv6 address in URI.
Expand Down Expand Up @@ -70,7 +67,6 @@ func validOptionalPort(port string) bool {
func MaskSinkURI(uri string) (string, error) {
uriParsed, err := url.Parse(uri)
if err != nil {
log.Error("failed to parse sink URI", zap.Error(err))
return "", err
}
queries := uriParsed.Query()
Expand Down Expand Up @@ -99,7 +95,6 @@ var sensitiveQueryParameterNames = []string{
func MaskSensitiveDataInURI(uri string) string {
uriParsed, err := url.Parse(uri)
if err != nil {
log.Error("failed to parse sink URI", zap.Error(err))
return ""
}
queries := uriParsed.Query()
Expand All @@ -114,3 +109,28 @@ func MaskSensitiveDataInURI(uri string) string {
uriParsed.RawQuery = queries.Encode()
return uriParsed.Redacted()
}

// MaskSensitiveDataInURIForError masks sensitive data in a URI for error messages.
func MaskSensitiveDataInURIForError(uri string) string {
maskedURI := MaskSensitiveDataInURI(uri)
if maskedURI == "" && uri != "" {
return "<invalid uri>"
}
return maskedURI
}

// MaskSensitiveDataInURLError masks the URL carried by net/url errors.
func MaskSensitiveDataInURLError(err error) error {
if err == nil {
return nil
}
urlErr, ok := err.(*url.Error)
if !ok {
return err
}
return &url.Error{
Op: urlErr.Op,
URL: MaskSensitiveDataInURIForError(urlErr.URL),
Err: urlErr.Err,
}
}
Loading
Loading