Skip to content

Commit 676be0e

Browse files
a-cong Alexandra Cong
authored and committed
move dispatcher validation
1 parent 2312089 commit 676be0e

File tree

4 files changed

+95
-36
lines changed

4 files changed

+95
-36
lines changed

cdc/sink/dispatcher/dispatcher.go

Lines changed: 8 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -16,19 +16,23 @@ package dispatcher
1616
import (
1717
"fmt"
1818
"regexp"
19+
20+
"github.com/pingcap/tiflow/pkg/config"
1921
)
2022

2123
var (
2224
// schemaRE is used to match substring '{schema}' in expressions
2325
schemaRE = regexp.MustCompile(`\{schema\}`)
2426
// tableRE is used to match substring '{table}' in expressions
2527
tableRE = regexp.MustCompile(`\{table\}`)
26-
// validExprRE validates schema/table routing expressions
27-
// Expression can contain {schema}, {table}, or both, with valid separators
28-
// Valid characters: A-Za-z0-9\._\-
29-
validExprRE = regexp.MustCompile(`^[A-Za-z0-9\._\-]*(\{schema\})?[A-Za-z0-9\._\-]*(\{table\})?[A-Za-z0-9\._\-]*$`)
3028
)
3129

30+
// ValidateExpression validates a schema or table routing expression.
31+
// This is a convenience wrapper around config.ValidateRoutingExpression.
32+
func ValidateExpression(expr string) error {
33+
return config.ValidateRoutingExpression(expr)
34+
}
35+
3236
// Dispatcher is an abstraction for routing schema and table names.
3337
// It determines the target schema and table name for a given source schema/table.
3438
type Dispatcher interface {
@@ -87,20 +91,3 @@ func (d *DynamicSchemaDispatcher) substituteExpression(expr, sourceSchema, sourc
8791
func (d *DynamicSchemaDispatcher) String() string {
8892
return fmt.Sprintf("dynamic(schema:%s,table:%s)", d.schemaExpr, d.tableExpr)
8993
}
90-
91-
// ValidateExpression validates a schema or table routing expression.
92-
// Expression format: [prefix][{schema}][middle][{table}][suffix]
93-
// prefix/middle/suffix are optional and should match [A-Za-z0-9\._\-]*
94-
func ValidateExpression(expr string) error {
95-
if expr == "" {
96-
// Empty is valid - means use source schema/table
97-
return nil
98-
}
99-
100-
if !validExprRE.MatchString(expr) {
101-
return fmt.Errorf("invalid schema/table routing expression: %s. "+
102-
"Expression must match pattern: [prefix][{schema}][middle][{table}][suffix]", expr)
103-
}
104-
105-
return nil
106-
}

pkg/config/replica_config.go

Lines changed: 8 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -325,28 +325,21 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sin
325325

326326
// Validate sink routing configuration (dispatcher approach)
327327
// Check if any dispatch rules have schema/table routing configured
328-
hasSchemaRouting := false
329328
for _, rule := range c.Sink.DispatchRules {
330-
if rule.SchemaRule != "" || rule.TableRule != "" {
331-
hasSchemaRouting = true
332-
break
329+
if rule.SchemaRule == "" && rule.TableRule == "" {
330+
continue
333331
}
334-
}
335-
if hasSchemaRouting {
336332
// Schema routing is only supported for MySQL/TiDB sinks
337333
if !sink.IsMySQLCompatibleScheme(sinkURI.Scheme) {
338334
return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs(
339335
"sink routing (via dispatch rules) is only supported for MySQL/TiDB sinks")
340336
}
341-
// Validate by attempting to create the sink router
342-
// This will validate the expressions and matcher patterns
343-
// We don't need to keep the router, just validate it can be created
344-
if _, err := func() (interface{}, error) {
345-
// Import the dispatcher package dynamically to avoid import cycle
346-
// The actual validation happens in dispatcher.NewSinkRouter
347-
return nil, nil
348-
}(); err != nil {
349-
return err
337+
// Validate the schema and table expressions
338+
if err := ValidateRoutingExpression(rule.SchemaRule); err != nil {
339+
return cerror.WrapError(cerror.ErrInvalidReplicaConfig, err)
340+
}
341+
if err := ValidateRoutingExpression(rule.TableRule); err != nil {
342+
return cerror.WrapError(cerror.ErrInvalidReplicaConfig, err)
350343
}
351344
}
352345

pkg/config/replica_config_test.go

Lines changed: 56 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -383,6 +383,62 @@ func TestValidateAndAdjustLargeMessageHandle(t *testing.T) {
383383
require.Equal(t, compression.None, cfg.Sink.KafkaConfig.LargeMessageHandle.LargeMessageHandleCompression)
384384
}
385385

386+
func TestValidateAndAdjustSinkRouting(t *testing.T) {
387+
t.Parallel()
388+
389+
// Test valid routing rules on MySQL sink
390+
cfg := GetDefaultReplicaConfig()
391+
cfg.Sink.DispatchRules = []*DispatchRule{
392+
{Matcher: []string{"db1.*"}, SchemaRule: "target_db", TableRule: "{table}"},
393+
}
394+
mysqlURL, err := url.Parse("mysql://root:123@localhost:3306/")
395+
require.NoError(t, err)
396+
require.NoError(t, cfg.ValidateAndAdjust(mysqlURL))
397+
398+
// Test valid routing rules with transformations
399+
cfg = GetDefaultReplicaConfig()
400+
cfg.Sink.DispatchRules = []*DispatchRule{
401+
{Matcher: []string{"db1.*"}, SchemaRule: "backup_{schema}", TableRule: "{table}_archive"},
402+
}
403+
require.NoError(t, cfg.ValidateAndAdjust(mysqlURL))
404+
405+
// Test invalid SchemaRule expression
406+
cfg = GetDefaultReplicaConfig()
407+
cfg.Sink.DispatchRules = []*DispatchRule{
408+
{Matcher: []string{"db1.*"}, SchemaRule: "{invalid}", TableRule: "{table}"},
409+
}
410+
err = cfg.ValidateAndAdjust(mysqlURL)
411+
require.Error(t, err)
412+
require.Contains(t, err.Error(), "invalid schema/table routing expression")
413+
414+
// Test invalid TableRule expression with space
415+
cfg = GetDefaultReplicaConfig()
416+
cfg.Sink.DispatchRules = []*DispatchRule{
417+
{Matcher: []string{"db1.*"}, SchemaRule: "{schema}", TableRule: "invalid space"},
418+
}
419+
err = cfg.ValidateAndAdjust(mysqlURL)
420+
require.Error(t, err)
421+
require.Contains(t, err.Error(), "invalid schema/table routing expression")
422+
423+
// Test routing rules on non-MySQL sink (should fail)
424+
cfg = GetDefaultReplicaConfig()
425+
cfg.Sink.DispatchRules = []*DispatchRule{
426+
{Matcher: []string{"db1.*"}, SchemaRule: "target_db", TableRule: "{table}"},
427+
}
428+
kafkaURL, err := url.Parse("kafka://localhost:9092/topic?protocol=open-protocol")
429+
require.NoError(t, err)
430+
err = cfg.ValidateAndAdjust(kafkaURL)
431+
require.Error(t, err)
432+
require.Contains(t, err.Error(), "sink routing (via dispatch rules) is only supported for MySQL/TiDB sinks")
433+
434+
// Test dispatch rules without routing (TopicRule only) on Kafka - should pass
435+
cfg = GetDefaultReplicaConfig()
436+
cfg.Sink.DispatchRules = []*DispatchRule{
437+
{Matcher: []string{"db1.*"}, TopicRule: "topic_{schema}_{table}"},
438+
}
439+
require.NoError(t, cfg.ValidateAndAdjust(kafkaURL))
440+
}
441+
386442
func TestMaskSensitiveData(t *testing.T) {
387443
config := ReplicaConfig{
388444
Sink: nil,

pkg/config/sink.go

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ package config
1616
import (
1717
"fmt"
1818
"net/url"
19+
"regexp"
1920
"strconv"
2021
"strings"
2122
"time"
@@ -408,6 +409,28 @@ type DispatchRule struct {
408409
TableRule string `toml:"table" json:"table"`
409410
}
410411

412+
// validRoutingExprRE validates schema/table routing expressions.
413+
// Expression can contain {schema}, {table}, or both, with valid separators.
414+
// Valid characters: A-Za-z0-9\._\-
415+
var validRoutingExprRE = regexp.MustCompile(`^[A-Za-z0-9\._\-]*(\{schema\})?[A-Za-z0-9\._\-]*(\{table\})?[A-Za-z0-9\._\-]*$`)
416+
417+
// ValidateRoutingExpression validates a schema or table routing expression.
418+
// Expression format: [prefix][{schema}][middle][{table}][suffix]
419+
// prefix/middle/suffix are optional and should match [A-Za-z0-9\._\-]*
420+
func ValidateRoutingExpression(expr string) error {
421+
if expr == "" {
422+
// Empty is valid - means use source schema/table
423+
return nil
424+
}
425+
426+
if !validRoutingExprRE.MatchString(expr) {
427+
return fmt.Errorf("invalid schema/table routing expression: %s. "+
428+
"Expression must match pattern: [prefix][{schema}][middle][{table}][suffix]", expr)
429+
}
430+
431+
return nil
432+
}
433+
411434
// ColumnSelector represents a column selector for a table.
412435
type ColumnSelector struct {
413436
Matcher []string `toml:"matcher" json:"matcher"`

0 commit comments

Comments
 (0)