Skip to content

Commit a052ad9

Browse files
yuxiqian.yxqlvyanquan
authored andcommitted
[FLINK-38139] Fix consecutive online schema change causes job failure.
1 parent 13c2456 commit a052ad9

5 files changed

Lines changed: 463 additions & 7 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,355 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.mysql.source;
19+
20+
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
21+
import org.apache.flink.cdc.common.configuration.Configuration;
22+
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
23+
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
24+
import org.apache.flink.cdc.common.utils.TestCaseUtils;
25+
import org.apache.flink.cdc.composer.PipelineExecution;
26+
import org.apache.flink.cdc.composer.definition.PipelineDef;
27+
import org.apache.flink.cdc.composer.definition.RouteDef;
28+
import org.apache.flink.cdc.composer.definition.SinkDef;
29+
import org.apache.flink.cdc.composer.definition.SourceDef;
30+
import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
31+
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
32+
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
33+
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
34+
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
35+
import org.apache.flink.cdc.connectors.values.ValuesDatabase;
36+
import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
37+
import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
38+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
39+
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
40+
41+
import org.junit.jupiter.api.AfterAll;
42+
import org.junit.jupiter.api.AfterEach;
43+
import org.junit.jupiter.api.BeforeAll;
44+
import org.junit.jupiter.api.BeforeEach;
45+
import org.junit.jupiter.api.Test;
46+
import org.testcontainers.DockerClientFactory;
47+
import org.testcontainers.containers.Container;
48+
import org.testcontainers.containers.GenericContainer;
49+
import org.testcontainers.containers.output.Slf4jLogConsumer;
50+
import org.testcontainers.lifecycle.Startables;
51+
import org.testcontainers.utility.MountableFile;
52+
53+
import java.io.ByteArrayOutputStream;
54+
import java.io.IOException;
55+
import java.io.PrintStream;
56+
import java.sql.Connection;
57+
import java.sql.Statement;
58+
import java.util.Collections;
59+
import java.util.stream.Stream;
60+
61+
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
62+
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
63+
64+
/** A more complicated IT case for Evolving MySQL schema with gh-ost/pt-osc utility. */
65+
class MySqlOscITCase extends MySqlSourceTestBase {
66+
private static final MySqlContainer MYSQL8_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
67+
68+
private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7";
69+
70+
protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
71+
createPerconaToolkitContainer();
72+
73+
private final UniqueDatabase customerDatabase =
74+
new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, TEST_PASSWORD);
75+
76+
private final StreamExecutionEnvironment env =
77+
StreamExecutionEnvironment.getExecutionEnvironment();
78+
79+
private static final String GH_OST_RESOURCE_NAME =
80+
DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64")
81+
? "ghost-cli/gh-ost-binary-linux-amd64-20231207144046.tar.gz"
82+
: "ghost-cli/gh-ost-binary-linux-arm64-20231207144046.tar.gz";
83+
84+
private final PrintStream standardOut = System.out;
85+
private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream();
86+
87+
@BeforeEach
88+
void takeoverOutput() {
89+
System.setOut(new PrintStream(outCaptor));
90+
}
91+
92+
@AfterEach
93+
protected void handInStdOut() {
94+
System.setOut(standardOut);
95+
outCaptor.reset();
96+
}
97+
98+
@BeforeAll
99+
static void beforeClass() {
100+
LOG.info("Starting MySql8 containers...");
101+
Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
102+
Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join();
103+
LOG.info("Container MySql8 is started.");
104+
}
105+
106+
@AfterAll
107+
static void afterClass() {
108+
LOG.info("Stopping MySql8 containers...");
109+
MYSQL8_CONTAINER.stop();
110+
PERCONA_TOOLKIT_CONTAINER.stop();
111+
LOG.info("Container MySql8 is stopped.");
112+
}
113+
114+
@BeforeEach
115+
void before() {
116+
customerDatabase.createAndInitialize();
117+
TestValuesTableFactory.clearAllData();
118+
ValuesDatabase.clear();
119+
env.setParallelism(4);
120+
env.enableCheckpointing(200);
121+
env.setRestartStrategy(RestartStrategies.noRestart());
122+
}
123+
124+
@AfterEach
125+
void after() {
126+
customerDatabase.dropDatabase();
127+
}
128+
129+
private static void installGhOstCli(Container<?> container) {
130+
try {
131+
container.copyFileToContainer(
132+
MountableFile.forClasspathResource(GH_OST_RESOURCE_NAME), "/tmp/gh-ost.tar.gz");
133+
execInContainer(
134+
container, "unzip binary", "tar", "-xzvf", "/tmp/gh-ost.tar.gz", "-C", "/bin");
135+
} catch (IOException | InterruptedException e) {
136+
throw new RuntimeException(e);
137+
}
138+
}
139+
140+
private static GenericContainer<?> createPerconaToolkitContainer() {
141+
GenericContainer<?> perconaToolkit =
142+
new GenericContainer<>(PERCONA_TOOLKIT)
143+
// keep container alive
144+
.withCommand("tail", "-f", "/dev/null")
145+
.withNetwork(NETWORK)
146+
.withLogConsumer(new Slf4jLogConsumer(LOG));
147+
return perconaToolkit;
148+
}
149+
150+
private void insertRecordsPhase1(UniqueDatabase database, int startIndex, int count)
151+
throws Exception {
152+
try (Connection connection = database.getJdbcConnection();
153+
Statement statement = connection.createStatement()) {
154+
for (int i = startIndex; i < startIndex + count; i++) {
155+
statement.execute(
156+
String.format(
157+
"insert into customers (id, name, address, phone_number) values (%s, '%s', '%s', '%s');",
158+
i, "flink_" + i, "Address Line #" + i, 1000000000L + i));
159+
}
160+
}
161+
}
162+
163+
private void insertRecordsPhase2(UniqueDatabase database, int startIndex, int count)
164+
throws Exception {
165+
try (Connection connection = database.getJdbcConnection();
166+
Statement statement = connection.createStatement()) {
167+
for (int i = startIndex; i < startIndex + count; i++) {
168+
statement.execute(
169+
String.format(
170+
"insert into customers (id, name, address, phone_number, ext) values (%s, '%s', '%s', '%s', %s);",
171+
i, "flink_" + i, "Address Line #" + i, 1000000000L + i, i));
172+
}
173+
}
174+
}
175+
176+
@Test
177+
void testGhOstSchemaMigration() throws Exception {
178+
String databaseName = customerDatabase.getDatabaseName();
179+
180+
LOG.info("Step 1: Install gh-ost command line utility");
181+
installGhOstCli(MYSQL8_CONTAINER);
182+
183+
Thread yamlJob = runJob(databaseName, "customers");
184+
yamlJob.start();
185+
186+
LOG.info("Step 2: Start pipeline job");
187+
insertRecordsPhase1(customerDatabase, 5000, 1000);
188+
189+
LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN");
190+
191+
Thread thread =
192+
new Thread(
193+
() -> {
194+
try {
195+
execInContainer(
196+
MYSQL8_CONTAINER,
197+
"evolve schema",
198+
"gh-ost",
199+
"--user=" + TEST_USER,
200+
"--password=" + TEST_PASSWORD,
201+
"--database=" + databaseName,
202+
"--table=customers",
203+
"--alter=add column ext int first",
204+
"--allow-on-master", // because we don't have a replica
205+
"--initially-drop-old-table", // drop previously generated
206+
// temporary tables
207+
"--execute");
208+
} catch (IOException | InterruptedException e) {
209+
throw new RuntimeException(e);
210+
}
211+
});
212+
213+
thread.start();
214+
insertRecordsPhase1(customerDatabase, 7000, 3000);
215+
216+
thread.join();
217+
insertRecordsPhase2(customerDatabase, 12000, 1000);
218+
219+
try {
220+
TestCaseUtils.repeatedCheck(
221+
() -> outCaptor.toString().split(System.lineSeparator()).length == 5023);
222+
} catch (Exception e) {
223+
LOG.error("Failed to verify results. Captured stdout: {}", outCaptor.toString(), e);
224+
} finally {
225+
yamlJob.interrupt();
226+
}
227+
}
228+
229+
@Test
230+
void testPtOscSchemaMigration() throws Exception {
231+
String databaseName = customerDatabase.getDatabaseName();
232+
233+
LOG.info("Step 1: Install gh-ost command line utility");
234+
installGhOstCli(MYSQL8_CONTAINER);
235+
236+
Thread yamlJob = runJob(databaseName, "customers");
237+
yamlJob.start();
238+
239+
LOG.info("Step 2: Start pipeline job");
240+
insertRecordsPhase1(customerDatabase, 5000, 1000);
241+
242+
LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN");
243+
244+
Thread thread =
245+
new Thread(
246+
() -> {
247+
try {
248+
execInContainer(
249+
PERCONA_TOOLKIT_CONTAINER,
250+
"evolve schema",
251+
"pt-online-schema-change",
252+
"--user=" + TEST_USER,
253+
"--host=" + INTER_CONTAINER_MYSQL_ALIAS,
254+
"--password=" + TEST_PASSWORD,
255+
"P=3306,t=customers,D=" + databaseName,
256+
"--alter",
257+
"add column ext int first",
258+
"--charset=utf8",
259+
"--recursion-method=NONE", // Do not look for slave nodes
260+
"--print",
261+
"--execute");
262+
} catch (IOException | InterruptedException e) {
263+
throw new RuntimeException(e);
264+
}
265+
});
266+
267+
LOG.info("Insertion Phase 1 finishes");
268+
thread.start();
269+
insertRecordsPhase1(customerDatabase, 7000, 3000);
270+
LOG.info("Insertion Phase 2 finishes");
271+
272+
thread.join();
273+
insertRecordsPhase2(customerDatabase, 12000, 1000);
274+
LOG.info("Insertion Phase 3 finishes");
275+
276+
try {
277+
TestCaseUtils.repeatedCheck(
278+
() -> outCaptor.toString().split(System.lineSeparator()).length == 5023);
279+
} catch (Exception e) {
280+
LOG.error("Failed to verify results. Captured stdout: {}", outCaptor.toString(), e);
281+
} finally {
282+
yamlJob.interrupt();
283+
}
284+
}
285+
286+
private static void execInContainer(Container<?> container, String prompt, String... commands)
287+
throws IOException, InterruptedException {
288+
{
289+
LOG.info(
290+
"Starting to {} with the following command: `{}`",
291+
prompt,
292+
String.join(" ", commands));
293+
Container.ExecResult execResult = container.execInContainer(commands);
294+
if (execResult.getExitCode() == 0) {
295+
LOG.info("Successfully {}. Stdout: {}", prompt, execResult.getStdout());
296+
} else {
297+
LOG.error(
298+
"Failed to {}. Exit code: {}, Stdout: {}, Stderr: {}",
299+
prompt,
300+
execResult.getExitCode(),
301+
execResult.getStdout(),
302+
execResult.getStderr());
303+
throw new IOException("Failed to execute commands: " + String.join(" ", commands));
304+
}
305+
}
306+
}
307+
308+
private Thread runJob(String databaseName, String tableName) {
309+
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
310+
311+
// Setup MySQL source
312+
Configuration sourceConfig = new Configuration();
313+
sourceConfig.set(MySqlDataSourceOptions.HOSTNAME, MYSQL8_CONTAINER.getHost());
314+
sourceConfig.set(MySqlDataSourceOptions.PORT, MYSQL8_CONTAINER.getDatabasePort());
315+
sourceConfig.set(MySqlDataSourceOptions.USERNAME, TEST_USER);
316+
sourceConfig.set(MySqlDataSourceOptions.PASSWORD, TEST_PASSWORD);
317+
sourceConfig.set(MySqlDataSourceOptions.SERVER_TIME_ZONE, "UTC");
318+
sourceConfig.set(MySqlDataSourceOptions.TABLES, databaseName + "." + tableName);
319+
sourceConfig.set(MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES, true);
320+
321+
SourceDef sourceDef =
322+
new SourceDef(MySqlDataSourceFactory.IDENTIFIER, "MySQL Source", sourceConfig);
323+
324+
Configuration sinkConfig = new Configuration();
325+
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
326+
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
327+
328+
Configuration pipelineConfig = new Configuration();
329+
pipelineConfig.set(
330+
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
331+
PipelineDef pipelineDef =
332+
new PipelineDef(
333+
sourceDef,
334+
sinkDef,
335+
Collections.singletonList(
336+
new RouteDef(
337+
databaseName + "." + tableName,
338+
"sink_db.sink_tbl",
339+
null,
340+
null)),
341+
Collections.emptyList(),
342+
Collections.emptyList(),
343+
pipelineConfig);
344+
345+
PipelineExecution execution = composer.compose(pipelineDef);
346+
return new Thread(
347+
() -> {
348+
try {
349+
execution.execute();
350+
} catch (Exception e) {
351+
throw new RuntimeException(e);
352+
}
353+
});
354+
}
355+
}

0 commit comments

Comments
 (0)