Skip to content

Commit 6aaa3e5

Browse files
committed
[FLINK-39252][sqlserver] Address review feedback for pipeline and source connectors
- handle null namespace in metadata accessor
- register chunk-key option and improve tables validation messages
- relax SQL Server database-name validation and escape bracket quoting
- fix FLOAT/REAL type mapping and clean up option/docs wording
- tighten tests for chunk-key support, malformed tables format, and quote escaping
- fail fast on SQL Server error 313 when CDC LSN window has been purged
1 parent da3a099 commit 6aaa3e5

16 files changed

Lines changed: 211 additions & 81 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/factory/SqlServerDataSourceFactory.java

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import java.util.List;
5151
import java.util.Map;
5252
import java.util.Objects;
53-
import java.util.Optional;
5453
import java.util.Properties;
5554
import java.util.Set;
5655
import java.util.stream.Collectors;
@@ -77,8 +76,6 @@
7776
import static org.apache.flink.cdc.connectors.sqlserver.source.SqlServerDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
7877
import static org.apache.flink.cdc.connectors.sqlserver.source.SqlServerDataSourceOptions.SCHEMA_CHANGE_ENABLED;
7978
import static org.apache.flink.cdc.connectors.sqlserver.source.SqlServerDataSourceOptions.SERVER_TIME_ZONE;
80-
import static org.apache.flink.cdc.connectors.sqlserver.source.SqlServerDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
81-
import static org.apache.flink.cdc.connectors.sqlserver.source.SqlServerDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
8279
import static org.apache.flink.cdc.connectors.sqlserver.source.SqlServerDataSourceOptions.TABLES;
8380
import static org.apache.flink.cdc.connectors.sqlserver.source.SqlServerDataSourceOptions.TABLES_EXCLUDE;
8481
import static org.apache.flink.cdc.connectors.sqlserver.source.SqlServerDataSourceOptions.USERNAME;
@@ -141,7 +138,7 @@ public DataSource createDataSource(Context context) {
141138

142139
Map<String, String> configMap = config.toMap();
143140
mergeJdbcPropertiesIntoDebeziumProperties(configMap);
144-
String databaseName = getValidateDatabaseName(tables).orElseThrow();
141+
String databaseName = getValidateDatabaseName(tables);
145142

146143
SqlServerSourceConfigFactory configFactory = new SqlServerSourceConfigFactory();
147144
configFactory
@@ -249,6 +246,7 @@ public Set<ConfigOption<?>> optionalOptions() {
249246
Set<ConfigOption<?>> options = new HashSet<>();
250247
options.add(PORT);
251248
options.add(TABLES_EXCLUDE);
249+
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
252250
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
253251
options.add(SCAN_SNAPSHOT_FETCH_SIZE);
254252
options.add(SCHEMA_CHANGE_ENABLED);
@@ -341,7 +339,7 @@ private void validateDistributionFactorUpper(double distributionFactorUpper) {
341339
doubleCompare(distributionFactorUpper, 1.0d) >= 0,
342340
String.format(
343341
"The value of option '%s' must larger than or equals %s, but is %s",
344-
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(),
342+
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(),
345343
1.0d,
346344
distributionFactorUpper));
347345
}
@@ -353,7 +351,7 @@ private void validateDistributionFactorLower(double distributionFactorLower) {
353351
&& doubleCompare(distributionFactorLower, 1.0d) <= 0,
354352
String.format(
355353
"The value of option '%s' must between %s and %s inclusively, but is %s",
356-
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(),
354+
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(),
357355
0.0d,
358356
1.0d,
359357
distributionFactorLower));
@@ -365,41 +363,35 @@ && doubleCompare(distributionFactorLower, 1.0d) <= 0,
365363
* @param tables Table name list, format is "db.schema.table,db.schema.table,..." Each table
366364
* name consists of three parts separated by ".", which are database name, schema name, and
367365
* table name.
368-
* @return Database name if found, otherwise returns Optional.empty()
369366
* @throws IllegalArgumentException If the input parameter is null or does not match the
370367
* expected format, or if database names are inconsistent.
371368
*/
372-
private Optional<String> getValidateDatabaseName(String tables) {
373-
// Input validation
369+
private String getValidateDatabaseName(String tables) {
374370
if (tables == null || tables.trim().isEmpty()) {
375371
throw new IllegalArgumentException("Parameter tables cannot be null or empty");
376372
}
377373

378-
// Split table name list
379374
String[] tableNames = tables.split(",");
380375
String dbName = null;
381376

382377
for (String tableName : tableNames) {
383-
// Trim whitespace and split table name
384378
String trimmedTableName = tableName.trim();
385-
if (!trimmedTableName.contains(".")) {
386-
continue; // Skip table names that do not match the expected format
387-
}
388-
389-
String[] tableNameParts =
390-
trimmedTableName.split(
391-
"(?<!\\\\)\\.", -1); // Use -1 to avoid ignoring trailing empty elements
379+
String[] tableNameParts = trimmedTableName.split("(?<!\\\\)\\.", -1);
392380

393381
checkState(
394382
tableNameParts.length == 3,
395383
String.format(
396-
"Tables format must db.schema.table, can not 'tables' = %s",
397-
TABLES.key()));
384+
"Table '%s' does not match the expected 'database.schema.table' "
385+
+ "format. Please check the value of option '%s'.",
386+
trimmedTableName, TABLES.key()));
398387
String currentDbName = tableNameParts[0];
399388

400389
checkState(
401390
isValidSqlServerDbName(currentDbName),
402-
String.format("%s is not a valid SQL Server database name", currentDbName));
391+
String.format(
392+
"Database name '%s' exceeds SQL Server's maximum identifier length "
393+
+ "of 128 characters.",
394+
currentDbName));
403395
if (dbName == null) {
404396
dbName = currentDbName;
405397
} else {
@@ -411,20 +403,18 @@ private Optional<String> getValidateDatabaseName(String tables) {
411403
}
412404
}
413405

414-
// If no valid table name is found, return Optional.empty()
415-
return Optional.ofNullable(dbName);
406+
if (dbName == null) {
407+
throw new IllegalArgumentException(
408+
"Cannot determine database name from 'tables' option: "
409+
+ tables
410+
+ ". Expected format: database.schema.table");
411+
}
412+
return dbName;
416413
}
417414

