Skip to content

Commit 9202345

Browse files
authored
[GLUTEN-10246][FLINK] Support processing time window(tumble) for nextmark q12 (apache#10898)
* support proctime window * remove useless changes * remove useless changes * support kafka data source * fix reviews * optimize code * fix memory leak * merge master * xxx * support proc time window * remove useless code * fix ut * restore q5,q7,q8 tests * Fix tz download failure * fix ci * update velox4j version
1 parent 3d585da commit 9202345

19 files changed

Lines changed: 454 additions & 160 deletions

.github/workflows/flink.yml

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,18 @@ jobs:
5959
run: |
6060
source /opt/rh/gcc-toolset-11/enable
6161
sudo dnf install -y patchelf
62-
sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y
62+
sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2026a-1.el9.noarch.rpm -y
6363
sudo .github/workflows/util/install-flink-deps.sh
64+
# Remove system fmt v11 from the CI image to avoid header conflicts with
65+
# velox bundled fmt 10.x during folly/velox native compilation.
66+
sudo rm -rf /usr/local/include/fmt
67+
sudo rm -rf /usr/local/lib/cmake/fmt
68+
sudo rm -f /usr/local/lib/libfmt* /usr/local/lib64/libfmt*
69+
export VELOX_DEPENDENCY_SOURCE=BUNDLED
70+
export fmt_SOURCE=BUNDLED
71+
export folly_SOURCE=BUNDLED
6472
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
65-
cd velox4j && git reset --hard 889bafcf2fa04e8c31a30edbdf40fe203ef58484
73+
cd velox4j && git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca
6674
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
6775
$GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
6876
cd ..

gluten-flink/docs/Flink.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow
4848
## fetch velox4j code
4949
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
5050
cd velox4j
51-
git reset --hard 889bafcf2fa04e8c31a30edbdf40fe203ef58484
51+
git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca
5252
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
5353
```
5454
**Get gluten**

gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java

Lines changed: 31 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.apache.gluten.rexnode.AggregateCallConverter;
2020
import org.apache.gluten.rexnode.Utils;
2121
import org.apache.gluten.rexnode.WindowUtils;
22-
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
2322
import org.apache.gluten.util.LogicalTypeConverter;
2423
import org.apache.gluten.util.PlanNodeIdGenerator;
2524

@@ -37,14 +36,13 @@
3736

3837
import org.apache.flink.FlinkVersion;
3938
import org.apache.flink.api.dag.Transformation;
39+
import org.apache.flink.api.java.tuple.Tuple2;
4040
import org.apache.flink.api.java.tuple.Tuple5;
4141
import org.apache.flink.configuration.ReadableConfig;
4242
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
4343
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
4444
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
4545
import org.apache.flink.table.data.RowData;
46-
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
47-
import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
4846
import org.apache.flink.table.planner.delegation.PlannerBase;
4947
import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
5048
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
@@ -55,24 +53,21 @@
5553
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
5654
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
5755
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
56+
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
5857
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
5958
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
6059
import org.apache.flink.table.planner.utils.TableConfigUtils;
61-
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
6260
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
63-
import org.apache.flink.table.runtime.groupwindow.WindowProperty;
6461
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
65-
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
6662
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
6763
import org.apache.flink.table.runtime.util.TimeWindowUtil;
68-
import org.apache.flink.table.types.DataType;
64+
import org.apache.flink.table.types.logical.LogicalType;
6965
import org.apache.flink.table.types.logical.RowType;
7066

7167
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
7268
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
7369

7470
import org.apache.calcite.rel.core.AggregateCall;
75-
import org.apache.calcite.tools.RelBuilder;
7671
import org.apache.commons.math3.util.ArithmeticUtils;
7772

7873
import javax.annotation.Nullable;
@@ -184,6 +179,14 @@ protected Transformation<RowData> translateToPlanInternal(
184179
final ZoneId shiftTimeZone =
185180
TimeWindowUtil.getShiftTimeZone(
186181
windowing.getTimeAttributeType(), TableConfigUtils.getLocalTimeZone(config));
182+
final AggregateInfoList globalAggInfoList =
183+
AggregateUtil.deriveStreamWindowAggregateInfoList(
184+
planner.getTypeFactory(),
185+
localAggInputRowType, // should use original input here
186+
JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
187+
needRetraction,
188+
windowing.getWindow(),
189+
true);
187190

188191
// --- Begin Gluten-specific code changes ---
189192
// TODO: velox window not equal to flink window.
@@ -205,11 +208,15 @@ protected Transformation<RowData> translateToPlanInternal(
205208
// TODO: support more window types.
206209
Tuple5<Long, Long, Long, Integer, Integer> windowSpecParams =
207210
WindowUtils.extractWindowParameters(windowing);
211+
Tuple2<Integer, Integer> windowStartAndEndIndexes =
212+
WindowUtils.getWindowStartAndEndIndexes(namedWindowProperties, (RowType) getOutputType());
208213
long size = windowSpecParams.f0;
209214
long slide = windowSpecParams.f1;
210215
long offset = windowSpecParams.f2;
211216
int rowtimeIndex = windowSpecParams.f3;
212217
int windowType = windowSpecParams.f4;
218+
int windowStartIndex = windowStartAndEndIndexes.f0;
219+
int windowEndIndex = windowStartAndEndIndexes.f1;
213220
PartitionFunctionSpec sliceAssignerSpec =
214221
new StreamWindowPartitionFunctionSpec(
215222
inputType, rowtimeIndex, size, slide, offset, windowType);
@@ -252,24 +259,30 @@ protected Transformation<RowData> translateToPlanInternal(
252259
offset,
253260
windowType,
254261
outputType,
255-
rowtimeIndex);
256-
final OneInputStreamOperator windowOperator =
257-
new GlutenOneInputOperator(
262+
true,
263+
rowtimeIndex,
264+
windowStartIndex,
265+
windowEndIndex);
266+
final LogicalType[] accTypes = convertToLogicalTypes(globalAggInfoList.getAccTypes());
267+
final RowDataKeySelector selector =
268+
KeySelectorUtil.getRowDataSelector(
269+
planner.getFlinkContext().getClassLoader(),
270+
grouping,
271+
InternalTypeInfo.of(inputRowType));
272+
final OneInputStreamOperator<RowData, RowData> windowOperator =
273+
new org.apache.gluten.table.runtime.operators.WindowAggOperator<RowData, RowData, Long>(
258274
new StatefulPlanNode(windowAgg.getId(), windowAgg),
259275
PlanNodeIdGenerator.newId(),
260276
inputType,
261277
Map.of(windowAgg.getId(), outputType),
262278
RowData.class,
263279
RowData.class,
264-
"StreamExecGlobalWindowAggregate");
280+
"StreamExecWindowAggregate",
281+
selector.getProducedType(),
282+
globalAggInfoList.getAggNames(),
283+
accTypes);
265284
// --- End Gluten-specific code changes ---
266285

267-
final RowDataKeySelector selector =
268-
KeySelectorUtil.getRowDataSelector(
269-
planner.getFlinkContext().getClassLoader(),
270-
grouping,
271-
InternalTypeInfo.of(inputRowType));
272-
273286
final OneInputTransformation<RowData, RowData> transform =
274287
ExecNodeUtil.createOneInputTransformation(
275288
inputTransform,
@@ -285,40 +298,4 @@ protected Transformation<RowData> translateToPlanInternal(
285298
transform.setStateKeyType(selector.getProducedType());
286299
return transform;
287300
}
288-
289-
private GeneratedNamespaceAggsHandleFunction<Long> createAggsHandler(
290-
String name,
291-
SliceAssigner sliceAssigner,
292-
AggregateInfoList aggInfoList,
293-
int mergedAccOffset,
294-
boolean mergedAccIsOnHeap,
295-
DataType[] mergedAccExternalTypes,
296-
ExecNodeConfig config,
297-
ClassLoader classLoader,
298-
RelBuilder relBuilder,
299-
ZoneId shifTimeZone) {
300-
final AggsHandlerCodeGenerator generator =
301-
new AggsHandlerCodeGenerator(
302-
new CodeGeneratorContext(config, classLoader),
303-
relBuilder,
304-
JavaScalaConversionUtil.toScala(localAggInputRowType.getChildren()),
305-
true) // copyInputField
306-
.needAccumulate()
307-
.needMerge(mergedAccOffset, mergedAccIsOnHeap, mergedAccExternalTypes);
308-
309-
final List<WindowProperty> windowProperties =
310-
Arrays.asList(
311-
Arrays.stream(namedWindowProperties)
312-
.map(NamedWindowProperty::getProperty)
313-
.toArray(WindowProperty[]::new));
314-
315-
return generator.generateNamespaceAggsHandler(
316-
name,
317-
aggInfoList,
318-
JavaScalaConversionUtil.toScala(windowProperties),
319-
sliceAssigner,
320-
// we use window end timestamp to indicate a slicing window, see SliceAssigner
321-
Long.class,
322-
shifTimeZone);
323-
}
324301
}

gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,10 @@ protected Transformation<RowData> translateToPlanInternal(
220220
offset,
221221
windowType,
222222
outputType,
223-
rowtimeIndex);
223+
false,
224+
rowtimeIndex,
225+
-1,
226+
-1);
224227
final OneInputStreamOperator localAggOperator =
225228
new GlutenOneInputOperator(
226229
new StatefulPlanNode(windowAgg.getId(), windowAgg),

gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
import org.apache.flink.FlinkVersion;
3838
import org.apache.flink.api.dag.Transformation;
39+
import org.apache.flink.api.java.tuple.Tuple2;
3940
import org.apache.flink.api.java.tuple.Tuple5;
4041
import org.apache.flink.configuration.ReadableConfig;
4142
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -208,11 +209,16 @@ protected Transformation<RowData> translateToPlanInternal(
208209
// TODO: support more window types.
209210
Tuple5<Long, Long, Long, Integer, Integer> windowSpecParams =
210211
WindowUtils.extractWindowParameters(windowing);
212+
Tuple2<Integer, Integer> windowStartAndEndIndexes =
213+
WindowUtils.getWindowStartAndEndIndexes(namedWindowProperties, (RowType) getOutputType());
211214
long size = windowSpecParams.f0;
212215
long slide = windowSpecParams.f1;
213216
long offset = windowSpecParams.f2;
214217
int rowtimeIndex = windowSpecParams.f3;
215218
int windowType = windowSpecParams.f4;
219+
boolean isRowTime = windowing.isRowtime();
220+
int windowStartIndex = windowStartAndEndIndexes.f0;
221+
int windowEndIndex = windowStartAndEndIndexes.f1;
216222
PartitionFunctionSpec sliceAssignerSpec =
217223
new StreamWindowPartitionFunctionSpec(
218224
inputType, rowtimeIndex, size, slide, offset, windowType);
@@ -221,25 +227,29 @@ protected Transformation<RowData> translateToPlanInternal(
221227
PlanNodeIdGenerator.newId(),
222228
AggregateStep.SINGLE,
223229
groupingKeys,
224-
groupingKeys,
230+
isRowTime ? groupingKeys : List.of(),
225231
aggNames,
226232
aggregates,
227233
false,
228234
List.of(new EmptyNode(inputType)),
229235
null,
230236
List.of());
237+
// processing time window can not apply to local-global aggregate optimization, so here we need
238+
// to set local aggregtate as null when it is not event time window.
231239
PlanNode localAgg =
232-
new AggregationNode(
233-
PlanNodeIdGenerator.newId(),
234-
AggregateStep.SINGLE,
235-
groupingKeys,
236-
groupingKeys,
237-
aggNames,
238-
aggregates,
239-
false,
240-
List.of(new EmptyNode(inputType)),
241-
null,
242-
List.of());
240+
isRowTime
241+
? new AggregationNode(
242+
PlanNodeIdGenerator.newId(),
243+
AggregateStep.SINGLE,
244+
groupingKeys,
245+
groupingKeys,
246+
aggNames,
247+
aggregates,
248+
false,
249+
List.of(new EmptyNode(inputType)),
250+
null,
251+
List.of())
252+
: null;
243253
PlanNode windowAgg =
244254
new StreamWindowAggregationNode(
245255
PlanNodeIdGenerator.newId(),
@@ -255,7 +265,10 @@ protected Transformation<RowData> translateToPlanInternal(
255265
offset,
256266
windowType,
257267
outputType,
258-
rowtimeIndex);
268+
windowing.isRowtime(),
269+
rowtimeIndex,
270+
windowStartIndex,
271+
windowEndIndex);
259272
final RowDataKeySelector selector =
260273
KeySelectorUtil.getRowDataSelector(
261274
planner.getFlinkContext().getClassLoader(),
@@ -266,6 +279,9 @@ protected Transformation<RowData> translateToPlanInternal(
266279
.map(x -> x.getLogicalType())
267280
.collect(Collectors.toList())
268281
.toArray(new LogicalType[] {});
282+
// For TVF windows (Tumbling, Hopping, Cumulative, Session), the window namespace
283+
// is identified by the window end timestamp (Long). If count-based windows are
284+
// supported in the future, a different serializer may be needed.
269285
final OneInputStreamOperator<RowData, RowData> windowOperator =
270286
new org.apache.gluten.table.runtime.operators.WindowAggOperator<RowData, RowData, Long>(
271287
new StatefulPlanNode(windowAgg.getId(), windowAgg),

gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,24 @@
1616
*/
1717
package org.apache.gluten.rexnode;
1818

19+
import org.apache.flink.api.java.tuple.Tuple2;
1920
import org.apache.flink.api.java.tuple.Tuple5;
21+
import org.apache.flink.table.planner.plan.logical.CumulativeWindowSpec;
2022
import org.apache.flink.table.planner.plan.logical.HoppingWindowSpec;
23+
import org.apache.flink.table.planner.plan.logical.SessionWindowSpec;
2124
import org.apache.flink.table.planner.plan.logical.SliceAttachedWindowingStrategy;
2225
import org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
2326
import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec;
2427
import org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy;
2528
import org.apache.flink.table.planner.plan.logical.WindowSpec;
2629
import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
30+
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
31+
import org.apache.flink.table.runtime.groupwindow.WindowEnd;
32+
import org.apache.flink.table.runtime.groupwindow.WindowStart;
33+
import org.apache.flink.table.types.logical.RowType;
2734

2835
import java.time.Duration;
36+
import java.util.List;
2937

3038
/** Utility to store some useful functions. */
3139
public class WindowUtils {
@@ -53,24 +61,27 @@ public static Tuple5<Long, Long, Long, Integer, Integer> extractWindowParameters
5361
if (windowOffset != null) {
5462
offset = windowOffset.toMillis();
5563
}
64+
windowType = 0;
5665
} else if (windowSpec instanceof TumblingWindowSpec) {
5766
size = ((TumblingWindowSpec) windowSpec).getSize().toMillis();
5867
Duration windowOffset = ((TumblingWindowSpec) windowSpec).getOffset();
5968
if (windowOffset != null) {
6069
offset = windowOffset.toMillis();
6170
}
71+
windowType = 1;
72+
} else if (windowSpec instanceof CumulativeWindowSpec) {
73+
windowType = 2;
74+
} else if (windowSpec instanceof SessionWindowSpec) {
75+
windowType = 3;
6276
} else {
6377
throw new RuntimeException("Not support window spec " + windowSpec);
6478
}
65-
6679
if (windowing instanceof TimeAttributeWindowingStrategy) {
6780
if (windowing.isRowtime()) {
6881
rowtimeIndex = ((TimeAttributeWindowingStrategy) windowing).getTimeAttributeIndex();
6982
}
70-
windowType = 0;
7183
} else if (windowing instanceof WindowAttachedWindowingStrategy) {
7284
rowtimeIndex = ((WindowAttachedWindowingStrategy) windowing).getWindowEnd();
73-
windowType = 1;
7485
} else if (windowing instanceof SliceAttachedWindowingStrategy) {
7586
rowtimeIndex = ((SliceAttachedWindowingStrategy) windowing).getSliceEnd();
7687
} else {
@@ -79,4 +90,18 @@ public static Tuple5<Long, Long, Long, Integer, Integer> extractWindowParameters
7990
return new Tuple5<Long, Long, Long, Integer, Integer>(
8091
size, slide, offset, rowtimeIndex, windowType);
8192
}
93+
94+
public static Tuple2<Integer, Integer> getWindowStartAndEndIndexes(
95+
NamedWindowProperty[] props, RowType outputType) {
96+
int windowStartIndex = -1, windowEndIndex = -1;
97+
List<String> outputNames = outputType.getFieldNames();
98+
for (NamedWindowProperty prop : props) {
99+
if (prop.getProperty() instanceof WindowStart) {
100+
windowStartIndex = outputNames.indexOf(prop.getName());
101+
} else if (prop.getProperty() instanceof WindowEnd) {
102+
windowEndIndex = outputNames.indexOf(prop.getName());
103+
}
104+
}
105+
return new Tuple2<Integer, Integer>(windowStartIndex, windowEndIndex);
106+
}
82107
}

0 commit comments

Comments
 (0)