Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.mysql.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.util.CloseableIterator;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Integration tests to check MySQL pipeline connector works well with different MySQL server
* versions.
*/
@ParameterizedClass
@EnumSource(
value = MySqlVersion.class,
names = {"V5_7", "V8_0", "V8_4"})
class MySqlPipelineCompatibilityITCase {

private static final Logger LOG =
LoggerFactory.getLogger(MySqlPipelineCompatibilityITCase.class);

private static Path tempFolder;
private static File resourceFolder;

private final MySqlVersion version;
private final MySqlContainer mySqlContainer;
private final UniqueDatabase testDatabase;

private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

MySqlPipelineCompatibilityITCase(MySqlVersion version) {
this.version = version;
this.mySqlContainer =
(MySqlContainer)
new MySqlContainer(version)
.withConfigurationOverride(buildCustomMySqlConfig(version))
.withSetupSQL("docker/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
.withPassword("flinkpw")
.withLogConsumer(new Slf4jLogConsumer(LOG));
this.testDatabase =
new UniqueDatabase(mySqlContainer, "inventory", TEST_USER, TEST_PASSWORD);
}

@BeforeEach
void setup() throws Exception {
// Initialize static resources if needed
if (resourceFolder == null) {
resourceFolder =
Paths.get(
Objects.requireNonNull(
MySqlPipelineCompatibilityITCase.class
.getClassLoader()
.getResource("."))
.toURI())
.toFile();
tempFolder = Files.createTempDirectory(resourceFolder.toPath(), "mysql-config");
}

env.setParallelism(4);
env.enableCheckpointing(200);
RestartStrategyUtils.configureNoRestartStrategy(env);

LOG.info("Starting container for MySQL {}...", version.getVersion());
Startables.deepStart(Stream.of(mySqlContainer)).join();
LOG.info("Container is started.");

testDatabase.createAndInitialize();
}

@AfterEach
void tearDown() {
try {
testDatabase.dropDatabase();
} catch (IllegalStateException e) {
LOG.warn("Failed to drop test database during teardown.", e);
}
if (mySqlContainer != null) {
LOG.info("Stopping container for MySQL {}...", version.getVersion());
mySqlContainer.stop();
LOG.info("Container is stopped.");
}
}

@Test
void testSnapshotRead() throws Exception {
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(mySqlContainer.getHost())
.port(mySqlContainer.getDatabasePort())
.username(TEST_USER)
.password(TEST_PASSWORD)
.databaseList(testDatabase.getDatabaseName())
.tableList(testDatabase.getDatabaseName() + ".products")
.startupOptions(StartupOptions.initial())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC");

FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
MySqlDataSourceFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();

List<Event> snapshotEvents = fetchEvents(events, 9);

assertThat(snapshotEvents).hasSize(9);
assertThat(snapshotEvents.stream().filter(e -> e instanceof DataChangeEvent)).hasSize(9);

events.close();
}

@Test
void testBinlogRead() throws Exception {
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(mySqlContainer.getHost())
.port(mySqlContainer.getDatabasePort())
.username(TEST_USER)
.password(TEST_PASSWORD)
.databaseList(testDatabase.getDatabaseName())
.tableList(testDatabase.getDatabaseName() + ".products")
.startupOptions(StartupOptions.initial())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC");

FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
MySqlDataSourceFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();

fetchEvents(events, 9);

try (Connection connection = testDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"INSERT INTO `%s`.`products` VALUES (default,'test_product','desc',1.0);",
testDatabase.getDatabaseName()));
}

List<Event> binlogEvents = fetchEvents(events, 1);
assertThat(binlogEvents).hasSize(1);
assertThat(binlogEvents.get(0)).isInstanceOf(DataChangeEvent.class);

events.close();
}

private String getServerId(int parallelism) {
int serverId = (int) (Math.random() * 100) + 5400;
return serverId + "-" + (serverId + parallelism);
}

private List<Event> fetchEvents(CloseableIterator<Event> iterator, int count) {
List<Event> events = new ArrayList<>();
while (count > 0 && iterator.hasNext()) {
Event event = iterator.next();
if (event instanceof DataChangeEvent) {
events.add(event);
count--;
}
}
return events;
}

