Skip to content

Commit b0357bb

Browse files
buvb authored and wuchong committed
[lake/paimon] Fix schema evolution safety and address review comments
- Add shouldAlterTable() to verify the schema before altering
- Add isColumnAlreadyExists() for precise column property validation
- Fix null-safe comment comparison logic
- Use containsExactly() for precise field order assertions in tests

This addresses all review comments from wuchong in PR apache#2189.
1 parent cd66a59 commit b0357bb

File tree

3 files changed

+152
-11
lines changed

3 files changed

+152
-11
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,15 +109,87 @@ public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Cont
109109
throws TableNotExistException {
110110
try {
111111
List<SchemaChange> paimonSchemaChanges = toPaimonSchemaChanges(tableChanges);
112-
alterTable(tablePath, paimonSchemaChanges);
112+
113+
// Compare current Paimon table schema with expected target schema before altering
114+
if (shouldAlterTable(tablePath, tableChanges)) {
115+
alterTable(tablePath, paimonSchemaChanges);
116+
}
117+
// If schemas already match, treat as idempotent success
113118
} catch (Catalog.ColumnAlreadyExistException e) {
114-
// Column already exists, treat as idempotent success for retry scenarios.
119+
// This shouldn't happen if shouldAlterTable works correctly, but keep as safeguard
120+
throw new RuntimeException("Unexpected ColumnAlreadyExistException", e);
115121
} catch (Catalog.ColumnNotExistException e) {
116122
// This shouldn't happen for AddColumn operations
117123
throw new RuntimeException(e);
118124
}
119125
}
120126

