Skip to content

Commit 5247e17

Browse files
authored
[improve] milvus options (#9165)
1 parent b24a022 commit 5247e17

File tree

15 files changed

+59
-61
lines changed

15 files changed

+59
-61
lines changed

seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,13 +204,11 @@ private Set<String> buildWhiteList() {
204204
whiteList.add("SlsSourceOptions");
205205
whiteList.add("SentrySinkOptions");
206206
whiteList.add("QdrantSinkOptions");
207-
whiteList.add("MilvusSourceOptions");
208207
whiteList.add("RocketMqSinkOptions");
209208
whiteList.add("MaxcomputeSourceOptions");
210209
whiteList.add("KuduSourceOptions");
211210
whiteList.add("SocketSinkOptions");
212211
whiteList.add("SelectDBSinkOptions");
213-
whiteList.add("MilvusSinkOptions");
214212
whiteList.add("RocketMqSourceOptions");
215213
whiteList.add("TablestoreSinkOptions");
216214
whiteList.add("TableStoreDBSourceOptions");

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/catalog/MilvusCatalog.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
3434
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
3535
import org.apache.seatunnel.api.table.type.CommonOptions;
36-
import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
36+
import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions;
3737
import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
3838
import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
3939
import org.apache.seatunnel.connectors.seatunnel.milvus.utils.sink.MilvusSinkConverter;
@@ -71,7 +71,7 @@
7171
import java.util.Objects;
7272
import java.util.Optional;
7373

74-
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.CREATE_INDEX;
74+
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.CREATE_INDEX;
7575
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
7676

7777
@Slf4j
@@ -91,8 +91,8 @@ public MilvusCatalog(String catalogName, ReadonlyConfig config) {
9191
public void open() throws CatalogException {
9292
ConnectParam connectParam =
9393
ConnectParam.newBuilder()
94-
.withUri(config.get(MilvusSinkConfig.URL))
95-
.withToken(config.get(MilvusSinkConfig.TOKEN))
94+
.withUri(config.get(MilvusSinkOptions.URL))
95+
.withToken(config.get(MilvusSinkOptions.TOKEN))
9696
.build();
9797
try {
9898
this.client = new MilvusServiceClient(connectParam);
@@ -242,9 +242,9 @@ public void createTableInternal(TablePath tablePath, CatalogTable catalogTable)
242242
String partitionKeyField =
243243
existPartitionKeyField ? options.get(MilvusOptions.PARTITION_KEY_FIELD) : null;
244244
// if options set, will overwrite aut read
245-
if (StringUtils.isNotEmpty(config.get(MilvusSinkConfig.PARTITION_KEY))) {
245+
if (StringUtils.isNotEmpty(config.get(MilvusSinkOptions.PARTITION_KEY))) {
246246
existPartitionKeyField = true;
247-
partitionKeyField = config.get(MilvusSinkConfig.PARTITION_KEY);
247+
partitionKeyField = config.get(MilvusSinkOptions.PARTITION_KEY);
248248
}
249249

250250
TableSchema tableSchema = catalogTable.getTableSchema();
@@ -261,21 +261,21 @@ public void createTableInternal(TablePath tablePath, CatalogTable catalogTable)
261261
column,
262262
tableSchema.getPrimaryKey(),
263263
partitionKeyField,
264-
config.get(MilvusSinkConfig.ENABLE_AUTO_ID));
264+
config.get(MilvusSinkOptions.ENABLE_AUTO_ID));
265265
fieldTypes.add(fieldType);
266266
}
267267

268268
Boolean enableDynamicField =
269269
(options.containsKey(MilvusOptions.ENABLE_DYNAMIC_FIELD))
270270
? Boolean.valueOf(options.get(MilvusOptions.ENABLE_DYNAMIC_FIELD))
271-
: config.get(MilvusSinkConfig.ENABLE_DYNAMIC_FIELD);
271+
: config.get(MilvusSinkOptions.ENABLE_DYNAMIC_FIELD);
272272
String collectionDescription = "";
273-
if (config.get(MilvusSinkConfig.COLLECTION_DESCRIPTION) != null
274-
&& config.get(MilvusSinkConfig.COLLECTION_DESCRIPTION)
273+
if (config.get(MilvusSinkOptions.COLLECTION_DESCRIPTION) != null
274+
&& config.get(MilvusSinkOptions.COLLECTION_DESCRIPTION)
275275
.containsKey(tablePath.getTableName())) {
276276
// use description from config first
277277
collectionDescription =
278-
config.get(MilvusSinkConfig.COLLECTION_DESCRIPTION)
278+
config.get(MilvusSinkOptions.COLLECTION_DESCRIPTION)
279279
.get(tablePath.getTableName());
280280
} else if (null != catalogTable.getComment()) {
281281
collectionDescription = catalogTable.getComment();

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusCommonConfig.java renamed to seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusBaseOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
2222

23-
public abstract class MilvusCommonConfig {
23+
public abstract class MilvusBaseOptions {
2424

2525
public static final String CONNECTOR_IDENTITY = "Milvus";
2626

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkConfig.java renamed to seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
3131
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
3232

33-
public class MilvusSinkConfig extends MilvusCommonConfig {
33+
public class MilvusSinkOptions extends MilvusBaseOptions {
3434

3535
public static final Option<String> DATABASE =
3636
Options.key("database").stringType().noDefaultValue().withDescription("database");

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceConfig.java renamed to seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSourceOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
2222

23-
public class MilvusSourceConfig extends MilvusCommonConfig {
23+
public class MilvusSourceOptions extends MilvusBaseOptions {
2424

2525
public static final Option<String> DATABASE =
2626
Options.key("database")

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusBufferBatchWriter.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,14 @@
5353
import java.util.Objects;
5454
import java.util.concurrent.atomic.AtomicLong;
5555

56-
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.BATCH_SIZE;
57-
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.CREATE_INDEX;
58-
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.ENABLE_AUTO_ID;
59-
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.ENABLE_UPSERT;
60-
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.LOAD_COLLECTION;
61-
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.RATE_LIMIT;
62-
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.TOKEN;
63-
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig.URL;
56+
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.BATCH_SIZE;
57+
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.CREATE_INDEX;
58+
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.ENABLE_AUTO_ID;
59+
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.ENABLE_UPSERT;
60+
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.LOAD_COLLECTION;
61+
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.RATE_LIMIT;
62+
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.TOKEN;
63+
import static org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions.URL;
6464

6565
@Slf4j
6666
public class MilvusBufferBatchWriter {

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.apache.seatunnel.api.table.factory.CatalogFactory;
3434
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3535
import org.apache.seatunnel.connectors.seatunnel.milvus.catalog.MilvusCatalogFactory;
36-
import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
36+
import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions;
3737
import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusAggregatedCommitInfo;
3838
import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusCommitInfo;
3939
import org.apache.seatunnel.connectors.seatunnel.milvus.state.MilvusSinkState;
@@ -90,7 +90,7 @@ public Optional<Serializer<MilvusCommitInfo>> getCommitInfoSerializer() {
9090

9191
@Override
9292
public String getPluginName() {
93-
return MilvusSinkConfig.CONNECTOR_IDENTITY;
93+
return MilvusSinkOptions.CONNECTOR_IDENTITY;
9494
}
9595

9696
@Override
@@ -102,8 +102,8 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
102102
CatalogFactory catalogFactory = new MilvusCatalogFactory();
103103
Catalog catalog = catalogFactory.createCatalog(catalogTable.getCatalogName(), config);
104104

105-
SchemaSaveMode schemaSaveMode = config.get(MilvusSinkConfig.SCHEMA_SAVE_MODE);
106-
DataSaveMode dataSaveMode = config.get(MilvusSinkConfig.DATA_SAVE_MODE);
105+
SchemaSaveMode schemaSaveMode = config.get(MilvusSinkOptions.SCHEMA_SAVE_MODE);
106+
DataSaveMode dataSaveMode = config.get(MilvusSinkOptions.DATA_SAVE_MODE);
107107

108108
return Optional.of(
109109
new DefaultSaveModeHandler(

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkFactory.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.seatunnel.api.table.factory.Factory;
2626
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
2727
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
28-
import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
28+
import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkOptions;
2929

3030
import org.apache.commons.lang3.StringUtils;
3131

@@ -42,13 +42,13 @@ public String factoryIdentifier() {
4242
@Override
4343
public OptionRule optionRule() {
4444
return OptionRule.builder()
45-
.required(MilvusSinkConfig.URL, MilvusSinkConfig.TOKEN)
45+
.required(MilvusSinkOptions.URL, MilvusSinkOptions.TOKEN)
4646
.optional(
47-
MilvusSinkConfig.ENABLE_UPSERT,
48-
MilvusSinkConfig.ENABLE_DYNAMIC_FIELD,
49-
MilvusSinkConfig.ENABLE_AUTO_ID,
50-
MilvusSinkConfig.SCHEMA_SAVE_MODE,
51-
MilvusSinkConfig.DATA_SAVE_MODE)
47+
MilvusSinkOptions.ENABLE_UPSERT,
48+
MilvusSinkOptions.ENABLE_DYNAMIC_FIELD,
49+
MilvusSinkOptions.ENABLE_AUTO_ID,
50+
MilvusSinkOptions.SCHEMA_SAVE_MODE,
51+
MilvusSinkOptions.DATA_SAVE_MODE)
5252
.build();
5353
}
5454

@@ -62,8 +62,8 @@ private CatalogTable renameCatalogTable(
6262
ReadonlyConfig config, CatalogTable sourceCatalogTable) {
6363
TableIdentifier sourceTableId = sourceCatalogTable.getTableId();
6464
String databaseName;
65-
if (StringUtils.isNotEmpty(config.get(MilvusSinkConfig.DATABASE))) {
66-
databaseName = config.get(MilvusSinkConfig.DATABASE);
65+
if (StringUtils.isNotEmpty(config.get(MilvusSinkOptions.DATABASE))) {
66+
databaseName = config.get(MilvusSinkOptions.DATABASE);
6767
} else {
6868
databaseName = sourceTableId.getDatabaseName();
6969
}

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2828
import org.apache.seatunnel.api.table.catalog.TablePath;
2929
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
30-
import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
30+
import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceOptions;
3131
import org.apache.seatunnel.connectors.seatunnel.milvus.utils.MilvusConvertUtils;
3232

3333
import java.util.ArrayList;
@@ -79,6 +79,6 @@ public SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> restoreEnumer
7979

8080
@Override
8181
public String getPluginName() {
82-
return MilvusSourceConfig.CONNECTOR_IDENTITY;
82+
return MilvusSourceOptions.CONNECTOR_IDENTITY;
8383
}
8484
}

seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.apache.seatunnel.api.table.factory.Factory;
2525
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
2626
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
27-
import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceConfig;
27+
import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceOptions;
2828

2929
import com.google.auto.service.AutoService;
3030
import lombok.extern.slf4j.Slf4j;
@@ -44,8 +44,8 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
4444
@Override
4545
public OptionRule optionRule() {
4646
return OptionRule.builder()
47-
.required(MilvusSourceConfig.URL, MilvusSourceConfig.TOKEN)
48-
.optional(MilvusSourceConfig.DATABASE, MilvusSourceConfig.COLLECTION)
47+
.required(MilvusSourceOptions.URL, MilvusSourceOptions.TOKEN)
48+
.optional(MilvusSourceOptions.DATABASE, MilvusSourceOptions.COLLECTION)
4949
.build();
5050
}
5151

0 commit comments

Comments
 (0)