Skip to content

Commit b3c7c30

Browse files
sd4324530, ThorneANN
authored and committed
[FLINK-38833][paimon] Shuffle record to different subtasks by table, partition and bucket. (apache#4298)
Signed-off-by: Pei Yu <125331682@qq.com>
1 parent 711dfb9 commit b3c7c30

4 files changed

Lines changed: 26 additions & 5 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -69,6 +69,7 @@ public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {
6969
if (event instanceof BucketWrapperChangeEvent) {
7070
// Add hash of tableId to avoid data skew.
7171
return ((BucketWrapperChangeEvent) event).getBucket()
72+
+ ((BucketWrapperChangeEvent) event).getPartition()
7273
+ ((BucketWrapperChangeEvent) event).tableId().hashCode();
7374
} else {
7475
return ((BucketWrapper) event).getBucket();

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -157,6 +157,7 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
157157
bucketAssignerMap.computeIfAbsent(
158158
dataChangeEvent.tableId(), this::getTableInfo);
159159
int bucket;
160+
int partition;
160161
GenericRow genericRow =
161162
PaimonWriterHelper.convertEventToGenericRow(
162163
dataChangeEvent,
@@ -171,17 +172,20 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
171172
tuple4.f2.assign(
172173
tuple4.f3.partition(genericRow),
173174
tuple4.f3.trimmedPrimaryKey(genericRow).hashCode());
175+
partition = tuple4.f3.partition(genericRow).hashCode();
174176
break;
175177
}
176178
case HASH_FIXED:
177179
{
178180
tuple4.f1.setRecord(genericRow);
179181
bucket = tuple4.f1.bucket();
182+
partition = tuple4.f1.partition().hashCode();
180183
break;
181184
}
182185
case BUCKET_UNAWARE:
183186
{
184187
bucket = 0;
188+
partition = 0;
185189
break;
186190
}
187191
case KEY_DYNAMIC:
@@ -191,14 +195,16 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
191195
}
192196
}
193197
output.collect(
194-
new StreamRecord<>(new BucketWrapperChangeEvent(bucket, dataChangeEvent)));
198+
new StreamRecord<>(
199+
new BucketWrapperChangeEvent(bucket, partition, dataChangeEvent)));
195200
} else {
196201
// Broadcast SchemachangeEvent.
197202
for (int index = 0; index < totalTasksNumber; index++) {
198203
output.collect(
199204
new StreamRecord<>(
200205
new BucketWrapperChangeEvent(
201206
index,
207+
0,
202208
convertSchemaChangeEvent((SchemaChangeEvent) event))));
203209
}
204210
}
@@ -279,6 +285,7 @@ public DataChangeEvent convertDataChangeEvent(DataChangeEvent dataChangeEvent)
279285
new StreamRecord<>(
280286
new BucketWrapperChangeEvent(
281287
index,
288+
0,
282289
new CreateTableEvent(
283290
tableId,
284291
mixedSchemaInfo.paimonSchemaInfo

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -27,18 +27,24 @@
2727
public class BucketWrapperChangeEvent implements ChangeEvent, BucketWrapper, Serializable {
2828
private static final long serialVersionUID = 1L;
2929
private final int bucket;
30+
private final int partition;
3031

3132
private final ChangeEvent innerEvent;
3233

33-
public BucketWrapperChangeEvent(int bucket, ChangeEvent innerEvent) {
34+
public BucketWrapperChangeEvent(int bucket, int partition, ChangeEvent innerEvent) {
3435
this.bucket = bucket;
36+
this.partition = partition;
3537
this.innerEvent = innerEvent;
3638
}
3739

3840
public int getBucket() {
3941
return bucket;
4042
}
4143

44+
public int getPartition() {
45+
return partition;
46+
}
47+
4248
public ChangeEvent getInnerEvent() {
4349
return innerEvent;
4450
}
@@ -57,19 +63,23 @@ public boolean equals(Object o) {
5763
return false;
5864
}
5965
BucketWrapperChangeEvent that = (BucketWrapperChangeEvent) o;
60-
return bucket == that.bucket && Objects.equals(innerEvent, that.innerEvent);
66+
return bucket == that.bucket
67+
&& partition == that.partition
68+
&& Objects.equals(innerEvent, that.innerEvent);
6169
}
6270

6371
@Override
6472
public int hashCode() {
65-
return Objects.hash(bucket, innerEvent);
73+
return Objects.hash(bucket, partition, innerEvent);
6674
}
6775

6876
@Override
6977
public String toString() {
7078
return "BucketWrapperChangeEvent{"
7179
+ "bucket="
7280
+ bucket
81+
+ ", partition="
82+
+ partition
7383
+ ", innerEvent="
7484
+ innerEvent
7585
+ '}';

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -82,6 +82,7 @@ public void serialize(Event event, DataOutputView dataOutputView) throws IOExcep
8282
BucketWrapperChangeEvent bucketWrapperChangeEvent = (BucketWrapperChangeEvent) event;
8383
enumSerializer.serialize(EventClass.BUCKET_WRAPPER_CHANGE_EVENT, dataOutputView);
8484
dataOutputView.writeInt(bucketWrapperChangeEvent.getBucket());
85+
dataOutputView.writeInt(bucketWrapperChangeEvent.getPartition());
8586
eventSerializer.serialize(bucketWrapperChangeEvent.getInnerEvent(), dataOutputView);
8687
} else if (event instanceof BucketWrapperFlushEvent) {
8788
enumSerializer.serialize(EventClass.BUCKET_WRAPPER_FLUSH_EVENT, dataOutputView);
@@ -107,7 +108,9 @@ public Event deserialize(DataInputView source) throws IOException {
107108
schemaChangeEventTypeEnumSerializer.deserialize(source));
108109
} else {
109110
return new BucketWrapperChangeEvent(
110-
source.readInt(), (ChangeEvent) eventSerializer.deserialize(source));
111+
source.readInt(),
112+
source.readInt(),
113+
(ChangeEvent) eventSerializer.deserialize(source));
111114
}
112115
}
113116

0 commit comments

Comments
 (0)