Skip to content

Commit 8d52045

Browse files
committed
*: make table attribute labels keyspace-aware
1 parent 5ffa989 commit 8d52045

21 files changed

Lines changed: 476 additions & 94 deletions

br/pkg/backup/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ go_library(
5050
"@com_github_pingcap_kvproto//pkg/metapb",
5151
"@com_github_pingcap_log//:log",
5252
"@com_github_tikv_client_go_v2//oracle",
53+
"@com_github_tikv_client_go_v2//tikv",
5354
"@com_github_tikv_client_go_v2//txnkv/txnlock",
5455
"@com_github_tikv_client_go_v2//util",
5556
"@com_github_tikv_pd_client//:client",

br/pkg/backup/schema.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package backup
55
import (
66
"context"
77
"encoding/json"
8-
"fmt"
98
"maps"
109
"slices"
1110
"time"
@@ -29,6 +28,7 @@ import (
2928
"github.com/pingcap/tidb/pkg/statistics/handle"
3029
"github.com/pingcap/tidb/pkg/statistics/util"
3130
tidbutil "github.com/pingcap/tidb/pkg/util"
31+
"github.com/tikv/client-go/v2/tikv"
3232
kvutil "github.com/tikv/client-go/v2/util"
3333
"go.uber.org/zap"
3434
"golang.org/x/sync/errgroup"
@@ -170,7 +170,7 @@ func (ss *Schemas) BackupSchemas(
170170
}
171171
}
172172
// Check merge option allowed (network I/O)
173-
isMergeOptionAllowed, partitionMergeOptionAllowed, err := schema.checkMergeOptionAllowed(ectx)
173+
isMergeOptionAllowed, partitionMergeOptionAllowed, err := schema.checkMergeOptionAllowed(ectx, store.GetCodec())
174174
if err != nil {
175175
logger.Warn("failed to check merge_option for table", logutil.ShortError(err))
176176
} else {
@@ -325,20 +325,20 @@ func (s *schemaInfo) encodeToSchema() (*backuppb.Schema, error) {
325325
// Returns:
326326
// - tableMergeOptionAllowed: whether merge_option=allow is set for the table itself
327327
// - partitionMergeOptionAllowed: a map from partition name to whether merge_option=allow is set for that partition
328-
func (s *schemaInfo) checkMergeOptionAllowed(ctx context.Context) (bool, map[string]bool, error) {
328+
func (s *schemaInfo) checkMergeOptionAllowed(ctx context.Context, codec tikv.Codec) (bool, map[string]bool, error) {
329329
partitionMergeOptionAllowed := make(map[string]bool)
330330

331331
// Construct the rule ID for this table using the same format as ddl/label
332332
// Use .L (lowercase) to match DDL behavior for case-insensitive matching
333333
dbName := s.dbInfo.Name.L
334334
tableName := s.tableInfo.Name.L
335-
ruleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, dbName, tableName)
335+
ruleID := label.NewRuleID(codec, dbName, tableName, "")
336336

337337
// Collect all rule IDs to check (table + partitions if any)
338338
ruleIDs := []string{ruleID}
339339
if s.tableInfo.Partition != nil && len(s.tableInfo.Partition.Definitions) > 0 {
340340
for _, def := range s.tableInfo.Partition.Definitions {
341-
partitionRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, dbName, tableName, def.Name.L)
341+
partitionRuleID := label.NewRuleID(codec, dbName, tableName, def.Name.L)
342342
ruleIDs = append(ruleIDs, partitionRuleID)
343343
}
344344
}
@@ -370,7 +370,7 @@ func (s *schemaInfo) checkMergeOptionAllowed(ctx context.Context) (bool, map[str
370370
// Check partition rules if this is a partitioned table
371371
if s.tableInfo.Partition != nil && len(s.tableInfo.Partition.Definitions) > 0 {
372372
for _, def := range s.tableInfo.Partition.Definitions {
373-
partitionRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, dbName, tableName, def.Name.L)
373+
partitionRuleID := label.NewRuleID(codec, dbName, tableName, def.Name.L)
374374
if rule, exists := rules[partitionRuleID]; exists {
375375
for _, label := range rule.Labels {
376376
if label.Key == "merge_option" && label.Value == "allow" {

br/pkg/backup/schema_merge_option_test.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ package backup_test
44

55
import (
66
"context"
7-
"fmt"
87
"math"
98
"testing"
109

@@ -17,34 +16,41 @@ import (
1716
"github.com/pingcap/tidb/pkg/testkit"
1817
filter "github.com/pingcap/tidb/pkg/util/table-filter"
1918
"github.com/stretchr/testify/require"
19+
"github.com/tikv/client-go/v2/tikv"
2020
)
2121

2222
// TestBackupSchemaMergeOptionRuleIDFormat tests the rule ID format generation
2323
func TestBackupSchemaMergeOptionRuleIDFormat(t *testing.T) {
24+
codec := tikv.NewCodecV1(tikv.ModeTxn)
25+
2426
t.Run("normal table rule ID format", func(t *testing.T) {
2527
dbName := "test"
2628
tableName := "t1"
27-
expectedRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, dbName, tableName)
29+
expectedRuleID := label.NewRuleID(codec, dbName, tableName, "")
2830
require.Equal(t, "schema/test/t1", expectedRuleID)
2931
})
3032

3133
t.Run("partition table rule ID format", func(t *testing.T) {
3234
dbName := "test"
3335
tableName := "pt1"
3436
partitionName := "p0"
35-
expectedRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, dbName, tableName, partitionName)
37+
expectedRuleID := label.NewRuleID(codec, dbName, tableName, partitionName)
3638
require.Equal(t, "schema/test/pt1/p0", expectedRuleID)
3739
})
3840

3941
t.Run("multiple partitions rule ID format", func(t *testing.T) {
4042
dbName := "test"
4143
tableName := "pt2"
4244
partitions := []string{"p0", "p1", "p2"}
45+
expected := []string{
46+
"schema/test/pt2/p0",
47+
"schema/test/pt2/p1",
48+
"schema/test/pt2/p2",
49+
}
4350

44-
for _, partName := range partitions {
45-
ruleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, dbName, tableName, partName)
46-
expected := fmt.Sprintf("schema/%s/%s/%s", dbName, tableName, partName)
47-
require.Equal(t, expected, ruleID, "partition %s rule ID should match", partName)
51+
for i, partName := range partitions {
52+
ruleID := label.NewRuleID(codec, dbName, tableName, partName)
53+
require.Equal(t, expected[i], ruleID, "partition %s rule ID should match", partName)
4854
}
4955
})
5056

@@ -56,13 +62,13 @@ func TestBackupSchemaMergeOptionRuleIDFormat(t *testing.T) {
5662
// Simulate DDL behavior: uses .L (lowercase)
5763
// DDL would use: schema.Name.L, meta.Name.L, spec.PartitionNames[0].L
5864
// For "TestDB"/"TestTable"/"Partition0", .L would be: "testdb"/"testtable"/"partition0"
59-
ddlRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, "testdb", "testtable")
60-
ddlPartitionRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, "testdb", "testtable", "partition0")
65+
ddlRuleID := label.NewRuleID(codec, "testdb", "testtable", "")
66+
ddlPartitionRuleID := label.NewRuleID(codec, "testdb", "testtable", "partition0")
6167

6268
// BR should also use .L (lowercase) - simulate BR behavior
6369
// In real code: table.DB.Name.L, table.Info.Name.L, def.Name.L
64-
brRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, "testdb", "testtable")
65-
brPartitionRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, "testdb", "testtable", "partition0")
70+
brRuleID := label.NewRuleID(codec, "testdb", "testtable", "")
71+
brPartitionRuleID := label.NewRuleID(codec, "testdb", "testtable", "partition0")
6672

6773
// Both should generate the same rule ID
6874
require.Equal(t, ddlRuleID, brRuleID, "DDL and BR should generate same table rule ID")

br/pkg/restore/snap_client/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1199,7 +1199,7 @@ func (rc *SnapClient) setMergeOptionForTables(ctx context.Context, createdTables
11991199
}
12001200
// Use Reset() to set ID, RuleType, Data, Index, and add/update db/table labels
12011201
// Reset() uses the NEW table ID (after restore)
1202-
rule.Reset(dbName, tableName, "", newTableInfo.ID)
1202+
rule.Reset(rc.dom.Store().GetCodec(), dbName, tableName, "", newTableInfo.ID)
12031203

12041204
rulesToSet = append(rulesToSet, rule)
12051205
}
@@ -1237,7 +1237,7 @@ func (rc *SnapClient) setMergeOptionForTables(ctx context.Context, createdTables
12371237
}
12381238
// Use Reset() to set ID, RuleType, Data, Index, and add/update db/table/partition labels
12391239
// Reset() uses the NEW partition ID (after restore)
1240-
rule.Reset(dbName, tableName, partitionName, newDef.ID)
1240+
rule.Reset(rc.dom.Store().GetCodec(), dbName, tableName, partitionName, newDef.ID)
12411241

12421242
rulesToSet = append(rulesToSet, rule)
12431243
}

pkg/ddl/attributes_sql_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,20 @@ PARTITION BY RANGE (c) (
208208
require.Equal(t, "schema/test/rename_ot1", rows3[0][0])
209209
require.Equal(t, `"key=value"`, rows3[0][2])
210210
require.Equal(t, rows2[0][3], rows3[0][3])
211+
212+
tk.MustExec(`create database rename_dst1;`)
213+
tk.MustExec(`create database rename_dst2;`)
214+
tk.MustExec(`create table test.rename_multi1 (c int);`)
215+
tk.MustExec(`create table test.rename_multi2 (c int);`)
216+
tk.MustExec(`alter table test.rename_multi1 attributes="key=multi1";`)
217+
tk.MustExec(`alter table test.rename_multi2 attributes="key=multi2";`)
218+
tk.MustExec(`rename table test.rename_multi1 to rename_dst1.rename_multi1, test.rename_multi2 to rename_dst2.rename_multi2;`)
219+
tk.MustQuery(`select id, attributes from information_schema.attributes
220+
where id in ('schema/rename_dst1/rename_multi1', 'schema/rename_dst2/rename_multi2')
221+
order by id`).Check(testkit.Rows(
222+
`schema/rename_dst1/rename_multi1 "key=multi1"`,
223+
`schema/rename_dst2/rename_multi2 "key=multi2"`,
224+
))
211225
}
212226

213227
func TestRecoverTable(t *testing.T) {
@@ -482,4 +496,22 @@ PARTITION BY RANGE (c) (
482496
require.Equal(t, "schema/test/part1", rows3[1][0])
483497
require.Equal(t, `"key2=value2"`, rows3[1][2])
484498
require.Equal(t, rows2[1][3], rows3[1][3])
499+
500+
tk.MustExec(`create database exchange_partition_attrs;`)
501+
tk.MustExec(`create database exchange_normal_attrs;`)
502+
tk.MustExec(`create table exchange_partition_attrs.part_cross (c int)
503+
PARTITION BY RANGE (c) (
504+
PARTITION p0 VALUES LESS THAN (10),
505+
PARTITION p1 VALUES LESS THAN (20)
506+
);`)
507+
tk.MustExec(`create table exchange_normal_attrs.part_cross_nt (c int);`)
508+
tk.MustExec(`alter table exchange_partition_attrs.part_cross partition p0 attributes="role=partition";`)
509+
tk.MustExec(`alter table exchange_normal_attrs.part_cross_nt attributes="role=table";`)
510+
tk.MustExec(`alter table exchange_partition_attrs.part_cross exchange partition p0 with table exchange_normal_attrs.part_cross_nt;`)
511+
tk.MustQuery(`select id, attributes from information_schema.attributes
512+
where id in ('schema/exchange_normal_attrs/part_cross_nt', 'schema/exchange_partition_attrs/part_cross/p0')
513+
order by id`).Check(testkit.Rows(
514+
`schema/exchange_normal_attrs/part_cross_nt "role=partition"`,
515+
`schema/exchange_partition_attrs/part_cross/p0 "role=table"`,
516+
))
485517
}

pkg/ddl/executor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6101,7 +6101,7 @@ func (e *executor) AlterTableAttributes(ctx sessionctx.Context, ident ast.Ident,
61016101
return dbterror.ErrInvalidAttributesSpec.GenWithStackByArgs(err)
61026102
}
61036103
ids := getIDs([]*model.TableInfo{meta})
6104-
rule.Reset(schema.Name.L, meta.Name.L, "", ids...)
6104+
rule.Reset(e.store.GetCodec(), schema.Name.L, meta.Name.L, "", ids...)
61056105

61066106
job := &model.Job{
61076107
Version: model.GetJobVerInUse(),
@@ -6146,7 +6146,7 @@ func (e *executor) AlterTablePartitionAttributes(ctx sessionctx.Context, ident a
61466146
if err != nil {
61476147
return dbterror.ErrInvalidAttributesSpec.GenWithStackByArgs(err)
61486148
}
6149-
rule.Reset(schema.Name.L, meta.Name.L, spec.PartitionNames[0].L, partitionID)
6149+
rule.Reset(e.store.GetCodec(), schema.Name.L, meta.Name.L, spec.PartitionNames[0].L, partitionID)
61506150

61516151
pdLabelRule := (*pdhttp.LabelRule)(rule)
61526152
job := &model.Job{

pkg/ddl/label/BUILD.bazel

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ go_library(
1010
importpath = "github.com/pingcap/tidb/pkg/ddl/label",
1111
visibility = ["//visibility:public"],
1212
deps = [
13+
"//pkg/config/kerneltype",
1314
"//pkg/parser/ast",
1415
"//pkg/tablecodec",
1516
"//pkg/util/codec",
17+
"@com_github_tikv_client_go_v2//tikv",
1618
"@com_github_tikv_pd_client//http",
1719
"@in_gopkg_yaml_v2//:yaml_v2",
1820
],
@@ -28,11 +30,15 @@ go_test(
2830
],
2931
embed = [":label"],
3032
flaky = True,
31-
shard_count = 8,
33+
shard_count = 9,
3234
deps = [
35+
"//pkg/config/kerneltype",
3336
"//pkg/parser/ast",
37+
"//pkg/tablecodec",
3438
"//pkg/testkit/testsetup",
39+
"@com_github_pingcap_kvproto//pkg/keyspacepb",
3540
"@com_github_stretchr_testify//require",
41+
"@com_github_tikv_client_go_v2//tikv",
3642
"@com_github_tikv_pd_client//http",
3743
"@org_uber_go_goleak//:goleak",
3844
],

pkg/ddl/label/attributes.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ import (
1818
"fmt"
1919
"strings"
2020

21+
"github.com/pingcap/tidb/pkg/config/kerneltype"
2122
pd "github.com/tikv/pd/client/http"
2223
)
2324

2425
const (
26+
keyspaceKey = "keyspace"
2527
dbKey = "db"
2628
tableKey = "table"
2729
partitionKey = "partition"
@@ -99,19 +101,25 @@ func NewLabels(attrs []string) ([]pd.RegionLabel, error) {
99101
// RestoreRegionLabels converts Attributes to a string.
100102
func RestoreRegionLabels(labels *[]pd.RegionLabel) string {
101103
var sb strings.Builder
102-
for i, label := range *labels {
104+
written := 0
105+
for _, label := range *labels {
103106
switch label.Key {
104107
case dbKey, tableKey, partitionKey:
105108
continue
109+
case keyspaceKey:
110+
if kerneltype.IsNextGen() {
111+
continue
112+
}
106113
default:
107114
}
108115

109-
if i > 0 {
116+
if written > 0 {
110117
sb.WriteByte(',')
111118
}
112119
sb.WriteByte('"')
113120
sb.WriteString(RestoreRegionLabel(&label))
114121
sb.WriteByte('"')
122+
written++
115123
}
116124
return sb.String()
117125
}

pkg/ddl/label/attributes_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package label
1717
import (
1818
"testing"
1919

20+
"github.com/pingcap/tidb/pkg/config/kerneltype"
2021
"github.com/stretchr/testify/require"
2122
pd "github.com/tikv/pd/client/http"
2223
)
@@ -200,6 +201,8 @@ func TestRestoreLabels(t *testing.T) {
200201
require.NoError(t, err)
201202
input5, err := NewLabel("partition=p1")
202203
require.NoError(t, err)
204+
input6, err := NewLabel("keyspace=42")
205+
require.NoError(t, err)
203206

204207
tests := []TestCase{
205208
{
@@ -230,4 +233,16 @@ func TestRestoreLabels(t *testing.T) {
230233
require.Equal(t, test.output, output)
231234
})
232235
}
236+
237+
if kerneltype.IsNextGen() {
238+
output := RestoreRegionLabels(&[]pd.RegionLabel{input1, input6})
239+
require.Equal(t, `"merge_option=allow"`, output)
240+
output = RestoreRegionLabels(&[]pd.RegionLabel{input6, input1})
241+
require.Equal(t, `"merge_option=allow"`, output)
242+
} else {
243+
output := RestoreRegionLabels(&[]pd.RegionLabel{input1, input6})
244+
require.Equal(t, `"merge_option=allow","keyspace=42"`, output)
245+
output = RestoreRegionLabels(&[]pd.RegionLabel{input6, input1})
246+
require.Equal(t, `"keyspace=42","merge_option=allow"`, output)
247+
}
233248
}

0 commit comments

Comments
 (0)