Skip to content

Commit d62196a

Browse files
committed
[FLINK-39567][paimon] Add blob support.
1 parent 24ab548 commit d62196a

14 files changed

Lines changed: 1841 additions & 37 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,27 @@ limitations under the License.
159159
</exclusions>
160160
</dependency>
161161

162+
<dependency>
163+
<groupId>org.apache.hadoop</groupId>
164+
<artifactId>hadoop-mapreduce-client-core</artifactId>
165+
<version>${hadoop.version}</version>
166+
<scope>test</scope>
167+
<exclusions>
168+
<exclusion>
169+
<groupId>log4j</groupId>
170+
<artifactId>log4j</artifactId>
171+
</exclusion>
172+
<exclusion>
173+
<groupId>org.slf4j</groupId>
174+
<artifactId>slf4j-log4j12</artifactId>
175+
</exclusion>
176+
<exclusion>
177+
<groupId>com.google.protobuf</groupId>
178+
<artifactId>protobuf-java</artifactId>
179+
</exclusion>
180+
</exclusions>
181+
</dependency>
182+
162183
<!-- hive dependency -->
163184

164185
<dependency>
@@ -198,6 +219,10 @@ limitations under the License.
198219
<groupId>com.google.protobuf</groupId>
199220
<artifactId>protobuf-java</artifactId>
200221
</exclusion>
222+
<exclusion>
223+
<groupId>org.apache.hbase</groupId>
224+
<artifactId>hbase-client</artifactId>
225+
</exclusion>
201226
</exclusions>
202227
</dependency>
203228

@@ -253,6 +278,18 @@ limitations under the License.
253278
</exclusion>
254279
</exclusions>
255280
</dependency>
281+
<dependency>
282+
<groupId>org.apache.flink</groupId>
283+
<artifactId>flink-test-utils</artifactId>
284+
<version>${flink.version}</version>
285+
<scope>test</scope>
286+
<exclusions>
287+
<exclusion>
288+
<groupId>org.testcontainers</groupId>
289+
<artifactId>testcontainers</artifactId>
290+
</exclusion>
291+
</exclusions>
292+
</dependency>
256293
</dependencies>
257294

258295
<build>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,24 @@
3030
import org.apache.flink.cdc.common.event.TruncateTableEvent;
3131
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
3232
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
33+
import org.apache.flink.cdc.common.schema.Column;
3334
import org.apache.flink.cdc.common.schema.Schema;
3435
import org.apache.flink.cdc.common.sink.MetadataApplier;
36+
import org.apache.flink.cdc.common.types.DataType;
3537
import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;
3638

3739
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
3840

41+
import org.apache.paimon.CoreOptions;
3942
import org.apache.paimon.catalog.Catalog;
4043
import org.apache.paimon.catalog.Identifier;
4144
import org.apache.paimon.flink.FlinkCatalogFactory;
4245
import org.apache.paimon.options.Options;
4346
import org.apache.paimon.schema.SchemaChange;
47+
import org.apache.paimon.table.FileStoreTable;
4448
import org.apache.paimon.table.Table;
4549
import org.apache.paimon.table.sink.BatchTableCommit;
50+
import org.apache.paimon.types.DataTypes;
4651
import org.slf4j.Logger;
4752
import org.slf4j.LoggerFactory;
4853

@@ -52,6 +57,8 @@
5257
import java.util.Map;
5358
import java.util.Set;
5459

60+
import static org.apache.flink.cdc.common.types.DataTypeFamily.BINARY_STRING;
61+
import static org.apache.flink.cdc.common.types.DataTypeFamily.CHARACTER_STRING;
5562
import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument;
5663
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
5764

