Skip to content

Commit 8c8667e

Browse files
authored
[FLINK-37623][datastream] Async state support for process() in Datastream API (#26439)
1 parent 125a665 commit 8c8667e

File tree

2 files changed

+51
-5
lines changed

2 files changed

+51
-5
lines changed

Diff for: flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java

+7 -5
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@
3636
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
3737
import org.apache.flink.api.java.typeutils.TypeExtractor;
3838
import org.apache.flink.runtime.asyncprocessing.operators.AsyncIntervalJoinOperator;
39+
import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
3940
import org.apache.flink.runtime.asyncprocessing.operators.AsyncStreamFlatMap;
4041
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
4142
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
@@ -358,9 +359,10 @@ public <R> SingleOutputStreamOperator<R> process(
358359
@Internal
359360
public <R> SingleOutputStreamOperator<R> process(
360361
KeyedProcessFunction<KEY, T, R> keyedProcessFunction, TypeInformation<R> outputType) {
361-
362-
KeyedProcessOperator<KEY, T, R> operator =
363-
new KeyedProcessOperator<>(clean(keyedProcessFunction));
362+
OneInputStreamOperator<T, R> operator =
363+
isEnableAsyncState()
364+
? new AsyncKeyedProcessOperator<>(clean(keyedProcessFunction))
365+
: new KeyedProcessOperator<>(clean(keyedProcessFunction));
364366
return transform("KeyedProcess", outputType, operator);
365367
}
366368

@@ -370,9 +372,9 @@ public <R> SingleOutputStreamOperator<R> process(
370372
@Override
371373
public <R> SingleOutputStreamOperator<R> flatMap(
372374
FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
373-
OneInputStreamOperator operator =
375+
OneInputStreamOperator<T, R> operator =
374376
isEnableAsyncState()
375-
? new AsyncStreamFlatMap(clean(flatMapper))
377+
? new AsyncStreamFlatMap<>(clean(flatMapper))
376378
: new StreamFlatMap<>(clean(flatMapper));
377379
return transform("Flat Map", outputType, operator);
378380
}

Diff for: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java

+44
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,8 @@
4040
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
4141
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
4242
import org.apache.flink.api.java.typeutils.TypeExtractor;
43+
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
44+
import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
4345
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
4446
import org.apache.flink.streaming.api.datastream.BroadcastStream;
4547
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
@@ -934,6 +936,48 @@ public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out)
934936
assertThat(getOperatorForDataStream(processed)).isInstanceOf(KeyedProcessOperator.class);
935937
}
936938

939+
/**
940+
* Verify that a {@link KeyedStream#process(KeyedProcessFunction)} call is correctly translated
941+
* to an async operator.
942+
*/
943+
@Test
944+
void testAsyncKeyedStreamKeyedProcessTranslation() {
945+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
946+
DataStreamSource<Long> src = env.fromSequence(0, 0);
947+
948+
KeyedProcessFunction<Long, Long, Integer> keyedProcessFunction =
949+
new KeyedProcessFunction<Long, Long, Integer>() {
950+
private static final long serialVersionUID = 1L;
951+
952+
@Override
953+
public void processElement(Long value, Context ctx, Collector<Integer> out)
954+
throws Exception {
955+
// Do nothing
956+
}
957+
958+
@Override
959+
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out)
960+
throws Exception {
961+
// Do nothing
962+
}
963+
};
964+
965+
DataStream<Integer> processed =
966+
src.keyBy(new IdentityKeySelector<Long>())
967+
.enableAsyncState()
968+
.process(keyedProcessFunction);
969+
970+
processed.sinkTo(new DiscardingSink<Integer>());
971+
972+
assertThat(
973+
((AbstractAsyncStateUdfStreamOperator<?, ?>)
974+
getOperatorForDataStream(processed))
975+
.getUserFunction())
976+
.isEqualTo(keyedProcessFunction);
977+
assertThat(getOperatorForDataStream(processed))
978+
.isInstanceOf(AsyncKeyedProcessOperator.class);
979+
}
980+
937981
/**
938982
* Verify that a {@link DataStream#process(ProcessFunction)} call is correctly translated to an
939983
* operator.

0 commit comments

Comments
 (0)