Skip to content

Commit b0d936b

Browse files
authored
sink(cloudstorage): add use-table-id-as-path option (#4356) (#4594)
close #4357
1 parent 55aca47 commit b0d936b

File tree

12 files changed

+493
-44
lines changed

12 files changed

+493
-44
lines changed

api/v2/model.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
485485
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
486486
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
487487
OutputRawChangeEvent: c.Sink.CloudStorageConfig.OutputRawChangeEvent,
488+
UseTableIDAsPath: c.Sink.CloudStorageConfig.UseTableIDAsPath,
488489
}
489490
}
490491
var debeziumConfig *config.DebeziumConfig
@@ -834,6 +835,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
834835
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
835836
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
836837
OutputRawChangeEvent: cloned.Sink.CloudStorageConfig.OutputRawChangeEvent,
838+
UseTableIDAsPath: cloned.Sink.CloudStorageConfig.UseTableIDAsPath,
837839
}
838840
}
839841
var debeziumConfig *DebeziumConfig
@@ -1468,6 +1470,7 @@ type CloudStorageConfig struct {
14681470
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
14691471
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
14701472
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
1473+
UseTableIDAsPath *bool `json:"use_table_id_as_path,omitempty"`
14711474
}
14721475

14731476
// ChangefeedStatus holds common information of a changefeed in cdc

