Skip to content

Commit a41423e

Browse files
authored
[FLINK-36631][source-connector][oracle] Add oracle read data by specific offset (#3675)
1 parent 3559220 commit a41423e

7 files changed

Lines changed: 180 additions & 22 deletions

File tree

docs/content.zh/docs/connectors/flink-sources/oracle-cdc.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,9 +351,16 @@ Connector Options
351351
<td style="word-wrap: break-word;">initial</td>
352352
<td>String</td>
353353
<td>Optional startup mode for Oracle CDC consumer, valid enumerations are "initial"
354-
and "latest-offset".
354+
, "latest-offset" , specific-offset.
355355
Please see <a href="#startup-reading-position">Startup Reading Position</a> section for more detailed information.</td>
356356
</tr>
357+
<tr>
358+
<td>scan.startup.specific-offset.scn</td>
359+
<td>optional</td>
360+
<td style="word-wrap: break-word;">(none)</td>
361+
<td>Long</td>
362+
<td>Optional SCN used in case of "specific-offset" startup mode</td>
363+
</tr>
357364
<tr>
358365
<td>scan.incremental.snapshot.enabled</td>
359366
<td>optional</td>
@@ -543,6 +550,7 @@ The config option `scan.startup.mode` specifies the startup mode for Oracle CDC
543550
- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest redo log.
544551
- `latest-offset`: Never to perform a snapshot on the monitored database tables upon first startup, just read from
545552
the change since the connector was started.
553+
- `specific-offset`: Skip snapshot phase and start reading redo log from a specific offset with scn.
546554

547555
_Note: the mechanism of `scan.startup.mode` option relying on Debezium's `snapshot.mode` configuration. So please do not use them together. If you specific both `scan.startup.mode` and `debezium.snapshot.mode` options in the table DDL, it may make `scan.startup.mode` doesn't work._
548556

docs/content/docs/connectors/flink-sources/oracle-cdc.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,9 +352,16 @@ Connector Options
352352
<td style="word-wrap: break-word;">initial</td>
353353
<td>String</td>
354354
<td>Optional startup mode for Oracle CDC consumer, valid enumerations are "initial"
355-
and "latest-offset".
355+
, "latest-offset" , specific-offset.
356356
Please see <a href="#startup-reading-position">Startup Reading Position</a> section for more detailed information.</td>
357357
</tr>
358+
<tr>
359+
<td>scan.startup.specific-offset.scn</td>
360+
<td>optional</td>
361+
<td style="word-wrap: break-word;">(none)</td>
362+
<td>Long</td>
363+
<td>Optional SCN used in case of "specific-offset" startup mode</td>
364+
</tr>
358365
<tr>
359366
<td>scan.incremental.snapshot.enabled</td>
360367
<td>optional</td>
@@ -544,6 +551,7 @@ The config option `scan.startup.mode` specifies the startup mode for Oracle CDC
544551
- `initial` (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest redo log.
545552
- `latest-offset`: Never to perform a snapshot on the monitored database tables upon first startup, just read from
546553
the change since the connector was started.
554+
- `specific-offset`: Skip snapshot phase and start reading redo log from a specific offset with scn.
547555

548556
_Note: the mechanism of `scan.startup.mode` option relying on Debezium's `snapshot.mode` configuration. So please do not use them together. If you specific both `scan.startup.mode` and `debezium.snapshot.mode` options in the table DDL, it may make `scan.startup.mode` doesn't work._
549557

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ public JdbcSourceConfigFactory startupOptions(StartupOptions startupOptions) {
209209
case INITIAL:
210210
case SNAPSHOT:
211211
case LATEST_OFFSET:
212+
case SPECIFIC_OFFSETS:
212213
case COMMITTED_OFFSETS:
213214
break;
214215
default:

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/StartupOptions.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
package org.apache.flink.cdc.connectors.base.options;
1919

20+
import org.apache.flink.util.CollectionUtil;
21+
2022
import java.io.Serializable;
23+
import java.util.Map;
2124
import java.util.Objects;
2225

2326
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -30,21 +33,22 @@ public final class StartupOptions implements Serializable {
3033
public final String specificOffsetFile;
3134
public final Integer specificOffsetPos;
3235
public final Long startupTimestampMillis;
36+
public final Map<String, String> offset;
3337

3438
/**
3539
* Performs an initial snapshot on the monitored database tables upon first startup, and
3640
* continue to read the latest change log.
3741
*/
3842
public static StartupOptions initial() {
39-
return new StartupOptions(StartupMode.INITIAL, null, null, null);
43+
return new StartupOptions(StartupMode.INITIAL, null, null, null, null);
4044
}
4145

4246
/**
4347
* Performs an initial snapshot on the monitored database tables upon first startup, and not
4448
* read the change log anymore .
4549
*/
4650
public static StartupOptions snapshot() {
47-
return new StartupOptions(StartupMode.SNAPSHOT, null, null, null);
51+
return new StartupOptions(StartupMode.SNAPSHOT, null, null, null, null);
4852
}
4953

5054
/**
@@ -53,15 +57,15 @@ public static StartupOptions snapshot() {
5357
* change log is guaranteed to contain the entire history of the database.
5458
*/
5559
public static StartupOptions earliest() {
56-
return new StartupOptions(StartupMode.EARLIEST_OFFSET, null, null, null);
60+
return new StartupOptions(StartupMode.EARLIEST_OFFSET, null, null, null, null);
5761
}
5862

5963
/**
6064
* Never to perform snapshot on the monitored database tables upon first startup, just read from
6165
* the end of the change log which means only have the changes since the connector was started.
6266
*/
6367
public static StartupOptions latest() {
64-
return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null);
68+
return new StartupOptions(StartupMode.LATEST_OFFSET, null, null, null, null);
6569
}
6670

6771
/**
@@ -70,7 +74,7 @@ public static StartupOptions latest() {
7074
* the connector last stopped.
7175
*/
7276
public static StartupOptions committed() {
73-
return new StartupOptions(StartupMode.COMMITTED_OFFSETS, null, null, null);
77+
return new StartupOptions(StartupMode.COMMITTED_OFFSETS, null, null, null, null);
7478
}
7579

7680
/**
@@ -79,7 +83,11 @@ public static StartupOptions committed() {
7983
*/
8084
public static StartupOptions specificOffset(String specificOffsetFile, int specificOffsetPos) {
8185
return new StartupOptions(
82-
StartupMode.SPECIFIC_OFFSETS, specificOffsetFile, specificOffsetPos, null);
86+
StartupMode.SPECIFIC_OFFSETS, specificOffsetFile, specificOffsetPos, null, null);
87+
}
88+
89+
public static StartupOptions specificOffset(Map<String, String> offset) {
90+
return new StartupOptions(StartupMode.SPECIFIC_OFFSETS, null, null, null, offset);
8391
}
8492

8593
/**
@@ -92,18 +100,20 @@ public static StartupOptions specificOffset(String specificOffsetFile, int speci
92100
* @param startupTimestampMillis timestamp for the startup offsets, as milliseconds from epoch.
93101
*/
94102
public static StartupOptions timestamp(long startupTimestampMillis) {
95-
return new StartupOptions(StartupMode.TIMESTAMP, null, null, startupTimestampMillis);
103+
return new StartupOptions(StartupMode.TIMESTAMP, null, null, startupTimestampMillis, null);
96104
}
97105

98106
private StartupOptions(
99107
StartupMode startupMode,
100108
String specificOffsetFile,
101109
Integer specificOffsetPos,
102-
Long startupTimestampMillis) {
110+
Long startupTimestampMillis,
111+
Map<String, String> offset) {
103112
this.startupMode = startupMode;
104113
this.specificOffsetFile = specificOffsetFile;
105114
this.specificOffsetPos = specificOffsetPos;
106115
this.startupTimestampMillis = startupTimestampMillis;
116+
this.offset = offset;
107117

108118
switch (startupMode) {
109119
case INITIAL:
@@ -113,8 +123,10 @@ private StartupOptions(
113123
case COMMITTED_OFFSETS:
114124
break;
115125
case SPECIFIC_OFFSETS:
116-
checkNotNull(specificOffsetFile, "specificOffsetFile shouldn't be null");
117-
checkNotNull(specificOffsetPos, "specificOffsetPos shouldn't be null");
126+
if (CollectionUtil.isNullOrEmpty(offset)) {
127+
checkNotNull(specificOffsetFile, "specificOffsetFile shouldn't be null");
128+
checkNotNull(specificOffsetPos, "specificOffsetPos shouldn't be null");
129+
}
118130
break;
119131
case TIMESTAMP:
120132
checkNotNull(startupTimestampMillis, "startupTimestampMillis shouldn't be null");
@@ -124,6 +136,10 @@ private StartupOptions(
124136
}
125137
}
126138

139+
public Map<String, String> getOffset() {
140+
return offset;
141+
}
142+
127143
public boolean isStreamOnly() {
128144
return startupMode == StartupMode.EARLIEST_OFFSET
129145
|| startupMode == StartupMode.LATEST_OFFSET
@@ -148,12 +164,13 @@ public boolean equals(Object o) {
148164
return startupMode == that.startupMode
149165
&& Objects.equals(specificOffsetFile, that.specificOffsetFile)
150166
&& Objects.equals(specificOffsetPos, that.specificOffsetPos)
151-
&& Objects.equals(startupTimestampMillis, that.startupTimestampMillis);
167+
&& Objects.equals(startupTimestampMillis, that.startupTimestampMillis)
168+
&& Objects.equals(offset, that.offset);
152169
}
153170

154171
@Override
155172
public int hashCode() {
156173
return Objects.hash(
157-
startupMode, specificOffsetFile, specificOffsetPos, startupTimestampMillis);
174+
startupMode, specificOffsetFile, specificOffsetPos, startupTimestampMillis, offset);
158175
}
159176
}

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
3131
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
3232
import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics;
33+
import org.apache.flink.util.CollectionUtil;
3334

3435
import java.io.IOException;
3536
import java.util.ArrayList;
@@ -182,10 +183,14 @@ public StreamSplit createStreamSplit() {
182183
offsetFactory.createTimestampOffset(startupOptions.startupTimestampMillis);
183184
break;
184185
case SPECIFIC_OFFSETS:
185-
startingOffset =
186-
offsetFactory.newOffset(
187-
startupOptions.specificOffsetFile,
188-
startupOptions.specificOffsetPos.longValue());
186+
if (CollectionUtil.isNullOrEmpty(startupOptions.getOffset())) {
187+
startingOffset =
188+
offsetFactory.newOffset(
189+
startupOptions.specificOffsetFile,
190+
startupOptions.specificOffsetPos.longValue());
191+
} else {
192+
startingOffset = offsetFactory.newOffset(startupOptions.getOffset());
193+
}
189194
break;
190195
case COMMITTED_OFFSETS:
191196
startingOffset = dialect.displayCommittedOffset(sourceConfig);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030
import org.apache.flink.table.factories.FactoryUtil;
3131

3232
import java.time.Duration;
33+
import java.util.HashMap;
3334
import java.util.HashSet;
35+
import java.util.Map;
3436
import java.util.Set;
3537

3638
import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
@@ -70,7 +72,9 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
7072
public DynamicTableSource createDynamicTableSource(Context context) {
7173
final FactoryUtil.TableFactoryHelper helper =
7274
FactoryUtil.createTableFactoryHelper(this, context);
73-
helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);
75+
helper.validateExcept(
76+
DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX,
77+
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS_PREFIX);
7478

7579
final ReadableConfig config = helper.getOptions();
7680
String url = config.get(URL);
@@ -91,7 +95,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
9195
String tableName = config.get(TABLE_NAME);
9296
String schemaName = config.get(SCHEMA_NAME);
9397
int port = config.get(PORT);
94-
StartupOptions startupOptions = getStartupOptions(config);
98+
99+
StartupOptions startupOptions =
100+
getStartupOptions(config, context.getCatalogTable().getOptions());
95101
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
96102

97103
boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
@@ -195,8 +201,12 @@ public Set<ConfigOption<?>> optionalOptions() {
195201
private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
196202
private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot";
197203
private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
204+
private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offset";
205+
private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS_PREFIX =
206+
"scan.startup.specific-offset.";
198207

199-
private static StartupOptions getStartupOptions(ReadableConfig config) {
208+
private static StartupOptions getStartupOptions(
209+
ReadableConfig config, Map<String, String> options) {
200210
String modeString = config.get(SCAN_STARTUP_MODE);
201211

202212
switch (modeString.toLowerCase()) {
@@ -206,7 +216,9 @@ private static StartupOptions getStartupOptions(ReadableConfig config) {
206216
return StartupOptions.snapshot();
207217
case SCAN_STARTUP_MODE_VALUE_LATEST:
208218
return StartupOptions.latest();
209-
219+
case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
220+
Map<String, String> offsetMap = getSpecificOffsetMap(options);
221+
return StartupOptions.specificOffset(offsetMap);
210222
default:
211223
throw new ValidationException(
212224
String.format(
@@ -219,6 +231,20 @@ private static StartupOptions getStartupOptions(ReadableConfig config) {
219231
}
220232
}
221233

234+
private static Map<String, String> getSpecificOffsetMap(Map<String, String> options) {
235+
Map<String, String> offset = new HashMap<>();
236+
for (Map.Entry<String, String> entry : options.entrySet()) {
237+
if (entry.getKey().startsWith(SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS_PREFIX)) {
238+
String subKey =
239+
entry.getKey()
240+
.substring(
241+
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS_PREFIX.length());
242+
offset.put(subKey, entry.getValue());
243+
}
244+
}
245+
return offset;
246+
}
247+
222248
/** Checks the value of given integer option is valid. */
223249
private void validateIntegerOption(
224250
ConfigOption<Integer> option, int optionValue, int exclusiveMin) {

0 commit comments

Comments
 (0)