diff --git a/api/v2/model.go b/api/v2/model.go index 86e99d1078..22d29c01fa 100644 --- a/api/v2/model.go +++ b/api/v2/model.go @@ -485,6 +485,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, OutputRawChangeEvent: c.Sink.CloudStorageConfig.OutputRawChangeEvent, + UseTableIDAsPath: c.Sink.CloudStorageConfig.UseTableIDAsPath, } } var debeziumConfig *config.DebeziumConfig @@ -834,6 +835,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, OutputRawChangeEvent: cloned.Sink.CloudStorageConfig.OutputRawChangeEvent, + UseTableIDAsPath: cloned.Sink.CloudStorageConfig.UseTableIDAsPath, } } var debeziumConfig *DebeziumConfig @@ -1468,6 +1470,7 @@ type CloudStorageConfig struct { FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` FlushConcurrency *int `json:"flush_concurrency,omitempty"` OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"` + UseTableIDAsPath *bool `json:"use_table_id_as_path,omitempty"` } // ChangefeedStatus holds common information of a changefeed in cdc diff --git a/api/v2/model_test.go b/api/v2/model_test.go index ac638098b0..0b8c39f0c8 100644 --- a/api/v2/model_test.go +++ b/api/v2/model_test.go @@ -33,6 +33,11 @@ func TestReplicaConfigConversion(t *testing.T) { EnableSyncPoint: util.AddressOf(true), EnableTableMonitor: util.AddressOf(true), BDRMode: util.AddressOf(true), + Sink: &SinkConfig{ + CloudStorageConfig: &CloudStorageConfig{ + UseTableIDAsPath: util.AddressOf(true), + }, + }, Mounter: &MounterConfig{ WorkerNum: util.AddressOf(16), }, @@ -61,6 +66,7 @@ func TestReplicaConfigConversion(t *testing.T) { require.True(t, util.GetOrZero(internalCfg.EnableSyncPoint)) require.True(t, util.GetOrZero(internalCfg.EnableTableMonitor)) require.True(t, util.GetOrZero(internalCfg.BDRMode)) + require.True(t, util.GetOrZero(internalCfg.Sink.CloudStorageConfig.UseTableIDAsPath)) require.Equal(t, internalCfg.Mounter.WorkerNum, *apiCfg.Mounter.WorkerNum) require.True(t, util.GetOrZero(internalCfg.Scheduler.EnableTableAcrossNodes)) require.Equal(t, 1000, util.GetOrZero(internalCfg.Scheduler.RegionThreshold)) @@ -85,6 +91,7 @@ func TestReplicaConfigConversion(t *testing.T) { require.True(t, *apiCfgBack.CaseSensitive) require.True(t, *apiCfgBack.ForceReplicate) require.True(t, *apiCfgBack.IgnoreIneligibleTable) + require.True(t, *apiCfgBack.Sink.CloudStorageConfig.UseTableIDAsPath) require.Equal(t, 16, *apiCfgBack.Mounter.WorkerNum) require.True(t, *apiCfgBack.Scheduler.EnableTableAcrossNodes) require.Equal(t, "correctness", *apiCfgBack.Integrity.IntegrityCheckLevel) diff --git a/downstreamadapter/sink/cloudstorage/sink.go b/downstreamadapter/sink/cloudstorage/sink.go index cb59ce6953..1b55dec3cf 100644 --- a/downstreamadapter/sink/cloudstorage/sink.go +++ b/downstreamadapter/sink/cloudstorage/sink.go @@ -194,6 +194,12 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error { // For exchange partition, we need to write the schema of the source table. // write the previous table first if event.GetDDLType() == model.ActionExchangeTablePartition { + if len(event.MultipleTableInfos) < 2 || event.MultipleTableInfos[1] == nil { + return errors.ErrInternalCheckFailed.GenWithStackByArgs( + "invalid exchange partition ddl event, source table info is missing") + } + sourceTableInfo := event.MultipleTableInfos[1] + var def cloudstorage.TableDefinition def.FromTableInfo(event.ExtraSchemaName, event.ExtraTableName, event.TableInfo, event.FinishedTs, s.cfg.OutputColumnID) def.Query = event.Query @@ -202,8 +208,10 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error { return err } var sourceTableDef cloudstorage.TableDefinition - sourceTableDef.FromTableInfo(event.SchemaName, event.TableName, event.MultipleTableInfos[1], event.FinishedTs, s.cfg.OutputColumnID) - if err := s.writeFile(event, sourceTableDef); err != nil { + sourceTableDef.FromTableInfo(event.SchemaName, event.TableName, sourceTableInfo, event.FinishedTs, s.cfg.OutputColumnID) + sourceEvent := *event + sourceEvent.TableInfo = sourceTableInfo + if err := s.writeFile(&sourceEvent, sourceTableDef); err != nil { return err } } else { @@ -219,12 +227,19 @@ func (s *sink) writeDDLEvent(event *commonEvent.DDLEvent) error { } func (s *sink) writeFile(v *commonEvent.DDLEvent, def cloudstorage.TableDefinition) error { + // skip write database-level event for 'use-table-id-as-path' mode + if s.cfg.UseTableIDAsPath && def.Table == "" { + log.Debug("skip database schema for table id path", + zap.String("schema", def.Schema), + zap.String("query", def.Query)) + return nil + } encodedDef, err := def.MarshalWithQuery() if err != nil { return errors.Trace(err) } - path, err := def.GenerateSchemaFilePath() + path, err := def.GenerateSchemaFilePath(s.cfg.UseTableIDAsPath, v.GetTableID()) if err != nil { return errors.Trace(err) } diff --git a/downstreamadapter/sink/cloudstorage/sink_test.go b/downstreamadapter/sink/cloudstorage/sink_test.go index 5e36786947..88fa1998cb 100644 --- a/downstreamadapter/sink/cloudstorage/sink_test.go +++ b/downstreamadapter/sink/cloudstorage/sink_test.go @@ -207,6 +207,161 @@ func TestWriteDDLEvent(t *testing.T) { }`, string(tableSchema)) } +func TestWriteDDLEventWithTableIDAsPath(t *testing.T) { + parentDir := t.TempDir() + uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir) + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) + + cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil) + require.NoError(t, err) + + go cloudStorageSink.Run(ctx) + + tableInfo := common.WrapTableInfo("test", &timodel.TableInfo{ + ID: 20, + Name: parser_model.NewCIStr("table1"), + Columns: []*timodel.ColumnInfo{ + { + Name: parser_model.NewCIStr("col1"), + FieldType: *types.NewFieldType(mysql.TypeLong), + }, + { + Name: parser_model.NewCIStr("col2"), + FieldType: *types.NewFieldType(mysql.TypeVarchar), + }, + }, + }) + ddlEvent := &commonEvent.DDLEvent{ + Query: "alter table test.table1 add col2 varchar(64)", + Type: byte(timodel.ActionAddColumn), + SchemaName: "test", + TableName: "table1", + FinishedTs: 100, + TableInfo: tableInfo, + } + + err = cloudStorageSink.WriteBlockEvent(ddlEvent) + require.NoError(t, err) + + tableDir := path.Join(parentDir, "20/meta/") + tableSchema, err := os.ReadFile(path.Join(tableDir, "schema_100_4192708364.json")) + require.NoError(t, err) + require.Contains(t, string(tableSchema), `"Table": "table1"`) +} + +func TestSkipDatabaseSchemaWithTableIDAsPath(t *testing.T) { + parentDir := t.TempDir() + uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir) + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) + + cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil) + require.NoError(t, err) + + go cloudStorageSink.Run(ctx) + + ddlEvent := &commonEvent.DDLEvent{ + Query: "create database test_db", + Type: byte(timodel.ActionCreateSchema), + SchemaName: "test_db", + TableName: "", + FinishedTs: 100, + TableInfo: nil, + } + + err = cloudStorageSink.WriteBlockEvent(ddlEvent) + require.NoError(t, err) + + _, err = os.Stat(path.Join(parentDir, "test_db")) + require.Error(t, err) + require.True(t, os.IsNotExist(err)) +} + +func TestWriteDDLEventWithInvalidExchangePartitionEvent(t *testing.T) { + testCases := []struct { + name string + multipleTableInfos []*common.TableInfo + }{ + { + name: "nil source table info", + multipleTableInfos: []*common.TableInfo{nil}, + }, + { + name: "short table infos", + multipleTableInfos: nil, + }, + } + + parentDir := t.TempDir() + uri := fmt.Sprintf("file:///%s?protocol=csv&use-table-id-as-path=true", parentDir) + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + + replicaConfig := config.GetDefaultReplicaConfig() + err = replicaConfig.ValidateAndAdjust(sinkURI) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockPDClock := pdutil.NewClock4Test() + appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) + + cloudStorageSink, err := newSinkForTest(ctx, replicaConfig, sinkURI, nil) + require.NoError(t, err) + + tableInfo := common.WrapTableInfo("test", &timodel.TableInfo{ + ID: 20, + Name: parser_model.NewCIStr("table1"), + Columns: []*timodel.ColumnInfo{ + { + Name: parser_model.NewCIStr("col1"), + FieldType: *types.NewFieldType(mysql.TypeLong), + }, + }, + }) + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ddlEvent := &commonEvent.DDLEvent{ + Query: "alter table test.table1 exchange partition p0 with table test.table2", + Type: byte(timodel.ActionExchangeTablePartition), + SchemaName: "test", + TableName: "table1", + ExtraSchemaName: "test", + ExtraTableName: "table2", + FinishedTs: 100, + TableInfo: tableInfo, + } + ddlEvent.MultipleTableInfos = append([]*common.TableInfo{tableInfo}, tc.multipleTableInfos...) + + err = cloudStorageSink.WriteBlockEvent(ddlEvent) + require.ErrorContains(t, err, "invalid exchange partition ddl event, source table info is missing") + }) + } +} + func TestWriteCheckpointEvent(t *testing.T) { parentDir := t.TempDir() uri := fmt.Sprintf("file:///%s?protocol=csv", parentDir) diff --git a/downstreamadapter/sink/cloudstorage/writer.go b/downstreamadapter/sink/cloudstorage/writer.go index 35eee16791..c640a0da9d 100644 --- a/downstreamadapter/sink/cloudstorage/writer.go +++ b/downstreamadapter/sink/cloudstorage/writer.go @@ -171,7 +171,15 @@ func (d *writer) flushMessages(ctx context.Context) error { zap.Error(err)) return errors.Trace(err) } - indexFilePath := d.filePathGenerator.GenerateIndexFilePath(table, date) + indexFilePath, err := d.filePathGenerator.GenerateIndexFilePath(table, date) + if err != nil { + log.Error("failed to generate index file path", + zap.Int("workerID", d.id), + zap.String("keyspace", d.changeFeedID.Keyspace()), + zap.Stringer("changefeed", d.changeFeedID.ID()), + zap.Error(err)) + return errors.Trace(err) + } // first write the data file to external storage. err = d.writeDataFile(ctx, dataFilePath, indexFilePath, task) diff --git a/pkg/config/sink.go b/pkg/config/sink.go index b2db3d6a16..ab0bbc2644 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -37,6 +37,8 @@ const ( // TxnAtomicityKey specifies the key of the transaction-atomicity in the SinkURI. TxnAtomicityKey = "transaction-atomicity" + // UseTableIDAsPathKey specifies the key of the use-table-id-as-path in the SinkURI. + UseTableIDAsPathKey = "use-table-id-as-path" // defaultTxnAtomicity is the default atomicity level. defaultTxnAtomicity = noneTxnAtomicity // unknownTxnAtomicity is an invalid atomicity level and will be treated as @@ -698,6 +700,8 @@ type CloudStorageConfig struct { FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"` FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"` FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` + // UseTableIDAsPath is only available when the downstream is Storage (TiCI only). + UseTableIDAsPath *bool `toml:"use-table-id-as-path" json:"use-table-id-as-path,omitempty"` // OutputRawChangeEvent controls whether to split the update pk/uk events. OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` @@ -711,6 +715,27 @@ func (c *CloudStorageConfig) GetOutputRawChangeEvent() bool { return *c.OutputRawChangeEvent } +// CheckUseTableIDAsPathCompatibility checks the compatibility between sink config and sink URI. +func CheckUseTableIDAsPathCompatibility( + sinkConfig *SinkConfig, + useTableIDAsPathFromURI *bool, +) error { + if sinkConfig == nil || + sinkConfig.CloudStorageConfig == nil || + sinkConfig.CloudStorageConfig.UseTableIDAsPath == nil || + useTableIDAsPathFromURI == nil { + return nil + } + useTableIDAsPathFromConfig := sinkConfig.CloudStorageConfig.UseTableIDAsPath + if util.GetOrZero(useTableIDAsPathFromConfig) == util.GetOrZero(useTableIDAsPathFromURI) { + return nil + } + return cerror.ErrIncompatibleSinkConfig.GenWithStackByArgs( + fmt.Sprintf("%s=%t", UseTableIDAsPathKey, util.GetOrZero(useTableIDAsPathFromURI)), + fmt.Sprintf("%s=%t", UseTableIDAsPathKey, util.GetOrZero(useTableIDAsPathFromConfig)), + ) +} + func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { if err := s.validateAndAdjustSinkURI(sinkURI); err != nil { return err @@ -977,15 +1002,59 @@ func (s *SinkConfig) CheckCompatibilityWithSinkURI( return cerror.WrapError(cerror.ErrSinkURIInvalid, err) } + var useTableIDAsPathFromURI *bool + if IsStorageScheme(sinkURI.Scheme) { + useTableIDAsPathValue := sinkURI.Query().Get(UseTableIDAsPathKey) + if useTableIDAsPathValue != "" { + enabled, parseErr := strconv.ParseBool(useTableIDAsPathValue) + if parseErr != nil { + return cerror.WrapError(cerror.ErrSinkURIInvalid, parseErr) + } + useTableIDAsPathFromURI = util.AddressOf(enabled) + } + } + + getUseTableIDAsPath := func(cfg *SinkConfig) *bool { + if cfg == nil || cfg.CloudStorageConfig == nil { + return nil + } + return cfg.CloudStorageConfig.UseTableIDAsPath + } + + useTableIDAsPathChanged := func() bool { + newVal := getUseTableIDAsPath(s) + oldVal := getUseTableIDAsPath(oldSinkConfig) + if newVal == nil && oldVal == nil { + return false + } + if newVal == nil || oldVal == nil { + return true + } + return *newVal != *oldVal + } + cfgParamsChanged := s.Protocol != oldSinkConfig.Protocol || - s.TxnAtomicity != oldSinkConfig.TxnAtomicity + s.TxnAtomicity != oldSinkConfig.TxnAtomicity || + useTableIDAsPathChanged() isURIParamsChanged := func(oldCfg SinkConfig) bool { err := oldCfg.applyParameterBySinkURI(sinkURI) - return cerror.ErrIncompatibleSinkConfig.Equal(err) + if cerror.ErrIncompatibleSinkConfig.Equal(err) { + return true + } + if useTableIDAsPathFromURI == nil { + return false + } + return CheckUseTableIDAsPathCompatibility(&oldCfg, useTableIDAsPathFromURI) != nil } uriParamsChanged := isURIParamsChanged(*oldSinkConfig) + if !uriParamsChanged && IsStorageScheme(sinkURI.Scheme) { + if err := CheckUseTableIDAsPathCompatibility(s, useTableIDAsPathFromURI); err != nil { + return err + } + } + if !uriParamsChanged && !cfgParamsChanged { return nil } diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index 4b817dd394..0bb9a39137 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -64,9 +64,10 @@ const ( ) type urlConfig struct { - WorkerCount *int `form:"worker-count"` - FlushInterval *string `form:"flush-interval"` - FileSize *int `form:"file-size"` + WorkerCount *int `form:"worker-count"` + FlushInterval *string `form:"flush-interval"` + FileSize *int `form:"file-size"` + UseTableIDAsPath *bool `form:"use-table-id-as-path"` } // Config is the configuration for cloud storage sink. @@ -82,6 +83,7 @@ type Config struct { OutputColumnID bool FlushConcurrency int EnableTableAcrossNodes bool + UseTableIDAsPath bool } // NewConfig returns the default cloud storage sink config. @@ -132,6 +134,9 @@ func (c *Config) Apply( if err != nil { return err } + if err = getUseTableIDAsPath(urlParameter, &c.UseTableIDAsPath); err != nil { + return err + } c.DateSeparator = util.GetOrZero(sinkConfig.DateSeparator) c.EnablePartitionSeparator = util.GetOrZero(sinkConfig.EnablePartitionSeparator) @@ -167,6 +172,7 @@ func mergeConfig( dest.WorkerCount = sinkConfig.CloudStorageConfig.WorkerCount dest.FlushInterval = sinkConfig.CloudStorageConfig.FlushInterval dest.FileSize = sinkConfig.CloudStorageConfig.FileSize + dest.UseTableIDAsPath = sinkConfig.CloudStorageConfig.UseTableIDAsPath } if err := mergo.Merge(dest, urlParameters, mergo.WithOverride); err != nil { return nil, cerror.WrapError(cerror.ErrStorageSinkInvalidConfig, err) @@ -219,6 +225,15 @@ func getFlushInterval(values *urlConfig, flushInterval *time.Duration) error { return nil } +func getUseTableIDAsPath(values *urlConfig, useTableIDAsPath *bool) error { + if values.UseTableIDAsPath == nil { + return nil + } + + *useTableIDAsPath = *values.UseTableIDAsPath + return nil +} + func getFileSize(values *urlConfig, fileSize *int) error { if values.FileSize == nil { return nil diff --git a/pkg/sink/cloudstorage/config_test.go b/pkg/sink/cloudstorage/config_test.go index 442d95a2eb..bafa95a5ed 100644 --- a/pkg/sink/cloudstorage/config_test.go +++ b/pkg/sink/cloudstorage/config_test.go @@ -33,7 +33,8 @@ func TestConfigApply(t *testing.T) { expected.DateSeparator = config.DateSeparatorDay.String() expected.EnablePartitionSeparator = true expected.FlushConcurrency = 1 - uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv" + expected.UseTableIDAsPath = true + uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv&use-table-id-as-path=true" sinkURI, err := url.Parse(uri) require.Nil(t, err) @@ -77,6 +78,11 @@ func TestVerifySinkURIParams(t *testing.T) { uri: "s3://bucket/prefix?worker-count=64&flush-interval=1m30s&file-size=33554432", expectedErr: "", }, + { + name: "sink uri with use-table-id-as-path", + uri: "s3://bucket/prefix?use-table-id-as-path=true", + expectedErr: "", + }, { name: "invalid sink uri with unknown storage scheme", uri: "xxx://tmp/test", diff --git a/pkg/sink/cloudstorage/path.go b/pkg/sink/cloudstorage/path.go index 390d5d6e4f..07631e832a 100644 --- a/pkg/sink/cloudstorage/path.go +++ b/pkg/sink/cloudstorage/path.go @@ -53,6 +53,8 @@ const ( // The table schema is stored in the following path: // //meta/schema_{tableVersion}_{checksum}.json tableSchemaPrefix = "%s/%s/meta/" + // When use-table-id-as-path, schema is omitted: /meta/... + tableIdPrefix = "%s/meta/" ) var schemaRE = regexp.MustCompile(`meta/schema_\d+_\d{10}\.json$`) @@ -91,23 +93,47 @@ func mustParseSchemaName(path string) (uint64, uint32) { } func generateSchemaFilePath( - schema, table string, tableVersion uint64, checksum uint32, -) string { + schema, table string, tableVersion uint64, checksum uint32, omitSchema bool, +) (string, error) { if schema == "" || tableVersion == 0 { - log.Panic("invalid schema or tableVersion", - zap.String("schema", schema), zap.String("table", table), zap.Uint64("tableVersion", tableVersion)) + return "", errors.ErrInternalCheckFailed.GenWithStackByArgs( + fmt.Sprintf("invalid schema or tableVersion, schema=%q table=%q tableVersion=%d", + schema, table, tableVersion), + ) } var dir string - if table == "" { - // Generate db schema file path. - dir = fmt.Sprintf(dbSchemaPrefix, schema) + if omitSchema { + if table == "" { + return "", errors.ErrInternalCheckFailed.GenWithStackByArgs( + "table cannot be empty when 'use-table-id-as-path' is true", + ) + } + // use-table-id-as-path: omit schema, path is /meta/ + dir = fmt.Sprintf(tableIdPrefix, table) } else { - // Generate table schema file path. - dir = fmt.Sprintf(tableSchemaPrefix, schema, table) + if table == "" { + // Generate db schema file path. + dir = fmt.Sprintf(dbSchemaPrefix, schema) + } else { + // Generate table schema file path. + dir = fmt.Sprintf(tableSchemaPrefix, schema, table) + } } name := fmt.Sprintf(schemaFileNameFormat, tableVersion, checksum) - return path.Join(dir, name) + return path.Join(dir, name), nil +} + +func generateTablePath(tableName string, tableID int64, useTableIDAsPath bool) (string, error) { + if useTableIDAsPath { + if tableID <= 0 { + return "", errors.ErrInternalCheckFailed.GenWithStackByArgs( + "invalid table id for table-id path", + ) + } + return fmt.Sprintf("%d", tableID), nil + } + return tableName, nil } func generateDataFileName(enableTableAcrossNodes bool, dispatcherID string, index uint64, extension string, fileIndexWidth int) string { @@ -194,7 +220,7 @@ func (f *FilePathGenerator) CheckOrWriteSchema( } // Case 1: point check if the schema file exists. - tblSchemaFile, err := def.GenerateSchemaFilePath() + tblSchemaFile, err := def.GenerateSchemaFilePath(f.config.UseTableIDAsPath, table.TableNameWithPhysicTableID.TableID) if err != nil { return false, err } @@ -206,12 +232,20 @@ func (f *FilePathGenerator) CheckOrWriteSchema( f.versionMap[table] = table.TableInfoVersion return false, nil } - // walk the table meta path to find the last schema file _, checksum := mustParseSchemaName(tblSchemaFile) schemaFileCnt := 0 lastVersion := uint64(0) - subDir := fmt.Sprintf(tableSchemaPrefix, def.Schema, def.Table) + tablePathPart, err := generateTablePath(def.Table, table.TableNameWithPhysicTableID.TableID, f.config.UseTableIDAsPath) + if err != nil { + return false, err + } + var subDir string + if f.config.UseTableIDAsPath { + subDir = fmt.Sprintf(tableIdPrefix, tablePathPart) + } else { + subDir = fmt.Sprintf(tableSchemaPrefix, def.Schema, tablePathPart) + } checksumSuffix := fmt.Sprintf("%010d.json", checksum) hasNewerSchemaVersion := false err = f.storage.WalkDir(ctx, &storage.WalkOption{ @@ -305,20 +339,26 @@ func (f *FilePathGenerator) GenerateDateStr() string { } // GenerateIndexFilePath generates a canonical path for index file. -func (f *FilePathGenerator) GenerateIndexFilePath(tbl VersionedTableName, date string) string { - dir := f.generateDataDirPath(tbl, date) +func (f *FilePathGenerator) GenerateIndexFilePath(tbl VersionedTableName, date string) (string, error) { + dir, err := f.generateDataDirPath(tbl, date) + if err != nil { + return "", err + } name := defaultIndexFileName if f.config.EnableTableAcrossNodes { name = fmt.Sprintf(defaultTableAcrossNodesIndexFileName, tbl.DispatcherID.String()) } - return path.Join(dir, name) + return path.Join(dir, name), nil } // GenerateDataFilePath generates a canonical path for data file. func (f *FilePathGenerator) GenerateDataFilePath( ctx context.Context, tbl VersionedTableName, date string, ) (string, error) { - dir := f.generateDataDirPath(tbl, date) + dir, err := f.generateDataDirPath(tbl, date) + if err != nil { + return "", err + } newIndexFile := false if idx, ok := f.fileIndex[tbl]; !ok { fileIdx, err := f.getFileIdxFromIndexFile(ctx, tbl, date) @@ -362,14 +402,41 @@ func (f *FilePathGenerator) GenerateDataFilePath( return f.GenerateDataFilePath(ctx, tbl, date) } -func (f *FilePathGenerator) generateDataDirPath(tbl VersionedTableName, date string) string { +func (f *FilePathGenerator) generateDataDirPath(tbl VersionedTableName, date string) (string, error) { var elems []string - elems = append(elems, tbl.TableNameWithPhysicTableID.Schema) - elems = append(elems, tbl.TableNameWithPhysicTableID.Table) - elems = append(elems, fmt.Sprintf("%d", f.versionMap[tbl])) + tableVersion, ok := f.versionMap[tbl] + if !ok || tableVersion == 0 { + return "", errors.ErrInternalCheckFailed.GenWithStackByArgs( + "table schema version is not initialized", + ) + } + + if f.config.UseTableIDAsPath { + tablePathPart, err := generateTablePath( + tbl.TableNameWithPhysicTableID.Table, + tbl.TableNameWithPhysicTableID.TableID, + true, + ) + if err != nil { + return "", err + } + elems = append(elems, tablePathPart) + } else { + elems = append(elems, tbl.TableNameWithPhysicTableID.Schema) + tablePathPart, err := generateTablePath( + tbl.TableNameWithPhysicTableID.Table, + tbl.TableNameWithPhysicTableID.TableID, + false, + ) + if err != nil { + return "", err + } + elems = append(elems, tablePathPart) + } + elems = append(elems, fmt.Sprintf("%d", tableVersion)) - if f.config.EnablePartitionSeparator && tbl.TableNameWithPhysicTableID.IsPartition { + if f.config.EnablePartitionSeparator && tbl.TableNameWithPhysicTableID.IsPartition && !f.config.UseTableIDAsPath { elems = append(elems, fmt.Sprintf("%d", tbl.TableNameWithPhysicTableID.TableID)) } @@ -377,13 +444,16 @@ func (f *FilePathGenerator) generateDataDirPath(tbl VersionedTableName, date str elems = append(elems, date) } - return path.Join(elems...) + return path.Join(elems...), nil } func (f *FilePathGenerator) getFileIdxFromIndexFile( ctx context.Context, tbl VersionedTableName, date string, ) (uint64, error) { - indexFile := f.GenerateIndexFilePath(tbl, date) + indexFile, err := f.GenerateIndexFilePath(tbl, date) + if err != nil { + return 0, err + } exist, err := f.storage.FileExists(ctx, indexFile) if err != nil { return 0, err diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go index b8f1f5fae8..1d882c9632 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/sink/cloudstorage/path_test.go @@ -131,6 +131,33 @@ func TestGenerateDataFilePath(t *testing.T) { require.Equal(t, fmt.Sprintf("test/table1/5/2023-01-01/CDC_%s_000001.json", table.DispatcherID.String()), path) } +func TestGenerateDataFilePathWithTableIDAsPath(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + table := VersionedTableName{ + TableNameWithPhysicTableID: commonType.TableName{ + Schema: "test", + Table: "table1", + TableID: 12345, + }, + TableInfoVersion: 5, + DispatcherID: commonType.NewDispatcherID(), + } + + dir := t.TempDir() + f := testFilePathGenerator(ctx, t, dir) + f.config.UseTableIDAsPath = true + f.versionMap[table] = table.TableInfoVersion + + date := f.GenerateDateStr() + path, err := f.GenerateDataFilePath(ctx, table, date) + require.NoError(t, err) + require.Equal(t, fmt.Sprintf("12345/5/CDC_%s_000001.json", table.DispatcherID.String()), path) +} + func TestFetchIndexFromFileName(t *testing.T) { t.Parallel() @@ -203,8 +230,9 @@ func TestGenerateDataFilePathWithIndexFile(t *testing.T) { } f.versionMap[table] = table.TableInfoVersion date := f.GenerateDateStr() - indexFilePath := f.GenerateIndexFilePath(table, date) - err := f.storage.WriteFile(ctx, indexFilePath, []byte(fmt.Sprintf("CDC_%s_000005.json\n", dispatcherID.String()))) + indexFilePath, err := f.GenerateIndexFilePath(table, date) + require.NoError(t, err) + err = f.storage.WriteFile(ctx, indexFilePath, []byte(fmt.Sprintf("CDC_%s_000005.json\n", dispatcherID.String()))) require.NoError(t, err) dataFilePath, err := f.GenerateDataFilePath(ctx, table, date) @@ -235,7 +263,8 @@ func TestGenerateDataFilePathResyncIndexFile(t *testing.T) { f2.versionMap[table] = table.TableInfoVersion date := "" - indexFilePath := f1.GenerateIndexFilePath(table, date) + indexFilePath, err := f1.GenerateIndexFilePath(table, date) + require.NoError(t, err) // Simulate dispatcher moved between captures: // 1) f1 generates CDC_..._000001 and writes index file. diff --git a/pkg/sink/cloudstorage/table_definition.go b/pkg/sink/cloudstorage/table_definition.go index 51d9f914ab..4f1ac44cb5 100644 --- a/pkg/sink/cloudstorage/table_definition.go +++ b/pkg/sink/cloudstorage/table_definition.go @@ -329,14 +329,46 @@ func (t *TableDefinition) Sum32(hasher *hash.PositionInertia) (uint32, error) { return hasher.Sum32(), nil } -// GenerateSchemaFilePath generates the schema file path for TableDefinition. -func (t *TableDefinition) GenerateSchemaFilePath() (string, error) { +// GenerateSchemaFilePath generates the schema file path for TableDefinition +// with optional table id path. +func (t *TableDefinition) GenerateSchemaFilePath(useTableIDAsPath bool, tableID int64) (string, error) { checksum, err := t.Sum32(nil) if err != nil { return "", err } - if !t.IsTableSchema() && t.Table != "" { - log.Panic("invalid table definition", zap.Any("tableDef", t)) + if t.Schema == "" { + return "", errors.ErrInternalCheckFailed.GenWithStackByArgs("schema cannot be empty") + } + if t.TableVersion == 0 { + return "", errors.ErrInternalCheckFailed.GenWithStackByArgs("table version cannot be zero") + } + if len(t.Columns) != t.TotalColumns { + return "", errors.ErrInternalCheckFailed.GenWithStackByArgs("invalid table definition") + } + isTableSchema := t.TotalColumns != 0 + if !isTableSchema && t.Table != "" { + return "", errors.ErrInternalCheckFailed.GenWithStackByArgs( + "invalid table definition", + ) + } + if useTableIDAsPath && isTableSchema && tableID <= 0 { + return "", errors.ErrInternalCheckFailed.GenWithStackByArgs( + "invalid table id for table-id path", + ) + } + + table := t.Table + if isTableSchema { + tablePath, err := generateTablePath(t.Table, tableID, useTableIDAsPath) + if err != nil { + return "", err + } + table = tablePath + } + omitSchema := useTableIDAsPath && isTableSchema + path, err := generateSchemaFilePath(t.Schema, table, t.TableVersion, checksum, omitSchema) + if err != nil { + return "", err } - return generateSchemaFilePath(t.Schema, t.Table, t.TableVersion, checksum), nil + return path, nil } diff --git a/pkg/sink/cloudstorage/table_definition_test.go b/pkg/sink/cloudstorage/table_definition_test.go index 4389fa8fd8..3845f2b111 100644 --- a/pkg/sink/cloudstorage/table_definition_test.go +++ b/pkg/sink/cloudstorage/table_definition_test.go @@ -488,14 +488,54 @@ func TestTableDefinitionGenFilePath(t *testing.T) { Version: defaultTableDefinitionVersion, TableVersion: 100, } - schemaPath, err := schemaDef.GenerateSchemaFilePath() + schemaPath, err := schemaDef.GenerateSchemaFilePath(false, 0) + require.NoError(t, err) + require.Equal(t, "schema1/meta/schema_100_3233644819.json", schemaPath) + + schemaPath, err = schemaDef.GenerateSchemaFilePath(true, 0) require.NoError(t, err) require.Equal(t, "schema1/meta/schema_100_3233644819.json", schemaPath) def, _ := generateTableDef() - tablePath, err := def.GenerateSchemaFilePath() + tablePath, err := def.GenerateSchemaFilePath(false, 0) require.NoError(t, err) require.Equal(t, "schema1/table1/meta/schema_100_3752767265.json", tablePath) + + tablePath, err = def.GenerateSchemaFilePath(true, 12345) + require.NoError(t, err) + require.Equal(t, "12345/meta/schema_100_3752767265.json", tablePath) +} + +func TestGenerateSchemaFilePathValidation(t *testing.T) { + t.Parallel() + + def, _ := generateTableDef() + + // empty schema + emptySchemaDef := &TableDefinition{Schema: "", Table: "t1", TableVersion: 100, TotalColumns: 1, Columns: []TableCol{{}}} + _, err := emptySchemaDef.GenerateSchemaFilePath(false, 0) + require.Error(t, err) + require.Contains(t, err.Error(), "schema cannot be empty") + + // zero table version + zeroVersionDef := &TableDefinition{Schema: "s1", Table: "t1", TableVersion: 0, TotalColumns: 1, Columns: []TableCol{{}}} + _, err = zeroVersionDef.GenerateSchemaFilePath(false, 0) + require.Error(t, err) + require.Contains(t, err.Error(), "table version cannot be zero") + + // use-table-id-as-path with invalid tableID + _, err = def.GenerateSchemaFilePath(true, 0) + require.Error(t, err) + require.Contains(t, err.Error(), "invalid table id for table-id path") + _, err = def.GenerateSchemaFilePath(true, -1) + require.Error(t, err) + require.Contains(t, err.Error(), "invalid table id for table-id path") + + // invalid table definition + invalidDef := &TableDefinition{Schema: "s1", Table: "t1", TableVersion: 100, TotalColumns: 1, Columns: nil} + _, err = invalidDef.GenerateSchemaFilePath(false, 0) + require.Error(t, err) + require.Contains(t, err.Error(), "invalid table definition") } func TestTableDefinitionSum32(t *testing.T) {