Skip to content

Commit 5ef2bc7

Browse files
beryllwboyu.wjb
authored andcommitted
add auto increment ID support for pt-osc schema migration testing
1 parent 520c0bf commit 5ef2bc7

2 files changed

Lines changed: 112 additions & 0 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,30 @@ private void insertRecordsPhase2(UniqueDatabase database, int startIndex, int co
173173
}
174174
}
175175

176+
private void insertRecordsPhase1ForAutoId(UniqueDatabase database, int count) throws Exception {
177+
try (Connection connection = database.getJdbcConnection();
178+
Statement statement = connection.createStatement()) {
179+
for (int i = 0; i < count; i++) {
180+
statement.execute(
181+
String.format(
182+
"insert into customers_auto_id (name, address, phone_number) values ('%s', '%s', '%s');",
183+
"flink_" + i, "Address Line #" + i, 1000000000L + i));
184+
}
185+
}
186+
}
187+
188+
private void insertRecordsPhase2ForAutoId(UniqueDatabase database, int count) throws Exception {
189+
try (Connection connection = database.getJdbcConnection();
190+
Statement statement = connection.createStatement()) {
191+
for (int i = 0; i < count; i++) {
192+
statement.execute(
193+
String.format(
194+
"insert into customers_auto_id (name, address, phone_number, ext) values ('%s', '%s', '%s', %s);",
195+
"flink_ext_" + i, "Address Line Ext #" + i, 1000000000L + i, i));
196+
}
197+
}
198+
}
199+
176200
@Test
177201
void testGhOstSchemaMigration() throws Exception {
178202
String databaseName = customerDatabase.getDatabaseName();
@@ -283,6 +307,63 @@ void testPtOscSchemaMigration() throws Exception {
283307
}
284308
}
285309

310+
@Test
311+
void testPtOscSchemaMigrationWithAutoIncrementId() throws Exception {
312+
String databaseName = customerDatabase.getDatabaseName();
313+
314+
LOG.info("Step 1: Start pipeline job for auto increment id table");
315+
316+
Thread yamlJob = runJob(databaseName, "customers_auto_id");
317+
yamlJob.start();
318+
319+
LOG.info("Step 2: Insert initial records (Phase 1)");
320+
insertRecordsPhase1ForAutoId(customerDatabase, 1000);
321+
322+
LOG.info("Step 3: Evolve schema with pt-osc - ADD COLUMN");
323+
324+
Thread thread =
325+
new Thread(
326+
() -> {
327+
try {
328+
execInContainer(
329+
PERCONA_TOOLKIT_CONTAINER,
330+
"evolve schema",
331+
"pt-online-schema-change",
332+
"--user=" + TEST_USER,
333+
"--host=" + INTER_CONTAINER_MYSQL_ALIAS,
334+
"--password=" + TEST_PASSWORD,
335+
"P=3306,t=customers_auto_id,D=" + databaseName,
336+
"--alter",
337+
"add column ext int first",
338+
"--charset=utf8",
339+
"--recursion-method=NONE", // Do not look for slave nodes
340+
"--print",
341+
"--execute");
342+
} catch (IOException | InterruptedException e) {
343+
throw new RuntimeException(e);
344+
}
345+
});
346+
347+
LOG.info("Insertion Phase 1 finishes");
348+
thread.start();
349+
insertRecordsPhase1ForAutoId(customerDatabase, 3000);
350+
LOG.info("Insertion Phase 2 finishes");
351+
352+
thread.join();
353+
insertRecordsPhase2ForAutoId(customerDatabase, 1000);
354+
LOG.info("Insertion Phase 3 finishes");
355+
356+
// Initial 21 records + Phase1 1000 + Phase2 3000 + Phase3 1000 = 5021
357+
try {
358+
TestCaseUtils.repeatedCheck(
359+
() -> outCaptor.toString().split(System.lineSeparator()).length == 5021);
360+
} catch (Exception e) {
361+
LOG.error("Failed to verify results. Captured stdout: {}", outCaptor.toString(), e);
362+
} finally {
363+
yamlJob.interrupt();
364+
}
365+
}
366+
286367
private static void execInContainer(Container<?> container, String prompt, String... commands)
287368
throws IOException, InterruptedException {
288369
{

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,3 +326,34 @@ CREATE TABLE default_value_test (
326326
INSERT INTO default_value_test
327327
VALUES (1,'user1','Shanghai',123567),
328328
(2,'user2','Shanghai',123567);
329+
330+
-- table has auto increment primary key for pt-osc testing
331+
CREATE TABLE customers_auto_id (
332+
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
333+
name VARCHAR(255) NOT NULL DEFAULT 'flink',
334+
address VARCHAR(1024),
335+
phone_number VARCHAR(512)
336+
);
337+
338+
INSERT INTO customers_auto_id
339+
VALUES (default, 'user_1', 'Shanghai', '123567891234'),
340+
(default, 'user_2', 'Shanghai', '123567891234'),
341+
(default, 'user_3', 'Shanghai', '123567891234'),
342+
(default, 'user_4', 'Shanghai', '123567891234'),
343+
(default, 'user_5', 'Shanghai', '123567891234'),
344+
(default, 'user_6', 'Shanghai', '123567891234'),
345+
(default, 'user_7', 'Shanghai', '123567891234'),
346+
(default, 'user_8', 'Shanghai', '123567891234'),
347+
(default, 'user_9', 'Shanghai', '123567891234'),
348+
(default, 'user_10', 'Shanghai', '123567891234'),
349+
(default, 'user_11', 'Shanghai', '123567891234'),
350+
(default, 'user_12', 'Shanghai', '123567891234'),
351+
(default, 'user_13', 'Shanghai', '123567891234'),
352+
(default, 'user_14', 'Shanghai', '123567891234'),
353+
(default, 'user_15', 'Shanghai', '123567891234'),
354+
(default, 'user_16', 'Shanghai', '123567891234'),
355+
(default, 'user_17', 'Shanghai', '123567891234'),
356+
(default, 'user_18', 'Shanghai', '123567891234'),
357+
(default, 'user_19', 'Shanghai', '123567891234'),
358+
(default, 'user_20', 'Shanghai', '123567891234'),
359+
(default, 'user_21', 'Shanghai', '123567891234');

0 commit comments

Comments
 (0)