@@ -149,11 +156,11 @@ private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveExcepti
149156
new org.apache.paimon.schema.Schema.Builder();
150157
schema.getColumns()
151158
.forEach(
152-
(column) ->
153-
builder.column(
154-
column.getName(),
155-
TypeUtils.toPaimonDataType(column.getType()),
156-
column.getComment()));
159+
(column) -> {
160+
org.apache.paimon.types.DataType dataType =
161+
convertToBlobIfNeeded(column, tableOptions);
162+
builder.column(column.getName(), dataType, column.getComment());
163+
});
157164
List<String> partitionKeys = new ArrayList<>();
158165
List<String> primaryKeys = schema.primaryKeys();
159166
if (partitionMaps.containsKey(event.tableId())) {
@@ -205,10 +212,12 @@ private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
205212
SchemaChangeProvider.add(
206213
columnWithPosition,
207214
SchemaChange.Move.first(
208-
columnWithPosition.getAddColumn().getName())));
215+
columnWithPosition.getAddColumn().getName()),
216+
tableOptions));
209217
break;
210218
case LAST:
211-
tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition));
219+
tableChangeList.addAll(
220+
SchemaChangeProvider.add(columnWithPosition, tableOptions));
212221
break;
213222
case BEFORE:
214223
tableChangeList.addAll(
@@ -225,7 +234,8 @@ private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
225234
SchemaChange.Move.after(
226235
columnWithPosition.getAddColumn().getName(),
227236
columnWithPosition.getExistedColumnName());
228-
tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition, after));
237+
tableChangeList.addAll(
238+
SchemaChangeProvider.add(columnWithPosition, after, tableOptions));
229239
break;
230240
default:
231241
throw new SchemaEvolveException(
@@ -253,7 +263,8 @@ private List<SchemaChange> applyAddColumnWithBeforePosition(
253263
columnWithPosition,
254264
(index == 0)
255265
? SchemaChange.Move.first(columnName)
256-
: SchemaChange.Move.after(columnName, columnNames.get(index - 1)));
266+
: SchemaChange.Move.after(columnName, columnNames.get(index - 1)),
267+
tableOptions);
257268
}
258269

259270
private int checkColumnPosition(String existedColumnName, List<String> columnNames) {
@@ -303,13 +314,22 @@ private void applyRenameColumn(RenameColumnEvent event) throws SchemaEvolveExcep
303314

304315
private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolveException {
305316
try {
317+
FileStoreTable table =
318+
(FileStoreTable)
319+
catalog.getTable(
320+
new Identifier(
321+
event.tableId().getSchemaName(),
322+
event.tableId().getTableName()));
306323
List<SchemaChange> tableChangeList = new ArrayList<>();
307324
event.getTypeMapping()
308325
.forEach(
309-
(oldName, newType) ->
310-
tableChangeList.add(
311-
SchemaChangeProvider.updateColumnType(
312-
oldName, newType)));
326+
(columnName, newType) -> {
327+
// Modifying the primary key data type may lead to exceptions in
328+
// read/write/merge operations.
329+
SchemaChangeProvider.updateColumnType(
330+
table.schema(), columnName, newType, tableOptions)
331+
.ifPresent(tableChangeList::add);
332+
});
313333
event.getComments()
314334
.forEach(
315335
(name, comment) -> {
@@ -359,4 +379,33 @@ private void applyAlterTableComment(AlterTableCommentEvent event) throws SchemaE
359379
private static Identifier tableIdToIdentifier(SchemaChangeEvent event) {
360380
return new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName());
361381
}
382+
383+
/**
384+
* Convert CDC VARBINARY/BINARY/CHAR/VARCHAR/STRING type to Paimon BLOB type if configured.
385+
*
386+
* @param column The CDC column definition.
387+
* @param tableOptions The table options containing blob-field configuration.
388+
* @return The Paimon DataType (BLOB if configured, otherwise original converted type).
389+
*/
390+
private org.apache.paimon.types.DataType convertToBlobIfNeeded(
391+
Column column, Map<String, String> tableOptions) {
392+
org.apache.paimon.types.DataType dataType = TypeUtils.toPaimonDataType(column.getType());
393+
394+
// Check if this field should be converted to BLOB type using Paimon's CoreOptions
395+
List<String> blobFields = CoreOptions.blobField(tableOptions);
396+
if (!blobFields.isEmpty() && isSupportedTypeForBlob(column.getType())) {
397+
if (blobFields.contains(column.getName())) {
398+
// Convert VARBINARY/BINARY/VARCHAR/STRING to BLOB type
399+
// BLOB type is always nullable in Paimon
400+
return DataTypes.BLOB();
401+
}
402+
}
403+
404+
return dataType;
405+
}
406+
407+
/** Check if DataType can be converted to BLOB (BINARY, VARBINARY, CHAR or VARCHAR). */
408+
private boolean isSupportedTypeForBlob(DataType dataType) {
409+
return dataType.isAnyOf(BINARY_STRING, CHARACTER_STRING);
410+
}
362411
}

0 commit comments

Comments
 (0)