Skip to content

Commit 9302545

Browse files
committed
[FLINK-37605][runtime] Infer checkpoint id on endInput in sink
So far, we used a special value (EOI) as the checkpoint id of the final checkpoint on endInput. However, as shown in the description of this ticket, "final" doesn't necessarily mean final: multiple committables with EOI could be created at different times. With this commit, we stop using a special value for such committables and instead try to guess the checkpoint id of the next checkpoint. Various factors influence the checkpoint id, but we can mostly ignore them because we just need to pick a checkpoint id that is: (1) higher than all checkpoint ids of the previous, successful checkpoints of this attempt; (2) higher than the checkpoint id of the restored checkpoint; and (3) lower than any future checkpoint id. Hence, we just remember the last observed checkpoint id (initialized with max(0, restored id)) and use last id + 1 for endInput. Naturally, multiple endInput calls happening across restarts will result in unique checkpoint ids. Note that aborted checkpoints before endInput may result in diverging checkpoint ids across subtasks. However, each of these ids satisfies the above requirements, and any id of endInput1 will be smaller than any id of endInput2. Thus, diverging checkpoint ids will not impact correctness at all.
1 parent 98284c4 commit 9302545

File tree

13 files changed

+106
-185
lines changed

13 files changed

+106
-185
lines changed

Diff for: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.flink.core.memory.DataInputDeserializer;
3535
import org.apache.flink.core.memory.DataInputView;
3636
import org.apache.flink.core.memory.DataOutputSerializer;
37+
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
3738
import org.apache.flink.runtime.state.StateInitializationContext;
3839
import org.apache.flink.runtime.state.StateSnapshotContext;
3940
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -107,6 +108,8 @@ public class CompactorOperator
107108
// submitted again while restoring
108109
private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;
109110

