Skip to content

Commit 5f55e31

Browse files
authored
[Improve][connector-doris] Improved doris source enumerator splits allocation algorithm for subtasks (#9108)
1 parent 29cf3a7 commit 5f55e31

File tree

6 files changed

+326
-82
lines changed

6 files changed

+326
-82
lines changed

pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,13 @@
568568
<scope>test</scope>
569569
</dependency>
570570

571+
<dependency>
572+
<groupId>org.mockito</groupId>
573+
<artifactId>mockito-inline</artifactId>
574+
<version>${mockito.version}</version>
575+
<scope>test</scope>
576+
</dependency>
577+
571578
<!-- The prometheus simpleclient -->
572579
<dependency>
573580
<groupId>io.prometheus</groupId>

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/PartitionDefinition.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public int hashCode() {
129129
@Override
130130
public String toString() {
131131
return "PartitionDefinition{"
132-
+ ", database='"
132+
+ "database='"
133133
+ database
134134
+ '\''
135135
+ ", table='"

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,13 @@
3333
import java.util.ArrayList;
3434
import java.util.Collection;
3535
import java.util.Collections;
36+
import java.util.Comparator;
3637
import java.util.List;
3738
import java.util.Map;
3839
import java.util.Set;
3940
import java.util.concurrent.ConcurrentHashMap;
41+
import java.util.concurrent.atomic.AtomicInteger;
42+
import java.util.stream.Collectors;
4043

4144
@Slf4j
4245
public class DorisSourceSplitEnumerator
@@ -52,6 +55,8 @@ public class DorisSourceSplitEnumerator
5255
private final Map<TablePath, DorisSourceTable> dorisSourceTables;
5356
private final Object stateLock = new Object();
5457

58+
private final AtomicInteger assignCount = new AtomicInteger(0);
59+
5560
public DorisSourceSplitEnumerator(
5661
Context<DorisSourceSplit> context,
5762
DorisSourceConfig dorisSourceConfig,
@@ -162,15 +167,24 @@ private List<DorisSourceSplit> getDorisSourceSplit() {
162167

163168
private void addPendingSplit(Collection<DorisSourceSplit> splits) {
164169
int readerCount = context.currentParallelism();
165-
for (DorisSourceSplit split : splits) {
166-
int ownerReader = getSplitOwner(split.splitId(), readerCount);
170+
171+
// sorting the splits to ensure the order
172+
List<DorisSourceSplit> sortedSplits =
173+
splits.stream()
174+
.sorted(Comparator.comparing(DorisSourceSplit::getSplitId))
175+
.collect(Collectors.toList());
176+
177+
// allocate splits in load balancing mode
178+
assignCount.set(0);
179+
for (DorisSourceSplit split : sortedSplits) {
180+
int ownerReader = getSplitOwner(assignCount.getAndIncrement(), readerCount);
167181
log.info("Assigning split {} to reader {} .", split.splitId(), ownerReader);
168182
pendingSplit.computeIfAbsent(ownerReader, f -> new ArrayList<>()).add(split);
169183
}
170184
}
171185

172-
private static int getSplitOwner(String tp, int numReaders) {
173-
return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
186+
private static int getSplitOwner(int assignCount, int numReaders) {
187+
return assignCount % numReaders;
174188
}
175189

176190
private void assignSplit(Collection<Integer> readers) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.doris.split;
19+
20+
import org.apache.seatunnel.shade.com.google.common.collect.Maps;
21+
22+
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
23+
import org.apache.seatunnel.api.table.catalog.TablePath;
24+
import org.apache.seatunnel.connectors.doris.config.DorisSourceConfig;
25+
import org.apache.seatunnel.connectors.doris.rest.PartitionDefinition;
26+
import org.apache.seatunnel.connectors.doris.rest.RestService;
27+
import org.apache.seatunnel.connectors.doris.source.DorisSourceTable;
28+
import org.apache.seatunnel.connectors.doris.source.split.DorisSourceSplit;
29+
import org.apache.seatunnel.connectors.doris.source.split.DorisSourceSplitEnumerator;
30+
31+
import org.junit.jupiter.api.Assertions;
32+
import org.junit.jupiter.api.Test;
33+
import org.mockito.ArgumentCaptor;
34+
import org.mockito.MockedStatic;
35+
import org.mockito.Mockito;
36+
37+
import lombok.extern.slf4j.Slf4j;
38+
39+
import java.util.ArrayList;
40+
import java.util.HashSet;
41+
import java.util.List;
42+
import java.util.Map;
43+
import java.util.stream.Collectors;
44+
import java.util.stream.IntStream;
45+
46+
import static org.mockito.ArgumentMatchers.any;
47+
48+
@Slf4j
49+
public class DorisSourceSplitEnumeratorTest {
50+
51+
private static final String DATABASE = "default";
52+
private static final String TABLE = "default_table";
53+
private static final String BE_ADDRESS_PREFIX = "doris-be-";
54+
private static final String QUERY_PLAN = "DAABDAACDwABDAAAAAEIAA";
55+
56+
private static final int PARALLELISM = 4;
57+
58+
private static final int PARTITION_NUMS = 10;
59+
60+
@Test
61+
public void dorisSourceSplitEnumeratorTest() {
62+
DorisSourceConfig dorisSourceConfig = Mockito.mock(DorisSourceConfig.class);
63+
DorisSourceTable dorisSourceTable = Mockito.mock(DorisSourceTable.class);
64+
65+
SourceSplitEnumerator.Context<DorisSourceSplit> context =
66+
Mockito.mock(SourceSplitEnumerator.Context.class);
67+
68+
Mockito.when(context.registeredReaders())
69+
.thenReturn(IntStream.range(0, PARALLELISM).boxed().collect(Collectors.toSet()));
70+
Mockito.when(context.currentParallelism()).thenReturn(PARALLELISM);
71+
72+
Map<TablePath, DorisSourceTable> dorisSourceTableMap = Maps.newHashMap();
73+
dorisSourceTableMap.put(new TablePath(DATABASE, null, TABLE), dorisSourceTable);
74+
75+
DorisSourceSplitEnumerator dorisSourceSplitEnumerator =
76+
new DorisSourceSplitEnumerator(context, dorisSourceConfig, dorisSourceTableMap);
77+
78+
MockedStatic<RestService> restServiceMockedStatic = Mockito.mockStatic(RestService.class);
79+
80+
restServiceMockedStatic
81+
.when(() -> RestService.findPartitions(any(), any(), any()))
82+
.thenReturn(buildPartitionDefinitions());
83+
84+
dorisSourceSplitEnumerator.run();
85+
86+
ArgumentCaptor<Integer> subtaskId = ArgumentCaptor.forClass(Integer.class);
87+
ArgumentCaptor<List> split = ArgumentCaptor.forClass(List.class);
88+
89+
Mockito.verify(context, Mockito.times(PARALLELISM))
90+
.assignSplit(subtaskId.capture(), split.capture());
91+
92+
List<Integer> subTaskAllValues = subtaskId.getAllValues();
93+
List<List> splitAllValues = split.getAllValues();
94+
95+
for (int i = 0; i < PARALLELISM; i++) {
96+
Assertions.assertEquals(i, subTaskAllValues.get(i));
97+
Assertions.assertEquals(
98+
allocateFiles(i, PARALLELISM, PARTITION_NUMS), splitAllValues.get(i).size());
99+
}
100+
101+
// check no duplicate file assigned
102+
Assertions.assertEquals(0, dorisSourceSplitEnumerator.currentUnassignedSplitSize());
103+
}
104+
105+
private List<PartitionDefinition> buildPartitionDefinitions() {
106+
107+
List<PartitionDefinition> partitions = new ArrayList<>();
108+
109+
IntStream.range(0, PARTITION_NUMS)
110+
.forEach(
111+
i -> {
112+
PartitionDefinition partitionDefinition =
113+
new PartitionDefinition(
114+
DATABASE,
115+
TABLE,
116+
BE_ADDRESS_PREFIX + i,
117+
new HashSet<>(i),
118+
QUERY_PLAN);
119+
120+
partitions.add(partitionDefinition);
121+
});
122+
123+
return partitions;
124+
}
125+
126+
/**
127+
* calculate the number of files assigned each time
128+
*
129+
* @param id id
130+
* @param parallelism parallelism
131+
* @param fileSize file size
132+
* @return
133+
*/
134+
public int allocateFiles(int id, int parallelism, int fileSize) {
135+
int filesPerIteration = fileSize / parallelism;
136+
int remainder = fileSize % parallelism;
137+
138+
if (id < remainder) {
139+
return filesPerIteration + 1;
140+
} else {
141+
return filesPerIteration;
142+
}
143+
}
144+
}

seatunnel-translation/seatunnel-translation-base/pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,12 @@
3737
<version>${project.version}</version>
3838
<scope>test</scope>
3939
</dependency>
40+
41+
<dependency>
42+
<groupId>org.apache.seatunnel</groupId>
43+
<artifactId>connector-doris</artifactId>
44+
<version>${project.version}</version>
45+
<scope>test</scope>
46+
</dependency>
4047
</dependencies>
4148
</project>

0 commit comments

Comments
 (0)