Skip to content

Commit 0a4c256

Browse files
authored
[FLINK-37262][pipeline-connector/mysql] Fix missing PARSE_ONLINE_SCHEMA_CHANGES option in MySqlDataSourceFactory
This closes apache#3910
1 parent 82bf8a0 commit 0a4c256

4 files changed

Lines changed: 54 additions & 5 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ public Set<ConfigOption<?>> optionalOptions() {
336336
options.add(INCLUDE_COMMENTS_ENABLED);
337337
options.add(USE_LEGACY_JSON_FORMAT);
338338
options.add(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
339+
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
339340
return options;
340341
}
341342

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.stream.Collectors;
3838

3939
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
40+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
4041
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
4142
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
4243
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
@@ -258,14 +259,16 @@ public void testOptionalOption() {
258259

259260
// optional option
260261
options.put(TREAT_TINYINT1_AS_BOOLEAN_ENABLED.key(), "false");
262+
options.put(PARSE_ONLINE_SCHEMA_CHANGES.key(), "true");
261263

262264
Factory.Context context = new MockContext(Configuration.fromMap(options));
263265
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
264-
assertThat(factory.optionalOptions().contains(TREAT_TINYINT1_AS_BOOLEAN_ENABLED))
265-
.isEqualTo(true);
266+
assertThat(factory.optionalOptions())
267+
.contains(TREAT_TINYINT1_AS_BOOLEAN_ENABLED, PARSE_ONLINE_SCHEMA_CHANGES);
266268

267269
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
268-
assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isEqualTo(false);
270+
assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isFalse();
271+
assertThat(dataSource.getSourceConfig().isParseOnLineSchemaChanges()).isTrue();
269272
}
270273

271274
@Test

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,9 @@ public boolean equals(Object o) {
357357
&& Objects.equals(jdbcProperties, that.jdbcProperties)
358358
&& Objects.equals(heartbeatInterval, that.heartbeatInterval)
359359
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
360-
&& Objects.equals(skipSnapshotBackFill, that.skipSnapshotBackFill);
360+
&& Objects.equals(skipSnapshotBackFill, that.skipSnapshotBackFill)
361+
&& parseOnlineSchemaChanges == that.parseOnlineSchemaChanges
362+
&& useLegacyJsonFormat == that.useLegacyJsonFormat;
361363
}
362364

363365
@Override
@@ -390,7 +392,9 @@ public int hashCode() {
390392
jdbcProperties,
391393
heartbeatInterval,
392394
chunkKeyColumn,
393-
skipSnapshotBackFill);
395+
skipSnapshotBackFill,
396+
parseOnlineSchemaChanges,
397+
useLegacyJsonFormat);
394398
}
395399

396400
@Override

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,47 @@ public void testValidation() {
805805
}
806806
}
807807

808+
@Test
809+
public void testEnablingExperimentalOptions() {
810+
Map<String, String> properties = getAllOptions();
811+
properties.put("scan.parse.online.schema.changes.enabled", "true");
812+
properties.put("use.legacy.json.format", "true");
813+
814+
// validation for source
815+
DynamicTableSource actualSource = createTableSource(properties);
816+
MySqlTableSource expectedSource =
817+
new MySqlTableSource(
818+
SCHEMA,
819+
3306,
820+
MY_LOCALHOST,
821+
MY_DATABASE,
822+
MY_TABLE,
823+
MY_USERNAME,
824+
MY_PASSWORD,
825+
ZoneId.systemDefault(),
826+
PROPERTIES,
827+
null,
828+
false,
829+
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
830+
CHUNK_META_GROUP_SIZE.defaultValue(),
831+
SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
832+
CONNECT_TIMEOUT.defaultValue(),
833+
CONNECT_MAX_RETRIES.defaultValue(),
834+
CONNECTION_POOL_SIZE.defaultValue(),
835+
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
836+
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
837+
StartupOptions.initial(),
838+
false,
839+
false,
840+
new Properties(),
841+
HEARTBEAT_INTERVAL.defaultValue(),
842+
null,
843+
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
844+
true,
845+
true);
846+
assertEquals(expectedSource, actualSource);
847+
}
848+
808849
private Map<String, String> getAllOptions() {
809850
Map<String, String> options = new HashMap<>();
810851
options.put("connector", "mysql-cdc");

0 commit comments

Comments
 (0)