Commit bc6be75

[flink] Fluss sink supports dynamic shuffle.
1 parent 49ce39d commit bc6be75

34 files changed: +3334 −53 lines
Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+/**
+ * A small util class that tries to hide calls to Flink Internal or PublicEvolving interfaces,
+ * as Flink can change those APIs during minor version releases.
+ */
+public class FlinkCompatibilityUtil {
+
+    private FlinkCompatibilityUtil() {}
+
+    /** Get the index of this subtask. TODO: remove this method when Flink 1.18 is no longer supported. */
+    public static int getIndexOfThisSubtask(StreamingRuntimeContext runtimeContext) {
+        return runtimeContext.getIndexOfThisSubtask();
+    }
+}
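
The adapter's value is at the call sites: writer code asks FlinkCompatibilityUtil for the subtask index instead of touching the runtime context directly, so only this class changes when Flink moves the accessor (as the Flink 2.x variant of this file further below does via getTaskInfo()). A minimal sketch of such a call site; the writer class is hypothetical and not part of this commit:

import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

// Hypothetical call site: the version-sensitive accessor is funneled through
// the adapter so this class compiles unchanged against Flink 1.18 and 2.x.
class SubtaskAwareWriter {
    private final int subtaskIndex;

    SubtaskAwareWriter(StreamingRuntimeContext runtimeContext) {
        this.subtaskIndex = FlinkCompatibilityUtil.getIndexOfThisSubtask(runtimeContext);
    }
}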

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java

Lines changed: 18 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.fluss.flink;
 
 import org.apache.fluss.config.FlussConfigUtils;
+import org.apache.fluss.flink.sink.shuffle.DistributionMode;
 import org.apache.fluss.flink.utils.FlinkConversions;
 
 import org.apache.flink.configuration.ConfigOption;
@@ -125,6 +126,23 @@ public class FlinkConnectorOptions {
                                     + BUCKET_KEY.key()
                                     + "' is defined. For Primary Key table, it is enabled by default.");
 
+    public static final ConfigOption<DistributionMode> SINK_DISTRIBUTION_MODE =
+            ConfigOptions.key("sink.distribution-mode")
+                    .enumType(DistributionMode.class)
+                    .defaultValue(DistributionMode.BUCKET_SHUFFLE)
+                    .withDescription(
+                            "Defines the distribution mode for writing data to the sink. Available options are:\n"
+                                    + "- NONE: No specific distribution strategy. Data is forwarded as is.\n"
+                                    + "- BUCKET_SHUFFLE: Shuffle data by bucket ID before writing to the sink. "
+                                    + "Routing records with the same bucket ID to the same task "
+                                    + "can improve the efficiency of client processing and reduce resource consumption. "
+                                    + "For Log Table, bucket shuffle only takes effect when the '"
+                                    + BUCKET_KEY.key()
+                                    + "' is defined. For Primary Key table, it is enabled by default.\n"
+                                    + "- DYNAMIC_SHUFFLE: Dynamically adjusts the shuffle strategy based on partition key traffic patterns. "
+                                    + "This mode monitors data distribution and adjusts the shuffle behavior to balance the load. "
+                                    + "It is only supported for partitioned tables.");
+
     // --------------------------------------------------------------------------------------------
     // table storage specific options
     // --------------------------------------------------------------------------------------------
Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.adapter;
+
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+/**
+ * A small util class that tries to hide calls to Flink Internal or PublicEvolving interfaces,
+ * as Flink can change those APIs during minor version releases.
+ */
+public class FlinkCompatibilityUtil {
+
+    private FlinkCompatibilityUtil() {}
+
+    /** Get the index of this subtask. TODO: remove this method when Flink 1.18 is no longer supported. */
+    public static int getIndexOfThisSubtask(StreamingRuntimeContext runtimeContext) {
+        return runtimeContext.getTaskInfo().getIndexOfThisSubtask();
+    }
+}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 12 additions & 1 deletion
@@ -23,6 +23,7 @@
 import org.apache.fluss.flink.lake.LakeFlinkCatalog;
 import org.apache.fluss.flink.lake.LakeTableFactory;
 import org.apache.fluss.flink.sink.FlinkTableSink;
+import org.apache.fluss.flink.sink.shuffle.DistributionMode;
 import org.apache.fluss.flink.source.FlinkTableSource;
 import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
 import org.apache.fluss.metadata.DataLakeFormat;
@@ -175,6 +176,15 @@ public DynamicTableSink createDynamicTableSink(Context context) {
         List<String> partitionKeys = resolvedCatalogTable.getPartitionKeys();
 
         RowType rowType = (RowType) context.getPhysicalRowDataType().getLogicalType();
+        DistributionMode distributionMode;
+        if (tableOptions.getOptional(FlinkConnectorOptions.SINK_DISTRIBUTION_MODE).isPresent()) {
+            distributionMode = tableOptions.get(FlinkConnectorOptions.SINK_DISTRIBUTION_MODE);
+        } else {
+            distributionMode =
+                    tableOptions.get(FlinkConnectorOptions.SINK_BUCKET_SHUFFLE)
+                            ? DistributionMode.BUCKET_SHUFFLE
+                            : DistributionMode.NONE;
+        }
 
         return new FlinkTableSink(
                 toFlussTablePath(context.getObjectIdentifier()),
@@ -190,7 +200,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                 tableOptions.get(toFlinkOption(TABLE_DELETE_BEHAVIOR)),
                 tableOptions.get(FlinkConnectorOptions.BUCKET_NUMBER),
                 getBucketKeys(tableOptions),
-                tableOptions.get(FlinkConnectorOptions.SINK_BUCKET_SHUFFLE));
+                distributionMode);
     }
 
     @Override
@@ -216,6 +226,7 @@ public Set<ConfigOption<?>> optionalOptions() {
                 FlinkConnectorOptions.LOOKUP_ASYNC,
                 FlinkConnectorOptions.SINK_IGNORE_DELETE,
                 FlinkConnectorOptions.SINK_BUCKET_SHUFFLE,
+                FlinkConnectorOptions.SINK_DISTRIBUTION_MODE,
                 LookupOptions.MAX_RETRIES,
                 LookupOptions.CACHE_TYPE,
                 LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
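
The factory keeps the legacy boolean option working: an explicit 'sink.distribution-mode' wins, otherwise 'sink.bucket-shuffle' is mapped onto the new enum. A self-contained sketch of that precedence rule; the helper class and method names are hypothetical, only the mapping mirrors the factory code above:

import java.util.Optional;

final class DistributionModeResolution {

    enum DistributionMode { NONE, BUCKET_SHUFFLE, DYNAMIC_SHUFFLE }

    // Mirrors the factory logic: the new enum option takes precedence; the
    // legacy boolean maps to BUCKET_SHUFFLE (true) or NONE (false).
    static DistributionMode resolve(
            Optional<DistributionMode> explicitMode, boolean legacyBucketShuffle) {
        return explicitMode.orElse(
                legacyBucketShuffle ? DistributionMode.BUCKET_SHUFFLE : DistributionMode.NONE);
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.empty(), true)); // BUCKET_SHUFFLE
        System.out.println(resolve(Optional.of(DistributionMode.NONE), true)); // NONE
    }
}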

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java

Lines changed: 98 additions & 25 deletions
@@ -20,19 +20,27 @@
 import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema;
+import org.apache.fluss.flink.sink.shuffle.DataStatisticsOperatorFactory;
+import org.apache.fluss.flink.sink.shuffle.DistributionMode;
+import org.apache.fluss.flink.sink.shuffle.StatisticsOrRecord;
+import org.apache.fluss.flink.sink.shuffle.StatisticsOrRecordChannelComputer;
+import org.apache.fluss.flink.sink.shuffle.StatisticsOrRecordTypeInformation;
 import org.apache.fluss.flink.sink.writer.AppendSinkWriter;
 import org.apache.fluss.flink.sink.writer.FlinkSinkWriter;
 import org.apache.fluss.flink.sink.writer.UpsertSinkWriter;
 import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.TablePath;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
 import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
@@ -97,8 +105,9 @@ static class AppendSinkWriterBuilder<InputT>
         private final List<String> bucketKeys;
         private final List<String> partitionKeys;
         private final @Nullable DataLakeFormat lakeFormat;
-        private final boolean shuffleByBucketId;
+        private final DistributionMode shuffleMode;
         private final FlussSerializationSchema<InputT> flussSerializationSchema;
+        private final @Nullable TypeInformation<InputT> rowTypeInformation;
 
         public AppendSinkWriterBuilder(
                 TablePath tablePath,
@@ -108,17 +117,19 @@ public AppendSinkWriterBuilder(
                 List<String> bucketKeys,
                 List<String> partitionKeys,
                 @Nullable DataLakeFormat lakeFormat,
-                boolean shuffleByBucketId,
-                FlussSerializationSchema<InputT> flussSerializationSchema) {
+                DistributionMode shuffleMode,
+                FlussSerializationSchema<InputT> flussSerializationSchema,
+                @Nullable TypeInformation<InputT> rowTypeInformation) {
             this.tablePath = tablePath;
             this.flussConfig = flussConfig;
             this.tableRowType = tableRowType;
             this.numBucket = numBucket;
             this.bucketKeys = bucketKeys;
             this.partitionKeys = partitionKeys;
             this.lakeFormat = lakeFormat;
-            this.shuffleByBucketId = shuffleByBucketId;
+            this.shuffleMode = shuffleMode;
             this.flussSerializationSchema = flussSerializationSchema;
+            this.rowTypeInformation = rowTypeInformation;
         }
 
         @Override
@@ -133,20 +144,76 @@ public AppendSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) {
 
         @Override
         public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
-            // For append only sink, we will do bucket shuffle only if bucket keys are not empty.
-            if (!bucketKeys.isEmpty() && shuffleByBucketId) {
-                return partition(
-                        input,
-                        new FlinkRowDataChannelComputer<>(
-                                toFlussRowType(tableRowType),
-                                bucketKeys,
-                                partitionKeys,
-                                lakeFormat,
-                                numBucket,
-                                flussSerializationSchema),
-                        input.getParallelism());
-            } else {
-                return input;
+            switch (shuffleMode) {
+                case BUCKET_SHUFFLE:
+                    if (!bucketKeys.isEmpty()) {
+                        return partition(
+                                input,
+                                new FlinkRowDataChannelComputer<>(
+                                        toFlussRowType(tableRowType),
+                                        bucketKeys,
+                                        partitionKeys,
+                                        lakeFormat,
+                                        numBucket,
+                                        flussSerializationSchema),
+                                input.getParallelism());
+                    }
+                    return input;
+                case NONE:
+                    return input;
+                case DYNAMIC_SHUFFLE:
+                    if (partitionKeys.isEmpty()) {
+                        throw new UnsupportedOperationException(
+                                "DYNAMIC_SHUFFLE is only supported for partitioned tables");
+                    }
+
+                    if (rowTypeInformation == null) {
+                        throw new UnsupportedOperationException(
+                                "RowTypeInformation is required for DYNAMIC_SHUFFLE mode.");
+                    }
+                    TypeInformation<StatisticsOrRecord<InputT>> statisticsOrRecordTypeInformation =
+                            new StatisticsOrRecordTypeInformation<>(rowTypeInformation);
+                    SingleOutputStreamOperator<StatisticsOrRecord<InputT>> shuffleStream =
+                            input.transform(
+                                            "Range shuffle Collector",
+                                            statisticsOrRecordTypeInformation,
+                                            new DataStatisticsOperatorFactory<>(
+                                                    toFlussRowType(tableRowType),
+                                                    partitionKeys,
+                                                    flussSerializationSchema))
+                                    // Set the same parallelism as the input operator to
+                                    // encourage chaining.
+                                    .uid("Range shuffle Collector" + tablePath)
+                                    .setParallelism(input.getParallelism());
+
+                    return partition(
+                                    shuffleStream,
+                                    new StatisticsOrRecordChannelComputer<>(
+                                            toFlussRowType(tableRowType),
+                                            bucketKeys,
+                                            partitionKeys,
+                                            numBucket,
+                                            lakeFormat,
+                                            flussSerializationSchema),
+                                    input.getParallelism())
+                            .flatMap(
+                                    (FlatMapFunction<StatisticsOrRecord<InputT>, InputT>)
+                                            (statisticsOrRecord, out) -> {
+                                                if (statisticsOrRecord.hasRecord()) {
+                                                    out.collect(statisticsOrRecord.record());
+                                                }
+                                            })
+                            .uid("flat map" + tablePath)
+                            // To promote operator chaining with the downstream writer operator,
+                            // set the slot sharing group and leave the parallelism as default;
+                            // {@link SinkTransformationTranslator} will set the parallelism same
+                            // as the sink transformation.
+                            .slotSharingGroup("shuffle-partition-custom-group")
+                            .returns(rowTypeInformation);
+
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported distribution mode: " + shuffleMode);
             }
         }
     }
@@ -165,7 +232,7 @@ static class UpsertSinkWriterBuilder<InputT>
         private final List<String> bucketKeys;
         private final List<String> partitionKeys;
         private final @Nullable DataLakeFormat lakeFormat;
-        private final boolean shuffleByBucketId;
+        private final DistributionMode shuffleMode;
         private final FlussSerializationSchema<InputT> flussSerializationSchema;
 
         UpsertSinkWriterBuilder(
@@ -177,7 +244,7 @@ static class UpsertSinkWriterBuilder<InputT>
                 List<String> bucketKeys,
                 List<String> partitionKeys,
                 @Nullable DataLakeFormat lakeFormat,
-                boolean shuffleByBucketId,
+                DistributionMode shuffleMode,
                 FlussSerializationSchema<InputT> flussSerializationSchema) {
             this.tablePath = tablePath;
             this.flussConfig = flussConfig;
@@ -187,7 +254,7 @@ static class UpsertSinkWriterBuilder<InputT>
             this.bucketKeys = bucketKeys;
             this.partitionKeys = partitionKeys;
             this.lakeFormat = lakeFormat;
-            this.shuffleByBucketId = shuffleByBucketId;
+            this.shuffleMode = shuffleMode;
             this.flussSerializationSchema = flussSerializationSchema;
         }
 
@@ -204,8 +271,9 @@ public UpsertSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) {
 
         @Override
         public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
-            return shuffleByBucketId
-                    ? partition(
+            switch (shuffleMode) {
+                case BUCKET_SHUFFLE:
+                    return partition(
                             input,
                             new FlinkRowDataChannelComputer<>(
                                     toFlussRowType(tableRowType),
@@ -214,8 +282,13 @@ public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
                                     lakeFormat,
                                     numBucket,
                                     flussSerializationSchema),
-                            input.getParallelism())
-                    : input;
+                            input.getParallelism());
+                case NONE:
+                    return input;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported distribution mode: " + shuffleMode);
+            }
         }
     }
 }
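
The DYNAMIC_SHUFFLE topology threads a union type, StatisticsOrRecord, from the statistics collector through the custom partitioner, then unwraps it in the flatMap before the writer. The class itself is not part of this excerpt; a minimal, hypothetical sketch of the shape the flatMap relies on (hasRecord()/record()), with all field and factory names assumed:

// Hypothetical sketch, not the class from this commit: a tagged union carrying
// either a statistics payload (for adjusting the shuffle) or a data record.
final class StatisticsOrRecord<T> {
    private final Object statistics; // placeholder for the statistics payload
    private final T record;

    private StatisticsOrRecord(Object statistics, T record) {
        this.statistics = statistics;
        this.record = record;
    }

    static <T> StatisticsOrRecord<T> fromRecord(T record) {
        return new StatisticsOrRecord<>(null, record);
    }

    static <T> StatisticsOrRecord<T> fromStatistics(Object statistics) {
        return new StatisticsOrRecord<>(statistics, null);
    }

    boolean hasRecord() {
        return record != null;
    }

    T record() {
        return record;
    }
}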
