Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
Expand Down
12 changes: 6 additions & 6 deletions br/pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package backup
import (
"context"
"encoding/json"
"fmt"
"maps"
"slices"
"time"
Expand All @@ -29,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/statistics/handle"
"github.com/pingcap/tidb/pkg/statistics/util"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/tikv/client-go/v2/tikv"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -170,7 +170,7 @@ func (ss *Schemas) BackupSchemas(
}
}
// Check merge option allowed (network I/O)
isMergeOptionAllowed, partitionMergeOptionAllowed, err := schema.checkMergeOptionAllowed(ectx)
isMergeOptionAllowed, partitionMergeOptionAllowed, err := schema.checkMergeOptionAllowed(ectx, store.GetCodec())
if err != nil {
logger.Warn("failed to check merge_option for table", logutil.ShortError(err))
} else {
Expand Down Expand Up @@ -325,20 +325,20 @@ func (s *schemaInfo) encodeToSchema() (*backuppb.Schema, error) {
// Returns:
// - tableMergeOptionAllowed: whether merge_option=allow is set for the table itself
// - partitionMergeOptionAllowed: a map from partition name to whether merge_option=allow is set for that partition
func (s *schemaInfo) checkMergeOptionAllowed(ctx context.Context) (bool, map[string]bool, error) {
func (s *schemaInfo) checkMergeOptionAllowed(ctx context.Context, codec tikv.Codec) (bool, map[string]bool, error) {
partitionMergeOptionAllowed := make(map[string]bool)

// Construct the rule ID for this table using the same format as ddl/label
// Use .L (lowercase) to match DDL behavior for case-insensitive matching
dbName := s.dbInfo.Name.L
tableName := s.tableInfo.Name.L
ruleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, dbName, tableName)
ruleID := label.NewRuleID(codec, dbName, tableName, "")

// Collect all rule IDs to check (table + partitions if any)
ruleIDs := []string{ruleID}
if s.tableInfo.Partition != nil && len(s.tableInfo.Partition.Definitions) > 0 {
for _, def := range s.tableInfo.Partition.Definitions {
partitionRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, dbName, tableName, def.Name.L)
partitionRuleID := label.NewRuleID(codec, dbName, tableName, def.Name.L)
ruleIDs = append(ruleIDs, partitionRuleID)
}
}
Expand Down Expand Up @@ -370,7 +370,7 @@ func (s *schemaInfo) checkMergeOptionAllowed(ctx context.Context) (bool, map[str
// Check partition rules if this is a partitioned table
if s.tableInfo.Partition != nil && len(s.tableInfo.Partition.Definitions) > 0 {
for _, def := range s.tableInfo.Partition.Definitions {
partitionRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, dbName, tableName, def.Name.L)
partitionRuleID := label.NewRuleID(codec, dbName, tableName, def.Name.L)
if rule, exists := rules[partitionRuleID]; exists {
for _, label := range rule.Labels {
if label.Key == "merge_option" && label.Value == "allow" {
Expand Down
28 changes: 17 additions & 11 deletions br/pkg/backup/schema_merge_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package backup_test

import (
"context"
"fmt"
"math"
"testing"

Expand All @@ -17,34 +16,41 @@ import (
"github.com/pingcap/tidb/pkg/testkit"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
)

// TestBackupSchemaMergeOptionRuleIDFormat tests the rule ID format generation
func TestBackupSchemaMergeOptionRuleIDFormat(t *testing.T) {
codec := tikv.NewCodecV1(tikv.ModeTxn)

t.Run("normal table rule ID format", func(t *testing.T) {
dbName := "test"
tableName := "t1"
expectedRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, dbName, tableName)
expectedRuleID := label.NewRuleID(codec, dbName, tableName, "")
require.Equal(t, "schema/test/t1", expectedRuleID)
})

t.Run("partition table rule ID format", func(t *testing.T) {
dbName := "test"
tableName := "pt1"
partitionName := "p0"
expectedRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, dbName, tableName, partitionName)
expectedRuleID := label.NewRuleID(codec, dbName, tableName, partitionName)
require.Equal(t, "schema/test/pt1/p0", expectedRuleID)
})

t.Run("multiple partitions rule ID format", func(t *testing.T) {
dbName := "test"
tableName := "pt2"
partitions := []string{"p0", "p1", "p2"}
expected := []string{
"schema/test/pt2/p0",
"schema/test/pt2/p1",
"schema/test/pt2/p2",
}

for _, partName := range partitions {
ruleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, dbName, tableName, partName)
expected := fmt.Sprintf("schema/%s/%s/%s", dbName, tableName, partName)
require.Equal(t, expected, ruleID, "partition %s rule ID should match", partName)
for i, partName := range partitions {
ruleID := label.NewRuleID(codec, dbName, tableName, partName)
require.Equal(t, expected[i], ruleID, "partition %s rule ID should match", partName)
}
})

Expand All @@ -56,13 +62,13 @@ func TestBackupSchemaMergeOptionRuleIDFormat(t *testing.T) {
// Simulate DDL behavior: uses .L (lowercase)
// DDL would use: schema.Name.L, meta.Name.L, spec.PartitionNames[0].L
// For "TestDB"/"TestTable"/"Partition0", .L would be: "testdb"/"testtable"/"partition0"
ddlRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, "testdb", "testtable")
ddlPartitionRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, "testdb", "testtable", "partition0")
ddlRuleID := label.NewRuleID(codec, "testdb", "testtable", "")
ddlPartitionRuleID := label.NewRuleID(codec, "testdb", "testtable", "partition0")

// BR should also use .L (lowercase) - simulate BR behavior
// In real code: table.DB.Name.L, table.Info.Name.L, def.Name.L
brRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, "testdb", "testtable")
brPartitionRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, "testdb", "testtable", "partition0")
brRuleID := label.NewRuleID(codec, "testdb", "testtable", "")
brPartitionRuleID := label.NewRuleID(codec, "testdb", "testtable", "partition0")

// Both should generate the same rule ID
require.Equal(t, ddlRuleID, brRuleID, "DDL and BR should generate same table rule ID")
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/snap_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1199,7 +1199,7 @@ func (rc *SnapClient) setMergeOptionForTables(ctx context.Context, createdTables
}
// Use Reset() to set ID, RuleType, Data, Index, and add/update db/table labels
// Reset() uses the NEW table ID (after restore)
rule.Reset(dbName, tableName, "", newTableInfo.ID)
rule.Reset(rc.dom.Store().GetCodec(), dbName, tableName, "", newTableInfo.ID)

rulesToSet = append(rulesToSet, rule)
}
Expand Down Expand Up @@ -1237,7 +1237,7 @@ func (rc *SnapClient) setMergeOptionForTables(ctx context.Context, createdTables
}
// Use Reset() to set ID, RuleType, Data, Index, and add/update db/table/partition labels
// Reset() uses the NEW partition ID (after restore)
rule.Reset(dbName, tableName, partitionName, newDef.ID)
rule.Reset(rc.dom.Store().GetCodec(), dbName, tableName, partitionName, newDef.ID)

rulesToSet = append(rulesToSet, rule)
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/ddl/attributes_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,20 @@ PARTITION BY RANGE (c) (
require.Equal(t, "schema/test/rename_ot1", rows3[0][0])
require.Equal(t, `"key=value"`, rows3[0][2])
require.Equal(t, rows2[0][3], rows3[0][3])

tk.MustExec(`create database rename_dst1;`)
tk.MustExec(`create database rename_dst2;`)
tk.MustExec(`create table test.rename_multi1 (c int);`)
tk.MustExec(`create table test.rename_multi2 (c int);`)
tk.MustExec(`alter table test.rename_multi1 attributes="key=multi1";`)
tk.MustExec(`alter table test.rename_multi2 attributes="key=multi2";`)
tk.MustExec(`rename table test.rename_multi1 to rename_dst1.rename_multi1, test.rename_multi2 to rename_dst2.rename_multi2;`)
tk.MustQuery(`select id, attributes from information_schema.attributes
where id in ('schema/rename_dst1/rename_multi1', 'schema/rename_dst2/rename_multi2')
order by id`).Check(testkit.Rows(
`schema/rename_dst1/rename_multi1 "key=multi1"`,
`schema/rename_dst2/rename_multi2 "key=multi2"`,
))
}

func TestRecoverTable(t *testing.T) {
Expand Down Expand Up @@ -482,4 +496,22 @@ PARTITION BY RANGE (c) (
require.Equal(t, "schema/test/part1", rows3[1][0])
require.Equal(t, `"key2=value2"`, rows3[1][2])
require.Equal(t, rows2[1][3], rows3[1][3])

tk.MustExec(`create database exchange_partition_attrs;`)
tk.MustExec(`create database exchange_normal_attrs;`)
tk.MustExec(`create table exchange_partition_attrs.part_cross (c int)
PARTITION BY RANGE (c) (
PARTITION p0 VALUES LESS THAN (10),
PARTITION p1 VALUES LESS THAN (20)
);`)
tk.MustExec(`create table exchange_normal_attrs.part_cross_nt (c int);`)
tk.MustExec(`alter table exchange_partition_attrs.part_cross partition p0 attributes="role=partition";`)
tk.MustExec(`alter table exchange_normal_attrs.part_cross_nt attributes="role=table";`)
tk.MustExec(`alter table exchange_partition_attrs.part_cross exchange partition p0 with table exchange_normal_attrs.part_cross_nt;`)
tk.MustQuery(`select id, attributes from information_schema.attributes
where id in ('schema/exchange_normal_attrs/part_cross_nt', 'schema/exchange_partition_attrs/part_cross/p0')
order by id`).Check(testkit.Rows(
`schema/exchange_normal_attrs/part_cross_nt "role=partition"`,
`schema/exchange_partition_attrs/part_cross/p0 "role=table"`,
))
}
4 changes: 2 additions & 2 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6101,7 +6101,7 @@ func (e *executor) AlterTableAttributes(ctx sessionctx.Context, ident ast.Ident,
return dbterror.ErrInvalidAttributesSpec.GenWithStackByArgs(err)
}
ids := getIDs([]*model.TableInfo{meta})
rule.Reset(schema.Name.L, meta.Name.L, "", ids...)
rule.Reset(e.store.GetCodec(), schema.Name.L, meta.Name.L, "", ids...)

job := &model.Job{
Version: model.GetJobVerInUse(),
Expand Down Expand Up @@ -6146,7 +6146,7 @@ func (e *executor) AlterTablePartitionAttributes(ctx sessionctx.Context, ident a
if err != nil {
return dbterror.ErrInvalidAttributesSpec.GenWithStackByArgs(err)
}
rule.Reset(schema.Name.L, meta.Name.L, spec.PartitionNames[0].L, partitionID)
rule.Reset(e.store.GetCodec(), schema.Name.L, meta.Name.L, spec.PartitionNames[0].L, partitionID)

pdLabelRule := (*pdhttp.LabelRule)(rule)
job := &model.Job{
Expand Down
8 changes: 7 additions & 1 deletion pkg/ddl/label/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/ddl/label",
visibility = ["//visibility:public"],
deps = [
"//pkg/config/kerneltype",
"//pkg/parser/ast",
"//pkg/tablecodec",
"//pkg/util/codec",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//http",
"@in_gopkg_yaml_v2//:yaml_v2",
],
Expand All @@ -28,11 +30,15 @@ go_test(
],
embed = [":label"],
flaky = True,
shard_count = 8,
shard_count = 9,
deps = [
"//pkg/config/kerneltype",
"//pkg/parser/ast",
"//pkg/tablecodec",
"//pkg/testkit/testsetup",
"@com_github_pingcap_kvproto//pkg/keyspacepb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//http",
"@org_uber_go_goleak//:goleak",
],
Expand Down
12 changes: 10 additions & 2 deletions pkg/ddl/label/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"fmt"
"strings"

"github.com/pingcap/tidb/pkg/config/kerneltype"
pd "github.com/tikv/pd/client/http"
)

const (
keyspaceKey = "keyspace"
dbKey = "db"
tableKey = "table"
partitionKey = "partition"
Expand Down Expand Up @@ -99,19 +101,25 @@ func NewLabels(attrs []string) ([]pd.RegionLabel, error) {
// RestoreRegionLabels converts Attributes to a string.
func RestoreRegionLabels(labels *[]pd.RegionLabel) string {
var sb strings.Builder
for i, label := range *labels {
written := 0
for _, label := range *labels {
switch label.Key {
case dbKey, tableKey, partitionKey:
continue
case keyspaceKey:
if kerneltype.IsNextGen() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why skipped in nextgen

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keyspace is an internal label injected by TiDB for keyspace-aware table attribute label rules. It is not part of the user-specified table attributes, so information_schema.attributes should hide it in NextGen, similar to how we already hide the internal db/table/partition labels.

continue
}
default:
}

if i > 0 {
if written > 0 {
sb.WriteByte(',')
}
sb.WriteByte('"')
sb.WriteString(RestoreRegionLabel(&label))
sb.WriteByte('"')
written++
}
return sb.String()
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/ddl/label/attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package label
import (
"testing"

"github.com/pingcap/tidb/pkg/config/kerneltype"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client/http"
)
Expand Down Expand Up @@ -200,6 +201,8 @@ func TestRestoreLabels(t *testing.T) {
require.NoError(t, err)
input5, err := NewLabel("partition=p1")
require.NoError(t, err)
input6, err := NewLabel("keyspace=42")
require.NoError(t, err)

tests := []TestCase{
{
Expand Down Expand Up @@ -230,4 +233,16 @@ func TestRestoreLabels(t *testing.T) {
require.Equal(t, test.output, output)
})
}

if kerneltype.IsNextGen() {
output := RestoreRegionLabels(&[]pd.RegionLabel{input1, input6})
require.Equal(t, `"merge_option=allow"`, output)
output = RestoreRegionLabels(&[]pd.RegionLabel{input6, input1})
require.Equal(t, `"merge_option=allow"`, output)
} else {
output := RestoreRegionLabels(&[]pd.RegionLabel{input1, input6})
require.Equal(t, `"merge_option=allow","keyspace=42"`, output)
output = RestoreRegionLabels(&[]pd.RegionLabel{input6, input1})
require.Equal(t, `"keyspace=42","merge_option=allow"`, output)
}
}
Loading
Loading