Skip to content

Commit d61cba2

Browse files
authored
[Improve][Connector-file-base] Improved file allocation algorithm for subtasks. (#8453)
1 parent b27a30a commit d61cba2

File tree

4 files changed: 253 additions (+253), 12 deletions (-12).

Diff for: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java

+17 -12
Original file line number | Diff line number | Diff line change
@@ -25,9 +25,12 @@
2525

2626
import java.io.IOException;
2727
import java.util.ArrayList;
28+
import java.util.Comparator;
2829
import java.util.HashSet;
2930
import java.util.List;
3031
import java.util.Set;
32+
import java.util.TreeSet;
33+
import java.util.concurrent.atomic.AtomicInteger;
3134
import java.util.stream.Collectors;
3235

3336
public class FileSourceSplitEnumerator
@@ -36,9 +39,11 @@ public class FileSourceSplitEnumerator
3639
private static final Logger LOGGER = LoggerFactory.getLogger(FileSourceSplitEnumerator.class);
3740

3841
private final Context<FileSourceSplit> context;
39-
private final Set<FileSourceSplit> pendingSplit = new HashSet<>();
42+
private final Set<FileSourceSplit> allSplit =
43+
new TreeSet<>(Comparator.comparing(FileSourceSplit::splitId));
4044
private Set<FileSourceSplit> assignedSplit;
4145
private final List<String> filePaths;
46+
private final AtomicInteger assignCount = new AtomicInteger(0);
4247

4348
public FileSourceSplitEnumerator(
4449
SourceSplitEnumerator.Context<FileSourceSplit> context, List<String> filePaths) {
@@ -57,7 +62,7 @@ public FileSourceSplitEnumerator(
5762

5863
@Override
5964
public void open() {
60-
this.pendingSplit.addAll(discoverySplits());
65+
this.allSplit.addAll(discoverySplits());
6166
}
6267

6368
@Override
@@ -82,7 +87,7 @@ public void close() throws IOException {
8287
@Override
8388
public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
8489
if (!splits.isEmpty()) {
85-
pendingSplit.addAll(splits);
90+
allSplit.addAll(splits);
8691
assignSplit(subtaskId);
8792
}
8893
}
@@ -91,13 +96,14 @@ private void assignSplit(int taskId) {
9196
ArrayList<FileSourceSplit> currentTaskSplits = new ArrayList<>();
9297
if (context.currentParallelism() == 1) {
9398
// if parallelism == 1, we should assign all the splits to reader
94-
currentTaskSplits.addAll(pendingSplit);
99+
currentTaskSplits.addAll(allSplit);
95100
} else {
96-
// if parallelism > 1, according to hashCode of split's id to determine whether to
101+
// if parallelism > 1, according to polling strategy to determine whether to
97102
// allocate the current task
98-
for (FileSourceSplit fileSourceSplit : pendingSplit) {
103+
assignCount.set(0);
104+
for (FileSourceSplit fileSourceSplit : allSplit) {
99105
int splitOwner =
100-
getSplitOwner(fileSourceSplit.splitId(), context.currentParallelism());
106+
getSplitOwner(assignCount.getAndIncrement(), context.currentParallelism());
101107
if (splitOwner == taskId) {
102108
currentTaskSplits.add(fileSourceSplit);
103109
}
@@ -107,8 +113,7 @@ private void assignSplit(int taskId) {
107113
context.assignSplit(taskId, currentTaskSplits);
108114
// save the state of assigned splits
109115
assignedSplit.addAll(currentTaskSplits);
110-
// remove the assigned splits from pending splits
111-
currentTaskSplits.forEach(split -> pendingSplit.remove(split));
116+
112117
LOGGER.info(
113118
"SubTask {} is assigned to [{}]",
114119
taskId,
@@ -118,13 +123,13 @@ private void assignSplit(int taskId) {
118123
context.signalNoMoreSplits(taskId);
119124
}
120125

121-
private static int getSplitOwner(String tp, int numReaders) {
122-
return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
126+
private static int getSplitOwner(int assignCount, int numReaders) {
127+
return assignCount % numReaders;
123128
}
124129

125130
@Override
126131
public int currentUnassignedSplitSize() {
127-
return pendingSplit.size();
132+
return allSplit.size() - assignedSplit.size();
128133
}
129134

130135
@Override
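Diff for: FileSourceSplitEnumeratorTest.java (new test file, package org.apache.seatunnel.connectors.seatunnel.file.source.split)

+108
Original file line number | Diff line number | Diff line change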
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.source.split;
19+
20+
import org.apache.seatunnel.api.common.metrics.MetricsContext;
21+
import org.apache.seatunnel.api.event.EventListener;
22+
import org.apache.seatunnel.api.source.SourceEvent;
23+
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
24+
25+
import org.junit.jupiter.api.Assertions;
26+
import org.junit.jupiter.api.Test;
27+
28+
import lombok.extern.slf4j.Slf4j;
29+
30+
import java.util.ArrayList;
31+
import java.util.HashMap;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.Set;
35+
import java.util.stream.Collectors;
36+
37+
@Slf4j
38+
public class FileSourceSplitEnumeratorTest {
39+
40+
@Test
41+
void assignSplitRoundTest() {
42+
List<String> filePaths = new ArrayList<>();
43+
int fileSize = 10;
44+
int parallelism = 4;
45+
46+
for (int i = 0; i < fileSize; i++) {
47+
filePaths.add("file" + i + ".txt");
48+
}
49+
50+
Map<Integer, List<FileSourceSplit>> assignSplitMap = new HashMap<>();
51+
52+
SourceSplitEnumerator.Context<FileSourceSplit> context =
53+
new SourceSplitEnumerator.Context<FileSourceSplit>() {
54+
@Override
55+
public int currentParallelism() {
56+
return parallelism;
57+
}
58+
59+
@Override
60+
public Set<Integer> registeredReaders() {
61+
return null;
62+
}
63+
64+
@Override
65+
public void assignSplit(int subtaskId, List<FileSourceSplit> splits) {
66+
assignSplitMap.put(subtaskId, splits);
67+
}
68+
69+
@Override
70+
public void signalNoMoreSplits(int subtask) {}
71+
72+
@Override
73+
public void sendEventToSourceReader(int subtaskId, SourceEvent event) {}
74+
75+
@Override
76+
public MetricsContext getMetricsContext() {
77+
return null;
78+
}
79+
80+
@Override
81+
public EventListener getEventListener() {
82+
return null;
83+
}
84+
};
85+
86+
FileSourceSplitEnumerator fileSourceSplitEnumerator =
87+
new FileSourceSplitEnumerator(context, filePaths);
88+
fileSourceSplitEnumerator.open();
89+
90+
fileSourceSplitEnumerator.run();
91+
92+
// check all files are assigned
93+
Assertions.assertEquals(fileSourceSplitEnumerator.currentUnassignedSplitSize(), 0);
94+
95+
Set<FileSourceSplit> valueSet =
96+
assignSplitMap.values().stream().flatMap(List::stream).collect(Collectors.toSet());
97+
98+
// check no duplicated assigned split
99+
Assertions.assertEquals(valueSet.size(), fileSize);
100+
101+
// check file allocation balance
102+
for (int i = 1; i < parallelism; i++) {
103+
Assertions.assertTrue(
104+
Math.abs(assignSplitMap.get(i).size() - assignSplitMap.get(i - 1).size()) <= 1,
105+
"The number of files assigned to adjacent subtasks is more than 1.");
106+
}
107+
}
108+
}

Diff for: seatunnel-translation/seatunnel-translation-base/pom.xml

+6
Original file line number | Diff line number | Diff line change
@@ -31,5 +31,11 @@
3131
<artifactId>seatunnel-api</artifactId>
3232
<version>${project.version}</version>
3333
</dependency>
34+
<dependency>
35+
<groupId>org.apache.seatunnel</groupId>
36+
<artifactId>connector-file-base</artifactId>
37+
<version>${project.version}</version>
38+
<scope>test</scope>
39+
</dependency>
3440
</dependencies>
3541
</project>
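Diff for: ParallelSourceTest.java (new test file, package org.apache.seatunnel.translation.source)

+122
Original file line number | Diff line number | Diff line change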
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.translation.source;
19+
20+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21+
22+
import org.apache.seatunnel.api.common.PrepareFailException;
23+
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
24+
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
25+
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSourceReader;
26+
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
27+
28+
import org.junit.jupiter.api.Assertions;
29+
import org.junit.jupiter.api.Test;
30+
31+
import lombok.extern.slf4j.Slf4j;
32+
33+
import java.util.ArrayList;
34+
import java.util.HashSet;
35+
import java.util.List;
36+
import java.util.Set;
37+
import java.util.stream.Collectors;
38+
39+
@Slf4j
40+
public class ParallelSourceTest {
41+
42+
@Test
43+
void testParallelSourceForPollingFileAllocation() throws Exception {
44+
int fileSize = 15;
45+
int parallelism = 4;
46+
47+
// create file source
48+
BaseFileSource baseFileSource =
49+
new BaseFileSource() {
50+
@Override
51+
public void prepare(Config pluginConfig) throws PrepareFailException {
52+
filePaths = new ArrayList<>();
53+
for (int i = 0; i < fileSize; i++) {
54+
filePaths.add("file" + i + ".txt");
55+
}
56+
}
57+
58+
@Override
59+
public String getPluginName() {
60+
return FileSystemType.HDFS.getFileSystemPluginName();
61+
}
62+
};
63+
64+
// prepare files
65+
baseFileSource.prepare(null);
66+
67+
ParallelSource parallelSource =
68+
new ParallelSource(baseFileSource, null, parallelism, "parallel-source-test", 0);
69+
ParallelSource parallelSource2 =
70+
new ParallelSource(baseFileSource, null, parallelism, "parallel-source-test2", 1);
71+
ParallelSource parallelSource3 =
72+
new ParallelSource(baseFileSource, null, parallelism, "parallel-source-test3", 2);
73+
ParallelSource parallelSource4 =
74+
new ParallelSource(baseFileSource, null, parallelism, "parallel-source-test4", 3);
75+
76+
parallelSource.open();
77+
parallelSource2.open();
78+
parallelSource3.open();
79+
parallelSource4.open();
80+
81+
// execute file allocation process
82+
parallelSource.splitEnumerator.run();
83+
parallelSource2.splitEnumerator.run();
84+
parallelSource3.splitEnumerator.run();
85+
parallelSource4.splitEnumerator.run();
86+
87+
// Gets the splits assigned for each reader
88+
List<FileSourceSplit> sourceSplits =
89+
((BaseFileSourceReader) parallelSource.reader).snapshotState(0);
90+
List<FileSourceSplit> sourceSplits2 =
91+
((BaseFileSourceReader) parallelSource2.reader).snapshotState(0);
92+
List<FileSourceSplit> sourceSplits3 =
93+
((BaseFileSourceReader) parallelSource3.reader).snapshotState(0);
94+
List<FileSourceSplit> sourceSplits4 =
95+
((BaseFileSourceReader) parallelSource4.reader).snapshotState(0);
96+
97+
log.info(
98+
"parallel source1 splits => {}",
99+
sourceSplits.stream().map(FileSourceSplit::splitId).collect(Collectors.toList()));
100+
101+
log.info(
102+
"parallel source2 splits => {}",
103+
sourceSplits2.stream().map(FileSourceSplit::splitId).collect(Collectors.toList()));
104+
105+
log.info(
106+
"parallel source3 splits => {}",
107+
sourceSplits3.stream().map(FileSourceSplit::splitId).collect(Collectors.toList()));
108+
109+
log.info(
110+
"parallel source4 splits => {}",
111+
sourceSplits4.stream().map(FileSourceSplit::splitId).collect(Collectors.toList()));
112+
113+
// check that there are no duplicate file assignments
114+
Set<FileSourceSplit> splitSet = new HashSet<>();
115+
splitSet.addAll(sourceSplits);
116+
splitSet.addAll(sourceSplits2);
117+
splitSet.addAll(sourceSplits3);
118+
splitSet.addAll(sourceSplits4);
119+
120+
Assertions.assertEquals(splitSet.size(), fileSize);
121+
}
122+
}

0 commit comments

Comments
 (0)