Commit 58ee7dc

Improve lake-first schema evolution safety
1 parent 31fdaa2 commit 58ee7dc

5 files changed: +83 −11 lines changed


fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java

Lines changed: 12 additions & 1 deletion
@@ -51,7 +51,18 @@ public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
     public void setFlussRecord(LogRecord logRecord) {
         this.logRecord = logRecord;
         this.internalRow = logRecord.getRow();
-        this.originRowFieldCount = Math.min(internalRow.getFieldCount(), businessFieldCount);
+        int flussFieldCount = internalRow.getFieldCount();
+        if (flussFieldCount > businessFieldCount) {
+            // Fluss record is wider than Paimon schema, which means Lake schema is not yet
+            // synchronized. With "Lake First" strategy, this should not happen in normal cases.
+            throw new IllegalStateException(
+                    String.format(
+                            "Fluss record has %d fields but Paimon schema only has %d business fields. "
+                                    + "This indicates the lake schema is not yet synchronized. "
+                                    + "Please retry the schema change operation.",
+                            flussFieldCount, businessFieldCount));
+        }
+        this.originRowFieldCount = flussFieldCount;
     }

     @Override
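For context, here is a minimal sketch of when the new guard fires. It assumes, as the tests in this commit suggest, that businessFieldCount is the Paimon row type's field count minus the three tiering system columns __bucket, __offset and __timestamp; the class and values below are illustrative only and not part of the commit.

import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

class WidthCheckSketch {
    public static void main(String[] args) {
        // Paimon table: one business column plus the three tiering system columns.
        RowType paimonRowType =
                RowType.of(
                        DataTypes.BOOLEAN(), // c1 (business column)
                        DataTypes.INT(),     // __bucket
                        DataTypes.BIGINT(),  // __offset
                        DataTypes.BIGINT()); // __timestamp
        int businessFieldCount = paimonRowType.getFieldCount() - 3; // assumed relation: = 1

        // A Fluss record that already carries a newly added column has 2 fields.
        int flussFieldCount = 2;
        if (flussFieldCount > businessFieldCount) {
            // This is the case setFlussRecord now rejects instead of silently truncating:
            // under "Lake First", Paimon should already know about the new column.
            System.out.println("setFlussRecord would throw IllegalStateException");
        }
    }
}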

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java

Lines changed: 7 additions & 6 deletions
@@ -38,6 +38,7 @@
 import static org.apache.fluss.record.ChangeType.UPDATE_AFTER;
 import static org.apache.fluss.record.ChangeType.UPDATE_BEFORE;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;

 /** Test for {@link FlussRecordAsPaimonRow}. */
 class FlussRecordAsPaimonRowTest {
@@ -197,6 +198,7 @@ void testPaimonSchemaWiderThanFlussRecord() {

     @Test
     void testFlussRecordWiderThanPaimonSchema() {
+        // With "Lake First" strategy, Fluss record wider than Paimon schema should throw exception
         int tableBucket = 0;
         RowType tableRowType =
                 RowType.of(
@@ -215,12 +217,11 @@ void testFlussRecordWiderThanPaimonSchema() {
         genericRow.setField(0, true);
         genericRow.setField(1, BinaryString.fromString("extra"));
         LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
-        flussRecordAsPaimonRow.setFlussRecord(logRecord);

-        assertThat(flussRecordAsPaimonRow.getFieldCount()).isEqualTo(4);
-        assertThat(flussRecordAsPaimonRow.getBoolean(0)).isTrue();
-        assertThat(flussRecordAsPaimonRow.getInt(1)).isEqualTo(tableBucket);
-        assertThat(flussRecordAsPaimonRow.getLong(2)).isEqualTo(logOffset);
-        assertThat(flussRecordAsPaimonRow.getLong(3)).isEqualTo(timeStamp);
+        // Should throw exception instead of silently truncating data
+        assertThatThrownBy(() -> flussRecordAsPaimonRow.setFlussRecord(logRecord))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "Fluss record has 2 fields but Paimon schema only has 1 business fields");
     }
 }

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java

Lines changed: 57 additions & 0 deletions
@@ -604,6 +604,63 @@ private CloseableIterator<org.apache.paimon.data.InternalRow> getPaimonRowClosea
         return reader.toCloseableIterator();
     }

+    @Test
+    void testTieringWithAddColumn() throws Exception {
+        // Test ADD COLUMN during tiering with "Lake First" strategy
+
+        // 1. Create a datalake enabled table with initial schema (c1: INT, c2: STRING)
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "addColumnTable");
+        long tableId = createLogTable(tablePath);
+        TableBucket tableBucket = new TableBucket(tableId, 0);
+
+        // 2. Write initial data before ADD COLUMN
+        List<InternalRow> initialRows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+        writeRows(tablePath, initialRows, true);
+
+        // 3. Start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        try {
+            // 4. Wait for initial data to be tiered
+            assertReplicaStatus(tableBucket, 3);
+
+            // 5. Execute ADD COLUMN (c3: INT, nullable)
+            List<TableChange> addColumnChanges =
+                    Collections.singletonList(
+                            TableChange.addColumn(
+                                    "c3",
+                                    DataTypes.INT(),
+                                    "new column",
+                                    TableChange.ColumnPosition.last()));
+            admin.alterTable(tablePath, addColumnChanges, false).get();
+
+            // 6. Write more data after ADD COLUMN (with new column value)
+            List<InternalRow> newRows = Arrays.asList(row(4, "v4"), row(5, "v5"), row(6, "v6"));
+            writeRows(tablePath, newRows, true);
+
+            // 7. Wait for new data to be tiered
+            assertReplicaStatus(tableBucket, 6);
+
+            // 8. Verify Paimon table has the new column
+            Identifier tableIdentifier =
+                    Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
+            FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(tableIdentifier);
+            List<String> fieldNames = paimonTable.rowType().getFieldNames();
+
+            // Should have: c1, c2, c3, __bucket, __offset, __timestamp
+            assertThat(fieldNames).contains("c1", "c2", "c3");
+
+            // 9. Verify all data is present in Paimon (no data loss)
+            List<InternalRow> allRows = new ArrayList<>();
+            allRows.addAll(initialRows);
+            allRows.addAll(newRows);
+            checkDataInPaimonAppendOnlyTable(tablePath, allRows, 0);
+
+        } finally {
+            jobClient.cancel().get();
+        }
+    }
+
     @Override
     protected FlussClusterExtension getFlussClusterExtension() {
         return FLUSS_CLUSTER_EXTENSION;

fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java

Lines changed: 6 additions & 4 deletions
@@ -335,13 +335,15 @@ public void alterTableSchema(
             // validate the table column changes
             if (!schemaChanges.isEmpty()) {
                 Schema newSchema = SchemaUpdate.applySchemaChanges(table, schemaChanges);
-                // update the schema in Fluss (ZK) first - Fluss is the source of truth
-                if (!newSchema.equals(table.getSchema())) {
-                    zookeeperClient.registerSchema(tablePath, newSchema, table.getSchemaId() + 1);
-                }

+                // Lake First: sync to Lake before updating Fluss schema
                 syncSchemaChangesToLake(
                         tablePath, table, schemaChanges, lakeCatalog, lakeCatalogContext);
+
+                // Update Fluss schema (ZK) after Lake sync succeeds
+                if (!newSchema.equals(table.getSchema())) {
+                    zookeeperClient.registerSchema(tablePath, newSchema, table.getSchemaId() + 1);
+                }
             }
         } catch (Exception e) {
             if (e instanceof TableNotExistException) {
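The reordering above is the core of the change: the lake is updated before ZooKeeper. Below is a self-contained sketch of the failure argument, using hypothetical LakeCatalog and FlussMetastore stand-ins rather than the real Fluss APIs.

import java.util.List;

// Hypothetical stand-ins for illustration only; not the real Fluss interfaces.
interface LakeCatalog { void applyChanges(String table, List<String> changes) throws Exception; }
interface FlussMetastore { void registerSchema(String table, int schemaId) throws Exception; }

class LakeFirstSchemaChange {
    private final LakeCatalog lake;
    private final FlussMetastore fluss;

    LakeFirstSchemaChange(LakeCatalog lake, FlussMetastore fluss) {
        this.lake = lake;
        this.fluss = fluss;
    }

    void alter(String table, List<String> changes, int nextSchemaId) throws Exception {
        // 1. Sync the change to the lake first. If this fails, Fluss (ZK) is untouched,
        //    so no record wider than the lake schema is ever produced; the caller retries.
        lake.applyChanges(table, changes);

        // 2. Only then advance the Fluss schema. If this step fails after the lake sync
        //    succeeded, a retry re-applies the same ADD COLUMN to the lake, which the
        //    idempotent check in SchemaUpdate turns into a no-op.
        fluss.registerSchema(table, nextSchemaId);
    }
}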

fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java

Lines changed: 1 addition & 0 deletions
@@ -86,6 +86,7 @@ public SchemaUpdate applySchemaChange(TableChange columnChange) {
     private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
         Schema.Column existingColumn = existedColumns.get(addColumn.getName());
         if (existingColumn != null) {
+            // Allow idempotent retries: if column name/type/comment match existing, treat as no-op
             if (!existingColumn.getDataType().equals(addColumn.getDataType())
                     || !Objects.equals(existingColumn.getComment(), addColumn.getComment())) {
                 throw new IllegalArgumentException(
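A simplified, standalone sketch of the idempotent ADD COLUMN check introduced above; ColumnSketch and the map parameter are stand-ins for Schema.Column and existedColumns, not the real Fluss types.

import java.util.Map;
import java.util.Objects;

final class ColumnSketch {
    final String name;
    final String type;
    final String comment;

    ColumnSketch(String name, String type, String comment) {
        this.name = name;
        this.type = type;
        this.comment = comment;
    }
}

class IdempotentAddColumn {
    /** Returns true if the column must be added, false if an identical column already exists. */
    static boolean shouldAdd(Map<String, ColumnSketch> existing, ColumnSketch toAdd) {
        ColumnSketch current = existing.get(toAdd.name);
        if (current == null) {
            return true; // genuinely new column
        }
        if (!current.type.equals(toAdd.type) || !Objects.equals(current.comment, toAdd.comment)) {
            // Same name but a different definition: this is a real conflict, not a retry.
            throw new IllegalArgumentException(
                    "Column " + toAdd.name + " already exists with a different definition");
        }
        return false; // identical definition: a retried ADD COLUMN is a no-op
    }
}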
