@@ -315,4 +315,64 @@ void testPartitionedTable() {
.hasMessage(
"Bucket key [f0, f3] shouldn't include any column in partition keys [f0].");
}

@Test
void testChangelogTable() {
// Test a valid changelog table descriptor (with primary key)
TableDescriptor validChangelogTable =
TableDescriptor.builder()
.schema(SCHEMA_1)
.property("table.changelog.enabled", "true")
.build();

// Verify properties and schema
assertThat(validChangelogTable.getProperties())
.containsEntry("table.changelog.enabled", "true");
assertThat(validChangelogTable.getSchema().getPrimaryKey()).isPresent();

// Test a changelog table descriptor without primary key
Schema schemaWithoutPK =
Schema.newBuilder()
.column("f0", DataTypes.STRING())
.column("f1", DataTypes.BIGINT())
.build();

TableDescriptor invalidChangelogTable =
TableDescriptor.builder()
.schema(schemaWithoutPK)
.property("table.changelog.enabled", "true")
.build();

// Verify lack of primary key
assertThat(invalidChangelogTable.getSchema().getPrimaryKey()).isEmpty();

// Test schemas with reserved column names
Schema.Builder schemaWithReservedColumn =
Schema.newBuilder()
.column("f0", DataTypes.STRING())
.column("_change_type", DataTypes.STRING())
.primaryKey("f0");

// Verify column exists in schema
assertThat(schemaWithReservedColumn.build().getColumnNames()).contains("_change_type");

// Test with other reserved column names
Schema schemaWithReservedColumn2 =
Schema.newBuilder()
.column("f0", DataTypes.STRING())
.column("_log_offset", DataTypes.BIGINT())
.primaryKey("f0")
.build();

assertThat(schemaWithReservedColumn2.getColumnNames()).contains("_log_offset");

Schema schemaWithReservedColumn3 =
Schema.newBuilder()
.column("f0", DataTypes.STRING())
.column("_commit_timestamp", DataTypes.TIMESTAMP())
.primaryKey("f0")
.build();

assertThat(schemaWithReservedColumn3.getColumnNames()).contains("_commit_timestamp");
}
}
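The assertions above only confirm that the reserved metadata column names can appear in a user schema; they do not yet assert that such schemas are rejected. A minimal sketch of the validation this test appears to anticipate follows; the ChangelogSchemaValidator class and its error message are illustrative assumptions, not code from this PR:

    import java.util.Arrays;
    import java.util.List;

    /** Hypothetical validator: rejects user columns that collide with changelog metadata names. */
    class ChangelogSchemaValidator {
        // Reserved names match the metadata columns added by FlinkCatalog below.
        private static final List<String> RESERVED_COLUMN_NAMES =
                Arrays.asList("_change_type", "_log_offset", "_commit_timestamp");

        static void validate(List<String> columnNames) {
            for (String name : columnNames) {
                if (RESERVED_COLUMN_NAMES.contains(name)) {
                    throw new IllegalArgumentException(
                            "Column name '" + name + "' is reserved for changelog metadata columns.");
                }
            }
        }
    }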
@@ -106,6 +106,12 @@ public class FlinkConnectorOptions {
.defaultValue(false)
.withDescription("Whether to ignore retract(-U/-D) record.");

public static final ConfigOption<Boolean> CHANGELOG =
ConfigOptions.key("enable.changelog")
.booleanType()
.defaultValue(false)
.withDescription("Whether the table is a changelog table");

// --------------------------------------------------------------------------------------------
// table storage specific options
// --------------------------------------------------------------------------------------------
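For orientation, a hedged sketch of how a factory could read this option from the resolved table options. Note that the PR's factory below instead derives the flag from the $changelog name suffix; `createTableFactoryHelper` and `getOptions` are standard Flink FactoryUtil API, and the surrounding factory code is abbreviated:

    // Inside a DynamicTableSourceFactory#createDynamicTableSource(Context context):
    FactoryUtil.TableFactoryHelper helper =
            FactoryUtil.createTableFactoryHelper(this, context);
    ReadableConfig tableOptions = helper.getOptions();
    // Defaults to false unless the catalog injected "enable.changelog" = "true".
    boolean changelogEnabled = tableOptions.get(FlinkConnectorOptions.CHANGELOG);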
@@ -35,6 +35,8 @@
import com.alibaba.fluss.utils.ExceptionUtils;
import com.alibaba.fluss.utils.IOUtils;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
@@ -87,6 +89,8 @@ public class FlinkCatalog implements Catalog {

public static final String LAKE_TABLE_SPLITTER = "$lake";

public static final String CHANGELOG_TABLE_SPLITTER = "$changelog";

protected final ClassLoader classLoader;

protected final String catalogName;
@@ -273,6 +277,46 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
}
return getLakeTable(
objectPath.getDatabaseName(), tableName, tableInfo.getProperties());
// the table name contains $changelog, which means reading the changelog of the base
// table, exposing additional metadata columns
} else if (tableName.contains(CHANGELOG_TABLE_SPLITTER)) {
String baseTableName = tableName.split("\\" + CHANGELOG_TABLE_SPLITTER)[0];
TablePath baseTablePath = TablePath.of(objectPath.getDatabaseName(), baseTableName);
tableInfo = admin.getTableInfo(baseTablePath).get();
if (!tableInfo.hasPrimaryKey()) {
throw new UnsupportedOperationException(
String.format(
"\"Table %s has no primary key, only primary key tables support changelog.",
baseTableName));
}
CatalogTable originalTable = FlinkConversions.toFlinkTable(tableInfo);
Schema originalSchema = originalTable.getUnresolvedSchema();
Schema changeLogSchema =
Schema.newBuilder()
.column("_change_type", DataTypes.STRING())
.withComment(
"+I: Insert, -U: Before the update, +U: After the update, -D: Delete")
.column("_log_offset", DataTypes.BIGINT())
.withComment("the offset of the log")
.column(
"_commit_timestamp",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
.withComment(
"the timestamp when the change was committed")
.fromSchema(originalSchema)
.build();

Map<String, String> options = new HashMap<>(originalTable.getOptions());
options.put(BOOTSTRAP_SERVERS.key(), bootstrapServers);
options.put("enable.changelog", "true");

return CatalogTable.newBuilder()
.schema(changeLogSchema)
.comment(originalTable.getComment())
.options(options)
.partitionKeys(originalTable.getPartitionKeys())
.build();

} else {
tableInfo = admin.getTableInfo(tablePath).get();
}
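A hedged usage sketch of the $changelog suffix from the Flink SQL side; the catalog and table names are illustrative, and the backticks are required because `$` is not valid in an unquoted SQL identifier:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ChangelogQueryExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Assumes a Fluss catalog named "my_catalog" was registered beforehand.
            tEnv.useCatalog("my_catalog");
            // Resolved by FlinkCatalog#getTable above: "orders$changelog" maps to the
            // base table "orders" with the three metadata columns prepended.
            tEnv.executeSql("SELECT * FROM `orders$changelog`").print();
        }
    }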
@@ -51,6 +51,7 @@
import java.util.HashSet;
import java.util.Set;

import static com.alibaba.fluss.flink.catalog.FlinkCatalog.CHANGELOG_TABLE_SPLITTER;
import static com.alibaba.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlinkOption;

@@ -69,6 +70,11 @@ public DynamicTableSource createDynamicTableSource(Context context) {
lakeTableFactory = mayInitLakeTableFactory();
return lakeTableFactory.createDynamicTableSource(context, tableName);
}
boolean enableChangelog = false;
if (tableName.contains(CHANGELOG_TABLE_SPLITTER)) {
tableName = tableName.substring(0, tableName.indexOf(CHANGELOG_TABLE_SPLITTER));
enableChangelog = true;
}

FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validateExcept("table.", "client.");
@@ -114,9 +120,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
tableOptions
.get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
.toMillis();

+ TablePath tablePath = TablePath.of(tableIdentifier.getDatabaseName(), tableName);
return new FlinkTableSource(
- toFlussTablePath(context.getObjectIdentifier()),
+ tablePath,
toFlussClientConfig(tableOptions, context.getConfiguration()),
tableOutputType,
primaryKeyIndexes,
@@ -129,7 +135,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
cache,
partitionDiscoveryIntervalMs,
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
- tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
+ tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
+ enableChangelog);
}

@Override
@@ -176,6 +183,7 @@ public Set<ConfigOption<?>> optionalOptions() {
FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL,
FlinkConnectorOptions.LOOKUP_ASYNC,
FlinkConnectorOptions.SINK_IGNORE_DELETE,
FlinkConnectorOptions.CHANGELOG,
LookupOptions.MAX_RETRIES,
LookupOptions.CACHE_TYPE,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
@@ -45,7 +45,6 @@
/** Flink source for Fluss. */
public class FlinkSource implements Source<RowData, SourceSplitBase, SourceEnumeratorState> {
private static final long serialVersionUID = 1L;

private final Configuration flussConf;
private final TablePath tablePath;
private final boolean hasPrimaryKey;
@@ -55,6 +54,8 @@ public class FlinkSource implements Source<RowData, SourceSplitBase, SourceEnumeratorState> {
private final OffsetsInitializer offsetsInitializer;
private final long scanPartitionDiscoveryIntervalMs;
private final boolean streaming;
private final boolean enableChangelog;
@Nullable private final int[] selectedMetadataFields;

public FlinkSource(
Configuration flussConf,
@@ -65,7 +66,9 @@ public FlinkSource(
@Nullable int[] projectedFields,
OffsetsInitializer offsetsInitializer,
long scanPartitionDiscoveryIntervalMs,
- boolean streaming) {
+ boolean streaming,
+ boolean enableChangelog,
+ @Nullable int[] selectedMetadataFields) {
this.flussConf = flussConf;
this.tablePath = tablePath;
this.hasPrimaryKey = hasPrimaryKey;
@@ -75,6 +78,8 @@ public FlinkSource(
this.offsetsInitializer = offsetsInitializer;
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
this.streaming = streaming;
this.enableChangelog = enableChangelog;
this.selectedMetadataFields = selectedMetadataFields;
}

@Override
@@ -136,6 +141,8 @@ public SourceReader<RowData, SourceSplitBase> createReader(SourceReaderContext context)
sourceOutputType,
context,
projectedFields,
- flinkSourceReaderMetrics);
+ flinkSourceReaderMetrics,
+ enableChangelog,
+ selectedMetadataFields);
}
}
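The reader and emitter changes are not part of this hunk. A minimal sketch of how `selectedMetadataFields` could be applied when emitting rows follows; every class and method name here is an assumption for illustration, not the PR's actual reader code:

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.data.TimestampData;

    /** Hypothetical projector: prepends the selected changelog metadata to each physical row. */
    final class ChangelogRowProjector {
        // Indices into the metadata schema: 0 = _change_type, 1 = _log_offset, 2 = _commit_timestamp.
        private final int[] selectedMetadataFields;

        ChangelogRowProjector(int[] selectedMetadataFields) {
            this.selectedMetadataFields = selectedMetadataFields;
        }

        RowData project(Object[] physicalFields, String changeType, long logOffset, long commitTimestampMs) {
            GenericRowData row =
                    new GenericRowData(selectedMetadataFields.length + physicalFields.length);
            int pos = 0;
            for (int metadataField : selectedMetadataFields) {
                switch (metadataField) {
                    case 0:
                        row.setField(pos++, StringData.fromString(changeType));
                        break;
                    case 1:
                        row.setField(pos++, logOffset);
                        break;
                    default:
                        row.setField(pos++, TimestampData.fromEpochMillis(commitTimestampMs));
                        break;
                }
            }
            // Physical columns follow the metadata columns.
            for (Object field : physicalFields) {
                row.setField(pos++, field);
            }
            return row;
        }
    }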