Skip to content

Commit 91d8adb

Browse files
authored
[lake/paimon] Automatically append system columns for Fluss primary key table when create paimon table
This closes #961.
1 parent d69c8f9 commit 91d8adb

File tree

2 files changed

+39
-14
lines changed

2 files changed

+39
-14
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,19 @@
4545
/** A Paimon implementation of {@link LakeCatalog}. */
4646
public class PaimonLakeCatalog implements LakeCatalog {
4747

48+
private static final LinkedHashMap<String, DataType> SYSTEM_COLUMNS = new LinkedHashMap<>();
49+
50+
static {
51+
// We need __bucket system column to filter out the given bucket
52+
// for paimon bucket-unaware append only table.
53+
// It's not required for paimon bucket-aware table like primary key table
54+
// and bucket-aware append only table, but we always add the system column
55+
// for consistent behavior
56+
SYSTEM_COLUMNS.put(BUCKET_COLUMN_NAME, DataTypes.INT());
57+
SYSTEM_COLUMNS.put(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
58+
SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
59+
}
60+
4861
private final Catalog paimonCatalog;
4962

5063
// for fluss config
@@ -127,20 +140,11 @@ private Schema toPaimonSchema(TableDescriptor tableDescriptor) {
127140
options.set(CoreOptions.BUCKET, CoreOptions.BUCKET.defaultValue());
128141
}
129142

130-
Map<String, DataType> systemColumns = new LinkedHashMap<>();
131-
if (!tableDescriptor.hasPrimaryKey()) {
132-
// for log table, need to set bucket, offset and timestamp as system metadata columns
133-
systemColumns.put(BUCKET_COLUMN_NAME, DataTypes.INT());
134-
systemColumns.put(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
135-
// we use timestamp_ltz type
136-
systemColumns.put(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
137-
}
138-
139143
// set schema
140144
for (com.alibaba.fluss.metadata.Schema.Column column :
141145
tableDescriptor.getSchema().getColumns()) {
142146
String columnName = column.getName();
143-
if (systemColumns.containsKey(columnName)) {
147+
if (SYSTEM_COLUMNS.containsKey(columnName)) {
144148
throw new InvalidTableException(
145149
"Column "
146150
+ columnName
@@ -153,7 +157,7 @@ private Schema toPaimonSchema(TableDescriptor tableDescriptor) {
153157
}
154158

155159
// add system metadata columns to schema
156-
for (Map.Entry<String, DataType> systemColumn : systemColumns.entrySet()) {
160+
for (Map.Entry<String, DataType> systemColumn : SYSTEM_COLUMNS.entrySet()) {
157161
schemaBuilder.column(systemColumn.getKey(), systemColumn.getValue());
158162
}
159163

@@ -186,7 +190,7 @@ private void setFlussPropertyToPaimon(String key, String value, Options options)
186190
}
187191

188192
@Override
189-
public void close() throws Exception {
193+
public void close() {
190194
IOUtils.closeQuietly(paimonCatalog, "paimon catalog");
191195
}
192196
}

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,18 @@ void testCreateLakeEnabledTable() throws Exception {
223223
new DataType[] {
224224
org.apache.paimon.types.DataTypes.INT().notNull(),
225225
org.apache.paimon.types.DataTypes.STRING(),
226+
// for __bucket, __offset, __timestamp
227+
org.apache.paimon.types.DataTypes.INT(),
228+
org.apache.paimon.types.DataTypes.BIGINT(),
229+
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
226230
},
227-
new String[] {"pk_c1", "pk_c2"}),
231+
new String[] {
232+
"pk_c1",
233+
"pk_c2",
234+
BUCKET_COLUMN_NAME,
235+
OFFSET_COLUMN_NAME,
236+
TIMESTAMP_COLUMN_NAME
237+
}),
228238
"pk_c1",
229239
BUCKET_NUM);
230240

@@ -256,8 +266,19 @@ void testCreateLakeEnabledTable() throws Exception {
256266
org.apache.paimon.types.DataTypes.INT().notNull(),
257267
org.apache.paimon.types.DataTypes.STRING(),
258268
org.apache.paimon.types.DataTypes.STRING().notNull(),
269+
// for __bucket, __offset, __timestamp
270+
org.apache.paimon.types.DataTypes.INT(),
271+
org.apache.paimon.types.DataTypes.BIGINT(),
272+
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
259273
},
260-
new String[] {"c1", "c2", "c3"}),
274+
new String[] {
275+
"c1",
276+
"c2",
277+
"c3",
278+
BUCKET_COLUMN_NAME,
279+
OFFSET_COLUMN_NAME,
280+
TIMESTAMP_COLUMN_NAME
281+
}),
261282
"c1",
262283
BUCKET_NUM);
263284
}

0 commit comments

Comments
 (0)