Skip to content

Commit c347651

Browse files
authored
[FLINK-37731][pipeline-paimon]Support Postpone Bucket (#4386)
1 parent 24ab548 commit c347651

3 files changed

Lines changed: 270 additions & 1 deletion

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.paimon.memory.HeapMemorySegmentPool;
3737
import org.apache.paimon.memory.MemoryPoolFactory;
3838
import org.apache.paimon.options.Options;
39+
import org.apache.paimon.table.BucketMode;
3940
import org.apache.paimon.table.FileStoreTable;
4041
import org.apache.paimon.utils.ExecutorThreadFactory;
4142
import org.slf4j.Logger;
@@ -190,8 +191,12 @@ public void write(InputT event, Context context) throws IOException {
190191
return storeSinkWrite;
191192
});
192193
try {
194+
int bucket =
195+
table.bucketMode() == BucketMode.POSTPONE_MODE
196+
? BucketMode.POSTPONE_BUCKET
197+
: paimonEvent.getBucket();
193198
for (GenericRow genericRow : paimonEvent.getGenericRows()) {
194-
write.write(genericRow, paimonEvent.getBucket());
199+
write.write(genericRow, bucket);
195200
}
196201
} catch (Exception e) {
197202
throw new IOException(e);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,18 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
188188
partition = 0;
189189
break;
190190
}
191+
case POSTPONE_MODE:
192+
{
193+
// Postpone bucket tables: the actual bucket written to Paimon is -2
194+
// (assigned by a downstream compaction job). However, using -2 as the
195+
// shuffle key here would route all postpone events to the same writer
196+
// subtask and cause severe data skew. Use currentTaskNumber so events
197+
// are evenly spread across writer subtasks; PaimonWriter will rewrite
198+
// the bucket back to POSTPONE_BUCKET (-2) when persisting records.
199+
bucket = currentTaskNumber;
200+
partition = tuple4.f3.partition(genericRow).hashCode();
201+
break;
202+
}
191203
case KEY_DYNAMIC:
192204
default:
193205
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
19+
20+
import org.apache.flink.api.common.TaskInfo;
21+
import org.apache.flink.api.common.TaskInfoImpl;
22+
import org.apache.flink.api.java.tuple.Tuple2;
23+
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
24+
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
25+
import org.apache.flink.cdc.common.event.CreateTableEvent;
26+
import org.apache.flink.cdc.common.event.DataChangeEvent;
27+
import org.apache.flink.cdc.common.event.Event;
28+
import org.apache.flink.cdc.common.event.TableId;
29+
import org.apache.flink.cdc.common.schema.Schema;
30+
import org.apache.flink.cdc.common.types.DataType;
31+
import org.apache.flink.cdc.common.types.RowType;
32+
import org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier;
33+
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordEventSerializer;
34+
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordSerializer;
35+
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
36+
import org.apache.flink.streaming.api.operators.Output;
37+
import org.apache.flink.streaming.api.watermark.Watermark;
38+
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
39+
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
40+
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
41+
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
42+
import org.apache.flink.util.OutputTag;
43+
44+
import org.apache.paimon.data.GenericRow;
45+
import org.apache.paimon.flink.FlinkCatalogFactory;
46+
import org.apache.paimon.options.Options;
47+
import org.apache.paimon.table.BucketMode;
48+
import org.apache.paimon.table.FileStoreTable;
49+
import org.assertj.core.api.Assertions;
50+
import org.junit.jupiter.api.Test;
51+
import org.junit.jupiter.api.io.TempDir;
52+
import org.mockito.MockedConstruction;
53+
import org.mockito.Mockito;
54+
55+
import java.io.File;
56+
import java.lang.reflect.Field;
57+
import java.time.ZoneId;
58+
import java.util.ArrayList;
59+
import java.util.Arrays;
60+
import java.util.List;
61+
import java.util.UUID;
62+
63+
import static org.apache.flink.cdc.common.types.DataTypes.STRING;
64+
65+
/** Tests for postpone bucket mode in Paimon bucket assignment. */
66+
class PaimonPostponeBucketTest {
67+
68+
@TempDir public static java.nio.file.Path temporaryFolder;
69+
70+
private static final String TEST_DATABASE = "test";
71+
private static final TableId TABLE_ID = TableId.tableId(TEST_DATABASE, "table1");
72+
73+
@Test
74+
void testBucketAssignOperatorUsesCurrentSubtaskForPostponeBucketTable() throws Exception {
75+
Options catalogOptions = createCatalogOptions();
76+
Schema schema = createPostponeBucketSchema();
77+
new PaimonMetadataApplier(catalogOptions)
78+
.applySchemaChange(new CreateTableEvent(TABLE_ID, schema));
79+
80+
BucketAssignOperator bucketAssignOperator =
81+
new BucketAssignOperator(catalogOptions, null, ZoneId.systemDefault(), null);
82+
CollectingOutput output = new CollectingOutput();
83+
setField(bucketAssignOperator, "output", output);
84+
int currentSubtask = 2;
85+
bucketAssignOperator.open(createTaskInfo(4, currentSubtask));
86+
bucketAssignOperator.convertSchemaChangeEvent(new CreateTableEvent(TABLE_ID, schema));
87+
88+
bucketAssignOperator.processElement(new StreamRecord<>(createInsertEvent("1", "Alice")));
89+
90+
Assertions.assertThat(output.records).hasSize(1);
91+
Event event = output.records.get(0).getValue();
92+
Assertions.assertThat(event).isInstanceOf(BucketWrapperChangeEvent.class);
93+
BucketWrapperChangeEvent bucketWrapperChangeEvent = (BucketWrapperChangeEvent) event;
94+
Assertions.assertThat(bucketWrapperChangeEvent.getBucket()).isEqualTo(currentSubtask);
95+
Assertions.assertThat(bucketWrapperChangeEvent.getBucket())
96+
.isNotEqualTo(BucketMode.POSTPONE_BUCKET);
97+
Assertions.assertThat(bucketWrapperChangeEvent.getInnerEvent())
98+
.isInstanceOf(DataChangeEvent.class);
99+
}
100+
101+
@Test
102+
void testSerializerKeepsAssignedBucketBeforeWriterRewrite() throws Exception {
103+
Schema schema = createPostponeBucketSchema();
104+
PaimonRecordSerializer<Event> serializer =
105+
new PaimonRecordEventSerializer(ZoneId.systemDefault());
106+
serializer.serialize(new CreateTableEvent(TABLE_ID, schema));
107+
int assignedBucket = 2;
108+
BucketWrapperChangeEvent bucketWrapperChangeEvent =
109+
new BucketWrapperChangeEvent(assignedBucket, 0, createInsertEvent("1", "Alice"));
110+
111+
Assertions.assertThat(serializer.serialize(bucketWrapperChangeEvent).getBucket())
112+
.isEqualTo(assignedBucket);
113+
}
114+
115+
@Test
116+
void testPostponeBucketTableRewritesAssignedBucketToMinusTwoWhenWriting() throws Exception {
117+
Options catalogOptions = createCatalogOptions();
118+
Schema schema = createPostponeBucketSchema();
119+
new PaimonMetadataApplier(catalogOptions)
120+
.applySchemaChange(new CreateTableEvent(TABLE_ID, schema));
121+
FileStoreTable table =
122+
(FileStoreTable)
123+
FlinkCatalogFactory.createPaimonCatalog(catalogOptions)
124+
.getTable(
125+
org.apache.paimon.catalog.Identifier.fromString(
126+
TABLE_ID.toString()));
127+
128+
int assignedBucket = 2;
129+
List<Integer> writtenBuckets = new ArrayList<>();
130+
try (MockedConstruction<org.apache.flink.cdc.connectors.paimon.sink.v2.StoreSinkWriteImpl>
131+
ignored =
132+
Mockito.mockConstruction(
133+
org.apache.flink.cdc.connectors.paimon.sink.v2.StoreSinkWriteImpl
134+
.class,
135+
(mock, context) -> {
136+
Mockito.doAnswer(
137+
invocation -> {
138+
writtenBuckets.add(
139+
invocation.getArgument(1));
140+
return null;
141+
})
142+
.when(mock)
143+
.write(Mockito.any(GenericRow.class), Mockito.anyInt());
144+
})) {
145+
org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriter<Event> writer =
146+
new org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriter<>(
147+
catalogOptions,
148+
UnregisteredMetricsGroup.createSinkWriterMetricGroup(),
149+
"test-user",
150+
new PaimonRecordEventSerializer(ZoneId.systemDefault()),
151+
0);
152+
153+
writer.write(new CreateTableEvent(TABLE_ID, schema), null);
154+
writer.write(
155+
new BucketWrapperChangeEvent(
156+
assignedBucket, 0, createInsertEvent("1", "Alice")),
157+
null);
158+
writer.close();
159+
}
160+
161+
Assertions.assertThat(table.bucketMode()).isEqualTo(BucketMode.POSTPONE_MODE);
162+
Assertions.assertThat(writtenBuckets).containsExactly(BucketMode.POSTPONE_BUCKET);
163+
}
164+
165+
private Options createCatalogOptions() {
166+
Options catalogOptions = new Options();
167+
String warehouse =
168+
new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
169+
catalogOptions.setString("warehouse", warehouse);
170+
catalogOptions.setString("cache-enabled", "false");
171+
return catalogOptions;
172+
}
173+
174+
private Schema createPostponeBucketSchema() {
175+
return Schema.newBuilder()
176+
.physicalColumn("col1", STRING().notNull())
177+
.physicalColumn("col2", STRING())
178+
.primaryKey("col1")
179+
.option("bucket", String.valueOf(BucketMode.POSTPONE_BUCKET))
180+
.build();
181+
}
182+
183+
private DataChangeEvent createInsertEvent(String col1, String col2) {
184+
return DataChangeEvent.insertEvent(
185+
TABLE_ID,
186+
generate(Arrays.asList(Tuple2.of(STRING(), col1), Tuple2.of(STRING(), col2))));
187+
}
188+
189+
private BinaryRecordData generate(List<Tuple2<DataType, Object>> elements) {
190+
org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator generator =
191+
new org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator(
192+
RowType.of(elements.stream().map(e -> e.f0).toArray(DataType[]::new)));
193+
return generator.generate(
194+
elements.stream()
195+
.map(e -> e.f1)
196+
.map(o -> o instanceof String ? BinaryStringData.fromString((String) o) : o)
197+
.toArray(Object[]::new));
198+
}
199+
200+
private TaskInfo createTaskInfo(int numberOfParallelSubtasks, int indexOfThisSubtask) {
201+
return new TaskInfoImpl(
202+
"test-task",
203+
numberOfParallelSubtasks,
204+
indexOfThisSubtask,
205+
numberOfParallelSubtasks,
206+
0);
207+
}
208+
209+
private void setField(Object target, String fieldName, Object value) throws Exception {
210+
Class<?> current = target.getClass();
211+
while (current != null) {
212+
try {
213+
Field field = current.getDeclaredField(fieldName);
214+
field.setAccessible(true);
215+
field.set(target, value);
216+
return;
217+
} catch (NoSuchFieldException e) {
218+
current = current.getSuperclass();
219+
}
220+
}
221+
throw new NoSuchFieldException(fieldName);
222+
}
223+
224+
private static class CollectingOutput implements Output<StreamRecord<Event>> {
225+
private final List<StreamRecord<Event>> records = new ArrayList<>();
226+
227+
public void emitWatermark(org.apache.flink.runtime.event.WatermarkEvent watermark) {}
228+
229+
@Override
230+
public void emitWatermark(Watermark mark) {}
231+
232+
@Override
233+
public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}
234+
235+
@Override
236+
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {}
237+
238+
@Override
239+
public void emitLatencyMarker(LatencyMarker latencyMarker) {}
240+
241+
@Override
242+
public void emitRecordAttributes(RecordAttributes recordAttributes) {}
243+
244+
@Override
245+
public void collect(StreamRecord<Event> record) {
246+
records.add(record);
247+
}
248+
249+
@Override
250+
public void close() {}
251+
}
252+
}

0 commit comments

Comments
 (0)