Skip to content

Commit b81e920

Browse files
committed
*: make table attribute labels keyspace-aware
1 parent 5ffa989 commit b81e920

20 files changed

Lines changed: 373 additions & 91 deletions

br/pkg/backup/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ go_library(
5050
"@com_github_pingcap_kvproto//pkg/metapb",
5151
"@com_github_pingcap_log//:log",
5252
"@com_github_tikv_client_go_v2//oracle",
53+
"@com_github_tikv_client_go_v2//tikv",
5354
"@com_github_tikv_client_go_v2//txnkv/txnlock",
5455
"@com_github_tikv_client_go_v2//util",
5556
"@com_github_tikv_pd_client//:client",

br/pkg/backup/schema.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package backup
55
import (
66
"context"
77
"encoding/json"
8-
"fmt"
98
"maps"
109
"slices"
1110
"time"
@@ -29,6 +28,7 @@ import (
2928
"github.com/pingcap/tidb/pkg/statistics/handle"
3029
"github.com/pingcap/tidb/pkg/statistics/util"
3130
tidbutil "github.com/pingcap/tidb/pkg/util"
31+
"github.com/tikv/client-go/v2/tikv"
3232
kvutil "github.com/tikv/client-go/v2/util"
3333
"go.uber.org/zap"
3434
"golang.org/x/sync/errgroup"
@@ -170,7 +170,7 @@ func (ss *Schemas) BackupSchemas(
170170
}
171171
}
172172
// Check merge option allowed (network I/O)
173-
isMergeOptionAllowed, partitionMergeOptionAllowed, err := schema.checkMergeOptionAllowed(ectx)
173+
isMergeOptionAllowed, partitionMergeOptionAllowed, err := schema.checkMergeOptionAllowed(ectx, store.GetCodec())
174174
if err != nil {
175175
logger.Warn("failed to check merge_option for table", logutil.ShortError(err))
176176
} else {
@@ -325,20 +325,20 @@ func (s *schemaInfo) encodeToSchema() (*backuppb.Schema, error) {
325325
// Returns:
326326
// - tableMergeOptionAllowed: whether merge_option=allow is set for the table itself
327327
// - partitionMergeOptionAllowed: a map from partition name to whether merge_option=allow is set for that partition
328-
func (s *schemaInfo) checkMergeOptionAllowed(ctx context.Context) (bool, map[string]bool, error) {
328+
func (s *schemaInfo) checkMergeOptionAllowed(ctx context.Context, codec tikv.Codec) (bool, map[string]bool, error) {
329329
partitionMergeOptionAllowed := make(map[string]bool)
330330

331331
// Construct the rule ID for this table using the same format as ddl/label
332332
// Use .L (lowercase) to match DDL behavior for case-insensitive matching
333333
dbName := s.dbInfo.Name.L
334334
tableName := s.tableInfo.Name.L
335-
ruleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, dbName, tableName)
335+
ruleID := label.NewRuleID(codec, dbName, tableName, "")
336336

337337
// Collect all rule IDs to check (table + partitions if any)
338338
ruleIDs := []string{ruleID}
339339
if s.tableInfo.Partition != nil && len(s.tableInfo.Partition.Definitions) > 0 {
340340
for _, def := range s.tableInfo.Partition.Definitions {
341-
partitionRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, dbName, tableName, def.Name.L)
341+
partitionRuleID := label.NewRuleID(codec, dbName, tableName, def.Name.L)
342342
ruleIDs = append(ruleIDs, partitionRuleID)
343343
}
344344
}
@@ -370,7 +370,7 @@ func (s *schemaInfo) checkMergeOptionAllowed(ctx context.Context) (bool, map[str
370370
// Check partition rules if this is a partitioned table
371371
if s.tableInfo.Partition != nil && len(s.tableInfo.Partition.Definitions) > 0 {
372372
for _, def := range s.tableInfo.Partition.Definitions {
373-
partitionRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, dbName, tableName, def.Name.L)
373+
partitionRuleID := label.NewRuleID(codec, dbName, tableName, def.Name.L)
374374
if rule, exists := rules[partitionRuleID]; exists {
375375
for _, label := range rule.Labels {
376376
if label.Key == "merge_option" && label.Value == "allow" {

br/pkg/backup/schema_merge_option_test.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ package backup_test
44

55
import (
66
"context"
7-
"fmt"
87
"math"
98
"testing"
109

@@ -17,34 +16,41 @@ import (
1716
"github.com/pingcap/tidb/pkg/testkit"
1817
filter "github.com/pingcap/tidb/pkg/util/table-filter"
1918
"github.com/stretchr/testify/require"
19+
"github.com/tikv/client-go/v2/tikv"
2020
)
2121

2222
// TestBackupSchemaMergeOptionRuleIDFormat tests the rule ID format generation
2323
func TestBackupSchemaMergeOptionRuleIDFormat(t *testing.T) {
24+
codec := tikv.NewCodecV1(tikv.ModeTxn)
25+
2426
t.Run("normal table rule ID format", func(t *testing.T) {
2527
dbName := "test"
2628
tableName := "t1"
27-
expectedRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, dbName, tableName)
29+
expectedRuleID := label.NewRuleID(codec, dbName, tableName, "")
2830
require.Equal(t, "schema/test/t1", expectedRuleID)
2931
})
3032

3133
t.Run("partition table rule ID format", func(t *testing.T) {
3234
dbName := "test"
3335
tableName := "pt1"
3436
partitionName := "p0"
35-
expectedRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, dbName, tableName, partitionName)
37+
expectedRuleID := label.NewRuleID(codec, dbName, tableName, partitionName)
3638
require.Equal(t, "schema/test/pt1/p0", expectedRuleID)
3739
})
3840

3941
t.Run("multiple partitions rule ID format", func(t *testing.T) {
4042
dbName := "test"
4143
tableName := "pt2"
4244
partitions := []string{"p0", "p1", "p2"}
45+
expected := []string{
46+
"schema/test/pt2/p0",
47+
"schema/test/pt2/p1",
48+
"schema/test/pt2/p2",
49+
}
4350

44-
for _, partName := range partitions {
45-
ruleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, dbName, tableName, partName)
46-
expected := fmt.Sprintf("schema/%s/%s/%s", dbName, tableName, partName)
47-
require.Equal(t, expected, ruleID, "partition %s rule ID should match", partName)
51+
for i, partName := range partitions {
52+
ruleID := label.NewRuleID(codec, dbName, tableName, partName)
53+
require.Equal(t, expected[i], ruleID, "partition %s rule ID should match", partName)
4854
}
4955
})
5056

@@ -56,13 +62,13 @@ func TestBackupSchemaMergeOptionRuleIDFormat(t *testing.T) {
5662
// Simulate DDL behavior: uses .L (lowercase)
5763
// DDL would use: schema.Name.L, meta.Name.L, spec.PartitionNames[0].L
5864
// For "TestDB"/"TestTable"/"Partition0", .L would be: "testdb"/"testtable"/"partition0"
59-
ddlRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, "testdb", "testtable")
60-
ddlPartitionRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, "testdb", "testtable", "partition0")
65+
ddlRuleID := label.NewRuleID(codec, "testdb", "testtable", "")
66+
ddlPartitionRuleID := label.NewRuleID(codec, "testdb", "testtable", "partition0")
6167

6268
// BR should also use .L (lowercase) - simulate BR behavior
6369
// In real code: table.DB.Name.L, table.Info.Name.L, def.Name.L
64-
brRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, "testdb", "testtable")
65-
brPartitionRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, "testdb", "testtable", "partition0")
70+
brRuleID := label.NewRuleID(codec, "testdb", "testtable", "")
71+
brPartitionRuleID := label.NewRuleID(codec, "testdb", "testtable", "partition0")
6672

6773
// Both should generate the same rule ID
6874
require.Equal(t, ddlRuleID, brRuleID, "DDL and BR should generate same table rule ID")

br/pkg/restore/snap_client/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1199,7 +1199,7 @@ func (rc *SnapClient) setMergeOptionForTables(ctx context.Context, createdTables
11991199
}
12001200
// Use Reset() to set ID, RuleType, Data, Index, and add/update db/table labels
12011201
// Reset() uses the NEW table ID (after restore)
1202-
rule.Reset(dbName, tableName, "", newTableInfo.ID)
1202+
rule.Reset(rc.dom.Store().GetCodec(), dbName, tableName, "", newTableInfo.ID)
12031203

12041204
rulesToSet = append(rulesToSet, rule)
12051205
}
@@ -1237,7 +1237,7 @@ func (rc *SnapClient) setMergeOptionForTables(ctx context.Context, createdTables
12371237
}
12381238
// Use Reset() to set ID, RuleType, Data, Index, and add/update db/table/partition labels
12391239
// Reset() uses the NEW partition ID (after restore)
1240-
rule.Reset(dbName, tableName, partitionName, newDef.ID)
1240+
rule.Reset(rc.dom.Store().GetCodec(), dbName, tableName, partitionName, newDef.ID)
12411241

12421242
rulesToSet = append(rulesToSet, rule)
12431243
}

pkg/ddl/executor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6101,7 +6101,7 @@ func (e *executor) AlterTableAttributes(ctx sessionctx.Context, ident ast.Ident,
61016101
return dbterror.ErrInvalidAttributesSpec.GenWithStackByArgs(err)
61026102
}
61036103
ids := getIDs([]*model.TableInfo{meta})
6104-
rule.Reset(schema.Name.L, meta.Name.L, "", ids...)
6104+
rule.Reset(e.store.GetCodec(), schema.Name.L, meta.Name.L, "", ids...)
61056105

61066106
job := &model.Job{
61076107
Version: model.GetJobVerInUse(),
@@ -6146,7 +6146,7 @@ func (e *executor) AlterTablePartitionAttributes(ctx sessionctx.Context, ident a
61466146
if err != nil {
61476147
return dbterror.ErrInvalidAttributesSpec.GenWithStackByArgs(err)
61486148
}
6149-
rule.Reset(schema.Name.L, meta.Name.L, spec.PartitionNames[0].L, partitionID)
6149+
rule.Reset(e.store.GetCodec(), schema.Name.L, meta.Name.L, spec.PartitionNames[0].L, partitionID)
61506150

61516151
pdLabelRule := (*pdhttp.LabelRule)(rule)
61526152
job := &model.Job{

pkg/ddl/label/BUILD.bazel

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ go_library(
1010
importpath = "github.com/pingcap/tidb/pkg/ddl/label",
1111
visibility = ["//visibility:public"],
1212
deps = [
13+
"//pkg/config/kerneltype",
1314
"//pkg/parser/ast",
1415
"//pkg/tablecodec",
1516
"//pkg/util/codec",
17+
"@com_github_tikv_client_go_v2//tikv",
1618
"@com_github_tikv_pd_client//http",
1719
"@in_gopkg_yaml_v2//:yaml_v2",
1820
],
@@ -28,11 +30,15 @@ go_test(
2830
],
2931
embed = [":label"],
3032
flaky = True,
31-
shard_count = 8,
33+
shard_count = 9,
3234
deps = [
35+
"//pkg/config/kerneltype",
3336
"//pkg/parser/ast",
37+
"//pkg/tablecodec",
3438
"//pkg/testkit/testsetup",
39+
"@com_github_pingcap_kvproto//pkg/keyspacepb",
3540
"@com_github_stretchr_testify//require",
41+
"@com_github_tikv_client_go_v2//tikv",
3642
"@com_github_tikv_pd_client//http",
3743
"@org_uber_go_goleak//:goleak",
3844
],

pkg/ddl/label/attributes.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ import (
1818
"fmt"
1919
"strings"
2020

21+
"github.com/pingcap/tidb/pkg/config/kerneltype"
2122
pd "github.com/tikv/pd/client/http"
2223
)
2324

2425
const (
26+
keyspaceKey = "keyspace"
2527
dbKey = "db"
2628
tableKey = "table"
2729
partitionKey = "partition"
@@ -103,6 +105,10 @@ func RestoreRegionLabels(labels *[]pd.RegionLabel) string {
103105
switch label.Key {
104106
case dbKey, tableKey, partitionKey:
105107
continue
108+
case keyspaceKey:
109+
if kerneltype.IsNextGen() {
110+
continue
111+
}
106112
default:
107113
}
108114

pkg/ddl/label/attributes_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package label
1717
import (
1818
"testing"
1919

20+
"github.com/pingcap/tidb/pkg/config/kerneltype"
2021
"github.com/stretchr/testify/require"
2122
pd "github.com/tikv/pd/client/http"
2223
)
@@ -200,6 +201,8 @@ func TestRestoreLabels(t *testing.T) {
200201
require.NoError(t, err)
201202
input5, err := NewLabel("partition=p1")
202203
require.NoError(t, err)
204+
input6, err := NewLabel("keyspace=42")
205+
require.NoError(t, err)
203206

204207
tests := []TestCase{
205208
{
@@ -230,4 +233,12 @@ func TestRestoreLabels(t *testing.T) {
230233
require.Equal(t, test.output, output)
231234
})
232235
}
236+
237+
if kerneltype.IsNextGen() {
238+
output := RestoreRegionLabels(&[]pd.RegionLabel{input1, input6})
239+
require.Equal(t, `"merge_option=allow"`, output)
240+
} else {
241+
output := RestoreRegionLabels(&[]pd.RegionLabel{input1, input6})
242+
require.Equal(t, `"merge_option=allow","keyspace=42"`, output)
243+
}
233244
}

pkg/ddl/label/rule.go

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,23 @@ import (
1919
"encoding/json"
2020
"fmt"
2121
"slices"
22+
"strconv"
2223

24+
"github.com/pingcap/tidb/pkg/config/kerneltype"
2325
"github.com/pingcap/tidb/pkg/parser/ast"
2426
"github.com/pingcap/tidb/pkg/tablecodec"
2527
"github.com/pingcap/tidb/pkg/util/codec"
28+
"github.com/tikv/client-go/v2/tikv"
2629
pd "github.com/tikv/pd/client/http"
2730
"gopkg.in/yaml.v2"
2831
)
2932

3033
const (
3134
// IDPrefix is the prefix for label rule ID.
3235
IDPrefix = "schema"
33-
ruleType = "key-range"
36+
// KeyspacePrefix is the prefix for keyspace in label rule ID.
37+
KeyspacePrefix = "keyspace"
38+
ruleType = "key-range"
3439
)
3540

3641
const (
@@ -94,20 +99,43 @@ func (r *Rule) Clone() *Rule {
9499
return newRule
95100
}
96101

97-
// Reset will reset the label rule for a table/partition with a given ID and names.
98-
func (r *Rule) Reset(dbName, tableName, partName string, ids ...int64) *Rule {
102+
// UseKeyspaceAwareRules returns true when table attribute label rules should be
103+
// scoped by keyspace in NextGen deployments.
104+
func UseKeyspaceAwareRules(tikvCodec tikv.Codec) bool {
105+
return kerneltype.IsNextGen() && tikvCodec != nil && tikvCodec.GetKeyspaceMeta() != nil
106+
}
107+
108+
// NewRuleID generates a new rule ID for a table or partition.
109+
func NewRuleID(tikvCodec tikv.Codec, dbName, tableName, partName string) string {
110+
var id string
99111
isPartition := partName != ""
100112
if isPartition {
101-
r.ID = fmt.Sprintf(PartitionIDFormat, IDPrefix, dbName, tableName, partName)
113+
id = fmt.Sprintf(PartitionIDFormat, IDPrefix, dbName, tableName, partName)
102114
} else {
103-
r.ID = fmt.Sprintf(TableIDFormat, IDPrefix, dbName, tableName)
115+
id = fmt.Sprintf(TableIDFormat, IDPrefix, dbName, tableName)
104116
}
117+
if UseKeyspaceAwareRules(tikvCodec) {
118+
id = fmt.Sprintf("%s/%d/%s", KeyspacePrefix, tikvCodec.GetKeyspaceID(), id)
119+
}
120+
return id
121+
}
122+
123+
// Reset will reset the label rule for a table/partition with a given ID and names.
124+
func (r *Rule) Reset(tikvCodec tikv.Codec, dbName, tableName, partName string, ids ...int64) *Rule {
125+
isPartition := partName != ""
126+
useKeyspace := UseKeyspaceAwareRules(tikvCodec)
127+
r.ID = NewRuleID(tikvCodec, dbName, tableName, partName)
105128
if len(r.Labels) == 0 {
106129
return r
107130
}
108-
var hasDBKey, hasTableKey, hasPartitionKey bool
131+
var hasKeyspaceKey, hasDBKey, hasTableKey, hasPartitionKey bool
109132
for i := range r.Labels {
110133
switch r.Labels[i].Key {
134+
case keyspaceKey:
135+
if useKeyspace {
136+
r.Labels[i].Value = strconv.FormatInt(int64(tikvCodec.GetKeyspaceID()), 10)
137+
hasKeyspaceKey = true
138+
}
111139
case dbKey:
112140
r.Labels[i].Value = dbName
113141
hasDBKey = true
@@ -123,6 +151,10 @@ func (r *Rule) Reset(dbName, tableName, partName string, ids ...int64) *Rule {
123151
}
124152
}
125153

154+
if useKeyspace && !hasKeyspaceKey {
155+
r.Labels = append(r.Labels, pd.RegionLabel{Key: keyspaceKey, Value: strconv.FormatInt(int64(tikvCodec.GetKeyspaceID()), 10)})
156+
}
157+
126158
if !hasDBKey {
127159
r.Labels = append(r.Labels, pd.RegionLabel{Key: dbKey, Value: dbName})
128160
}
@@ -138,9 +170,16 @@ func (r *Rule) Reset(dbName, tableName, partName string, ids ...int64) *Rule {
138170
dataSlice := make([]any, 0, len(ids))
139171
slices.Sort(ids)
140172
for i := range ids {
173+
startKey := codec.EncodeBytes(nil, tablecodec.GenTablePrefix(ids[i]))
174+
endKey := codec.EncodeBytes(nil, tablecodec.GenTablePrefix(ids[i]+1))
175+
if useKeyspace {
176+
// Label rules are consumed as region boundary keys, so V2 must encode
177+
// the whole outer key instead of prefixing a mem-encoded table key.
178+
startKey, endKey = tikvCodec.EncodeRegionRange(tablecodec.GenTablePrefix(ids[i]), tablecodec.GenTablePrefix(ids[i]+1))
179+
}
141180
data := map[string]string{
142-
"start_key": hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(ids[i]))),
143-
"end_key": hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(ids[i]+1))),
181+
"start_key": hex.EncodeToString(startKey),
182+
"end_key": hex.EncodeToString(endKey),
144183
}
145184
dataSlice = append(dataSlice, data)
146185
}

0 commit comments

Comments
 (0)