Skip to content

Commit 4c8a81d

Browse files
authored
fix: handle virtual primary keys in slow path of LOAD DATA & fall back MySQL JSON loading temporarily (#336)
* fix: handle virtual primary keys in slow path of LOAD DATA & fall back MySQL JSON loading temporarily * Support database/table filtering in MySQL replication
1 parent 956e1c4 commit 4c8a81d

File tree

14 files changed

+355
-24
lines changed

14 files changed

+355
-24
lines changed

.github/workflows/replication-test.yml

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,23 +90,29 @@ jobs:
9090
docker exec source-db dolt sql -q "
9191
CREATE DATABASE test;
9292
CREATE TABLE test.items (id INT PRIMARY KEY, name VARCHAR(50));
93-
INSERT INTO test.items VALUES (1, 'test1'), (2, 'test2');"
93+
INSERT INTO test.items VALUES (1, 'test1'), (2, 'test2');
94+
CREATE TABLE test.skip (id INT PRIMARY KEY, name VARCHAR(50));
95+
INSERT INTO test.skip VALUES (1, 'abc'), (2, 'def');"
9496
elif [ "${{ matrix.source }}" = "mariadb" ]; then
9597
docker exec source-db mariadb -uroot -proot test -e "
9698
CREATE TABLE items (id INT PRIMARY KEY, name VARCHAR(50));
97-
INSERT INTO items VALUES (1, 'test1'), (2, 'test2');"
99+
INSERT INTO items VALUES (1, 'test1'), (2, 'test2');
100+
CREATE TABLE skip (id INT PRIMARY KEY, name VARCHAR(50));
101+
INSERT INTO skip VALUES (1, 'abc'), (2, 'def');"
98102
else
99103
docker exec source-db mysql -uroot -proot test -e "
100104
CREATE TABLE items (id INT PRIMARY KEY, name VARCHAR(50));
101-
INSERT INTO items VALUES (1, 'test1'), (2, 'test2');"
105+
INSERT INTO items VALUES (1, 'test1'), (2, 'test2');
106+
CREATE TABLE skip (id INT PRIMARY KEY, name VARCHAR(50));
107+
INSERT INTO skip VALUES (1, 'abc'), (2, 'def');"
102108
fi
103109
104110
- name: Start MyDuck Server in replica mode
105111
run: |
106112
if [ "${{ matrix.source }}" = "postgres" ]; then
107113
SOURCE_DSN="postgres://postgres:[email protected]:5432/test"
108114
else
109-
SOURCE_DSN="mysql://root:[email protected]:3306"
115+
SOURCE_DSN="mysql://root:[email protected]:3306/test?skip-tables=skip"
110116
fi
111117
112118
docker run -d --name myduck \
@@ -203,6 +209,9 @@ jobs:
203209
exit 1
204210
fi
205211
212+
# Print the logs
213+
docker logs myduck
214+
206215
- name: Cleanup
207216
if: always()
208217
run: |

backend/loaddata.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func (db *DuckBuilder) executeLoadData(ctx *sql.Context, insert *plan.InsertInto
123123
// Replicated tables do not have physical primary keys.
124124
// Their logical primary keys are fake and should not be used in INSERT INTO statements.
125125
// https://github.com/apecloud/myduckserver/issues/272
126-
keyless = t.ExtraTableInfo().Replicated
126+
keyless = t.ExtraTableInfo().Replicated || !t.HasPrimaryKey()
127127
}
128128
}
129129

