Skip to content

[Direct PR] (v16) VDiff: add support for tables with no primary keys #18127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: release-16.0
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ create table ` + "`Lead-1`(`Lead`" + ` binary(16), name varbinary(16), date1 dat
create table _vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431(id int, val varbinary(128), primary key(id), key(val));
create table db_order_test (c_uuid varchar(64) not null default '', created_at datetime not null, dstuff varchar(128), dtstuff text, dbstuff blob, cstuff char(32), primary key (c_uuid,created_at), key (dstuff)) CHARSET=utf8mb4;
create table datze (id int, dt1 datetime not null default current_timestamp, dt2 datetime not null, ts1 timestamp default current_timestamp, primary key (id), key (dt1));
create table customer_nopk(id int, name varchar(128)) CHARSET=utf8;
`

// These should always be ignored in vreplication
Expand Down Expand Up @@ -81,7 +82,8 @@ create table datze (id int, dt1 datetime not null default current_timestamp, dt2
"Lead": {},
"Lead-1": {},
"db_order_test": {},
"datze": {}
"datze": {},
"customer_nopk": {}
}
}
`
Expand Down Expand Up @@ -156,6 +158,14 @@ create table datze (id int, dt1 datetime not null default current_timestamp, dt2
"name": "reverse_bits"
}
]
},
"customer_nopk": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions go/test/endtoend/vreplication/unsharded_init_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,20 @@ insert into datze(id, dt2, ts1) values (4, '2022-03-27 03:00:00', current_timest
insert into datze(id, dt2, ts1) values (5, '2022-03-27 03:15:00', current_timestamp);
insert into datze(id, dt2, ts1) values (6, current_timestamp, current_timestamp);

insert into customer_nopk(id, name) values(1, 'abc');
insert into customer_nopk(id, name) values(2, 'def');
insert into customer_nopk(id, name) values(3, 'ghi');
insert into customer_nopk(id, name) values(4, 'jkl');
insert into customer_nopk(id, name) values(5, 'mno');
insert into customer_nopk(id, name) values(6, 'pqr');
insert into customer_nopk(id, name) values(7, 'stu');
insert into customer_nopk(id, name) values(8, 'vwx');
insert into customer_nopk(id, name) values(9, 'yz');
insert into customer_nopk(id, name) values(10, 'abc');
insert into customer_nopk(id, name) values(11, 'def');
insert into customer_nopk(id, name) values(12, 'ghi');
insert into customer_nopk(id, name) values(13, 'jkl');
insert into customer_nopk(id, name) values(14, 'mno');
insert into customer_nopk(id, name) values(15, 'pqr');


5 changes: 3 additions & 2 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var testCases = []*testCase{
sourceShards: "0",
targetShards: "-80,80-",
tabletBaseID: 200,
tables: "customer,Lead,Lead-1",
tables: "customer,customer_nopk,Lead,Lead-1",
autoRetryError: true,
retryInsert: `insert into customer(cid, name, typ) values(91234, 'Testy McTester', 'soho')`,
resume: true,
Expand Down Expand Up @@ -187,7 +187,8 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
updateTableStats(t, tab, tc.tables) // need to do this in order to test progress reports
}

vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)
// vdiff v1 fails for tables without pks. Since it is not supported anymore, let's just not test with v1.
vdiff(t, tc.targetKs, tc.workflow, allCellNames, false, true, nil)

if tc.autoRetryError {
testAutoRetryError(t, tc, allCellNames)
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func waitForVDiff2ToComplete(t *testing.T, ksWorkflow, cells, uuid string, compl
if !completedAtMin.IsZero() {
ca := info.CompletedAt
completedAt, _ := time.Parse(vdiff2.TimestampFormat, ca)
if !completedAt.After(completedAtMin) {
if completedAt.Before(completedAtMin) {
continue
}
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ func (td *tableDiffer) setupRowSorters() {
for shard, source := range td.wd.ct.sources {
sources[shard] = source.shardStreamer
}
log.Infof("Setting up row sorters for %d sources, with pk %+v, for workflow %s", len(sources), td.tablePlan.comparePKs, td.wd.ct.workflow)
td.sourcePrimitive = newMergeSorter(sources, td.tablePlan.comparePKs)

// create a merge sorter for the target
Expand Down
49 changes: 49 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/table_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,28 @@ type tablePlan struct {
aggregates []*engine.AggregateParams
}

func (ci *compareColInfo) String() string {
return fmt.Sprintf("compareColInfo{colIndex:%d, colName:%q, isPK:%v}", ci.colIndex, ci.colName, ci.isPK)
}

func (tp tablePlan) String() string {
return fmt.Sprintf(
"tablePlan(%s) {sourceQuery:%q, targetQuery:%q, compareCols:%+v, comparePKs:%+v, pkCols:%v, selectPks:%v, table:%v, orderBy:%v, aggregates:%d}",
tp.table.Name,
tp.sourceQuery,
tp.targetQuery,
tp.compareCols,
tp.comparePKs,
tp.pkCols,
tp.selectPks,
tp.table,
tp.orderBy,
len(tp.aggregates),
)
}

func (td *tableDiffer) buildTablePlan() (*tablePlan, error) {
log.Infof("buildTablePlan for %v, sourceQuery %s", td.table.Name, td.sourceQuery)
tp := &tablePlan{table: td.table}
statement, err := sqlparser.Parse(td.sourceQuery)
if err != nil {
Expand Down Expand Up @@ -145,6 +166,7 @@ func (td *tableDiffer) buildTablePlan() (*tablePlan, error) {
if err != nil {
return nil, err
}
log.Infof("tablePlan after findPKs: %v", tp)
// Remove in_keyrange. It's not understood by mysql.
sourceSelect.Where = sel.Where //removeKeyrange(sel.Where)
// The source should also perform the group by.
Expand Down Expand Up @@ -200,5 +222,32 @@ func (tp *tablePlan) findPKs(targetSelect *sqlparser.Select) error {
})
}
tp.orderBy = orderby

// The code below handles the case where a table has no PKs.
if len(tp.pkCols) == 0 {
log.Warningf("No PK columns found in table %v, using compareCols", tp.table.Name)
for _, col := range tp.compareCols {
tp.pkCols = append(tp.pkCols, col.colIndex)
}
}
if len(tp.comparePKs) == 0 {
log.Warningf("No comparePKs found in table %v, using compareCols", tp.table.Name)
for _, col := range tp.compareCols {
tp.comparePKs = append(tp.comparePKs, col)
}
}
if len(tp.orderBy) == 0 {
log.Warningf("No orderby found in table %v, using compareCols", tp.table.Name)
for _, col := range tp.compareCols {
tp.orderBy = append(tp.orderBy, &sqlparser.Order{
Expr: &sqlparser.ColName{Name: sqlparser.NewIdentifierCI(col.colName)},
Direction: sqlparser.AscOrder,
})
}
}
log.Infof("tp.orderby columns for table %v", tp.table.Name)
for i, col := range tp.orderBy {
log.Infof("orderBy[%d]: %v", i, col.Expr)
}
return nil
}
Loading