Skip to content

Commit ed3a1fd

Browse files
committed
dm: fix validator unsigned row handling
1 parent e709452 commit ed3a1fd

File tree

2 files changed

+88
-0
lines changed

2 files changed

+88
-0
lines changed

dm/syncer/data_validator.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,15 @@ func (v *DataValidator) processRowsEvent(header *replication.EventHeader, ev *re
925925
beforeImage = ev.Rows[i]
926926
}
927927

928+
beforeImage, err = adjustValueImageFromBinlogData(beforeImage, tableInfo)
929+
if err != nil {
930+
return terror.Annotate(err, "failed to adjust row before image")
931+
}
932+
afterImage, err = adjustValueImageFromBinlogData(afterImage, tableInfo)
933+
if err != nil {
934+
return terror.Annotate(err, "failed to adjust row after image")
935+
}
936+
928937
rowChange := sqlmodel.NewRowChange(
929938
&cdcmodel.TableName{Schema: sourceTable.Schema, Table: sourceTable.Name},
930939
&cdcmodel.TableName{Schema: targetTable.Schema, Table: targetTable.Name},
@@ -956,6 +965,13 @@ func (v *DataValidator) processRowsEvent(header *replication.EventHeader, ev *re
956965
return nil
957966
}
958967

968+
func adjustValueImageFromBinlogData(data []interface{}, sourceTI *model.TableInfo) ([]interface{}, error) {
969+
if data == nil {
970+
return nil, nil
971+
}
972+
return adjustValueFromBinlogData(data, sourceTI)
973+
}
974+
959975
func (v *DataValidator) checkAndPersistCheckpointAndData(loc binlog.Location) error {
960976
metaFlushInterval := v.cfg.ValidatorCfg.MetaFlushInterval.Duration
961977
cutOverLocation := v.cutOverLocation.Load()

dm/syncer/data_validator_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,78 @@ func TestValidatorWaitSyncerRunning(t *testing.T) {
329329
require.NoError(t, validator.waitSyncerRunning())
330330
}
331331

332+
func TestValidatorProcessRowsEventAdjustUnsignedData(t *testing.T) {
333+
const (
334+
schemaName = "test"
335+
tableName = "tbl_unsigned"
336+
createSQL = "CREATE TABLE `tbl_unsigned`(id int unsigned primary key, v varchar(32))"
337+
)
338+
339+
createAST, err := parseSQL(createSQL)
340+
require.NoError(t, err)
341+
342+
cfg := genSubtaskConfig(t)
343+
syncerObj := NewSyncer(cfg, nil, nil)
344+
syncerObj.tableRouter, err = regexprrouter.NewRegExprRouter(cfg.CaseSensitive, []*router.TableRule{})
345+
require.NoError(t, err)
346+
tableInfo := filter.Table{Schema: schemaName, Name: tableName}
347+
348+
db, mock, err := sqlmock.New()
349+
require.NoError(t, err)
350+
defer db.Close()
351+
mock.MatchExpectationsInOrder(false)
352+
mock.ExpectBegin()
353+
mock.ExpectExec("SET SESSION SQL_MODE.*").WillReturnResult(sqlmock.NewResult(1, 1))
354+
mock.ExpectCommit()
355+
mock.ExpectQuery("SHOW CREATE TABLE " + tableInfo.String() + ".*").WillReturnRows(
356+
mock.NewRows([]string{"Table", "Create Table"}).AddRow(tableName, createSQL),
357+
)
358+
359+
dbConn, err := db.Conn(context.Background())
360+
require.NoError(t, err)
361+
syncerObj.downstreamTrackConn = dbconn.NewDBConn(cfg, conn.NewBaseConnForTest(dbConn, &retry.FiniteRetryStrategy{}))
362+
syncerObj.schemaTracker, err = schema.NewTestTracker(context.Background(), cfg.Name, syncerObj.downstreamTrackConn, log.L())
363+
require.NoError(t, err)
364+
defer syncerObj.schemaTracker.Close()
365+
require.NoError(t, syncerObj.schemaTracker.CreateSchemaIfNotExists(schemaName))
366+
require.NoError(t, syncerObj.schemaTracker.Exec(context.Background(), schemaName, createAST))
367+
368+
validator := NewContinuousDataValidator(cfg, syncerObj, false)
369+
validator.ctx, validator.cancel = context.WithCancel(context.Background())
370+
defer validator.cancel()
371+
validator.tctx = tcontext.NewContext(validator.ctx, validator.L)
372+
validator.reset()
373+
validator.workers = []*validateWorker{newValidateWorker(validator, 0)}
374+
375+
err = validator.processRowsEvent(
376+
&replication.EventHeader{
377+
EventType: replication.WRITE_ROWS_EVENTv2,
378+
EventSize: 128,
379+
},
380+
&replication.RowsEvent{
381+
Table: &replication.TableMapEvent{
382+
Schema: []byte(schemaName),
383+
Table: []byte(tableName),
384+
},
385+
ColumnCount: 2,
386+
Rows: [][]interface{}{
387+
{int32(-2061521730), "v1"},
388+
},
389+
},
390+
)
391+
require.NoError(t, err)
392+
393+
select {
394+
case job := <-validator.workers[0].rowChangeCh:
395+
require.Equal(t, "2233445566", job.Key)
396+
require.EqualValues(t, uint64(2233445566), job.row.RowValues()[0])
397+
require.Equal(t, []string{"2233445566"}, job.row.RowStrIdentity())
398+
default:
399+
t.Fatal("expected validator to dispatch one row change job")
400+
}
401+
require.NoError(t, mock.ExpectationsWereMet())
402+
}
403+
332404
func TestValidatorDoValidate(t *testing.T) {
333405
var (
334406
schemaName = "test"

0 commit comments

Comments
 (0)