111+
private long lastKnownCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
112+
110113
public CompactorOperator(
111114
StreamOperatorParameters<CommittableMessage<FileSinkCommittable>> parameters,
112115
FileCompactStrategy strategy,
@@ -139,15 +142,16 @@ public void processElement(StreamRecord<CompactorRequest> element) throws Except
139142
@Override
140143
public void endInput() throws Exception {
141144
// add collecting requests into the final snapshot
142-
checkpointRequests.put(CommittableMessage.EOI, collectingRequests);
145+
long checkpointId = lastKnownCheckpointId + 1;
146+
checkpointRequests.put(checkpointId, collectingRequests);
143147
collectingRequests = new ArrayList<>();
144148

145149
// submit all requests and wait until they are done
146-
submitUntil(CommittableMessage.EOI);
150+
submitUntil(checkpointId);
147151
assert checkpointRequests.isEmpty();
148152

149153
getAllTasksFuture().join();
150-
emitCompacted(CommittableMessage.EOI);
154+
emitCompacted(checkpointId);
151155
assert compactingRequests.isEmpty();
152156
}
153157

@@ -225,6 +229,8 @@ private void submitUntil(long checkpointId) {
225229
}
226230

227231
private void emitCompacted(long checkpointId) throws Exception {
232+
lastKnownCheckpointId = checkpointId;
233+
228234
List<FileSinkCommittable> compacted = new ArrayList<>();
229235
Iterator<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> iter =
230236
compactingRequests.iterator();
@@ -252,7 +258,6 @@ private void emitCompacted(long checkpointId) throws Exception {
252258
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
253259
checkpointId,
254260
compacted.size(),
255-
compacted.size(),
256261
0);
257262
output.collect(new StreamRecord<>(summary));
258263
for (FileSinkCommittable c : compacted) {

Diff for: flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ public interface CommittableMessage<CommT> {
2626
/**
2727
* Special value for checkpointId for the end of input in case of batch commit or final
2828
* checkpoint.
29+
*
30+
* @deprecated the special value is not used anymore at all (remove with Flink 2.2)
2931
*/
32+
@Deprecated(forRemoval = true)
3033
long EOI = Long.MAX_VALUE;
3134

3235
/** The subtask that created this committable. */
@@ -35,6 +38,13 @@ public interface CommittableMessage<CommT> {
3538
/**
3639
* Returns the checkpoint id or EOI if this message belongs to the final checkpoint or the batch
3740
* commit.
41+
*
42+
* @deprecated the special value EOI is not used anymore
3843
*/
39-
long getCheckpointIdOrEOI();
44+
@Deprecated(forRemoval = true)
45+
default long getCheckpointIdOrEOI() {
46+
return getCheckpointId();
47+
}
48+
49+
long getCheckpointId();
4050
}

Diff for: flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class CommittableSummary<CommT> implements CommittableMessage<CommT> {
4242
/** The number of committables coming from the given subtask in the particular checkpoint. */
4343
private final int numberOfCommittables;
4444

45-
@Deprecated
45+
@Deprecated(forRemoval = true)
4646
/** The number of committables that have not been successfully committed. */
4747
private final int numberOfPendingCommittables;
4848

@@ -88,7 +88,7 @@ public int getNumberOfSubtasks() {
8888
return numberOfSubtasks;
8989
}
9090

91-
public long getCheckpointIdOrEOI() {
91+
public long getCheckpointId() {
9292
return checkpointId;
9393
}
9494

Diff for: flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public int getSubtaskId() {
5050
return subtaskId;
5151
}
5252

53-
public long getCheckpointIdOrEOI() {
53+
public long getCheckpointId() {
5454
return checkpointId;
5555
}
5656

Diff for: flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java

+10-13
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.configuration.SinkOptions;
2626
import org.apache.flink.core.io.SimpleVersionedSerializer;
2727
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
28+
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
2829
import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
2930
import org.apache.flink.runtime.state.StateInitializationContext;
3031
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -51,7 +52,6 @@
5152
import java.util.Collections;
5253
import java.util.OptionalLong;
5354

54-
import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
5555
import static org.apache.flink.util.IOUtils.closeAll;
5656
import static org.apache.flink.util.Preconditions.checkNotNull;
5757

@@ -76,11 +76,9 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage
7676
private SinkCommitterMetricGroup metricGroup;
7777
private Committer<CommT> committer;
7878
private CommittableCollector<CommT> committableCollector;
79-
private long lastCompletedCheckpointId = -1;
79+
private long lastCompletedCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
8080
private int maxRetries;
8181

82-
private boolean endInput = false;
83-
8482
/** The operator's state descriptor. */
8583
private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
8684
new ListStateDescriptor<>(
@@ -134,11 +132,11 @@ public void initializeState(StateInitializationContext context) throws Exception
134132
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
135133
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
136134
metricGroup));
137-
if (context.isRestored()) {
135+
if (checkpointId.isPresent()) {
138136
committableCollectorState.get().forEach(cc -> committableCollector.merge(cc));
139137
lastCompletedCheckpointId = checkpointId.getAsLong();
140138
// try to re-commit recovered transactions as quickly as possible
141-
commitAndEmitCheckpoints();
139+
commitAndEmitCheckpoints(lastCompletedCheckpointId);
142140
}
143141
}
144142

@@ -151,24 +149,23 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
151149

152150
@Override
153151
public void endInput() throws Exception {
154-
endInput = true;
155152
if (!isCheckpointingEnabled || isBatchMode) {
156153
// There will be no final checkpoint, all committables should be committed here
157-
commitAndEmitCheckpoints();
154+
commitAndEmitCheckpoints(lastCompletedCheckpointId + 1);
158155
}
159156
}
160157

161158
@Override
162159
public void notifyCheckpointComplete(long checkpointId) throws Exception {
163160
super.notifyCheckpointComplete(checkpointId);
164-
lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
165-
commitAndEmitCheckpoints();
161+
commitAndEmitCheckpoints(Math.max(lastCompletedCheckpointId, checkpointId));
166162
}
167163

168-
private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
169-
long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId;
164+
private void commitAndEmitCheckpoints(long checkpointId)
165+
throws IOException, InterruptedException {
166+
lastCompletedCheckpointId = checkpointId;
170167
for (CheckpointCommittableManager<CommT> checkpointManager :
171-
committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
168+
committableCollector.getCheckpointCommittablesUpTo(checkpointId)) {
172169
// ensure that all committables of the first checkpoint are fully committed before
173170
// attempting the next committable
174171
commitAndEmit(checkpointManager);

Diff for: flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java

+9-49
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.api.common.state.ListState;
2424
import org.apache.flink.api.common.state.ListStateDescriptor;
2525
import org.apache.flink.api.common.typeutils.TypeSerializer;
26-
import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
2726
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
2827
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
2928
import org.apache.flink.api.connector.sink2.Sink;
@@ -54,8 +53,6 @@
5453
import org.apache.flink.streaming.runtime.tasks.StreamTask;
5554
import org.apache.flink.util.UserCodeClassLoader;
5655

57-
import org.apache.flink.shaded.guava33.com.google.common.collect.Lists;
58-
5956
import javax.annotation.Nullable;
6057

6158
import java.io.IOException;
@@ -64,6 +61,7 @@
6461
import java.util.List;
6562
import java.util.OptionalLong;
6663

64+
import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
6765
import static org.apache.flink.util.IOUtils.closeAll;
6866
import static org.apache.flink.util.Preconditions.checkNotNull;
6967
import static org.apache.flink.util.Preconditions.checkState;
@@ -93,13 +91,6 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
9391
@Nullable private final SimpleVersionedSerializer<CommT> committableSerializer;
9492
private final List<CommT> legacyCommittables = new ArrayList<>();
9593

96-
/**
97-
* Used to remember that EOI has already happened so that we don't emit the last committables of
98-
* the final checkpoints twice.
99-
*/
100-
private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC =
101-
new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE);
102-
10394
/** The runtime information of the input element. */
10495
private final Context<InputT> context;
10596

@@ -118,10 +109,7 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
118109

119110
private boolean endOfInput = false;
120111

121-
/**
122-
* Remembers the endOfInput state for (final) checkpoints iff the operator emits committables.
123-
*/
124-
@Nullable private ListState<Boolean> endOfInputState;
112+
private long lastKnownCheckpointId = INITIAL_CHECKPOINT_ID - 1;
125113

126114
SinkWriterOperator(
127115
StreamOperatorParameters<CommittableMessage<CommT>> parameters,
@@ -164,8 +152,10 @@ protected void setup(
164152
@Override
165153
public void initializeState(StateInitializationContext context) throws Exception {
166154
super.initializeState(context);
167-
WriterInitContext initContext = createInitContext(context.getRestoredCheckpointId());
168-
if (context.isRestored()) {
155+
OptionalLong restoredCheckpointId = context.getRestoredCheckpointId();
156+
WriterInitContext initContext = createInitContext(restoredCheckpointId);
157+
if (restoredCheckpointId.isPresent()) {
158+
lastKnownCheckpointId = restoredCheckpointId.getAsLong();
169159
if (committableSerializer != null) {
170160
final ListState<List<CommT>> legacyCommitterState =
171161
new SimpleVersionedListState<>(
@@ -179,41 +169,12 @@ public void initializeState(StateInitializationContext context) throws Exception
179169
}
180170

181171
sinkWriter = writerStateHandler.createWriter(initContext, context);
182-
183-
if (emitDownstream) {
184-
// Figure out if we have seen end of input before and if we can suppress creating
185-
// transactions and sending them downstream to the CommitterOperator. We have the
186-
// following
187-
// cases:
188-
// 1. state is empty:
189-
// - First time initialization
190-
// - Restoring from a previous version of Flink that didn't handle EOI
191-
// - Upscaled from a final or regular checkpoint
192-
// In all cases, we regularly handle EOI, potentially resulting in duplicate summaries
193-
// that the CommitterOperator needs to handle.
194-
// 2. state is not empty:
195-
// - This implies Flink restores from a version that handles EOI.
196-
// - If there is one entry, no rescaling happened (for this subtask), so if it's true,
197-
// we recover from a final checkpoint (for this subtask) and can ignore another EOI
198-
// else we have a regular checkpoint.
199-
// - If there are multiple entries, Flink downscaled, and we need to check if all are
200-
// true and do the same as above. As soon as one entry is false, we regularly start
201-
// the writer and potentially emit duplicate summaries if we indeed recovered from a
202-
// final checkpoint.
203-
endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
204-
ArrayList<Boolean> previousState = Lists.newArrayList(endOfInputState.get());
205-
endOfInput = !previousState.isEmpty() && !previousState.contains(false);
206-
}
207172
}
208173

209174
@Override
210175
public void snapshotState(StateSnapshotContext context) throws Exception {
211176
super.snapshotState(context);
212177
writerStateHandler.snapshotState(context.getCheckpointId());
213-
if (endOfInputState != null) {
214-
endOfInputState.clear();
215-
endOfInputState.add(this.endOfInput);
216-
}
217178
}
218179

219180
@Override
@@ -243,17 +204,16 @@ public void processWatermark(Watermark mark) throws Exception {
243204

244205
@Override
245206
public void endInput() throws Exception {
207+
LOG.info("Received endInput");
246208
if (!endOfInput) {
247209
endOfInput = true;
248-
if (endOfInputState != null) {
249-
endOfInputState.add(true);
250-
}
251210
sinkWriter.flush(true);
252-
emitCommittables(CommittableMessage.EOI);
211+
emitCommittables(lastKnownCheckpointId + 1);
253212
}
254213
}
255214

256215
private void emitCommittables(long checkpointId) throws IOException, InterruptedException {
216+
lastKnownCheckpointId = checkpointId;
257217
if (!emitDownstream) {
258218
// To support SinkV1 topologies with only a writer we have to call prepareCommit
259219
// although no committables are forwarded

Diff for: flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java

+1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ void addSummary(CommittableSummary<CommT> summary) {
9595
summary.getSubtaskId(),
9696
checkpointId,
9797
metricGroup);
98+
// Remove branch once CommittableMessage.EOI has been removed (earliest 2.2)
9899
if (checkpointId == CommittableMessage.EOI) {
99100
SubtaskCommittableManager<CommT> merged =
100101
subtasksCommittableManagers.merge(

Diff for: flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java

-12
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.Map.Entry;
3434
import java.util.NavigableMap;
3535
import java.util.Objects;
36-
import java.util.Optional;
3736
import java.util.TreeMap;
3837
import java.util.stream.Collectors;
3938

@@ -49,8 +48,6 @@
4948
*/
5049
@Internal
5150
public class CommittableCollector<CommT> {
52-
private static final long EOI = Long.MAX_VALUE;
53-
5451
/** Mapping of checkpoint id to {@link CheckpointCommittableManagerImpl}. */
5552
private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>>
5653
checkpointCommittables;
@@ -144,15 +141,6 @@ public Collection<? extends CheckpointCommittableManager<CommT>> getCheckpointCo
144141
return new ArrayList<>(checkpointCommittables.headMap(checkpointId, true).values());
145142
}
146143

147-
/**
148-
* Returns {@link CheckpointCommittableManager} belonging to the last input.
149-
*
150-
* @return {@link CheckpointCommittableManager}
151-
*/
152-
public Optional<CheckpointCommittableManager<CommT>> getEndOfInputCommittable() {
153-
return Optional.ofNullable(checkpointCommittables.get(EOI));
154-
}
155-
156144
/**
157145
* Returns whether all {@link CheckpointCommittableManager} currently held by the collector are
158146
* either committed or failed.

Diff for: flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java

-33
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import java.util.Collection;
3636
import java.util.List;
3737

38-
import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
3938
import static org.assertj.core.api.Assertions.assertThat;
4039

4140
class GlobalCommitterOperatorTest {
@@ -140,38 +139,6 @@ void testStateRestore() throws Exception {
140139
}
141140
}
142141

143-
@ParameterizedTest
144-
@ValueSource(booleans = {true, false})
145-
void testCommitAllCommittablesOnFinalCheckpoint(boolean commitOnInput) throws Exception {
146-
final MockCommitter committer = new MockCommitter();
147-
final OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> testHarness =
148-
createTestHarness(committer, commitOnInput);
149-
testHarness.open();
150-
151-
final CommittableSummary<Integer> committableSummary =
152-
new CommittableSummary<>(1, 2, EOI, 1, 0);
153-
testHarness.processElement(new StreamRecord<>(committableSummary));
154-
final CommittableSummary<Integer> committableSummary2 =
155-
new CommittableSummary<>(2, 2, EOI, 1, 0);
156-
testHarness.processElement(new StreamRecord<>(committableSummary2));
157-
158-
final CommittableWithLineage<Integer> first = new CommittableWithLineage<>(1, EOI, 1);
159-
testHarness.processElement(new StreamRecord<>(first));
160-
final CommittableWithLineage<Integer> second = new CommittableWithLineage<>(2, EOI, 2);
161-
testHarness.processElement(new StreamRecord<>(second));
162-
163-
// commitOnInput implies that the global committer is not using notifyCheckpointComplete
164-
if (commitOnInput) {
165-
assertThat(committer.committed).containsExactly(1, 2);
166-
} else {
167-
assertThat(committer.committed).isEmpty();
168-
testHarness.notifyOfCompletedCheckpoint(EOI);
169-
assertThat(committer.committed).containsExactly(1, 2);
170-
}
171-
172-
assertThat(testHarness.getOutput()).isEmpty();
173-
}
174-
175142
private OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> createTestHarness(
176143
Committer<Integer> committer, boolean commitOnInput) throws Exception {
177144
return new OneInputStreamOperatorTestHarness<>(

0 commit comments

Comments
 (0)