diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index 9cb05fa1044..adb2a76b5fe 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -49,6 +49,7 @@ create table ` + "`Lead-1`(`Lead`" + ` binary(16), name varbinary(16), date1 dat create table _vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431(id int, val varbinary(128), primary key(id), key(val)); create table db_order_test (c_uuid varchar(64) not null default '', created_at datetime not null, dstuff varchar(128), dtstuff text, dbstuff blob, cstuff char(32), primary key (c_uuid,created_at), key (dstuff)) CHARSET=utf8mb4; create table datze (id int, dt1 datetime not null default current_timestamp, dt2 datetime not null, ts1 timestamp default current_timestamp, primary key (id), key (dt1)); +create table customer_nopk(id int, name varchar(128)) CHARSET=utf8; ` // These should always be ignored in vreplication @@ -81,7 +82,8 @@ create table datze (id int, dt1 datetime not null default current_timestamp, dt2 "Lead": {}, "Lead-1": {}, "db_order_test": {}, - "datze": {} + "datze": {}, + "customer_nopk": {} } } ` @@ -156,6 +158,14 @@ create table datze (id int, dt1 datetime not null default current_timestamp, dt2 "name": "reverse_bits" } ] + }, + "customer_nopk": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] } } } diff --git a/go/test/endtoend/vreplication/unsharded_init_data.sql b/go/test/endtoend/vreplication/unsharded_init_data.sql index 019945609db..364be5f4ceb 100644 --- a/go/test/endtoend/vreplication/unsharded_init_data.sql +++ b/go/test/endtoend/vreplication/unsharded_init_data.sql @@ -33,3 +33,20 @@ insert into datze(id, dt2, ts1) values (4, '2022-03-27 03:00:00', current_timest insert into datze(id, dt2, ts1) values (5, '2022-03-27 03:15:00', current_timestamp); insert into datze(id, dt2, ts1) values (6, current_timestamp, current_timestamp); +insert into customer_nopk(id, name) values(1, 'abc'); +insert into customer_nopk(id, name) values(2, 'def'); +insert into customer_nopk(id, name) values(3, 'ghi'); +insert into customer_nopk(id, name) values(4, 'jkl'); +insert into customer_nopk(id, name) values(5, 'mno'); +insert into customer_nopk(id, name) values(6, 'pqr'); +insert into customer_nopk(id, name) values(7, 'stu'); +insert into customer_nopk(id, name) values(8, 'vwx'); +insert into customer_nopk(id, name) values(9, 'yz'); +insert into customer_nopk(id, name) values(10, 'abc'); +insert into customer_nopk(id, name) values(11, 'def'); +insert into customer_nopk(id, name) values(12, 'ghi'); +insert into customer_nopk(id, name) values(13, 'jkl'); +insert into customer_nopk(id, name) values(14, 'mno'); +insert into customer_nopk(id, name) values(15, 'pqr'); + + diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index 0627e39b775..d897b094ca6 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -62,7 +62,7 @@ var testCases = []*testCase{ sourceShards: "0", targetShards: "-80,80-", tabletBaseID: 200, - tables: "customer,Lead,Lead-1", + tables: "customer,customer_nopk,Lead,Lead-1", autoRetryError: true, retryInsert: `insert into customer(cid, name, typ) values(91234, 'Testy McTester', 'soho')`, resume: true, @@ -187,7 +187,8 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, updateTableStats(t, tab, tc.tables) // need to do this in order to test progress reports } - vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil) + // vdiff v1 fails for tables without pks. Since it is not supported anymore, let's just not test with v1. + vdiff(t, tc.targetKs, tc.workflow, allCellNames, false, true, nil) if tc.autoRetryError { testAutoRetryError(t, tc, allCellNames) diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index 6035063902d..8800b759c02 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -98,7 +98,7 @@ func waitForVDiff2ToComplete(t *testing.T, ksWorkflow, cells, uuid string, compl if !completedAtMin.IsZero() { ca := info.CompletedAt completedAt, _ := time.Parse(vdiff2.TimestampFormat, ca) - if !completedAt.After(completedAtMin) { + if completedAt.Before(completedAtMin) { continue } } diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 3a88fe790ab..f2d2a9582a9 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -433,6 +433,7 @@ func (td *tableDiffer) setupRowSorters() { for shard, source := range td.wd.ct.sources { sources[shard] = source.shardStreamer } + log.Infof("Setting up row sorters for %d sources, with pk %+v, for workflow %s", len(sources), td.tablePlan.comparePKs, td.wd.ct.workflow) td.sourcePrimitive = newMergeSorter(sources, td.tablePlan.comparePKs) // create a merge sorter for the target diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_plan.go b/go/vt/vttablet/tabletmanager/vdiff/table_plan.go index 64993c4eabd..b109a7896c3 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_plan.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_plan.go @@ -50,7 +50,28 @@ type tablePlan struct { aggregates []*engine.AggregateParams } +func (ci *compareColInfo) String() string { + return fmt.Sprintf("compareColInfo{colIndex:%d, colName:%q, isPK:%v}", ci.colIndex, ci.colName, ci.isPK) +} + +func (tp tablePlan) String() string { + return fmt.Sprintf( + "tablePlan(%s) {sourceQuery:%q, targetQuery:%q, compareCols:%+v, comparePKs:%+v, pkCols:%v, selectPks:%v, table:%v, orderBy:%v, aggregates:%d}", + tp.table.Name, + tp.sourceQuery, + tp.targetQuery, + tp.compareCols, + tp.comparePKs, + tp.pkCols, + tp.selectPks, + tp.table, + tp.orderBy, + len(tp.aggregates), + ) +} + func (td *tableDiffer) buildTablePlan() (*tablePlan, error) { + log.Infof("buildTablePlan for %v, sourceQuery %s", td.table.Name, td.sourceQuery) tp := &tablePlan{table: td.table} statement, err := sqlparser.Parse(td.sourceQuery) if err != nil { @@ -145,6 +166,7 @@ func (td *tableDiffer) buildTablePlan() (*tablePlan, error) { if err != nil { return nil, err } + log.Infof("tablePlan after findPKs: %v", tp) // Remove in_keyrange. It's not understood by mysql. sourceSelect.Where = sel.Where //removeKeyrange(sel.Where) // The source should also perform the group by. @@ -200,5 +222,32 @@ func (tp *tablePlan) findPKs(targetSelect *sqlparser.Select) error { }) } tp.orderBy = orderby + + // The code below handles the case where a table has no PKs. + if len(tp.pkCols) == 0 { + log.Warningf("No PK columns found in table %v, using compareCols", tp.table.Name) + for _, col := range tp.compareCols { + tp.pkCols = append(tp.pkCols, col.colIndex) + } + } + if len(tp.comparePKs) == 0 { + log.Warningf("No comparePKs found in table %v, using compareCols", tp.table.Name) + for _, col := range tp.compareCols { + tp.comparePKs = append(tp.comparePKs, col) + } + } + if len(tp.orderBy) == 0 { + log.Warningf("No orderby found in table %v, using compareCols", tp.table.Name) + for _, col := range tp.compareCols { + tp.orderBy = append(tp.orderBy, &sqlparser.Order{ + Expr: &sqlparser.ColName{Name: sqlparser.NewIdentifierCI(col.colName)}, + Direction: sqlparser.AscOrder, + }) + } + } + log.Infof("tp.orderby columns for table %v", tp.table.Name) + for i, col := range tp.orderBy { + log.Infof("orderBy[%d]: %v", i, col.Expr) + } return nil }