127+
private boolean shouldAlterTable(TablePath tablePath, List<TableChange> tableChanges)
128+
throws TableNotExistException {
129+
try {
130+
Table table = paimonCatalog.getTable(toPaimon(tablePath));
131+
FileStoreTable fileStoreTable = (FileStoreTable) table;
132+
Schema currentSchema = fileStoreTable.schema().toSchema();
133+
134+
for (TableChange change : tableChanges) {
135+
if (change instanceof TableChange.AddColumn) {
136+
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
137+
if (!isColumnAlreadyExists(currentSchema, addColumn)) {
138+
return true;
139+
}
140+
} else {
141+
return true;
142+
}
143+
}
144+
145+
return false;
146+
} catch (Catalog.TableNotExistException e) {
147+
throw new TableNotExistException("Table " + tablePath + " does not exist.");
148+
}
149+
}
150+
151+
private boolean isColumnAlreadyExists(Schema currentSchema, TableChange.AddColumn addColumn) {
152+
String columnName = addColumn.getName();
153+
154+
for (org.apache.paimon.types.DataField field : currentSchema.fields()) {
155+
if (field.name().equals(columnName)) {
156+
org.apache.paimon.types.DataType expectedType =
157+
addColumn
158+
.getDataType()
159+
.accept(
160+
org.apache.fluss.lake.paimon.utils
161+
.FlussDataTypeToPaimonDataType.INSTANCE);
162+
163+
if (!field.type().equals(expectedType)) {
164+
throw new IllegalStateException(
165+
String.format(
166+
"Column %s already exists but with different type. "
167+
+ "Existing: %s, Expected: %s",
168+
columnName, field.type(), expectedType));
169+
}
170+
String existingComment = field.description();
171+
String expectedComment = addColumn.getComment();
172+
173+
boolean commentsMatch =
174+
(existingComment == null && expectedComment == null)
175+
|| (existingComment != null
176+
&& existingComment.equals(expectedComment));
177+
178+
if (!commentsMatch) {
179+
throw new IllegalStateException(
180+
String.format(
181+
"Column %s already exists but with different comment. "
182+
+ "Existing: %s, Expected: %s",
183+
columnName, existingComment, expectedComment));
184+
}
185+
186+
return true;
187+
}
188+
}
189+
190+
return false;
191+
}
192+
121193
private void createTable(TablePath tablePath, Schema schema, boolean isCreatingFlussTable)
122194
throws Catalog.DatabaseNotExistException {
123195
Identifier paimonPath = toPaimon(tablePath);

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,4 +606,63 @@ void testEmptyArray() {
606606
assertThat(array).isNotNull();
607607
assertThat(array.size()).isEqualTo(0);
608608
}
609+
610+
@Test
611+
void testPaimonSchemaWiderThanFlussRecord() {
612+
int tableBucket = 0;
613+
RowType tableRowType =
614+
RowType.of(
615+
new org.apache.paimon.types.BooleanType(),
616+
new org.apache.paimon.types.VarCharType(),
617+
// append three system columns: __bucket, __offset,__timestamp
618+
new org.apache.paimon.types.IntType(),
619+
new org.apache.paimon.types.BigIntType(),
620+
new org.apache.paimon.types.LocalZonedTimestampType(3));
621+
622+
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
623+
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
624+
625+
long logOffset = 7L;
626+
long timeStamp = System.currentTimeMillis();
627+
GenericRow genericRow = new GenericRow(1);
628+
genericRow.setField(0, true);
629+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
630+
flussRecordAsPaimonRow.setFlussRecord(logRecord);
631+
632+
assertThat(flussRecordAsPaimonRow.getFieldCount()).isEqualTo(5);
633+
634+
assertThat(flussRecordAsPaimonRow.getBoolean(0)).isTrue();
635+
assertThat(flussRecordAsPaimonRow.isNullAt(1)).isTrue();
636+
assertThat(flussRecordAsPaimonRow.getInt(2)).isEqualTo(tableBucket);
637+
assertThat(flussRecordAsPaimonRow.getLong(3)).isEqualTo(logOffset);
638+
assertThat(flussRecordAsPaimonRow.getLong(4)).isEqualTo(timeStamp);
639+
}
640+
641+
@Test
642+
void testFlussRecordWiderThanPaimonSchema() {
643+
int tableBucket = 0;
644+
RowType tableRowType =
645+
RowType.of(
646+
new org.apache.paimon.types.BooleanType(),
647+
// append three system columns: __bucket, __offset,__timestamp
648+
new org.apache.paimon.types.IntType(),
649+
new org.apache.paimon.types.BigIntType(),
650+
new org.apache.paimon.types.LocalZonedTimestampType(3));
651+
652+
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
653+
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
654+
655+
long logOffset = 7L;
656+
long timeStamp = System.currentTimeMillis();
657+
GenericRow genericRow = new GenericRow(2);
658+
genericRow.setField(0, true);
659+
genericRow.setField(1, BinaryString.fromString("extra"));
660+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
661+
662+
// Should throw exception instead of silently truncating data
663+
assertThatThrownBy(() -> flussRecordAsPaimonRow.setFlussRecord(logRecord))
664+
.isInstanceOf(IllegalStateException.class)
665+
.hasMessageContaining(
666+
"Fluss record has 2 fields but Paimon schema only has 1 business fields");
667+
}
609668
}

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -643,20 +643,30 @@ void testTieringWithAddColumn() throws Exception {
643643
// 7. Wait for new data to be tiered
644644
assertReplicaStatus(tableBucket, 6);
645645

646-
// 8. Verify Paimon table has the new column
646+
// 8. Verify Paimon table has the new column with exact field names and order
647647
Identifier tableIdentifier =
648648
Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
649649
FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(tableIdentifier);
650650
List<String> fieldNames = paimonTable.rowType().getFieldNames();
651651

652-
// Should have: a, b, c3, __bucket, __offset, __timestamp
653-
assertThat(fieldNames).contains("a", "b", "c3");
654-
655-
// 9. Verify all data is present in Paimon (no data loss)
656-
List<InternalRow> allRows = new ArrayList<>();
657-
allRows.addAll(initialRows);
658-
allRows.addAll(newRows);
659-
checkDataInPaimonAppendOnlyTable(tablePath, allRows, 0);
652+
// Should have exact fields in order: a, b, c3, __bucket, __offset, __timestamp
653+
assertThat(fieldNames)
654+
.containsExactly("a", "b", "c3", "__bucket", "__offset", "__timestamp");
655+
656+
// 9. Verify both schema evolution and data correctness
657+
// For initial rows (before ADD COLUMN), c3 should be NULL
658+
// For new rows (after ADD COLUMN), c3 should have the provided values
659+
List<InternalRow> expectedRows = new ArrayList<>();
660+
// Initial rows with NULL for c3
661+
expectedRows.add(row(1, "v1", null));
662+
expectedRows.add(row(2, "v2", null));
663+
expectedRows.add(row(3, "v3", null));
664+
// New rows with c3 values
665+
expectedRows.add(row(4, "v4", 40));
666+
expectedRows.add(row(5, "v5", 50));
667+
expectedRows.add(row(6, "v6", 60));
668+
669+
checkDataInPaimonAppendOnlyTable(tablePath, expectedRows, 0);
660670

661671
} finally {
662672
jobClient.cancel().get();

0 commit comments

Comments
 (0)