Skip to content

Commit 4efb48d

Browse files
committed
Avoid performance regression by moving BlobWriteContext update to schema refresh path.
1 parent d530291 commit 4efb48d

2 files changed

Lines changed: 266 additions & 28 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java

Lines changed: 29 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -140,29 +140,6 @@ public Collection<MultiTableCommittable> prepareCommit() {
140140

141141
@Override
142142
public void write(InputT event, Context context) throws IOException {
143-
// Pre-update BlobWriteContext for existing schemas before serialize
144-
// This ensures DataChangeEvent uses the correct BlobWriteContext for BLOB fields
145-
if (serializer instanceof PaimonRecordEventSerializer && event instanceof ChangeEvent) {
146-
org.apache.flink.cdc.common.event.TableId cdcTableId = ((ChangeEvent) event).tableId();
147-
// Only update if schema already exists (not for CreateTableEvent)
148-
if (((PaimonRecordEventSerializer) serializer).getSchemaMaps().containsKey(cdcTableId)
149-
&& !(event instanceof org.apache.flink.cdc.common.event.CreateTableEvent)) {
150-
try {
151-
Identifier paimonTableId =
152-
Identifier.create(
153-
cdcTableId.getSchemaName(), cdcTableId.getTableName());
154-
FileStoreTable table = (FileStoreTable) catalog.getTable(paimonTableId);
155-
BlobWriteContext blobWriteContext =
156-
BlobWriteContext.fromTable(catalog.caseSensitive(), table);
157-
((PaimonRecordEventSerializer) serializer)
158-
.updateBlobWriteContext(cdcTableId, blobWriteContext);
159-
} catch (Exception e) {
160-
// Table might not exist yet, ignore and continue
161-
LOG.debug("Could not get table for BlobWriteContext: {}", e.getMessage());
162-
}
163-
}
164-
}
165-
166143
PaimonEvent paimonEvent = serializer.serialize(event);
167144
Identifier tableId = paimonEvent.getTableId();
168145
if (paimonEvent.isShouldRefreshSchema()) {
@@ -176,6 +153,16 @@ public void write(InputT event, Context context) throws IOException {
176153
} catch (Exception e) {
177154
throw new RuntimeException(e);
178155
}
156+
try {
157+
FileStoreTable table = getTable(tableId);
158+
BlobWriteContext blobWriteContext =
159+
BlobWriteContext.fromTable(catalog.caseSensitive(), table);
160+
((PaimonRecordEventSerializer) serializer)
161+
.updateBlobWriteContext(((ChangeEvent) event).tableId(), blobWriteContext);
162+
} catch (Exception e) {
163+
// Table might not exist yet, ignore and continue
164+
LOGGER.debug("Could not get table for BlobWriteContext: {}", e.getMessage());
165+
}
179166
}
180167
if (paimonEvent.getGenericRows() != null) {
181168
FileStoreTable table;
@@ -215,12 +202,26 @@ public void write(InputT event, Context context) throws IOException {
215202
storeSinkWrite.withCompactExecutor(compactExecutor);
216203
return storeSinkWrite;
217204
});
218-
try {
219-
for (GenericRow genericRow : paimonEvent.getGenericRows()) {
220-
write.write(genericRow, paimonEvent.getBucket());
205+
for (GenericRow genericRow : paimonEvent.getGenericRows()) {
206+
int bucket =
207+
table.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET
208+
? BucketMode.POSTPONE_BUCKET
209+
: paimonEvent.getBucket();
210+
try {
211+
write.write(genericRow, bucket);
212+
} catch (Exception e) {
213+
throw new IOException(
214+
String.format(
215+
"Failed to write record %s of %s with actual schema %s and expected schema %s",
216+
genericRow,
217+
tableId.getTableName(),
218+
table.schema(),
219+
((PaimonRecordEventSerializer) serializer)
220+
.getSchemaMaps()
221+
.get(((ChangeEvent) event).tableId())
222+
.getSchema()),
223+
e);
221224
}
222-
} catch (Exception e) {
223-
throw new IOException(e);
224225
}
225226
}
226227
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,237 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.paimon.sink.v2;
19+
20+
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
21+
import org.apache.flink.cdc.common.event.AddColumnEvent;
22+
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
23+
import org.apache.flink.cdc.common.event.CreateTableEvent;
24+
import org.apache.flink.cdc.common.event.DataChangeEvent;
25+
import org.apache.flink.cdc.common.event.Event;
26+
import org.apache.flink.cdc.common.event.TableId;
27+
import org.apache.flink.cdc.common.schema.Column;
28+
import org.apache.flink.cdc.common.schema.Schema;
29+
import org.apache.flink.cdc.common.types.DataType;
30+
import org.apache.flink.cdc.common.types.DataTypes;
31+
import org.apache.flink.cdc.common.types.RowType;
32+
import org.apache.flink.cdc.connectors.paimon.sink.dlf.DlfCatalogUtil;
33+
import org.apache.flink.cdc.connectors.paimon.sink.v2.blob.BlobWriteContext;
34+
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
35+
import org.apache.flink.configuration.Configuration;
36+
import org.apache.flink.metrics.MetricGroup;
37+
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
38+
39+
import org.apache.paimon.CoreOptions;
40+
import org.apache.paimon.catalog.Catalog;
41+
import org.apache.paimon.catalog.Identifier;
42+
import org.apache.paimon.flink.FlinkCatalogFactory;
43+
import org.apache.paimon.options.Options;
44+
import org.apache.paimon.table.FileStoreTable;
45+
import org.junit.jupiter.api.AfterEach;
46+
import org.junit.jupiter.api.BeforeEach;
47+
import org.junit.jupiter.api.Test;
48+
import org.junit.jupiter.api.extension.ExtendWith;
49+
import org.mockito.Mock;
50+
import org.mockito.MockedConstruction;
51+
import org.mockito.MockedStatic;
52+
import org.mockito.Mockito;
53+
import org.mockito.junit.jupiter.MockitoExtension;
54+
55+
import java.lang.reflect.Field;
56+
import java.time.ZoneId;
57+
import java.util.ArrayList;
58+
import java.util.HashMap;
59+
import java.util.List;
60+
import java.util.Map;
61+
62+
import static org.mockito.ArgumentMatchers.any;
63+
import static org.mockito.ArgumentMatchers.anyBoolean;
64+
import static org.mockito.ArgumentMatchers.anyLong;
65+
import static org.mockito.Mockito.lenient;
66+
import static org.mockito.Mockito.times;
67+
import static org.mockito.Mockito.verify;
68+
import static org.mockito.Mockito.when;
69+
70+
/** Unit tests for {@link PaimonWriter#write}. */
71+
@ExtendWith(MockitoExtension.class)
72+
class PaimonWriterTest {
73+
74+
@Mock private Catalog mockCatalog;
75+
76+
@Mock private FileStoreTable mockTable;
77+
78+
@Mock private CoreOptions mockCoreOptions;
79+
80+
private MockedStatic<FlinkCatalogFactory> mockedFactory;
81+
private MockedStatic<DlfCatalogUtil> mockedDlfUtil;
82+
private MockedStatic<BlobWriteContext> mockedBlobWriteContext;
83+
private MockedConstruction<StoreSinkWriteImpl> mockedConstruction;
84+
85+
private PaimonWriter<Event> writer;
86+
87+
private final TableId testTableId = TableId.parse("test.table1");
88+
private final Identifier testIdentifier = Identifier.fromString("test.table1");
89+
90+
@BeforeEach
91+
void setUp() throws Exception {
92+
mockedFactory = Mockito.mockStatic(FlinkCatalogFactory.class);
93+
mockedDlfUtil = Mockito.mockStatic(DlfCatalogUtil.class);
94+
mockedBlobWriteContext = Mockito.mockStatic(BlobWriteContext.class);
95+
96+
mockedFactory
97+
.when(() -> FlinkCatalogFactory.createPaimonCatalog(any(Options.class)))
98+
.thenReturn(mockCatalog);
99+
100+
mockedDlfUtil
101+
.when(() -> DlfCatalogUtil.convertOptionToDlf(any(Options.class), any()))
102+
.thenCallRealMethod();
103+
104+
BlobWriteContext mockBlobContext = BlobWriteContext.empty();
105+
mockedBlobWriteContext
106+
.when(() -> BlobWriteContext.fromTable(anyBoolean(), any(FileStoreTable.class)))
107+
.thenReturn(mockBlobContext);
108+
109+
when(mockCatalog.getTable(any(Identifier.class))).thenReturn(mockTable);
110+
when(mockCoreOptions.writeBufferSize()).thenReturn(32 * 1024 * 1024L);
111+
when(mockCoreOptions.pageSize()).thenReturn(32 * 1024);
112+
when(mockCoreOptions.bucket()).thenReturn(1);
113+
when(mockTable.coreOptions()).thenReturn(mockCoreOptions);
114+
115+
MetricGroup metricGroup = UnregisteredMetricsGroup.createOperatorMetricGroup();
116+
PaimonRecordEventSerializer serializer =
117+
new PaimonRecordEventSerializer(ZoneId.systemDefault());
118+
119+
Options catalogOptions = new Options();
120+
121+
mockedConstruction =
122+
Mockito.mockConstruction(
123+
StoreSinkWriteImpl.class,
124+
(mock, context) ->
125+
lenient()
126+
.when(mock.prepareCommit(anyBoolean(), anyLong()))
127+
.thenReturn(java.util.Collections.emptyList()));
128+
129+
writer =
130+
new PaimonWriter<>(
131+
catalogOptions,
132+
metricGroup,
133+
"test_commit_user",
134+
serializer,
135+
new Configuration(),
136+
0L);
137+
138+
injectMockCatalog(writer, mockCatalog);
139+
}
140+
141+
private void injectMockCatalog(PaimonWriter<Event> writer, Catalog catalog) throws Exception {
142+
Field catalogField = PaimonWriter.class.getDeclaredField("catalog");
143+
catalogField.setAccessible(true);
144+
catalogField.set(writer, catalog);
145+
}
146+
147+
@AfterEach
148+
void tearDown() throws Exception {
149+
if (writer != null) {
150+
writer.close();
151+
}
152+
if (mockedFactory != null) {
153+
mockedFactory.close();
154+
}
155+
if (mockedDlfUtil != null) {
156+
mockedDlfUtil.close();
157+
}
158+
if (mockedBlobWriteContext != null) {
159+
mockedBlobWriteContext.close();
160+
}
161+
if (mockedConstruction != null) {
162+
mockedConstruction.close();
163+
}
164+
}
165+
166+
@Test
167+
void testGetTableInvocationCount() throws Exception {
168+
Schema schema =
169+
Schema.newBuilder()
170+
.physicalColumn("id", DataTypes.STRING().notNull())
171+
.physicalColumn("value", DataTypes.INT())
172+
.physicalColumn("path", DataTypes.STRING())
173+
.build();
174+
175+
writer.write(new CreateTableEvent(testTableId, schema), null);
176+
177+
RowType rowType =
178+
RowType.of(DataTypes.STRING().notNull(), DataTypes.INT(), DataTypes.STRING());
179+
BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
180+
181+
for (int i = 1; i <= 5; i++) {
182+
writer.write(
183+
DataChangeEvent.insertEvent(
184+
testTableId,
185+
generator.generate(
186+
new Object[] {
187+
BinaryStringData.fromString("id" + i),
188+
i,
189+
BinaryStringData.fromString("path" + i)
190+
})),
191+
null);
192+
}
193+
194+
verify(mockCatalog, times(1)).getTable(testIdentifier);
195+
196+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
197+
addedColumns.add(
198+
new AddColumnEvent.ColumnWithPosition(
199+
Column.physicalColumn("age", DataTypes.INT())));
200+
AddColumnEvent addColumnEvent = new AddColumnEvent(testTableId, addedColumns);
201+
writer.write(addColumnEvent, null);
202+
203+
verify(mockCatalog, times(2)).getTable(testIdentifier);
204+
205+
RowType rowTypeWithAge =
206+
RowType.of(
207+
DataTypes.STRING().notNull(),
208+
DataTypes.INT(),
209+
DataTypes.STRING(),
210+
DataTypes.INT());
211+
BinaryRecordDataGenerator generatorWithAge = new BinaryRecordDataGenerator(rowTypeWithAge);
212+
213+
for (int i = 6; i <= 10; i++) {
214+
writer.write(
215+
DataChangeEvent.insertEvent(
216+
testTableId,
217+
generatorWithAge.generate(
218+
new Object[] {
219+
BinaryStringData.fromString("id" + i),
220+
i,
221+
BinaryStringData.fromString("path" + i),
222+
i + 10
223+
})),
224+
null);
225+
}
226+
227+
verify(mockCatalog, times(2)).getTable(testIdentifier);
228+
229+
Map<String, DataType> typeMapping = new HashMap<>();
230+
typeMapping.put("value", DataTypes.BIGINT());
231+
AlterColumnTypeEvent alterColumnTypeEvent =
232+
new AlterColumnTypeEvent(testTableId, typeMapping);
233+
writer.write(alterColumnTypeEvent, null);
234+
235+
verify(mockCatalog, times(3)).getTable(testIdentifier);
236+
}
237+
}

0 commit comments

Comments
 (0)