418415
/** Validate if the database name conforms to SQL Server naming conventions. */
419416
private boolean isValidSqlServerDbName(String dbName) {
420-
// SQL Server database name conventions:
421-
// 1. Length does not exceed 128 characters
422-
// 2. Can contain letters, numbers, underscores, at sign, and dollar signs
423-
// 3. Cannot start with a digit or special character (except underscore)
424-
if (dbName == null || dbName.length() > 128) {
425-
return false;
426-
}
427-
return dbName.matches("[a-zA-Z_@#][a-zA-Z0-9_@#$]*");
417+
return !StringUtils.isNullOrWhitespaceOnly(dbName) && dbName.length() <= 128;
428418
}
429419

430420
/** Replaces the default timezone placeholder with session timezone, if applicable. */

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerDataSourceOptions.java

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class SqlServerDataSourceOptions {
4545
.stringType()
4646
.noDefaultValue()
4747
.withDescription(
48-
"Name of the SQL Server database to use when connecting to the SQL Server database server.");
48+
"Name of the SQL Server user to use when connecting to the SQL Server database server.");
4949

5050
public static final ConfigOption<String> PASSWORD =
5151
ConfigOptions.key("password")
@@ -131,30 +131,6 @@ public class SqlServerDataSourceOptions {
131131
.withDescription(
132132
"Optional timestamp used in case of \"timestamp\" startup mode");
133133

134-
public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
135-
ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound")
136-
.doubleType()
137-
.defaultValue(1000.0d)
138-
.withFallbackKeys("split-key.even-distribution.factor.upper-bound")
139-
.withDescription(
140-
"The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the"
141-
+ " table is evenly distribution or not."
142-
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
143-
+ " and the query for splitting would happen when it is uneven."
144-
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
145-
146-
public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
147-
ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound")
148-
.doubleType()
149-
.defaultValue(0.05d)
150-
.withFallbackKeys("split-key.even-distribution.factor.lower-bound")
151-
.withDescription(
152-
"The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the"
153-
+ " table is evenly distribution or not."
154-
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
155-
+ " and the query for splitting would happen when it is uneven."
156-
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
157-
158134
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP =
159135
ConfigOptions.key("scan.incremental.snapshot.backfill.skip")
160136
.booleanType()

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerMetadataAccessor.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.cdc.common.source.MetadataAccessor;
2424
import org.apache.flink.cdc.connectors.sqlserver.source.config.SqlServerSourceConfig;
2525
import org.apache.flink.cdc.connectors.sqlserver.utils.SqlServerSchemaUtils;
26+
import org.apache.flink.table.api.ValidationException;
2627

2728
import javax.annotation.Nullable;
2829

@@ -56,7 +57,22 @@ public List<String> listNamespaces() {
5657
*/
5758
@Override
5859
public List<String> listSchemas(@Nullable String namespace) {
59-
return SqlServerSchemaUtils.listSchemas(sourceConfig, namespace);
60+
return SqlServerSchemaUtils.listSchemas(sourceConfig, resolveNamespace(namespace));
61+
}
62+
63+
private String resolveNamespace(@Nullable String namespace) {
64+
if (namespace != null) {
65+
return namespace;
66+
}
67+
68+
List<String> configuredDatabases = sourceConfig.getDatabaseList();
69+
if (configuredDatabases != null && !configuredDatabases.isEmpty()) {
70+
return configuredDatabases.get(0);
71+
}
72+
73+
throw new ValidationException(
74+
"Namespace must not be null when listing SQL Server schemas and no database "
75+
+ "is configured in the source configuration.");
6076
}
6177

6278
/**

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSchemaDataTypeInference.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ public class SqlServerSchemaDataTypeInference extends DebeziumSchemaDataTypeInfe
2929

3030
private static final long serialVersionUID = 1L;
3131

32-
// SQL Server doesn't have special geometry types like PostgreSQL,
33-
// so we can use the default implementation from the parent class.
34-
// If there are SQL Server specific types that need special handling,
35-
// they can be added here by overriding the inferStruct method.
32+
// SQL Server has database-specific types, but no special handling is currently
33+
// needed here, so this class uses the default implementation from the parent class.
34+
// If SQL Server-specific types require special handling in the future,
35+
// it can be added here by overriding the inferStruct method.
3636
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/utils/SqlServerSchemaUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public static List<String> listNamespaces(JdbcConnection jdbc) throws SQLExcepti
160160
}
161161

162162
public static String quote(String dbOrTableName) {
163-
return "[" + dbOrTableName + "]";
163+
return "[" + dbOrTableName.replace("]", "]]") + "]";
164164
}
165165

166166
public static Schema getTableSchema(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/utils/SqlServerTypeUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,9 @@ private static DataType convertFromColumn(Column column) {
7979
case Types.BIGINT:
8080
return DataTypes.BIGINT();
8181
case Types.REAL:
82-
case Types.FLOAT:
8382
return DataTypes.FLOAT();
83+
case Types.FLOAT:
84+
return DataTypes.DOUBLE();
8485
case Types.DOUBLE:
8586
return DataTypes.DOUBLE();
8687
case Types.NUMERIC:

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/factory/SqlServerDataSourceFactoryTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import static org.apache.flink.cdc.connectors.sqlserver.source.SqlServerDataSourceOptions.HOSTNAME;
4141
import static org.apache.flink.cdc.connectors.sqlserver.source.SqlServerDataSourceOptions.PASSWORD;
4242
import static org.apache.flink.cdc.connectors.sqlserver.source.SqlServerDataSourceOptions.PORT;
43+
import static org.apache.flink.cdc.connectors.sqlserver.source.SqlServerDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
4344
import static org.apache.flink.cdc.connectors.sqlserver.source.SqlServerDataSourceOptions.SCAN_STARTUP_MODE;
4445
import static org.apache.flink.cdc.connectors.sqlserver.source.SqlServerDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
4546
import static org.apache.flink.cdc.connectors.sqlserver.source.SqlServerDataSourceOptions.TABLES;
@@ -215,6 +216,26 @@ public void testOptionalOption() {
215216
.isEqualTo(MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT));
216217
}
217218

219+
@Test
220+
public void testChunkKeyColumnOptionIsSupported() {
221+
Map<String, String> options = new HashMap<>();
222+
options.put(HOSTNAME.key(), MSSQL_SERVER_CONTAINER.getHost());
223+
options.put(
224+
PORT.key(),
225+
String.valueOf(MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT)));
226+
options.put(USERNAME.key(), MSSQL_SERVER_CONTAINER.getUsername());
227+
options.put(PASSWORD.key(), MSSQL_SERVER_CONTAINER.getPassword());
228+
options.put(TABLES.key(), DATABASE_NAME + ".dbo.products");
229+
options.put(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN.key(), "id");
230+
231+
Factory.Context context = new MockContext(Configuration.fromMap(options));
232+
SqlServerDataSourceFactory factory = new SqlServerDataSourceFactory();
233+
234+
assertThat(factory.optionalOptions()).contains(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
235+
SqlServerDataSource dataSource = (SqlServerDataSource) factory.createDataSource(context);
236+
assertThat(dataSource.getSqlServerSourceConfig().getChunkKeyColumn()).isEqualTo("id");
237+
}
238+
218239
@Test
219240
public void testStartupFromTimestamp() {
220241
Map<String, String> options = new HashMap<>();
@@ -317,6 +338,29 @@ public void testTableValidationWithDifferentDatabases() {
317338
"The value of option `tables` is `db1.dbo.table1,db2.dbo.table2`, but not all table names have the same database name");
318339
}
319340

341+
@Test
342+
public void testTableValidationRequiresDatabaseSchemaTableFormat() {
343+
Map<String, String> options = new HashMap<>();
344+
options.put(HOSTNAME.key(), MSSQL_SERVER_CONTAINER.getHost());
345+
options.put(
346+
PORT.key(),
347+
String.valueOf(MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT)));
348+
options.put(USERNAME.key(), MSSQL_SERVER_CONTAINER.getUsername());
349+
options.put(PASSWORD.key(), MSSQL_SERVER_CONTAINER.getPassword());
350+
options.put(TABLES.key(), DATABASE_NAME + ".dbo");
351+
352+
SqlServerDataSourceFactory factory = new SqlServerDataSourceFactory();
353+
Factory.Context context = new MockContext(Configuration.fromMap(options));
354+
355+
assertThatThrownBy(() -> factory.createDataSource(context))
356+
.isInstanceOf(IllegalStateException.class)
357+
.hasMessageContaining(
358+
"Table '"
359+
+ DATABASE_NAME
360+
+ ".dbo' does not match the expected 'database.schema.table' format.")
361+
.hasMessageContaining(TABLES.key());
362+
}
363+
320364
static class MockContext implements Factory.Context {
321365

322366
Configuration factoryConfiguration;

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerMetadataAccessorITCase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,14 @@ public void testListSchemas() {
7474
assertThat(schemas).contains("dbo");
7575
}
7676

77+
@Test
78+
public void testListSchemasUsesConfiguredDatabaseWhenNamespaceIsNull() {
79+
String[] tables = new String[] {"dbo.full_types"};
80+
SqlServerMetadataAccessor metadataAccessor = getMetadataAccessor(tables);
81+
82+
assertThat(metadataAccessor.listSchemas(null)).contains("dbo");
83+
}
84+
7785
@Test
7886
public void testListTables() {
7987
String[] tables =

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerOnlineSchemaMigrationITCase.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,6 @@ void testSchemaMigrationFromScratch() throws Exception {
106106
new EventTypeInfo())
107107
.executeAndCollect();
108108

109-
Thread.sleep(5_000);
110-
111109
List<Event> expected = new ArrayList<>();
112110
Schema schemaV1 =
113111
Schema.newBuilder()
@@ -124,9 +122,6 @@ void testSchemaMigrationFromScratch() throws Exception {
124122
.containsExactlyInAnyOrderElementsOf(
125123
expected.stream().map(Object::toString).collect(Collectors.toList()));
126124

127-
// Wait for a little while until we're in streaming mode.
128-
Thread.sleep(5_000);
129-
130125
// ADD COLUMN
131126
try (Connection connection = getJdbcConnection();
132127
Statement statement = connection.createStatement()) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerPipelineITCaseTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T side
336336
while (size > 0 && iter.hasNext()) {
337337
T event = iter.next();
338338
if (sideEvent.getClass().isInstance(event)) {
339-
sideResults.add(sideEvent);
339+
sideResults.add(event);
340340
} else {
341341
result.add(event);
342342
size--;

0 commit comments

Comments
 (0)