Commit 38b6c4a

[flink] Flink sink support hash by bucket id for PrimaryKey Table (#579)
1 parent 978480c commit 38b6c4a

File tree: 13 files changed, +601 −71 lines

13 files changed

+601
-71
lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/FlinkConnectorOptions.java

Lines changed: 12 additions & 0 deletions
@@ -106,6 +106,18 @@ public class FlinkConnectorOptions {
                     .defaultValue(false)
                     .withDescription("Whether to ignore retract(-U/-D) record.");
 
+    public static final ConfigOption<Boolean> SINK_BUCKET_SHUFFLE =
+            ConfigOptions.key("sink.bucket-shuffle")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to shuffle by bucket id before writing to the sink. Shuffling records with the same "
+                                    + "bucket id to the same task can improve the efficiency of client "
+                                    + "processing and reduce resource consumption. For a Log Table, bucket shuffle "
+                                    + "only takes effect when '"
+                                    + BUCKET_KEY.key()
+                                    + "' is defined. For a Primary Key Table, it is enabled by default.");
+
     // --------------------------------------------------------------------------------------------
     // table storage specific options
     // --------------------------------------------------------------------------------------------
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 14 additions & 3 deletions
@@ -49,9 +49,12 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import static com.alibaba.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
+import static com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeyIndexes;
+import static com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeys;
 import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlinkOption;
 
 /** Factory to create table source and table sink for Fluss. */
@@ -94,8 +97,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 resolvedCatalogTable.getPartitionKeys().stream()
                         .mapToInt(tableOutputType::getFieldIndex)
                         .toArray();
-        int[] bucketKeyIndexes =
-                FlinkConnectorOptionsUtils.getBucketKeyIndexes(tableOptions, tableOutputType);
+        int[] bucketKeyIndexes = getBucketKeyIndexes(tableOptions, tableOutputType);
 
         // options for lookup
         LookupCache cache = null;
@@ -141,6 +143,9 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                 context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                         == RuntimeExecutionMode.STREAMING;
 
+        ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
+        List<String> partitionKeys = resolvedCatalogTable.getPartitionKeys();
+
         RowType rowType = (RowType) context.getPhysicalRowDataType().getLogicalType();
         final ReadableConfig tableOptions = helper.getOptions();
 
@@ -149,9 +154,14 @@
                 toFlussClientConfig(tableOptions, context.getConfiguration()),
                 rowType,
                 context.getPrimaryKeyIndexes(),
+                partitionKeys,
                 isStreamingMode,
                 tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
-                tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE));
+                tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_FORMAT)),
+                tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE),
+                tableOptions.get(FlinkConnectorOptions.BUCKET_NUMBER),
+                getBucketKeys(tableOptions),
+                tableOptions.get(FlinkConnectorOptions.SINK_BUCKET_SHUFFLE));
     }
 
     @Override
@@ -176,6 +186,7 @@ public Set<ConfigOption<?>> optionalOptions() {
                         FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL,
                         FlinkConnectorOptions.LOOKUP_ASYNC,
                         FlinkConnectorOptions.SINK_IGNORE_DELETE,
+                        FlinkConnectorOptions.SINK_BUCKET_SHUFFLE,
                         LookupOptions.MAX_RETRIES,
                         LookupOptions.CACHE_TYPE,
                         LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/ChannelComputer.java

Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.sink;
+
+import java.io.Serializable;
+
+/**
+ * A utility to compute which downstream channel a given record should be sent to before the
+ * Flink sink.
+ *
+ * @param <T> type of record
+ */
+public interface ChannelComputer<T> extends Serializable {
+    void setup(int numChannels);
+
+    int channel(T record);
+
+    static int select(String partitionName, int bucket, int numChannels) {
+        int startChannel = Math.abs(partitionName.hashCode()) % numChannels;
+        return (startChannel + bucket) % numChannels;
+    }
+
+    static int select(int bucket, int numChannels) {
+        return bucket % numChannels;
+    }
+}
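A minimal sketch of how the two select overloads distribute buckets across sink subtasks, assuming 3 buckets and 2 downstream channels (the values and demo class are illustrative, using only the interface added above):

import com.alibaba.fluss.flink.sink.ChannelComputer;

public class ChannelComputerSelectDemo {
    public static void main(String[] args) {
        int numChannels = 2;
        int numBucket = 3;

        // Non-partitioned case: bucket id modulo channel count.
        for (int bucket = 0; bucket < numBucket; bucket++) {
            System.out.printf("bucket %d -> channel %d%n",
                    bucket, ChannelComputer.select(bucket, numChannels));
        }

        // Partitioned case: the partition name offsets the starting channel, so different
        // partitions spread their buckets over different subtasks.
        for (String partition : new String[] {"p0", "p1"}) {
            for (int bucket = 0; bucket < numBucket; bucket++) {
                System.out.printf("partition %s, bucket %d -> channel %d%n",
                        partition, bucket,
                        ChannelComputer.select(partition, bucket, numChannels));
            }
        }
    }
}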
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputer.java

Lines changed: 110 additions & 0 deletions

@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.sink;
+
+import com.alibaba.fluss.annotation.VisibleForTesting;
+import com.alibaba.fluss.bucketing.BucketingFunction;
+import com.alibaba.fluss.client.table.getter.PartitionGetter;
+import com.alibaba.fluss.flink.row.FlinkAsFlussRow;
+import com.alibaba.fluss.metadata.DataLakeFormat;
+import com.alibaba.fluss.row.encode.KeyEncoder;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/** {@link ChannelComputer} for Flink {@link RowData}. */
+public class FlinkRowDataChannelComputer implements ChannelComputer<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final @Nullable DataLakeFormat lakeFormat;
+    private final int numBucket;
+    private final RowType flussRowType;
+    private final List<String> bucketKeys;
+    private final List<String> partitionKeys;
+
+    private transient int numChannels;
+    private transient BucketingFunction bucketingFunction;
+    private transient KeyEncoder bucketKeyEncoder;
+    private transient boolean combineShuffleWithPartitionName;
+    private transient @Nullable PartitionGetter partitionGetter;
+
+    public FlinkRowDataChannelComputer(
+            RowType flussRowType,
+            List<String> bucketKeys,
+            List<String> partitionKeys,
+            @Nullable DataLakeFormat lakeFormat,
+            int numBucket) {
+        this.flussRowType = flussRowType;
+        this.bucketKeys = bucketKeys;
+        this.partitionKeys = partitionKeys;
+        this.lakeFormat = lakeFormat;
+        this.numBucket = numBucket;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+        this.bucketingFunction = BucketingFunction.of(lakeFormat);
+        this.bucketKeyEncoder = KeyEncoder.of(flussRowType, bucketKeys, lakeFormat);
+        if (partitionKeys.isEmpty()) {
+            this.partitionGetter = null;
+        } else {
+            this.partitionGetter = new PartitionGetter(flussRowType, partitionKeys);
+        }
+
+        // Only when partition keys exist and the bucket number is not evenly divisible by the
+        // Flink job parallelism do we include the partition name as part of the shuffle key.
+        // This helps avoid data skew. For example, if the bucket number is 3 and the task
+        // parallelism is 2, the shuffle easily becomes uneven: task1 might receive
+        // 'partition0-bucket0', 'partition1-bucket0', 'partition0-bucket2', and
+        // 'partition1-bucket2', whereas task2 would only receive 'partition0-bucket1' and
+        // 'partition1-bucket1'. As the number of partitions increases, this imbalance becomes
+        // even more severe.
+        this.combineShuffleWithPartitionName =
+                partitionGetter != null && numBucket % numChannels != 0;
+    }
+
+    @Override
+    public int channel(RowData record) {
+        FlinkAsFlussRow row = new FlinkAsFlussRow().replace(record);
+        int bucketId = bucketingFunction.bucketing(bucketKeyEncoder.encodeKey(row), numBucket);
+        if (!combineShuffleWithPartitionName) {
+            return ChannelComputer.select(bucketId, numChannels);
+        } else {
+            checkNotNull(partitionGetter, "partitionGetter is null");
+            String partitionName = partitionGetter.getPartition(row);
+            return ChannelComputer.select(partitionName, bucketId, numChannels);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "BUCKET_SHUFFLE";
+    }
+
+    @VisibleForTesting
+    boolean isCombineShuffleWithPartitionName() {
+        return combineShuffleWithPartitionName;
+    }
+}
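To make the skew scenario described in the comment above concrete, here is a small sketch (illustrative values and class name only, built on the ChannelComputer interface from this commit) that counts how many partition-bucket combinations each channel receives with and without the partition name in the shuffle key:

import com.alibaba.fluss.flink.sink.ChannelComputer;

import java.util.HashMap;
import java.util.Map;

public class BucketShuffleSkewDemo {
    public static void main(String[] args) {
        int numChannels = 2;
        int numBucket = 3; // 3 % 2 != 0, the case the comment above describes
        String[] partitions = {"partition0", "partition1"};

        Map<Integer, Integer> byBucketOnly = new HashMap<>();
        Map<Integer, Integer> withPartitionName = new HashMap<>();
        for (String partition : partitions) {
            for (int bucket = 0; bucket < numBucket; bucket++) {
                byBucketOnly.merge(
                        ChannelComputer.select(bucket, numChannels), 1, Integer::sum);
                withPartitionName.merge(
                        ChannelComputer.select(partition, bucket, numChannels), 1, Integer::sum);
            }
        }
        // Bucket-only shuffle piles 4 of the 6 combinations onto one channel; including the
        // partition name spreads them more evenly (the exact split depends on hashCode()).
        System.out.println("bucket-only shuffle:     " + byBucketOnly);
        System.out.println("partition-aware shuffle: " + withPartitionName);
    }
}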

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkSink.java

Lines changed: 86 additions & 4 deletions
@@ -21,22 +21,34 @@
 import com.alibaba.fluss.flink.sink.writer.AppendSinkWriter;
 import com.alibaba.fluss.flink.sink.writer.FlinkSinkWriter;
 import com.alibaba.fluss.flink.sink.writer.UpsertSinkWriter;
+import com.alibaba.fluss.metadata.DataLakeFormat;
 import com.alibaba.fluss.metadata.TablePath;
 
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
 
-/** Flink sink for Fluss. */
-class FlinkSink implements Sink<RowData> {
+import static com.alibaba.fluss.flink.sink.FlinkStreamPartitioner.partition;
+import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlussRowType;
+
+/**
+ * Flink sink for Fluss.
+ *
+ * <p>TODO: WithPreWriteTopology needs to be changed to supportsPreWriteTopology in Flink 1.20.
+ * Tracked by https://github.com/alibaba/fluss/issues/622.
+ */
+class FlinkSink implements Sink<RowData>, WithPreWriteTopology<RowData> {
 
     private static final long serialVersionUID = 1L;
 
@@ -61,9 +73,16 @@ public SinkWriter<RowData> createWriter(WriterInitContext context) throws IOExce
         return flinkSinkWriter;
     }
 
+    @Override
+    public DataStream<RowData> addPreWriteTopology(DataStream<RowData> input) {
+        return builder.addPreWriteTopology(input);
+    }
+
     @Internal
     interface SinkWriterBuilder<W extends FlinkSinkWriter> extends Serializable {
         W createWriter();
+
+        DataStream<RowData> addPreWriteTopology(DataStream<RowData> input);
     }
 
     @Internal
@@ -75,22 +94,55 @@ static class AppendSinkWriterBuilder implements SinkWriterBuilder<AppendSinkWrit
         private final Configuration flussConfig;
         private final RowType tableRowType;
         private final boolean ignoreDelete;
+        private final int numBucket;
+        private final List<String> bucketKeys;
+        private final List<String> partitionKeys;
+        private final @Nullable DataLakeFormat lakeFormat;
+        private final boolean shuffleByBucketId;
 
         public AppendSinkWriterBuilder(
                 TablePath tablePath,
                 Configuration flussConfig,
                 RowType tableRowType,
-                boolean ignoreDelete) {
+                boolean ignoreDelete,
+                int numBucket,
+                List<String> bucketKeys,
+                List<String> partitionKeys,
+                @Nullable DataLakeFormat lakeFormat,
+                boolean shuffleByBucketId) {
             this.tablePath = tablePath;
             this.flussConfig = flussConfig;
             this.tableRowType = tableRowType;
             this.ignoreDelete = ignoreDelete;
+            this.numBucket = numBucket;
+            this.bucketKeys = bucketKeys;
+            this.partitionKeys = partitionKeys;
+            this.lakeFormat = lakeFormat;
+            this.shuffleByBucketId = shuffleByBucketId;
        }
 
         @Override
         public AppendSinkWriter createWriter() {
             return new AppendSinkWriter(tablePath, flussConfig, tableRowType, ignoreDelete);
         }
+
+        @Override
+        public DataStream<RowData> addPreWriteTopology(DataStream<RowData> input) {
+            // For an append-only sink, bucket shuffle is applied only if bucket keys are defined.
+            if (!bucketKeys.isEmpty() && shuffleByBucketId) {
+                return partition(
+                        input,
+                        new FlinkRowDataChannelComputer(
+                                toFlussRowType(tableRowType),
+                                bucketKeys,
+                                partitionKeys,
+                                lakeFormat,
+                                numBucket),
+                        input.getParallelism());
+            } else {
+                return input;
+            }
+        }
     }
 
     @Internal
@@ -103,24 +155,54 @@ static class UpsertSinkWriterBuilder implements SinkWriterBuilder<UpsertSinkWrit
         private final RowType tableRowType;
         private final @Nullable int[] targetColumnIndexes;
         private final boolean ignoreDelete;
+        private final int numBucket;
+        private final List<String> bucketKeys;
+        private final List<String> partitionKeys;
+        private final @Nullable DataLakeFormat lakeFormat;
+        private final boolean shuffleByBucketId;
 
         UpsertSinkWriterBuilder(
                 TablePath tablePath,
                 Configuration flussConfig,
                 RowType tableRowType,
                 @Nullable int[] targetColumnIndexes,
-                boolean ignoreDelete) {
+                boolean ignoreDelete,
+                int numBucket,
+                List<String> bucketKeys,
+                List<String> partitionKeys,
+                @Nullable DataLakeFormat lakeFormat,
+                boolean shuffleByBucketId) {
             this.tablePath = tablePath;
             this.flussConfig = flussConfig;
             this.tableRowType = tableRowType;
             this.targetColumnIndexes = targetColumnIndexes;
             this.ignoreDelete = ignoreDelete;
+            this.numBucket = numBucket;
+            this.bucketKeys = bucketKeys;
+            this.partitionKeys = partitionKeys;
+            this.lakeFormat = lakeFormat;
+            this.shuffleByBucketId = shuffleByBucketId;
         }
 
         @Override
         public UpsertSinkWriter createWriter() {
             return new UpsertSinkWriter(
                     tablePath, flussConfig, tableRowType, targetColumnIndexes, ignoreDelete);
         }
+
+        @Override
+        public DataStream<RowData> addPreWriteTopology(DataStream<RowData> input) {
+            return shuffleByBucketId
+                    ? partition(
+                            input,
+                            new FlinkRowDataChannelComputer(
                                    toFlussRowType(tableRowType),
+                                    bucketKeys,
+                                    partitionKeys,
+                                    lakeFormat,
+                                    numBucket),
+                            input.getParallelism())
+                    : input;
+        }
     }
 }
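The FlinkStreamPartitioner.partition helper imported above is not part of this diff. As a rough illustration only (not the actual Fluss implementation), a ChannelComputer could also be wired into a stream with Flink's public partitionCustom API; the class and method names below are hypothetical assumptions.

import com.alibaba.fluss.flink.sink.ChannelComputer;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

public final class BucketShufflePartitionerSketch {

    private BucketShufflePartitionerSketch() {}

    /** Routes each record to the channel chosen by the given ChannelComputer. */
    public static DataStream<RowData> shuffleByBucket(
            DataStream<RowData> input, ChannelComputer<RowData> computer) {
        Partitioner<RowData> partitioner =
                new Partitioner<RowData>() {
                    private transient boolean initialized;

                    @Override
                    public int partition(RowData record, int numPartitions) {
                        if (!initialized) {
                            // Lazily pass the real downstream channel count to the computer.
                            computer.setup(numPartitions);
                            initialized = true;
                        }
                        return computer.channel(record);
                    }
                };
        KeySelector<RowData, RowData> identity = record -> record;
        return input.partitionCustom(partitioner, identity);
    }
}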
