Skip to content

Commit 27006a5

Browse files
committed
pkg/lightning: normalize view restore dependency lookup
1 parent fa1603c commit 27006a5

4 files changed

Lines changed: 141 additions & 21 deletions

File tree

pkg/lightning/mydump/schema_import.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,13 +240,14 @@ func (si *SchemaImporter) importViews(ctx context.Context, plan *SchemaImportPla
240240
p.SetSQLMode(si.sqlMode)
241241

242242
for _, node := range plan.viewPlan.ordered {
243-
if existingViews.has(node.key) {
243+
normalizedKey := normalizeTableName(node.key.Schema, node.key.Name)
244+
if existingViews.has(normalizedKey) {
244245
si.logger.Info("view already exists in downstream, skip",
245246
zap.String("db", node.key.Schema),
246247
zap.String("view-name", node.key.Name))
247248
continue
248249
}
249-
if existingTables.has(node.key) {
250+
if existingTables.has(normalizedKey) {
250251
return common.ErrCreateSchema.GenWithStack("downstream table already exists for view '%s'", node.key.String())
251252
}
252253
if err := si.runCommonJob(ctx, p, &schemaJob{

pkg/lightning/mydump/schema_import_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,30 @@ func TestSchemaImporter(t *testing.T) {
285285
require.NoError(t, os.Remove(path.Join(tempDir, fileNameV2)))
286286
require.NoError(t, os.Remove(path.Join(tempDir, fileNameValidV1)))
287287
})
288+
289+
t.Run("view: skip existing view with different case", func(t *testing.T) {
290+
fileNameV2 := "test03.V2-schema-view.sql"
291+
require.NoError(t, os.WriteFile(path.Join(tempDir, fileNameV2), []byte("create view V2 as select * from t;"), 0o644))
292+
dbMetas := []*MDDatabaseMeta{
293+
{Name: "test03", Tables: []*MDTableMeta{{DB: "test03", Name: "t"}}},
294+
{Name: "test03", Views: []*MDTableMeta{
295+
{DB: "test03", Name: "V2", charSet: "auto", SchemaFile: FileInfo{FileMeta: SourceFileMeta{Path: fileNameV2}}},
296+
}},
297+
}
298+
299+
mock.ExpectQuery(`information_schema.SCHEMATA`).WillReturnRows(
300+
sqlmock.NewRows([]string{"SCHEMA_NAME"}).AddRow("test03"))
301+
mock.ExpectQuery("SHOW CREATE TABLE `test03`.`t`").
302+
WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow("t", "CREATE TABLE `t` (a int);"))
303+
mock.ExpectQuery("TABLES WHERE TABLE_SCHEMA = 'test03' AND TABLE_NAME = 't'").
304+
WillReturnRows(sqlmock.NewRows([]string{"TABLE_TYPE"}).AddRow("BASE TABLE"))
305+
mock.ExpectQuery("TABLES WHERE TABLE_SCHEMA = 'test03' AND TABLE_NAME = 'V2'").
306+
WillReturnRows(sqlmock.NewRows([]string{"TABLE_TYPE"}).AddRow("VIEW"))
307+
308+
require.NoError(t, importer.Run(ctx, dbMetas))
309+
require.NoError(t, mock.ExpectationsWereMet())
310+
require.NoError(t, os.Remove(path.Join(tempDir, fileNameV2)))
311+
})
288312
}
289313

290314
func TestNewSchemaImportPlan(t *testing.T) {

pkg/lightning/mydump/view_restore.go

Lines changed: 73 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,14 @@ func (s tableNameSet) add(tbl filter.Table) {
3636
if s == nil {
3737
return
3838
}
39-
s[tbl] = struct{}{}
39+
s[normalizeTableName(tbl.Schema, tbl.Name)] = struct{}{}
4040
}
4141

4242
func (s tableNameSet) has(tbl filter.Table) bool {
4343
if s == nil {
4444
return false
4545
}
46-
_, ok := s[tbl]
46+
_, ok := s[normalizeTableName(tbl.Schema, tbl.Name)]
4747
return ok
4848
}
4949

@@ -85,24 +85,75 @@ type SchemaImportPlan struct {
8585
type viewDependencyCollector struct {
8686
currentSchema string
8787
deps tableNameSet
88+
cteNameScopes []map[string]struct{}
89+
}
90+
91+
func (c *viewDependencyCollector) pushCTEScope() {
92+
c.cteNameScopes = append(c.cteNameScopes, make(map[string]struct{}))
93+
}
94+
95+
func (c *viewDependencyCollector) popCTEScope() {
96+
if len(c.cteNameScopes) == 0 {
97+
return
98+
}
99+
c.cteNameScopes = c.cteNameScopes[:len(c.cteNameScopes)-1]
100+
}
101+
102+
func (c *viewDependencyCollector) recordCTEName(name string) {
103+
if len(c.cteNameScopes) == 0 {
104+
return
105+
}
106+
c.cteNameScopes[len(c.cteNameScopes)-1][strings.ToLower(name)] = struct{}{}
107+
}
108+
109+
func (c *viewDependencyCollector) isCTEName(name string) bool {
110+
normalized := strings.ToLower(name)
111+
for i := len(c.cteNameScopes) - 1; i >= 0; i-- {
112+
if _, ok := c.cteNameScopes[i][normalized]; ok {
113+
return true
114+
}
115+
}
116+
return false
88117
}
89118

90119
func (c *viewDependencyCollector) Enter(n ast.Node) (ast.Node, bool) {
91-
tbl, ok := n.(*ast.TableName)
92-
if !ok {
120+
switch node := n.(type) {
121+
case *ast.SelectStmt:
122+
if node.With != nil {
123+
c.pushCTEScope()
124+
}
93125
return n, false
94-
}
126+
case *ast.CommonTableExpression:
127+
if node.IsRecursive {
128+
c.recordCTEName(node.Name.O)
129+
}
130+
return n, false
131+
case *ast.TableName:
132+
if node.Schema.O == "" && c.isCTEName(node.Name.O) {
133+
return n, true
134+
}
95135

96-
schema := tbl.Schema.O
97-
if schema == "" {
98-
// Dumpling may omit the schema for same-database references.
99-
schema = c.currentSchema
136+
schema := node.Schema.O
137+
if schema == "" {
138+
// Dumpling may omit the schema for same-database references.
139+
schema = c.currentSchema
140+
}
141+
c.deps.add(filterTableName(schema, node.Name.O))
142+
return n, true
143+
default:
144+
return n, false
100145
}
101-
c.deps.add(filter.Table{Schema: schema, Name: tbl.Name.O})
102-
return n, true
103146
}
104147

105-
func (*viewDependencyCollector) Leave(n ast.Node) (ast.Node, bool) {
148+
func (c *viewDependencyCollector) Leave(n ast.Node) (ast.Node, bool) {
149+
switch node := n.(type) {
150+
case *ast.CommonTableExpression:
151+
c.recordCTEName(node.Name.O)
152+
case *ast.SelectStmt:
153+
if node.With != nil {
154+
c.popCTEScope()
155+
}
156+
}
106157
return n, true
107158
}
108159

@@ -235,30 +286,33 @@ func buildViewRestorePlan(parsedViews []*parsedViewSchema, dumpTables tableNameS
235286
}
236287

237288
for _, parsed := range parsedViews {
238-
if _, exists := plan.nodes[parsed.key]; exists {
289+
normalizedKey := normalizeTableName(parsed.key.Schema, parsed.key.Name)
290+
if _, exists := plan.nodes[normalizedKey]; exists {
239291
return nil, errors.Errorf("duplicate view definition for %s", parsed.key.String())
240292
}
241-
plan.nodes[parsed.key] = &viewNode{
293+
plan.nodes[normalizedKey] = &viewNode{
242294
key: parsed.key,
243295
deps: append([]filter.Table(nil), parsed.deps...),
244296
createSQL: parsed.createSQL,
245297
}
246298
}
247299

248300
for _, node := range plan.nodes {
301+
nodeKey := normalizeTableName(node.key.Schema, node.key.Name)
249302
for _, dep := range node.deps {
250-
if dep == node.key {
303+
normalizedDep := normalizeTableName(dep.Schema, dep.Name)
304+
if normalizedDep == nodeKey {
251305
return nil, errors.Errorf("cyclic view dependency detected for %s", node.key.String())
252306
}
253-
if depNode, ok := plan.nodes[dep]; ok {
307+
if depNode, ok := plan.nodes[normalizedDep]; ok {
254308
node.indegree++
255-
depNode.dependents = append(depNode.dependents, node.key)
309+
depNode.dependents = append(depNode.dependents, nodeKey)
256310
continue
257311
}
258-
if dumpTables.has(dep) {
312+
if dumpTables.has(normalizedDep) {
259313
continue
260314
}
261-
node.externalDeps = append(node.externalDeps, dep)
315+
node.externalDeps = append(node.externalDeps, normalizedDep)
262316
}
263317
sort.Slice(node.dependents, func(i, j int) bool {
264318
if node.dependents[i].Schema != node.dependents[j].Schema {

pkg/lightning/mydump/view_restore_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,22 @@ JOIN db2.base_table ON base_table.id = src.id;
103103
)
104104
}
105105

106+
func TestParseViewSchemaSQLIgnoresCTEDependencies(t *testing.T) {
107+
p := parser.New()
108+
currentView := filter.Table{Schema: "test", Name: "v_cte"}
109+
sql := `
110+
CREATE VIEW v_cte AS
111+
WITH cte AS (
112+
SELECT id FROM t1
113+
)
114+
SELECT cte.id FROM cte;
115+
`
116+
117+
parsed, err := parseViewSchemaSQL(p, currentView, sql)
118+
require.NoError(t, err)
119+
require.Equal(t, []filter.Table{{Schema: "test", Name: "t1"}}, parsed.deps)
120+
}
121+
106122
func TestBuildViewRestorePlan(t *testing.T) {
107123
v1 := filter.Table{Schema: "test", Name: "v1"}
108124
v2 := filter.Table{Schema: "test", Name: "v2"}
@@ -171,6 +187,31 @@ func TestBuildViewRestorePlanSupportsMultipleAndCrossSchemaViewDeps(t *testing.T
171187
require.Equal(t, []filter.Table{db2v2}, plan.nodes[db2v3].dependents)
172188
}
173189

190+
func TestBuildViewRestorePlanNormalizesCaseInsensitiveDeps(t *testing.T) {
191+
v1 := filter.Table{Schema: "test", Name: "v1"}
192+
v2 := filter.Table{Schema: "test", Name: "V2"}
193+
dumpTables := make(tableNameSet)
194+
dumpTables.add(filter.Table{Schema: "test", Name: "t"})
195+
196+
plan, err := buildViewRestorePlan([]*parsedViewSchema{
197+
{
198+
key: v1,
199+
deps: []filter.Table{{Schema: "test", Name: "t"}},
200+
createSQL: "CREATE VIEW `test`.`v1` AS SELECT `id` FROM `test`.`t`;",
201+
},
202+
{
203+
key: v2,
204+
deps: []filter.Table{{Schema: "Test", Name: "V1"}},
205+
createSQL: "CREATE VIEW `test`.`V2` AS SELECT `id` FROM `test`.`v1`;",
206+
},
207+
}, dumpTables)
208+
require.NoError(t, err)
209+
require.Len(t, plan.ordered, 2)
210+
require.Equal(t, v1, plan.ordered[0].key)
211+
require.Equal(t, v2, plan.ordered[1].key)
212+
require.Empty(t, plan.nodes[normalizeTableName(v2.Schema, v2.Name)].externalDeps)
213+
}
214+
174215
func TestBuildViewRestorePlanDetectsCycle(t *testing.T) {
175216
_, err := buildViewRestorePlan([]*parsedViewSchema{
176217
{

0 commit comments

Comments
 (0)