4444import org .junit .jupiter .api .AfterAll ;
4545import org .junit .jupiter .api .BeforeAll ;
4646import org .junit .jupiter .api .BeforeEach ;
47- import org .junit .jupiter .api .Test ;
47+ import org .junit .jupiter .params .ParameterizedTest ;
48+ import org .junit .jupiter .params .provider .ValueSource ;
4849import org .testcontainers .lifecycle .Startables ;
4950
5051import java .sql .Connection ;
@@ -96,8 +97,10 @@ public void before() {
9697 env .setRestartStrategy (RestartStrategies .noRestart ());
9798 }
9899
99- @ Test
100- public void testParseAlterStatementWhenTableNameAndColumnIsUpper () throws Exception {
100+ @ ParameterizedTest
101+ @ ValueSource (strings = {"products" , "uppercase_products" })
102+ public void testParseAlterStatementWhenTableNameAndColumnIsUpper (String tableName )
103+ throws Exception {
101104 env .setParallelism (1 );
102105 inventoryDatabase .createAndInitialize ();
103106 MySqlSourceConfigFactory configFactory =
@@ -107,7 +110,7 @@ public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Except
107110 .username (TEST_USER )
108111 .password (TEST_PASSWORD )
109112 .databaseList (inventoryDatabase .getDatabaseName ())
110- .tableList (inventoryDatabase .getDatabaseName () + "\\ .products" )
113+ .tableList (inventoryDatabase .getDatabaseName () + "\\ ." + tableName )
111114 .startupOptions (StartupOptions .latest ())
112115 .serverId (getServerId (env .getParallelism ()))
113116 .serverTimeZone ("UTC" )
@@ -124,17 +127,17 @@ public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Except
124127 .executeAndCollect ();
125128 Thread .sleep (5_000 );
126129
127- TableId tableId = TableId .tableId (inventoryDatabase .getDatabaseName (), "products" );
130+ TableId tableId = TableId .tableId (inventoryDatabase .getDatabaseName (), tableName );
128131 List <Event > expected = new ArrayList <>();
129132 expected .add (getProductsCreateTableEvent (tableId ));
130133 try (Connection connection = inventoryDatabase .getJdbcConnection ();
131134 Statement statement = connection .createStatement ()) {
132- expected .addAll (executeAlterAndProvideExpected (tableId , statement ));
135+ expected .addAll (executeAlterAndProvideExpected (tableId , statement , tableName ));
133136
134137 statement .execute (
135138 String .format (
136- "ALTER TABLE `%s`.`PRODUCTS ` ADD `cols1 ` VARCHAR(45);" ,
137- inventoryDatabase .getDatabaseName ()));
139+ "ALTER TABLE `%s`.`%s ` ADD `COLS1 ` VARCHAR(45);" ,
140+ inventoryDatabase .getDatabaseName (), tableName ));
138141 expected .add (
139142 new AddColumnEvent (
140143 tableId ,
@@ -147,6 +150,42 @@ public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Except
147150 assertThat (actual ).isEqualTo (expected );
148151 }
149152
153+ @ ParameterizedTest
154+ @ ValueSource (strings = {"products" , "uppercase_products" })
155+ public void testSnapshotModeWhenTableNameAndColumnIsUpper (String tableName ) throws Exception {
156+ env .setParallelism (1 );
157+ inventoryDatabase .createAndInitialize ();
158+ MySqlSourceConfigFactory configFactory =
159+ new MySqlSourceConfigFactory ()
160+ .hostname (MYSQL8_CONTAINER .getHost ())
161+ .port (MYSQL8_CONTAINER .getDatabasePort ())
162+ .username (TEST_USER )
163+ .password (TEST_PASSWORD )
164+ .databaseList (inventoryDatabase .getDatabaseName ())
165+ .tableList (inventoryDatabase .getDatabaseName () + "\\ ." + tableName )
166+ .startupOptions (StartupOptions .snapshot ())
167+ .serverId (getServerId (env .getParallelism ()))
168+ .serverTimeZone ("UTC" )
169+ .includeSchemaChanges (SCHEMA_CHANGE_ENABLED .defaultValue ());
170+
171+ FlinkSourceProvider sourceProvider =
172+ (FlinkSourceProvider ) new MySqlDataSource (configFactory ).getEventSourceProvider ();
173+ CloseableIterator <Event > events =
174+ env .fromSource (
175+ sourceProvider .getSource (),
176+ WatermarkStrategy .noWatermarks (),
177+ MySqlDataSourceFactory .IDENTIFIER ,
178+ new EventTypeInfo ())
179+ .executeAndCollect ();
180+ Thread .sleep (5_000 );
181+
182+ TableId tableId = TableId .tableId (inventoryDatabase .getDatabaseName (), tableName );
183+ List <Event > expected = new ArrayList <>();
184+ expected .add (getProductsCreateTableEvent (tableId ));
185+ List <Event > actual = fetchResults (events , expected .size ());
186+ assertThat (actual ).isEqualTo (expected );
187+ }
188+
150189 private CreateTableEvent getProductsCreateTableEvent (TableId tableId ) {
151190 return new CreateTableEvent (
152191 tableId ,
@@ -172,13 +211,13 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
172211 * );
173212 * </pre>
174213 */
175- private List <Event > executeAlterAndProvideExpected (TableId tableId , Statement statement )
176- throws SQLException {
214+ private List <Event > executeAlterAndProvideExpected (
215+ TableId tableId , Statement statement , String tableName ) throws SQLException {
177216 List <Event > expected = new ArrayList <>();
178217 statement .execute (
179218 String .format (
180- "ALTER TABLE `%s`.`products ` CHANGE COLUMN `DESCRIPTION` `DESC` VARCHAR(255) NULL DEFAULT NULL;" ,
181- inventoryDatabase .getDatabaseName ()));
219+ "ALTER TABLE `%s`.`%s ` CHANGE COLUMN `DESCRIPTION` `DESC` VARCHAR(255) NULL DEFAULT NULL;" ,
220+ inventoryDatabase .getDatabaseName (), tableName ));
182221 expected .add (
183222 new AlterColumnTypeEvent (
184223 tableId , Collections .singletonMap ("description" , DataTypes .VARCHAR (255 ))));
@@ -187,17 +226,17 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
187226
188227 statement .execute (
189228 String .format (
190- "ALTER TABLE `%s`.`products ` CHANGE COLUMN `desc` `desc2` VARCHAR(400) NULL DEFAULT NULL;" ,
191- inventoryDatabase .getDatabaseName ()));
229+ "ALTER TABLE `%s`.`%s ` CHANGE COLUMN `desc` `desc2` VARCHAR(400) NULL DEFAULT NULL;" ,
230+ inventoryDatabase .getDatabaseName (), tableName ));
192231 expected .add (
193232 new AlterColumnTypeEvent (
194233 tableId , Collections .singletonMap ("desc" , DataTypes .VARCHAR (400 ))));
195234 expected .add (new RenameColumnEvent (tableId , Collections .singletonMap ("desc" , "desc2" )));
196235
197236 statement .execute (
198237 String .format (
199- "ALTER TABLE `%s`.`products ` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `weight`;" ,
200- inventoryDatabase .getDatabaseName ()));
238+ "ALTER TABLE `%s`.`%s ` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `weight`;" ,
239+ inventoryDatabase .getDatabaseName (), tableName ));
201240 expected .add (
202241 new AddColumnEvent (
203242 tableId ,
@@ -209,8 +248,8 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
209248
210249 statement .execute (
211250 String .format (
212- "ALTER TABLE `%s`.`products ` ADD COLUMN `col1` VARCHAR(45) NULL AFTER `weight`, ADD COLUMN `COL2` VARCHAR(55) NULL AFTER `desc1`;" ,
213- inventoryDatabase .getDatabaseName ()));
251+ "ALTER TABLE `%s`.`%s ` ADD COLUMN `col1` VARCHAR(45) NULL AFTER `weight`, ADD COLUMN `COL2` VARCHAR(55) NULL AFTER `desc1`;" ,
252+ inventoryDatabase .getDatabaseName (), tableName ));
214253 expected .add (
215254 new AddColumnEvent (
216255 tableId ,
@@ -230,8 +269,8 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
230269
231270 statement .execute (
232271 String .format (
233- "ALTER TABLE `%s`.`products ` DROP COLUMN `desc2`, CHANGE COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;" ,
234- inventoryDatabase .getDatabaseName ()));
272+ "ALTER TABLE `%s`.`%s ` DROP COLUMN `desc2`, CHANGE COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;" ,
273+ inventoryDatabase .getDatabaseName (), tableName ));
235274 expected .add (new DropColumnEvent (tableId , Collections .singletonList ("desc2" )));
236275 expected .add (
237276 new AlterColumnTypeEvent (
@@ -240,22 +279,22 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
240279 // Only available in mysql 8.0
241280 statement .execute (
242281 String .format (
243- "ALTER TABLE `%s`.`products ` RENAME COLUMN `desc1` TO `desc3`;" ,
244- inventoryDatabase .getDatabaseName ()));
282+ "ALTER TABLE `%s`.`%s ` RENAME COLUMN `desc1` TO `desc3`;" ,
283+ inventoryDatabase .getDatabaseName (), tableName ));
245284 expected .add (new RenameColumnEvent (tableId , Collections .singletonMap ("desc1" , "desc3" )));
246285
247286 statement .execute (
248287 String .format (
249- "ALTER TABLE `%s`.`products ` MODIFY COLUMN `DESC3` VARCHAR(255) NULL DEFAULT NULL;" ,
250- inventoryDatabase .getDatabaseName ()));
288+ "ALTER TABLE `%s`.`%s ` MODIFY COLUMN `DESC3` VARCHAR(255) NULL DEFAULT NULL;" ,
289+ inventoryDatabase .getDatabaseName (), tableName ));
251290 expected .add (
252291 new AlterColumnTypeEvent (
253292 tableId , Collections .singletonMap ("desc3" , DataTypes .VARCHAR (255 ))));
254293
255294 statement .execute (
256295 String .format (
257- "ALTER TABLE `%s`.`products ` DROP COLUMN `desc3`;" ,
258- inventoryDatabase .getDatabaseName ()));
296+ "ALTER TABLE `%s`.`%s ` DROP COLUMN `desc3`;" ,
297+ inventoryDatabase .getDatabaseName (), tableName ));
259298 expected .add (new DropColumnEvent (tableId , Collections .singletonList ("desc3" )));
260299
261300 // Should not catch SchemaChangeEvent of tables other than `products`
0 commit comments