diff --git a/docs/en/connector-v2/sink/MongoDB.md b/docs/en/connector-v2/sink/MongoDB.md index 04bb548aad3..d4f702bd17b 100644 --- a/docs/en/connector-v2/sink/MongoDB.md +++ b/docs/en/connector-v2/sink/MongoDB.md @@ -62,19 +62,21 @@ The following table lists the field data type mapping from MongoDB BSON type to ## Sink Options -| Name | Type | Required | Default | Description | -|-----------------------|----------|----------|---------|------------------------------------------------------------------------------------------------------------------------------| -| uri | String | Yes | - | The MongoDB standard connection uri. eg. mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true. | -| database | String | Yes | - | The name of MongoDB database to read or write. | -| collection | String | Yes | - | The name of MongoDB collection to read or write. | -| buffer-flush.max-rows | String | No | 1000 | Specifies the maximum number of buffered rows per batch request. | -| buffer-flush.interval | String | No | 30000 | Specifies the maximum interval of buffered rows per batch request, the unit is millisecond. | -| retry.max | String | No | 3 | Specifies the max number of retry if writing records to database failed. | -| retry.interval | Duration | No | 1000 | Specifies the retry time interval if writing records to database failed, the unit is millisecond. | -| upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. | -| primary-key | List | No | - | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties. | -| transaction | Boolean | No | false | Whether to use transactions in MongoSink (requires MongoDB 4.2+). 
| -| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../sink-common-options.md) for details | +| Name | Type | Required | Default | Description | +|-----------------------|----------|----------|--------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| uri | String | Yes | - | The MongoDB standard connection uri. eg. mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true. | +| database | String | Yes | - | The name of MongoDB database to read or write. | +| collection | String | Yes | - | The name of MongoDB collection to read or write. | +| buffer-flush.max-rows | String | No | 1000 | Specifies the maximum number of buffered rows per batch request. | +| buffer-flush.interval | String | No | 30000 | Specifies the maximum interval of buffered rows per batch request, the unit is millisecond. | +| retry.max | String | No | 3 | Specifies the max number of retry if writing records to database failed. | +| retry.interval | Duration | No | 1000 | Specifies the retry time interval if writing records to database failed, the unit is millisecond. | +| upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. | +| primary-key | List | No | - | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties. | +| transaction | Boolean | No | false | Whether to use transactions in MongoSink (requires MongoDB 4.2+). 
| +| common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details | +| data_save_mode | String | No | APPEND_DATA | The data save mode of MongoDB. Option introduction: `DROP_DATA`: the collection will be cleared before inserting data; `APPEND_DATA`: append data to the collection; `ERROR_WHEN_DATA_EXISTS`: an error will be reported if the collection already contains data. | + ### Tips diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalog.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalog.java new file mode 100644 index 00000000000..ceda30fdd15 --- /dev/null +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalog.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.mongodb.catalog; + +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; +import org.apache.seatunnel.common.exception.CommonError; + +import org.bson.Document; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; + +import java.util.ArrayList; +import java.util.List; + +public class MongodbCatalog implements Catalog { + + private final String catalogName; + private final String baseUrl; + private transient MongoClient mongoClient; + private final String defaultDatabase; + + public MongodbCatalog(String catalogName, String baseUrl, String defaultDatabase) { + this.catalogName = catalogName; + this.baseUrl = baseUrl; + this.defaultDatabase = defaultDatabase; + } + + @Override + public void open() throws CatalogException { + try { + if (mongoClient == null) { + mongoClient = MongoClients.create(baseUrl); + } + } catch (Exception e) { + throw new CatalogException("Failed to open MongoDB Catalog: " + e.getMessage(), e); + } + } + + @Override + public String name() { + return catalogName; + } + + @Override + public String getDefaultDatabase() throws CatalogException { + return defaultDatabase; + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + try { + return listDatabases().contains(databaseName); + } catch (Exception e) { + 
throw new CatalogException("Failed to check database existence: " + databaseName, e); + } + } + + @Override + public List listDatabases() throws CatalogException { + try { + List dbs = new ArrayList<>(); + for (String name : mongoClient.listDatabaseNames()) { + dbs.add(name); + } + return dbs; + } catch (Exception e) { + throw new CatalogException("Failed to list databases", e); + } + } + + @Override + public List listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(name(), databaseName); + } + try { + MongoDatabase db = mongoClient.getDatabase(databaseName); + return db.listCollectionNames().into(new ArrayList<>()); + } catch (Exception e) { + throw new CatalogException("Failed to list tables for database: " + databaseName, e); + } + } + + @Override + public boolean tableExists(TablePath tablePath) throws CatalogException { + try { + return listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName()); + } catch (DatabaseNotExistException e) { + return false; + } + } + + @Override + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + throw CommonError.unsupportedOperation(name(), "get table with tablePath "); + } + + @Override + public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(name(), tablePath.getDatabaseName()); + } + if (tableExists(tablePath)) { + if (ignoreIfExists) return; + throw new TableAlreadyExistException(name(), tablePath); + } + try { + MongoDatabase db = mongoClient.getDatabase(tablePath.getDatabaseName()); + db.createCollection(tablePath.getTableName()); + } catch (Exception e) { + throw new CatalogException( + "Failed to create collection: " + 
tablePath.getFullName(), e); + } + } + + @Override + public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + if (ignoreIfNotExists) return; + throw new TableNotExistException(name(), tablePath); + } + try { + MongoDatabase db = mongoClient.getDatabase(tablePath.getDatabaseName()); + db.getCollection(tablePath.getTableName()).drop(); + } catch (Exception e) { + throw new CatalogException("Failed to drop collection: " + tablePath.getFullName(), e); + } + } + + @Override + public void createDatabase(TablePath tablePath, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + throw CommonError.unsupportedOperation(name(), "create database "); + } + + @Override + public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + throw CommonError.unsupportedOperation(name(), "drop database "); + } + + @Override + public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { + if (!tableExists(tablePath)) { + if (ignoreIfNotExists) { + return; + } + throw new TableNotExistException(name(), tablePath); + } + MongoDatabase db = mongoClient.getDatabase(tablePath.getDatabaseName()); + MongoCollection collection = db.getCollection(tablePath.getTableName()); + collection.deleteMany(new Document()); + } catch (Exception e) { + throw new CatalogException( + "Failed to truncate collection: " + tablePath.getFullName(), e); + } + } + + @Override + public boolean isExistsData(TablePath tablePath) { + try { + if (!tableExists(tablePath)) { + return false; + } + MongoDatabase db = mongoClient.getDatabase(tablePath.getDatabaseName()); + MongoCollection collection = db.getCollection(tablePath.getTableName()); + return collection.estimatedDocumentCount() > 0; + } catch (Exception e) { + return false; + } + } + + @Override + 
public void close() throws CatalogException { + if (mongoClient != null) { + mongoClient.close(); + mongoClient = null; + } + } +} diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalogFactory.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalogFactory.java new file mode 100644 index 00000000000..05a22cd6c7c --- /dev/null +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/catalog/MongodbCatalogFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.mongodb.catalog; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig; + +import com.google.auto.service.AutoService; + +import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY; + +@AutoService(Factory.class) +public class MongodbCatalogFactory implements CatalogFactory { + @Override + public Catalog createCatalog(String catalogName, ReadonlyConfig options) { + return new MongodbCatalog( + catalogName, options.get(MongodbConfig.URI), options.get(MongodbConfig.DATABASE)); + } + + @Override + public String factoryIdentifier() { + return CONNECTOR_IDENTITY; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder().required(MongodbConfig.URI, MongodbConfig.DATABASE).build(); + } +} diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java index 848a120e270..ce7f894c870 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java @@ -19,12 +19,18 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.sink.DataSaveMode; import org.bson.json.JsonMode; import org.bson.json.JsonWriterSettings; 
+import java.util.Arrays; import java.util.List; +import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA; +import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA; +import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS; + public class MongodbConfig { public static final String CONNECTOR_IDENTITY = "MongoDB"; @@ -153,4 +159,12 @@ public class MongodbConfig { public static final Option TRANSACTION = Options.key("transaction").booleanType().defaultValue(false).withDescription("."); + + public static final Option DATA_SAVE_MODE = + Options.key("data_save_mode") + .singleChoice( + DataSaveMode.class, + Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS)) + .defaultValue(APPEND_DATA) + .withDescription("The save mode of collection data"); } diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java index 563af55c195..fe1962b5388 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java @@ -19,25 +19,35 @@ import org.apache.seatunnel.api.serialization.DefaultSerializer; import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.SaveModeHandler; +import org.apache.seatunnel.api.sink.SchemaSaveMode; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportSaveMode; +import org.apache.seatunnel.api.table.catalog.Catalog; import 
org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; +import org.apache.seatunnel.connectors.seatunnel.mongodb.catalog.MongodbCatalog; import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer; import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters; import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit.MongodbSinkAggregatedCommitter; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.savemode.MongodbSaveModeHandler; import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk; import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo; import java.util.Optional; +import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED; import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY; public class MongodbSink implements SeaTunnelSink< - SeaTunnelRow, DocumentBulk, MongodbCommitInfo, MongodbAggregatedCommitInfo> { + SeaTunnelRow, DocumentBulk, MongodbCommitInfo, MongodbAggregatedCommitInfo>, + SupportSaveMode { private final MongodbWriterOptions options; @@ -92,4 +102,24 @@ public Optional> getCommitInfoSerializer() { public Optional getWriteCatalogTable() { return Optional.ofNullable(catalogTable); } + + @Override + public Optional getSaveModeHandler() { + String url = options.getConnectString(); + String database = options.getDatabase(); + if (catalogTable != null) { + Optional catalogOptional = + Optional.of(new MongodbCatalog(CONNECTOR_IDENTITY, url, database)); + try { + DataSaveMode dataSaveMode = options.getDataSaveMode(); + Catalog catalog = catalogOptional.get(); + return Optional.of( + new MongodbSaveModeHandler( + 
SchemaSaveMode.IGNORE, dataSaveMode, catalog, catalogTable)); + } catch (Exception e) { + throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e); + } + } + return Optional.empty(); + } } diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java index d8e2cf03bc8..43569e7d36c 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java @@ -19,6 +19,8 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; @@ -46,7 +48,8 @@ public OptionRule optionRule() { MongodbConfig.RETRY_MAX, MongodbConfig.RETRY_INTERVAL, MongodbConfig.UPSERT_ENABLE, - MongodbConfig.PRIMARY_KEY) + MongodbConfig.PRIMARY_KEY, + MongodbConfig.DATA_SAVE_MODE) .build(); } @@ -84,6 +87,12 @@ public TableSink createSink(TableSinkFactoryContext context) { if (readonlyConfig.getOptional(MongodbConfig.TRANSACTION).isPresent()) { builder.withTransaction(readonlyConfig.get(MongodbConfig.TRANSACTION)); } - return () -> new MongodbSink(builder.build(), context.getCatalogTable()); + builder.withDataSaveMode(readonlyConfig.get(MongodbConfig.DATA_SAVE_MODE)); + CatalogTable catalogTable = context.getCatalogTable(); + // sourceCatalogTable to sinkCatalogTable + TableIdentifier 
tableIdentifier = + TableIdentifier.of(CONNECTOR_IDENTITY, database, collection); + CatalogTable sinkCatalogTable = CatalogTable.of(tableIdentifier, catalogTable); + return () -> new MongodbSink(builder.build(), sinkCatalogTable); } } diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java index e9b82647756..d1e4ee4aebc 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.mongodb.sink; +import org.apache.seatunnel.api.sink.DataSaveMode; + import lombok.Getter; import java.io.Serializable; @@ -46,6 +48,8 @@ public class MongodbWriterOptions implements Serializable { protected final boolean transaction; + protected final DataSaveMode dataSaveMode; + public MongodbWriterOptions( String connectString, String database, @@ -56,7 +60,8 @@ public MongodbWriterOptions( String[] primaryKey, int retryMax, long retryInterval, - boolean transaction) { + boolean transaction, + DataSaveMode dataSaveMode) { this.connectString = connectString; this.database = database; this.collection = collection; @@ -67,6 +72,7 @@ public MongodbWriterOptions( this.retryMax = retryMax; this.retryInterval = retryInterval; this.transaction = transaction; + this.dataSaveMode = dataSaveMode; } public static Builder builder() { @@ -95,6 +101,8 @@ public static class Builder { protected boolean transaction; + protected DataSaveMode dataSaveMode; + public Builder withConnectString(String connectString) { this.connectString = connectString; return this; @@ -145,6 +153,11 @@ 
public Builder withTransaction(boolean transaction) { return this; } + public Builder withDataSaveMode(DataSaveMode dataSaveMode) { + this.dataSaveMode = dataSaveMode; + return this; + } + public MongodbWriterOptions build() { return new MongodbWriterOptions( connectString, @@ -156,7 +169,8 @@ public MongodbWriterOptions build() { primaryKey, retryMax, retryInterval, - transaction); + transaction, + dataSaveMode); } } } diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/savemode/MongodbSaveModeHandler.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/savemode/MongodbSaveModeHandler.java new file mode 100644 index 00000000000..4846fded5f7 --- /dev/null +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/savemode/MongodbSaveModeHandler.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 + */ + + package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.savemode; + + import org.apache.seatunnel.api.sink.DataSaveMode; + import org.apache.seatunnel.api.sink.DefaultSaveModeHandler; + import org.apache.seatunnel.api.sink.SchemaSaveMode; + import org.apache.seatunnel.api.table.catalog.Catalog; + import org.apache.seatunnel.api.table.catalog.CatalogTable; + + public class MongodbSaveModeHandler extends DefaultSaveModeHandler { + public MongodbSaveModeHandler( + SchemaSaveMode schemaSaveMode, + DataSaveMode dataSaveMode, + Catalog catalog, + CatalogTable catalogTable) { + super(schemaSaveMode, dataSaveMode, catalog, catalogTable, null); + } + + public void handleSaveMode() { + // The MongoDB sink skips schema save mode; only data save mode is handled + handleDataSaveMode(); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java index cf0b1dbd091..41a96abf02e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java @@ -31,6 +31,9 @@ import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerLoggerFactory; +import com.github.dockerjava.api.model.ExposedPort; +import com.github.dockerjava.api.model.PortBinding; +import com.github.dockerjava.api.model.Ports; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; @@ -119,7 +122,7 @@ protected void initSourceData() { prepareInitDataInCollection(MONGODB_DOUBLE_TABLE, TEST_DOUBLE_DATASET); } - protected void clearDate(String table) { + 
protected void clearData(String table) { client.getDatabase(MONGODB_DATABASE).getCollection(table).drop(); } @@ -247,6 +250,14 @@ public void startUp() { .withNetwork(NETWORK) .withNetworkAliases(MONGODB_CONTAINER_HOST) .withExposedPorts(MONGODB_PORT) + .withCreateContainerCmdModifier( + cmd -> + cmd.getHostConfig() + .withPortBindings( + new PortBinding( + Ports.Binding.bindPort( + MONGODB_PORT), + new ExposedPort(MONGODB_PORT)))) .waitingFor( new HttpWaitStrategy() .forPort(MONGODB_PORT) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbCDCIT.java index 6db7db4fa15..b249dc2e88a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbCDCIT.java @@ -63,7 +63,7 @@ public void testMongodbCDCUpsertSink(TestContainer container) .map(Map.Entry::getValue) .collect(Collectors.toCollection(ArrayList::new))) .collect(Collectors.toList())); - clearDate(MONGODB_CDC_RESULT_TABLE); + clearData(MONGODB_CDC_RESULT_TABLE); } @TestTemplate @@ -85,6 +85,6 @@ public void testMongodbCDCSink(TestContainer container) .map(Map.Entry::getValue) .collect(Collectors.toCollection(ArrayList::new))) .collect(Collectors.toList())); - clearDate(MONGODB_CDC_RESULT_TABLE); + clearData(MONGODB_CDC_RESULT_TABLE); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java index b289af315f2..4a721333e94 
100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java @@ -17,20 +17,48 @@ package org.apache.seatunnel.e2e.connector.v2.mongodb; +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.SaveModeHandler; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; +import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer; +import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongoKeyExtractor; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbSink; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo; import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.bson.BsonDocument; import org.bson.Document; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.Container; +import 
com.mongodb.client.MongoCollection; +import com.mongodb.client.model.WriteModel; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY; + @Slf4j public class MongodbIT extends AbstractMongodbIT { @@ -42,7 +70,7 @@ public void testMongodbSourceAndSink(TestContainer container) Container.ExecResult assertResult = container.executeJob("/mongodb_source_to_assert.conf"); Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr()); - clearDate(MONGODB_SINK_TABLE); + clearData(MONGODB_SINK_TABLE); } @TestTemplate @@ -59,8 +87,8 @@ public void testMongodbNullValue(TestContainer container) readMongodbData(MONGODB_NULL_TABLE_RESULT).stream() .peek(e -> e.remove("_id")) .collect(Collectors.toList())); - clearDate(MONGODB_NULL_TABLE); - clearDate(MONGODB_NULL_TABLE_RESULT); + clearData(MONGODB_NULL_TABLE); + clearData(MONGODB_NULL_TABLE_RESULT); } @TestTemplate @@ -78,7 +106,7 @@ public void testMongodbSourceMatch(TestContainer container) readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream() .peek(e -> e.remove("_id")) .collect(Collectors.toList())); - clearDate(MONGODB_MATCH_RESULT_TABLE); + clearData(MONGODB_MATCH_RESULT_TABLE); Container.ExecResult projectionResult = container.executeJob("/matchIT/mongodb_matchProjection_source_to_assert.conf"); @@ -93,7 +121,7 @@ public void testMongodbSourceMatch(TestContainer container) readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream() .peek(e -> e.remove("_id")) .collect(Collectors.toList())); - clearDate(MONGODB_MATCH_RESULT_TABLE); + clearData(MONGODB_MATCH_RESULT_TABLE); } @TestTemplate @@ -112,7 +140,7 @@ public void testFakeSourceToUpdateMongodb(TestContainer container) 
container.executeJob("/updateIT/update_mongodb_to_assert.conf"); Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr()); - clearDate(MONGODB_UPDATE_TABLE); + clearData(MONGODB_UPDATE_TABLE); } @TestTemplate @@ -126,7 +154,7 @@ public void testFlatSyncString(TestContainer container) container.executeJob("/flatIT/mongodb_flat_source_to_assert.conf"); Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr()); - clearDate(MONGODB_FLAT_TABLE); + clearData(MONGODB_FLAT_TABLE); } @TestTemplate @@ -144,7 +172,7 @@ public void testMongodbSourceSplit(TestContainer container) readMongodbData(MONGODB_SPLIT_RESULT_TABLE).stream() .peek(e -> e.remove("_id")) .collect(Collectors.toList())); - clearDate(MONGODB_SPLIT_RESULT_TABLE); + clearData(MONGODB_SPLIT_RESULT_TABLE); Container.ExecResult projectionResult = container.executeJob("/splitIT/mongodb_split_size_source_to_assert.conf"); @@ -158,7 +186,7 @@ public void testMongodbSourceSplit(TestContainer container) readMongodbData(MONGODB_SPLIT_RESULT_TABLE).stream() .peek(e -> e.remove("_id")) .collect(Collectors.toList())); - clearDate(MONGODB_SPLIT_RESULT_TABLE); + clearData(MONGODB_SPLIT_RESULT_TABLE); } @TestTemplate @@ -177,7 +205,7 @@ public void testCompatibleParameters(TestContainer container) container.executeJob("/updateIT/update_mongodb_to_assert.conf"); Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr()); - clearDate(MONGODB_UPDATE_TABLE); + clearData(MONGODB_UPDATE_TABLE); // `matchQuery` compatible test Container.ExecResult queryResult = @@ -192,7 +220,7 @@ public void testCompatibleParameters(TestContainer container) readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream() .peek(e -> e.remove("_id")) .collect(Collectors.toList())); - clearDate(MONGODB_MATCH_RESULT_TABLE); + clearData(MONGODB_MATCH_RESULT_TABLE); } @TestTemplate @@ -218,8 +246,8 @@ public void testTransactionSinkAndUpsert(TestContainer container) Assertions.assertEquals( 0, 
assertUpsertResult.getExitCode(), assertUpsertResult.getStderr()); - clearDate(MONGODB_TRANSACTION_SINK_TABLE); - clearDate(MONGODB_TRANSACTION_UPSERT_TABLE); + clearData(MONGODB_TRANSACTION_SINK_TABLE); + clearData(MONGODB_TRANSACTION_UPSERT_TABLE); } @TestTemplate @@ -235,6 +263,163 @@ public void testMongodbDoubleValue(TestContainer container) readMongodbData(MONGODB_DOUBLE_TABLE_RESULT).stream() .peek(e -> e.remove("_id")) .collect(Collectors.toList())); - clearDate(MONGODB_DOUBLE_TABLE_RESULT); + clearData(MONGODB_DOUBLE_TABLE_RESULT); + } + + @SneakyThrows + @TestTemplate + public void testDropDataSaveMode(TestContainer container) { + // test drop data save mode + String collectionName = "drop_data_save_mode_coll"; + MongoCollection collection = + client.getDatabase(MONGODB_DATABASE) + .getCollection(collectionName, BsonDocument.class); + // insert one row + beforeInsertData(collectionName, DataSaveMode.DROP_DATA, collection); + // build sink + final MongodbSink mongoDbSink = getSinkInstance(collectionName, DataSaveMode.DROP_DATA); + final SinkWriter writer = + mongoDbSink.createWriter(null); + final Optional saveModeHandlerOptional = mongoDbSink.getSaveModeHandler(); + // do save mode + if (saveModeHandlerOptional.isPresent()) { + final SaveModeHandler saveModeHandler = saveModeHandlerOptional.get(); + saveModeHandler.open(); + saveModeHandler.handleSaveMode(); + saveModeHandler.close(); + } + // do write + writer.write(getSeaTunnelRowOne()); + Assertions.assertEquals(1L, collection.countDocuments()); + // clear + collection.drop(); + } + + @SneakyThrows + @TestTemplate + public void testAppendDataSaveMode(TestContainer container) { + // test drop data save mode + String collectionName = "append_data_save_mode_coll"; + MongoCollection collection = + client.getDatabase(MONGODB_DATABASE) + .getCollection(collectionName, BsonDocument.class); + // insert one row + beforeInsertData(collectionName, DataSaveMode.APPEND_DATA, collection); + // build sink + final 
MongodbSink mongoDbSink = getSinkInstance(collectionName, DataSaveMode.APPEND_DATA); + final SinkWriter writer = + mongoDbSink.createWriter(null); + final Optional saveModeHandlerOptional = mongoDbSink.getSaveModeHandler(); + // do save mode + if (saveModeHandlerOptional.isPresent()) { + final SaveModeHandler saveModeHandler = saveModeHandlerOptional.get(); + saveModeHandler.open(); + saveModeHandler.handleSaveMode(); + saveModeHandler.close(); + } + // do write + writer.write(getSeaTunnelRowOne()); + Assertions.assertEquals(3L, collection.countDocuments()); + // clear + collection.drop(); + } + + @SneakyThrows + @TestTemplate + public void testErrorWhenDataExistsSaveMode(TestContainer container) { + // test drop data save mode + String collectionName = "error_data_save_mode_coll"; + MongoCollection collection = + client.getDatabase(MONGODB_DATABASE) + .getCollection(collectionName, BsonDocument.class); + // insert one row + beforeInsertData(collectionName, DataSaveMode.ERROR_WHEN_DATA_EXISTS, collection); + // build sink + final MongodbSink mongoDbSink = + getSinkInstance(collectionName, DataSaveMode.ERROR_WHEN_DATA_EXISTS); + final SinkWriter writer = + mongoDbSink.createWriter(null); + final Optional saveModeHandlerOptional = mongoDbSink.getSaveModeHandler(); + // do save mode + if (saveModeHandlerOptional.isPresent()) { + final SaveModeHandler saveModeHandler = saveModeHandlerOptional.get(); + saveModeHandler.open(); + Assertions.assertThrows( + SeaTunnelRuntimeException.class, + saveModeHandler::handleDataSaveMode, + "When there exist data, an error will be reported"); + saveModeHandler.close(); + } + Assertions.assertEquals(2L, collection.countDocuments()); + // clear + collection.drop(); + } + + private void beforeInsertData( + String collection, + DataSaveMode dataSaveMode, + MongoCollection dropDataCollection) { + final RowDataDocumentSerializer rowDataDocumentSerializer = + new RowDataDocumentSerializer( + RowDataToBsonConverters.createConverter( + 
getCatalogTable(collection).getSeaTunnelRowType()), + getMongodbWriterOptions(collection, dataSaveMode), + new MongoKeyExtractor(getMongodbWriterOptions(collection, dataSaveMode))); + WriteModel bsonDocumentWriteModelOne = + rowDataDocumentSerializer.serializeToWriteModel(getSeaTunnelRowOne()); + WriteModel bsonDocumentWriteModelTwo = + rowDataDocumentSerializer.serializeToWriteModel(getSeaTunnelRowTwo()); + List> writeModelList = new ArrayList<>(); + writeModelList.add(bsonDocumentWriteModelOne); + writeModelList.add(bsonDocumentWriteModelTwo); + dropDataCollection.bulkWrite(writeModelList); + } + + private SeaTunnelRow getSeaTunnelRowOne() { + return new SeaTunnelRow(new Object[] {1L, "A", 100}); + } + + private SeaTunnelRow getSeaTunnelRowTwo() { + return new SeaTunnelRow(new Object[] {2L, "B", 200}); + } + + private MongodbSink getSinkInstance(String collection, DataSaveMode dataSaveMode) { + return new MongodbSink( + getMongodbWriterOptions(collection, dataSaveMode), getCatalogTable(collection)); + } + + private MongodbWriterOptions getMongodbWriterOptions( + String collection, DataSaveMode dataSaveMode) { + String host = mongodbContainer.getContainerIpAddress(); + int port = mongodbContainer.getFirstMappedPort(); + String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE); + return MongodbWriterOptions.builder() + .withConnectString(url) + .withDatabase(MONGODB_DATABASE) + .withCollection(collection) + .withDataSaveMode(dataSaveMode) + .withFlushSize(1) + .build(); + } + + private CatalogTable getCatalogTable(String collection) { + return CatalogTable.of( + TableIdentifier.of(CONNECTOR_IDENTITY, MONGODB_DATABASE, collection), + getTableSchema(), + new HashMap<>(), + new ArrayList<>(), + ""); + } + + private TableSchema getTableSchema() { + return new TableSchema(getColumns(), null, null); + } + + private List getColumns() { + List columns = new ArrayList<>(); + columns.add(new PhysicalColumn("c_int", BasicType.LONG_TYPE, 64L, 0, true, "", 
"")); + columns.add(new PhysicalColumn("name", BasicType.STRING_TYPE, 100L, 0, true, "", "")); + columns.add(new PhysicalColumn("score", BasicType.INT_TYPE, 32L, 0, true, "", "")); + return columns; } }