Skip to content

Commit 6099697

Browse files
authored
[FLINK-37158][tests] Introduce ForSt to existing ITCases (#26000)
1 parent 7b35ede commit 6099697

File tree

5 files changed

+169
-8
lines changed

5 files changed

+169
-8
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,8 @@ public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
608608
@Nonnull
609609
@Override
610610
public SavepointResources<K> savepoint() throws Exception {
611-
throw new UnsupportedOperationException("This method is not supported.");
611+
throw new UnsupportedOperationException(
612+
"Canonical savepoints are not supported by ForSt State Backend.");
612613
}
613614

614615
@Override

flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,9 @@ public static Collection<Object[]> data() {
116116
new Object[][] {
117117
{"rocksdb", false},
118118
{"rocksdb", true},
119-
{"hashmap", false}
119+
{"hashmap", false},
120+
{"forst", false},
121+
{"forst", true}
120122
});
121123
}
122124

flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java

+34
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.apache.flink.core.fs.Path;
3636
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
3737
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
38+
import org.apache.flink.state.forst.ForStOptions;
39+
import org.apache.flink.state.forst.ForStStateBackend;
3840
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
3941
import org.apache.flink.state.rocksdb.RocksDBOptions;
4042
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -110,6 +112,7 @@ enum StateBackendEnum {
110112
ROCKSDB_FULL,
111113
ROCKSDB_INCREMENTAL,
112114
ROCKSDB_INCREMENTAL_ZK,
115+
FORST_INCREMENTAL
113116
}
114117

115118
@Parameterized.Parameters(name = "statebackend type ={0}")
@@ -191,6 +194,14 @@ private Configuration getConfiguration() throws Exception {
191194
setupRocksDB(config, 16, true);
192195
break;
193196
}
197+
case FORST_INCREMENTAL:
198+
{
199+
config.set(
200+
ForStOptions.TIMER_SERVICE_FACTORY,
201+
ForStStateBackend.PriorityQueueStateType.ForStDB);
202+
setupForSt(config, 16);
203+
break;
204+
}
194205
default:
195206
throw new IllegalStateException("No backend selected.");
196207
}
@@ -229,6 +240,29 @@ private void setupRocksDB(
229240
config.set(RocksDBOptions.LOCAL_DIRECTORIES, rocksDb);
230241
}
231242

243+
private void setupForSt(Configuration config, int fileSizeThreshold) throws IOException {
244+
// Configure the managed memory size as 64MB per slot for rocksDB state backend.
245+
config.set(
246+
TaskManagerOptions.MANAGED_MEMORY_SIZE,
247+
MemorySize.ofMebiBytes(PARALLELISM / NUM_OF_TASK_MANAGERS * 64));
248+
249+
final String forstdb = tempFolder.newFolder().getAbsolutePath();
250+
final File backups = tempFolder.newFolder().getAbsoluteFile();
251+
// we use the fs backend with small threshold here to test the behaviour with file
252+
// references, not self contained byte handles
253+
config.set(StateBackendOptions.STATE_BACKEND, "forst");
254+
config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
255+
config.set(
256+
CheckpointingOptions.CHECKPOINTS_DIRECTORY,
257+
Path.fromLocalFile(backups).toUri().toString());
258+
if (fileSizeThreshold != -1) {
259+
config.set(
260+
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
261+
MemorySize.parse(fileSizeThreshold + "b"));
262+
}
263+
config.set(ForStOptions.LOCAL_DIRECTORIES, forstdb);
264+
}
265+
232266
protected Configuration createClusterConfig() throws IOException {
233267
TemporaryFolder temporaryFolder = new TemporaryFolder();
234268
temporaryFolder.create();

flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java

+18
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,13 @@
2525
import org.apache.flink.api.common.state.ValueStateDescriptor;
2626
import org.apache.flink.api.java.functions.KeySelector;
2727
import org.apache.flink.api.java.tuple.Tuple2;
28+
import org.apache.flink.configuration.CheckpointingOptions;
2829
import org.apache.flink.configuration.Configuration;
2930
import org.apache.flink.configuration.MemorySize;
31+
import org.apache.flink.configuration.StateBackendOptions;
3032
import org.apache.flink.configuration.TaskManagerOptions;
3133
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
34+
import org.apache.flink.state.forst.ForStOptions;
3235
import org.apache.flink.state.rocksdb.RocksDBOptions;
3336
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
3437
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -143,6 +146,21 @@ public void testWithRocksDbBackendIncremental() throws Exception {
143146
testProgramWithBackend(env);
144147
}
145148

149+
@Test
150+
public void testWithForStBackendIncremental() throws Exception {
151+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
152+
env.configure(
153+
new Configuration()
154+
.set(StateBackendOptions.STATE_BACKEND, "forst")
155+
.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true)
156+
.set(
157+
ForStOptions.LOCAL_DIRECTORIES,
158+
tmpFolder.newFolder().getAbsolutePath()));
159+
CheckpointStorageUtils.configureFileSystemCheckpointStorage(
160+
env, tmpFolder.newFolder().toURI().toString());
161+
testProgramWithBackend(env);
162+
}
163+
146164
// ------------------------------------------------------------------------
147165

