Skip to content

Commit 0864fb7

Browse files
ti-chi-botwk989898
andauthored
sink: use where in operator in delete or update statement (#3788) (#4470)
* This is an automated cherry-pick of #3788 Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io> * Update config.go * Update config.go * Fix syntax by adding a comma after SlowQuery * update Signed-off-by: wk989898 <nhsmwk@gmail.com> * update Signed-off-by: wk989898 <nhsmwk@gmail.com> --------- Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io> Signed-off-by: wk989898 <nhsmwk@gmail.com> Co-authored-by: nhsmw <nhsmwk@gmail.com>
1 parent 4da823d commit 0864fb7

File tree

15 files changed

+1525
-147
lines changed

15 files changed

+1525
-147
lines changed

pkg/sink/mysql/config.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/pingcap/ticdc/pkg/config"
3333
cerror "github.com/pingcap/ticdc/pkg/errors"
3434
"github.com/pingcap/ticdc/pkg/security"
35+
"github.com/pingcap/ticdc/pkg/sink/sqlmodel"
3536
"github.com/pingcap/ticdc/pkg/util"
3637
"github.com/pingcap/tidb/br/pkg/version"
3738
"github.com/pingcap/tidb/pkg/sessionctx/variable"
@@ -148,6 +149,12 @@ type Config struct {
148149

149150
// ServerInfo is the version info of the downstream
150151
ServerInfo version.ServerInfo
152+
153+
// whereClause controls the WHERE clause strategy used by multi-row UPDATE/DELETE.
154+
//
155+
// It is configured via the sink URI query param `where-clause` and passed to
156+
// sqlmodel.Gen{Delete,Update}SQL. See pkg/sink/sqlmodel for details.
157+
whereClause string
151158
}
152159

153160
// New returns the default mysql backend config.
@@ -171,6 +178,7 @@ func New() *Config {
171178
HasVectorType: defaultHasVectorType,
172179
EnableDDLTs: defaultEnableDDLTs,
173180
SlowQuery: slowQuery,
181+
whereClause: sqlmodel.DefaultWhereClause,
174182
}
175183
}
176184

@@ -269,6 +277,9 @@ func (c *Config) Apply(
269277
if err = getEnableDDLTs(query, &c.EnableDDLTs); err != nil {
270278
return err
271279
}
280+
if err = getWhereClause(query, &c.whereClause); err != nil {
281+
return err
282+
}
272283

273284
// c.EnableOldValue = config.EnableOldValue
274285
// Note: The TiDBSourceID should never be 0 here, but we have found that
@@ -610,6 +621,14 @@ func getEnableDDLTs(value url.Values, enableDDLTs *bool) error {
610621
return getBool(value, "enable-ddl-ts", enableDDLTs)
611622
}
612623

624+
func getWhereClause(value url.Values, whereClause *string) error {
625+
s := value.Get("where-clause")
626+
if len(s) > 0 {
627+
*whereClause = s
628+
}
629+
return nil
630+
}
631+
613632
func getBool(values url.Values, key string, target *bool) error {
614633
s := values.Get(key)
615634
if len(s) > 0 {

pkg/sink/mysql/mysql_writer_dml.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -846,7 +846,7 @@ func (w *Writer) batchSingleTxnDmls(
846846
// handle delete
847847
if len(deleteRows) > 0 {
848848
for _, rows := range deleteRows {
849-
sql, value := sqlmodel.GenDeleteSQL(rows...)
849+
sql, value := sqlmodel.GenDeleteSQL(w.cfg.whereClause, rows...)
850850
sqls = append(sqls, sql)
851851
values = append(values, value)
852852
rowTypes = append(rowTypes, common.RowTypeDelete)
@@ -978,7 +978,7 @@ func (w *Writer) genUpdateSQL(rows ...*sqlmodel.RowChange) ([]string, [][]interf
978978
}
979979
if size < w.cfg.MaxMultiUpdateRowSize*len(rows) {
980980
// use multi update in one SQL
981-
sql, value := sqlmodel.GenUpdateSQL(rows...)
981+
sql, value := sqlmodel.GenUpdateSQL(w.cfg.whereClause, rows...)
982982
return []string{sql}, [][]interface{}{value}, []common.RowType{common.RowTypeUpdate}
983983
}
984984
// each row has one independent update SQL.

pkg/sink/mysql/mysql_writer_dml_test.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) {
353353
require.Equal(t, 2, len(sql))
354354
require.Equal(t, 2, len(args))
355355
require.Equal(t, 2, len(rowTypes))
356-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0])
356+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0])
357357
require.Equal(t, []interface{}{int64(1)}, args[0])
358358
require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[1])
359359
require.Equal(t, []interface{}{int64(1), "test"}, args[1])
@@ -365,7 +365,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) {
365365
require.Equal(t, 2, len(sql))
366366
require.Equal(t, 2, len(args))
367367
require.Equal(t, 2, len(rowTypes))
368-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0])
368+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0])
369369
require.Equal(t, []interface{}{int64(2)}, args[0])
370370
require.Equal(t, "UPDATE `test`.`t` SET `id` = ?, `name` = ? WHERE `id` = ? LIMIT 1", sql[1])
371371

@@ -375,7 +375,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) {
375375
sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent})
376376
require.Equal(t, 1, len(sql))
377377
require.Equal(t, 1, len(args))
378-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0])
378+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0])
379379
require.Equal(t, []interface{}{int64(3)}, args[0])
380380

381381
// Insert A + Update A
@@ -394,7 +394,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) {
394394
sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlDeleteEvent})
395395
require.Equal(t, 1, len(sql))
396396
require.Equal(t, 1, len(args))
397-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0])
397+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0])
398398
require.Equal(t, []interface{}{int64(5)}, args[0])
399399

