19
19
package org .apache .flink .test .checkpointing ;
20
20
21
21
import org .apache .flink .api .common .JobID ;
22
- import org .apache .flink .api .common .functions .OpenContext ;
23
22
import org .apache .flink .api .common .functions .RichFlatMapFunction ;
24
23
import org .apache .flink .api .common .state .ValueState ;
25
24
import org .apache .flink .api .common .state .ValueStateDescriptor ;
26
- import org .apache .flink .api .common .state .v2 .StateFuture ;
27
- import org .apache .flink .api .common .typeinfo .BasicTypeInfo ;
28
25
import org .apache .flink .api .connector .sink2 .Sink ;
29
26
import org .apache .flink .api .connector .sink2 .SinkWriter ;
30
27
import org .apache .flink .api .connector .sink2 .WriterInitContext ;
43
40
import org .apache .flink .runtime .testutils .MiniClusterResourceConfiguration ;
44
41
import org .apache .flink .streaming .api .checkpoint .CheckpointedFunction ;
45
42
import org .apache .flink .streaming .api .datastream .DataStream ;
46
- import org .apache .flink .streaming .api .datastream .KeyedStream ;
47
43
import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
48
44
import org .apache .flink .streaming .api .functions .source .legacy .RichParallelSourceFunction ;
49
- import org .apache .flink .streaming .api .operators .StreamingRuntimeContext ;
50
45
import org .apache .flink .test .util .MiniClusterWithClientResource ;
51
46
import org .apache .flink .testutils .junit .SharedObjects ;
52
47
import org .apache .flink .testutils .junit .SharedReference ;
61
56
import org .junit .Rule ;
62
57
import org .junit .Test ;
63
58
import org .junit .rules .TemporaryFolder ;
64
- import org .junit .runner .RunWith ;
65
- import org .junit .runners .Parameterized ;
66
59
67
60
import java .io .IOException ;
68
- import java .util .Arrays ;
69
- import java .util .Collection ;
70
61
import java .util .Collections ;
71
62
import java .util .HashSet ;
72
63
import java .util .Optional ;
83
74
* NotifyingDefiniteKeySource, SubtaskIndexFlatMapper and CollectionSink refer to RescalingITCase,
84
75
* because the static fields in these classes can not be shared.
85
76
*/
86
- @ RunWith (Parameterized .class )
87
77
public class RescaleCheckpointManuallyITCase extends TestLogger {
88
78
89
79
private static final int NUM_TASK_MANAGERS = 2 ;
@@ -94,24 +84,10 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
94
84
95
85
@ ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder ();
96
86
97
- @ Parameterized .Parameter (0 )
98
- public String statebackendType ;
99
-
100
- @ Parameterized .Parameter (1 )
101
- public boolean enableAsyncState ;
102
-
103
- @ Parameterized .Parameters (name = "statebackend type ={0}, enableAsyncState={1}" )
104
- public static Collection <Object []> parameter () {
105
- return Arrays .asList (
106
- new Object [][] {
107
- {"forst" , true }, {"forst" , false }, {"rocksdb" , true }, {"rocksdb" , false }
108
- });
109
- }
110
-
111
87
@ Before
112
88
public void setup () throws Exception {
113
89
Configuration config = new Configuration ();
114
- config .set (StateBackendOptions .STATE_BACKEND , statebackendType );
90
+ config .set (StateBackendOptions .STATE_BACKEND , "rocksdb" );
115
91
config .set (CheckpointingOptions .INCREMENTAL_CHECKPOINTS , true );
116
92
117
93
cluster =
@@ -287,7 +263,7 @@ private JobGraph createJobGraphWithKeyedState(
287
263
288
264
SharedReference <JobID > jobID = sharedObjects .add (new JobID ());
289
265
SharedReference <MiniCluster > miniClusterRef = sharedObjects .add (miniCluster );
290
- KeyedStream < Integer , Integer > input =
266
+ DataStream < Integer > input =
291
267
env .addSource (
292
268
new NotifyingDefiniteKeySource (
293
269
numberKeys , numberElements , failAfterEmission ) {
@@ -324,18 +300,10 @@ public Integer getKey(Integer value) {
324
300
return value ;
325
301
}
326
302
});
327
- if (enableAsyncState ) {
328
- input .enableAsyncState ();
329
- DataStream <Tuple2 <Integer , Integer >> result =
330
- input .flatMap (new AsyncSubtaskIndexFlatMapper (numberElementsExpect ));
303
+ DataStream <Tuple2 <Integer , Integer >> result =
304
+ input .flatMap (new SubtaskIndexFlatMapper (numberElementsExpect ));
331
305
332
- result .sinkTo (new CollectionSink <>());
333
- } else {
334
- DataStream <Tuple2 <Integer , Integer >> result =
335
- input .flatMap (new SubtaskIndexFlatMapper (numberElementsExpect ));
336
-
337
- result .sinkTo (new CollectionSink <>());
338
- }
306
+ result .sinkTo (new CollectionSink <>());
339
307
340
308
return env .getStreamGraph ().getJobGraph (env .getClass ().getClassLoader (), jobID .get ());
341
309
}
@@ -381,9 +349,8 @@ public void run(SourceContext<Integer> ctx) throws Exception {
381
349
} else {
382
350
boolean newCheckpoint = false ;
383
351
long waited = 0L ;
384
- running = false ;
385
352
// maximum wait 5min
386
- while (!newCheckpoint && waited < 300000L ) {
353
+ while (!newCheckpoint && waited < 30000L ) {
387
354
synchronized (ctx .getCheckpointLock ()) {
388
355
newCheckpoint = waitCheckpointCompleted ();
389
356
}
@@ -456,79 +423,6 @@ public void initializeState(FunctionInitializationContext context) throws Except
456
423
}
457
424
}
458
425
459
- private static class AsyncSubtaskIndexFlatMapper
460
- extends RichFlatMapFunction <Integer , Tuple2 <Integer , Integer >>
461
- implements CheckpointedFunction {
462
-
463
- private static final long serialVersionUID = 1L ;
464
-
465
- private transient org .apache .flink .api .common .state .v2 .ValueState <Integer > counter ;
466
- private transient org .apache .flink .api .common .state .v2 .ValueState <Integer > sum ;
467
-
468
- private final int numberElements ;
469
-
470
- public AsyncSubtaskIndexFlatMapper (int numberElements ) {
471
- this .numberElements = numberElements ;
472
- }
473
-
474
- @ Override
475
- public void flatMap (Integer value , Collector <Tuple2 <Integer , Integer >> out )
476
- throws Exception {
477
- StateFuture <Integer > counterFuture =
478
- counter .asyncValue ()
479
- .thenCompose (
480
- (Integer c ) -> {
481
- int updated = c == null ? 1 : c + 1 ;
482
- return counter .asyncUpdate (updated )
483
- .thenApply (nothing -> updated );
484
- });
485
- StateFuture <Integer > sumFuture =
486
- sum .asyncValue ()
487
- .thenCompose (
488
- (Integer s ) -> {
489
- int updated = s == null ? value : s + value ;
490
- return sum .asyncUpdate (updated )
491
- .thenApply (nothing -> updated );
492
- });
493
-
494
- counterFuture .thenCombine (
495
- sumFuture ,
496
- (c , s ) -> {
497
- if (c == numberElements ) {
498
- out .collect (
499
- Tuple2 .of (
500
- getRuntimeContext ()
501
- .getTaskInfo ()
502
- .getIndexOfThisSubtask (),
503
- s ));
504
- }
505
- return null ;
506
- });
507
- }
508
-
509
- @ Override
510
- public void snapshotState (FunctionSnapshotContext context ) throws Exception {
511
- // all managed, nothing to do.
512
- }
513
-
514
- @ Override
515
- public void initializeState (FunctionInitializationContext context ) throws Exception {}
516
-
517
- @ Override
518
- public void open (OpenContext openContext ) throws Exception {
519
- counter =
520
- ((StreamingRuntimeContext ) getRuntimeContext ())
521
- .getValueState (
522
- new org .apache .flink .api .common .state .v2 .ValueStateDescriptor <>(
523
- "counter" , BasicTypeInfo .INT_TYPE_INFO ));
524
- sum =
525
- ((StreamingRuntimeContext ) getRuntimeContext ())
526
- .getValueState (
527
- new org .apache .flink .api .common .state .v2 .ValueStateDescriptor <>(
528
- "sum" , BasicTypeInfo .INT_TYPE_INFO ));
529
- }
530
- }
531
-
532
426
private static class CollectionSink <IN > implements Sink <IN > {
533
427
534
428
private static final ConcurrentHashMap <JobID , CollectionSinkWriter <?>> writers =
0 commit comments