Skip to content

Commit db2565f

Browse files
committed
[FLINK-38930][checkpoint] Filter records before processing, without the spilling strategy
Core filtering mechanism for recovered channel state buffers:
- ChannelStateFilteringHandler with a per-gate GateFilterHandler
- RecordFilterContext with VirtualChannelRecordFilterFactory
- Partial-data check in SequentialChannelStateReaderImpl
- Fix RecordFilterContext for the union-gate downscale scenario
1 parent eabc3ce commit db2565f

16 files changed

+1263
-25
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateFilteringHandler.java

Lines changed: 418 additions & 0 deletions
Large diffs are not rendered by default.

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel;
3232

3333
import javax.annotation.Nonnull;
34+
import javax.annotation.Nullable;
3435

3536
import java.io.IOException;
3637
import java.util.HashMap;
@@ -63,7 +64,7 @@ public void close() {
6364
* case of an error.
6465
*/
6566
void recover(Info info, int oldSubtaskIndex, BufferWithContext<Context> bufferWithContext)
66-
throws IOException;
67+
throws IOException, InterruptedException;
6768
}
6869

6970
class InputChannelRecoveredStateHandler
@@ -75,10 +76,19 @@ class InputChannelRecoveredStateHandler
7576
private final Map<InputChannelInfo, RecoveredInputChannel> rescaledChannels = new HashMap<>();
7677
private final Map<Integer, RescaleMappings> oldToNewMappings = new HashMap<>();
7778

79+
/**
80+
* Optional filtering handler for filtering recovered buffers. When non-null, filtering is
81+
* performed during recovery in the channel-state-unspilling thread.
82+
*/
83+
@Nullable private final ChannelStateFilteringHandler filteringHandler;
84+
7885
InputChannelRecoveredStateHandler(
79-
InputGate[] inputGates, InflightDataRescalingDescriptor channelMapping) {
86+
InputGate[] inputGates,
87+
InflightDataRescalingDescriptor channelMapping,
88+
@Nullable ChannelStateFilteringHandler filteringHandler) {
8089
this.inputGates = inputGates;
8190
this.channelMapping = channelMapping;
91+
this.filteringHandler = filteringHandler;
8292
}
8393