400400
// Update A + Update A
@@ -416,7 +416,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) {
416416
require.Equal(t, 2, len(sql))
417417
require.Equal(t, 2, len(args))
418418
require.Equal(t, 2, len(rowTypes))
419-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?) OR (`id` = ?)", sql[0])
419+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?),(?))", sql[0])
420420
require.Equal(t, []interface{}{int64(7), int64(7)}, args[0])
421421
require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[1])
422422
require.Equal(t, []interface{}{int64(7), "test2"}, args[1])
@@ -431,7 +431,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) {
431431
sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent2})
432432
require.Equal(t, 1, len(sql))
433433
require.Equal(t, 1, len(args))
434-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?) OR (`id` = ?)", sql[0])
434+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?),(?))", sql[0])
435435
require.Equal(t, []interface{}{int64(8), int64(8)}, args[0])
436436

437437
// Delete A + Insert A + Update A + Update A + Delete A
@@ -446,7 +446,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) {
446446
sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2, dmlDeleteEvent2})
447447
require.Equal(t, 1, len(sql))
448448
require.Equal(t, 1, len(args))
449-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?) OR (`id` = ?)", sql[0])
449+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?),(?))", sql[0])
450450
require.Equal(t, []interface{}{int64(9), int64(9)}, args[0])
451451

452452
// Insert A + Delete A + Insert A
@@ -458,7 +458,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) {
458458
require.Equal(t, 2, len(sql))
459459
require.Equal(t, 2, len(args))
460460
require.Equal(t, 2, len(rowTypes))
461-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0])
461+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0])
462462
require.Equal(t, []interface{}{int64(10)}, args[0])
463463
require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[1])
464464
require.Equal(t, []interface{}{int64(10), "test2"}, args[1])
@@ -472,7 +472,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) {
472472
sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent})
473473
require.Equal(t, 1, len(sql))
474474
require.Equal(t, 1, len(args))
475-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0])
475+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0])
476476
require.Equal(t, []interface{}{int64(11)}, args[0])
477477

478478
// Insert A + Update A + Update A
@@ -495,7 +495,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) {
495495
sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlUpdateEvent})
496496
require.Equal(t, 3, len(sql))
497497
require.Equal(t, 3, len(args))
498-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0])
498+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0])
499499
require.Equal(t, []interface{}{int64(14)}, args[0])
500500
require.Equal(t, "UPDATE `test`.`t` SET `id` = ?, `name` = ? WHERE `id` = ? LIMIT 1", sql[1])
501501
require.Equal(t, []interface{}{int64(15), "test15", int64(15)}, args[1])
@@ -541,7 +541,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) {
541541
sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent})
542542
require.Equal(t, 1, len(sql))
543543
require.Equal(t, 1, len(args))
544-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0])
544+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0])
545545
require.Equal(t, []interface{}{int64(3)}, args[0])
546546

547547
// Insert A + Update A
@@ -560,7 +560,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) {
560560
sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlDeleteEvent})
561561
require.Equal(t, 1, len(sql))
562562
require.Equal(t, 1, len(args))
563-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0])
563+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0])
564564
require.Equal(t, []interface{}{int64(5)}, args[0])
565565

566566
// Update A + Update A
@@ -594,7 +594,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) {
594594
sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent2})
595595
require.Equal(t, 1, len(sql))
596596
require.Equal(t, 1, len(args))
597-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0])
597+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0])
598598
require.Equal(t, []interface{}{int64(8)}, args[0])
599599

600600
// Delete A + Insert A + Update A + Update A + Delete A
@@ -609,7 +609,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) {
609609
sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2, dmlDeleteEvent2})
610610
require.Equal(t, 1, len(sql))
611611
require.Equal(t, 1, len(args))
612-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0])
612+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0])
613613
require.Equal(t, []interface{}{int64(9)}, args[0])
614614

615615
// Insert A + Delete A + Insert A
@@ -632,7 +632,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) {
632632
sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent})
633633
require.Equal(t, 1, len(sql))
634634
require.Equal(t, 1, len(args))
635-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0])
635+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0])
636636
require.Equal(t, []interface{}{int64(11)}, args[0])
637637

638638
// Insert A + Update A + Update A
@@ -656,7 +656,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) {
656656
require.Equal(t, 2, len(sql))
657657
require.Equal(t, 2, len(args))
658658
require.Equal(t, 2, len(rowTypes))
659-
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id` = ?)", sql[0])
659+
require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0])
660660
require.Equal(t, []interface{}{int64(14)}, args[0])
661661
require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?),(?,?)", sql[1])
662662
// The order of args in unsafe mode is not deterministic due to map iteration

pkg/sink/pulsar/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func NewPulsarConfig(sinkURI *url.URL, pulsarConfig *config.PulsarConfig) (*conf
8080
c.BrokerURL = brokerScheme + "://" + sinkURI.Host
8181

8282
if pulsarConfig == nil {
83-
log.L().Debug("new pulsar config", zap.Any("config", c))
83+
log.Debug("new pulsar config", zap.Any("config", c))
8484
return c, nil
8585
}
8686

@@ -108,7 +108,7 @@ func NewPulsarConfig(sinkURI *url.URL, pulsarConfig *config.PulsarConfig) (*conf
108108
pulsarConfig.SendTimeout = c.SendTimeout
109109
}
110110

111-
log.L().Debug("new pulsar config success", zap.Any("config", pulsarConfig))
111+
log.Debug("new pulsar config success", zap.Any("config", pulsarConfig))
112112

113113
return pulsarConfig, nil
114114
}

0 commit comments

Comments
 (0)