
Commit 0849018

[FLINK-37670][runtime] Fix watermark alignment deadlocking the job if there are no more splits to be assigned
Previously, WatermarkAlignmentEvent was incorrectly ignored and never sent if the source enumerator had no more splits to assign. This caused deadlocks, because watermark alignment could not make progress in such cases.
1 parent 9d6ae16 commit 0849018
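For context, watermark alignment is enabled on a source's WatermarkStrategy. Below is a minimal sketch of an affected setup; the group name, drift, and update interval are illustrative assumptions, not values taken from this commit. Before this fix, a subtask whose enumerator had already signalled "no more splits" never received the periodic WatermarkAlignmentEvent, so alignment could stall.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AlignedSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Alignment group with a max drift of 1s, re-announced every 100ms.
        // The source coordinator periodically sends WatermarkAlignmentEvent to the subtasks.
        DataStream<Long> stream =
                env.fromSource(
                        new NumberSequenceSource(0, 100),
                        WatermarkStrategy.<Long>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        (SerializableTimestampAssigner<Long>) (value, ts) -> value)
                                .withWatermarkAlignment(
                                        "alignment-group",
                                        Duration.ofSeconds(1),
                                        Duration.ofMillis(100)),
                        "aligned-number-source");

        stream.print();
        env.execute("watermark-alignment-example");
    }
}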

8 files changed, +158 -15 lines changed


Diff for: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEvent.java

+9 -1

@@ -24,4 +24,12 @@
  * Root interface for all events sent between {@link OperatorCoordinator} and an {@link
  * OperatorEventHandler}.
  */
-public interface OperatorEvent extends Serializable {}
+public interface OperatorEvent extends Serializable {
+    /**
+     * @return true if event is optional and an occasional loss or inability to deliver that event
+     *     doesn't affect the job's correctness.
+     */
+    default boolean isLossTolerant() {
+        return false;
+    }
+}
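For illustration, a hedged sketch of how a user-defined coordinator event could opt into the new contract; the class name and payload below are hypothetical and not part of this commit:

import org.apache.flink.runtime.operators.coordination.OperatorEvent;

/** Hypothetical event that is re-announced periodically, so dropping one instance is harmless. */
public class ProgressHintEvent implements OperatorEvent {
    private static final long serialVersionUID = 1L;

    private final long latestProgress;

    public ProgressHintEvent(long latestProgress) {
        this.latestProgress = latestProgress;
    }

    public long getLatestProgress() {
        return latestProgress;
    }

    @Override
    public boolean isLossTolerant() {
        // A newer hint supersedes any lost one, so delivery failures must not fail the task.
        return true;
    }
}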

Diff for: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java

+6

@@ -111,6 +111,12 @@ public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt) {
         sendResult.whenCompleteAsync(
                 (success, failure) -> {
                     if (failure != null && subtaskAccess.isStillRunning()) {
+                        if (ExceptionUtils.findThrowable(
+                                        failure, TaskNotRunningException.class)
+                                .isPresent()
+                                && evt.isLossTolerant()) {
+                            return;
+                        }
                         String msg =
                                 String.format(
                                         EVENT_LOSS_ERROR_MESSAGE,
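A small, self-contained sketch of the unwrapping check added above. The Flink classes are the ones referenced in the diff; the demo class and its main method are illustrative assumptions:

import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;

public class LossTolerantDropCheck {
    public static void main(String[] args) {
        // RPC failures typically arrive wrapped; findThrowable walks the cause chain.
        Throwable failure =
                new FlinkException(new TaskNotRunningException("task is not running"));

        boolean taskNotRunning =
                ExceptionUtils.findThrowable(failure, TaskNotRunningException.class).isPresent();

        // With the change above, a loss-tolerant event is dropped silently in this case
        // instead of triggering a task failover.
        System.out.println("drop silently: " + taskNotRunning); // prints: drop silently: true
    }
}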

Diff for: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java

+6 -9

@@ -218,15 +218,12 @@ void announceCombinedWatermark() {
                 operatorName);
 
         for (Integer subtaskId : subTaskIds) {
-            // when subtask have been finished, do not send event.
-            if (!context.hasNoMoreSplits(subtaskId)) {
-                // Subtask maybe during deploying or restarting, so we only send
-                // WatermarkAlignmentEvent to ready task to avoid period task fail
-                // (Java-ThreadPoolExecutor will not schedule the period task if it throws an
-                // exception).
-                context.sendEventToSourceOperatorIfTaskReady(
-                        subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
-            }
+            // Subtask maybe during deploying or restarting, so we only send
+            // WatermarkAlignmentEvent to ready task to avoid period task fail
+            // (Java-ThreadPoolExecutor will not schedule the period task if it throws an
+            // exception).
+            context.sendEventToSourceOperatorIfTaskReady(
+                    subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
         }
     }
Diff for: flink-runtime/src/main/java/org/apache/flink/runtime/source/event/WatermarkAlignmentEvent.java

+9

@@ -36,6 +36,15 @@ public long getMaxWatermark() {
         return maxWatermark;
     }
 
+    /**
+     * @return true, because even if lost, {@link WatermarkAlignmentEvent} will re-send again, and
+     *     it doesn't hold any critical information that could lead to a data loss.
+     */
+    @Override
+    public boolean isLossTolerant() {
+        return true;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {

Diff for: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java

+7 -2

@@ -173,11 +173,12 @@ public String toString() {
 
     // ------------------------------------------------------------------------
 
-    private final class TestSubtaskAccess implements SubtaskAccess {
+    final class TestSubtaskAccess implements SubtaskAccess {
 
         private final ExecutionAttemptID executionAttemptId;
         private final CompletableFuture<?> running;
         private final int subtaskIndex;
+        private final List<Throwable> taskFailoverReasons = new ArrayList<>();
 
         private TestSubtaskAccess(int subtaskIndex, int attemptNumber, boolean isRunning) {
             this.subtaskIndex = subtaskIndex;
@@ -234,7 +235,11 @@ void switchToRunning() {
 
         @Override
         public void triggerTaskFailover(Throwable cause) {
-            // ignore this in the tests
+            taskFailoverReasons.add(cause);
+        }
+
+        public List<Throwable> getTaskFailoverReasons() {
+            return taskFailoverReasons;
         }
     }

Diff for: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java

+25

@@ -21,6 +21,7 @@
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.operators.coordination.EventReceivingTasks.EventWithSubtask;
+import org.apache.flink.runtime.operators.coordination.EventReceivingTasks.TestSubtaskAccess;
 import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -166,6 +167,30 @@ void releasedEventsForwardSendFailures() {
         assertThat(future).isCompletedExceptionally();
     }
 
+    @Test
+    void optionalEventsIgnoreTaskNotRunning() {
+        final EventReceivingTasks receiver =
+                EventReceivingTasks.createForRunningTasksFailingRpcs(
+                        new FlinkException(new TaskNotRunningException("test")));
+        TestSubtaskAccess subtaskAccess =
+                (TestSubtaskAccess) getUniqueElement(receiver.getAccessesForSubtask(10));
+        final SubtaskGatewayImpl gateway =
+                new SubtaskGatewayImpl(
+                        subtaskAccess,
+                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        new IncompleteFuturesTracker());
+
+        gateway.markForCheckpoint(17L);
+        gateway.tryCloseGateway(17L);
+
+        final CompletableFuture<Acknowledge> future =
+                gateway.sendEvent(new TestOperatorEvent(42, true));
+        gateway.openGatewayAndUnmarkAllCheckpoint();
+
+        assertThat(future).isCompletedExceptionally();
+        assertThat(subtaskAccess.getTaskFailoverReasons()).isEmpty();
+    }
+
     private static <T> T getUniqueElement(Collection<T> collection) {
         Iterator<T> iterator = collection.iterator();
         T element = iterator.next();

Diff for: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java

+12

@@ -23,14 +23,26 @@ public final class TestOperatorEvent implements OperatorEvent {
     private static final long serialVersionUID = 1L;
 
     private final int value;
+    private final boolean optional;
 
     public TestOperatorEvent() {
         // pick some random and rather unique value
        this.value = System.identityHashCode(this);
+        this.optional = false;
     }
 
     public TestOperatorEvent(int value) {
+        this(value, false);
+    }
+
+    public TestOperatorEvent(int value, boolean optional) {
         this.value = value;
+        this.optional = optional;
+    }
+
+    @Override
+    public boolean isLossTolerant() {
+        return optional;
     }
 
     public int getValue() {

Diff for: flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java

+84 -3

@@ -21,19 +21,30 @@
 import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SupportsBatchSnapshot;
 import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** This ITCase class tests the behavior of task execution with watermark alignment. */
 class WatermarkAlignmentITCase {
 
@@ -45,13 +56,13 @@ class WatermarkAlignmentITCase {
     @Test
     void testTaskFinishedWithWatermarkAlignmentExecution() throws Exception {
         // Set up the execution environment with parallelism of 2
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(2);
 
         // Create a stream from a custom source with watermark strategy
         DataStream<Long> stream =
                 env.fromSource(
-                        new NumberSequenceSource(0, 100),
+                        new EagerlyFinishingNumberSequenceSource(0, 100),
                         WatermarkStrategy.<Long>forMonotonousTimestamps()
                                 .withTimestampAssigner(
                                         (SerializableTimestampAssigner<Long>)
@@ -67,11 +78,81 @@ void testTaskFinishedWithWatermarkAlignmentExecution() throws Exception {
                                 });
 
         // Execute the stream and collect the results
-        final List<Long> result = stream.executeAndCollect(101);
+        List<Long> result = stream.executeAndCollect(101);
         Collections.sort(result);
 
         // Assert that the collected result contains all numbers from 0 to 100
         Assertions.assertIterableEquals(
                 result, LongStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()));
     }
+
+    static class EagerlyFinishingNumberSequenceSource extends NumberSequenceSource {
+        public EagerlyFinishingNumberSequenceSource(long from, long to) {
+            super(from, to);
+        }
+
+        @Override
+        public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>>
+                createEnumerator(SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
+
+            List<NumberSequenceSplit> splits =
+                    splitNumberRange(getFrom(), getTo(), enumContext.currentParallelism());
+            return new EagerlyFinishingIteratorSourceEnumerator(enumContext, splits);
+        }
+    }
+
+    /**
+     * Contrary to the {@link
+     * org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator}, this enumerator
+     * signals no more available splits as soon as possible.
+     */
+    static class EagerlyFinishingIteratorSourceEnumerator
+            implements SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>>,
+                    SupportsBatchSnapshot {
+
+        private final SplitEnumeratorContext<NumberSequenceSplit> context;
+        private final Queue<NumberSequenceSplit> remainingSplits;
+
+        public EagerlyFinishingIteratorSourceEnumerator(
+                SplitEnumeratorContext<NumberSequenceSplit> context,
+                Collection<NumberSequenceSplit> splits) {
+            this.context = checkNotNull(context);
+            this.remainingSplits = new ArrayDeque<>(splits);
+            this.context
+                    .metricGroup()
+                    .setUnassignedSplitsGauge(() -> (long) remainingSplits.size());
+        }
+
+        @Override
+        public void start() {}
+
+        @Override
+        public void close() {}
+
+        @Override
+        public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+            NumberSequenceSplit nextSplit = remainingSplits.poll();
+            if (nextSplit != null) {
+                context.assignSplit(nextSplit, subtaskId);
+            }
+            if (remainingSplits.size() == 0) {
+                for (int i = 0; i < context.currentParallelism(); i++) {
+                    context.signalNoMoreSplits(i);
+                }
+            }
+        }
+
+        @Override
+        public void addSplitsBack(List<NumberSequenceSplit> splits, int subtaskId) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Collection<NumberSequenceSplit> snapshotState(long checkpointId) throws Exception {
+            return remainingSplits;
+        }
+
+        @Override
+        public void addReader(int subtaskId) {}
+    }
 }
