Skip to content

Commit 3a52939

Browse files
committed
clean up
1 parent 4458a29 commit 3a52939

14 files changed

Lines changed: 19 additions & 614 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ public MySqlPipelineRecordEmitter(
9393
debeziumDeserializationSchema,
9494
sourceReaderMetrics,
9595
sourceConfig.isIncludeSchemaChanges(),
96-
false, // Explicitly disable transaction metadata events
9796
false); // Explicitly disable heartbeat events
9897
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
9998
this.sourceConfig = sourceConfig;

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.flink.cdc.connectors.mysql.debezium;
1919

20-
import com.github.shyiko.mysql.binlog.network.SSLMode;
2120
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
2221
import org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionFactory;
2322
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
@@ -243,26 +242,12 @@ private static Map<String, String> querySystemVariables(
243242
return variables;
244243
}
245244

246-
static SSLMode sslModeFor(MySqlConnectorConfig.SecureConnectionMode mode) {
247-
try {
248-
return mode == null ? null : SSLMode.valueOf(mode.name());
249-
} catch (IllegalArgumentException e) {
250-
throw new FlinkRuntimeException(
251-
String.format("Invalid SecureConnectionMode provided: %s ", mode.name()), e);
252-
}
253-
}
254-
255245
public static BinlogOffset findBinlogOffset(
256246
long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) {
257247
MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
258248
BinaryLogClient client =
259249
new BinaryLogClient(
260250
config.hostname(), config.port(), config.username(), config.password());
261-
SSLMode sslMode = sslModeFor(config.sslMode());
262-
if (sslMode != null) {
263-
client.setSSLMode(sslMode);
264-
}
265-
266251
if (mySqlSourceConfig.getServerIdRange() != null) {
267252
client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId());
268253
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,6 @@ public static <T> MySqlSourceBuilder<T> builder() {
133133
deserializationSchema,
134134
sourceReaderMetrics,
135135
sourceConfig.isIncludeSchemaChanges(),
136-
sourceConfig.isIncludeTransactionMetadataEvents(),
137136
sourceConfig.isIncludeHeartbeatEvents()));
138137
}
139138

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -198,12 +198,6 @@ public MySqlSourceBuilder<T> includeSchemaChanges(boolean includeSchemaChanges)
198198
return this;
199199
}
200200

