Skip to content

Commit 4a62762

Browse files
authored
[lake] Support ADD COLUMN AT LAST for dataLake enabled tables (#2189)
1 parent d4a72fa commit 4a62762

File tree

13 files changed

+566
-44
lines changed

13 files changed

+566
-44
lines changed

fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,22 @@ public String getComment() {
257257
public ColumnPosition getPosition() {
258258
return position;
259259
}
260+
261+
@Override
262+
public String toString() {
263+
return "AddColumn{"
264+
+ "name='"
265+
+ name
266+
+ '\''
267+
+ ", dataType="
268+
+ dataType
269+
+ ", comment='"
270+
+ comment
271+
+ '\''
272+
+ ", position="
273+
+ position
274+
+ '}';
275+
}
260276
}
261277

262278
/** A table change to drop a column. */
@@ -270,6 +286,11 @@ private DropColumn(String name) {
270286
public String getName() {
271287
return name;
272288
}
289+
290+
@Override
291+
public String toString() {
292+
return "DropColumn{" + "name='" + name + '\'' + '}';
293+
}
273294
}
274295

275296
/** A table change to modify a column. */
@@ -308,6 +329,22 @@ public String getComment() {
308329
public ColumnPosition getNewPosition() {
309330
return newPosition;
310331
}
332+
333+
@Override
334+
public String toString() {
335+
return "ModifyColumn{"
336+
+ "name='"
337+
+ name
338+
+ '\''
339+
+ ", dataType="
340+
+ dataType
341+
+ ", comment='"
342+
+ comment
343+
+ '\''
344+
+ ", newPosition="
345+
+ newPosition
346+
+ '}';
347+
}
311348
}
312349

313350
/** A table change to modify a column's name. */
@@ -327,6 +364,18 @@ public String getOldColumnName() {
327364
public String getNewColumnName() {
328365
return newColumnName;
329366
}
367+
368+
@Override
369+
public String toString() {
370+
return "RenameColumn{"
371+
+ "oldColumnName='"
372+
+ oldColumnName
373+
+ '\''
374+
+ ", newColumnName='"
375+
+ newColumnName
376+
+ '\''
377+
+ '}';
378+
}
330379
}
331380

332381
/** The position of the modified or added column. */

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 88 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.annotation.VisibleForTesting;
2121
import org.apache.fluss.config.Configuration;
22+
import org.apache.fluss.exception.InvalidAlterTableException;
2223
import org.apache.fluss.exception.TableAlreadyExistException;
2324
import org.apache.fluss.exception.TableNotExistException;
2425
import org.apache.fluss.lake.lakestorage.LakeCatalog;
@@ -38,6 +39,8 @@
3839
import org.apache.paimon.table.Table;
3940
import org.apache.paimon.types.DataType;
4041
import org.apache.paimon.types.DataTypes;
42+
import org.slf4j.Logger;
43+
import org.slf4j.LoggerFactory;
4144

4245
import java.util.LinkedHashMap;
4346
import java.util.List;
@@ -54,6 +57,7 @@
5457
/** A Paimon implementation of {@link LakeCatalog}. */
5558
public class PaimonLakeCatalog implements LakeCatalog {
5659

60+
private static final Logger LOG = LoggerFactory.getLogger(PaimonLakeCatalog.class);
5761
public static final LinkedHashMap<String, DataType> SYSTEM_COLUMNS = new LinkedHashMap<>();
5862

5963
static {
@@ -109,13 +113,92 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
109113
throws TableNotExistException {
110114
try {
111115
List<SchemaChange> paimonSchemaChanges = toPaimonSchemaChanges(tableChanges);
112-
alterTable(tablePath, paimonSchemaChanges);
113-
} catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) {
114-
// shouldn't happen before we support schema change
115-
throw new RuntimeException(e);
116+
117+
// Compare current Paimon table schema with expected target schema before altering
118+
if (shouldAlterTable(tablePath, tableChanges)) {
119+
alterTable(tablePath, paimonSchemaChanges);
120+
} else {
121+
// If schemas already match, treat as idempotent success
122+
LOG.info(
123+
"Skipping schema evolution for Paimon table {} because the column(s) to add {} already exist.",
124+
tablePath,
125+
tableChanges);
126+
}
127+
} catch (Catalog.ColumnAlreadyExistException e) {
128+
// This shouldn't happen if shouldAlterTable works correctly, but keep as safeguard
129+
throw new InvalidAlterTableException(e.getMessage());
130+
} catch (Catalog.ColumnNotExistException e) {
131+
// This shouldn't happen for AddColumn operations
132+
throw new InvalidAlterTableException(e.getMessage());
116133
}
117134
}
118135

136+
private boolean shouldAlterTable(TablePath tablePath, List<TableChange> tableChanges)
137+
throws TableNotExistException {
138+
try {
139+
Table table = paimonCatalog.getTable(toPaimon(tablePath));
140+
FileStoreTable fileStoreTable = (FileStoreTable) table;
141+
Schema currentSchema = fileStoreTable.schema().toSchema();
142+
143+
for (TableChange change : tableChanges) {
144+
if (change instanceof TableChange.AddColumn) {
145+
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
146+
if (!isColumnAlreadyExists(currentSchema, addColumn)) {
147+
return true;
148+
}
149+
} else {
150+
return true;
151+
}
152+
}
153+
154+
return false;
155+
} catch (Catalog.TableNotExistException e) {
156+
throw new TableNotExistException("Table " + tablePath + " does not exist.");
157+
}
158+
}
159+
160+
private boolean isColumnAlreadyExists(Schema currentSchema, TableChange.AddColumn addColumn) {
161+
String columnName = addColumn.getName();
162+
163+
for (org.apache.paimon.types.DataField field : currentSchema.fields()) {
164+
if (field.name().equals(columnName)) {
165+
org.apache.paimon.types.DataType expectedType =
166+
addColumn
167+
.getDataType()
168+
.accept(
169+
org.apache.fluss.lake.paimon.utils
170+
.FlussDataTypeToPaimonDataType.INSTANCE);
171+
172+
if (!field.type().equals(expectedType)) {
173+
throw new InvalidAlterTableException(
174+
String.format(
175+
"Column '%s' already exists but with different type. "
176+
+ "Existing: %s, Expected: %s",
177+
columnName, field.type(), expectedType));
178+
}
179+
String existingComment = field.description();
180+
String expectedComment = addColumn.getComment();
181+
182+
boolean commentsMatch =
183+
(existingComment == null && expectedComment == null)
184+
|| (existingComment != null
185+
&& existingComment.equals(expectedComment));
186+
187+
if (!commentsMatch) {
188+
throw new InvalidAlterTableException(
189+
String.format(
190+
"Column %s already exists but with different comment. "
191+
+ "Existing: %s, Expected: %s",
192+
columnName, existingComment, expectedComment));
193+
}
194+
195+
return true;
196+
}
197+
}
198+
199+
return false;
200+
}
201+
119202
private void createTable(TablePath tablePath, Schema schema, boolean isCreatingFlussTable)
120203
throws Catalog.DatabaseNotExistException {
121204
Identifier paimonPath = toPaimon(tablePath);
@@ -134,7 +217,7 @@ private void createTable(TablePath tablePath, Schema schema, boolean isCreatingF
134217
}
135218
} catch (Catalog.TableNotExistException tableNotExistException) {
136219
// shouldn't happen in normal cases
137-
throw new RuntimeException(
220+
throw new InvalidAlterTableException(
138221
String.format(
139222
"Failed to create table %s in Paimon. The table already existed "
140223
+ "during the initial creation attempt, but subsequently "

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,35 +27,49 @@
2727

2828
import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
2929
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
30-
import static org.apache.fluss.utils.Preconditions.checkState;
3130

3231
/** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
3332
public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
3433

3534
private final int bucket;
3635
private LogRecord logRecord;
3736
private int originRowFieldCount;
37+
private final int businessFieldCount;
38+
private final int bucketFieldIndex;
39+
private final int offsetFieldIndex;
40+
private final int timestampFieldIndex;
3841

3942
public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
4043
super(tableTowType);
4144
this.bucket = bucket;
45+
this.businessFieldCount = tableRowType.getFieldCount() - SYSTEM_COLUMNS.size();
46+
this.bucketFieldIndex = businessFieldCount;
47+
this.offsetFieldIndex = businessFieldCount + 1;
48+
this.timestampFieldIndex = businessFieldCount + 2;
4249
}
4350

4451
public void setFlussRecord(LogRecord logRecord) {
4552
this.logRecord = logRecord;
4653
this.internalRow = logRecord.getRow();
47-
this.originRowFieldCount = internalRow.getFieldCount();
48-
checkState(
49-
originRowFieldCount == tableRowType.getFieldCount() - SYSTEM_COLUMNS.size(),
50-
"The paimon table fields count must equals to LogRecord's fields count.");
54+
int flussFieldCount = internalRow.getFieldCount();
55+
if (flussFieldCount > businessFieldCount) {
56+
// Fluss record is wider than Paimon schema, which means Lake schema is not yet
57+
// synchronized. With "Lake First" strategy, this should not happen in normal cases.
58+
throw new IllegalStateException(
59+
String.format(
60+
"Fluss record has %d fields but Paimon schema only has %d business fields. "
61+
+ "This indicates the lake schema is not yet synchronized. "
62+
+ "Please retry the schema change operation.",
63+
flussFieldCount, businessFieldCount));
64+
}
65+
this.originRowFieldCount = flussFieldCount;
5166
}
5267

5368
@Override
5469
public int getFieldCount() {
55-
return
56-
// business (including partitions) + system (three system fields: bucket, offset,
70+
// business (including partitions) + system (three system fields: bucket, offset,
5771
// timestamp)
58-
originRowFieldCount + SYSTEM_COLUMNS.size();
72+
return tableRowType.getFieldCount();
5973
}
6074

6175
@Override
@@ -68,38 +82,60 @@ public boolean isNullAt(int pos) {
6882
if (pos < originRowFieldCount) {
6983
return super.isNullAt(pos);
7084
}
85+
if (pos < businessFieldCount) {
86+
// Padding NULL for missing business fields when Paimon schema is wider than Fluss
87+
return true;
88+
}
7189
// is the last three system fields: bucket, offset, timestamp which are never null
7290
return false;
7391
}
7492

7593
@Override
7694
public int getInt(int pos) {
77-
if (pos == originRowFieldCount) {
95+
if (pos == bucketFieldIndex) {
7896
// bucket system column
7997
return bucket;
8098
}
99+
if (pos >= originRowFieldCount) {
100+
throw new IllegalStateException(
101+
String.format(
102+
"Field %s is NULL because Paimon schema is wider than Fluss record.",
103+
pos));
104+
}
81105
return super.getInt(pos);
82106
}
83107

84108
@Override
85109
public long getLong(int pos) {
86-
if (pos == originRowFieldCount + 1) {
110+
if (pos == offsetFieldIndex) {
87111
// offset system column
88112
return logRecord.logOffset();
89-
} else if (pos == originRowFieldCount + 2) {
113+
} else if (pos == timestampFieldIndex) {
90114
// timestamp system column
91115
return logRecord.timestamp();
92116
}
117+
if (pos >= originRowFieldCount) {
118+
throw new IllegalStateException(
119+
String.format(
120+
"Field %s is NULL because Paimon schema is wider than Fluss record.",
121+
pos));
122+
}
93123
// the origin RowData
94124
return super.getLong(pos);
95125
}
96126

97127
@Override
98128
public Timestamp getTimestamp(int pos, int precision) {
99129
// it's timestamp system column
100-
if (pos == originRowFieldCount + 2) {
130+
if (pos == timestampFieldIndex) {
101131
return Timestamp.fromEpochMillis(logRecord.timestamp());
102132
}
133+
if (pos >= originRowFieldCount) {
134+
throw new IllegalStateException(
135+
String.format(
136+
"Field %s is NULL because Paimon schema is wider than Fluss record.",
137+
pos));
138+
}
103139
return super.getTimestamp(pos, precision);
104140
}
105141
}

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,31 @@ public static List<SchemaChange> toPaimonSchemaChanges(List<TableChange> tableCh
128128
schemaChanges.add(
129129
SchemaChange.removeOption(
130130
convertFlussPropertyKeyToPaimon(resetOption.getKey())));
131+
} else if (tableChange instanceof TableChange.AddColumn) {
132+
TableChange.AddColumn addColumn = (TableChange.AddColumn) tableChange;
133+
134+
if (!(addColumn.getPosition() instanceof TableChange.Last)) {
135+
throw new UnsupportedOperationException(
136+
"Only support to add column at last for paimon table.");
137+
}
138+
139+
org.apache.fluss.types.DataType flussDataType = addColumn.getDataType();
140+
if (!flussDataType.isNullable()) {
141+
throw new UnsupportedOperationException(
142+
"Only support to add nullable column for paimon table.");
143+
}
144+
145+
org.apache.paimon.types.DataType paimonDataType =
146+
flussDataType.accept(FlussDataTypeToPaimonDataType.INSTANCE);
147+
148+
String firstSystemColumnName = SYSTEM_COLUMNS.keySet().iterator().next();
149+
schemaChanges.add(
150+
SchemaChange.addColumn(
151+
addColumn.getName(),
152+
paimonDataType,
153+
addColumn.getComment(),
154+
SchemaChange.Move.before(
155+
addColumn.getName(), firstSystemColumnName)));
131156
} else {
132157
throw new UnsupportedOperationException(
133158
"Unsupported table change: " + tableChange.getClass());

0 commit comments

Comments
 (0)