private String buildCustomMySqlConfig(MySqlVersion version) {
try {
if (resourceFolder == null) {
resourceFolder =
Paths.get(
Objects.requireNonNull(
MySqlPipelineCompatibilityITCase.class
.getClassLoader()
.getResource("."))
.toURI())
.toFile();
tempFolder = Files.createTempDirectory(resourceFolder.toPath(), "mysql-config");
}
// Create version-specific directory to avoid conflicts
Path versionDir =
Files.createDirectories(
Paths.get(
tempFolder.toString(), version.getVersion().replace(".", "_")));
Path cnf = Paths.get(versionDir.toString(), "my.cnf");
// Check if file already exists to avoid FileAlreadyExistsException
if (!Files.exists(cnf)) {
Files.createFile(cnf);
}
StringBuilder mysqlConfBuilder = new StringBuilder();
mysqlConfBuilder.append(
"[mysqld]\n"
+ "binlog_format = row\n"
+ "log_bin = mysql-bin\n"
+ "server-id = 223344\n"
+ "binlog_row_image = FULL\n"
+ "gtid-mode = OFF\n");

if (version == MySqlVersion.V8_0 || version == MySqlVersion.V8_4) {
mysqlConfBuilder.append("secure_file_priv=/var/lib/mysql\n");
}

Files.write(
cnf,
Collections.singleton(mysqlConfBuilder.toString()),
StandardCharsets.UTF_8,
StandardOpenOption.TRUNCATE_EXISTING);
return Paths.get(resourceFolder.getAbsolutePath()).relativize(cnf).toString();
} catch (Exception e) {
throw new RuntimeException("Failed to create my.cnf file.", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,12 @@ public void run(SourceContext<T> sourceContext) throws Exception {
debeziumChangeFetcher.runFetchLoop();
} catch (Throwable t) {
if (t.getMessage() != null
&& t.getMessage()
.contains(
"A slave with the same server_uuid/server_id as this slave has connected to the master")) {
&& (t.getMessage()
.contains(
"A slave with the same server_uuid/server_id as this slave has connected to the master")
|| t.getMessage()
.contains(
"A replica with the same server_uuid/server_id as this replica has connected to the source"))) {
throw new RuntimeException(
"The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.\n"
+ "The server id conflict may happen in the following situations: \n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,32 @@

/**
* Copied from Debezium project(1.9.8.final) to add custom jdbc properties in the jdbc url. The new
* parameter {@code jdbcProperties} in the constructor of {@link MySqlConnectionConfiguration} will
* be used to generate the jdbc url pattern, and may overwrite the default value.
* parameter {@link MySqlConnectionConfiguration#MySqlConnectionConfiguration(Configuration config,
* Properties jdbcProperties)} in the constructor of {@link MySqlConnectionConfiguration} will be
* used to generate the jdbc url pattern, and may overwrite the default value.
*
* <p>Line 75: Add field {@code urlPattern} in {@link MySqlConnection} and remove old pattern.
* <p>Add field {@link MySqlConnection#urlPattern} in {@link MySqlConnection} and remove old
* pattern.
*
* <p>Line 92: Init {@code urlPattern} using the url pattern from {@link
* <p>Added MySQL 8.4+ compatible probing fields {@link
* MySqlConnection#MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT}, {@link
* MySqlConnection#MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT}, and {@link
* MySqlConnection#showBinaryLogStatement}.
*
* <p>Init {@link MySqlConnection#urlPattern} using the url pattern from {@link
* MySqlConnectionConfiguration}.
*
* <p>Line 544: Generate the connection string by the new field {@code urlPattern}.
* <p>Generate the connection string by the new field {@link MySqlConnection#urlPattern}.
*
* <p>Line 569 ~ 574: Add new constant and field {@code urlPattern} to {@link
* <p>Add new constant and field {@link MySqlConnection#urlPattern} to {@link
* MySqlConnectionConfiguration}.
*
* <p>Line 625: Init new field {@code urlPattern} in {@link MySqlConnectionConfiguration}.
* <p>Init new field {@link MySqlConnection#urlPattern} in {@link MySqlConnectionConfiguration}.
*
* <p>Add utility methods helping to generate the url pattern and add default values.
*
* <p>Line 715 ~ 741: Add some methods helping to generate the url pattern and add default values.
* <p>Added utility method {@link MySqlConnection#getShowBinaryLogStatement} and {@link
* MySqlConnection#probeShowBinaryLogStatement} for MySQL 8.4 compatibility.
*/
public class MySqlConnection extends JdbcConnection {

Expand All @@ -74,6 +84,10 @@ public class MySqlConnection extends JdbcConnection {

private final String urlPattern;

private static final String MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT = "SHOW MASTER STATUS";
private static final String MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT = "SHOW BINARY LOG STATUS";
private final String showBinaryLogStatement;

/**
* Creates a new connection using the supplied configuration.
*
Expand All @@ -90,6 +104,7 @@ public MySqlConnection(
this.connectionConfig = connectionConfig;
this.mysqlFieldReader = fieldReader;
this.urlPattern = connectionConfig.getUrlPattern();
this.showBinaryLogStatement = probeShowBinaryLogStatement();
}

/**
Expand Down Expand Up @@ -275,7 +290,7 @@ public boolean isGtidModeEnabled() {
public String knownGtidSet() {
try {
return queryAndMap(
"SHOW MASTER STATUS",
showBinaryLogStatement,
rs -> {
if (rs.next() && rs.getMetaData().getColumnCount() > 4) {
return rs.getString(
Expand Down Expand Up @@ -769,4 +784,27 @@ public <T extends DatabaseSchema<TableId>> Object getColumnValue(
public String quotedTableIdString(TableId tableId) {
return tableId.toQuotedString('`');
}

public String getShowBinaryLogStatement() {
return showBinaryLogStatement;
}

private String probeShowBinaryLogStatement() {
LOGGER.info("Probing binary log statement.");
try {
// Attempt to query
query(MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT, rs -> {});
LOGGER.info(
"Successfully found show binary log statement with `{}`.",
MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT);
return MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT;
} catch (SQLException e) {
LOGGER.info(
"Probing with {} failed, fallback to classic {}. Caused by: {}",
MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT,
MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT,
e.getMessage());
return MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT;
}
}
}
Loading
Loading