Skip to content

Commit 8a111f8

Browse files
committed
add shuffle strategy for bucket
1 parent 7c20667 commit 8a111f8

File tree

6 files changed

+272
-184
lines changed

6 files changed

+272
-184
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
192192
toFlussRowType(tableRowType),
193193
bucketKeys,
194194
partitionKeys,
195+
numBucket,
195196
lakeFormat,
196197
flussSerializationSchema),
197198
input.getParallelism())

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.fluss.flink.sink.serializer;
1919

20-
import org.apache.flink.table.data.StringData;
21-
import org.apache.flink.table.data.binary.BinaryStringData;
2220
import org.apache.fluss.flink.row.FlinkAsFlussRow;
2321
import org.apache.fluss.flink.row.OperationType;
2422
import org.apache.fluss.flink.row.RowWithOp;
@@ -31,7 +29,9 @@
3129
import org.apache.fluss.types.RowType;
3230

3331
import org.apache.flink.table.data.RowData;
32+
import org.apache.flink.table.data.StringData;
3433
import org.apache.flink.table.data.binary.BinaryFormat;
34+
import org.apache.flink.table.data.binary.BinaryStringData;
3535
import org.apache.flink.types.RowKind;
3636

3737
/** Default implementation of RowDataConverter for RowData. */
@@ -115,9 +115,9 @@ public long size(RowData value, RowType rowType) {
115115
break;
116116
case STRING:
117117
StringData stringData = value.getString(i);
118-
if(stringData instanceof BinaryStringData){
119-
size += ((BinaryStringData)stringData).getSizeInBytes();
120-
}else {
118+
if (stringData instanceof BinaryStringData) {
119+
size += ((BinaryStringData) stringData).getSizeInBytes();
120+
} else {
121121
size += converter.getString(i).getSizeInBytes();
122122
}
123123
break;
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.sink.shuffle;
19+
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import java.util.Arrays;
24+
import java.util.List;
25+
import java.util.concurrent.ThreadLocalRandom;
26+
27+
import static org.apache.fluss.utils.Preconditions.checkArgument;
28+
import static org.apache.fluss.utils.Preconditions.checkState;
29+
30+
/** BucketIdAssignment. */
31+
public class BucketIdAssignment extends RandomAssignment {
32+
private static final Logger LOG = LoggerFactory.getLogger(BucketIdAssignment.class);
33+
private final double bucketWeights;
34+
private final int bucketNums;
35+
36+
BucketIdAssignment(List<Integer> assignedSubtasks, List<Long> subtaskWeights, int bucketNums) {
37+
super(assignedSubtasks, subtaskWeights);
38+
this.bucketNums = bucketNums;
39+
this.bucketWeights = keyWeight * 1.0 / bucketNums;
40+
}
41+
42+
@Override
43+
public int select(Integer bucketId) {
44+
checkArgument(bucketId >= 0 && bucketId < bucketNums, "Invalid bucketId: %s", bucketId);
45+
double calculatedWeight =
46+
Math.floor(
47+
ThreadLocalRandom.current()
48+
.nextDouble(
49+
bucketId * bucketWeights, (bucketId + 1) * bucketWeights));
50+
int index = Arrays.binarySearch(cumulativeWeights, calculatedWeight);
51+
int position = Math.abs(index + 1);
52+
checkState(
53+
position < assignedSubtasks.size(),
54+
"Invalid selected position: out of range. key weight = %s, random number = %s, cumulative weights array = %s",
55+
keyWeight,
56+
calculatedWeight,
57+
cumulativeWeights);
58+
return assignedSubtasks.get(position);
59+
}
60+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/KeyAssignment.java

Lines changed: 3 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -17,134 +17,9 @@
1717

1818
package org.apache.fluss.flink.sink.shuffle;
1919

20-
import org.apache.fluss.utils.MurmurHashUtils;
21-
2220
import javax.annotation.Nullable;
2321

24-
import java.util.Arrays;
25-
import java.util.List;
26-
import java.util.Objects;
27-
import java.util.concurrent.ThreadLocalRandom;
28-
29-
import static org.apache.fluss.utils.Preconditions.checkArgument;
30-
import static org.apache.fluss.utils.Preconditions.checkState;
31-
import static org.apache.fluss.utils.UnsafeUtils.BYTE_ARRAY_BASE_OFFSET;
32-
33-
/** Subtask assignment for a key for Map statistics based. */
34-
public class KeyAssignment {
35-
private final List<Integer> assignedSubtasks;
36-
private final List<Long> subtaskWeights;
37-
private final long keyWeight;
38-
private final long[] cumulativeWeights;
39-
40-
/**
41-
* @param assignedSubtasks assigned subtasks for this key. It could be a single subtask. It
42-
* could also be multiple subtasks if the key has heavy weight that should be handled by
43-
* multiple subtasks.
44-
* @param subtaskWeights assigned weight for each subtask. E.g., if the keyWeight is 27 and the
45-
* key is assigned to 3 subtasks, subtaskWeights could contain values as [10, 10, 7] for
46-
* target weight of 10 per subtask.
47-
*/
48-
public KeyAssignment(List<Integer> assignedSubtasks, List<Long> subtaskWeights) {
49-
checkArgument(
50-
assignedSubtasks != null && !assignedSubtasks.isEmpty(),
51-
"Invalid assigned subtasks: null or empty");
52-
checkArgument(
53-
subtaskWeights != null && !subtaskWeights.isEmpty(),
54-
"Invalid assigned subtasks weights: null or empty");
55-
checkArgument(
56-
assignedSubtasks.size() == subtaskWeights.size(),
57-
"Invalid assignment: size mismatch (tasks length = %s, weights length = %s)",
58-
assignedSubtasks.size(),
59-
subtaskWeights.size());
60-
61-
this.assignedSubtasks = assignedSubtasks;
62-
this.subtaskWeights = subtaskWeights;
63-
this.keyWeight = subtaskWeights.stream().mapToLong(Long::longValue).sum();
64-
this.cumulativeWeights = new long[subtaskWeights.size()];
65-
long cumulativeWeight = 0;
66-
for (int i = 0; i < subtaskWeights.size(); ++i) {
67-
cumulativeWeight += subtaskWeights.get(i);
68-
cumulativeWeights[i] = cumulativeWeight;
69-
}
70-
}
71-
72-
public List<Integer> assignedSubtasks() {
73-
return assignedSubtasks;
74-
}
75-
76-
public List<Long> subtaskWeights() {
77-
return subtaskWeights;
78-
}
79-
80-
public long keyWeight() {
81-
return keyWeight;
82-
}
83-
84-
/**
85-
* Select a subtask for the key. If bucket key is existed , same key will be assigned to the
86-
* same subtask.
87-
*
88-
* @return subtask id
89-
*/
90-
int select(@Nullable byte[] bucketKey) {
91-
if (assignedSubtasks.size() == 1) {
92-
// only choice. no need to run random number generator.
93-
return assignedSubtasks.get(0);
94-
} else {
95-
long randomNumber;
96-
if (bucketKey != null) {
97-
randomNumber =
98-
MurmurHashUtils.hashUnsafeBytes(
99-
bucketKey, BYTE_ARRAY_BASE_OFFSET, bucketKey.length)
100-
% keyWeight;
101-
} else {
102-
randomNumber = ThreadLocalRandom.current().nextLong(keyWeight);
103-
}
104-
105-
int index = Arrays.binarySearch(cumulativeWeights, randomNumber);
106-
// choose the subtask where randomNumber < cumulativeWeights[pos].
107-
// this works regardless whether index is negative or not.
108-
int position = Math.abs(index + 1);
109-
checkState(
110-
position < assignedSubtasks.size(),
111-
"Invalid selected position: out of range. key weight = %s, random number = %s, cumulative weights array = %s",
112-
keyWeight,
113-
randomNumber,
114-
cumulativeWeights);
115-
return assignedSubtasks.get(position);
116-
}
117-
}
118-
119-
@Override
120-
public boolean equals(Object o) {
121-
if (o == null || getClass() != o.getClass()) {
122-
return false;
123-
}
124-
KeyAssignment that = (KeyAssignment) o;
125-
return keyWeight == that.keyWeight
126-
&& Objects.equals(assignedSubtasks, that.assignedSubtasks)
127-
&& Objects.equals(subtaskWeights, that.subtaskWeights)
128-
&& Objects.deepEquals(cumulativeWeights, that.cumulativeWeights);
129-
}
130-
131-
@Override
132-
public int hashCode() {
133-
return Objects.hash(
134-
assignedSubtasks, subtaskWeights, keyWeight, Arrays.hashCode(cumulativeWeights));
135-
}
136-
137-
@Override
138-
public String toString() {
139-
return "KeyAssignment{"
140-
+ "assignedSubtasks="
141-
+ assignedSubtasks
142-
+ ", subtaskWeights="
143-
+ subtaskWeights
144-
+ ", keyWeight="
145-
+ keyWeight
146-
+ ", cumulativeWeights="
147-
+ Arrays.toString(cumulativeWeights)
148-
+ '}';
149-
}
22+
/**
 * Strategy that picks the subtask a record of a key is sent to.
 *
 * <p>Implementations decide how, and whether, the optional bucket id influences the choice.
 */
23+
public interface KeyAssignment {
24+
/**
 * Selects a subtask.
 *
 * @param bucketId bucket id of the record; declared nullable, so callers may pass {@code null}
 *     when no bucket id is available (NOTE(review): whether a given implementation accepts
 *     {@code null} is up to that implementation - confirm before relying on it)
 * @return subtask id
 */
int select(@Nullable Integer bucketId);
15025
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.sink.shuffle;
19+
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import javax.annotation.Nullable;
24+
25+
import java.util.Arrays;
26+
import java.util.List;
27+
import java.util.Objects;
28+
import java.util.concurrent.ThreadLocalRandom;
29+
30+
import static org.apache.fluss.utils.Preconditions.checkArgument;
31+
import static org.apache.fluss.utils.Preconditions.checkState;
32+
33+
/**
34+
* Subtask assignment for a key for Map statistics based. todo: 两种KeyAssignment, 一种是随机的,这种也是随机取值。
35+
* 第二种是先获取bucketId, 再为每个bucketId分配subtask
36+
*/
37+
public class RandomAssignment implements KeyAssignment {
38+
private static final Logger LOG = LoggerFactory.getLogger(RandomAssignment.class);
39+
protected final List<Integer> assignedSubtasks;
40+
protected final List<Long> subtaskWeights;
41+
protected final long keyWeight;
42+
protected final double[] cumulativeWeights;
43+
44+
/**
45+
* @param assignedSubtasks assigned subtasks for this key. It could be a single subtask. It
46+
* could also be multiple subtasks if the key has heavy weight that should be handled by
47+
* multiple subtasks.
48+
* @param subtaskWeights assigned weight for each subtask. E.g., if the keyWeight is 27 and the
49+
* key is assigned to 3 subtasks, subtaskWeights could contain values as [10, 10, 7] for
50+
* target weight of 10 per subtask.
51+
*/
52+
public RandomAssignment(List<Integer> assignedSubtasks, List<Long> subtaskWeights) {
53+
checkArgument(
54+
assignedSubtasks != null && !assignedSubtasks.isEmpty(),
55+
"Invalid assigned subtasks: null or empty");
56+
checkArgument(
57+
subtaskWeights != null && !subtaskWeights.isEmpty(),
58+
"Invalid assigned subtasks weights: null or empty");
59+
checkArgument(
60+
assignedSubtasks.size() == subtaskWeights.size(),
61+
"Invalid assignment: size mismatch (tasks length = %s, weights length = %s)",
62+
assignedSubtasks.size(),
63+
subtaskWeights.size());
64+
65+
this.assignedSubtasks = assignedSubtasks;
66+
this.subtaskWeights = subtaskWeights;
67+
this.keyWeight = subtaskWeights.stream().mapToLong(Long::longValue).sum();
68+
this.cumulativeWeights = new double[subtaskWeights.size()];
69+
long cumulativeWeight = 0;
70+
for (int i = 0; i < subtaskWeights.size(); ++i) {
71+
cumulativeWeight += subtaskWeights.get(i);
72+
cumulativeWeights[i] = cumulativeWeight;
73+
}
74+
}
75+
76+
/**
77+
* Select a subtask for the key. If bucket key is existed , same key will be assigned to the
78+
* same subtask.
79+
*
80+
* @return subtask id
81+
*/
82+
@Override
83+
public int select(@Nullable Integer bucketId) {
84+
if (assignedSubtasks.size() == 1) {
85+
// only choice. no need to run random number generator.
86+
return assignedSubtasks.get(0);
87+
} else {
88+
long randomNumber = ThreadLocalRandom.current().nextLong(keyWeight);
89+
int index = Arrays.binarySearch(cumulativeWeights, randomNumber);
90+
// choose the subtask where randomNumber < cumulativeWeights[pos].
91+
// this works regardless whether index is negative or not.
92+
int position = Math.abs(index + 1);
93+
checkState(
94+
position < assignedSubtasks.size(),
95+
"Invalid selected position: out of range. key weight = %s, random number = %s, cumulative weights array = %s",
96+
keyWeight,
97+
randomNumber,
98+
cumulativeWeights);
99+
return assignedSubtasks.get(position);
100+
}
101+
}
102+
103+
@Override
104+
public boolean equals(Object o) {
105+
if (o == null || getClass() != o.getClass()) {
106+
return false;
107+
}
108+
RandomAssignment that = (RandomAssignment) o;
109+
return keyWeight == that.keyWeight
110+
&& Objects.equals(assignedSubtasks, that.assignedSubtasks)
111+
&& Objects.equals(subtaskWeights, that.subtaskWeights)
112+
&& Objects.deepEquals(cumulativeWeights, that.cumulativeWeights);
113+
}
114+
115+
@Override
116+
public int hashCode() {
117+
return Objects.hash(
118+
assignedSubtasks, subtaskWeights, keyWeight, Arrays.hashCode(cumulativeWeights));
119+
}
120+
121+
@Override
122+
public String toString() {
123+
return "KeyAssignment{"
124+
+ "assignedSubtasks="
125+
+ assignedSubtasks
126+
+ ", subtaskWeights="
127+
+ subtaskWeights
128+
+ ", keyWeight="
129+
+ keyWeight
130+
+ ", cumulativeWeights="
131+
+ Arrays.toString(cumulativeWeights)
132+
+ '}';
133+
}
134+
}

0 commit comments

Comments
 (0)