Skip to content

Commit 21c2b9b

Browse files
loserwang1024 authored and Mrart committed
[FLINK-39204][pipeline-connector/fluss] Fluss YAML sink supports adding columns at the last position
This closes apache#4305.
1 parent 6cf0cf7 commit 21c2b9b

9 files changed

Lines changed: 488 additions & 103 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java

Lines changed: 37 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -17,13 +17,16 @@
1717

1818
package org.apache.flink.cdc.connectors.fluss.sink;
1919

20+
import org.apache.flink.cdc.common.event.AddColumnEvent;
2021
import org.apache.flink.cdc.common.event.CreateTableEvent;
2122
import org.apache.flink.cdc.common.event.DataChangeEvent;
2223
import org.apache.flink.cdc.common.event.Event;
2324
import org.apache.flink.cdc.common.event.OperationType;
2425
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
2526
import org.apache.flink.cdc.common.event.TableId;
27+
import org.apache.flink.cdc.common.schema.Schema;
2628
import org.apache.flink.cdc.common.utils.Preconditions;
29+
import org.apache.flink.cdc.common.utils.SchemaUtils;
2730
import org.apache.flink.cdc.connectors.fluss.sink.row.CdcAsFlussRow;
2831
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEvent;
2932
import org.apache.flink.cdc.connectors.fluss.sink.v2.FlussEventSerializer;
@@ -43,19 +46,19 @@
4346
import static org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType.APPEND;
4447
import static org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType.DELETE;
4548
import static org.apache.flink.cdc.connectors.fluss.sink.v2.FlussOperationType.UPSERT;
46-
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue;
49+
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.sameSchemaIgnoreCommentAndDefaultValue;
4750
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussSchema;
4851

