@@ -21,7 +21,7 @@ import (
2121 "github.com/stretchr/testify/require"
2222)
2323
24- // TestBinlogReplicationFilters_ignoreTablesOnly tests that the ignoreTables replication
24+ // TestReplicationFilters_ignoreTablesOnly tests that the ignoreTables replication
2525// filtering option is correctly applied and honored.
2626func TestBinlogReplicationFilters_ignoreTablesOnly (t * testing.T ) {
2727 defer teardown (t )
@@ -189,3 +189,96 @@ func TestBinlogReplicationFilters_errorCases(t *testing.T) {
189189 require .Error (t , err )
190190 require .ErrorContains (t , err , "no database specified for table" )
191191}
192+
193+ // TestReplicationFilters_ignoreDatabasesOnly tests that the ignoreDatabases replication
194+ // filtering option is correctly applied and honored.
195+ func TestReplicationFilters_ignoreDatabasesOnly (t * testing.T ) {
196+ defer teardown (t )
197+ startSqlServersWithSystemVars (t , duckReplicaSystemVars )
198+ startReplicationAndCreateTestDb (t , mySqlPort )
199+
200+ // Ignore replication events for db01. Also tests that the first filter setting is overwritten by
201+ // the second and that db names are case-insensitive.
202+ replicaDatabase .MustExec ("CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB=(db02);" )
203+ replicaDatabase .MustExec ("CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB=(DB01);" )
204+
205+ // TODO(fan): Not implemented yet
206+ // Assert that status shows replication filters
207+ // status := showReplicaStatus(t)
208+ // require.Equal(t, "db01", status["Replicate_Ignore_DB"])
209+ // require.Equal(t, "", status["Replicate_Do_DB"])
210+
211+ // Make changes on the primary
212+ primaryDatabase .MustExec ("CREATE DATABASE db02;" )
213+ primaryDatabase .MustExec ("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);" )
214+ primaryDatabase .MustExec ("CREATE TABLE db02.t1 (pk INT PRIMARY KEY);" )
215+ for i := 1 ; i < 12 ; i ++ {
216+ primaryDatabase .MustExec (fmt .Sprintf ("INSERT INTO db01.t1 VALUES (%d);" , i ))
217+ primaryDatabase .MustExec (fmt .Sprintf ("INSERT INTO db02.t1 VALUES (%d);" , i ))
218+ }
219+
220+ // Pause to let the replica catch up
221+ waitForReplicaToCatchUp (t )
222+
223+ // Although the database is ignored, it is still created on the replica
224+ // because the DDL statements are not filtered out.
225+
226+ // Verify that no changes from db01 were applied on the replica
227+ rows , err := replicaDatabase .Queryx ("SELECT COUNT(pk) as count FROM db01.t1;" )
228+ require .NoError (t , err )
229+ row := convertMapScanResultToStrings (readNextRow (t , rows ))
230+ require .Equal (t , "0" , row ["count" ])
231+ require .NoError (t , rows .Close ())
232+
233+ // Verify that all changes from db02 were applied on the replica
234+ rows , err = replicaDatabase .Queryx ("SELECT COUNT(pk) as count FROM db02.t1;" )
235+ require .NoError (t , err )
236+ row = convertMapScanResultToStrings (readNextRow (t , rows ))
237+ require .Equal (t , "11" , row ["count" ])
238+ require .NoError (t , rows .Close ())
239+ }
240+
241+ // TestReplicationFilters_doDatabasesOnly tests that the doDatabases replication
242+ // filtering option is correctly applied and honored.
243+ func TestReplicationFilters_doDatabasesOnly (t * testing.T ) {
244+ defer teardown (t )
245+ startSqlServersWithSystemVars (t , duckReplicaSystemVars )
246+ startReplicationAndCreateTestDb (t , mySqlPort )
247+
248+ // Do replication events for db01. Also tests that the first filter setting is overwritten by
249+ // the second and that db names are case-insensitive.
250+ replicaDatabase .MustExec ("CHANGE REPLICATION FILTER REPLICATE_DO_DB=(db02);" )
251+ replicaDatabase .MustExec ("CHANGE REPLICATION FILTER REPLICATE_DO_DB=(DB01);" )
252+
253+ // TODO(fan): Not implemented yet
254+ // Assert that status shows replication filters
255+ // status := showReplicaStatus(t)
256+ // require.Equal(t, "db01", status["Replicate_Do_DB"])
257+ // require.Equal(t, "", status["Replicate_Ignore_DB"])
258+
259+ // Make changes on the primary
260+ primaryDatabase .MustExec ("CREATE DATABASE db02;" )
261+ primaryDatabase .MustExec ("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);" )
262+ primaryDatabase .MustExec ("CREATE TABLE db02.t1 (pk INT PRIMARY KEY);" )
263+ for i := 1 ; i < 12 ; i ++ {
264+ primaryDatabase .MustExec (fmt .Sprintf ("INSERT INTO db01.t1 VALUES (%d);" , i ))
265+ primaryDatabase .MustExec (fmt .Sprintf ("INSERT INTO db02.t1 VALUES (%d);" , i ))
266+ }
267+
268+ // Pause to let the replica catch up
269+ waitForReplicaToCatchUp (t )
270+
271+ // Verify that all changes from db01 were applied on the replica
272+ rows , err := replicaDatabase .Queryx ("SELECT COUNT(pk) as count FROM db01.t1;" )
273+ require .NoError (t , err )
274+ row := convertMapScanResultToStrings (readNextRow (t , rows ))
275+ require .Equal (t , "11" , row ["count" ])
276+ require .NoError (t , rows .Close ())
277+
278+ // Verify that no changes from db02 were applied on the replica
279+ rows , err = replicaDatabase .Queryx ("SELECT COUNT(pk) as count FROM db02.t1;" )
280+ require .NoError (t , err )
281+ row = convertMapScanResultToStrings (readNextRow (t , rows ))
282+ require .Equal (t , "0" , row ["count" ])
283+ require .NoError (t , rows .Close ())
284+ }
0 commit comments