Skip to content

Commit 6aebdc0

Browse files
authored
[Improve][Oracle-CDC] Support ReadOnlyLogWriterFlushStrategy (#8912)
1 parent 47ba3e1 commit 6aebdc0

File tree

4 files changed

+103
-1
lines changed

4 files changed

+103
-1
lines changed

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package io.debezium.connector.oracle.logminer;
1919

20+
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory;
21+
2022
import org.slf4j.Logger;
2123
import org.slf4j.LoggerFactory;
2224

@@ -33,6 +35,7 @@
3335
import io.debezium.connector.oracle.logminer.logwriter.CommitLogWriterFlushStrategy;
3436
import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
3537
import io.debezium.connector.oracle.logminer.logwriter.RacCommitLogWriterFlushStrategy;
38+
import io.debezium.connector.oracle.logminer.logwriter.ReadOnlyLogWriterFlushStrategy;
3639
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
3740
import io.debezium.jdbc.JdbcConfiguration;
3841
import io.debezium.pipeline.ErrorHandler;
@@ -1012,7 +1015,12 @@ private boolean isTableAllColumnsSupplementalLoggingEnabled(
10121015
*
10131016
* @return the strategy to be used to flush Oracle's LGWR process, never {@code null}.
10141017
*/
1015-
private LogWriterFlushStrategy resolveFlushStrategy() {
1018+
public LogWriterFlushStrategy resolveFlushStrategy() {
1019+
if (connectorConfig
1020+
.getConfig()
1021+
.getBoolean(OracleSourceConfigFactory.LOG_MINING_READONLY_KEY, false)) {
1022+
return new ReadOnlyLogWriterFlushStrategy();
1023+
}
10161024
if (connectorConfig.isRacSystem()) {
10171025
return new RacCommitLogWriterFlushStrategy(
10181026
connectorConfig, jdbcConfiguration, streamingMetrics);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package io.debezium.connector.oracle.logminer.logwriter;
19+
20+
import io.debezium.DebeziumException;
21+
import io.debezium.connector.oracle.Scn;
22+
23+
public class ReadOnlyLogWriterFlushStrategy implements LogWriterFlushStrategy {
24+
@Override
25+
public String getHost() {
26+
throw new DebeziumException("Not applicable when using read-only flushing strategy");
27+
}
28+
29+
@Override
30+
public void flush(Scn currentScn) throws InterruptedException {
31+
// no operation
32+
}
33+
34+
@Override
35+
public void close() throws Exception {
36+
// no operation
37+
}
38+
}

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class OracleSourceConfigFactory extends JdbcSourceConfigFactory {
4141
public static final String SCHEMA_CHANGE_KEY = "include.schema.changes";
4242
public static final String LOG_MINING_STRATEGY_KEY = "log.mining.strategy";
4343
public static final String LOG_MINING_STRATEGY_DEFAULT = "online_catalog";
44+
public static final String LOG_MINING_READONLY_KEY = "log.mining.read.only";
4445

4546
private List<String> schemaList;
4647

@@ -106,6 +107,7 @@ public OracleSourceConfig create(int subtask) {
106107
props.setProperty("connect.timeout.ms", String.valueOf(connectTimeoutMillis));
107108
// disable tombstones
108109
props.setProperty("tombstones.on.delete", String.valueOf(false));
110+
props.setProperty(LOG_MINING_READONLY_KEY, "true");
109111

110112
if (originUrl != null) {
111113
props.setProperty("database.url", originUrl);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package io.debezium.connector.oracle.logminer.logwriter;
19+
20+
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory;
21+
22+
import org.junit.jupiter.api.Assertions;
23+
import org.junit.jupiter.api.Test;
24+
25+
import io.debezium.DebeziumException;
26+
import io.debezium.config.Configuration;
27+
import io.debezium.connector.oracle.OracleConnectorConfig;
28+
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource;
29+
30+
import static org.junit.jupiter.api.Assertions.assertTrue;
31+
import static org.mockito.Mockito.mock;
32+
import static org.mockito.Mockito.when;
33+
34+
public class ReadOnlyLogWriterFlushStrategyTest {
35+
36+
@Test
37+
void returnsReadOnlyLogWriterFlushStrategyWhenReadOnlyKeyIsTrue() throws Exception {
38+
OracleConnectorConfig config = mock(OracleConnectorConfig.class);
39+
Configuration configuration = mock(Configuration.class);
40+
when(config.getConfig()).thenReturn(configuration);
41+
when(configuration.getBoolean(OracleSourceConfigFactory.LOG_MINING_READONLY_KEY, false))
42+
.thenReturn(true);
43+
44+
LogMinerStreamingChangeEventSource source =
45+
new LogMinerStreamingChangeEventSource(
46+
config, null, null, null, null, null, null, null);
47+
LogWriterFlushStrategy strategy = source.resolveFlushStrategy();
48+
assertTrue(strategy instanceof ReadOnlyLogWriterFlushStrategy);
49+
50+
Assertions.assertThrows(DebeziumException.class, () -> strategy.getHost());
51+
strategy.flush(null);
52+
strategy.close();
53+
}
54+
}

0 commit comments

Comments
 (0)