Skip to content

Commit ed02146

Browse files
yzeng1618 and zengyi authored
[Fix][connector-fake] Signal no more splits to avoid FakeSourceReader wait-split hang after restore (#10275)
Co-authored-by: zengyi <zengyi@chinatelecom.cn>
1 parent 402291d commit ed02146

File tree

3 files changed: +218 -21 lines changed

.github/workflows/backend.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -439,7 +439,7 @@ jobs:
439439
matrix:
440440
java: [ '8', '11' ]
441441
os: [ 'ubuntu-latest' ]
442-
timeout-minutes: 150
442+
timeout-minutes: 180
443443
steps:
444444
- uses: actions/checkout@v2
445445
- name: Set up JDK ${{ matrix.java }}

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java

Lines changed: 38 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@ public class FakeSourceSplitEnumerator
4444
private final Set<FakeSourceSplit> assignedSplits;
4545

4646
private final Object lock = new Object();
47+
private volatile boolean splitsDiscovered = false;
4748

4849
public FakeSourceSplitEnumerator(
4950
SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext,
@@ -61,6 +62,7 @@ public void open() {}
6162
@Override
6263
public void run() throws Exception {
6364
discoverySplits();
65+
splitsDiscovered = true;
6466
assignPendingSplits();
6567
}
6668

@@ -71,6 +73,9 @@ public void close() throws IOException {}
7173
public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
7274
log.debug("Fake source add splits back {}, subtaskId:{}", splits, subtaskId);
7375
addSplitChangeToPendingAssignments(splits);
76+
if (splitsDiscovered) {
77+
assignPendingSplits();
78+
}
7479
}
7580

7681
@Override
@@ -79,11 +84,17 @@ public int currentUnassignedSplitSize() {
7984
}
8085

8186
@Override
82-
public void handleSplitRequest(int subtaskId) {}
87+
public void handleSplitRequest(int subtaskId) {
88+
if (splitsDiscovered) {
89+
assignPendingSplits(subtaskId);
90+
}
91+
}
8392

8493
@Override
8594
public void registerReader(int subtaskId) {
86-
// nothing
95+
if (splitsDiscovered) {
96+
assignPendingSplits(subtaskId);
97+
}
8798
}
8899

89100
@Override
@@ -121,39 +132,46 @@ private void discoverySplits() {
121132
allSplit.size());
122133
}
123134

124-
assignedSplits.forEach(allSplit::remove);
135+
synchronized (lock) {
136+
assignedSplits.forEach(allSplit::remove);
137+
}
125138
addSplitChangeToPendingAssignments(allSplit);
126139
log.info("Assigned {} to {} readers.", allSplit, numReaders);
127140
log.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
128141
}
129142

130143
private void addSplitChangeToPendingAssignments(Collection<FakeSourceSplit> newSplits) {
131-
for (FakeSourceSplit split : newSplits) {
132-
int ownerReader = split.getSplitId() % enumeratorContext.currentParallelism();
133-
pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>()).add(split);
144+
synchronized (lock) {
145+
for (FakeSourceSplit split : newSplits) {
146+
int ownerReader = split.getSplitId() % enumeratorContext.currentParallelism();
147+
pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>()).add(split);
148+
}
134149
}
135150
}
136151

137152
private void assignPendingSplits() {
138-
// Check if there's any pending splits for given readers
139153
for (int pendingReader : enumeratorContext.registeredReaders()) {
140-
// Remove pending assignment for the reader
154+
assignPendingSplits(pendingReader);
155+
}
156+
}
157+
158+
private void assignPendingSplits(int pendingReader) {
159+
synchronized (lock) {
141160
final Set<FakeSourceSplit> pendingAssignmentForReader =
142161
pendingSplits.remove(pendingReader);
143162

144163
if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
145-
// Mark pending splits as already assigned
146-
synchronized (lock) {
147-
assignedSplits.addAll(pendingAssignmentForReader);
148-
// Assign pending splits to reader
149-
log.info(
150-
"Assigning splits to readers {} {}",
151-
pendingReader,
152-
pendingAssignmentForReader);
153-
enumeratorContext.assignSplit(
154-
pendingReader, new ArrayList<>(pendingAssignmentForReader));
155-
enumeratorContext.signalNoMoreSplits(pendingReader);
156-
}
164+
assignedSplits.addAll(pendingAssignmentForReader);
165+
log.info(
166+
"Assigning splits to readers {} {}",
167+
pendingReader,
168+
pendingAssignmentForReader);
169+
enumeratorContext.assignSplit(
170+
pendingReader, new ArrayList<>(pendingAssignmentForReader));
171+
}
172+
// Avoid readers waiting for split request forever after restore/restart.
173+
if (splitsDiscovered) {
174+
enumeratorContext.signalNoMoreSplits(pendingReader);
157175
}
158176
}
159177
}

FakeSourceSplitEnumeratorTest.java (new file)

