Skip to content
48 changes: 34 additions & 14 deletions drivers/mysql/internal/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func (m *MySQL) Setup(ctx context.Context) error {
if err := client.PingContext(ctx); err != nil {
return fmt.Errorf("failed to ping database: %s", err)
}
// TODO: If CDC config exists and permission check fails, fail the setup
m.client = client

found, _ := utils.IsOfType(m.config.UpdateMethod, "initial_wait_time")
if found {
logger.Info("Found CDC Configuration")
Expand All @@ -112,21 +113,41 @@ func (m *MySQL) Setup(ctx context.Context) error {
}
if cdc.InitialWaitTime <= 0 {
// Fail the setup if initial_wait_time is 0 or negative
return fmt.Errorf("Failed to setup CDC: Initial wait time must be greater than 0")
return fmt.Errorf("failed to setup CDC: Initial wait time must be greater than 0")
}

// Permission check for CDC
var grants []string
if err := client.SelectContext(ctx, &grants, "SHOW GRANTS FOR CURRENT_USER()"); err != nil {
return fmt.Errorf("failed to get grants for current user: %s", err)
}
grantStr := strings.ToUpper(strings.Join(grants, " "))

if !strings.Contains(grantStr, "ALL PRIVILEGES") {
permissions := []string{"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}
for _, perm := range permissions {
if !strings.Contains(grantStr, perm) {
return fmt.Errorf("missing required permissions for CDC: SELECT, REPLICATION SLAVE, or REPLICATION CLIENT")
}
}
}

// Enable CDC support if binlog is configured
cdcSupported, err := m.IsCDCSupported(ctx)
if err != nil {
return err
}
if !cdcSupported {
return fmt.Errorf("failed to setup CDC")
}

logger.Infof("CDC setup done with initial wait time %d", cdc.InitialWaitTime)

m.CDCSupport = true
m.cdcConfig = *cdc
}
m.client = client

m.config.RetryCount = utils.Ternary(m.config.RetryCount <= 0, 1, m.config.RetryCount+1).(int)
// Enable CDC support if binlog is configured
cdcSupported, err := m.IsCDCSupported(ctx)
if err != nil {
logger.Warnf("failed to check CDC support: %s", err)
}
if !cdcSupported {
logger.Warnf("CDC is not supported")
}
m.CDCSupport = cdcSupported
return nil
}

Expand Down Expand Up @@ -254,8 +275,7 @@ func (m *MySQL) IsCDCSupported(ctx context.Context) (bool, error) {
}

if strings.ToUpper(value) != expectedValue {
logger.Warnf(warnMessage)
return false, nil
return false, fmt.Errorf(warnMessage)
Comment on lines -258 to +278
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why Is this change done ??

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Enable CDC support if binlog is configured
cdcSupported, err := m.IsCDCSupported(ctx)
if err != nil {
return err
}

So if error occurs, user can see error like binlog_format is not set to ROW

}

return true, nil
Expand Down