[FLINK-36796][pipeline-connector][oracle] add oracle pipeline connector. #3995
joyCurry30
left a comment
There was a problem hiding this comment.
Thank you for your contribution. I have left some comments.
| <dependency> | ||
| <groupId>io.debezium</groupId> | ||
| <artifactId>debezium-core</artifactId> | ||
| <version>1.9.8.Final</version> |
There was a problem hiding this comment.
Use ${debezium.version}
There was a problem hiding this comment.
And the scope should be "provided".
There was a problem hiding this comment.
This has been modified.
| <dependency> | ||
| <groupId>com.ververica</groupId> | ||
| <artifactId>flink-cdc-source-e2e-tests</artifactId> | ||
| <version>cty-3.0-2.2-SNAPSHOT</version> |
There was a problem hiding this comment.
Why do you depend on "flink-cdc-source-e2e-tests"?
There was a problem hiding this comment.
This has been removed.
| <dependency> | ||
| <groupId>org.apache.flink</groupId> | ||
| <artifactId>flink-connector-test-util</artifactId> | ||
| <version>3.4-SNAPSHOT</version> |
There was a problem hiding this comment.
Please use "${project.version}".
There was a problem hiding this comment.
This has been modified.
| <include>io.debezium:debezium-core</include> | ||
| <include>io.debezium:debezium-ddl-parser</include> | ||
| <include>io.debezium:debezium-connector-oracle</include> | ||
| <include>io.debezium:debezium-connector-mysql</include> |
There was a problem hiding this comment.
Could you explain the rationale behind having a dependency on the MySQL CDC connector within the Oracle CDC connector implementation? I'd like to better understand how these components interact in this context.
There was a problem hiding this comment.
This has been removed.
| <include>io.debezium:debezium-connector-oracle</include> | ||
| <include>io.debezium:debezium-connector-mysql</include> | ||
| <include>com.ververica:flink-connector-debezium</include> | ||
| <include>com.ververica:flink-connector-mysql-cdc</include> |
There was a problem hiding this comment.
This has been removed.
| <include>org.antlr:antlr4-runtime</include> | ||
| <include>org.apache.kafka:*</include> | ||
| <include>mysql:mysql-connector-java</include> | ||
| <include>com.zendesk:mysql-binlog-connector-java</include> |
There was a problem hiding this comment.
This has been removed.
| public static final ConfigOption<String> JDBC_URL = | ||
| ConfigOptions.key("jdbc.url") | ||
| .stringType() | ||
| .noDefaultValue() | ||
| .withDescription("The jdbc url."); |
There was a problem hiding this comment.
Could we clarify the relationship between the JDBC URL and the individual hostname/port parameters?
If we already have a "jdbc.url" configuration field, is there still value in maintaining separate "hostname" and "port" parameters? Should these be mutually exclusive?
When using the "hostname" and "port" configuration, is there a parameter to explicitly specify the driver type (Thin vs OCI)? This would be crucial for constructing the correct JDBC connection string format.
There was a problem hiding this comment.
In the original Oracle module (flink-connector-oracle-cdc), in the OracleJdbcUrlUtils#getConnectionUrlWithSid method, url, hostname and port are not mutually exclusive. When the url option is null, the URL is built as url = "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname;. However, Oracle 19c must be connected to through the service name, like url = "jdbc:oracle:thin:@" + hostname + ":" + port + "/" + dbname;. So you can configure the URL to adapt to Oracle 19c.
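The fallback described in this reply can be sketched roughly as follows. This is a minimal illustration, not the connector's actual code: the class and method names are hypothetical, treating an empty url like a missing one is an assumption, and only the two URL shapes are taken from the reply.

```java
import java.util.Objects;

/** Hypothetical helper; names are illustrative and not part of the connector's API. */
final class OracleJdbcUrlSketch {

    private OracleJdbcUrlSketch() {}

    /**
     * Returns the explicitly configured URL when present; otherwise builds the
     * SID-style thin URL from hostname, port and database name.
     */
    static String resolveUrl(String url, String hostname, int port, String dbName) {
        if (url != null && !url.isEmpty()) {
            // An explicit URL wins, e.g. a service-name URL for Oracle 19c.
            return url;
        }
        Objects.requireNonNull(hostname, "hostname is required when url is not set");
        Objects.requireNonNull(dbName, "database is required when url is not set");
        return "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbName;
    }

    /** Builds the service-name form quoted in the reply (slash instead of colon). */
    static String serviceNameUrl(String hostname, int port, String serviceName) {
        return "jdbc:oracle:thin:@" + hostname + ":" + port + "/" + serviceName;
    }

    public static void main(String[] args) {
        // Falls back to the SID-style URL because no explicit URL is given.
        System.out.println(resolveUrl(null, "localhost", 1521, "ORCLCDB"));
        // Uses the explicit service-name URL unchanged.
        String explicit = serviceNameUrl("localhost", 1521, "ORCLPDB1");
        System.out.println(resolveUrl(explicit, "localhost", 1521, "ORCLCDB"));
    }
}
```

With this shape, hostname and port stay useful as a default, while jdbc.url acts as an override for deployments that need the service-name form.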
| if (isAddMeta) { | ||
| map.put(OracleDataSourceOptions.HOSTNAME.key(), hostname); | ||
| map.put(OracleDataSourceOptions.PORT.key(), port); | ||
| } |
There was a problem hiding this comment.
Could you clarify the design intent behind treating hostname and port as common metadata fields?
Since we don't currently support multi-source CDC synchronization, are these fields intended for future extensibility?
There was a problem hiding this comment.
This has been removed.
820b720 to 5015bbb
@joyCurry30 I have made the changes. Please review again, thanks!
joyCurry30
left a comment
There was a problem hiding this comment.
Hi, thank you for your contribution. I left some comments for the doc.
|
|
||
| ```yaml | ||
| source: | ||
| type: mysql |
There was a problem hiding this comment.
“type” should be "oracle".
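| - `initial` (default): performs an initial snapshot of the monitored database tables on first startup, then continues reading the latest binlog. | ||
| - `latest-offset`: never performs a snapshot of the monitored database tables on first startup; the connector only reads from the end of the binlog, which means it can only read data changes made after the connector started. |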
There was a problem hiding this comment.
Does oracle cdc use binlog?
| <tr> | ||
| <td> | ||
| XMLTYPE | ||
| </td> | ||
| <td>VARCHAR(n)</td> | ||
| <td></td> | ||
| </tr> | ||
| <tr> | ||
| <td> | ||
| VARCHAR(n)<br> | ||
| VARCHAR2(n)<br> | ||
| NVARCHAR2(n)<br> | ||
| NCHAR(n)<br> | ||
| CHAR(n)<br> | ||
| </td> | ||
| <td>VARCHAR(n)</td> | ||
| <td></td> | ||
| </tr> |
There was a problem hiding this comment.
These two elements should be merged.
|
|
||
| ```yaml | ||
| source: | ||
| type: mysql |
| - `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog. | ||
| - `latest-offset`: Never to perform snapshot on the monitored database tables upon first startup, just read from | ||
|
|
| ## Data Type Mapping | ||
|
|
5015bbb to f284d27
f6b25cc to 4f73a6f
| .hostname( | ||
| config.getOptional(OracleDataSourceOptions.HOSTNAME).get()) | ||
| .port(config.getOptional(OracleDataSourceOptions.PORT).get()) | ||
| .databaseList( | ||
| config.getOptional(OracleDataSourceOptions.DATABASE) | ||
| .get()) // monitor oracledatabase | ||
| .tableList( | ||
| config.getOptional(OracleDataSourceOptions.TABLES) | ||
| .get()) // monitor productstable | ||
| .username( | ||
| config.getOptional(OracleDataSourceOptions.USERNAME).get()) | ||
| .password( | ||
| config.getOptional(OracleDataSourceOptions.PASSWORD).get()) | ||
| .includeSchemaChanges(true); |
There was a problem hiding this comment.
Use config.get(ConfigOption option).
| options.add(OracleDataSourceOptions.SCHEMALIST); | ||
| options.add(OracleDataSourceOptions.DATABASE); | ||
| options.add(OracleDataSourceOptions.TABLES); | ||
| options.add(METADATA_LIST); |
There was a problem hiding this comment.
Use OracleDataSourceOptions.METADATA_LIST.
| private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial"; | ||
| private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset"; | ||
| private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset"; | ||
| private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset"; | ||
| private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp"; |
There was a problem hiding this comment.
Use an enum to replace the switch-case.
There was a problem hiding this comment.
This follows the MySQL pipeline connector, which is written the same way.
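For reference, the enum-based alternative suggested by the reviewer could look roughly like the sketch below. The class, enum and method names are hypothetical; the five string values mirror the constants in the diff, and whether the Oracle connector supports each of them is not stated in this thread.

```java
import java.util.Locale;

/** Hypothetical sketch of replacing string constants plus switch-case with an enum. */
final class StartupModeSketch {

    /** One entry per startup-mode string that appears in the diff. */
    enum Mode {
        INITIAL("initial"),
        EARLIEST_OFFSET("earliest-offset"),
        LATEST_OFFSET("latest-offset"),
        SPECIFIC_OFFSET("specific-offset"),
        TIMESTAMP("timestamp");

        private final String configValue;

        Mode(String configValue) {
            this.configValue = configValue;
        }

        String configValue() {
            return configValue;
        }
    }

    private StartupModeSketch() {}

    /** Looks up a mode case-insensitively and rejects unknown values. */
    static Mode parse(String value) {
        String normalized = value.toLowerCase(Locale.ROOT);
        for (Mode mode : Mode.values()) {
            if (mode.configValue().equals(normalized)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unsupported startup mode: " + value);
    }

    public static void main(String[] args) {
        System.out.println(parse("LATEST-OFFSET"));
    }
}
```

The lookup keeps the accepted values and their validation in one place, so adding a mode means adding one enum entry rather than a new constant and a new case branch.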
| builder.hostname(config.getOptional(OracleDataSourceOptions.HOSTNAME).get()) | ||
| .port(config.getOptional(OracleDataSourceOptions.PORT).get()) | ||
| .database( | ||
| config.getOptional(OracleDataSourceOptions.DATABASE) | ||
| .get()) // monitor database | ||
| .schemaList( | ||
| config.getOptional(OracleDataSourceOptions.SCHEMALIST) | ||
| .get()) // monitor schema | ||
| .tableList(capturedTables) // monitor | ||
| // EMP table | ||
| .username(config.getOptional(OracleDataSourceOptions.USERNAME).get()) | ||
| .password(config.getOptional(OracleDataSourceOptions.PASSWORD).get()) |
There was a problem hiding this comment.
Use config.get(ConfigOption option)
4f73a6f to 3479f43
| <shadedPattern> | ||
| com.ververica.cdc.connectors.shaded.org.apache.kafka | ||
| </shadedPattern> | ||
| </relocation> | ||
| <relocation> | ||
| <pattern>org.antlr</pattern> | ||
| <shadedPattern> | ||
| com.ververica.cdc.connectors.shaded.org.antlr | ||
| </shadedPattern> | ||
| </relocation> | ||
| <relocation> | ||
| <pattern>com.fasterxml</pattern> | ||
| <shadedPattern> | ||
| com.ververica.cdc.connectors.shaded.com.fasterxml | ||
| </shadedPattern> | ||
| </relocation> | ||
| <relocation> | ||
| <pattern>com.google</pattern> | ||
| <shadedPattern> | ||
| com.ververica.cdc.connectors.shaded.com.google | ||
| </shadedPattern> | ||
| </relocation> | ||
| <relocation> | ||
| <pattern>com.esri.geometry</pattern> | ||
| <shadedPattern>com.ververica.cdc.connectors.shaded.com.esri.geometry</shadedPattern> | ||
| </relocation> | ||
| <relocation> | ||
| <pattern>com.zaxxer</pattern> | ||
| <shadedPattern> | ||
| com.ververica.cdc.connectors.shaded.com.zaxxer | ||
| </shadedPattern> |
There was a problem hiding this comment.
Why relocate to com.ververica.cdc.connectors?
Please use the correct package path.
| package com.apache.flink.cdc.connectors.oracle.dto; | ||
|
|
| import com.apache.flink.cdc.connectors.oracle.source.OracleDataSource; | ||
| import com.apache.flink.cdc.connectors.oracle.source.OracleDataSourceOptions; | ||
| import com.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils; |
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static com.apache.flink.cdc.connectors.oracle.source.OracleDataSourceOptions.METADATA_LIST; |
| } | ||
| throw new IllegalArgumentException( | ||
| String.format( | ||
| "[%s] cannot be found in mysql metadata.", |
There was a problem hiding this comment.
Thanks for the review! I have modified it.
13c6586 to ba183e3
0c77212 to 0237401
| dbzProperties.setProperty( | ||
| "snapshot.locking.mode", config.get(OracleDataSourceOptions.SNAPSHOT_LOCKING_MODE)); | ||
| dbzProperties.setProperty( | ||
| "snapshot.locking.mode", config.get(OracleDataSourceOptions.SNAPSHOT_LOCKING_MODE)); |
There was a problem hiding this comment.
This is a duplicate code snippet.
There was a problem hiding this comment.
I have removed the duplicate code.
| @Override | ||
| public EventSourceProvider getEventSourceProvider() { | ||
| String url = config.get(OracleDataSourceOptions.JDBC_URL); |
There was a problem hiding this comment.
How should null parameters be handled?
There was a problem hiding this comment.
The url may be null, while hostname, port, database, username and password must not be null. I have added checkNotNull checks.
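A minimal sketch of such a check, written against a plain map rather than the connector's configuration class. The class name, method names and option key strings are assumptions for illustration; the thread names the options but does not show the final code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical validator; the option keys below are assumed, not copied from the connector. */
final class RequiredOptionsSketch {

    /** Options the reply lists as mandatory; url is deliberately absent because it may be null. */
    static final List<String> REQUIRED =
            Arrays.asList("hostname", "port", "database", "username", "password");

    private RequiredOptionsSketch() {}

    /** Returns every required key that has no value, in declaration order. */
    static List<String> missingOptions(Map<String, String> config) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (config.get(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Fails fast with a message that names all missing options at once. */
    static void validate(Map<String, String> config) {
        List<String> missing = missingOptions(config);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required options: " + missing);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("hostname", "localhost");
        config.put("port", "1521");
        System.out.println(missingOptions(config));
    }
}
```

Reporting all missing keys together, rather than stopping at the first null, tends to save users a round trip per option when fixing a pipeline definition.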
| switch (modeString.toLowerCase()) { | ||
| case SCAN_STARTUP_MODE_VALUE_INITIAL: | ||
| return StartupOptions.initial(); | ||
|
|
||
| case SCAN_STARTUP_MODE_VALUE_LATEST: | ||
| return StartupOptions.latest(); | ||
|
|
There was a problem hiding this comment.
How should the other StartupOptions cases be handled?
There was a problem hiding this comment.
We have added the remaining supported StartupOptions
| public static final ConfigOption<String> SERVER_TIME_ZONE = | ||
| ConfigOptions.key("server-time-zone") | ||
| .stringType() | ||
| .noDefaultValue() | ||
| .withDescription( | ||
| "The session time zone in database server. If not set, then " | ||
| + "ZoneId.systemDefault() is used to determine the server time zone."); | ||
|
|
||
| public static final ConfigOption<String> SERVER_ID = | ||
| ConfigOptions.key("server-id") | ||
| .stringType() |
There was a problem hiding this comment.
Does Oracle CDC have these configurations?
There was a problem hiding this comment.
I have removed SERVER_ID. SERVER_TIME_ZONE is still needed.
| @Override | ||
| protected Object convertToString(Object dbzObj, Schema schema) { | ||
| // the Geometry datatype in oracle will be converted to | ||
| // a String with Json format | ||
| if (Point.LOGICAL_NAME.equals(schema.name()) |
There was a problem hiding this comment.
I'm not sure if Oracle also needs to be handled in this way.
There was a problem hiding this comment.
Oracle database provides powerful geometric data type support through the Spatial module, which is mainly based on the SDO_GEOMETRY object type.
Hi. Thanks for your contribution. @linjianchang I've found that some code might be useless in Oracle CDC. Please simplify the code carefully to make it as concise as possible.
09e894a to 7c7a74d
LGTM. cc @LYanquan
lvyanquan
left a comment
There was a problem hiding this comment.
Thanks for the contribution, @linjianchang. I left some comments.
| @@ -0,0 +1,15 @@ | |||
| # Copyright 2023 Ververica Inc. | |||
There was a problem hiding this comment.
We should use ASF License.
There was a problem hiding this comment.
modified
|
|
||
| # Set root logger level to OFF to not flood build logs | ||
| # set manually to INFO for debugging purposes | ||
| rootLogger.level=INFO |
There was a problem hiding this comment.
It's better to set this to ERROR to reduce logging on the CI machine.
There was a problem hiding this comment.
modified
| @@ -0,0 +1,266 @@ | |||
| <?xml version="1.0" encoding="UTF-8"?> | |||
| <!-- | |||
| ~ Copyright 2023 Ververica Inc. | |||
There was a problem hiding this comment.
Should use ASF license.
modified
| @@ -0,0 +1,26 @@ | |||
| ################################################################################ | |||
| # Copyright 2023 Ververica Inc. | |||
There was a problem hiding this comment.
Ditto. Maybe you could modify all files at once.
There was a problem hiding this comment.
modified
| .durationType() | ||
| .defaultValue(Duration.ofSeconds(30)) | ||
| .withDescription( | ||
| "Optional interval of sending heartbeat event for tracing the latest available binlog offsets"); |
There was a problem hiding this comment.
binlog?
modified
| ConfigOptions.key("debezium.database.connection.adapter") | ||
| .stringType() | ||
| .defaultValue("logminer") | ||
| .withDescription("Database connection adapter."); |
There was a problem hiding this comment.
Should introduce all available values.
There was a problem hiding this comment.
modified
|
|
||
| /** IT tests for {@link OracleDataSource}. */ | ||
| @TestMethodOrder(MethodOrderer.OrderAnnotation.class) | ||
| public class OraclePipelineITCase extends OracleSourceTestBase { |
There was a problem hiding this comment.
It's necessary to add some tests to cover the scenario of restoring from a savepoint with different startup modes.
There was a problem hiding this comment.
added
| String str = (String) dbzObj; | ||
| // TIMESTAMP_LTZ type is encoded in string type | ||
| Instant instant = Instant.parse(str); | ||
| Instant instant = ZonedTimestamp.FORMATTER.parse(str, Instant::from); |
There was a problem hiding this comment.
Could you add some explanation about why this change is necessary?
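The thread does not record an answer to this question. One plausible difference, offered only as an assumption, is that an offset-aware formatter accepts timestamps such as `2020-07-17T18:00:22+08:00`, whereas `Instant.parse` on older JDKs (Java 8 and 11, as far as I know) accepts only the `Z` suffix. The sketch below uses `DateTimeFormatter.ISO_OFFSET_DATE_TIME` as a stand-in for Debezium's `ZonedTimestamp.FORMATTER`; the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;

/** Hypothetical sketch of parsing an offset-bearing timestamp string into an Instant. */
final class ZonedTimestampParseSketch {

    // Stand-in for ZonedTimestamp.FORMATTER; assumed to behave like ISO_OFFSET_DATE_TIME.
    static final DateTimeFormatter OFFSET_FORMATTER = DateTimeFormatter.ISO_OFFSET_DATE_TIME;

    private ZonedTimestampParseSketch() {}

    /** Parses text with either a numeric offset or 'Z' and normalizes it to UTC. */
    static Instant parseWithOffset(String text) {
        return OFFSET_FORMATTER.parse(text, Instant::from);
    }

    public static void main(String[] args) {
        // 18:00:22 at +08:00 is the same moment as 10:00:22 UTC.
        System.out.println(parseWithOffset("2020-07-17T18:00:22+08:00"));
        System.out.println(parseWithOffset("2020-07-17T10:00:22Z"));
    }
}
```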
| + " port: %d\n" | ||
| + " username: %s\n" | ||
| + " password: %s\n" | ||
| + " tables: %s.PRODUCTS\n" |
There was a problem hiding this comment.
There is only one table here; you could add a few more tables.
There was a problem hiding this comment.
modified
| return DataTypes.STRING(); | ||
| } | ||
| if (scale < 0 || scale > 36) { | ||
| return DataTypes.STRING(); |
There was a problem hiding this comment.
Please explain why this change is necessary.
There was a problem hiding this comment.
Oracle's NUMBER type has a scale value. DECIMAL only accepts a scale in the range 0-36, so a scale outside that range does not fit DECIMAL and the column is converted to a string.
All sources in the pipeline have been tested with the full-types unit tests and behave normally. This change has not affected other connectors.
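The rule in this reply can be sketched as below. Oracle allows a NUMBER scale from -84 to 127, so values outside 0-36 do occur in practice. The class and method names are hypothetical, type names are returned as plain strings instead of the project's DataTypes objects, and the separate precision check visible in the diff is omitted because its condition is not shown.

```java
/** Hypothetical sketch of mapping an Oracle NUMBER(precision, scale) to a target type name. */
final class OracleNumberMappingSketch {

    // Upper bound taken from the condition in the diff: scale < 0 || scale > 36.
    static final int MAX_DECIMAL_SCALE = 36;

    private OracleNumberMappingSketch() {}

    /** Returns "STRING" when the scale does not fit DECIMAL, otherwise a DECIMAL type name. */
    static String mapNumber(int precision, int scale) {
        if (scale < 0 || scale > MAX_DECIMAL_SCALE) {
            // Negative or very large scales cannot be expressed as DECIMAL here.
            return "STRING";
        }
        return "DECIMAL(" + precision + ", " + scale + ")";
    }

    public static void main(String[] args) {
        System.out.println(mapNumber(10, 2));
        System.out.println(mapNumber(38, -2));
        System.out.println(mapNumber(38, 40));
    }
}
```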


| OracleDialect oracleDialect = new OracleDialect(); | ||
| OracleEventDeserializer deserializer = | ||
| new OracleEventDeserializer( | ||
| DebeziumChangelogMode.ALL, | ||
| config.get(OracleDataSourceOptions.SCHEMA_CHANGE_ENABLED), | ||
| readableMetadataList); | ||
|
|
||
| RedoLogOffsetFactory offsetFactory = new RedoLogOffsetFactory(); | ||
| OracleTableSourceReader oracleChangeEventSource = | ||
| new OracleTableSourceReader( | ||
| configFactory, deserializer, offsetFactory, oracleDialect); |
There was a problem hiding this comment.
Can be updated to the following code to avoid Raw use of parameterized class.
OracleEventDeserializer<Event> deserializer =
new OracleEventDeserializer<>(
DebeziumChangelogMode.ALL,
config.get(OracleDataSourceOptions.SCHEMA_CHANGE_ENABLED),
readableMetadataList);
RedoLogOffsetFactory offsetFactory = new RedoLogOffsetFactory();
OracleTableSourceReader<Event> oracleChangeEventSource =
new OracleTableSourceReader<>(
configFactory, deserializer, offsetFactory, oracleDialect);
There was a problem hiding this comment.
Modified
| import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent; | ||
|
|
||
| /** The {@link RecordEmitter} implementation for Oracle pipeline connector. */ | ||
| public class OraclePipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> { |
There was a problem hiding this comment.
We can clarify that the generic type T here is Event.
There was a problem hiding this comment.
Modified
| * and Watermark Signal Algorithm which supports parallel reading snapshot of table and then | ||
| * continue to capture data change by streaming reading. | ||
| */ | ||
| public class OracleTableSourceReader<T> extends OracleSourceBuilder.OracleIncrementalSource<T> { |
There was a problem hiding this comment.
We can clarify that the generic type T here is Event.
There was a problem hiding this comment.
Modified
| if (isDataChangeRecord || isSchemaChangeEvent(element)) { | ||
| TableId debeziumTableId = getTableId(element); | ||
| TableId tableId = | ||
| TableId.parse(debeziumTableId.schema() + "." + debeziumTableId.table()); |
There was a problem hiding this comment.
Can we use debeziumTableId directly here?
There was a problem hiding this comment.
Modified
| sendCreateTableEvent(jdbc, tableId, output); | ||
| } | ||
| alreadySendCreateTableTables.add( | ||
| TableId.parse(tableId.schema() + "." + tableId.table())); |
There was a problem hiding this comment.
alreadySendCreateTableTables.add(new TableId(tableId.schema(), null, tableId.table()));
There was a problem hiding this comment.
Modified
|
|
||
| private static final Logger LOGGER = | ||
| LoggerFactory.getLogger( | ||
| io.debezium.connector.oracle.antlr.listener.AlterTableParserListener.class); |
There was a problem hiding this comment.
LoggerFactory.getLogger(OracleAlterTableParserListener.class);
There was a problem hiding this comment.
Modified
| .booleanType() | ||
| .defaultValue(true) | ||
| .withDescription( | ||
| "Whether send schema change events, by default is true. If set to false, the schema changes will not be sent."); |
There was a problem hiding this comment.
What would happen if we alter the table structure while schema-change.enabled is set to false? Should we always set this parameter to true?
There was a problem hiding this comment.
always set this parameter to true
| /** | ||
| * Test utility for creating converter, formatter and deserializer of a table in the test database. | ||
| */ | ||
| public class TestTable { |
There was a problem hiding this comment.
This class seems to be useless.
There was a problem hiding this comment.
already removed
| import static org.apache.flink.util.Preconditions.checkState; | ||
|
|
||
| /** Oracle test utilities. */ | ||
| public class OracleTestUtils { |
There was a problem hiding this comment.
This class seems to be useless.
There was a problem hiding this comment.
already removed
| import java.util.stream.Collectors; | ||
|
|
||
| /** Formatter that formats the {@link SourceRecord} to String. */ | ||
| public class RecordsFormatter { |
There was a problem hiding this comment.
This class seems to be useless.
There was a problem hiding this comment.
already removed
| throw new IllegalArgumentException( | ||
| "Cannot find any table by the option 'tables' = " + tables); | ||
| } | ||
| configFactory.tableList(capturedTables); |
There was a problem hiding this comment.
In this configuration mode, we cannot handle tables newly created during the incremental phase. Perhaps you can refer to the implementation of MySQL.
There was a problem hiding this comment.
Remove it temporarily. We won't add this feature in this version
08a8f61 to a6cabd0
|
|
||
| @Override | ||
| public void enterModify_col_properties(PlSqlParser.Modify_col_propertiesContext ctx) { | ||
| resolveColumnDataType(ctx); |
There was a problem hiding this comment.
If dataType is not specified, will changing the column attribute to nullable have no effect?
There was a problem hiding this comment.
In this case, could we add a drop column event? Drop column seems simply makes the column nullable.
There was a problem hiding this comment.
If dataType is not specified, will changing the column attribute to nullable have no effect?
Why is there a situation where no data type is specified? If no new data type is specified in the MODIFY clause, Oracle will report an error
There was a problem hiding this comment.
ALTER TABLE t MODIFY a NULL;
Is this syntax not supported?
There was a problem hiding this comment.
Please update https://github.com/apache/flink-cdc/blob/master/.github/workflows/modules.py to include tests for this newly added module.
61e8a57 to 64e4fd7
I have updated the file in master-36796.
… com.fasterxml.jackson.
| public static List<TableId> listTables( | ||
| OracleSourceConfig sourceConfig, @Nullable String dbName) { | ||
| try (JdbcConnection jdbc = createOracleConnection(sourceConfig)) { | ||
| List<String> databases = |
There was a problem hiding this comment.
In CDB + PDB mode, the PDB databases cannot be found here. Should we switch the session to the PDB through setSessionToPdb(${pdbName}) before listing databases?
| Set<TableId> tableIdSet = new HashSet<>(); | ||
| String queryTablesSql = | ||
| "SELECT OWNER ,TABLE_NAME,TABLESPACE_NAME FROM ALL_TABLES \n" | ||
| + "WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX') " |
There was a problem hiding this comment.
Oracle returns the tablespace as null for partitioned tables. When I tested with a partitioned table, the table was never discovered and the initial snapshot never started, so it threw a generic error: “Db history topic or its content is fully or partially missing. Please check ur database history topic configuration”.
It worked fine for me with the SourceFunction-based DataStream API, but it never worked with the incremental-snapshot-based DataStream API.
@linjianchang @lvyanquan Let me know if I am missing something.



add oracle pipeline connector.