148166
protected void testProgramWithBackend(StreamExecutionEnvironment env) throws Exception {

flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java

+112-6
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919
package org.apache.flink.test.checkpointing;
2020

2121
import org.apache.flink.api.common.JobID;
22+
import org.apache.flink.api.common.functions.OpenContext;
2223
import org.apache.flink.api.common.functions.RichFlatMapFunction;
2324
import org.apache.flink.api.common.state.ValueState;
2425
import org.apache.flink.api.common.state.ValueStateDescriptor;
26+
import org.apache.flink.api.common.state.v2.StateFuture;
27+
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
2528
import org.apache.flink.api.connector.sink2.Sink;
2629
import org.apache.flink.api.connector.sink2.SinkWriter;
2730
import org.apache.flink.api.connector.sink2.WriterInitContext;
@@ -40,8 +43,10 @@
4043
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
4144
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
4245
import org.apache.flink.streaming.api.datastream.DataStream;
46+
import org.apache.flink.streaming.api.datastream.KeyedStream;
4347
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
4448
import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
49+
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
4550
import org.apache.flink.test.util.MiniClusterWithClientResource;
4651
import org.apache.flink.testutils.junit.SharedObjects;
4752
import org.apache.flink.testutils.junit.SharedReference;
@@ -56,8 +61,12 @@
5661
import org.junit.Rule;
5762
import org.junit.Test;
5863
import org.junit.rules.TemporaryFolder;
64+
import org.junit.runner.RunWith;
65+
import org.junit.runners.Parameterized;
5966

6067
import java.io.IOException;
68+
import java.util.Arrays;
69+
import java.util.Collection;
6170
import java.util.Collections;
6271
import java.util.HashSet;
6372
import java.util.Optional;
@@ -74,6 +83,7 @@
7483
* NotifyingDefiniteKeySource, SubtaskIndexFlatMapper and CollectionSink refer to RescalingITCase,
7584
* because the static fields in these classes can not be shared.
7685
*/
86+
@RunWith(Parameterized.class)
7787
public class RescaleCheckpointManuallyITCase extends TestLogger {
7888

7989
private static final int NUM_TASK_MANAGERS = 2;
@@ -84,10 +94,24 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
8494

8595
@ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
8696

97+
@Parameterized.Parameter(0)
98+
public String statebackendType;
99+
100+
@Parameterized.Parameter(1)
101+
public boolean enableAsyncState;
102+
103+
@Parameterized.Parameters(name = "statebackend type ={0}, enableAsyncState={1}")
104+
public static Collection<Object[]> parameter() {
105+
return Arrays.asList(
106+
new Object[][] {
107+
{"forst", true}, {"forst", false}, {"rocksdb", true}, {"rocksdb", false}
108+
});
109+
}
110+
87111
@Before
88112
public void setup() throws Exception {
89113
Configuration config = new Configuration();
90-
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
114+
config.set(StateBackendOptions.STATE_BACKEND, statebackendType);
91115
config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
92116

93117
cluster =
@@ -263,7 +287,7 @@ private JobGraph createJobGraphWithKeyedState(
263287

264288
SharedReference<JobID> jobID = sharedObjects.add(new JobID());
265289
SharedReference<MiniCluster> miniClusterRef = sharedObjects.add(miniCluster);
266-
DataStream<Integer> input =
290+
KeyedStream<Integer, Integer> input =
267291
env.addSource(
268292
new NotifyingDefiniteKeySource(
269293
numberKeys, numberElements, failAfterEmission) {
@@ -300,10 +324,18 @@ public Integer getKey(Integer value) {
300324
return value;
301325
}
302326
});
303-
DataStream<Tuple2<Integer, Integer>> result =
304-
input.flatMap(new SubtaskIndexFlatMapper(numberElementsExpect));
327+
if (enableAsyncState) {
328+
input.enableAsyncState();
329+
DataStream<Tuple2<Integer, Integer>> result =
330+
input.flatMap(new AsyncSubtaskIndexFlatMapper(numberElementsExpect));
305331

306-
result.sinkTo(new CollectionSink<>());
332+
result.sinkTo(new CollectionSink<>());
333+
} else {
334+
DataStream<Tuple2<Integer, Integer>> result =
335+
input.flatMap(new SubtaskIndexFlatMapper(numberElementsExpect));
336+
337+
result.sinkTo(new CollectionSink<>());
338+
}
307339

308340
return env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), jobID.get());
309341
}
@@ -349,8 +381,9 @@ public void run(SourceContext<Integer> ctx) throws Exception {
349381
} else {
350382
boolean newCheckpoint = false;
351383
long waited = 0L;
384+
running = false;
352385
// maximum wait 5min
353-
while (!newCheckpoint && waited < 30000L) {
386+
while (!newCheckpoint && waited < 300000L) {
354387
synchronized (ctx.getCheckpointLock()) {
355388
newCheckpoint = waitCheckpointCompleted();
356389
}
@@ -423,6 +456,79 @@ public void initializeState(FunctionInitializationContext context) throws Except
423456
}
424457
}
425458

459+
private static class AsyncSubtaskIndexFlatMapper
460+
extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>>
461+
implements CheckpointedFunction {
462+
463+
private static final long serialVersionUID = 1L;
464+
465+
private transient org.apache.flink.api.common.state.v2.ValueState<Integer> counter;
466+
private transient org.apache.flink.api.common.state.v2.ValueState<Integer> sum;
467+
468+
private final int numberElements;
469+
470+
public AsyncSubtaskIndexFlatMapper(int numberElements) {
471+
this.numberElements = numberElements;
472+
}
473+
474+
@Override
475+
public void flatMap(Integer value, Collector<Tuple2<Integer, Integer>> out)
476+
throws Exception {
477+
StateFuture<Integer> counterFuture =
478+
counter.asyncValue()
479+
.thenCompose(
480+
(Integer c) -> {
481+
int updated = c == null ? 1 : c + 1;
482+
return counter.asyncUpdate(updated)
483+
.thenApply(nothing -> updated);
484+
});
485+
StateFuture<Integer> sumFuture =
486+
sum.asyncValue()
487+
.thenCompose(
488+
(Integer s) -> {
489+
int updated = s == null ? value : s + value;
490+
return sum.asyncUpdate(updated)
491+
.thenApply(nothing -> updated);
492+
});
493+
494+
counterFuture.thenCombine(
495+
sumFuture,
496+
(c, s) -> {
497+
if (c == numberElements) {
498+
out.collect(
499+
Tuple2.of(
500+
getRuntimeContext()
501+
.getTaskInfo()
502+
.getIndexOfThisSubtask(),
503+
s));
504+
}
505+
return null;
506+
});
507+
}
508+
509+
@Override
510+
public void snapshotState(FunctionSnapshotContext context) throws Exception {
511+
// all managed, nothing to do.
512+
}
513+
514+
@Override
515+
public void initializeState(FunctionInitializationContext context) throws Exception {}
516+
517+
@Override
518+
public void open(OpenContext openContext) throws Exception {
519+
counter =
520+
((StreamingRuntimeContext) getRuntimeContext())
521+
.getValueState(
522+
new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
523+
"counter", BasicTypeInfo.INT_TYPE_INFO));
524+
sum =
525+
((StreamingRuntimeContext) getRuntimeContext())
526+
.getValueState(
527+
new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
528+
"sum", BasicTypeInfo.INT_TYPE_INFO));
529+
}
530+
}
531+
426532
private static class CollectionSink<IN> implements Sink<IN> {
427533

428534
private static final ConcurrentHashMap<JobID, CollectionSinkWriter<?>> writers =

0 commit comments

Comments
 (0)