api/v2/model_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ func TestReplicaConfigConversion(t *testing.T) {
3333
EnableSyncPoint: util.AddressOf(true),
3434
EnableTableMonitor: util.AddressOf(true),
3535
BDRMode: util.AddressOf(true),
36+
Sink: &SinkConfig{
37+
CloudStorageConfig: &CloudStorageConfig{
38+
UseTableIDAsPath: util.AddressOf(true),
39+
},
40+
},
3641
Mounter: &MounterConfig{
3742
WorkerNum: util.AddressOf(16),
3843
},
@@ -61,6 +66,7 @@ func TestReplicaConfigConversion(t *testing.T) {
6166
require.True(t, util.GetOrZero(internalCfg.EnableSyncPoint))
6267
require.True(t, util.GetOrZero(internalCfg.EnableTableMonitor))
6368
require.True(t, util.GetOrZero(internalCfg.BDRMode))
69+
require.True(t, util.GetOrZero(internalCfg.Sink.CloudStorageConfig.UseTableIDAsPath))
6470
require.Equal(t, internalCfg.Mounter.WorkerNum, *apiCfg.Mounter.WorkerNum)
6571
require.True(t, util.GetOrZero(internalCfg.Scheduler.EnableTableAcrossNodes))
6672
require.Equal(t, 1000, util.GetOrZero(internalCfg.Scheduler.RegionThreshold))
@@ -85,6 +91,7 @@ func TestReplicaConfigConversion(t *testing.T) {
8591
require.True(t, *apiCfgBack.CaseSensitive)
8692
require.True(t, *apiCfgBack.ForceReplicate)
8793
require.True(t, *apiCfgBack.IgnoreIneligibleTable)
94+
require.True(t, *apiCfgBack.Sink.CloudStorageConfig.UseTableIDAsPath)
8895
require.Equal(t, 16, *apiCfgBack.Mounter.WorkerNum)
8996
require.True(t, *apiCfgBack.Scheduler.EnableTableAcrossNodes)
9097
require.Equal(t, "correctness", *apiCfgBack.Integrity.IntegrityCheckLevel)

downstreamadapter/sink/cloudstorage/sink.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,12 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error {
194194
// For exchange partition, we need to write the schema of the source table.
195195
// write the schema of the extra (exchanged) table first
196196
if event.GetDDLType() == model.ActionExchangeTablePartition {
197+
if len(event.MultipleTableInfos) < 2 || event.MultipleTableInfos[1] == nil {
198+
return errors.ErrInternalCheckFailed.GenWithStackByArgs(
199+
"invalid exchange partition ddl event, source table info is missing")
200+
}
201+
sourceTableInfo := event.MultipleTableInfos[1]
202+
197203
var def cloudstorage.TableDefinition
198204
def.FromTableInfo(event.ExtraSchemaName, event.ExtraTableName, event.TableInfo, event.FinishedTs, s.cfg.OutputColumnID)
199205
def.Query = event.Query
@@ -202,8 +208,10 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error {
202208
return err
203209
}
204210
var sourceTableDef cloudstorage.TableDefinition
205-
sourceTableDef.FromTableInfo(event.SchemaName, event.TableName, event.MultipleTableInfos[1], event.FinishedTs, s.cfg.OutputColumnID)
206-
if err := s.writeFile(event, sourceTableDef); err != nil {
211+
sourceTableDef.FromTableInfo(event.SchemaName, event.TableName, sourceTableInfo, event.FinishedTs, s.cfg.OutputColumnID)
212+
sourceEvent := *event
213+
sourceEvent.TableInfo = sourceTableInfo
214+
if err := s.writeFile(&sourceEvent, sourceTableDef); err != nil {
207215
return err
208216
}
209217
} else {
@@ -219,12 +227,19 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error {
219227
}
220228

221229
func (s *sink) writeFile(v *commonEvent.DDLEvent, def cloudstorage.TableDefinition) error {
230+
// skip writing database-level schema files (no table name) when 'use-table-id-as-path' mode is enabled
231+
if s.cfg.UseTableIDAsPath && def.Table == "" {
232+
log.Debug("skip database schema for table id path",
233+
zap.String("schema", def.Schema),
234+
zap.String("query", def.Query))
235+
return nil
236+
}
222237
encodedDef, err := def.MarshalWithQuery()
223238
if err != nil {
224239
return errors.Trace(err)
225240
}
226241

227-
path, err := def.GenerateSchemaFilePath()
242+
path, err := def.GenerateSchemaFilePath(s.cfg.UseTableIDAsPath, v.GetTableID())
228243
if err != nil {
229244
return errors.Trace(err)
230245
}

downstreamadapter/sink/cloudstorage/sink_test.go

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,161 @@ func TestWriteDDLEvent(t *testing.T) {
207207
}`, string(tableSchema))
208208
}
209209

210+
func TestWriteDDLEventWithTableIDAsPath(t *testing.T) {
211+
parentDir := t.TempDir()
212+
uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir)
213+
sinkURI, err := url.Parse(uri)
214+
require.NoError(t, err)
215+
216+
replicaConfig := config.GetDefaultReplicaConfig()
217+
err = replicaConfig.ValidateAndAdjust(sinkURI)
218+
require.NoError(t, err)
219+
220+
ctx, cancel := context.WithCancel(context.Background())
221+
defer cancel()
222+
223+
mockPDClock := pdutil.NewClock4Test()
224+
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)
225+
226+
cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil)
227+
require.NoError(t, err)
228+
229+
go cloudStorageSink.Run(ctx)
230+
231+
tableInfo := common.WrapTableInfo("test", &timodel.TableInfo{
232+
ID: 20,
233+
Name: parser_model.NewCIStr("table1"),
234+
Columns: []*timodel.ColumnInfo{
235+
{
236+
Name: parser_model.NewCIStr("col1"),
237+
FieldType: *types.NewFieldType(mysql.TypeLong),
238+
},
239+
{
240+
Name: parser_model.NewCIStr("col2"),
241+
FieldType: *types.NewFieldType(mysql.TypeVarchar),
242+
},
243+
},
244+
})
245+
ddlEvent := &commonEvent.DDLEvent{
246+
Query: "alter table test.table1 add col2 varchar(64)",
247+
Type: byte(timodel.ActionAddColumn),
248+
SchemaName: "test",
249+
TableName: "table1",
250+
FinishedTs: 100,
251+
TableInfo: tableInfo,
252+
}
253+
254+
err = cloudStorageSink.WriteBlockEvent(ddlEvent)
255+
require.NoError(t, err)
256+
257+
tableDir := path.Join(parentDir, "20/meta/")
258+
tableSchema, err := os.ReadFile(path.Join(tableDir, "schema_100_4192708364.json"))
259+
require.NoError(t, err)
260+
require.Contains(t, string(tableSchema), `"Table": "table1"`)
261+
}
262+
263+
func TestSkipDatabaseSchemaWithTableIDAsPath(t *testing.T) {
264+
parentDir := t.TempDir()
265+
uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir)
266+
sinkURI, err := url.Parse(uri)
267+
require.NoError(t, err)
268+
269+
replicaConfig := config.GetDefaultReplicaConfig()
270+
err = replicaConfig.ValidateAndAdjust(sinkURI)
271+
require.NoError(t, err)
272+
273+
ctx, cancel := context.WithCancel(context.Background())
274+
defer cancel()
275+
276+
mockPDClock := pdutil.NewClock4Test()
277+
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)
278+
279+
cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil)
280+
require.NoError(t, err)
281+
282+
go cloudStorageSink.Run(ctx)
283+
284+
ddlEvent := &commonEvent.DDLEvent{
285+
Query: "create database test_db",
286+
Type: byte(timodel.ActionCreateSchema),
287+
SchemaName: "test_db",
288+
TableName: "",
289+
FinishedTs: 100,
290+
TableInfo: nil,
291+
}
292+
293+
err = cloudStorageSink.WriteBlockEvent(ddlEvent)
294+
require.NoError(t, err)
295+
296+
_, err = os.Stat(path.Join(parentDir, "test_db"))
297+
require.Error(t, err)
298+
require.True(t, os.IsNotExist(err))
299+
}
300+
301+
func TestWriteDDLEventWithInvalidExchangePartitionEvent(t *testing.T) {
302+
testCases := []struct {
303+
name string
304+
multipleTableInfos []*common.TableInfo
305+
}{
306+
{
307+
name: "nil source table info",
308+
multipleTableInfos: []*common.TableInfo{nil},
309+
},
310+
{
311+
name: "short table infos",
312+
multipleTableInfos: nil,
313+
},
314+
}
315+
316+
parentDir := t.TempDir()
317+
uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir)
318+
sinkURI, err := url.Parse(uri)
319+
require.NoError(t, err)
320+
321+
replicaConfig := config.GetDefaultReplicaConfig()
322+
err = replicaConfig.ValidateAndAdjust(sinkURI)
323+
require.NoError(t, err)
324+
325+
ctx, cancel := context.WithCancel(context.Background())
326+
defer cancel()
327+
328+
mockPDClock := pdutil.NewClock4Test()
329+
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)
330+
331+
cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil)
332+
require.NoError(t, err)
333+
334+
tableInfo := common.WrapTableInfo("test", &timodel.TableInfo{
335+
ID: 20,
336+
Name: parser_model.NewCIStr("table1"),
337+
Columns: []*timodel.ColumnInfo{
338+
{
339+
Name: parser_model.NewCIStr("col1"),
340+
FieldType: *types.NewFieldType(mysql.TypeLong),
341+
},
342+
},
343+
})
344+
345+
for _, tc := range testCases {
346+
t.Run(tc.name, func(t *testing.T) {
347+
ddlEvent := &commonEvent.DDLEvent{
348+
Query: "alter table test.table1 exchange partition p0 with table test.table2",
349+
Type: byte(timodel.ActionExchangeTablePartition),
350+
SchemaName: "test",
351+
TableName: "table1",
352+
ExtraSchemaName: "test",
353+
ExtraTableName: "table2",
354+
FinishedTs: 100,
355+
TableInfo: tableInfo,
356+
}
357+
ddlEvent.MultipleTableInfos = append([]*common.TableInfo{tableInfo}, tc.multipleTableInfos...)
358+
359+
err = cloudStorageSink.WriteBlockEvent(ddlEvent)
360+
require.ErrorContains(t, err, "invalid exchange partition ddl event, source table info is missing")
361+
})
362+
}
363+
}
364+
210365
func TestWriteCheckpointEvent(t *testing.T) {
211366
parentDir := t.TempDir()
212367
uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir)

downstreamadapter/sink/cloudstorage/writer.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,15 @@ func (d *writer) flushMessages(ctx context.Context) error {
171171
zap.Error(err))
172172
return errors.Trace(err)
173173
}
174-
indexFilePath := d.filePathGenerator.GenerateIndexFilePath(table, date)
174+
indexFilePath, err := d.filePathGenerator.GenerateIndexFilePath(table, date)
175+
if err != nil {
176+
log.Error("failed to generate index file path",
177+
zap.Int("workerID", d.id),
178+
zap.String("keyspace", d.changeFeedID.Keyspace()),
179+
zap.Stringer("changefeed", d.changeFeedID.ID()),
180+
zap.Error(err))
181+
return errors.Trace(err)
182+
}
175183

176184
// first write the data file to external storage.
177185
err = d.writeDataFile(ctx, dataFilePath, indexFilePath, task)

pkg/config/sink.go

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ const (
3737

3838
// TxnAtomicityKey specifies the key of the transaction-atomicity in the SinkURI.
3939
TxnAtomicityKey = "transaction-atomicity"
40+
// UseTableIDAsPathKey specifies the key of the use-table-id-as-path in the SinkURI.
41+
UseTableIDAsPathKey = "use-table-id-as-path"
4042
// defaultTxnAtomicity is the default atomicity level.
4143
defaultTxnAtomicity = noneTxnAtomicity
4244
// unknownTxnAtomicity is an invalid atomicity level and will be treated as
@@ -698,6 +700,8 @@ type CloudStorageConfig struct {
698700
FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"`
699701
FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"`
700702
FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"`
703+
// UseTableIDAsPath is only available when the downstream is Storage (TiCI only).
704+
UseTableIDAsPath *bool `toml:"use-table-id-as-path" json:"use-table-id-as-path,omitempty"`
701705

702706
// OutputRawChangeEvent controls whether to split the update pk/uk events.
703707
OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"`
@@ -711,6 +715,27 @@ func (c *CloudStorageConfig) GetOutputRawChangeEvent() bool {
711715
return *c.OutputRawChangeEvent
712716
}
713717

718+
// CheckUseTableIDAsPathCompatibility checks the compatibility between sink config and sink URI.
719+
func CheckUseTableIDAsPathCompatibility(
720+
sinkConfig *SinkConfig,
721+
useTableIDAsPathFromURI *bool,
722+
) error {
723+
if sinkConfig == nil ||
724+
sinkConfig.CloudStorageConfig == nil ||
725+
sinkConfig.CloudStorageConfig.UseTableIDAsPath == nil ||
726+
useTableIDAsPathFromURI == nil {
727+
return nil
728+
}
729+
useTableIDAsPathFromConfig := sinkConfig.CloudStorageConfig.UseTableIDAsPath
730+
if util.GetOrZero(useTableIDAsPathFromConfig) == util.GetOrZero(useTableIDAsPathFromURI) {
731+
return nil
732+
}
733+
return cerror.ErrIncompatibleSinkConfig.GenWithStackByArgs(
734+
fmt.Sprintf("%s=%t", UseTableIDAsPathKey, util.GetOrZero(useTableIDAsPathFromURI)),
735+
fmt.Sprintf("%s=%t", UseTableIDAsPathKey, util.GetOrZero(useTableIDAsPathFromConfig)),
736+
)
737+
}
738+
714739
func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
715740
if err := s.validateAndAdjustSinkURI(sinkURI); err != nil {
716741
return err
@@ -977,15 +1002,59 @@ func (s *SinkConfig) CheckCompatibilityWithSinkURI(
9771002
return cerror.WrapError(cerror.ErrSinkURIInvalid, err)
9781003
}
9791004

1005+
var useTableIDAsPathFromURI *bool
1006+
if IsStorageScheme(sinkURI.Scheme) {
1007+
useTableIDAsPathValue := sinkURI.Query().Get(UseTableIDAsPathKey)
1008+
if useTableIDAsPathValue != "" {
1009+
enabled, parseErr := strconv.ParseBool(useTableIDAsPathValue)
1010+
if parseErr != nil {
1011+
return cerror.WrapError(cerror.ErrSinkURIInvalid, parseErr)
1012+
}
1013+
useTableIDAsPathFromURI = util.AddressOf(enabled)
1014+
}
1015+
}
1016+
1017+
getUseTableIDAsPath := func(cfg *SinkConfig) *bool {
1018+
if cfg == nil || cfg.CloudStorageConfig == nil {
1019+
return nil
1020+
}
1021+
return cfg.CloudStorageConfig.UseTableIDAsPath
1022+
}
1023+
1024+
useTableIDAsPathChanged := func() bool {
1025+
newVal := getUseTableIDAsPath(s)
1026+
oldVal := getUseTableIDAsPath(oldSinkConfig)
1027+
if newVal == nil && oldVal == nil {
1028+
return false
1029+
}
1030+
if newVal == nil || oldVal == nil {
1031+
return true
1032+
}
1033+
return *newVal != *oldVal
1034+
}
1035+
9801036
cfgParamsChanged := s.Protocol != oldSinkConfig.Protocol ||
981-
s.TxnAtomicity != oldSinkConfig.TxnAtomicity
1037+
s.TxnAtomicity != oldSinkConfig.TxnAtomicity ||
1038+
useTableIDAsPathChanged()
9821039

9831040
isURIParamsChanged := func(oldCfg SinkConfig) bool {
9841041
err := oldCfg.applyParameterBySinkURI(sinkURI)
985-
return cerror.ErrIncompatibleSinkConfig.Equal(err)
1042+
if cerror.ErrIncompatibleSinkConfig.Equal(err) {
1043+
return true
1044+
}
1045+
if useTableIDAsPathFromURI == nil {
1046+
return false
1047+
}
1048+
return CheckUseTableIDAsPathCompatibility(&oldCfg, useTableIDAsPathFromURI) != nil
9861049
}
9871050
uriParamsChanged := isURIParamsChanged(*oldSinkConfig)
9881051

1052+
if !uriParamsChanged && IsStorageScheme(sinkURI.Scheme) {
1053+
if err := CheckUseTableIDAsPathCompatibility(s, useTableIDAsPathFromURI); err != nil {
1054+
return err
1055+
}
1056+
}
1057+
9891058
if !uriParamsChanged && !cfgParamsChanged {
9901059
return nil
9911060
}

0 commit comments

Comments
 (0)