binlogreplication/binlog_replica_controller.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,18 @@ func (d *myBinlogReplicaController) SetReplicationSourceOptions(ctx *sql.Context
313313
func (d *myBinlogReplicaController) SetReplicationFilterOptions(_ *sql.Context, options []binlogreplication.ReplicationOption) error {
314314
for _, option := range options {
315315
switch strings.ToUpper(option.Name) {
316+
case "REPLICATE_DO_DB":
317+
value, err := getOptionValueAsDatabaseNames(option)
318+
if err != nil {
319+
return err
320+
}
321+
d.filters.setDoDatabases(value)
322+
case "REPLICATE_IGNORE_DB":
323+
value, err := getOptionValueAsDatabaseNames(option)
324+
if err != nil {
325+
return err
326+
}
327+
d.filters.setIgnoreDatabases(value)
316328
case "REPLICATE_DO_TABLE":
317329
value, err := getOptionValueAsTableNames(option)
318330
if err != nil {
@@ -378,6 +390,8 @@ func (d *myBinlogReplicaController) GetReplicaStatus(ctx *sql.Context) (*binlogr
378390
copy.SourceServerUuid = replicaSourceInfo.Uuid
379391
copy.ConnectRetry = replicaSourceInfo.ConnectRetryInterval
380392
copy.SourceRetryCount = replicaSourceInfo.ConnectRetryCount
393+
// copy.ReplicateDoDBs = d.filters.getDoDatabases()
394+
// copy.ReplicateIgnoreDBs = d.filters.getIgnoreDatabases()
381395
copy.ReplicateDoTables = d.filters.getDoTables()
382396
copy.ReplicateIgnoreTables = d.filters.getIgnoreTables()
383397

@@ -523,6 +537,24 @@ func getOptionValueAsTableNames(option binlogreplication.ReplicationOption) ([]s
523537
"but expected a list of tables", option.Name, option.Value.GetValue())
524538
}
525539

540+
func getOptionValueAsDatabaseNames(option binlogreplication.ReplicationOption) ([]string, error) {
541+
// The value of the option should be a list of database names.
542+
// But since the parser doesn't have a database name list type,
543+
// we reuse the table name list type to represent a list of database names.
544+
ov, ok := option.Value.(binlogreplication.TableNamesReplicationOptionValue)
545+
if ok {
546+
list := ov.GetValueAsTableList()
547+
names := make([]string, len(list))
548+
for i, t := range list {
549+
names[i] = t.Name()
550+
}
551+
return names, nil
552+
}
553+
554+
return nil, fmt.Errorf("unsupported value type for option %q; found %T, "+
555+
"but expected a list of databases", option.Name, option.Value.GetValue())
556+
}
557+
526558
func verifyAllTablesAreQualified(urts []sql.UnresolvedTable) error {
527559
for _, urt := range urts {
528560
if urt.Database().Name() == "" {

binlogreplication/binlog_replica_filtering.go

Lines changed: 85 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ import (
2525

2626
// filterConfiguration defines the binlog filtering rules applied on the replica.
2727
type filterConfiguration struct {
28+
// doDatabases holds a map of database names that SHOULD be replicated.
29+
doDatabases map[string]struct{}
30+
// ignoreDatabases holds a map of database names that should NOT be replicated.
31+
ignoreDatabases map[string]struct{}
2832
// doTables holds a map of database name to map of table names, indicating tables that SHOULD be replicated.
2933
doTables map[string]map[string]struct{}
3034
// ignoreTables holds a map of database name to map of table names, indicating tables that should NOT be replicated.
@@ -36,9 +40,39 @@ type filterConfiguration struct {
3640
// newFilterConfiguration creates a new filterConfiguration instance and initializes members.
3741
func newFilterConfiguration() *filterConfiguration {
3842
return &filterConfiguration{
39-
doTables: make(map[string]map[string]struct{}),
40-
ignoreTables: make(map[string]map[string]struct{}),
41-
mu: &sync.Mutex{},
43+
doDatabases: make(map[string]struct{}),
44+
ignoreDatabases: make(map[string]struct{}),
45+
doTables: make(map[string]map[string]struct{}),
46+
ignoreTables: make(map[string]map[string]struct{}),
47+
mu: &sync.Mutex{},
48+
}
49+
}
50+
51+
// setDoDatabases sets the databases that are allowed to replicate. If any DoDatabases were previously configured,
52+
// they are cleared out before the new databases are set.
53+
func (fc *filterConfiguration) setDoDatabases(databases []string) {
54+
fc.mu.Lock()
55+
defer fc.mu.Unlock()
56+
57+
// Setting new replication filters clears out any existing filters
58+
fc.doDatabases = make(map[string]struct{})
59+
60+
for _, db := range databases {
61+
fc.doDatabases[strings.ToLower(db)] = struct{}{}
62+
}
63+
}
64+
65+
// setIgnoreDatabases sets the databases that are NOT allowed to replicate. If any IgnoreDatabases were previously configured,
66+
// they are cleared out before the new databases are set.
67+
func (fc *filterConfiguration) setIgnoreDatabases(databases []string) {
68+
fc.mu.Lock()
69+
defer fc.mu.Unlock()
70+
71+
// Setting new replication filters clears out any existing filters
72+
fc.ignoreDatabases = make(map[string]struct{})
73+
74+
for _, db := range databases {
75+
fc.ignoreDatabases[strings.ToLower(db)] = struct{}{}
4276
}
4377
}
4478

@@ -96,6 +130,38 @@ func (fc *filterConfiguration) setIgnoreTables(urts []sql.UnresolvedTable) error
96130
return nil
97131
}
98132

133+
// getDoDatabases returns a slice of database names that are configured to be replicated.
134+
func (fc *filterConfiguration) getDoDatabases() []string {
135+
fc.mu.Lock()
136+
defer fc.mu.Unlock()
137+
138+
if len(fc.doDatabases) == 0 {
139+
return nil
140+
}
141+
142+
databases := make([]string, 0, len(fc.doDatabases))
143+
for db := range fc.doDatabases {
144+
databases = append(databases, db)
145+
}
146+
return databases
147+
}
148+
149+
// getIgnoreDatabases returns a slice of database names that are configured to be filtered out of replication.
150+
func (fc *filterConfiguration) getIgnoreDatabases() []string {
151+
fc.mu.Lock()
152+
defer fc.mu.Unlock()
153+
154+
if len(fc.ignoreDatabases) == 0 {
155+
return nil
156+
}
157+
158+
databases := make([]string, 0, len(fc.ignoreDatabases))
159+
for db := range fc.ignoreDatabases {
160+
databases = append(databases, db)
161+
}
162+
return databases
163+
}
164+
99165
// isTableFilteredOut returns true if the table identified by |tableMap| has been filtered out on this replica and
100166
// should not have any updates applied from binlog messages.
101167
func (fc *filterConfiguration) isTableFilteredOut(ctx *sql.Context, tableMap *mysql.TableMap) bool {
@@ -109,6 +175,21 @@ func (fc *filterConfiguration) isTableFilteredOut(ctx *sql.Context, tableMap *my
109175
fc.mu.Lock()
110176
defer fc.mu.Unlock()
111177

178+
// If any filter doDatabase options are specified, then a database MUST be listed in the set
179+
// for it to be replicated. doDatabase options are processed BEFORE ignoreDatabase options.
180+
// https://dev.mysql.com/doc/refman/8.4/en/replication-rules-db-options.html
181+
if len(fc.doDatabases) > 0 {
182+
if _, ok := fc.doDatabases[db]; !ok {
183+
ctx.GetLogger().Tracef("skipping database %s (not in doDatabases)", db)
184+
return true
185+
}
186+
} else if len(fc.ignoreDatabases) > 0 {
187+
if _, ok := fc.ignoreDatabases[db]; ok {
188+
ctx.GetLogger().Tracef("skipping database %s (in ignoreDatabases)", db)
189+
return true
190+
}
191+
}
192+
112193
// If any filter doTable options are specified, then a table MUST be listed in the set
113194
// for it to be replicated. doTables options are processed BEFORE ignoreTables options.
114195
// If a table appears in both doTable and ignoreTables, it is ignored.
@@ -160,7 +241,7 @@ func convertFilterMapToStringSlice(filterMap map[string]map[string]struct{}) []s
160241

161242
tableNames := make([]string, 0, len(filterMap))
162243
for dbName, tableMap := range filterMap {
163-
for tableName, _ := range tableMap {
244+
for tableName := range tableMap {
164245
tableNames = append(tableNames, fmt.Sprintf("%s.%s", dbName, tableName))
165246
}
166247
}

binlogreplication/binlog_replication_filters_test.go

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"github.com/stretchr/testify/require"
2222
)
2323

24-
// TestBinlogReplicationFilters_ignoreTablesOnly tests that the ignoreTables replication
24+
// TestReplicationFilters_ignoreTablesOnly tests that the ignoreTables replication
2525
// filtering option is correctly applied and honored.
2626
func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
2727
defer teardown(t)
@@ -189,3 +189,96 @@ func TestBinlogReplicationFilters_errorCases(t *testing.T) {
189189
require.Error(t, err)
190190
require.ErrorContains(t, err, "no database specified for table")
191191
}
192+
193+
// TestReplicationFilters_ignoreDatabasesOnly tests that the ignoreDatabases replication
194+
// filtering option is correctly applied and honored.
195+
func TestReplicationFilters_ignoreDatabasesOnly(t *testing.T) {
196+
defer teardown(t)
197+
startSqlServersWithSystemVars(t, duckReplicaSystemVars)
198+
startReplicationAndCreateTestDb(t, mySqlPort)
199+
200+
// Ignore replication events for db01. Also tests that the first filter setting is overwritten by
201+
// the second and that db names are case-insensitive.
202+
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB=(db02);")
203+
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB=(DB01);")
204+
205+
// TODO(fan): Not implemented yet
206+
// Assert that status shows replication filters
207+
// status := showReplicaStatus(t)
208+
// require.Equal(t, "db01", status["Replicate_Ignore_DB"])
209+
// require.Equal(t, "", status["Replicate_Do_DB"])
210+
211+
// Make changes on the primary
212+
primaryDatabase.MustExec("CREATE DATABASE db02;")
213+
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
214+
primaryDatabase.MustExec("CREATE TABLE db02.t1 (pk INT PRIMARY KEY);")
215+
for i := 1; i < 12; i++ {
216+
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
217+
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db02.t1 VALUES (%d);", i))
218+
}
219+
220+
// Pause to let the replica catch up
221+
waitForReplicaToCatchUp(t)
222+
223+
// Although the database is ignored, it is still created on the replica
224+
// because the DDL statements are not filtered out.
225+
226+
// Verify that no changes from db01 were applied on the replica
227+
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db01.t1;")
228+
require.NoError(t, err)
229+
row := convertMapScanResultToStrings(readNextRow(t, rows))
230+
require.Equal(t, "0", row["count"])
231+
require.NoError(t, rows.Close())
232+
233+
// Verify that all changes from db02 were applied on the replica
234+
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db02.t1;")
235+
require.NoError(t, err)
236+
row = convertMapScanResultToStrings(readNextRow(t, rows))
237+
require.Equal(t, "11", row["count"])
238+
require.NoError(t, rows.Close())
239+
}
240+
241+
// TestReplicationFilters_doDatabasesOnly tests that the doDatabases replication
242+
// filtering option is correctly applied and honored.
243+
func TestReplicationFilters_doDatabasesOnly(t *testing.T) {
244+
defer teardown(t)
245+
startSqlServersWithSystemVars(t, duckReplicaSystemVars)
246+
startReplicationAndCreateTestDb(t, mySqlPort)
247+
248+
// Do replication events for db01. Also tests that the first filter setting is overwritten by
249+
// the second and that db names are case-insensitive.
250+
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_DB=(db02);")
251+
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_DB=(DB01);")
252+
253+
// TODO(fan): Not implemented yet
254+
// Assert that status shows replication filters
255+
// status := showReplicaStatus(t)
256+
// require.Equal(t, "db01", status["Replicate_Do_DB"])
257+
// require.Equal(t, "", status["Replicate_Ignore_DB"])
258+
259+
// Make changes on the primary
260+
primaryDatabase.MustExec("CREATE DATABASE db02;")
261+
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
262+
primaryDatabase.MustExec("CREATE TABLE db02.t1 (pk INT PRIMARY KEY);")
263+
for i := 1; i < 12; i++ {
264+
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
265+
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db02.t1 VALUES (%d);", i))
266+
}
267+
268+
// Pause to let the replica catch up
269+
waitForReplicaToCatchUp(t)
270+
271+
// Verify that all changes from db01 were applied on the replica
272+
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db01.t1;")
273+
require.NoError(t, err)
274+
row := convertMapScanResultToStrings(readNextRow(t, rows))
275+
require.Equal(t, "11", row["count"])
276+
require.NoError(t, rows.Close())
277+
278+
// Verify that no changes from db02 were applied on the replica
279+
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count FROM db02.t1;")
280+
require.NoError(t, err)
281+
row = convertMapScanResultToStrings(readNextRow(t, rows))
282+
require.Equal(t, "0", row["count"])
283+
require.NoError(t, rows.Close())
284+
}

catalog/database.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func (d *Database) tablesInsensitive(ctx *sql.Context, pattern string) ([]*Table
9595
}
9696

9797
func (d *Database) findTables(ctx *sql.Context, pattern string) ([]*Table, error) {
98-
rows, err := adapter.QueryCatalog(ctx, "SELECT DISTINCT table_name, comment FROM duckdb_tables() WHERE (database_name = ? AND schema_name = ? AND table_name ILIKE ?) OR (temporary IS TRUE AND table_name ILIKE ?)", d.catalog, d.name, pattern, pattern)
98+
rows, err := adapter.QueryCatalog(ctx, "SELECT table_name, has_primary_key, comment FROM duckdb_tables() WHERE (database_name = ? AND schema_name = ? AND table_name ILIKE ?) OR (temporary IS TRUE AND table_name ILIKE ?)", d.catalog, d.name, pattern, pattern)
9999
if err != nil {
100100
return nil, ErrDuckDB.New(err)
101101
}
@@ -104,11 +104,12 @@ func (d *Database) findTables(ctx *sql.Context, pattern string) ([]*Table, error
104104
var tbls []*Table
105105
for rows.Next() {
106106
var tblName string
107+
var hasPrimaryKey bool
107108
var comment stdsql.NullString
108-
if err := rows.Scan(&tblName, &comment); err != nil {
109+
if err := rows.Scan(&tblName, &hasPrimaryKey, &comment); err != nil {
109110
return nil, ErrDuckDB.New(err)
110111
}
111-
t := NewTable(tblName, d).withComment(DecodeComment[ExtraTableInfo](comment.String))
112+
t := NewTable(d, tblName, hasPrimaryKey).withComment(DecodeComment[ExtraTableInfo](comment.String))
112113
tbls = append(tbls, t)
113114
}
114115
if err := rows.Err(); err != nil {

catalog/inserter.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type rowInserter struct {
1515
db string
1616
table string
1717
schema sql.Schema
18+
hasPK bool
1819
replace bool
1920

2021
once sync.Once
@@ -69,7 +70,7 @@ func (ri *rowInserter) init(ctx *sql.Context) {
6970

7071
insert.Reset()
7172
insert.WriteString("INSERT ")
72-
if ri.replace {
73+
if ri.replace && ri.hasPK {
7374
insert.WriteString(" OR REPLACE")
7475
}
7576
insert.WriteString(" INTO ")

0 commit comments

Comments
 (0)