Commit 0555e07

[connector] Flink read support for Fluss primary key table changelog auxiliary table / CDC events

1 parent ad99cb0 commit 0555e07

File tree

9 files changed: +146 -11 lines changed
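
For context on how this change is meant to be used: a Flink job should be able to read the CDC stream of a Fluss primary key table by appending the $changelog suffix to the table name, which the catalog resolves to the base table plus three metadata columns. A minimal usage sketch, assuming a Fluss catalog already registered as fluss_catalog and a hypothetical primary key table orders(order_id, amount):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ChangelogReadExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Assumes the Fluss catalog has already been created and registered as 'fluss_catalog'.
        tEnv.useCatalog("fluss_catalog");
        tEnv.useDatabase("fluss");

        // The $changelog suffix is handled by FlinkCatalog.getTable, which appends the
        // _change_type, _log_offset and _commit_timestamp metadata columns to the schema.
        tEnv.executeSql(
                        "SELECT order_id, amount, _change_type, _log_offset, _commit_timestamp "
                                + "FROM `orders$changelog`")
                .print();
    }
}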

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java

Lines changed: 47 additions & 0 deletions
@@ -32,6 +32,8 @@
 import com.alibaba.fluss.utils.ExceptionUtils;
 import com.alibaba.fluss.utils.IOUtils;
 
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
@@ -63,6 +65,7 @@
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -78,6 +81,8 @@ public class FlinkCatalog implements Catalog {
 
     public static final String LAKE_TABLE_SPLITTER = "$lake";
 
+    public static final String CHANGELOG_TABLE_SPLITTER = "$changelog";
+
     protected final ClassLoader classLoader;
 
     protected final String catalogName;
@@ -263,6 +268,48 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
                                 tableName.split("\\" + LAKE_TABLE_SPLITTER)[0])));
             }
             return getLakeTable(objectPath.getDatabaseName(), tableName);
+        } else if ((tableName.contains(CHANGELOG_TABLE_SPLITTER))) {
+            String baseTableName = tableName.split("\\" + CHANGELOG_TABLE_SPLITTER)[0];
+            TablePath baseTablePath = TablePath.of(objectPath.getDatabaseName(), baseTableName);
+            tableInfo = admin.getTableInfo(baseTablePath).get();
+            if (!tableInfo.hasPrimaryKey()) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Table %s has no primary key, only primary key tables support changelog.",
+                                baseTableName));
+            }
+            CatalogTable originalTable = FlinkConversions.toFlinkTable(tableInfo);
+            Schema originalSchema = originalTable.getUnresolvedSchema();
+            List<Schema.UnresolvedColumn> newColumns =
+                    new ArrayList<>(originalSchema.getColumns());
+            newColumns.add(
+                    new Schema.UnresolvedPhysicalColumn("_change_type", DataTypes.STRING()));
+            newColumns.add(
+                    new Schema.UnresolvedPhysicalColumn("_log_offset", DataTypes.BIGINT()));
+            newColumns.add(
+                    new Schema.UnresolvedPhysicalColumn(
+                            "_commit_timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()));
+            Schema changeLogSchema =
+                    Schema.newBuilder()
+                            .fromColumns(newColumns)
+                            .primaryKey(
+                                    originalSchema
+                                            .getPrimaryKey()
+                                            .orElse(null)
+                                            .getColumnNames())
+                            .build();
+
+            Map<String, String> options = new HashMap<>(originalTable.getOptions());
+            options.put(BOOTSTRAP_SERVERS.key(), bootstrapServers);
+            options.put("changelog", "true");
+
+            return CatalogTable.newBuilder()
+                    .schema(changeLogSchema)
+                    .comment(originalTable.getComment())
+                    .options(options)
+                    .partitionKeys(originalTable.getPartitionKeys())
+                    .build();
+
         } else {
             tableInfo = admin.getTableInfo(tablePath).get();
         }
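
To make the derived schema concrete: getTable keeps the base table's columns and primary key and appends the three metadata columns. A small sketch of the equivalent result built directly with Flink's Schema API, for a hypothetical base table orders(order_id BIGINT primary key, amount DECIMAL(10, 2)):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

public class ChangelogSchemaSketch {
    public static void main(String[] args) {
        // Schema that orders$changelog would expose for the hypothetical base table above.
        Schema changelogSchema =
                Schema.newBuilder()
                        .column("order_id", DataTypes.BIGINT().notNull())
                        .column("amount", DataTypes.DECIMAL(10, 2))
                        .column("_change_type", DataTypes.STRING())
                        .column("_log_offset", DataTypes.BIGINT())
                        .column("_commit_timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
                        .primaryKey("order_id")
                        .build();
        System.out.println(changelogSchema);
    }
}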

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java

Lines changed: 7 additions & 1 deletion
@@ -51,6 +51,7 @@
 import java.util.HashSet;
 import java.util.Set;
 
+import static com.alibaba.fluss.connector.flink.catalog.FlinkCatalog.CHANGELOG_TABLE_SPLITTER;
 import static com.alibaba.fluss.connector.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
 import static com.alibaba.fluss.connector.flink.utils.FlinkConversions.toFlinkOption;
 
@@ -69,6 +70,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
             lakeTableFactory = mayInitLakeTableFactory();
             return lakeTableFactory.createDynamicTableSource(context, tableName);
         }
+        boolean isChangelog = false;
+        if (tableName.contains(CHANGELOG_TABLE_SPLITTER)) {
+            isChangelog = true;
+        }
 
         FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
         helper.validateExcept("table.", "client.");
@@ -129,7 +134,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 cache,
                 partitionDiscoveryIntervalMs,
                 tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
-                tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
+                tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
+                isChangelog);
     }
 
     @Override

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkSource.java

Lines changed: 6 additions & 2 deletions
@@ -55,6 +55,7 @@ public class FlinkSource implements Source<RowData, SourceSplitBase, SourceEnume
     private final OffsetsInitializer offsetsInitializer;
     private final long scanPartitionDiscoveryIntervalMs;
     private final boolean streaming;
+    private boolean isChangelog;
 
     public FlinkSource(
             Configuration flussConf,
@@ -65,7 +66,8 @@ public FlinkSource(
             @Nullable int[] projectedFields,
             OffsetsInitializer offsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
-            boolean streaming) {
+            boolean streaming,
+            boolean isChangelog) {
         this.flussConf = flussConf;
         this.tablePath = tablePath;
         this.hasPrimaryKey = hasPrimaryKey;
@@ -75,6 +77,7 @@
         this.offsetsInitializer = offsetsInitializer;
         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
         this.streaming = streaming;
+        this.isChangelog = isChangelog;
     }
 
     @Override
@@ -136,6 +139,7 @@ public SourceReader<RowData, SourceSplitBase> createReader(SourceReaderContext c
                 sourceOutputType,
                 context,
                 projectedFields,
-                flinkSourceReaderMetrics);
+                flinkSourceReaderMetrics,
+                isChangelog);
     }
 }

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java

Lines changed: 8 additions & 3 deletions
@@ -96,6 +96,7 @@ public class FlinkTableSource
     // will be empty if no partition key
     private final int[] partitionKeyIndexes;
     private final boolean streaming;
+    private final boolean isChangelog;
     private final FlinkConnectorOptionsUtils.StartupOptions startupOptions;
 
     // options for lookup source
@@ -137,7 +138,8 @@ public FlinkTableSource(
             @Nullable LookupCache cache,
             long scanPartitionDiscoveryIntervalMs,
             boolean isDataLakeEnabled,
-            @Nullable MergeEngineType mergeEngineType) {
+            @Nullable MergeEngineType mergeEngineType,
+            boolean isChangelog) {
         this.tablePath = tablePath;
         this.flussConfig = flussConfig;
         this.tableOutputType = tableOutputType;
@@ -155,6 +157,7 @@
         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
         this.isDataLakeEnabled = isDataLakeEnabled;
         this.mergeEngineType = mergeEngineType;
+        this.isChangelog = isChangelog;
     }
 
     @Override
@@ -261,7 +264,8 @@ public boolean isBounded() {
                 projectedFields,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
-                streaming);
+                streaming,
+                isChangelog);
 
         if (!streaming) {
             // return a bounded source provide to make planner happy,
@@ -350,7 +354,8 @@ public DynamicTableSource copy() {
                 cache,
                 scanPartitionDiscoveryIntervalMs,
                 isDataLakeEnabled,
-                mergeEngineType);
+                mergeEngineType,
+                isChangelog);
         source.producedDataType = producedDataType;
         source.projectedFields = projectedFields;
         source.singleRowFilter = singleRowFilter;

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/emitter/FlinkRecordEmitter.java

Lines changed: 7 additions & 2 deletions
@@ -22,6 +22,7 @@
 import com.alibaba.fluss.connector.flink.source.reader.RecordAndPos;
 import com.alibaba.fluss.connector.flink.source.split.HybridSnapshotLogSplitState;
 import com.alibaba.fluss.connector.flink.source.split.SourceSplitState;
+import com.alibaba.fluss.connector.flink.utils.ChangelogRowConverter;
 import com.alibaba.fluss.connector.flink.utils.FlussRowToFlinkRowConverter;
 import com.alibaba.fluss.types.RowType;
 
@@ -48,8 +49,12 @@ public class FlinkRecordEmitter implements RecordEmitter<RecordAndPos, RowData,
 
     private LakeRecordRecordEmitter lakeRecordRecordEmitter;
 
-    public FlinkRecordEmitter(RowType rowType) {
-        this.converter = new FlussRowToFlinkRowConverter(rowType);
+    public FlinkRecordEmitter(RowType rowType, boolean isChangeLogMode) {
+        if (!isChangeLogMode) {
+            this.converter = new FlussRowToFlinkRowConverter(rowType);
+        } else {
+            this.converter = new ChangelogRowConverter(rowType);
+        }
     }
 
     @Override

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceReader.java

Lines changed: 6 additions & 3 deletions
@@ -48,15 +48,16 @@
 public class FlinkSourceReader
         extends SingleThreadMultiplexSourceReaderBase<
                 RecordAndPos, RowData, SourceSplitBase, SourceSplitState> {
-
+    // todo take changes for changelog table columns
     public FlinkSourceReader(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordAndPos>> elementsQueue,
             Configuration flussConfig,
             TablePath tablePath,
             RowType sourceOutputType,
             SourceReaderContext context,
             @Nullable int[] projectedFields,
-            FlinkSourceReaderMetrics flinkSourceReaderMetrics) {
+            FlinkSourceReaderMetrics flinkSourceReaderMetrics,
+            boolean isChangelog) {
         super(
                 elementsQueue,
                 new FlinkSourceFetcherManager(
@@ -69,7 +70,9 @@ public FlinkSourceReader(
                         projectedFields,
                         flinkSourceReaderMetrics),
                         (ignore) -> {}),
-                new FlinkRecordEmitter(sourceOutputType),
+                // todo should have a special FlussRowToFlinkRowConverter that converts the Fluss
+                // InternalRow into Flink RowData with the additional metadata columns
+                new FlinkRecordEmitter(sourceOutputType, isChangelog),
                 context.getConfiguration(),
                 context);
     }
fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/ChangelogRowConverter.java

Lines changed: 48 additions & 0 deletions

@@ -0,0 +1,48 @@
+package com.alibaba.fluss.connector.flink.utils;
+
+import com.alibaba.fluss.client.table.scanner.ScanRecord;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+public class ChangelogRowConverter extends FlussRowToFlinkRowConverter {
+    public ChangelogRowConverter(RowType rowType) {
+        super(rowType);
+    }
+
+    public RowData toFlinkRowData(ScanRecord scanRecord) {
+        RowData baseRowData = super.toFlinkRowData(scanRecord);
+        GenericRowData rowWithMetadata = new GenericRowData(baseRowData.getArity() + 3);
+        rowWithMetadata.setRowKind(baseRowData.getRowKind());
+
+        for (int i = 0; i < baseRowData.getArity(); i++) {
+            rowWithMetadata.setField(i, baseRowData.getRawValue(i));
+        }
+
+        int baseArity = baseRowData.getArity();
+        String changeType;
+        switch (scanRecord.getRowKind()) {
+            case INSERT:
+                changeType = "+I";
+                break;
+            case UPDATE_BEFORE:
+                changeType = "-U";
+                break;
+            case UPDATE_AFTER:
+                changeType = "+U";
+                break;
+            case DELETE:
+                changeType = "-D";
+                break;
+            default:
+                changeType = "+I";
+                break;
+        }
+        rowWithMetadata.setField(baseArity, changeType);
+        rowWithMetadata.setField(baseArity + 1, scanRecord.logOffset());
+        rowWithMetadata.setField(baseArity + 2, scanRecord.timestamp());
+
+        return rowWithMetadata;
+    }
+}
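
Two details of the converter are worth spelling out: Flink's GenericRowData stores fields in the internal data format (StringData for STRING, TimestampData for TIMESTAMP_LTZ), and RowData.getRawValue is only meant for RAW-typed fields. A minimal sketch of the metadata-appending step under those constraints; it assumes the base converter yields a GenericRowData, and the helper name and parameters are hypothetical:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;

public class ChangelogMetadataSketch {
    // Appends _change_type, _log_offset and _commit_timestamp to an already converted row.
    public static RowData withChangelogMetadata(
            GenericRowData baseRow, String changeType, long logOffset, long timestampMs) {
        GenericRowData rowWithMetadata = new GenericRowData(baseRow.getArity() + 3);
        rowWithMetadata.setRowKind(baseRow.getRowKind());

        // Copy the user columns; getField returns the internal representation unchanged.
        for (int i = 0; i < baseRow.getArity(); i++) {
            rowWithMetadata.setField(i, baseRow.getField(i));
        }

        int baseArity = baseRow.getArity();
        rowWithMetadata.setField(baseArity, StringData.fromString(changeType));
        rowWithMetadata.setField(baseArity + 1, logOffset);
        rowWithMetadata.setField(baseArity + 2, TimestampData.fromEpochMillis(timestampMs));
        return rowWithMetadata;
    }
}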

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlussRowToFlinkRowConverter.java

Lines changed: 1 addition & 0 deletions
@@ -47,6 +47,7 @@ public class FlussRowToFlinkRowConverter {
     private final FlussDeserializationConverter[] toFlinkFieldConverters;
     private final InternalRow.FieldGetter[] flussFieldGetters;
 
+    // todo converts the Fluss InternalRow into Flink RowData with the additional metadata columns
     public FlussRowToFlinkRowConverter(RowType rowType) {
         this.toFlinkFieldConverters = new FlussDeserializationConverter[rowType.getFieldCount()];
         this.flussFieldGetters = new InternalRow.FieldGetter[rowType.getFieldCount()];

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java

Lines changed: 16 additions & 0 deletions
@@ -65,6 +65,8 @@
 import javax.annotation.Nullable;
 
 import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
@@ -135,6 +137,9 @@ public CompletableFuture<DropDatabaseResponse> dropDatabase(DropDatabaseRequest
         return CompletableFuture.completedFuture(response);
     }
 
+    // todo keep a validation of whether the created table uses system reserved columns
+    // (_change_type, _log_offset, _commit_timestamp)
+
     @Override
     public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest request) {
         TablePath tablePath = toTablePath(request.getTablePath());
@@ -143,6 +148,17 @@ public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest req
         TableDescriptor tableDescriptor;
         try {
             tableDescriptor = TableDescriptor.fromJsonBytes(request.getTableJson());
+            // Validate reserved column names
+            List<String> reservedColumns =
+                    Arrays.asList("_change_type", "_log_offset", "_commit_timestamp");
+            for (String columnName : tableDescriptor.getSchema().getColumnNames()) {
+                if (reservedColumns.contains(columnName)) {
+                    throw new InvalidTableException(
+                            String.format(
+                                    "Column name '%s' is reserved for system use.", columnName));
+                }
+            }
+
         } catch (Exception e) {
             if (e instanceof UncheckedIOException) {
                 throw new InvalidTableException(
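
For illustration, the reserved-name rule added above can be exercised in isolation; the helper class and method below are hypothetical stand-ins, not part of Fluss:

import java.util.Arrays;
import java.util.List;

public class ReservedColumnCheck {
    // Metadata columns appended by the $changelog view; user tables must not declare them.
    private static final List<String> RESERVED =
            Arrays.asList("_change_type", "_log_offset", "_commit_timestamp");

    // Mirrors the validation added to CoordinatorService.createTable.
    static void validate(List<String> columnNames) {
        for (String name : columnNames) {
            if (RESERVED.contains(name)) {
                throw new IllegalArgumentException(
                        String.format("Column name '%s' is reserved for system use.", name));
            }
        }
    }

    public static void main(String[] args) {
        validate(Arrays.asList("order_id", "amount"));      // passes
        validate(Arrays.asList("order_id", "_log_offset")); // throws IllegalArgumentException
    }
}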