8494
@Override
@@ -95,23 +105,60 @@ public void recover(
95105
InputChannelInfo channelInfo,
96106
int oldSubtaskIndex,
97107
BufferWithContext<Buffer> bufferWithContext)
98-
throws IOException {
108+
throws IOException, InterruptedException {
99109
Buffer buffer = bufferWithContext.context;
100110
try {
101111
if (buffer.readableBytes() > 0) {
102112
RecoveredInputChannel channel = getMappedChannels(channelInfo);
103-
channel.onRecoveredStateBuffer(
104-
EventSerializer.toBuffer(
105-
new SubtaskConnectionDescriptor(
106-
oldSubtaskIndex, channelInfo.getInputChannelIdx()),
107-
false));
108-
channel.onRecoveredStateBuffer(buffer.retainBuffer());
113+
114+
if (filteringHandler != null) {
115+
// Filtering mode: filter records and rewrite to new buffers
116+
recoverWithFiltering(channel, channelInfo, oldSubtaskIndex, buffer);
117+
} else {
118+
// Non-filtering mode: pass through original buffer with descriptor
119+
channel.onRecoveredStateBuffer(
120+
EventSerializer.toBuffer(
121+
new SubtaskConnectionDescriptor(
122+
oldSubtaskIndex, channelInfo.getInputChannelIdx()),
123+
false));
124+
channel.onRecoveredStateBuffer(buffer.retainBuffer());
125+
}
109126
}
110127
} finally {
111128
buffer.recycleBuffer();
112129
}
113130
}
114131

132+
private void recoverWithFiltering(
133+
RecoveredInputChannel channel,
134+
InputChannelInfo channelInfo,
135+
int oldSubtaskIndex,
136+
Buffer buffer)
137+
throws IOException, InterruptedException {
138+
checkState(filteringHandler != null, "filtering handler not set.");
139+
// Extra retain: filterAndRewrite consumes one ref, caller's finally releases another.
140+
buffer.retainBuffer();
141+
142+
List<Buffer> filteredBuffers;
143+
try {
144+
filteredBuffers =
145+
filteringHandler.filterAndRewrite(
146+
channelInfo.getGateIdx(),
147+
oldSubtaskIndex,
148+
channelInfo.getInputChannelIdx(),
149+
buffer,
150+
channel::requestBufferBlocking);
151+
} catch (Throwable t) {
152+
// filterAndRewrite didn't consume the buffer, release the extra ref.
153+
buffer.recycleBuffer();
154+
throw t;
155+
}
156+
157+
for (Buffer filteredBuffer : filteredBuffers) {
158+
channel.onRecoveredStateBuffer(filteredBuffer);
159+
}
160+
}
161+
115162
@Override
116163
public void close() throws IOException {
117164
// note that we need to finish all RecoveredInputChannels, not just those with state
@@ -191,7 +238,7 @@ public void recover(
191238
ResultSubpartitionInfo subpartitionInfo,
192239
int oldSubtaskIndex,
193240
BufferWithContext<BufferBuilder> bufferWithContext)
194-
throws IOException {
241+
throws IOException, InterruptedException {
195242
try (BufferBuilder bufferBuilder = bufferWithContext.context;
196243
BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumerFromBeginning()) {
197244
bufferBuilder.finish();

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReader.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,22 @@
2020
import org.apache.flink.annotation.Internal;
2121
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
2222
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
23+
import org.apache.flink.streaming.runtime.io.recovery.RecordFilterContext;
2324

2425
import java.io.IOException;
2526

2627
/** Reads channel state saved during checkpoint/savepoint. */
2728
@Internal
2829
public interface SequentialChannelStateReader extends AutoCloseable {
2930

30-
void readInputData(InputGate[] inputGates) throws IOException, InterruptedException;
31+
/**
32+
* Reads input channel state with filtering support.
33+
*
34+
* @param inputGates The input gates to recover state for.
35+
* @param filterContext The filter context containing input configs and rescaling info.
36+
*/
37+
void readInputData(InputGate[] inputGates, RecordFilterContext filterContext)
38+
throws IOException, InterruptedException;
3139

3240
void readOutputData(ResultPartitionWriter[] writers, boolean notifyAndBlockOnCompletion)
3341
throws IOException, InterruptedException;
@@ -39,7 +47,8 @@ void readOutputData(ResultPartitionWriter[] writers, boolean notifyAndBlockOnCom
3947
new SequentialChannelStateReader() {
4048

4149
@Override
42-
public void readInputData(InputGate[] inputGates) {}
50+
public void readInputData(
51+
InputGate[] inputGates, RecordFilterContext filterContext) {}
4352

4453
@Override
4554
public void readOutputData(

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReaderImpl.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.runtime.state.AbstractChannelStateHandle;
2929
import org.apache.flink.runtime.state.ChannelStateHelper;
3030
import org.apache.flink.runtime.state.StreamStateHandle;
31+
import org.apache.flink.streaming.runtime.io.recovery.RecordFilterContext;
3132

3233
import java.io.Closeable;
3334
import java.io.IOException;
@@ -43,6 +44,7 @@
4344
import static java.util.Comparator.comparingLong;
4445
import static java.util.stream.Collectors.groupingBy;
4546
import static java.util.stream.Collectors.toList;
47+
import static org.apache.flink.util.Preconditions.checkState;
4648

4749
/** {@link SequentialChannelStateReader} implementation. */
4850
public class SequentialChannelStateReaderImpl implements SequentialChannelStateReader {
@@ -58,10 +60,21 @@ public SequentialChannelStateReaderImpl(TaskStateSnapshot taskStateSnapshot) {
5860
}
5961

6062
@Override
61-
public void readInputData(InputGate[] inputGates) throws IOException, InterruptedException {
63+
public void readInputData(InputGate[] inputGates, RecordFilterContext filterContext)
64+
throws IOException, InterruptedException {
65+
66+
// Create filtering handler if filtering is needed
67+
ChannelStateFilteringHandler filteringHandler = null;
68+
if (filterContext.isUnalignedDuringRecoveryEnabled()) {
69+
filteringHandler =
70+
ChannelStateFilteringHandler.createFromContext(filterContext, inputGates);
71+
}
72+
6273
try (InputChannelRecoveredStateHandler stateHandler =
6374
new InputChannelRecoveredStateHandler(
64-
inputGates, taskStateSnapshot.getInputRescalingDescriptor())) {
75+
inputGates,
76+
taskStateSnapshot.getInputRescalingDescriptor(),
77+
filteringHandler)) {
6578
read(
6679
stateHandler,
6780
groupByDelegate(
@@ -72,6 +85,18 @@ public void readInputData(InputGate[] inputGates) throws IOException, Interrupte
7285
groupByDelegate(
7386
streamSubtaskStates(),
7487
OperatorSubtaskState::getUpstreamOutputBufferState));
88+
89+
if (filteringHandler != null) {
90+
checkState(
91+
!filteringHandler.hasPartialData(),
92+
"Not all data has been fully consumed during filtering");
93+
}
94+
} finally {
95+
// Clean up filtering handler resources (e.g., temp files from
96+
// SpillingAdaptiveSpanningRecordDeserializer) on both success and error paths
97+
if (filteringHandler != null) {
98+
filteringHandler.clear();
99+
}
75100
}
76101
}
77102

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.api.common.TaskInfo;
2424
import org.apache.flink.api.common.typeutils.TypeSerializer;
2525
import org.apache.flink.api.java.functions.KeySelector;
26+
import org.apache.flink.configuration.CheckpointingOptions;
2627
import org.apache.flink.configuration.Configuration;
2728
import org.apache.flink.core.memory.ManagedMemoryUseCase;
2829
import org.apache.flink.metrics.Counter;
@@ -103,6 +104,10 @@ public static StreamMultipleInputProcessor create(
103104
"Number of configured inputs in StreamConfig [%s] doesn't match the main operator's number of inputs [%s]",
104105
configuredInputs.length,
105106
inputsCount);
107+
108+
boolean unalignedDuringRecoveryEnabled =
109+
CheckpointingOptions.isUnalignedDuringRecoveryEnabled(jobConfig);
110+
106111
StreamTaskInput[] inputs = new StreamTaskInput[inputsCount];
107112
for (int i = 0; i < inputsCount; i++) {
108113
StreamConfig.InputConfig configuredInput = configuredInputs[i];
@@ -121,7 +126,8 @@ public static StreamMultipleInputProcessor create(
121126
gatePartitioners,
122127
taskInfo,
123128
canEmitBatchOfRecords,
124-
streamConfig.getWatermarkDeclarations(userClassloader));
129+
streamConfig.getWatermarkDeclarations(userClassloader),
130+
unalignedDuringRecoveryEnabled);
125131
} else if (configuredInput instanceof StreamConfig.SourceInputConfig) {
126132
StreamConfig.SourceInputConfig sourceInput =
127133
(StreamConfig.SourceInputConfig) configuredInput;

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputFactory.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,14 @@ public static <T> StreamTaskInput<T> create(
4747
Function<Integer, StreamPartitioner<?>> gatePartitioners,
4848
TaskInfo taskInfo,
4949
CanEmitBatchOfRecordsChecker canEmitBatchOfRecords,
50-
Set<AbstractInternalWatermarkDeclaration<?>> watermarkDeclarationSet) {
50+
Set<AbstractInternalWatermarkDeclaration<?>> watermarkDeclarationSet,
51+
boolean unalignedDuringRecoveryEnabled) {
5152
return rescalingDescriptorinflightDataRescalingDescriptor.equals(
52-
InflightDataRescalingDescriptor.NO_RESCALE)
53+
InflightDataRescalingDescriptor.NO_RESCALE)
54+
// When filter during recovery is enabled, records are already filtered in
55+
// the channel-state-unspilling thread. Use StreamTaskNetworkInput to avoid
56+
// redundant demultiplexing/filtering in the Task thread.
57+
|| unalignedDuringRecoveryEnabled
5358
? new StreamTaskNetworkInput<>(
5459
checkpointedInputGate,
5560
inputSerializer,

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.api.common.TaskInfo;
2323
import org.apache.flink.api.common.typeutils.TypeSerializer;
2424
import org.apache.flink.api.java.functions.KeySelector;
25+
import org.apache.flink.configuration.CheckpointingOptions;
2526
import org.apache.flink.configuration.Configuration;
2627
import org.apache.flink.core.memory.ManagedMemoryUseCase;
2728
import org.apache.flink.metrics.Counter;
@@ -84,6 +85,10 @@ public static <IN1, IN2> StreamMultipleInputProcessor create(
8485
checkNotNull(operatorChain);
8586

8687
taskIOMetricGroup.reuseRecordsInputCounter(numRecordsIn);
88+
89+
boolean unalignedDuringRecoveryEnabled =
90+
CheckpointingOptions.isUnalignedDuringRecoveryEnabled(jobConfig);
91+
8792
TypeSerializer<IN1> typeSerializer1 = streamConfig.getTypeSerializerIn(0, userClassloader);
8893
StreamTaskInput<IN1> input1 =
8994
StreamTaskNetworkInputFactory.create(
@@ -96,7 +101,8 @@ public static <IN1, IN2> StreamMultipleInputProcessor create(
96101
gatePartitioners,
97102
taskInfo,
98103
canEmitBatchOfRecords,
99-
streamConfig.getWatermarkDeclarations(userClassloader));
104+
streamConfig.getWatermarkDeclarations(userClassloader),
105+
unalignedDuringRecoveryEnabled);
100106
TypeSerializer<IN2> typeSerializer2 = streamConfig.getTypeSerializerIn(1, userClassloader);
101107
StreamTaskInput<IN2> input2 =
102108
StreamTaskNetworkInputFactory.create(
@@ -109,7 +115,8 @@ public static <IN1, IN2> StreamMultipleInputProcessor create(
109115
gatePartitioners,
110116
taskInfo,
111117
canEmitBatchOfRecords,
112-
streamConfig.getWatermarkDeclarations(userClassloader));
118+
streamConfig.getWatermarkDeclarations(userClassloader),
119+
unalignedDuringRecoveryEnabled);
113120

114121
InputSelectable inputSelectable =
115122
streamOperator instanceof InputSelectable ? (InputSelectable) streamOperator : null;

0 commit comments

Comments
 (0)