Skip to content
3 changes: 3 additions & 0 deletions api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
OutputRawChangeEvent: c.Sink.CloudStorageConfig.OutputRawChangeEvent,
UseTableIDAsPath: c.Sink.CloudStorageConfig.UseTableIDAsPath,
}
}
var debeziumConfig *config.DebeziumConfig
Expand Down Expand Up @@ -834,6 +835,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
OutputRawChangeEvent: cloned.Sink.CloudStorageConfig.OutputRawChangeEvent,
UseTableIDAsPath: cloned.Sink.CloudStorageConfig.UseTableIDAsPath,
}
}
var debeziumConfig *DebeziumConfig
Expand Down Expand Up @@ -1468,6 +1470,7 @@ type CloudStorageConfig struct {
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
UseTableIDAsPath *bool `json:"use_table_id_as_path,omitempty"`
}

// ChangefeedStatus holds common information of a changefeed in cdc
Expand Down
7 changes: 7 additions & 0 deletions api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func TestReplicaConfigConversion(t *testing.T) {
EnableSyncPoint: util.AddressOf(true),
EnableTableMonitor: util.AddressOf(true),
BDRMode: util.AddressOf(true),
Sink: &SinkConfig{
CloudStorageConfig: &CloudStorageConfig{
UseTableIDAsPath: util.AddressOf(true),
},
},
Mounter: &MounterConfig{
WorkerNum: util.AddressOf(16),
},
Expand Down Expand Up @@ -61,6 +66,7 @@ func TestReplicaConfigConversion(t *testing.T) {
require.True(t, util.GetOrZero(internalCfg.EnableSyncPoint))
require.True(t, util.GetOrZero(internalCfg.EnableTableMonitor))
require.True(t, util.GetOrZero(internalCfg.BDRMode))
require.True(t, util.GetOrZero(internalCfg.Sink.CloudStorageConfig.UseTableIDAsPath))
require.Equal(t, internalCfg.Mounter.WorkerNum, *apiCfg.Mounter.WorkerNum)
require.True(t, util.GetOrZero(internalCfg.Scheduler.EnableTableAcrossNodes))
require.Equal(t, 1000, util.GetOrZero(internalCfg.Scheduler.RegionThreshold))
Expand All @@ -85,6 +91,7 @@ func TestReplicaConfigConversion(t *testing.T) {
require.True(t, *apiCfgBack.CaseSensitive)
require.True(t, *apiCfgBack.ForceReplicate)
require.True(t, *apiCfgBack.IgnoreIneligibleTable)
require.True(t, *apiCfgBack.Sink.CloudStorageConfig.UseTableIDAsPath)
require.Equal(t, 16, *apiCfgBack.Mounter.WorkerNum)
require.True(t, *apiCfgBack.Scheduler.EnableTableAcrossNodes)
require.Equal(t, "correctness", *apiCfgBack.Integrity.IntegrityCheckLevel)
Expand Down
21 changes: 18 additions & 3 deletions downstreamadapter/sink/cloudstorage/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error {
// For exchange partition, we need to write the schema of the source table.
// write the previous table first
if event.GetDDLType() == model.ActionExchangeTablePartition {
if len(event.MultipleTableInfos) < 2 || event.MultipleTableInfos[1] == nil {
return errors.ErrInternalCheckFailed.GenWithStackByArgs(
"invalid exchange partition ddl event, source table info is missing")
}
sourceTableInfo := event.MultipleTableInfos[1]

var def cloudstorage.TableDefinition
def.FromTableInfo(event.ExtraSchemaName, event.ExtraTableName, event.TableInfo, event.FinishedTs, s.cfg.OutputColumnID)
def.Query = event.Query
Expand All @@ -202,8 +208,10 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error {
return err
}
var sourceTableDef cloudstorage.TableDefinition
sourceTableDef.FromTableInfo(event.SchemaName, event.TableName, event.MultipleTableInfos[1], event.FinishedTs, s.cfg.OutputColumnID)
if err := s.writeFile(event, sourceTableDef); err != nil {
sourceTableDef.FromTableInfo(event.SchemaName, event.TableName, sourceTableInfo, event.FinishedTs, s.cfg.OutputColumnID)
sourceEvent := *event
sourceEvent.TableInfo = sourceTableInfo
if err := s.writeFile(&sourceEvent, sourceTableDef); err != nil {
return err
}
} else {
Expand All @@ -219,12 +227,19 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error {
}

func (s *sink) writeFile(v *commonEvent.DDLEvent, def cloudstorage.TableDefinition) error {
// skip write database-level event for 'use-table-id-as-path' mode
if s.cfg.UseTableIDAsPath && def.Table == "" {
log.Debug("skip database schema for table id path",
zap.String("schema", def.Schema),
zap.String("query", def.Query))
return nil
}
encodedDef, err := def.MarshalWithQuery()
if err != nil {
return errors.Trace(err)
}

path, err := def.GenerateSchemaFilePath()
path, err := def.GenerateSchemaFilePath(s.cfg.UseTableIDAsPath, v.GetTableID())
if err != nil {
return errors.Trace(err)
}
Expand Down
155 changes: 155 additions & 0 deletions downstreamadapter/sink/cloudstorage/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,161 @@ func TestWriteDDLEvent(t *testing.T) {
}`, string(tableSchema))
}

// TestWriteDDLEventWithTableIDAsPath verifies that, with
// use-table-id-as-path=true in the sink URI, a table-level DDL event is
// written under a path keyed by the numeric table ID ("20/meta/...") rather
// than by schema/table name.
func TestWriteDDLEventWithTableIDAsPath(t *testing.T) {
parentDir := t.TempDir()
// Enable the table-ID path mode via the sink URI query parameter.
uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir)
sinkURI, err := url.Parse(uri)
require.NoError(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mockPDClock := pdutil.NewClock4Test()
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)

cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil)
require.NoError(t, err)

go cloudStorageSink.Run(ctx)

// Table ID 20 should become the top-level directory segment of the
// schema file path.
tableInfo := common.WrapTableInfo("test", &timodel.TableInfo{
ID: 20,
Name: parser_model.NewCIStr("table1"),
Columns: []*timodel.ColumnInfo{
{
Name: parser_model.NewCIStr("col1"),
FieldType: *types.NewFieldType(mysql.TypeLong),
},
{
Name: parser_model.NewCIStr("col2"),
FieldType: *types.NewFieldType(mysql.TypeVarchar),
},
},
})
ddlEvent := &commonEvent.DDLEvent{
Query: "alter table test.table1 add col2 varchar(64)",
Type: byte(timodel.ActionAddColumn),
SchemaName: "test",
TableName: "table1",
FinishedTs: 100,
TableInfo: tableInfo,
}

err = cloudStorageSink.WriteBlockEvent(ddlEvent)
require.NoError(t, err)

// The schema file is expected at "<tableID>/meta/schema_<ts>_<suffix>.json";
// the numeric suffix presumably is a checksum of the table definition —
// it must stay stable for this fixture to keep passing.
tableDir := path.Join(parentDir, "20/meta/")
tableSchema, err := os.ReadFile(path.Join(tableDir, "schema_100_4192708364.json"))
require.NoError(t, err)
require.Contains(t, string(tableSchema), `"Table": "table1"`)
}

// TestSkipDatabaseSchemaWithTableIDAsPath verifies that a database-level DDL
// event (which has no table, and therefore no table ID to build a path from)
// produces no schema file at all when use-table-id-as-path=true.
func TestSkipDatabaseSchemaWithTableIDAsPath(t *testing.T) {
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir)
sinkURI, err := url.Parse(uri)
require.NoError(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mockPDClock := pdutil.NewClock4Test()
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)

cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil)
require.NoError(t, err)

go cloudStorageSink.Run(ctx)

// A CREATE DATABASE event: TableName is empty and TableInfo is nil,
// which is the shape the sink's skip path keys on.
ddlEvent := &commonEvent.DDLEvent{
Query: "create database test_db",
Type: byte(timodel.ActionCreateSchema),
SchemaName: "test_db",
TableName: "",
FinishedTs: 100,
TableInfo: nil,
}

err = cloudStorageSink.WriteBlockEvent(ddlEvent)
require.NoError(t, err)

// Writing must succeed but leave nothing behind: no directory named
// after the database schema should exist.
_, err = os.Stat(path.Join(parentDir, "test_db"))
require.Error(t, err)
require.True(t, os.IsNotExist(err))
}

// TestWriteDDLEventWithInvalidExchangePartitionEvent verifies that an
// exchange-partition DDL event with a missing or nil source table info is
// rejected with an explicit error instead of panicking on an out-of-range
// or nil access.
func TestWriteDDLEventWithInvalidExchangePartitionEvent(t *testing.T) {
testCases := []struct {
name string
multipleTableInfos []*common.TableInfo
}{
{
// MultipleTableInfos has two entries but the source slot is nil.
name: "nil source table info",
multipleTableInfos: []*common.TableInfo{nil},
},
{
// MultipleTableInfos has fewer than two entries.
name: "short table infos",
multipleTableInfos: nil,
},
}

parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir)
sinkURI, err := url.Parse(uri)
require.NoError(t, err)

replicaConfig := config.GetDefaultReplicaConfig()
err = replicaConfig.ValidateAndAdjust(sinkURI)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mockPDClock := pdutil.NewClock4Test()
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)

cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil)
require.NoError(t, err)

tableInfo := common.WrapTableInfo("test", &timodel.TableInfo{
ID: 20,
Name: parser_model.NewCIStr("table1"),
Columns: []*timodel.ColumnInfo{
{
Name: parser_model.NewCIStr("col1"),
FieldType: *types.NewFieldType(mysql.TypeLong),
},
},
})

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ddlEvent := &commonEvent.DDLEvent{
Query: "alter table test.table1 exchange partition p0 with table test.table2",
Type: byte(timodel.ActionExchangeTablePartition),
SchemaName: "test",
TableName: "table1",
ExtraSchemaName: "test",
ExtraTableName: "table2",
FinishedTs: 100,
TableInfo: tableInfo,
}
// Index 0 is the target table; the (possibly absent or nil) source
// table info occupies index 1.
ddlEvent.MultipleTableInfos = append([]*common.TableInfo{tableInfo}, tc.multipleTableInfos...)

err = cloudStorageSink.WriteBlockEvent(ddlEvent)
require.ErrorContains(t, err, "invalid exchange partition ddl event, source table info is missing")
})
}
}

func TestWriteCheckpointEvent(t *testing.T) {
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir)
Expand Down
10 changes: 9 additions & 1 deletion downstreamadapter/sink/cloudstorage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,15 @@ func (d *writer) flushMessages(ctx context.Context) error {
zap.Error(err))
return errors.Trace(err)
}
indexFilePath := d.filePathGenerator.GenerateIndexFilePath(table, date)
indexFilePath, err := d.filePathGenerator.GenerateIndexFilePath(table, date)
if err != nil {
log.Error("failed to generate index file path",
zap.Int("workerID", d.id),
zap.String("keyspace", d.changeFeedID.Keyspace()),
zap.Stringer("changefeed", d.changeFeedID.ID()),
zap.Error(err))
return errors.Trace(err)
}

// first write the data file to external storage.
err = d.writeDataFile(ctx, dataFilePath, indexFilePath, task)
Expand Down
73 changes: 71 additions & 2 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (

// TxnAtomicityKey specifies the key of the transaction-atomicity in the SinkURI.
TxnAtomicityKey = "transaction-atomicity"
// UseTableIDAsPathKey specifies the key of the use-table-id-as-path in the SinkURI.
UseTableIDAsPathKey = "use-table-id-as-path"
// defaultTxnAtomicity is the default atomicity level.
defaultTxnAtomicity = noneTxnAtomicity
// unknownTxnAtomicity is an invalid atomicity level and will be treated as
Expand Down Expand Up @@ -698,6 +700,8 @@ type CloudStorageConfig struct {
FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"`
FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"`
FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"`
// UseTableIDAsPath is only available when the downstream is Storage (TiCI only).
UseTableIDAsPath *bool `toml:"use-table-id-as-path" json:"use-table-id-as-path,omitempty"`

// OutputRawChangeEvent controls whether to split the update pk/uk events.
OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"`
Expand All @@ -711,6 +715,27 @@ func (c *CloudStorageConfig) GetOutputRawChangeEvent() bool {
return *c.OutputRawChangeEvent
}

// CheckUseTableIDAsPathCompatibility checks the compatibility between sink config and sink URI.
// It returns ErrIncompatibleSinkConfig when both the sink config and the sink
// URI carry an explicit use-table-id-as-path setting and the two disagree;
// when either side leaves the setting unset, there is nothing to compare and
// nil is returned.
func CheckUseTableIDAsPathCompatibility(
	sinkConfig *SinkConfig,
	useTableIDAsPathFromURI *bool,
) error {
	// The URI did not mention the option at all — nothing to check.
	if useTableIDAsPathFromURI == nil {
		return nil
	}
	// The config side never set the option — nothing to check either.
	if sinkConfig == nil || sinkConfig.CloudStorageConfig == nil {
		return nil
	}
	fromConfig := sinkConfig.CloudStorageConfig.UseTableIDAsPath
	if fromConfig == nil {
		return nil
	}
	// Both sides are explicit; they must agree.
	if *fromConfig == *useTableIDAsPathFromURI {
		return nil
	}
	return cerror.ErrIncompatibleSinkConfig.GenWithStackByArgs(
		fmt.Sprintf("%s=%t", UseTableIDAsPathKey, *useTableIDAsPathFromURI),
		fmt.Sprintf("%s=%t", UseTableIDAsPathKey, *fromConfig),
	)
}
Comment on lines +718 to +737
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Persist use-table-id-as-path in applyParameterBySinkURI too.

This compatibility path only sees the new URI. Because applyParameterBySinkURI never persists the use-table-id-as-path query parameter into the stored sink config, a changefeed created with ...?use-table-id-as-path=true still leaves oldSinkConfig.CloudStorageConfig.UseTableIDAsPath == nil; a later update that drops the query param therefore looks compatible here and silently flips the sink back to name-based paths.

💡 Suggested fix
 func (s *SinkConfig) applyParameterBySinkURI(sinkURI *url.URL) error {
 	if sinkURI == nil {
 		return nil
 	}
 
 	cfgInSinkURI := map[string]string{}
 	cfgInFile := map[string]string{}
 	params := sinkURI.Query()
@@
 	protocolFromURI := params.Get(ProtocolKey)
 	if protocolFromURI != "" {
 		if s.Protocol != nil && util.GetOrZero(s.Protocol) != protocolFromURI {
 			cfgInSinkURI[ProtocolKey] = protocolFromURI
 			cfgInFile[ProtocolKey] = util.GetOrZero(s.Protocol)
 		}
 		s.Protocol = util.AddressOf(protocolFromURI)
 	}
+
+	if IsStorageScheme(sinkURI.Scheme) {
+		useTableIDAsPathFromURI := params.Get(UseTableIDAsPathKey)
+		if useTableIDAsPathFromURI != "" {
+			enabled, err := strconv.ParseBool(useTableIDAsPathFromURI)
+			if err != nil {
+				return cerror.WrapError(cerror.ErrSinkURIInvalid, err)
+			}
+			if s.CloudStorageConfig == nil {
+				s.CloudStorageConfig = &CloudStorageConfig{}
+			}
+			if s.CloudStorageConfig.UseTableIDAsPath != nil &&
+				util.GetOrZero(s.CloudStorageConfig.UseTableIDAsPath) != enabled {
+				cfgInSinkURI[UseTableIDAsPathKey] = strconv.FormatBool(enabled)
+				cfgInFile[UseTableIDAsPathKey] = strconv.FormatBool(util.GetOrZero(s.CloudStorageConfig.UseTableIDAsPath))
+			}
+			s.CloudStorageConfig.UseTableIDAsPath = util.AddressOf(enabled)
+		}
+	}

Also applies to: 1005-1056

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/config/sink.go` around lines 718-737: the compatibility bug is that
applyParameterBySinkURI, when it parses a sink URI carrying the
use-table-id-as-path query parameter, does not persist that value into the
SinkConfig (CloudStorageConfig.UseTableIDAsPath). A later call to
CheckUseTableIDAsPathCompatibility therefore sees nil on the config side and
allows an unintended flip. Fix applyParameterBySinkURI (and the analogous
block around lines 1005-1056) so that whenever the URI contains
use-table-id-as-path it stores the parsed boolean into
CloudStorageConfig.UseTableIDAsPath (initializing CloudStorageConfig first if
it is nil); CheckUseTableIDAsPathCompatibility will then compare actual
values instead of nil.


func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
if err := s.validateAndAdjustSinkURI(sinkURI); err != nil {
return err
Expand Down Expand Up @@ -977,15 +1002,59 @@ func (s *SinkConfig) CheckCompatibilityWithSinkURI(
return cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}

var useTableIDAsPathFromURI *bool
if IsStorageScheme(sinkURI.Scheme) {
useTableIDAsPathValue := sinkURI.Query().Get(UseTableIDAsPathKey)
if useTableIDAsPathValue != "" {
enabled, parseErr := strconv.ParseBool(useTableIDAsPathValue)
if parseErr != nil {
return cerror.WrapError(cerror.ErrSinkURIInvalid, parseErr)
}
useTableIDAsPathFromURI = util.AddressOf(enabled)
}
}

getUseTableIDAsPath := func(cfg *SinkConfig) *bool {
if cfg == nil || cfg.CloudStorageConfig == nil {
return nil
}
return cfg.CloudStorageConfig.UseTableIDAsPath
}

useTableIDAsPathChanged := func() bool {
newVal := getUseTableIDAsPath(s)
oldVal := getUseTableIDAsPath(oldSinkConfig)
if newVal == nil && oldVal == nil {
return false
}
if newVal == nil || oldVal == nil {
return true
}
return *newVal != *oldVal
}

cfgParamsChanged := s.Protocol != oldSinkConfig.Protocol ||
s.TxnAtomicity != oldSinkConfig.TxnAtomicity
s.TxnAtomicity != oldSinkConfig.TxnAtomicity ||
useTableIDAsPathChanged()

isURIParamsChanged := func(oldCfg SinkConfig) bool {
err := oldCfg.applyParameterBySinkURI(sinkURI)
return cerror.ErrIncompatibleSinkConfig.Equal(err)
if cerror.ErrIncompatibleSinkConfig.Equal(err) {
return true
}
if useTableIDAsPathFromURI == nil {
return false
}
return CheckUseTableIDAsPathCompatibility(&oldCfg, useTableIDAsPathFromURI) != nil
}
uriParamsChanged := isURIParamsChanged(*oldSinkConfig)

if !uriParamsChanged && IsStorageScheme(sinkURI.Scheme) {
if err := CheckUseTableIDAsPathCompatibility(s, useTableIDAsPathFromURI); err != nil {
return err
}
}

if !uriParamsChanged && !cfgParamsChanged {
return nil
}
Expand Down
Loading
Loading