44
44
import org .junit .jupiter .api .AfterAll ;
45
45
import org .junit .jupiter .api .Assertions ;
46
46
import org .junit .jupiter .api .BeforeAll ;
47
- import org .junit .jupiter .api .Disabled ;
48
47
import org .junit .jupiter .api .TestTemplate ;
49
48
import org .junit .jupiter .api .condition .DisabledOnOs ;
50
49
import org .junit .jupiter .api .condition .OS ;
64
63
import java .util .HashMap ;
65
64
import java .util .List ;
66
65
import java .util .Map ;
66
+ import java .util .Objects ;
67
67
import java .util .concurrent .CompletableFuture ;
68
68
import java .util .concurrent .TimeUnit ;
69
69
import java .util .stream .Stream ;
@@ -165,7 +165,6 @@ private void extractFiles() {
165
165
"sh" , "-c" , "cd " + CATALOG_DIR + " && tar -zxvf " + NAMESPACE_TAR );
166
166
try {
167
167
Process process = processBuilder .start ();
168
- // 等待命令执行完成
169
168
int exitCode = process .waitFor ();
170
169
if (exitCode == 0 ) {
171
170
log .info ("Extract files successful." );
@@ -210,7 +209,6 @@ public void testMysqlCdcCheckDataE2e(TestContainer container)
210
209
}
211
210
212
211
@ TestTemplate
213
- @ Disabled
214
212
public void testMysqlCdcCheckSchemaChangeE2e (TestContainer container )
215
213
throws IOException , InterruptedException {
216
214
// Clear related content to ensure that multiple operations are not affected
@@ -232,9 +230,10 @@ public void testMysqlCdcCheckSchemaChangeE2e(TestContainer container)
232
230
233
231
private void alterSchemaAndCheckIcebergSchema (TestContainer container )
234
232
throws InterruptedException , IOException {
235
- String dropField = "f_binary " ;
233
+ String addField = "f_string_add " ;
236
234
// Init table data
237
- dropTableColumn (MYSQL_DATABASE , SOURCE_TABLE , dropField );
235
+ addTableColumn (MYSQL_DATABASE , SOURCE_TABLE , addField );
236
+ insertAddColumnData (MYSQL_DATABASE , SOURCE_TABLE );
238
237
// Waiting 30s for source capture data
239
238
sleep (30000 );
240
239
@@ -247,10 +246,21 @@ private void alterSchemaAndCheckIcebergSchema(TestContainer container)
247
246
// copy iceberg to local
248
247
container .executeExtraCommands (containerExtendedFactory );
249
248
Schema schema = loadIcebergSchema ();
250
- Types .NestedField nestedField = schema .findField (dropField );
251
- Assertions .assertEquals (true , nestedField == null );
252
- // for next test
253
- addTableColumn (MYSQL_DATABASE , SOURCE_TABLE , dropField );
249
+ Types .NestedField nestedField = schema .findField (addField );
250
+ Assertions .assertEquals (true , Objects .nonNull (nestedField ));
251
+
252
+ List <Record > records = loadIcebergTable ();
253
+ Assertions .assertEquals (4 , records .size ());
254
+ for (Record record : records ) {
255
+ Integer id = (Integer ) record .getField ("id" );
256
+ String f_string_add = (String ) record .getField ("f_string_add" );
257
+ if (id == 100 ) {
258
+ Assertions .assertEquals ("add column field" , f_string_add );
259
+ }
260
+ }
261
+
262
+ // for next test.
263
+ dropTableColumn (MYSQL_DATABASE , SOURCE_TABLE , addField );
254
264
});
255
265
}
256
266
@@ -342,8 +352,9 @@ private void dropTableColumn(String database, String tableName, String dropField
342
352
executeSql ("ALTER TABLE " + database + "." + tableName + " DROP COLUMN " + dropField );
343
353
}
344
354
345
- private void addTableColumn (String database , String tableName , String dropField ) {
346
- executeSql ("ALTER TABLE " + database + "." + tableName + " ADD COLUMN " + dropField );
355
+ private void addTableColumn (String database , String tableName , String addField ) {
356
+ executeSql (
357
+ "ALTER TABLE " + database + "." + tableName + " ADD COLUMN " + addField + " text" );
347
358
}
348
359
349
360
private void clearTable (String database , String tableName ) {
@@ -456,4 +467,28 @@ private void upsertDeleteSourceTable(String database, String tableName) {
456
467
457
468
executeSql ("UPDATE " + database + "." + tableName + " SET f_bigint = 10000 where id = 3" );
458
469
}
470
+
471
+ private void insertAddColumnData (String database , String tableName ) {
472
+ executeSql (
473
+ "INSERT INTO "
474
+ + database
475
+ + "."
476
+ + tableName
477
+ + " ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n "
478
+ + " f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n "
479
+ + " f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n "
480
+ + " f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n "
481
+ + " f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n "
482
+ + " f_tinyint, f_tinyint_unsigned, f_json, f_year, f_string_add)\n "
483
+ + "VALUES ( 100, "
484
+ + "0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n "
485
+ + " 0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n "
486
+ + " 0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n "
487
+ + " 123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n "
488
+ + " 'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n "
489
+ + " '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n "
490
+ + " 0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n "
491
+ + " 12.345, '14:30:00', -128, 255, '{ \" key\" : \" value\" }', 1992 , 'add column "
492
+ + "field')" );
493
+ }
459
494
}
0 commit comments