Skip to content
52 changes: 36 additions & 16 deletions drivers/mysql/internal/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,31 +102,52 @@ func (m *MySQL) Setup(ctx context.Context) error {
if err := client.PingContext(ctx); err != nil {
return fmt.Errorf("failed to ping database: %s", err)
}
// TODO: If CDC config exists and permission check fails, fail the setup
m.client = client

found, _ := utils.IsOfType(m.config.UpdateMethod, "initial_wait_time")
if found {
logger.Info("Found CDC Configuration")
cdc := &CDC{}
if err := utils.Unmarshal(m.config.UpdateMethod, cdc); err != nil {
return err
}
if cdc.InitialWaitTime == 0 {
// default set 10 sec
cdc.InitialWaitTime = 10
if cdc.InitialWaitTime <= 0 {
// Fail the setup if initial_wait_time is 0 or negative
return fmt.Errorf("failed to setup CDC: Initial wait time must be greater than 0")
}

// Permission check for CDC
var grants []string
if err := client.SelectContext(ctx, &grants, "SHOW GRANTS FOR CURRENT_USER()"); err != nil {
return fmt.Errorf("failed to get grants for current user: %s", err)
}
grantStr := strings.ToUpper(strings.Join(grants, " "))

if !strings.Contains(grantStr, "ALL PRIVILEGES") {
permissions := []string{"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}
for _, perm := range permissions {
if !strings.Contains(grantStr, perm) {
return fmt.Errorf("missing required permissions for CDC: SELECT, REPLICATION SLAVE, or REPLICATION CLIENT")
}
}
}

// Enable CDC support if binlog is configured
cdcSupported, err := m.IsCDCSupported(ctx)
if err != nil {
return err
}
if !cdcSupported {
return fmt.Errorf("failed to setup CDC")
}

logger.Infof("CDC setup done with initial wait time %d", cdc.InitialWaitTime)

m.CDCSupport = true
m.cdcConfig = *cdc
}
m.client = client

m.config.RetryCount = utils.Ternary(m.config.RetryCount <= 0, 1, m.config.RetryCount+1).(int)
// Enable CDC support if binlog is configured
cdcSupported, err := m.IsCDCSupported(ctx)
if err != nil {
logger.Warnf("failed to check CDC support: %s", err)
}
if !cdcSupported {
logger.Warnf("CDC is not supported")
}
m.CDCSupport = cdcSupported
return nil
}

Expand Down Expand Up @@ -254,8 +275,7 @@ func (m *MySQL) IsCDCSupported(ctx context.Context) (bool, error) {
}

if strings.ToUpper(value) != expectedValue {
logger.Warnf(warnMessage)
return false, nil
return false, fmt.Errorf(warnMessage)
Comment on lines -258 to +278
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why Is this change done ??

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Enable CDC support if binlog is configured
cdcSupported, err := m.IsCDCSupported(ctx)
if err != nil {
return err
}

So if error occurs, user can see error like binlog_format is not set to ROW

}

return true, nil
Expand Down
9 changes: 7 additions & 2 deletions drivers/postgres/internal/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,13 @@ func (p *Postgres) Setup(ctx context.Context) error {
if err := utils.Unmarshal(p.config.UpdateMethod, cdc); err != nil {
return err
}
// set default value
cdc.InitialWaitTime = utils.Ternary(cdc.InitialWaitTime == 0, 1200, cdc.InitialWaitTime).(int)

// Set initial_wait_time to 1200 if not provided by user
userProvided, _ := utils.IsOfType(p.config.UpdateMethod, "initial_wait_time")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also why no query based check is added in postgres side.

you can visit the olake documentation on how to verify cdc configurations

SELECT name, setting FROM pg_settings 
WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders');

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My mistake I didnt checked that. I will include it in code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will update this by tomorrow ( saturday ).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any updates on this @Kamlesh72 ?

if !userProvided {
cdc.InitialWaitTime = 1200
logger.Infof("CDC initial wait time not provided by user, defaulting to %d", cdc.InitialWaitTime)
}
Comment on lines -108 to +114
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason for this change ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If user leave Initial wait time field empty, we set to 1200.
If user enters any value ( including 0 ), then only we validate.

So if user enters 0, I thought we shouldn't silently change that to default value.


// check if initial wait time is valid or not
if cdc.InitialWaitTime < 120 || cdc.InitialWaitTime > 2400 {
Expand Down