201-
/** Whether the {@link MySqlSource} should output the transaction metadata events or not. */
202-
public MySqlSourceBuilder<T> includeTransactionMetadataEvents(boolean includeTransactionMetadataEvents) {
203-
this.configFactory.includeTransactionMetadataEvents(includeTransactionMetadataEvents);
204-
return this;
205-
}
206-
207201
/** Whether the {@link MySqlSource} should scan the newly added tables or not. */
208202
public MySqlSourceBuilder<T> scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) {
209203
this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
@@ -249,7 +243,8 @@ public MySqlSourceBuilder<T> heartbeatInterval(Duration heartbeatInterval) {
249243
* @param heartbeatInterval the interval of heartbeat event
250244
* @param includeHeartbeatEvents whether to emit heartbeat events
251245
*/
252-
public MySqlSourceBuilder<T> heartbeatInterval(Duration heartbeatInterval, boolean includeHeartbeatEvents) {
246+
public MySqlSourceBuilder<T> heartbeatInterval(
247+
Duration heartbeatInterval, boolean includeHeartbeatEvents) {
253248
this.configFactory.heartbeatInterval(heartbeatInterval);
254249
this.configFactory.includeHeartbeatEvents(includeHeartbeatEvents);
255250
return this;

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ public class MySqlSourceConfig implements Serializable {
6262
private final double distributionFactorUpper;
6363
private final double distributionFactorLower;
6464
private final boolean includeSchemaChanges;
65-
private final boolean includeTransactionMetadataEvents;
6665
private final boolean includeHeartbeatEvents;
6766
private final boolean scanNewlyAddedTableEnabled;
6867
private final boolean closeIdleReaders;
@@ -101,7 +100,6 @@ public class MySqlSourceConfig implements Serializable {
101100
double distributionFactorUpper,
102101
double distributionFactorLower,
103102
boolean includeSchemaChanges,
104-
boolean includeTransactionMetadataEvents,
105103
boolean includeHeartbeatEvents,
106104
boolean scanNewlyAddedTableEnabled,
107105
boolean closeIdleReaders,
@@ -132,7 +130,6 @@ public class MySqlSourceConfig implements Serializable {
132130
this.distributionFactorUpper = distributionFactorUpper;
133131
this.distributionFactorLower = distributionFactorLower;
134132
this.includeSchemaChanges = includeSchemaChanges;
135-
this.includeTransactionMetadataEvents = includeTransactionMetadataEvents;
136133
this.includeHeartbeatEvents = includeHeartbeatEvents;
137134
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
138135
this.closeIdleReaders = closeIdleReaders;
@@ -233,10 +230,6 @@ public boolean isIncludeSchemaChanges() {
233230
return includeSchemaChanges;
234231
}
235232

236-
public boolean isIncludeTransactionMetadataEvents() {
237-
return includeTransactionMetadataEvents;
238-
}
239-
240233
public boolean isIncludeHeartbeatEvents() {
241234
return includeHeartbeatEvents;
242235
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ public class MySqlSourceConfigFactory implements Serializable {
6363
private double distributionFactorLower =
6464
MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
6565
private boolean includeSchemaChanges = false;
66-
private boolean includeTransactionMetadataEvents = false;
6766
private boolean includeHeartbeatEvents = false;
6867
private boolean scanNewlyAddedTableEnabled = false;
6968
private boolean closeIdleReaders = false;
@@ -237,12 +236,6 @@ public MySqlSourceConfigFactory includeSchemaChanges(boolean includeSchemaChange
237236
return this;
238237
}
239238

240-
/** Whether the {@link MySqlSource} should output the transaction metadata events or not. */
241-
public MySqlSourceConfigFactory includeTransactionMetadataEvents(boolean includeTransactionMetadataEvents) {
242-
this.includeTransactionMetadataEvents = includeTransactionMetadataEvents;
243-
return this;
244-
}
245-
246239
/** Whether the {@link MySqlSource} should output the heartbeat events or not. */
247240
public MySqlSourceConfigFactory includeHeartbeatEvents(boolean includeHeartbeatEvents) {
248241
this.includeHeartbeatEvents = includeHeartbeatEvents;
@@ -373,8 +366,6 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
373366
// Note: the includeSchemaChanges parameter is used to control emitting the schema record,
374367
// only DataStream API program need to emit the schema record, the Table API need not
375368
props.setProperty("include.schema.changes", String.valueOf(true));
376-
// enable transaction metadata if includeTransactionMetadataEvents is true
377-
props.setProperty("provide.transaction.metadata", String.valueOf(includeTransactionMetadataEvents));
378369
// disable the offset flush totally
379370
props.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
380371
// disable tombstones
@@ -428,7 +419,6 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
428419
distributionFactorUpper,
429420
distributionFactorLower,
430421
includeSchemaChanges,
431-
includeTransactionMetadataEvents,
432422
includeHeartbeatEvents,
433423
scanNewlyAddedTableEnabled,
434424
closeIdleReaders,

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,20 +53,17 @@ public class MySqlRecordEmitter<T> implements RecordEmitter<SourceRecords, T, My
5353
private final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
5454
private final MySqlSourceReaderMetrics sourceReaderMetrics;
5555
private final boolean includeSchemaChanges;
56-
private final boolean includeTransactionMetadataEvents;
5756
private final boolean includeHeartbeatEvents;
5857
private final OutputCollector<T> outputCollector;
5958

6059
public MySqlRecordEmitter(
6160
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
6261
MySqlSourceReaderMetrics sourceReaderMetrics,
6362
boolean includeSchemaChanges,
64-
boolean includeTransactionMetadataEvents,
6563
boolean includeHeartbeatEvents) {
6664
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
6765
this.sourceReaderMetrics = sourceReaderMetrics;
6866
this.includeSchemaChanges = includeSchemaChanges;
69-
this.includeTransactionMetadataEvents = includeTransactionMetadataEvents;
7067
this.includeHeartbeatEvents = includeHeartbeatEvents;
7168
this.outputCollector = new OutputCollector<>();
7269
}
@@ -111,11 +108,6 @@ protected void processElement(
111108
if (includeHeartbeatEvents) {
112109
emitElement(element, output);
113110
}
114-
} else if (RecordUtils.isTransactionMetadataEvent(element)) {
115-
updateStartingOffsetForSplit(splitState, element);
116-
if (includeTransactionMetadataEvents) {
117-
emitElement(element, output);
118-
}
119111
} else {
120112
// unknown element
121113
LOG.info("Meet unknown element {}, just skip.", element);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ private RecordUtils() {}
7373
"io.debezium.connector.mysql.SchemaChangeKey";
7474
public static final String SCHEMA_HEARTBEAT_EVENT_KEY_NAME =
7575
"io.debezium.connector.common.Heartbeat";
76-
public static final String SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME =
77-
"io.debezium.connector.common.TransactionMetadataKey";
7876
private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader();
7977

8078
/** Converts a {@link ResultSet} row to an array of Objects. */
@@ -341,18 +339,6 @@ public static boolean isHeartbeatEvent(SourceRecord record) {
341339
&& SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
342340
}
343341

344-
/**
345-
* Check whether the given source record is a transaction metadata event (BEGIN or END).
346-
*
347-
* <p>Transaction events are emitted by Debezium to mark transaction boundaries when
348-
* provide.transaction.metadata is enabled.
349-
*/
350-
public static boolean isTransactionMetadataEvent(SourceRecord record) {
351-
Schema keySchema = record.keySchema();
352-
return keySchema != null
353-
&& SCHEMA_TRANSACTION_METADATA_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name());
354-
}
355-
356342
/**
357343
* Return the finished snapshot split information.
358344
*

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
package org.apache.flink.cdc.connectors.mysql.debezium;
1919

20-
import com.github.shyiko.mysql.binlog.network.SSLMode;
21-
import io.debezium.connector.mysql.MySqlConnectorConfig;
22-
import java.util.stream.Stream;
2320
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
2421
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
2522
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
@@ -31,9 +28,6 @@
3128
import java.time.Duration;
3229
import java.time.ZoneId;
3330
import java.util.Properties;
34-
import org.junit.jupiter.params.ParameterizedTest;
35-
import org.junit.jupiter.params.provider.Arguments;
36-
import org.junit.jupiter.params.provider.MethodSource;
3731

3832
/** Tests for {@link org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils}. */
3933
class DebeziumUtilsTest {
@@ -95,24 +89,4 @@ private void assertJdbcUrl(String expected, String actual) {
9589
String[] actualParam = actual.split("&");
9690
Assertions.assertThat(actualParam).containsExactlyInAnyOrder(expectedParam);
9791
}
98-
99-
@ParameterizedTest
100-
@MethodSource("sslModeProvider")
101-
void testSslModeConversion(MySqlConnectorConfig.SecureConnectionMode input, SSLMode expected) {
102-
SSLMode actual = DebeziumUtils.sslModeFor(input);
103-
Assertions.assertThat(actual).isEqualTo(expected);
104-
}
105-
static Stream<Arguments> sslModeProvider() {
106-
return Stream.of(
107-
Arguments.of(MySqlConnectorConfig.SecureConnectionMode.DISABLED, SSLMode.DISABLED),
108-
Arguments.of(
109-
MySqlConnectorConfig.SecureConnectionMode.PREFERRED, SSLMode.PREFERRED),
110-
Arguments.of(MySqlConnectorConfig.SecureConnectionMode.REQUIRED, SSLMode.REQUIRED),
111-
Arguments.of(
112-
MySqlConnectorConfig.SecureConnectionMode.VERIFY_CA, SSLMode.VERIFY_CA),
113-
Arguments.of(
114-
MySqlConnectorConfig.SecureConnectionMode.VERIFY_IDENTITY,
115-
SSLMode.VERIFY_IDENTITY));
116-
}
117-
11892
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceSSLConnectionITCase.java

Lines changed: 0 additions & 144 deletions
This file was deleted.

0 commit comments

Comments
 (0)