Skip to content

[mysql] general fixes #2924

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
)
if err != nil {
if errors.Is(err, errors.ErrUnsupported) {
return nil, errors.New("/validatecdc source peer does not support being a source peer")
return nil, errors.New("connector is not a supported source type")
}
err := fmt.Errorf("failed to create source connector: %w", err)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
Expand All @@ -83,7 +83,7 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
err.Error(),
)
return nil, err
return nil, fmt.Errorf("failed to validate source connector %s: %w", req.ConnectionConfigs.SourceName, err)
}

dstConn, err := connectors.GetByNameAs[connectors.MirrorDestinationValidationConnector](
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ func (c *MySqlConnector) connect(ctx context.Context) (*client.Conn, error) {
conn := c.conn.Load()
if conn == nil {
argF := []client.Option{func(conn *client.Conn) error {
conn.SetCapability(mysql.CLIENT_COMPRESS)
if c.config.Compression > 0 {
conn.SetCapability(mysql.CLIENT_COMPRESS)
}
if !c.config.DisableTls {
config, err := shared.CreateTlsConfig(tls.VersionTLS12, c.config.RootCa, c.config.Host, c.config.TlsHost)
if err != nil {
Expand Down
17 changes: 11 additions & 6 deletions flow/connectors/mysql/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
"github.com/go-mysql-org/go-mysql/mysql"
"go.temporal.io/sdk/log"

utils "github.com/PeerDB-io/peerdb/flow/connectors/utils/partition"
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
partition "github.com/PeerDB-io/peerdb/flow/connectors/utils/partition"
"github.com/PeerDB-io/peerdb/flow/generated/protos"
"github.com/PeerDB-io/peerdb/flow/model"
"github.com/PeerDB-io/peerdb/flow/model/qvalue"
Expand All @@ -37,7 +38,7 @@ func (c *MySqlConnector) GetQRepPartitions(
}

if config.NumRowsPerPartition <= 0 {
return nil, errors.New("num rows per partition must be greater than 0 for sql server")
return nil, errors.New("num rows per partition must be greater than 0")
}

var err error
Expand All @@ -48,9 +49,13 @@ func (c *MySqlConnector) GetQRepPartitions(
if last != nil && last.Range != nil {
whereClause = fmt.Sprintf("WHERE %s > $1", quotedWatermarkColumn)
}
parsedWatermarkTable, err := utils.ParseSchemaTable(config.WatermarkTable)
if err != nil {
return nil, fmt.Errorf("failed to parse watermark table %s: %w", config.WatermarkTable, err)
}

// Query to get the total number of rows in the table
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s %s", config.WatermarkTable, whereClause)
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s %s", parsedWatermarkTable.MySQL(), whereClause)
var minVal any
var totalRows int64
if last != nil && last.Range != nil {
Expand Down Expand Up @@ -108,7 +113,7 @@ func (c *MySqlConnector) GetQRepPartitions(
ORDER BY start`,
numPartitions,
quotedWatermarkColumn,
config.WatermarkTable,
parsedWatermarkTable.MySQL(),
)
c.logger.Info("partitions query", slog.String("query", partitionsQuery), slog.Any("minVal", minVal))
rs, err = c.Execute(ctx, partitionsQuery, minVal)
Expand All @@ -122,7 +127,7 @@ func (c *MySqlConnector) GetQRepPartitions(
ORDER BY start`,
numPartitions,
quotedWatermarkColumn,
config.WatermarkTable,
parsedWatermarkTable.MySQL(),
)
c.logger.Info("partitions query", slog.String("query", partitionsQuery))
rs, err = c.Execute(ctx, partitionsQuery)
Expand All @@ -131,7 +136,7 @@ func (c *MySqlConnector) GetQRepPartitions(
return nil, fmt.Errorf("failed to query for partitions: %w", err)
}

partitionHelper := utils.NewPartitionHelper(c.logger)
partitionHelper := partition.NewPartitionHelper(c.logger)
for _, row := range rs.Values {
if err := partitionHelper.AddPartition(row[1].Value(), row[2].Value()); err != nil {
return nil, fmt.Errorf("failed to add partition: %w", err)
Expand Down
46 changes: 21 additions & 25 deletions flow/connectors/mysql/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,22 @@ func (c *MySqlConnector) CheckSourceTables(ctx context.Context, tableNames []*ut
}

func (c *MySqlConnector) CheckReplicationConnectivity(ctx context.Context) error {
if c.config.ReplicationMechanism == protos.MySqlReplicationMechanism_MYSQL_GTID {
// GTID -> check GTID and error out if not enabled, check filepos as well
// AUTO -> check GTID and fall back to filepos check
// FILEPOS -> check filepos only
if c.config.ReplicationMechanism != protos.MySqlReplicationMechanism_MYSQL_FILEPOS {
if _, err := c.GetMasterGTIDSet(ctx); err != nil {
return fmt.Errorf("failed to check replication status: %w", err)
}
} else {
namePos, err := c.GetMasterPos(ctx)
if err != nil {
return fmt.Errorf("failed to check replication status: %w", err)
}

if namePos.Name == "" || namePos.Pos <= 0 {
return errors.New("invalid replication status: missing log file or position")
if c.config.ReplicationMechanism == protos.MySqlReplicationMechanism_MYSQL_GTID {
return fmt.Errorf("failed to check replication status: %w", err)
}
}
}
if namePos, err := c.GetMasterPos(ctx); err != nil {
return fmt.Errorf("failed to check replication status: %w", err)
} else if namePos.Name == "" || namePos.Pos <= 0 {
return errors.New("invalid replication status: missing log file or position")
}

return nil
}

Expand Down Expand Up @@ -263,13 +265,17 @@ func (c *MySqlConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.F
sourceTables = append(sourceTables, parsedTable)
}

if err := c.CheckReplicationConnectivity(ctx); err != nil {
return fmt.Errorf("unable to establish replication connectivity: %w", err)
}

if err := c.CheckSourceTables(ctx, sourceTables); err != nil {
return fmt.Errorf("provided source tables invalidated: %w", err)
}
// no need to check replication stuff for initial snapshot only mirrors
if cfg.DoInitialSnapshot && cfg.InitialSnapshotOnly {
return nil
}

if err := c.CheckReplicationConnectivity(ctx); err != nil {
return fmt.Errorf("unable to establish replication connectivity: %w", err)
}

requireRowMetadata := false
for _, tm := range cfg.TableMappings {
Expand Down Expand Up @@ -310,15 +316,5 @@ func (c *MySqlConnector) ValidateCheck(ctx context.Context) error {
}
}

if err := c.CheckReplicationConnectivity(ctx); err != nil {
return fmt.Errorf("unable to establish replication connectivity: %w", err)
}
if err := c.CheckBinlogSettings(ctx, false); err != nil {
return fmt.Errorf("binlog configuration error: %w", err)
}
if err := c.CheckRDSBinlogSettings(ctx); err != nil {
return fmt.Errorf("binlog configuration error: %w", err)
}

return nil
}
8 changes: 1 addition & 7 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,7 @@ const (
WHERE TABLE_SCHEMA=? and TABLE_NAME=?`
checkIfSchemaExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA
WHERE SCHEMA_NAME=?`
getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?"
setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?"
getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
getLastNormalizeBatchID_SQL = "SELECT NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?"
dropSchemaIfExistsSQL = "DROP SCHEMA IF EXISTS %s"
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
)

type SnowflakeConnector struct {
Expand Down
68 changes: 32 additions & 36 deletions flow/e2e/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres"
"github.com/PeerDB-io/peerdb/flow/e2e"
Expand Down Expand Up @@ -295,54 +297,48 @@ func (s Suite) TestScripts() {
}
}

func (s Suite) TestMySQLBinlogValidation() {
func (s Suite) TestMySQLRDSBinlogValidation() {
_, ok := s.source.(*e2e.MySqlSource)
if !ok {
s.t.Skip("only for MySQL")
}
require.NoError(s.t, s.source.Exec(s.t.Context(),
fmt.Sprintf("CREATE TABLE %s(id int primary key, val text)", e2e.AttachSchema(s, "valid"))))

response, err := s.ValidatePeer(s.t.Context(), &protos.ValidatePeerRequest{
Peer: s.source.GeneratePeer(s.t),
})
require.NoError(s.t, err)
require.NotNil(s.t, response)
require.Equal(s.t, protos.ValidatePeerStatus_VALID, response.Status)
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: "my_validation_" + s.suffix,
TableNameMapping: map[string]string{e2e.AttachSchema(s, "valid"): "valid"},
Destination: s.ch.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)

require.NoError(s.t, s.source.Exec(s.t.Context(), "CREATE TABLE IF NOT EXISTS mysql.rds_configuration(name TEXT, value TEXT)"))
require.NoError(s.t, s.source.Exec(s.t.Context(),
"INSERT INTO mysql.rds_configuration(name, value) VALUES ('binlog retention hours', NULL)"))

response, err = s.ValidatePeer(s.t.Context(), &protos.ValidatePeerRequest{
Peer: s.source.GeneratePeer(s.t),
})
require.NoError(s.t, err)
require.NotNil(s.t, response)
require.Equal(s.t, protos.ValidatePeerStatus_INVALID, response.Status)
require.Equal(s.t,
"failed to validate peer mysql: binlog configuration error: "+
"RDS/Aurora setting 'binlog retention hours' should be at least 24, currently unset",
response.Message)
res, err := s.ValidateCDCMirror(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.Nil(s.t, res)
require.Error(s.t, err)
st, ok := status.FromError(err)
require.True(s.t, ok)
require.Equal(s.t, codes.Unknown, st.Code())
require.Equal(s.t, "failed to validate source connector mysql: binlog configuration error: "+
"RDS/Aurora setting 'binlog retention hours' should be at least 24, currently unset", st.Message())

require.NoError(s.t, s.source.Exec(s.t.Context(), "UPDATE mysql.rds_configuration SET value = '1' WHERE name = 'binlog retention hours'"))
response, err = s.ValidatePeer(s.t.Context(), &protos.ValidatePeerRequest{
Peer: s.source.GeneratePeer(s.t),
})
require.NoError(s.t, err)
require.NotNil(s.t, response)
require.Equal(s.t, protos.ValidatePeerStatus_INVALID, response.Status)
require.Equal(s.t,
"failed to validate peer mysql: binlog configuration error: "+
"RDS/Aurora setting 'binlog retention hours' should be at least 24, currently 1",
response.Message)

err = s.source.Exec(s.t.Context(), "UPDATE mysql.rds_configuration SET value = '24' WHERE name = 'binlog retention hours';")
require.NoError(s.t, err)
response, err = s.ValidatePeer(s.t.Context(), &protos.ValidatePeerRequest{
Peer: s.source.GeneratePeer(s.t),
})
require.NoError(s.t, err)
require.NotNil(s.t, response)
require.Equal(s.t, protos.ValidatePeerStatus_VALID, response.Status)
res, err = s.ValidateCDCMirror(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.Nil(s.t, res)
require.Error(s.t, err)
st, ok = status.FromError(err)
require.True(s.t, ok)
require.Equal(s.t, codes.Unknown, st.Code())
require.Equal(s.t, "failed to validate source connector mysql: binlog configuration error: "+
"RDS/Aurora setting 'binlog retention hours' should be at least 24, currently 1", st.Message())

require.NoError(s.t, s.source.Exec(s.t.Context(), "UPDATE mysql.rds_configuration SET value = '24' WHERE name = 'binlog retention hours';"))
res, err = s.ValidateCDCMirror(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, err)
require.NotNil(s.t, res)

require.NoError(s.t, s.source.Exec(s.t.Context(), "DROP TABLE IF EXISTS mysql.rds_configuration;"))
}
Expand Down
20 changes: 14 additions & 6 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,21 @@ func (s *SnapshotFlowExecution) cloneTable(
from = strings.Join(quotedColumns, ",")
}

// MySQL accepts double-quoted identifiers only when ANSI_QUOTES is enabled, and Vitess does not support that,
// so MySQL sources use MySQL-specific quoting. Vitess currently supports only initial load, so changing it here is enough.
srcTableEscaped := parsedSrcTable.String()
if dbtype, err := getPeerType(ctx, s.config.SourceName); err != nil {
return err
} else if dbtype == protos.DBType_MYSQL {
srcTableEscaped = parsedSrcTable.MySQL()
}

var query string
if mapping.PartitionKey == "" {
query = fmt.Sprintf("SELECT %s FROM %s", from, parsedSrcTable.String())
query = fmt.Sprintf("SELECT %s FROM %s", from, srcTableEscaped)
} else {
query = fmt.Sprintf("SELECT %s FROM %s WHERE %s BETWEEN {{.start}} AND {{.end}}",
from, parsedSrcTable.String(), mapping.PartitionKey)
from, srcTableEscaped, mapping.PartitionKey)
}

numWorkers := uint32(8)
Expand All @@ -180,13 +189,12 @@ func (s *SnapshotFlowExecution) cloneTable(
snapshotWriteMode := &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
}

// ensure document IDs are synchronized across initial load and CDC
// for the same document
dbtype, err := getPeerType(ctx, s.config.DestinationName)
if err != nil {
if dbtype, err := getPeerType(ctx, s.config.DestinationName); err != nil {
return err
}
if dbtype == protos.DBType_ELASTICSEARCH {
} else if dbtype == protos.DBType_ELASTICSEARCH {
if err := initTableSchema(); err != nil {
return err
}
Expand Down