Skip to content

Commit bde2350

Browse files
[Fix][Connector-V2] Fix load state check in MilvusSourceReader to consider partition-level status (#8937)
1 parent 3e64a42 commit bde2350

File tree

5 files changed

+210
-14
lines changed

5 files changed

+210
-14
lines changed

Diff for: seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSource.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ public class MilvusSource
4242
private final ReadonlyConfig config;
4343
private final Map<TablePath, CatalogTable> sourceTables;
4444

45-
public MilvusSource(ReadonlyConfig sourceConfing) {
46-
this.config = sourceConfing;
47-
MilvusConvertUtils milvusConvertUtils = new MilvusConvertUtils(sourceConfing);
45+
public MilvusSource(ReadonlyConfig sourceConfig) {
46+
this.config = sourceConfig;
47+
MilvusConvertUtils milvusConvertUtils = new MilvusConvertUtils(sourceConfig);
4848
this.sourceTables = milvusConvertUtils.getSourceTables();
4949
}
5050

@@ -66,15 +66,15 @@ public SourceReader<SeaTunnelRow, MilvusSourceSplit> createReader(
6666
@Override
6767
public SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> createEnumerator(
6868
SourceSplitEnumerator.Context<MilvusSourceSplit> context) throws Exception {
69-
return new MilvusSourceSplitEnumertor(context, config, sourceTables, null);
69+
return new MilvusSourceSplitEnumerator(context, config, sourceTables, null);
7070
}
7171

7272
@Override
7373
public SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> restoreEnumerator(
7474
SourceSplitEnumerator.Context<MilvusSourceSplit> context,
7575
MilvusSourceState checkpointState)
7676
throws Exception {
77-
return new MilvusSourceSplitEnumertor(context, config, sourceTables, checkpointState);
77+
return new MilvusSourceSplitEnumerator(context, config, sourceTables, checkpointState);
7878
}
7979

8080
@Override

Diff for: seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceReader.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,16 @@ private void pollNextData(MilvusSourceSplit split, Collector<SeaTunnelRow> outpu
162162
MilvusConnectionErrorCode.SOURCE_TABLE_SCHEMA_IS_NULL);
163163
}
164164

165-
R<GetLoadStateResponse> loadStateResponse =
166-
client.getLoadState(
167-
GetLoadStateParam.newBuilder()
168-
.withDatabaseName(tablePath.getDatabaseName())
169-
.withCollectionName(tablePath.getTableName())
170-
.build());
165+
GetLoadStateParam.Builder loadStateParam =
166+
GetLoadStateParam.newBuilder()
167+
.withDatabaseName(tablePath.getDatabaseName())
168+
.withCollectionName(tablePath.getTableName());
169+
170+
if (StringUtils.isNotEmpty(partitionName)) {
171+
loadStateParam.withPartitionNames(Collections.singletonList(partitionName));
172+
}
173+
174+
R<GetLoadStateResponse> loadStateResponse = client.getLoadState(loadStateParam.build());
171175
if (loadStateResponse.getStatus() != R.Status.Success.getCode()) {
172176
throw new MilvusConnectorException(
173177
MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED,
+2-2
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import java.util.concurrent.ConcurrentLinkedQueue;
4949

5050
@Slf4j
51-
public class MilvusSourceSplitEnumertor
51+
public class MilvusSourceSplitEnumerator
5252
implements SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> {
5353

5454
private final Map<TablePath, CatalogTable> tables;
@@ -60,7 +60,7 @@ public class MilvusSourceSplitEnumertor
6060

6161
private final ReadonlyConfig config;
6262

63-
public MilvusSourceSplitEnumertor(
63+
public MilvusSourceSplitEnumerator(
6464
Context<MilvusSourceSplit> context,
6565
ReadonlyConfig config,
6666
Map<TablePath, CatalogTable> sourceTables,

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java

+155-1
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ public class MilvusIT extends TestSuiteBase implements TestResource {
9797
private static final String COLLECTION_NAME = "simple_example";
9898
private static final String COLLECTION_NAME_1 = "simple_example_1";
9999
private static final String COLLECTION_NAME_2 = "simple_example_2";
100+
private static final String COLLECTION_NAME_WITH_PARTITIONKEY =
101+
"simple_example_with_partitionkey";
100102
private static final String ID_FIELD = "book_id";
101103
private static final String VECTOR_FIELD = "book_intro";
102104
private static final String VECTOR_FIELD2 = "book_kind";
@@ -243,6 +245,112 @@ private void initSourceData() {
243245

244246
log.info("Collection created");
245247

248+
// Define fields With Partition Key
249+
List<FieldType> fieldsSchemaWithPartitionKey =
250+
Arrays.asList(
251+
FieldType.newBuilder()
252+
.withName(ID_FIELD)
253+
.withDataType(DataType.Int64)
254+
.withPrimaryKey(true)
255+
.withAutoID(false)
256+
.build(),
257+
FieldType.newBuilder()
258+
.withName(VECTOR_FIELD)
259+
.withDataType(DataType.FloatVector)
260+
.withDimension(VECTOR_DIM)
261+
.build(),
262+
FieldType.newBuilder()
263+
.withName(VECTOR_FIELD2)
264+
.withDataType(DataType.Float16Vector)
265+
.withDimension(VECTOR_DIM)
266+
.build(),
267+
FieldType.newBuilder()
268+
.withName(VECTOR_FIELD3)
269+
.withDataType(DataType.BinaryVector)
270+
.withDimension(VECTOR_DIM * 2)
271+
.build(),
272+
FieldType.newBuilder()
273+
.withName(VECTOR_FIELD4)
274+
.withDataType(DataType.SparseFloatVector)
275+
.build(),
276+
FieldType.newBuilder()
277+
.withName(TITLE_FIELD)
278+
.withDataType(DataType.VarChar)
279+
.withPartitionKey(true)
280+
.withMaxLength(64)
281+
.build());
282+
283+
// Create the collection with 3 fields
284+
R<RpcStatus> ret2 =
285+
milvusClient.createCollection(
286+
CreateCollectionParam.newBuilder()
287+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
288+
.withFieldTypes(fieldsSchemaWithPartitionKey)
289+
.build());
290+
if (ret2.getStatus() != R.Status.Success.getCode()) {
291+
throw new RuntimeException("Failed to create collection! Error: " + ret.getMessage());
292+
}
293+
294+
// Specify an index type on the vector field.
295+
ret2 =
296+
milvusClient.createIndex(
297+
CreateIndexParam.newBuilder()
298+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
299+
.withFieldName(VECTOR_FIELD)
300+
.withIndexType(IndexType.FLAT)
301+
.withMetricType(MetricType.L2)
302+
.build());
303+
if (ret2.getStatus() != R.Status.Success.getCode()) {
304+
throw new RuntimeException(
305+
"Failed to create index on vector field! Error: " + ret.getMessage());
306+
}
307+
308+
ret2 =
309+
milvusClient.createIndex(
310+
CreateIndexParam.newBuilder()
311+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
312+
.withFieldName(VECTOR_FIELD2)
313+
.withIndexType(IndexType.FLAT)
314+
.withMetricType(MetricType.L2)
315+
.build());
316+
if (ret2.getStatus() != R.Status.Success.getCode()) {
317+
throw new RuntimeException(
318+
"Failed to create index on vector field! Error: " + ret.getMessage());
319+
}
320+
ret2 =
321+
milvusClient.createIndex(
322+
CreateIndexParam.newBuilder()
323+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
324+
.withFieldName(VECTOR_FIELD3)
325+
.withIndexType(IndexType.BIN_FLAT)
326+
.withMetricType(MetricType.HAMMING)
327+
.build());
328+
if (ret2.getStatus() != R.Status.Success.getCode()) {
329+
throw new RuntimeException(
330+
"Failed to create index on vector field! Error: " + ret.getMessage());
331+
}
332+
333+
ret2 =
334+
milvusClient.createIndex(
335+
CreateIndexParam.newBuilder()
336+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
337+
.withFieldName(VECTOR_FIELD4)
338+
.withIndexType(IndexType.SPARSE_INVERTED_INDEX)
339+
.withMetricType(MetricType.IP)
340+
.build());
341+
if (ret2.getStatus() != R.Status.Success.getCode()) {
342+
throw new RuntimeException(
343+
"Failed to create index on vector field! Error: " + ret.getMessage());
344+
}
345+
346+
// Call loadCollection() to enable automatically loading data into memory for searching
347+
milvusClient.loadCollection(
348+
LoadCollectionParam.newBuilder()
349+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
350+
.build());
351+
352+
log.info("Collection created");
353+
246354
// Insert 10 records into the collection
247355
List<JsonObject> rows = new ArrayList<>();
248356
for (long i = 1L; i <= 10; ++i) {
@@ -272,7 +380,16 @@ private void initSourceData() {
272380
.withCollectionName(COLLECTION_NAME)
273381
.withRows(rows)
274382
.build());
275-
if (insertRet.getStatus() != R.Status.Success.getCode()) {
383+
384+
R<MutationResult> insertRet2 =
385+
milvusClient.insert(
386+
InsertParam.newBuilder()
387+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
388+
.withRows(rows)
389+
.build());
390+
391+
if (insertRet.getStatus() != R.Status.Success.getCode()
392+
|| insertRet2.getStatus() != R.Status.Success.getCode()) {
276393
throw new RuntimeException("Failed to insert! Error: " + insertRet.getMessage());
277394
}
278395
}
@@ -322,6 +439,43 @@ public void testMilvus(TestContainer container) throws IOException, InterruptedE
322439
Assertions.assertTrue(fileds.contains(TITLE_FIELD));
323440
}
324441

442+
@TestTemplate
443+
public void testMilvusWithPartitionKey(TestContainer container)
444+
throws IOException, InterruptedException {
445+
Container.ExecResult execResult =
446+
container.executeJob("/milvus-to-milvus-with-partitionkey.conf");
447+
Assertions.assertEquals(0, execResult.getExitCode());
448+
449+
// assert table exist
450+
R<Boolean> hasCollectionResponse =
451+
this.milvusClient.hasCollection(
452+
HasCollectionParam.newBuilder()
453+
.withDatabaseName("test")
454+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
455+
.build());
456+
Assertions.assertTrue(hasCollectionResponse.getData());
457+
458+
// check table fields
459+
R<DescribeCollectionResponse> describeCollectionResponseR =
460+
this.milvusClient.describeCollection(
461+
DescribeCollectionParam.newBuilder()
462+
.withDatabaseName("test")
463+
.withCollectionName(COLLECTION_NAME_WITH_PARTITIONKEY)
464+
.build());
465+
466+
DescribeCollectionResponse data = describeCollectionResponseR.getData();
467+
List<String> fileds =
468+
data.getSchema().getFieldsList().stream()
469+
.map(FieldSchema::getName)
470+
.collect(Collectors.toList());
471+
Assertions.assertTrue(fileds.contains(ID_FIELD));
472+
Assertions.assertTrue(fileds.contains(VECTOR_FIELD));
473+
Assertions.assertTrue(fileds.contains(VECTOR_FIELD2));
474+
Assertions.assertTrue(fileds.contains(VECTOR_FIELD3));
475+
Assertions.assertTrue(fileds.contains(VECTOR_FIELD4));
476+
Assertions.assertTrue(fileds.contains(TITLE_FIELD));
477+
}
478+
325479
@TestTemplate
326480
public void testFakeToMilvus(TestContainer container) throws IOException, InterruptedException {
327481
Container.ExecResult execResult = container.executeJob("/fake-to-milvus.conf");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
parallelism = 1
20+
job.mode = "BATCH"
21+
}
22+
23+
source {
24+
Milvus {
25+
url = "http://milvus-e2e:19530"
26+
token = "root:Milvus"
27+
collection = "simple_example_with_partitionkey"
28+
}
29+
}
30+
31+
sink {
32+
Milvus {
33+
url = "http://milvus-e2e:19530"
34+
token = "root:Milvus"
35+
database="test"
36+
collection = "simple_example_with_partitionkey"
37+
}
38+
}

0 commit comments

Comments
 (0)