Skip to content

Commit b393d06

Browse files
committed
[FLINK-39252][sqlserver] fix: replace Java 9+ APIs with Java 8 compatible code in SQL Server connector
1 parent 1c6b293 commit b393d06

5 files changed

Lines changed: 21 additions & 21 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/factory/SqlServerDataSourceFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import java.util.HashSet;
5050
import java.util.List;
5151
import java.util.Map;
52-
import java.util.Objects;
5352
import java.util.Properties;
5453
import java.util.Set;
5554
import java.util.stream.Collectors;
@@ -274,7 +273,9 @@ public String identifier() {
274273

275274
private static List<String> getTableList(
276275
@Nullable List<TableId> tableIdList, Selectors selectors) {
277-
return Objects.requireNonNullElse(tableIdList, Collections.<TableId>emptyList()).stream()
276+
List<TableId> resolvedTableIdList =
277+
tableIdList == null ? Collections.emptyList() : tableIdList;
278+
return resolvedTableIdList.stream()
278279
.filter(selectors::isMatch)
279280
// SQL Server tableList format: schemaName.tableName (without database prefix)
280281
// See SqlServerSourceBuilder: "Each identifier is of the form

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerOnlineSchemaMigrationITCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import java.util.Arrays;
5454
import java.util.Collections;
5555
import java.util.List;
56-
import java.util.Map;
5756
import java.util.stream.Collectors;
5857
import java.util.stream.Stream;
5958

@@ -187,7 +186,7 @@ void testSchemaMigrationFromScratch() throws Exception {
187186
new AlterColumnTypeEvent(
188187
tableId,
189188
Collections.singletonMap("ext", DataTypes.DOUBLE()),
190-
Map.of("ext", DataTypes.INT())),
189+
Collections.singletonMap("ext", DataTypes.INT())),
191190
DataChangeEvent.insertEvent(
192191
tableId,
193192
generate(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerPipelineITCaseTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ private List<Event> getSnapshotExpected(TableId tableId) {
438438
108,
439439
BinaryStringData.fromString("jacket"),
440440
BinaryStringData.fromString(
441-
"water resistent black wind breaker"),
441+
"water resistant black wind breaker"),
442442
0.1d
443443
})));
444444
snapshotExpected.add(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerTablePatternMatchingTest.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,11 @@ public void before() {
5555
private void initializePatternTestDatabase() {
5656
try (Connection connection = getJdbcConnection();
5757
Statement statement = connection.createStatement()) {
58-
// Drop database if exists
59-
statement.execute(
60-
String.format(
61-
"IF EXISTS(select 1 from sys.databases where name = '%s') "
62-
+ "BEGIN ALTER DATABASE [%s] SET SINGLE_USER WITH ROLLBACK IMMEDIATE; "
63-
+ "DROP DATABASE [%s]; END",
64-
DATABASE_NAME, DATABASE_NAME, DATABASE_NAME));
58+
dropTestDatabase(connection, DATABASE_NAME);
6559

6660
// Create database
67-
statement.execute(String.format("CREATE DATABASE %s;", DATABASE_NAME));
68-
statement.execute(String.format("USE %s;", DATABASE_NAME));
61+
statement.execute(String.format("CREATE DATABASE [%s];", DATABASE_NAME));
62+
statement.execute(String.format("USE [%s];", DATABASE_NAME));
6963

7064
// Wait for SQL Server Agent
7165
statement.execute("WAITFOR DELAY '00:00:03';");

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/SqlServerTestBase.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@
3333
import org.testcontainers.containers.output.Slf4jLogConsumer;
3434
import org.testcontainers.lifecycle.Startables;
3535

36-
import java.net.URL;
37-
import java.nio.file.Files;
38-
import java.nio.file.Paths;
36+
import java.io.BufferedReader;
37+
import java.io.InputStream;
38+
import java.io.InputStreamReader;
39+
import java.nio.charset.StandardCharsets;
3940
import java.sql.Connection;
4041
import java.sql.DriverManager;
4142
import java.sql.SQLException;
@@ -92,7 +93,7 @@ protected Connection getJdbcConnection() throws SQLException {
9293
MSSQL_SERVER_CONTAINER.getPassword());
9394
}
9495

95-
private static void dropTestDatabase(Connection connection, String databaseName)
96+
protected static void dropTestDatabase(Connection connection, String databaseName)
9697
throws SQLException {
9798
try {
9899
Awaitility.await("Disabling CDC")
@@ -175,14 +176,19 @@ protected static void disableDbCdc(Connection connection, String name) throws SQ
175176
*/
176177
protected void initializeSqlServerTable(String sqlFile) {
177178
final String ddlFile = String.format("ddl/%s.sql", sqlFile);
178-
final URL ddlTestFile = SqlServerTestBase.class.getClassLoader().getResource(ddlFile);
179+
final InputStream ddlTestFile =
180+
SqlServerTestBase.class.getClassLoader().getResourceAsStream(ddlFile);
179181
Assertions.assertThat(ddlTestFile).withFailMessage("Cannot locate " + ddlFile).isNotNull();
180-
try (Connection connection = getJdbcConnection();
182+
try (InputStream inputStream = ddlTestFile;
183+
BufferedReader reader =
184+
new BufferedReader(
185+
new InputStreamReader(inputStream, StandardCharsets.UTF_8));
186+
Connection connection = getJdbcConnection();
181187
Statement statement = connection.createStatement()) {
182188
dropTestDatabase(connection, sqlFile);
183189
final List<String> statements =
184190
Arrays.stream(
185-
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
191+
reader.lines()
186192
.map(String::trim)
187193
.filter(x -> !x.startsWith("--") && !x.isEmpty())
188194
.map(

0 commit comments

Comments
 (0)