4952
/** Serialization schema that converts a CDC data record to a Fluss event. */
5053
public class FlussEventSerializationSchema implements FlussEventSerializer<Event> {
5154
private static final long serialVersionUID = 1L;
5255

53-
private transient Map<TableId, TableSchemaInfo> tableInfoMap;
56+
private transient Map<TableId, TableSchemaInfo> schemaMaps;
5457
private transient Connection connection;
5558

5659
@Override
5760
public void open(Connection connection) {
58-
this.tableInfoMap = new HashMap<>();
61+
this.schemaMaps = new HashMap<>();
5962
this.connection = connection;
6063
}
6164

@@ -82,29 +85,45 @@ private void applySchemaChangeEvent(SchemaChangeEvent event) {
8285
org.apache.flink.cdc.common.schema.Schema newSchema =
8386
((CreateTableEvent) event).getSchema();
8487
// If the table does not exist or the schema has changed, update the table info.
85-
if (!tableInfoMap.containsKey(tableId)
86-
|| !sameCdcColumnsIgnoreCommentAndDefaultValue(
87-
tableInfoMap.get(tableId).upstreamCdcSchema, newSchema)) {
88+
if (!schemaMaps.containsKey(tableId)
89+
|| !sameSchemaIgnoreCommentAndDefaultValue(
90+
schemaMaps.get(tableId).upstreamCdcSchema, newSchema)) {
8891
Table table = connection.getTable(getTablePath(tableId));
8992
TableSchemaInfo newSchemaInfo =
9093
new TableSchemaInfo(newSchema, table.getTableInfo().getSchema());
91-
tableInfoMap.put(tableId, newSchemaInfo);
94+
schemaMaps.put(tableId, newSchemaInfo);
95+
}
96+
} else if (event instanceof AddColumnEvent) {
97+
TableSchemaInfo schemaInfo = schemaMaps.get(event.tableId());
98+
if (schemaInfo == null) {
99+
throw new IllegalStateException(
100+
"Cannot apply AddColumnEvent for table "
101+
+ event.tableId()
102+
+ ": table schema not found. Ensure CreateTableEvent is processed before AddColumnEvent.");
103+
}
104+
Schema schema = schemaInfo.upstreamCdcSchema;
105+
if (!SchemaUtils.isSchemaChangeEventRedundant(schema, event)) {
106+
Table table = connection.getTable(getTablePath(tableId));
107+
TableSchemaInfo newSchemaInfo =
108+
new TableSchemaInfo(
109+
SchemaUtils.applySchemaChangeEvent(schema, event),
110+
table.getTableInfo().getSchema());
111+
schemaMaps.put(tableId, newSchemaInfo);
92112
}
93113
} else {
94-
// TODO: Logics for altering tables are not supported yet.
95-
// This is anticipated to be supported in Fluss version 0.8.0.
96-
throw new RuntimeException(
97-
"Schema change type not supported. Only CreateTableEvent is allowed at the moment.");
114+
throw new UnsupportedOperationException(
115+
String.format(
116+
"Schema change type %s not supported. Only CreateTableEvent and AddColumnEvent are allowed at the moment.",
117+
event.getClass()));
98118
}
99119
}
100120

101121
private FlussRowWithOp applyDataChangeEvent(DataChangeEvent record) {
102122
OperationType op = record.op();
103-
TableSchemaInfo tableSchemaInfo = tableInfoMap.get(record.tableId());
123+
TableSchemaInfo tableSchemaInfo = schemaMaps.get(record.tableId());
104124
Preconditions.checkNotNull(
105125
tableSchemaInfo, "Table schema not found for table " + record.tableId());
106-
int flussFieldCount =
107-
tableSchemaInfo.downStreamFlusstreamSchema.getRowType().getFieldCount();
126+
int flussFieldCount = tableSchemaInfo.downstreamFlussSchema.getRowType().getFieldCount();
108127
boolean hasPrimaryKey = !tableSchemaInfo.upstreamCdcSchema.primaryKeys().isEmpty();
109128
switch (op) {
110129
case INSERT:
@@ -130,17 +149,17 @@ private TablePath getTablePath(TableId tableId) {
130149

131150
private static class TableSchemaInfo {
132151
org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema;
133-
org.apache.fluss.metadata.Schema downStreamFlusstreamSchema;
152+
org.apache.fluss.metadata.Schema downstreamFlussSchema;
134153
Map<Integer, Integer> indexMapping;
135154

136155
private TableSchemaInfo(
137156
org.apache.flink.cdc.common.schema.Schema upstreamCdcSchema,
138-
org.apache.fluss.metadata.Schema downStreamFlusstreamSchema) {
157+
org.apache.fluss.metadata.Schema downstreamFlussSchema) {
139158
this.upstreamCdcSchema = upstreamCdcSchema;
140-
this.downStreamFlusstreamSchema = downStreamFlusstreamSchema;
159+
this.downstreamFlussSchema = downstreamFlussSchema;
141160
this.indexMapping =
142161
sanityCheckAndGenerateIndexMapping(
143-
toFlussSchema(upstreamCdcSchema), downStreamFlusstreamSchema);
162+
toFlussSchema(upstreamCdcSchema), downstreamFlussSchema);
144163
}
145164
}
146165

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java

Lines changed: 47 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -17,12 +17,14 @@
1717

1818
package org.apache.flink.cdc.connectors.fluss.sink;
1919

20+
import org.apache.flink.cdc.common.event.AddColumnEvent;
2021
import org.apache.flink.cdc.common.event.CreateTableEvent;
2122
import org.apache.flink.cdc.common.event.DropTableEvent;
2223
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
2324
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2425
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
2526
import org.apache.flink.cdc.common.event.TableId;
27+
import org.apache.flink.cdc.common.schema.Column;
2628
import org.apache.flink.cdc.common.sink.MetadataApplier;
2729
import org.apache.flink.table.api.ValidationException;
2830

@@ -31,12 +33,14 @@
3133
import org.apache.fluss.client.admin.Admin;
3234
import org.apache.fluss.config.Configuration;
3335
import org.apache.fluss.metadata.DatabaseDescriptor;
36+
import org.apache.fluss.metadata.TableChange;
3437
import org.apache.fluss.metadata.TableDescriptor;
3538
import org.apache.fluss.metadata.TableInfo;
3639
import org.apache.fluss.metadata.TablePath;
3740
import org.slf4j.Logger;
3841
import org.slf4j.LoggerFactory;
3942

43+
import java.util.ArrayList;
4044
import java.util.Arrays;
4145
import java.util.HashSet;
4246
import java.util.List;
@@ -47,6 +51,7 @@
4751
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
4852
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
4953
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussTable;
54+
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussType;
5055

5156
/** {@link MetadataApplier} for fluss. */
5257
public class FlussMetaDataApplier implements MetadataApplier {
@@ -58,9 +63,6 @@ public class FlussMetaDataApplier implements MetadataApplier {
5863
private Set<SchemaChangeEventType> enabledEventTypes =
5964
new HashSet<>(Arrays.asList(CREATE_TABLE, DROP_TABLE));
6065

61-
private transient Connection connection;
62-
private transient Admin admin;
63-
6466
public FlussMetaDataApplier(
6567
Configuration flussClientConfig,
6668
Map<String, String> tableProperties,
@@ -92,22 +94,25 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
9294
@Override
9395
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
9496
LOG.info("fluss metadata applier receive schemaChangeEvent {}", schemaChangeEvent);
95-
Admin admin = getAdmin();
9697
if (schemaChangeEvent instanceof CreateTableEvent) {
9798
CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent;
98-
applyCreateTable(admin, createTableEvent);
99+
applyCreateTable(createTableEvent);
99100
} else if (schemaChangeEvent instanceof DropTableEvent) {
100101
DropTableEvent dropTableEvent = (DropTableEvent) schemaChangeEvent;
101-
applyDropTable(admin, dropTableEvent);
102+
applyDropTable(dropTableEvent);
103+
} else if (schemaChangeEvent instanceof AddColumnEvent) {
104+
AddColumnEvent addColumnEvent = (AddColumnEvent) schemaChangeEvent;
105+
applyAddColumnTable(addColumnEvent);
102106
} else {
103107
throw new IllegalArgumentException(
104-
"fluss metadata applier only support CreateTableEvent now but receives "
108+
"fluss metadata applier only supports CreateTableEvent and AddColumnEvent now but receives "
105109
+ schemaChangeEvent);
106110
}
107111
}
108112

109-
private void applyCreateTable(Admin admin, CreateTableEvent event) {
110-
try {
113+
private void applyCreateTable(CreateTableEvent event) {
114+
try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
115+
Admin admin = connection.getAdmin()) {
111116
TableId tableId = event.tableId();
112117
TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
113118
String tableIdentifier = tablePath.getDatabaseName() + "." + tablePath.getTableName();
@@ -129,8 +134,9 @@ private void applyCreateTable(Admin admin, CreateTableEvent event) {
129134
}
130135
}
131136

132-
private void applyDropTable(Admin admin, DropTableEvent event) {
133-
try {
137+
private void applyDropTable(DropTableEvent event) {
138+
try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
139+
Admin admin = connection.getAdmin()) {
134140
TableId tableId = event.tableId();
135141
TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
136142
admin.dropTable(tablePath, true).get();
@@ -140,21 +146,36 @@ private void applyDropTable(Admin admin, DropTableEvent event) {
140146
}
141147
}
142148

143-
private Admin getAdmin() {
144-
if (connection == null) {
145-
connection = ConnectionFactory.createConnection(flussClientConfig);
146-
admin = connection.getAdmin();
147-
}
148-
return admin;
149-
}
150-
151-
@Override
152-
public void close() throws Exception {
153-
if (admin != null) {
154-
admin.close();
155-
}
156-
if (connection != null) {
157-
connection.close();
149+
private void applyAddColumnTable(AddColumnEvent event) {
150+
List<TableChange> tableChanges = new ArrayList<>();
151+
event.getAddedColumns()
152+
.forEach(
153+
columnWithPosition -> {
154+
if (columnWithPosition.getPosition()
155+
!= AddColumnEvent.ColumnPosition.LAST) {
156+
throw new IllegalArgumentException(
157+
"Fluss metadata applier only supports LAST position for adding columns now but receives "
158+
+ columnWithPosition.getPosition()
159+
+ ". Consider using 'schema.change.behavior' configuration with 'LENIENT' mode to handle schema changes more flexibly.");
160+
}
161+
162+
Column column = columnWithPosition.getAddColumn();
163+
tableChanges.add(
164+
TableChange.addColumn(
165+
column.getName(),
166+
toFlussType(column.getType()),
167+
column.getComment(),
168+
TableChange.ColumnPosition.last()));
169+
});
170+
171+
try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
172+
Admin admin = connection.getAdmin()) {
173+
TableId tableId = event.tableId();
174+
TablePath tablePath = new TablePath(tableId.getSchemaName(), tableId.getTableName());
175+
admin.alterTable(tablePath, tableChanges, true).get();
176+
} catch (Exception e) {
177+
LOG.error("Failed to apply schema change {}", event, e);
178+
throw new RuntimeException(e);
158179
}
159180
}
160181

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/CdcAsFlussRow.java renamed to flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/row/row/CdcAsFlussRow.java

File renamed without changes.

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversions.java

Lines changed: 13 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,6 @@
4141
import org.apache.flink.cdc.common.utils.Preconditions;
4242
import org.apache.flink.util.CollectionUtil;
4343

44-
import org.apache.fluss.annotation.VisibleForTesting;
4544
import org.apache.fluss.metadata.Schema;
4645
import org.apache.fluss.metadata.TableDescriptor;
4746

@@ -94,28 +93,25 @@ public static org.apache.fluss.metadata.Schema toFlussSchema(
9493
schemBuilder.primaryKey(cdcSchema.primaryKeys());
9594
}
9695

97-
Schema schema =
98-
schemBuilder
99-
.fromColumns(
100-
cdcSchema.getColumns().stream()
101-
.map(
102-
column ->
103-
new Schema.Column(
104-
column.getName(),
105-
toFlussType(column.getType()),
106-
column.getComment()))
107-
.collect(Collectors.toList()))
108-
.build();
109-
return schema;
96+
// use schemBuilder.column rather than schemBuilder.fromColumns to reassign nested row id.
97+
cdcSchema
98+
.getColumns()
99+
.forEach(
100+
column ->
101+
schemBuilder
102+
.column(
103+
column.getName(),
104+
column.getType().accept(TO_FLUSS_TYPE_INSTANCE))
105+
.withComment(column.getComment()));
106+
return schemBuilder.build();
110107
}
111108

112-
@VisibleForTesting
113-
private static org.apache.fluss.types.DataType toFlussType(
109+
public static org.apache.fluss.types.DataType toFlussType(
114110
org.apache.flink.cdc.common.types.DataType flinkDataType) {
115111
return flinkDataType.accept(TO_FLUSS_TYPE_INSTANCE);
116112
}
117113

118-
public static Boolean sameCdcColumnsIgnoreCommentAndDefaultValue(
114+
public static Boolean sameSchemaIgnoreCommentAndDefaultValue(
119115
org.apache.flink.cdc.common.schema.Schema oldSchema,
120116
org.apache.flink.cdc.common.schema.Schema newSchema) {
121117
List<org.apache.flink.cdc.common.schema.Column> upstreamColumns = oldSchema.getColumns();

0 commit comments

Comments
 (0)