Skip to content

Commit 89d1878

Browse files
Authored by JeremyXin
[Improve][connector-hive] Improved hive file allocation algorithm for subtasks (#8876)
Co-authored-by: JeremyXin <[email protected]>
1 parent 3cfe8c1 commit 89d1878

File tree

2 files changed

+145
-15
lines changed

2 files changed

+145
-15
lines changed

Diff for: seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java

+20-15
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,24 @@
2929

3030
import java.io.IOException;
3131
import java.util.ArrayList;
32+
import java.util.Comparator;
3233
import java.util.HashSet;
3334
import java.util.List;
3435
import java.util.Map;
3536
import java.util.Set;
37+
import java.util.TreeSet;
38+
import java.util.concurrent.atomic.AtomicInteger;
3639
import java.util.stream.Collectors;
3740

3841
@Slf4j
3942
public class MultipleTableHiveSourceSplitEnumerator
4043
implements SourceSplitEnumerator<FileSourceSplit, FileSourceState> {
4144

4245
private final SourceSplitEnumerator.Context<FileSourceSplit> context;
43-
private final Set<FileSourceSplit> pendingSplit;
46+
private final Set<FileSourceSplit> allSplit;
4447
private final Set<FileSourceSplit> assignedSplit;
4548
private final Map<String, List<String>> filePathMap;
49+
private final AtomicInteger assignCount = new AtomicInteger(0);
4650

4751
public MultipleTableHiveSourceSplitEnumerator(
4852
SourceSplitEnumerator.Context<FileSourceSplit> context,
@@ -60,7 +64,7 @@ public MultipleTableHiveSourceSplitEnumerator(
6064
.toString(),
6165
HiveSourceConfig::getFilePaths));
6266
this.assignedSplit = new HashSet<>();
63-
this.pendingSplit = new HashSet<>();
67+
this.allSplit = new TreeSet<>(Comparator.comparing(FileSourceSplit::splitId));
6468
}
6569

6670
public MultipleTableHiveSourceSplitEnumerator(
@@ -76,13 +80,13 @@ public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
7680
if (CollectionUtils.isEmpty(splits)) {
7781
return;
7882
}
79-
pendingSplit.addAll(splits);
83+
allSplit.addAll(splits);
8084
assignSplit(subtaskId);
8185
}
8286

8387
@Override
public int currentUnassignedSplitSize() {
    // Splits are kept in allSplit permanently (they are no longer removed on
    // assignment), so the outstanding count is everything discovered minus
    // everything already handed to a reader.
    return allSplit.size() - assignedSplit.size();
}
8791

8892
@Override
@@ -94,7 +98,7 @@ public void registerReader(int subtaskId) {
9498
String tableId = filePathEntry.getKey();
9599
List<String> filePaths = filePathEntry.getValue();
96100
for (String filePath : filePaths) {
97-
pendingSplit.add(new FileSourceSplit(tableId, filePath));
101+
allSplit.add(new FileSourceSplit(tableId, filePath));
98102
}
99103
}
100104
assignSplit(subtaskId);
@@ -114,13 +118,14 @@ private void assignSplit(int taskId) {
114118
List<FileSourceSplit> currentTaskSplits = new ArrayList<>();
115119
if (context.currentParallelism() == 1) {
116120
// if parallelism == 1, we should assign all the splits to reader
117-
currentTaskSplits.addAll(pendingSplit);
121+
currentTaskSplits.addAll(allSplit);
118122
} else {
119-
// if parallelism > 1, according to hashCode of split's id to determine whether to
123+
// if parallelism > 1, according to polling strategy to determine whether to
120124
// allocate the current task
121-
for (FileSourceSplit fileSourceSplit : pendingSplit) {
125+
assignCount.set(0);
126+
for (FileSourceSplit fileSourceSplit : allSplit) {
122127
int splitOwner =
123-
getSplitOwner(fileSourceSplit.splitId(), context.currentParallelism());
128+
getSplitOwner(assignCount.getAndIncrement(), context.currentParallelism());
124129
if (splitOwner == taskId) {
125130
currentTaskSplits.add(fileSourceSplit);
126131
}
@@ -130,19 +135,19 @@ private void assignSplit(int taskId) {
130135
context.assignSplit(taskId, currentTaskSplits);
131136
// save the state of assigned splits
132137
assignedSplit.addAll(currentTaskSplits);
133-
// remove the assigned splits from pending splits
134-
currentTaskSplits.forEach(pendingSplit::remove);
138+
135139
log.info(
136-
"SubTask {} is assigned to [{}]",
140+
"SubTask {} is assigned to [{}], size {}",
137141
taskId,
138142
currentTaskSplits.stream()
139143
.map(FileSourceSplit::splitId)
140-
.collect(Collectors.joining(",")));
144+
.collect(Collectors.joining(",")),
145+
currentTaskSplits.size());
141146
context.signalNoMoreSplits(taskId);
142147
}
143148

144-
private static int getSplitOwner(String tp, int numReaders) {
145-
return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
149+
private static int getSplitOwner(int assignCount, int numReaders) {
150+
return assignCount % numReaders;
146151
}
147152

148153
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.hive.split;
19+
20+
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
21+
import org.apache.seatunnel.shade.com.google.common.collect.Maps;
22+
23+
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
24+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
25+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
26+
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
27+
import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceConfig;
28+
import org.apache.seatunnel.connectors.seatunnel.hive.source.config.MultipleTableHiveSourceConfig;
29+
import org.apache.seatunnel.connectors.seatunnel.hive.source.split.MultipleTableHiveSourceSplitEnumerator;
30+
31+
import org.junit.jupiter.api.Assertions;
32+
import org.junit.jupiter.api.Test;
33+
import org.mockito.Mockito;
34+
35+
import lombok.extern.slf4j.Slf4j;
36+
37+
import java.util.ArrayList;
38+
import java.util.Arrays;
39+
import java.util.HashMap;
40+
import java.util.List;
41+
import java.util.Map;
42+
import java.util.concurrent.atomic.AtomicInteger;
43+
import java.util.stream.IntStream;
44+
45+
@Slf4j
46+
public class MultipleTableHiveSourceSplitEnumeratorTest {
47+
48+
@Test
49+
void assignSplitRoundTest() {
50+
int parallelism = 4;
51+
int fileSize = 50;
52+
53+
MultipleTableHiveSourceConfig mockConfig =
54+
Mockito.mock(MultipleTableHiveSourceConfig.class);
55+
56+
Map<String, List<String>> filePathMap = new HashMap<>();
57+
List<String> filePaths = new ArrayList<>();
58+
IntStream.range(0, fileSize).forEach(i -> filePaths.add("filePath" + i));
59+
filePathMap.put("hive_table1", filePaths);
60+
61+
HiveSourceConfig mockHiveSourceConfig = Mockito.mock(HiveSourceConfig.class);
62+
Mockito.when(mockHiveSourceConfig.getFilePaths()).thenReturn(filePaths);
63+
64+
CatalogTable catalogTable =
65+
CatalogTable.of(
66+
TableIdentifier.of("catalog", "test", "hive_table1"),
67+
null,
68+
Maps.newHashMap(),
69+
Lists.newArrayList(),
70+
null);
71+
72+
Mockito.when(mockHiveSourceConfig.getCatalogTable()).thenReturn(catalogTable);
73+
74+
Mockito.when(mockConfig.getHiveSourceConfigs())
75+
.thenReturn(Arrays.asList(mockHiveSourceConfig));
76+
77+
SourceSplitEnumerator.Context<FileSourceSplit> context =
78+
Mockito.mock(SourceSplitEnumerator.Context.class);
79+
80+
Mockito.when(context.currentParallelism()).thenReturn(parallelism);
81+
MultipleTableHiveSourceSplitEnumerator enumerator =
82+
new MultipleTableHiveSourceSplitEnumerator(context, mockConfig);
83+
84+
AtomicInteger unAssignedSplitSize = new AtomicInteger(fileSize);
85+
IntStream.range(0, parallelism)
86+
.forEach(
87+
id -> {
88+
enumerator.registerReader(id);
89+
90+
// check the number of files assigned each time
91+
Assertions.assertEquals(
92+
enumerator.currentUnassignedSplitSize(),
93+
unAssignedSplitSize.get()
94+
- allocateFiles(id, parallelism, fileSize));
95+
unAssignedSplitSize.set(enumerator.currentUnassignedSplitSize());
96+
97+
log.info(
98+
"unAssigned splits => {}, allocate files => {}",
99+
enumerator.currentUnassignedSplitSize(),
100+
allocateFiles(id, parallelism, fileSize));
101+
});
102+
103+
// check no duplicate file assigned
104+
Assertions.assertEquals(0, enumerator.currentUnassignedSplitSize());
105+
}
106+
107+
/**
108+
* calculate the number of files assigned each time
109+
*
110+
* @param id id
111+
* @param parallelism parallelism
112+
* @param fileSize file size
113+
* @return
114+
*/
115+
public int allocateFiles(int id, int parallelism, int fileSize) {
116+
int filesPerIteration = fileSize / parallelism;
117+
int remainder = fileSize % parallelism;
118+
119+
if (id < remainder) {
120+
return filesPerIteration + 1;
121+
} else {
122+
return filesPerIteration;
123+
}
124+
}
125+
}

0 commit comments

Comments
 (0)