Skip to content

Commit 106c369

Browse files
authored
[Improve] cdc related options (#10372)
1 parent ec8919a commit 106c369

File tree

37 files changed

+504
-481
lines changed

37 files changed

+504
-481
lines changed

seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ public class ConnectorOptionCheckTest {
5252
@Test
5353
public void checkConnectorOptionExist() {
5454
Set<String> connectorOptionFileNames = new HashSet<>();
55-
Set<String> whiteListConnectorOptionFileNames = buildWhiteList();
5655
try (Stream<Path> paths = Files.walk(Paths.get(".."), FileVisitOption.FOLLOW_LINKS)) {
5756
List<Path> connectorClassPaths =
5857
paths.filter(
@@ -164,15 +163,6 @@ public void checkConnectorOptionExist() {
164163
connectorOptionFileNames.remove(className);
165164
});
166165

167-
whiteListConnectorOptionFileNames.forEach(
168-
whiteListConnectorOptionFileName -> {
169-
Assertions.assertTrue(
170-
connectorOptionFileNames.remove(whiteListConnectorOptionFileName),
171-
"This [Options] class is in white list, but not found related connector classes, please check: ["
172-
+ whiteListConnectorOptionFileName
173-
+ "]\n");
174-
});
175-
176166
Assertions.assertEquals(
177167
0,
178168
connectorOptionFileNames.size(),
@@ -186,14 +176,4 @@ public void checkConnectorOptionExist() {
186176
throw new RuntimeException(e);
187177
}
188178
}
189-
190-
private Set<String> buildWhiteList() {
191-
Set<String> whiteList = new HashSet<>();
192-
whiteList.add("PostgresIncrementalSourceOptions");
193-
whiteList.add("SqlServerIncrementalSourceOptions");
194-
whiteList.add("OracleIncrementalSourceOptions");
195-
whiteList.add("MySqlIncrementalSourceOptions");
196-
whiteList.add("MongodbIncrementalSourceOptions");
197-
return whiteList;
198-
}
199179
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ public class SourceOptions {
3030
public static final String STARTUP_MODE_KEY = "startup.mode";
3131
public static final String STOP_MODE_KEY = "stop.mode";
3232

33+
public static final Option<String> URL =
34+
Options.key("url")
35+
.stringType()
36+
.noDefaultValue()
37+
.withFallbackKeys("base-url")
38+
.withDescription("url");
39+
3340
public static final Option<Integer> SNAPSHOT_SPLIT_SIZE =
3441
Options.key("snapshot.split.size")
3542
.intType()

seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@
3434
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
3535
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
3636
import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
37+
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbIncrementalSourceOptions;
3738
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
3839
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfigProvider;
39-
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
4040
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema;
4141
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.MongoDBRecordEmitter;
4242
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.dialect.MongodbDialect;
@@ -59,12 +59,12 @@ public MongodbIncrementalSource(ReadonlyConfig options, List<CatalogTable> catal
5959

6060
@Override
6161
public Option<StartupMode> getStartupModeOption() {
62-
return MongodbSourceOptions.STARTUP_MODE;
62+
return MongodbIncrementalSourceOptions.STARTUP_MODE;
6363
}
6464

6565
@Override
6666
public Option<StopMode> getStopModeOption() {
67-
return MongodbSourceOptions.STOP_MODE;
67+
return MongodbIncrementalSourceOptions.STOP_MODE;
6868
}
6969

7070
@Override
@@ -77,29 +77,31 @@ public SourceConfig.Factory<MongodbSourceConfig> createSourceConfigFactory(
7777
@Nonnull ReadonlyConfig config) {
7878
MongodbSourceConfigProvider.Builder builder =
7979
MongodbSourceConfigProvider.newBuilder()
80-
.hosts(config.get(MongodbSourceOptions.HOSTS))
80+
.hosts(config.get(MongodbIncrementalSourceOptions.HOSTS))
8181
.validate();
82-
Optional.ofNullable(config.get(MongodbSourceOptions.DATABASE))
82+
Optional.ofNullable(config.get(MongodbIncrementalSourceOptions.DATABASE))
8383
.ifPresent(builder::databaseList);
84-
Optional.ofNullable(config.get(MongodbSourceOptions.COLLECTION))
84+
Optional.ofNullable(config.get(MongodbIncrementalSourceOptions.COLLECTION))
8585
.ifPresent(builder::collectionList);
86-
Optional.ofNullable(config.get(MongodbSourceOptions.USERNAME)).ifPresent(builder::username);
87-
Optional.ofNullable(config.get(MongodbSourceOptions.PASSWORD)).ifPresent(builder::password);
88-
Optional.ofNullable(config.get(MongodbSourceOptions.CONNECTION_OPTIONS))
86+
Optional.ofNullable(config.get(MongodbIncrementalSourceOptions.USERNAME))
87+
.ifPresent(builder::username);
88+
Optional.ofNullable(config.get(MongodbIncrementalSourceOptions.PASSWORD))
89+
.ifPresent(builder::password);
90+
Optional.ofNullable(config.get(MongodbIncrementalSourceOptions.CONNECTION_OPTIONS))
8991
.ifPresent(builder::connectionOptions);
90-
Optional.ofNullable(config.get(MongodbSourceOptions.BATCH_SIZE))
92+
Optional.ofNullable(config.get(MongodbIncrementalSourceOptions.BATCH_SIZE))
9193
.ifPresent(builder::batchSize);
92-
Optional.ofNullable(config.get(MongodbSourceOptions.EXACTLY_ONCE))
94+
Optional.ofNullable(config.get(MongodbIncrementalSourceOptions.EXACTLY_ONCE))
9395
.ifPresent(builder::exactlyOnce);
94-
Optional.ofNullable(config.get(MongodbSourceOptions.POLL_MAX_BATCH_SIZE))
96+
Optional.ofNullable(config.get(MongodbIncrementalSourceOptions.POLL_MAX_BATCH_SIZE))
9597
.ifPresent(builder::pollMaxBatchSize);
96-
Optional.ofNullable(config.get(MongodbSourceOptions.POLL_AWAIT_TIME_MILLIS))
98+
Optional.ofNullable(config.get(MongodbIncrementalSourceOptions.POLL_AWAIT_TIME_MILLIS))
9799
.ifPresent(builder::pollAwaitTimeMillis);
98-
Optional.ofNullable(config.get(MongodbSourceOptions.HEARTBEAT_INTERVAL_MILLIS))
100+
Optional.ofNullable(config.get(MongodbIncrementalSourceOptions.HEARTBEAT_INTERVAL_MILLIS))
99101
.ifPresent(builder::heartbeatIntervalMillis);
100-
Optional.ofNullable(config.get(MongodbSourceOptions.HEARTBEAT_INTERVAL_MILLIS))
101-
.ifPresent(builder::splitMetaGroupSize);
102-
Optional.ofNullable(config.get(MongodbSourceOptions.INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB))
102+
Optional.ofNullable(
103+
config.get(
104+
MongodbIncrementalSourceOptions.INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB))
103105
.ifPresent(builder::splitSizeMB);
104106
Optional.ofNullable(startupConfig).ifPresent(builder::startupOptions);
105107
Optional.ofNullable(stopConfig).ifPresent(builder::stopOptions);

seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,8 @@
3131
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
3232
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
3333
import org.apache.seatunnel.common.utils.SeaTunnelException;
34-
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
3534
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
36-
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
35+
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbIncrementalSourceOptions;
3736
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
3837

3938
import com.google.auto.service.AutoService;
@@ -55,27 +54,30 @@ public String factoryIdentifier() {
5554

5655
@Override
5756
public OptionRule optionRule() {
58-
return MongodbSourceOptions.getBaseRule()
57+
return MongodbIncrementalSourceOptions.getBaseRule()
5958
.required(
60-
MongodbSourceOptions.HOSTS,
61-
MongodbSourceOptions.DATABASE,
62-
MongodbSourceOptions.COLLECTION)
63-
.exclusive(ConnectorCommonOptions.SCHEMA, ConnectorCommonOptions.TABLE_CONFIGS)
59+
MongodbIncrementalSourceOptions.HOSTS,
60+
MongodbIncrementalSourceOptions.DATABASE,
61+
MongodbIncrementalSourceOptions.COLLECTION)
62+
.exclusive(
63+
MongodbIncrementalSourceOptions.SCHEMA,
64+
MongodbIncrementalSourceOptions.TABLE_CONFIGS)
6465
.optional(
65-
MongodbSourceOptions.USERNAME,
66-
MongodbSourceOptions.PASSWORD,
67-
MongodbSourceOptions.CONNECTION_OPTIONS,
68-
MongodbSourceOptions.BATCH_SIZE,
69-
MongodbSourceOptions.POLL_MAX_BATCH_SIZE,
70-
MongodbSourceOptions.POLL_AWAIT_TIME_MILLIS,
71-
MongodbSourceOptions.HEARTBEAT_INTERVAL_MILLIS,
72-
MongodbSourceOptions.INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB,
73-
MongodbSourceOptions.STARTUP_MODE,
74-
MongodbSourceOptions.STOP_MODE)
66+
MongodbIncrementalSourceOptions.USERNAME,
67+
MongodbIncrementalSourceOptions.PASSWORD,
68+
MongodbIncrementalSourceOptions.CONNECTION_OPTIONS,
69+
MongodbIncrementalSourceOptions.BATCH_SIZE,
70+
MongodbIncrementalSourceOptions.POLL_MAX_BATCH_SIZE,
71+
MongodbIncrementalSourceOptions.POLL_AWAIT_TIME_MILLIS,
72+
MongodbIncrementalSourceOptions.HEARTBEAT_INTERVAL_MILLIS,
73+
MongodbIncrementalSourceOptions.INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB,
74+
MongodbIncrementalSourceOptions.STARTUP_MODE,
75+
MongodbIncrementalSourceOptions.STOP_MODE,
76+
MongodbIncrementalSourceOptions.DEBEZIUM_PROPERTIES)
7577
.conditional(
76-
MongodbSourceOptions.STARTUP_MODE,
78+
MongodbIncrementalSourceOptions.STARTUP_MODE,
7779
StartupMode.TIMESTAMP,
78-
SourceOptions.STARTUP_TIMESTAMP)
80+
MongodbIncrementalSourceOptions.STARTUP_TIMESTAMP)
7981
.build();
8082
}
8183

@@ -90,7 +92,8 @@ public Class<? extends SeaTunnelSource> getSourceClass() {
9092
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
9193
return () -> {
9294
List<CatalogTable> catalogTables = buildWithConfig(context.getOptions());
93-
List<String> collections = context.getOptions().get(MongodbSourceOptions.COLLECTION);
95+
List<String> collections =
96+
context.getOptions().get(MongodbIncrementalSourceOptions.COLLECTION);
9497
validateCatalogTablesAndCollections(catalogTables, collections);
9598
catalogTables = updateAndValidateCatalogTableId(catalogTables, collections);
9699
return (SeaTunnelSource<T, SplitT, StateT>)

seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java renamed to seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbIncrementalSourceOptions.java

Lines changed: 2 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -20,150 +20,18 @@
2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
2222
import org.apache.seatunnel.api.configuration.SingleChoiceOption;
23+
import org.apache.seatunnel.api.options.table.TableSchemaOptions;
2324
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
2425
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
2526
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
2627

27-
import org.bson.BsonDouble;
28-
import org.bson.json.JsonMode;
29-
import org.bson.json.JsonWriterSettings;
30-
3128
import java.util.Arrays;
3229
import java.util.Collections;
3330
import java.util.HashMap;
34-
import java.util.HashSet;
3531
import java.util.List;
3632
import java.util.Map;
37-
import java.util.Set;
38-
39-
import static java.util.Arrays.asList;
40-
41-
public class MongodbSourceOptions extends SourceOptions {
42-
43-
public static final String ENCODE_VALUE_FIELD = "_value";
44-
45-
public static final String CLUSTER_TIME_FIELD = "clusterTime";
46-
47-
public static final String TS_MS_FIELD = "ts_ms";
48-
49-
public static final String SOURCE_FIELD = "source";
50-
51-
public static final String SNAPSHOT_FIELD = "snapshot";
52-
53-
public static final String FALSE_FALSE = "false";
54-
55-
public static final String OPERATION_TYPE_INSERT = "insert";
56-
57-
public static final String SNAPSHOT_TRUE = "true";
58-
59-
public static final String ID_FIELD = "_id";
60-
61-
public static final String HEARTBEAT_KEY_FIELD = "HEARTBEAT";
62-
63-
public static final String COPY_KEY_FIELD = "copy";
64-
65-
public static final String DOCUMENT_KEY = "documentKey";
66-
67-
public static final String NS_FIELD = "ns";
68-
69-
public static final String OPERATION_TYPE = "operationType";
70-
71-
public static final String TIMESTAMP_FIELD = "timestamp";
72-
73-
public static final String RESUME_TOKEN_FIELD = "resumeToken";
74-
75-
public static final String FULL_DOCUMENT = "fullDocument";
7633

77-
public static final String DB_FIELD = "db";
78-
79-
public static final String COLL_FIELD = "coll";
80-
81-
public static final int FAILED_TO_PARSE_ERROR = 9;
82-
83-
public static final int UNAUTHORIZED_ERROR = 13;
84-
85-
public static final int ILLEGAL_OPERATION_ERROR = 20;
86-
87-
public static final int INVALIDATED_RESUME_TOKEN_ERROR = 260;
88-
public static final int CHANGE_STREAM_FATAL_ERROR = 280;
89-
public static final int CHANGE_STREAM_HISTORY_LOST = 286;
90-
public static final int BSON_OBJECT_TOO_LARGE = 10334;
91-
92-
public static final Set<Integer> INVALID_CHANGE_STREAM_ERRORS =
93-
new HashSet<>(
94-
asList(
95-
INVALIDATED_RESUME_TOKEN_ERROR,
96-
CHANGE_STREAM_FATAL_ERROR,
97-
CHANGE_STREAM_HISTORY_LOST,
98-
BSON_OBJECT_TOO_LARGE));
99-
100-
public static final String RESUME_TOKEN = "resume token";
101-
public static final String NOT_FOUND = "not found";
102-
public static final String DOES_NOT_EXIST = "does not exist";
103-
public static final String INVALID_RESUME_TOKEN = "invalid resume token";
104-
public static final String NO_LONGER_IN_THE_OPLOG = "no longer be in the oplog";
105-
106-
public static final int UNKNOWN_FIELD_ERROR = 40415;
107-
108-
public static final String DROPPED_FIELD = "dropped";
109-
110-
public static final String MAX_FIELD = "max";
111-
112-
public static final String MIN_FIELD = "min";
113-
114-
public static final String ADD_NS_FIELD_NAME = "_ns_";
115-
116-
public static final String UUID_FIELD = "uuid";
117-
118-
public static final String SHARD_FIELD = "shard";
119-
120-
public static final String DIALECT_NAME = "MongoDB";
121-
122-
public static final BsonDouble COMMAND_SUCCEED_FLAG = new BsonDouble(1.0d);
123-
124-
public static final JsonWriterSettings DEFAULT_JSON_WRITER_SETTINGS =
125-
JsonWriterSettings.builder().outputMode(JsonMode.EXTENDED).build();
126-
127-
public static final String OUTPUT_SCHEMA =
128-
"{"
129-
+ " \"name\": \"ChangeStream\","
130-
+ " \"type\": \"record\","
131-
+ " \"fields\": ["
132-
+ " { \"name\": \"_id\", \"type\": \"string\" },"
133-
+ " { \"name\": \"operationType\", \"type\": [\"string\", \"null\"] },"
134-
+ " { \"name\": \"fullDocument\", \"type\": [\"string\", \"null\"] },"
135-
+ " { \"name\": \"source\","
136-
+ " \"type\": [{\"name\": \"source\", \"type\": \"record\", \"fields\": ["
137-
+ " {\"name\": \"ts_ms\", \"type\": \"long\"},"
138-
+ " {\"name\": \"table\", \"type\": [\"string\", \"null\"]},"
139-
+ " {\"name\": \"db\", \"type\": [\"string\", \"null\"]},"
140-
+ " {\"name\": \"snapshot\", \"type\": [\"string\", \"null\"] } ]"
141-
+ " }, \"null\" ] },"
142-
+ " { \"name\": \"ts_ms\", \"type\": [\"long\", \"null\"]},"
143-
+ " { \"name\": \"ns\","
144-
+ " \"type\": [{\"name\": \"ns\", \"type\": \"record\", \"fields\": ["
145-
+ " {\"name\": \"db\", \"type\": \"string\"},"
146-
+ " {\"name\": \"coll\", \"type\": [\"string\", \"null\"] } ]"
147-
+ " }, \"null\" ] },"
148-
+ " { \"name\": \"to\","
149-
+ " \"type\": [{\"name\": \"to\", \"type\": \"record\", \"fields\": ["
150-
+ " {\"name\": \"db\", \"type\": \"string\"},"
151-
+ " {\"name\": \"coll\", \"type\": [\"string\", \"null\"] } ]"
152-
+ " }, \"null\" ] },"
153-
+ " { \"name\": \"documentKey\", \"type\": [\"string\", \"null\"] },"
154-
+ " { \"name\": \"updateDescription\","
155-
+ " \"type\": [{\"name\": \"updateDescription\", \"type\": \"record\", \"fields\": ["
156-
+ " {\"name\": \"updatedFields\", \"type\": [\"string\", \"null\"]},"
157-
+ " {\"name\": \"removedFields\","
158-
+ " \"type\": [{\"type\": \"array\", \"items\": \"string\"}, \"null\"]"
159-
+ " }] }, \"null\"] },"
160-
+ " { \"name\": \"clusterTime\", \"type\": [\"string\", \"null\"] },"
161-
+ " { \"name\": \"txnNumber\", \"type\": [\"long\", \"null\"]},"
162-
+ " { \"name\": \"lsid\", \"type\": [{\"name\": \"lsid\", \"type\": \"record\","
163-
+ " \"fields\": [ {\"name\": \"id\", \"type\": \"string\"},"
164-
+ " {\"name\": \"uid\", \"type\": \"string\"}] }, \"null\"] }"
165-
+ " ]"
166-
+ "}";
34+
public class MongodbIncrementalSourceOptions extends SourceOptions implements TableSchemaOptions {
16735

16836
public static final Option<String> HOSTS =
16937
Options.key("hosts")
@@ -234,12 +102,6 @@ public class MongodbSourceOptions extends SourceOptions {
234102
"The amount of time to wait before checking for new results on the change stream."
235103
+ "Defaults: 1000.");
236104

237-
public static final Option<Boolean> EXACTLY_ONCE =
238-
Options.key("exactly_once")
239-
.booleanType()
240-
.defaultValue(false)
241-
.withDescription("Enable exactly once semantic.");
242-
243105
public static final Option<Integer> HEARTBEAT_INTERVAL_MILLIS =
244106
Options.key("heartbeat.interval.ms")
245107
.intType()

seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfig.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,6 @@ public class MongodbSourceConfig implements SourceConfig {
6161

6262
private final int heartbeatIntervalMillis;
6363

64-
private final int splitMetaGroupSize;
65-
6664
private final int splitSizeMB;
6765

6866
private final boolean exactlyOnce;
@@ -81,7 +79,6 @@ public class MongodbSourceConfig implements SourceConfig {
8179
StartupConfig startupOptions,
8280
StopConfig stopOptions,
8381
int heartbeatIntervalMillis,
84-
int splitMetaGroupSize,
8582
int splitSizeMB,
8683
boolean exactlyOnce) {
8784
this.hosts = checkNotNull(hosts);
@@ -99,7 +96,6 @@ public class MongodbSourceConfig implements SourceConfig {
9996
this.startupOptions = startupOptions;
10097
this.stopOptions = stopOptions;
10198
this.heartbeatIntervalMillis = heartbeatIntervalMillis;
102-
this.splitMetaGroupSize = splitMetaGroupSize;
10399
this.splitSizeMB = splitSizeMB;
104100
this.exactlyOnce = exactlyOnce;
105101
}

0 commit comments

Comments
 (0)