Lines changed: 179 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.fake.source;
19+
20+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21+
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
22+
23+
import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
24+
import org.apache.seatunnel.api.common.metrics.MetricsContext;
25+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
26+
import org.apache.seatunnel.api.event.Event;
27+
import org.apache.seatunnel.api.event.EventListener;
28+
import org.apache.seatunnel.api.source.SourceEvent;
29+
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
30+
import org.apache.seatunnel.connectors.seatunnel.fake.config.MultipleTableFakeSourceConfig;
31+
32+
import org.junit.jupiter.api.Assertions;
33+
import org.junit.jupiter.api.Test;
34+
35+
import java.io.File;
36+
import java.net.URISyntaxException;
37+
import java.net.URL;
38+
import java.nio.file.Paths;
39+
import java.util.ArrayList;
40+
import java.util.Arrays;
41+
import java.util.Collections;
42+
import java.util.HashMap;
43+
import java.util.HashSet;
44+
import java.util.List;
45+
import java.util.Map;
46+
import java.util.Set;
47+
import java.util.stream.Collectors;
48+
49+
class FakeSourceSplitEnumeratorTest {
50+
51+
@Test
52+
void signalNoMoreSplitsAfterRestoreWhenNoPendingSplits() throws Exception {
53+
MultipleTableFakeSourceConfig sourceConfig = loadSingleTableFakeSourceConfig();
54+
55+
TestingEnumeratorContext firstContext =
56+
new TestingEnumeratorContext(2, new HashSet<>(Arrays.asList(0, 1)));
57+
FakeSourceSplitEnumerator firstRunEnumerator =
58+
new FakeSourceSplitEnumerator(firstContext, sourceConfig, Collections.emptySet());
59+
firstRunEnumerator.run();
60+
61+
Set<FakeSourceSplit> assignedSplits = new HashSet<>(firstContext.getAllAssignedSplits());
62+
Assertions.assertFalse(assignedSplits.isEmpty(), "Expected assigned splits in first run");
63+
64+
TestingEnumeratorContext restoredContext =
65+
new TestingEnumeratorContext(2, new HashSet<>(Arrays.asList(0, 1)));
66+
FakeSourceSplitEnumerator restoredEnumerator =
67+
new FakeSourceSplitEnumerator(restoredContext, sourceConfig, assignedSplits);
68+
restoredEnumerator.run();
69+
70+
Assertions.assertTrue(
71+
restoredContext.getAllAssignedSplits().isEmpty(),
72+
"Expected no split assignments on restore when all splits were already assigned");
73+
Assertions.assertEquals(
74+
new HashSet<>(Arrays.asList(0, 1)),
75+
restoredContext.getNoMoreSplitsReaders(),
76+
"Expected signalNoMoreSplits for all registered readers");
77+
}
78+
79+
@Test
80+
void assignAndSignalOnLateRegisterReaderAfterDiscovery() throws Exception {
81+
MultipleTableFakeSourceConfig sourceConfig = loadSingleTableFakeSourceConfig();
82+
83+
TestingEnumeratorContext context = new TestingEnumeratorContext(2, new HashSet<>());
84+
FakeSourceSplitEnumerator enumerator =
85+
new FakeSourceSplitEnumerator(context, sourceConfig, Collections.emptySet());
86+
87+
enumerator.run();
88+
Assertions.assertTrue(
89+
context.getAllAssignedSplits().isEmpty(),
90+
"Expected no split assignments when no readers are registered during run()");
91+
92+
enumerator.registerReader(0);
93+
enumerator.registerReader(1);
94+
95+
Assertions.assertFalse(
96+
context.getAllAssignedSplits().isEmpty(),
97+
"Expected split assignments after late reader registration");
98+
Assertions.assertEquals(
99+
new HashSet<>(Arrays.asList(0, 1)),
100+
context.getNoMoreSplitsReaders(),
101+
"Expected signalNoMoreSplits for late registered readers");
102+
}
103+
104+
private static MultipleTableFakeSourceConfig loadSingleTableFakeSourceConfig()
105+
throws URISyntaxException {
106+
URL resource = FakeSourceSplitEnumeratorTest.class.getResource("/simple.schema.conf");
107+
Config config = ConfigFactory.parseFile(new File(Paths.get(resource.toURI()).toString()));
108+
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config.getConfig("FakeSource"));
109+
return new MultipleTableFakeSourceConfig(readonlyConfig);
110+
}
111+
112+
private static final class TestingEnumeratorContext
113+
implements SourceSplitEnumerator.Context<FakeSourceSplit> {
114+
private final int parallelism;
115+
private final Set<Integer> registeredReaders;
116+
private final Map<Integer, List<FakeSourceSplit>> assignedSplitsByReader = new HashMap<>();
117+
private final Set<Integer> noMoreSplitsReaders = new HashSet<>();
118+
private final MetricsContext metricsContext = new AbstractMetricsContext() {};
119+
private final EventListener eventListener =
120+
new EventListener() {
121+
@Override
122+
public void onEvent(Event event) {
123+
// no-op
124+
}
125+
};
126+
127+
private TestingEnumeratorContext(int parallelism, Set<Integer> registeredReaders) {
128+
this.parallelism = parallelism;
129+
this.registeredReaders = registeredReaders;
130+
}
131+
132+
@Override
133+
public int currentParallelism() {
134+
return parallelism;
135+
}
136+
137+
@Override
138+
public Set<Integer> registeredReaders() {
139+
return registeredReaders;
140+
}
141+
142+
@Override
143+
public void assignSplit(int subtaskId, List<FakeSourceSplit> splits) {
144+
assignedSplitsByReader
145+
.computeIfAbsent(subtaskId, ignored -> new ArrayList<>())
146+
.addAll(splits);
147+
}
148+
149+
@Override
150+
public void signalNoMoreSplits(int subtask) {
151+
noMoreSplitsReaders.add(subtask);
152+
}
153+
154+
@Override
155+
public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
156+
// no-op
157+
}
158+
159+
@Override
160+
public MetricsContext getMetricsContext() {
161+
return metricsContext;
162+
}
163+
164+
@Override
165+
public EventListener getEventListener() {
166+
return eventListener;
167+
}
168+
169+
private List<FakeSourceSplit> getAllAssignedSplits() {
170+
return assignedSplitsByReader.values().stream()
171+
.flatMap(List::stream)
172+
.collect(Collectors.toList());
173+
}
174+
175+
private Set<Integer> getNoMoreSplitsReaders() {
176+
return noMoreSplitsReaders;
177+
}
178+
}
179+
}

0 commit comments

Comments
 (0)