forked from pingcap/tidb
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathschema_merge_option_test.go
More file actions
161 lines (134 loc) · 5.79 KB
/
schema_merge_option_test.go
File metadata and controls
161 lines (134 loc) · 5.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.
package backup_test
import (
"context"
"math"
"testing"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/tidb/br/pkg/backup"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/pkg/ddl/label"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/testkit"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
)
// TestBackupSchemaMergeOptionRuleIDFormat tests the rule ID format generation
func TestBackupSchemaMergeOptionRuleIDFormat(t *testing.T) {
codec := tikv.NewCodecV1(tikv.ModeTxn)
t.Run("normal table rule ID format", func(t *testing.T) {
dbName := "test"
tableName := "t1"
expectedRuleID := label.NewRuleID(codec, dbName, tableName, "")
require.Equal(t, "schema/test/t1", expectedRuleID)
})
t.Run("partition table rule ID format", func(t *testing.T) {
dbName := "test"
tableName := "pt1"
partitionName := "p0"
expectedRuleID := label.NewRuleID(codec, dbName, tableName, partitionName)
require.Equal(t, "schema/test/pt1/p0", expectedRuleID)
})
t.Run("multiple partitions rule ID format", func(t *testing.T) {
dbName := "test"
tableName := "pt2"
partitions := []string{"p0", "p1", "p2"}
expected := []string{
"schema/test/pt2/p0",
"schema/test/pt2/p1",
"schema/test/pt2/p2",
}
for i, partName := range partitions {
ruleID := label.NewRuleID(codec, dbName, tableName, partName)
require.Equal(t, expected[i], ruleID, "partition %s rule ID should match", partName)
}
})
t.Run("rule ID format matches DDL behavior - case insensitive", func(t *testing.T) {
// DDL uses .L (lowercase) for rule ID generation
// BR should also use .L to match DDL behavior
// This test verifies that both use lowercase for rule IDs
// Simulate DDL behavior: uses .L (lowercase)
// DDL would use: schema.Name.L, meta.Name.L, spec.PartitionNames[0].L
// For "TestDB"/"TestTable"/"Partition0", .L would be: "testdb"/"testtable"/"partition0"
ddlRuleID := label.NewRuleID(codec, "testdb", "testtable", "")
ddlPartitionRuleID := label.NewRuleID(codec, "testdb", "testtable", "partition0")
// BR should also use .L (lowercase) - simulate BR behavior
// In real code: table.DB.Name.L, table.Info.Name.L, def.Name.L
brRuleID := label.NewRuleID(codec, "testdb", "testtable", "")
brPartitionRuleID := label.NewRuleID(codec, "testdb", "testtable", "partition0")
// Both should generate the same rule ID
require.Equal(t, ddlRuleID, brRuleID, "DDL and BR should generate same table rule ID")
require.Equal(t, ddlPartitionRuleID, brPartitionRuleID, "DDL and BR should generate same partition rule ID")
require.Equal(t, "schema/testdb/testtable", ddlRuleID)
require.Equal(t, "schema/testdb/testtable/partition0", ddlPartitionRuleID)
})
}
// TestBackupSchemaWithMergeOption tests the full backup schema flow with merge_option
func TestBackupSchemaWithMergeOption(t *testing.T) {
m := createMockCluster(t)
tk := testkit.NewTestKit(t, m.Storage)
tk.MustExec("use test")
tk.MustExec("drop table if exists t_normal, t_partition;")
// Create normal table
tk.MustExec("create table t_normal (a int);")
tk.MustExec("insert into t_normal values (1);")
// Create partitioned table
tk.MustExec(`create table t_partition (
a int,
b int
) partition by range (a) (
partition p0 values less than (10),
partition p1 values less than (20),
partition p2 values less than (30)
);`)
tk.MustExec("insert into t_partition values (5, 1), (15, 2), (25, 3);")
testFilter, err := filter.Parse([]string{"test.t_normal", "test.t_partition"})
require.NoError(t, err)
_, backupSchemas, _, err := backup.BuildBackupRangeAndInitSchema(
m.Storage, testFilter, math.MaxUint64, false)
require.NoError(t, err)
require.Equal(t, 2, backupSchemas.Len())
ctx := context.Background()
es := GetRandomStorage(t)
cipher := backuppb.CipherInfo{
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
}
metaWriter := metautil.NewMetaWriter(es, metautil.MetaFileSize, false, "", &cipher)
updateCh := new(simpleProgress)
skipChecksum := true // Skip checksum for faster test
err = backupSchemas.BackupSchemas(
ctx, metaWriter, nil, m.Storage, nil, math.MaxUint64, nil, 1, vardef.DefChecksumTableConcurrency, skipChecksum, updateCh)
require.NoError(t, err)
require.Equal(t, int64(2), updateCh.get())
err = metaWriter.FlushBackupMeta(ctx)
require.NoError(t, err)
// Read back schemas and verify structure
schemas := GetSchemasFromMeta(t, es)
require.Len(t, schemas, 2)
// Find normal table and partitioned table
var normalTable, partitionTable *metautil.Table
for _, schema := range schemas {
if schema.Info.Name.O == "t_normal" {
normalTable = schema
} else if schema.Info.Name.O == "t_partition" {
partitionTable = schema
}
}
require.NotNil(t, normalTable, "normal table should be found")
require.NotNil(t, partitionTable, "partition table should be found")
// Verify normal table structure
require.NotNil(t, normalTable.Info)
require.Nil(t, normalTable.Info.Partition, "normal table should not have partition info")
// Verify partition table structure
require.NotNil(t, partitionTable.Info)
require.NotNil(t, partitionTable.Info.Partition, "partition table should have partition info")
require.Len(t, partitionTable.Info.Partition.Definitions, 3, "should have 3 partitions")
// Verify merge_option fields exist (they should be false by default if no rules are set)
// Note: Without actual label rules set, IsMergeOptionAllowed should be false
require.False(t, normalTable.IsMergeOptionAllowed)
require.False(t, partitionTable.IsMergeOptionAllowed)
require.Empty(t, normalTable.PartitionMergeOptionAllowed)
require.Empty(t, partitionTable.PartitionMergeOptionAllowed)
}